[PATCH 12/50] [mama][dqpublisher] Improved Message Handling


Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Since the caller may pass messages with the reserved files (ie. senderid,
seqno, etc.) populated, the dq publisher should try to be robust and update the
fields if the type is reasonable. Prior to this change they would fail if the
types did not match the expected type exactly. Now if the status, for example,
is a 16 bit integer, it will succeed even though we expect and 8 bit integer.

Also fixed formatting and whitespace.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqpublisher.c | 86 ++++++++++++++++++++++++---------
mama/c_cpp/src/c/dqpublishermanager.c | 10 +++-
2 files changed, 71 insertions(+), 25 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublisher.c b/mama/c_cpp/src/c/dqpublisher.c
index 677c117..a875399 100644
--- a/mama/c_cpp/src/c/dqpublisher.c
+++ b/mama/c_cpp/src/c/dqpublisher.c
@@ -29,9 +29,9 @@

typedef struct mamaDQPublisherImpl_
{
- mamaPublisher mPublisher;
- mamaMsgStatus mStatus;
- uint64_t mSenderId;
+ mamaPublisher mPublisher;
+ mamaMsgStatus mStatus;
+ uint64_t mSenderId;
mama_seqnum_t mSeqNum;
} mamaDQPublisherImpl;

@@ -45,11 +45,11 @@ mama_status mamaDQPublisher_allocate (mamaDQPublisher* result)

*result = impl;

- return MAMA_STATUS_OK;
+ return MAMA_STATUS_OK;
}

mama_status mamaDQPublisher_create (mamaDQPublisher pub, mamaTransport transport,
- const char* topic)
+ const char* topic)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
mama_status status = MAMA_STATUS_OK;
@@ -61,7 +61,7 @@ mama_status mamaDQPublisher_create (mamaDQPublisher pub, mamaTransport transport
NULL,
NULL);

- if (status == MAMA_STATUS_OK)
+ if (status == MAMA_STATUS_OK)
{
impl->mSenderId = mamaSenderId_getSelf ();
impl->mStatus = MAMA_MSG_STATUS_OK;
@@ -76,36 +76,79 @@ mama_status mamaDQPublisher_send (mamaDQPublisher pub, mamaMsg msg)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);

- if (impl->mSeqNum != 0)
+ if (impl->mSeqNum != 0)
{
switch (mamaMsgType_typeForMsg (msg))
{
- case MAMA_MSG_TYPE_REFRESH :
- case MAMA_MSG_TYPE_SYNC_REQUEST :
- case MAMA_MSG_TYPE_MISC :
- case MAMA_MSG_TYPE_NOT_PERMISSIONED :
- case MAMA_MSG_TYPE_NOT_FOUND :
- break;
-
+ case MAMA_MSG_TYPE_REFRESH :
+ case MAMA_MSG_TYPE_SYNC_REQUEST :
+ case MAMA_MSG_TYPE_MISC :
+ case MAMA_MSG_TYPE_NOT_PERMISSIONED :
+ case MAMA_MSG_TYPE_NOT_FOUND :
+ break;
case MAMA_MSG_TYPE_INITIAL :
case MAMA_MSG_TYPE_BOOK_INITIAL :
case MAMA_MSG_TYPE_RECAP :
case MAMA_MSG_TYPE_BOOK_RECAP :
- mamaMsg_updateU8(msg,NULL, MamaFieldMsgStatus.mFid, impl->mStatus);
+ if(MAMA_STATUS_OK !=
+ mamaMsg_updateU8(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }
break;

default:
- mamaMsg_updateU8(msg,NULL, MamaFieldMsgStatus.mFid, impl->mStatus);
+ if(MAMA_STATUS_OK !=
+ mamaMsg_updateU8(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }
impl->mSeqNum++;
break;
}
- mamaMsg_updateU32(msg, NULL, MamaFieldSeqNum.mFid, impl->mSeqNum);
+ mamaMsg_updateU32(msg, MamaFieldSeqNum.mName, MamaFieldSeqNum.mFid,
+ impl->mSeqNum);
}

if (impl->mSenderId != 0)
- mamaMsg_updateU64(msg, NULL, MamaFieldSenderId.mFid, impl->mSenderId);
+ {
+ mamaMsgField senderIdField = NULL;
+ if (MAMA_STATUS_OK == mamaMsg_getField(msg, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, &senderIdField))
+ {
+ mamaFieldType senderIdType = MAMA_FIELD_TYPE_UNKNOWN;
+ if (MAMA_STATUS_OK == mamaMsgField_getType(senderIdField,
+ &senderIdType))
+ {
+ switch(senderIdType)
+ {
+ case MAMA_FIELD_TYPE_U16:
+ mamaMsgField_updateU16(senderIdField,
+ (mama_u16_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U32:
+ mamaMsgField_updateU32(senderIdField,
+ (mama_u32_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U64:
+ default:
+ mamaMsgField_updateU64(senderIdField, impl->mSenderId);
+ break;
+ }
+ }
+ }
+ else
+ mamaMsg_addU64(msg, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, impl->mSenderId);
+ }

- return (mamaPublisher_send (impl->mPublisher, msg));
+ return (mamaPublisher_send (impl->mPublisher, msg));
}

mama_status mamaDQPublisher_sendReply (mamaDQPublisher pub,
@@ -142,21 +185,18 @@ void mamaDQPublisher_destroy (mamaDQPublisher pub)
void mamaDQPublisher_setStatus (mamaDQPublisher pub, mamaMsgStatus status)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
-
impl->mStatus = status;
}

void mamaDQPublisher_setSenderId (mamaDQPublisher pub, uint64_t senderid)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
-
- impl->mSenderId = senderid;
+ impl->mSenderId = senderid;
}

void mamaDQPublisher_setSeqNum (mamaDQPublisher pub, mama_seqnum_t num)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
-
impl->mSeqNum=num;
}

diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index 71026ed..c1bd7d0 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -215,6 +215,9 @@ mama_status mamaDQPublisherManager_allocate(mamaDQPublisherManager* result )
calloc (1, sizeof (mamaDQPublisherManagerImpl));

if (!impl) return MAMA_STATUS_NOMEM;
+ impl->mSenderId = mamaSenderId_getSelf ();
+ impl->mStatus = MAMA_MSG_STATUS_OK;
+ impl->mSeqNum = 1;

*result = impl;

@@ -225,7 +228,10 @@ void* mamaDQPublisherManager_getClosure (mamaDQPublisherManager manager)
{
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;

- return impl->mClosure;
+ if(NULL != impl)
+ return impl->mClosure;
+ else
+ return NULL;
}


@@ -353,7 +359,7 @@ mama_status mamaDQPublisherManager_createPublisher (
mama_status status = MAMA_STATUS_OK;
char topic[80];

- newTopic = (mamaPublishTopic*) wtable_lookup (impl->mPublisherMap, (char*)symbol);
+ newTopic = (mamaPublishTopic*)wtable_lookup (impl->mPublisherMap, (char*)symbol);

if (!newTopic)
{
--
1.7.7.6

Join Openmama-dev@lists.openmama.org to automatically receive all group messages.