Date   

[PATCH 27/50] [mama] Log Messages with MAMA_MSG_STATUS_UNKNOWN

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

On receiving a message with a status of MAMA_MSG_STATUS_UNKNOWN, log the message
if at FINEST and track it update the statistics for the transport, queue and
application.

Also tidied up the file a little: converted tabs to space, reformatted long
lines, etc.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/listenermsgcallback.c | 122 +++++++++++++++++---------------
1 files changed, 64 insertions(+), 58 deletions(-)

diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c
index cbe1c90..532a433 100644
--- a/mama/c_cpp/src/c/listenermsgcallback.c
+++ b/mama/c_cpp/src/c/listenermsgcallback.c
@@ -45,33 +45,12 @@ extern int gGenerateTransportStats;
extern int gGenerateGlobalStats;
extern int gGenerateQueueStats;

-/* *************************************************** */
-/* Private Function Prototypes. */
-/* *************************************************** */
-
-/**
- * This function will invoke the subscription's onError callback passing in a particular error code.
- *
- * @param[in] callback The impl.
- * @param[in] ctx The subscription context.
- * @param[in] mamaStatus The status that will be passed to the error callback.
- * @param[in] subscription The subscription.
- * @param[in] userSymbol The symbol.
- */
-static void
-listenerMsgCallbackImpl_invokeErrorCallback(listenerMsgCallback callback,
+/* Function prototypes. */
+void listenerMsgCallback_invokeErrorCallback(listenerMsgCallback callback,
SubjectContext *ctx, mama_status mamaStatus, mamaSubscription
subscription, const char *userSymbol);

-/**
- * This function will write a log message if an unknown message status is detected.
- *
- * @param[in] ctx The subscription context.
- * @param[in] status The message status.
- * @param[in] subscription The subscription.
- */
-static void
-listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status,
+void listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status,
mamaSubscription subscription);

/**
@@ -195,7 +174,8 @@ static void processPointToPointMessage (msgCallback* callback,

/* Mark the subscription as inactive if we are not expecting
* any more updates. */
- if (!mamaSubscription_isExpectingUpdates (self->mSubscription))
+ if (!mamaSubscription_isExpectingUpdates (self->mSubscription) &&
+ !mamaSubscription_getAcceptMultipleInitials (self->mSubscription))
{
mamaSubscription_deactivate (self->mSubscription);
}
@@ -223,6 +203,33 @@ static void processPointToPointMessage (msgCallback* callback,
* may have been destroyed in the callback! */
}

+/* Description : This function will invoke the subscription's onError callback
+ * passing in a particular error code.
+ */
+void listenerMsgCallback_invokeErrorCallback(listenerMsgCallback callback,
+ SubjectContext *ctx, mama_status mamaStatus, mamaSubscription
+ subscription, const char *userSymbol)
+{
+ /* Local variables. */
+ void *closure = NULL;
+
+ /* Get the callback object from the subscription. */
+ mamaMsgCallbacks *cbs = mamaSubscription_getUserCallbacks (subscription);
+
+ /* Wait for a response. */
+ mamaSubscription_stopWaitForResponse(subscription, ctx);
+
+ /* Get the closure from the subscription. */
+ mamaSubscription_getClosure (subscription, &closure);
+
+ mama_setLastError (MAMA_ERROR_DEFAULT);
+ cbs->onError (subscription,
+ mamaStatus,
+ NULL,
+ userSymbol,
+ closure);
+}
+
static int isInitialMessageOrRecap (msgCallback *callback, int msgType)
{
return msgType == MAMA_MSG_TYPE_INITIAL ||
@@ -247,7 +254,7 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
mamaTransport transport;
mamaStatsCollector* queueStatsCollector = NULL;
mamaStatsCollector* tportStatsCollector = NULL;
- const char* userSymbol = NULL;
+ const char* userSymbol = NULL;

if (!ctx)
{
@@ -283,22 +290,25 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
MamaStatNumMessages.mFid);

/* Get the user symbol from the subscription. */
- mamaSubscription_getSymbol(subscription, &userSymbol);
+ mamaSubscription_getSymbol(subscription, &userSymbol);

if (status != MAMA_MSG_STATUS_OK)
{
switch (status)
{
case MAMA_MSG_STATUS_NOT_PERMISSIONED:
- listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_PERMISSIONED, subscription, userSymbol);
+ listenerMsgCallback_invokeErrorCallback(callback, ctx,
+ MAMA_STATUS_NOT_PERMISSIONED, subscription, userSymbol);
return;

case MAMA_MSG_STATUS_BAD_SYMBOL:
- listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_BAD_SYMBOL, subscription, userSymbol);
+ listenerMsgCallback_invokeErrorCallback(callback, ctx,
+ MAMA_STATUS_BAD_SYMBOL, subscription, userSymbol);
return;

case MAMA_MSG_STATUS_NOT_FOUND:
- listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_FOUND, subscription, userSymbol);
+ listenerMsgCallback_invokeErrorCallback(callback, ctx,
+ MAMA_STATUS_NOT_FOUND, subscription, userSymbol);
return;

case MAMA_MSG_STATUS_NO_SUBSCRIBERS:
@@ -338,6 +348,30 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
listenerMsgCallbackImpl_logUnknownStatus(ctx, status, subscription);
break;
}
+ case MAMA_MSG_STATUS_UNKNOWN:
+ {
+ listenerMsgCallbackImpl_logUnknownStatus(ctx, status, subscription);
+ mamaSubscription_setPossiblyStale(subscription);
+
+ if (queueStatsCollector)
+ {
+ mamaStatsCollector_incrementStat (*queueStatsCollector,
+ MamaStatUnknownMsgs.mFid);
+ }
+ if (tportStatsCollector)
+ {
+ mamaStatsCollector_incrementStat (*tportStatsCollector,
+ MamaStatUnknownMsgs.mFid);
+ }
+ if (mamaInternal_getGlobalStatsCollector())
+ {
+ mamaStatsCollector_incrementStat
+ (*(mamaInternal_getGlobalStatsCollector()),
+ MamaStatUnknownMsgs.mFid);
+ }
+ return; //throw away msg
+ break;
+ }
default:
{
/* Log the fact we have received an unknown message. */
@@ -587,34 +621,6 @@ checkEntitlement( msgCallback *callback, mamaMsg msg, SubjectContext* ctx )
#endif /* WITH_ENTITLEMENTS */
}

-/* *************************************************** */
-/* Private Functions. */
-/* *************************************************** */
-
-/* Description: This function will invoke the subscription's onError callback passing in a particular error code.
- */
-void listenerMsgCallbackImpl_invokeErrorCallback(listenerMsgCallback callback, SubjectContext *ctx, mama_status mamaStatus, mamaSubscription subscription, const char *userSymbol)
-{
- /* Local variables. */
- void *closure = NULL;
-
- /* Get the callback object from the subscription. */
- mamaMsgCallbacks *cbs = mamaSubscription_getUserCallbacks (subscription);
-
- /* Wait for a response. */
- mamaSubscription_stopWaitForResponse(subscription, ctx);
-
- /* Get the closure from the subscription. */
- mamaSubscription_getClosure (subscription, &closure);
-
- mama_setLastError (MAMA_ERROR_DEFAULT);
- cbs->onError (subscription,
- mamaStatus,
- NULL,
- userSymbol,
- closure);
-}
-
void listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status,
mamaSubscription subscription)
{
--
1.7.7.6


[PATCH 26/50] [mama] Do not modify attached messages

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

For some middlewares with "zero-copy" semantics like shared memory or RDMA, applications share message payloads. Others may re-use buffers. In these cases, modifying a message potentially causes undesirable affects on other applications. For this reason, applications must call mamaMsg_detach() to take ownership of or copy the payload prior to updating or adding fields. This change enforces the read-only semantics prior to detaching the message.

This change also removes the unnecessary mamaMsg_createNative() method. The concept of a native payload is arbitrary and redundant since we already have the notion of a default payload.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/mama/status.h | 6 +-
mama/c_cpp/src/c/msg.c | 139 ++++++++++++++++++++++++++++++---------
mama/c_cpp/src/c/msgfield.c | 123 +++++++++++++++++++++++++++++-----
mama/c_cpp/src/c/msgfieldimpl.h | 2 +-
mama/c_cpp/src/c/msgimpl.h | 3 +
mama/c_cpp/src/c/status.c | 2 +-
6 files changed, 219 insertions(+), 56 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/status.h b/mama/c_cpp/src/c/mama/status.h
index fa0c5f5..1c48c00 100644
--- a/mama/c_cpp/src/c/mama/status.h
+++ b/mama/c_cpp/src/c/mama/status.h
@@ -92,6 +92,8 @@ typedef enum
MAMA_STATUS_NO_BRIDGE_IMPL = 26,
/* Invalid queue */
MAMA_STATUS_INVALID_QUEUE = 27,
+ /* Not modifiable */
+ MAMA_STATUS_NOT_MODIFIABLE = 28,
/* Not permissioned for the subject */
MAMA_STATUS_NOT_PERMISSIONED = 4001,
/* Subscription is in an invalid state. */
@@ -99,9 +101,7 @@ typedef enum
/* Queue has open objects. */
MAMA_STATUS_QUEUE_OPEN_OBJECTS = 5002,
/* The function isn't supported for this type of subscription. */
- MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE = 5003,
- /* The underlying transport saw a gap. */
- MAMA_STATUS_SUBSCRIPTION_GAP = 5004
+ MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE = 5003

#ifdef WITH_ENTITLEMENTS
/* Out of memory */
diff --git a/mama/c_cpp/src/c/msg.c b/mama/c_cpp/src/c/msg.c
index af51d25..d3c0e80 100644
--- a/mama/c_cpp/src/c/msg.c
+++ b/mama/c_cpp/src/c/msg.c
@@ -67,6 +67,7 @@ typedef struct mamaMsgImpl_
char mSubject[MAX_SUBJECT];

msgPayload mPayload;
+ msgPayload mPayloads[MAMA_PAYLOAD_MAX];
/* Set of get/set/update methods to use for a non wmsg payload */
mamaPayloadBridgeImpl* mPayloadBridge;

@@ -358,6 +359,18 @@ mamaMsg_getPayloadType (mamaMsg msg, mamaPayloadType* payloadType)
return MAMA_STATUS_OK;
}

+mama_status
+mamaMsgImpl_getPayload (const mamaMsg msg, msgPayload* payload)
+{
+ mamaMsgImpl* impl = (mamaMsgImpl*)msg;
+
+ if (!impl || !payload) return MAMA_STATUS_NULL_ARG;
+
+ *payload = impl->mPayload;
+
+ return MAMA_STATUS_OK;
+}
+
const char*
mamaPayload_convertToString (mamaPayloadType payloadType)
{
@@ -465,6 +478,7 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
mama_status status = MAMA_STATUS_OK;
msgPayload payload = NULL;
+ mamaPayloadBridgeImpl* newPayloadBridge = NULL;
if (impl == NULL)
{
mama_log (MAMA_LOG_LEVEL_WARN,
@@ -502,9 +516,10 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
impl->mMessageOwner = 0;

if (id == '\0')
- id = (char) ((const char*)data) [0];
+ id = (char) ((const char*)data) [0];

impl->mPayloadBridge = mamaInternal_findPayload(id);
+ impl->mPayload = impl->mPayloads[(uint8_t)id];

if (!impl->mPayloadBridge) return MAMA_STATUS_NO_BRIDGE_IMPL;

@@ -519,6 +534,7 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
{
return status;
}
+ impl->mPayloads[(uint8_t)id] = payload;
/* The middleware does not own this message */
return mamaMsgImpl_setPayload (msg, payload, 0);
}
@@ -709,13 +725,14 @@ mamaMsg_create (mamaMsg* msg)
mamaPayloadBridge bridge = mamaInternal_getDefaultPayload ();
msgPayload payload = NULL;

- if (MAMA_STATUS_OK !=
- (status = bridge->msgPayloadCreate (&payload)))
+ if (bridge)
{
- *msg = NULL;
- return status;
+ if (MAMA_STATUS_OK != (status = bridge->msgPayloadCreate (&payload)))
+ {
+ *msg = NULL;
+ return status;
+ }
}
-
return mamaMsgImpl_createForPayload (msg,
payload,
bridge,
@@ -723,24 +740,6 @@ mamaMsg_create (mamaMsg* msg)
}

mama_status
-mamaMsg_createNative (mamaMsg* msg, mamaBridge bridge)
-{
- mamaBridgeImpl* impl = (mamaBridgeImpl*) bridge;
- if (impl)
- {
- char*payloadName;
- char payloadId;
-
- if (impl->bridgeGetDefaultPayloadId(&payloadName, &payloadId) == MAMA_STATUS_OK)
- {
- return (mamaMsg_createForPayload (msg, payloadId));
- }
- }
- return MAMA_STATUS_INVALID_ARG;
-}
-
-
-mama_status
mamaMsg_getSendSubject (const mamaMsg msg, const char** subject)
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
@@ -797,10 +796,12 @@ mamaMsgImpl_getStatusFromMsg (mamaMsg msg)
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
int32_t result = MAMA_MSG_STATUS_UNKNOWN;
- mamaMsg_getI32 (msg,
+ if (mamaMsg_getI32 (msg,
MamaFieldMsgStatus.mName,
MamaFieldMsgStatus.mFid,
- &result);
+ &result) != MAMA_STATUS_OK)
+ result = MAMA_MSG_STATUS_UNKNOWN;
+
impl->mStatus = (mamaMsgStatus) result;
return (mamaMsgStatus) result;
}
@@ -851,6 +852,7 @@ mamaMsg_addBool(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddBool (impl->mPayload,
name,
@@ -868,6 +870,7 @@ mamaMsg_addChar(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddChar (impl->mPayload,
name,
@@ -885,6 +888,7 @@ mamaMsg_addI8(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddI8 (impl->mPayload,
name,
@@ -902,6 +906,7 @@ mamaMsg_addU8(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddU8 (impl->mPayload,
name,
@@ -919,6 +924,7 @@ mamaMsg_addI16(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddI16 (impl->mPayload,
name,
@@ -936,6 +942,7 @@ mamaMsg_addU16(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddU16 (impl->mPayload,
name,
@@ -955,6 +962,7 @@ mamaMsg_addI32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddI32 (impl->mPayload,
name,
@@ -972,6 +980,7 @@ mamaMsg_addU32(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddU32 (impl->mPayload,
name,
@@ -989,6 +998,7 @@ mamaMsg_addI64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddI64 (impl->mPayload,
name,
@@ -1006,6 +1016,7 @@ mamaMsg_addU64(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddU64 (impl->mPayload,
name,
@@ -1023,6 +1034,7 @@ mamaMsg_addF32(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddF32 (impl->mPayload,
name,
@@ -1040,6 +1052,7 @@ mamaMsg_addF64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddF64 (impl->mPayload,
name,
@@ -1057,6 +1070,7 @@ mamaMsg_addString (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddString (impl->mPayload,
name,
@@ -1075,6 +1089,7 @@ mamaMsg_addOpaque(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddOpaque (impl->mPayload,
name,
@@ -1093,6 +1108,7 @@ mamaMsg_addDateTime(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddDateTime (impl->mPayload,
name,
@@ -1110,6 +1126,7 @@ mamaMsg_addPrice(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddPrice (impl->mPayload,
name,
@@ -1128,6 +1145,7 @@ mamaMsg_addMsg(
mamaMsgImpl* subMsg = (mamaMsgImpl*)value;
if (!impl || !subMsg || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;

+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddMsg (impl->mPayload,
name,
@@ -1148,6 +1166,7 @@ mamaMsg_addVectorBool (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorBool (impl->mPayload,
name,
@@ -1167,6 +1186,7 @@ mamaMsg_addVectorChar (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorChar (impl->mPayload,
name,
@@ -1186,6 +1206,7 @@ mamaMsg_addVectorI8 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorI8 (impl->mPayload,
name,
@@ -1205,6 +1226,7 @@ mamaMsg_addVectorU8 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorU8 (impl->mPayload,
name,
@@ -1224,6 +1246,7 @@ mamaMsg_addVectorI16 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorI16 (impl->mPayload,
name,
@@ -1243,6 +1266,7 @@ mamaMsg_addVectorU16 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorU16 (impl->mPayload,
name,
@@ -1262,6 +1286,7 @@ mamaMsg_addVectorI32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorI32 (impl->mPayload,
name,
@@ -1281,6 +1306,7 @@ mamaMsg_addVectorU32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorU32 (impl->mPayload,
name,
@@ -1300,6 +1326,7 @@ mamaMsg_addVectorI64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorI64 (impl->mPayload,
name,
@@ -1319,6 +1346,7 @@ mamaMsg_addVectorU64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorU64 (impl->mPayload,
name,
@@ -1338,6 +1366,7 @@ mamaMsg_addVectorF32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorF32 (impl->mPayload,
name,
@@ -1357,6 +1386,7 @@ mamaMsg_addVectorF64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorF64 (impl->mPayload,
name,
@@ -1376,6 +1406,7 @@ mamaMsg_addVectorString (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorString (impl->mPayload,
name,
@@ -1395,6 +1426,7 @@ mamaMsg_addVectorDateTime (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorDateTime (
impl->mPayload,
@@ -1414,6 +1446,7 @@ mamaMsg_addVectorPrice (
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorPrice (impl->mPayload,
name,
@@ -1438,6 +1471,7 @@ mamaMsg_addVectorMsg(
return MAMA_STATUS_NULL_ARG;
}

+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;
return impl->mPayloadBridge->msgPayloadAddVectorMsg (impl->mPayload,
name,
fid,
@@ -1722,7 +1756,7 @@ mamaMsg_getField(
}
impl->mCurrentField->myPayloadBridge = impl->mPayloadBridge;
impl->mCurrentField->myPayload = impl->mFieldPayload;
- impl->mCurrentField->myMsgPayload = impl->mPayload;
+ impl->mCurrentField->myMsg = impl;

mamaField = (mamaMsgFieldImpl*)impl->mCurrentField;
mamaField->myDictionary = NULL;
@@ -1805,6 +1839,7 @@ mamaMsg_updateBool(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_OK;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1826,6 +1861,7 @@ mamaMsg_updateChar(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1847,6 +1883,7 @@ mamaMsg_updateI8(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1868,6 +1905,7 @@ mamaMsg_updateU8(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1889,6 +1927,7 @@ mamaMsg_updateI16(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1910,6 +1949,7 @@ mamaMsg_updateU16(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1931,6 +1971,7 @@ mamaMsg_updateI32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1952,6 +1993,7 @@ mamaMsg_updateU32(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1973,6 +2015,7 @@ mamaMsg_updateI64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateU64 (impl->mPayload,
name,
@@ -1990,6 +2033,7 @@ mamaMsg_updateU64(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateU64 (impl->mPayload,
name,
@@ -2007,6 +2051,7 @@ mamaMsg_updateF32(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateF32 (impl->mPayload,
name,
@@ -2024,6 +2069,7 @@ mamaMsg_updateF64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateF64 (impl->mPayload,
name,
@@ -2041,6 +2087,7 @@ mamaMsg_updateString (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateString (impl->mPayload,
name,
@@ -2055,6 +2102,7 @@ mamaMsg_applyMsg (mamaMsg msg, mamaMsg src)
mamaMsgImpl* source = (mamaMsgImpl*)src;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadApply (impl->mPayload,
source->mPayload);
@@ -2080,6 +2128,7 @@ mamaMsg_updateSubMsg (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateSubMsg (impl->mPayload,
name,
@@ -2098,6 +2147,7 @@ mamaMsg_updateOpaque (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateOpaque (impl->mPayload,
name,
@@ -2116,6 +2166,7 @@ mamaMsg_updateDateTime(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateDateTime (impl->mPayload,
name,
@@ -2133,6 +2184,7 @@ mamaMsg_updatePrice(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdatePrice (impl->mPayload,
name,
@@ -2151,6 +2203,7 @@ mamaMsg_updateVectorMsg (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorMsg (impl->mPayload,
name,
@@ -2170,6 +2223,7 @@ mamaMsg_updateVectorString (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorString (
impl->mPayload,
@@ -2190,6 +2244,7 @@ mamaMsg_updateVectorBool (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorBool (
impl->mPayload,
@@ -2210,6 +2265,7 @@ mamaMsg_updateVectorChar (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorChar (
impl->mPayload,
@@ -2230,6 +2286,7 @@ mamaMsg_updateVectorI8 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorI8 (impl->mPayload,
name,
@@ -2249,6 +2306,7 @@ mamaMsg_updateVectorU8 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorU8 (impl->mPayload,
name,
@@ -2269,6 +2327,7 @@ mamaMsg_updateVectorI16 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorI16 (impl->mPayload,
name,
@@ -2288,6 +2347,7 @@ mamaMsg_updateVectorU16 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorU16 (impl->mPayload,
name,
@@ -2307,6 +2367,7 @@ mamaMsg_updateVectorI32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorI32 (impl->mPayload,
name,
@@ -2326,6 +2387,7 @@ mamaMsg_updateVectorU32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorU32 (impl->mPayload,
name,
@@ -2345,6 +2407,7 @@ mamaMsg_updateVectorI64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorI64 (impl->mPayload,
name,
@@ -2364,6 +2427,7 @@ mamaMsg_updateVectorU64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorU64 (impl->mPayload,
name,
@@ -2383,6 +2447,7 @@ mamaMsg_updateVectorF32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorF32 (impl->mPayload,
name,
@@ -2402,6 +2467,7 @@ mamaMsg_updateVectorF64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorF64 (impl->mPayload,
name,
@@ -2542,7 +2608,7 @@ mamaMsg_iterateFields (const mamaMsg msg,
if (impl->mPayloadBridge)
{
impl->mCurrentField->myPayloadBridge = impl->mPayloadBridge;
- impl->mCurrentField->myMsgPayload = impl->mPayload;
+ impl->mCurrentField->myMsg = impl;
return (impl->mPayloadBridge->msgPayloadIterateFields (impl->mPayload,
msg,
impl->mCurrentField,
@@ -3312,7 +3378,7 @@ mamaMsgIterator_associate (mamaMsgIterator iterator, mamaMsg msg)
if (msgImpl->mPayloadBridge)
{
itrImpl->mCurrentField->myPayloadBridge = msgImpl->mPayloadBridge;
- itrImpl->mCurrentField->myMsgPayload = msgImpl->mPayload;
+ itrImpl->mCurrentField->myMsg = msgImpl;
itrImpl->mPayloadBridge = msgImpl->mPayloadBridge;

/* Create the native payload iter if it hasn't been created already */
@@ -3393,12 +3459,15 @@ mamaMsgIterator_next (mamaMsgIterator iterator)
if (impl->mPayloadBridge)
{
msgFieldPayload msgField = NULL;
+ msgPayload payload = NULL;
+
+ mamaMsgImpl_getPayload (currentField->myMsg, &payload);

if (NULL == (msgField =
(impl->mPayloadBridge->msgPayloadIterNext (
impl->mPayloadIter,
currentField->myPayload,
- currentField->myMsgPayload))))
+ payload))))
{
return NULL;
}
@@ -3422,9 +3491,12 @@ mamaMsgIterator_hasNext (mamaMsgIterator iterator)

if (impl->mPayloadBridge)
{
+ msgPayload payload = NULL;
+
+ mamaMsgImpl_getPayload (currentField->myMsg, &payload);
return impl->mPayloadBridge->msgPayloadIterHasNext (
impl->mPayloadIter,
- currentField->myMsgPayload);
+ payload);

}
return (0);
@@ -3442,12 +3514,15 @@ mamaMsgIterator_begin (mamaMsgIterator iterator)
if (impl->mPayloadBridge)
{
msgFieldPayload msgField = NULL;
+ msgPayload payload = NULL;
+
+ mamaMsgImpl_getPayload (currentField->myMsg, &payload);

if (NULL == (msgField =
(impl->mPayloadBridge->msgPayloadIterBegin (
impl->mPayloadIter,
currentField->myPayload,
- currentField->myMsgPayload))))
+ payload))))
{
return NULL;
}
diff --git a/mama/c_cpp/src/c/msgfield.c b/mama/c_cpp/src/c/msgfield.c
index ac5fee3..50ae29c 100644
--- a/mama/c_cpp/src/c/msgfield.c
+++ b/mama/c_cpp/src/c/msgfield.c
@@ -801,19 +801,20 @@ mamaMsgField_getAsString (
char * buf,
size_t len)
{
- mamaMsgFieldImpl* impl =
- (mamaMsgFieldImpl*)(msgField);
+ mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)(msgField);
+ msgPayload payload = NULL;
if (!impl) return MAMA_STATUS_INVALID_ARG;
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);

- if (impl->myPayloadBridge)
+ if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadGetAsString (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
buf, len-1);

}
- return MAMA_STATUS_NULL_ARG;
+ return MAMA_STATUS_NULL_ARG;
}

mama_status
@@ -822,14 +823,20 @@ mamaMsgField_updateBool (
mama_bool_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateBool (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -842,14 +849,20 @@ mamaMsgField_updateChar (
char value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateChar (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -862,14 +875,20 @@ mamaMsgField_updateI8 (
mama_i8_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI8 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -882,14 +901,20 @@ mamaMsgField_updateU8 (
mama_u8_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI8 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -902,14 +927,20 @@ mamaMsgField_updateI16 (
mama_i16_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI16 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -922,14 +953,20 @@ mamaMsgField_updateU16 (
mama_u16_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateU16 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -942,14 +979,20 @@ mamaMsgField_updateI32 (
mama_i32_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI32 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);
}
return MAMA_STATUS_NULL_ARG;
@@ -961,14 +1004,20 @@ mamaMsgField_updateU32 (
mama_u32_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateU32 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -981,14 +1030,20 @@ mamaMsgField_updateI64 (
mama_i64_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI64 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -1001,14 +1056,20 @@ mamaMsgField_updateU64 (
mama_u64_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateU64 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -1021,14 +1082,20 @@ mamaMsgField_updateF32 (
mama_f32_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateF32 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -1041,14 +1108,20 @@ mamaMsgField_updateF64 (
mama_f64_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateF64 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -1061,14 +1134,20 @@ mamaMsgField_updateDateTime (
const mamaDateTime value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateDateTime (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);
}
return MAMA_STATUS_NULL_ARG;
@@ -1080,14 +1159,20 @@ mamaMsgField_updatePrice (
const mamaPrice value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdatePrice (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
diff --git a/mama/c_cpp/src/c/msgfieldimpl.h b/mama/c_cpp/src/c/msgfieldimpl.h
index 0ee5f5b..f5b3a3e 100644
--- a/mama/c_cpp/src/c/msgfieldimpl.h
+++ b/mama/c_cpp/src/c/msgfieldimpl.h
@@ -38,7 +38,7 @@ typedef struct mamaMsgFieldImpl_
size_t myLastVectorPayloadMsgLen;
mamaMsg mySubMsg;
msgFieldPayload myPayload;
- msgPayload myMsgPayload;
+ mamaMsg myMsg;
mamaPayloadBridge myPayloadBridge;
} mamaMsgFieldImpl;

diff --git a/mama/c_cpp/src/c/msgimpl.h b/mama/c_cpp/src/c/msgimpl.h
index 95bdacf..88e29ec 100644
--- a/mama/c_cpp/src/c/msgimpl.h
+++ b/mama/c_cpp/src/c/msgimpl.h
@@ -82,6 +82,9 @@ mamaMsgImpl_getPayloadBuffer(const mamaMsg msg,
const void** buffer,
mama_size_t* bufferLength);

+/*Get the underlying payload parent mamaMsg. */
+MAMAExpDLL extern mama_status
+mamaMsgImpl_getPayload (const mamaMsg msg, msgPayload* payload);

MAMAExpDLL
extern mama_status
diff --git a/mama/c_cpp/src/c/status.c b/mama/c_cpp/src/c/status.c
index 7e7ff91..e8c1c31 100644
--- a/mama/c_cpp/src/c/status.c
+++ b/mama/c_cpp/src/c/status.c
@@ -57,11 +57,11 @@ mamaStatus_stringForStatus (mama_status status)
case MAMA_STATUS_NOT_INSTALLED : return "NOT_INSTALLED";
case MAMA_STATUS_NO_BRIDGE_IMPL : return "NO_BRIDGE_IMPL";
case MAMA_STATUS_INVALID_QUEUE : return "INVALID_QUEUE";
+ case MAMA_STATUS_NOT_MODIFIABLE : return "NOT_MODIFIABLE";
case MAMA_STATUS_NOT_PERMISSIONED : return "MAMA_STATUS_NOT_PERMISSIONED";
case MAMA_STATUS_SUBSCRIPTION_INVALID_STATE: return "MAMA_STATUS_SUBSCRIPTION_INVALID_STATE";
case MAMA_STATUS_QUEUE_OPEN_OBJECTS: return "MAMA_STATUS_QUEUE_OPEN_OBJECTS";
case MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE: return "MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE";
- case MAMA_STATUS_SUBSCRIPTION_GAP: return "MAMA_STATUS_SUBSCRIPTION_GAP";

#ifdef WITH_ENTITLEMENTS
case MAMA_ENTITLE_STATUS_NOMEM : return "ENTITLE_STATUS_NOMEM";
--
1.7.7.6


[PATCH 25/50] [mama] Add closure to mamaQueue

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Added a closure to mamaQueue to allow applcations to store queue specific
context.

The new methods are:
mamaQueue_setClosure()
mamaQueue_getClosure()

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/mama/queue.h | 7 +++++++
mama/c_cpp/src/c/queue.c | 27 ++++++++++++++++++++++++++-
mama/c_cpp/src/c/queueimpl.h | 6 ------
3 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/queue.h b/mama/c_cpp/src/c/mama/queue.h
index f04efd6..bc3362a 100644
--- a/mama/c_cpp/src/c/mama/queue.h
+++ b/mama/c_cpp/src/c/mama/queue.h
@@ -464,6 +464,13 @@ MAMAExpDLL
extern mama_status
mamaDispatcher_destroy (mamaDispatcher dispatcher);

+MAMAExpDLL
+extern mama_status
+mamaQueue_getClosure (mamaQueue queue, void** closure);
+
+MAMAExpDLL
+extern mama_status
+mamaQueue_setClosure (mamaQueue queue, void* closure);
#if defined(__cplusplus)
}
#endif
diff --git a/mama/c_cpp/src/c/queue.c b/mama/c_cpp/src/c/queue.c
index 4026a1a..f7cbb0b 100644
--- a/mama/c_cpp/src/c/queue.c
+++ b/mama/c_cpp/src/c/queue.c
@@ -99,6 +99,7 @@ typedef struct mamaQueueImpl_

/* This flag indicates whether object locking and unlocking will be tracked by the queue. */
int mTrackObjectLocks;
+ void* mClosure;
} mamaQueueImpl;

/*Main structure for the mamaDispatcher*/
@@ -116,11 +117,35 @@ typedef struct mamaDisptacherImpl_


mama_status
+mamaQueue_setClosure ( mamaQueue queue, void* closure)
+{
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ if (!impl) return MAMA_STATUS_NULL_ARG;
+
+ impl->mClosure = closure;
+
+ return MAMA_STATUS_OK;
+}
+
+mama_status
+mamaQueue_getClosure ( mamaQueue queue, void** closure)
+{
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!closure) return MAMA_STATUS_INVALID_ARG;
+
+ *closure = impl->mClosure;
+
+ return MAMA_STATUS_OK;
+}
+
+
+mama_status
mamaQueue_createReuseableMsg (mamaQueueImpl* impl)
{
mama_status status = MAMA_STATUS_OK;
/*Create the reuseable cached mamaMsg for this queue*/
- if (MAMA_STATUS_OK != (status = mamaMsgImpl_createForPayload (&(impl->mMsg), NULL,NULL,1)))
+ if (MAMA_STATUS_OK != (status = mamaMsgImpl_createForPayload (&(impl->mMsg), NULL,NULL,0)))
{
mama_log (MAMA_LOG_LEVEL_ERROR, "mamaQueue_create(): "
"Could not create message for queue.");
diff --git a/mama/c_cpp/src/c/queueimpl.h b/mama/c_cpp/src/c/queueimpl.h
index 0fa8a15..70247ef 100644
--- a/mama/c_cpp/src/c/queueimpl.h
+++ b/mama/c_cpp/src/c/queueimpl.h
@@ -103,12 +103,6 @@ MAMAExpDLL
extern mama_status
mamaQueueImpl_lowWatermarkExceeded (mamaQueue queue, size_t size);

-#ifdef WITH_FASTMSG
-MAMAExpDLL
-extern mama_status
-mamaQueueImpl_getFastBridge (mamaQueue queue, payloadBridge* bridge);
-#endif
-
MAMAExpDLL
extern mamaStatsCollector*
mamaQueueImpl_getStatsCollector (mamaQueue queue);
--
1.7.7.6


[PATCH 24/50] [mama] mama.c cleanup

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Moved MAMA_PAYLOAD_MAX from mama.c to mamainternal.h.

Removed uncessary comments and conditionally compiled debug blocks.

Moved #includes out extern "C" block in mama.h. This causes problems on Windows
as some of the standard headers contain C++ macros.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/mama.c | 15 -----------
mama/c_cpp/src/c/mama/mama.h | 53 +++------------------------------------
mama/c_cpp/src/c/mamainternal.h | 1 +
3 files changed, 5 insertions(+), 64 deletions(-)

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index 277eada..1d989a6 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -127,8 +127,6 @@ mamaStat gWombatMsgsStat;
mamaStat gFastMsgsStat;
mamaStat gRvMsgsStat;

-#define MAMA_PAYLOAD_MAX CHAR_MAX
-
static mamaPayloadBridge gDefaultPayload = NULL;

static wthread_key_t last_err_key;
@@ -146,9 +144,6 @@ typedef struct mamaAppContext_
/**
* This structure contains data needed to control starting and stopping of
* mama.
- *
- * TODO: Access to this structure will ultimately be protected by a reference
- * count and a lock.
*/
typedef struct mamaImpl_
{
@@ -699,16 +694,6 @@ mama_openWithPropertiesCount (const char* path,
}
/* Code after this point is one-time initialization */

-#ifdef WITH_INACTIVE_CHECK
- mama_log (MAMA_LOG_LEVEL_WARN,
- "********************************************************");
- mama_log (MAMA_LOG_LEVEL_WARN, "WARNING!!! - In inactive subscription check mode."
- " Do not release!!!");
- mama_log (MAMA_LOG_LEVEL_WARN,
- "********************************************************");
-#endif
-
-
#ifdef DEV_RELEASE
mama_log (MAMA_LOG_LEVEL_WARN,
"\n********************************************************************************\n"
diff --git a/mama/c_cpp/src/c/mama/mama.h b/mama/c_cpp/src/c/mama/mama.h
index c6cbe1c..341e635 100644
--- a/mama/c_cpp/src/c/mama/mama.h
+++ b/mama/c_cpp/src/c/mama/mama.h
@@ -22,10 +22,6 @@
#ifndef MamaH__
#define MamaH__

-#if defined(__cplusplus)
-extern "C"
-{
-#endif
#include "mama/config.h"
#include <mama/log.h>
#include <mama/error.h>
@@ -55,6 +51,10 @@ extern "C"
#include <mama/quality.h>
#include <mama/ft.h>

+#if defined(__cplusplus)
+extern "C"
+{
+#endif

#define MAMA_OPEN_MD ((uint32_t)(0x00000001))
#define MAMA_OPEN_PUB_SUB ((uint32_t)(0x00000002))
@@ -220,40 +220,6 @@ extern "C"
extern mama_status
mama_openWithProperties (const char* path,
const char* filename);
- /**
- * Initialize MAMA.
- *
- * Allows users of the API to override the default behavior of mama_open()
- * where a file mama.properties is required to be located in the directory
- * specified by \$WOMBAT_PATH.
- *
- * The properties file must have the same structure as a standard
- * mama.properties file.
- *
- * If null is passed as the path the API will look for the properties file on
- * the \$WOMBAT_PATH.
- *
- * If null is passed as the filename the API will look for the default
- * filename of mama.properties.
- *
- * The count value on return will be the number of times that mama_openXxx()
- * has ben invoked successfully. Applicatiins can use this to perform
- * one-time initialization when the value is 1 and the return is
- * MAMA_STATUS_OK
- *
- * @param path Fully qualified path to the directory containing the properties
- * file
- * @param filename The name of the file containing MAMA properties.
- * @param count The number of times mama_OpenXXX() has been called
- * successfully.
- *
- * @return mama_status Whether the call was successful or not.
- */
- MAMAExpDLL
- extern mama_status
- mama_openWithPropertiesCount (const char* path,
- const char* filename,
- unsigned int* count);

/**
* Set a specific property for the API.
@@ -325,17 +291,6 @@ extern "C"
MAMAExpDLL
extern mama_status
mama_close (void);
-
- /**
- * Close MAMA and free all associated resource.
- *
- * @param count Filled with the number of times mama has been opened
- * successfully. Applications can perform global one-time cleanup when this
- * value is 0 and the return value is MAMA_STATUS_OK.
- */
- MAMAExpDLL
- extern mama_status
- mama_closeCount (unsigned int* count);

/**
* Return the version information for the library.
diff --git a/mama/c_cpp/src/c/mamainternal.h b/mama/c_cpp/src/c/mamainternal.h
index 23f23c7..6b901b3 100644
--- a/mama/c_cpp/src/c/mamainternal.h
+++ b/mama/c_cpp/src/c/mamainternal.h
@@ -30,6 +30,7 @@ extern "C"
{
#endif

+#define MAMA_PAYLOAD_MAX CHAR_MAX
/**
* Check whether Callbacks are run in 'debug' catch exceptions mode
*/
--
1.7.7.6


[PATCH 23/50] [mama] Cleaup Statistics Handling in mama_close

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Properly cleanup the StatsGenerator, StatsCollector and StatsPublisher to avoid
resource leaks.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/mama.c | 20 ++++++++++++++++++++
mama/c_cpp/src/c/statsgenerator.c | 24 ++++++++++++++++++++++++
mama/c_cpp/src/c/statsgeneratorinternal.h | 14 +++++++++++++-
3 files changed, 57 insertions(+), 1 deletions(-)

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index 885763f..277eada 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -1153,6 +1153,26 @@ mama_closeCount (unsigned int* count)

wthread_key_delete(last_err_key);

+ if (gStatsGenerator)
+ {
+ mamaStatsGenerator_stopReportTimer(gStatsGenerator);
+ }
+
+ if (gGlobalStatsCollector)
+ {
+ if (gStatsGenerator)
+ {
+ mamaStatsGenerator_removeStatsCollector (gStatsGenerator, gGlobalStatsCollector);
+ }
+ mamaStatsCollector_destroy (*gGlobalStatsCollector);
+ gGlobalStatsCollector = NULL;
+ }
+
+ if (gStatsPublisher)
+ {
+ mamaStatsLogger_destroy (gStatsPublisher);
+ gStatsPublisher = NULL;
+ }
for (middleware = 0; middleware != MAMA_MIDDLEWARE_MAX; ++middleware)
{
mamaBridge bridge = gImpl.myBridges[middleware];
diff --git a/mama/c_cpp/src/c/statsgenerator.c b/mama/c_cpp/src/c/statsgenerator.c
index 8e47268..508167a 100644
--- a/mama/c_cpp/src/c/statsgenerator.c
+++ b/mama/c_cpp/src/c/statsgenerator.c
@@ -74,6 +74,8 @@ mamaStatsGenerator_destroy (mamaStatsGenerator statsGenerator)
{
mamaStatsGeneratorImpl* impl = (mamaStatsGeneratorImpl*)statsGenerator;

+ if(impl)
+ {
impl->mStatsLogger = NULL;

if (impl->mStatMsg != NULL)
@@ -89,6 +91,7 @@ mamaStatsGenerator_destroy (mamaStatsGenerator statsGenerator)

free (impl);

+ }
return MAMA_STATUS_OK;
}

@@ -237,3 +240,24 @@ mamaStatsGenerator_allocateStatsCollector (mamaStatsGenerator statsGenerator)

return list_allocate_element (impl->mStatsCollectors);
}
+
+mama_status mamaStatsGenerator_stopReportTimer(mamaStatsGenerator statsGenerator)
+{
+ /* Returns. */
+ mama_status ret = MAMA_STATUS_NULL_ARG;
+
+ /* Get the impl. */
+ mamaStatsGeneratorImpl *impl = (mamaStatsGeneratorImpl*)statsGenerator;
+ if(NULL != impl)
+ {
+ /* Destroy the timer. */
+ ret = MAMA_STATUS_OK;
+ if(NULL != impl->mReportTimer)
+ {
+ ret = mamaTimer_destroy(impl->mReportTimer);
+ impl->mReportTimer = NULL;
+ }
+ }
+
+ return ret;
+}
diff --git a/mama/c_cpp/src/c/statsgeneratorinternal.h b/mama/c_cpp/src/c/statsgeneratorinternal.h
index 8f27dc6..97bf717 100644
--- a/mama/c_cpp/src/c/statsgeneratorinternal.h
+++ b/mama/c_cpp/src/c/statsgeneratorinternal.h
@@ -33,6 +33,18 @@ MAMAExpDLL
extern mama_status
mamaStatsGenerator_create (mamaStatsGenerator* statsGenerator, mama_f64_t reportInterval);

+/**
+ * This function should be called to stop the stats report timer before the internal event
+ * queue has been destroyed.
+ *
+ * @param[in] statsGenerator The stats generator.
+ * @returns mama_status can be one of
+ * MAMA_STATUS_NULL_ARG
+ * MAMA_STATUS_OK
+ */
+MAMAExpDLL
+extern mama_status
+mamaStatsGenerator_stopReportTimer(mamaStatsGenerator statsGenerator);
MAMAExpDLL
extern mama_status
mamaStatsGenerator_destroy (mamaStatsGenerator statsGenerator);
@@ -47,7 +59,7 @@ mamaStatsGenerator_setLogStats (mamaStatsGenerator statsGenerator, int logStats)

MAMAExpDLL
extern mama_status
-mamaStatsGenerator_setStatsLogger (mamaStatsGenerator statsGenerator, mamaStatsLogger* usageLogger);
+mamaStatsGenerator_setStatsLogger (mamaStatsGenerator statsGenerator, mamaStatsLogger* statsLogger);

MAMAExpDLL
extern void
--
1.7.7.6


[PATCH 22/50] [mama] Change static mutexes to be recursive

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

This change avoids rare instances of deadlock when starting, stoping, opening
and closing OpenMAMA.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
common/c_cpp/src/c/linux/port.h | 2 +-
mama/c_cpp/src/c/mama.c | 114 +++++++++++++++++++--------------------
2 files changed, 57 insertions(+), 59 deletions(-)

diff --git a/common/c_cpp/src/c/linux/port.h b/common/c_cpp/src/c/linux/port.h
index b3a928d..67ae0c9 100644
--- a/common/c_cpp/src/c/linux/port.h
+++ b/common/c_cpp/src/c/linux/port.h
@@ -57,7 +57,7 @@ extern "C"
#endif
/* PTHREAD static locks are easy */
typedef pthread_mutex_t wthread_static_mutex_t;
-#define WSTATIC_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
+#define WSTATIC_MUTEX_INITIALIZER PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
#define wthread_static_mutex_lock(x) pthread_mutex_lock((x))
#define wthread_static_mutex_unlock(x) pthread_mutex_unlock((x))

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index dc6dcb5..885763f 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -172,8 +172,11 @@ static mamaImpl gImpl = {{0}, {0}, {0}, {0}, 0, WSTATIC_MUTEX_INITIALIZER};
static mama_status
mama_loadBridgeWithPathInternal (mamaBridge* impl,
const char* middlewareName,
- const char* path,
- uint8_t lock);
+ const char* path);
+
+mama_status
+mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
+ const char* payloadName);

/* Description : This function will free any memory associated with a
* mamaApplicationContext object but will not free the
@@ -323,8 +326,7 @@ static mama_status mamaInternal_loadStatsPublisher ()
if (MAMA_STATUS_OK !=
(status = mama_loadBridgeWithPathInternal (&bridge,
statsLogMiddlewareName,
- NULL,
- 0)))
+ NULL)))
{
mama_log (MAMA_LOG_LEVEL_ERROR,
"mamaInternal_loadStatsLogger(): ",
@@ -644,6 +646,8 @@ mama_openWithPropertiesCount (const char* path,
const char* appString = NULL;
const char* statsLogging = "false";
const char* catchCallbackExceptions = NULL;
+ char** payloadName;
+ char* payloadId;

wthread_static_mutex_lock (&gImpl.myLock);

@@ -720,6 +724,27 @@ mama_openWithPropertiesCount (const char* path,

/* Do not call mamaInternal_loadStatsPublisher here.
It only needs to be called if we are publishing */
+ /* Look for a bridge for each of the middlewares and open them */
+ for (middleware = 0; middleware != MAMA_MIDDLEWARE_MAX; ++middleware)
+ {
+ mamaBridgeImpl* impl = (mamaBridgeImpl*) gImpl.myBridges [middleware];
+ if (impl)
+ {
+ if (impl->bridgeGetDefaultPayloadId(&payloadName, &payloadId) == MAMA_STATUS_OK)
+ {
+ uint8_t i=0;
+ while (payloadId[i] != NULL)
+ {
+ if (!gImpl.myPayloads [(uint8_t)payloadId[i]])
+ {
+ mamaPayloadBridge payloadImpl;
+ mama_loadPayloadBridgeInternal (&payloadImpl,payloadName[i]);
+ }
+ i++;
+ }
+ }
+ }
+ }

catchCallbackExceptions = properties_Get (gProperties, "mama.catchcallbackexceptions.enable");
if (catchCallbackExceptions != NULL && strtobool(catchCallbackExceptions))
@@ -1776,16 +1801,21 @@ mama_setDefaultPayload (char id)
}

mama_status
+mama_loadPayloadBridge (mamaPayloadBridge* impl,
+ const char* payloadName)
+{
+ return mama_loadPayloadBridgeInternal (impl, payloadName);
+}
+mama_status
mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
- const char* payloadName,
- uint8_t lock)
+ const char* payloadName)
{
char bridgeImplName [256];
char initFuncName [256];
LIB_HANDLE bridgeLib = NULL;
msgPayload_createImpl initFunc = NULL;
mama_status status = MAMA_STATUS_OK;
- char payloadChar;
+ char payloadChar ='/0';

if (!impl || !payloadName)
return MAMA_STATUS_NULL_ARG;
@@ -1793,8 +1823,7 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
snprintf (bridgeImplName, 256, "mama%simpl",
payloadName);

- if (lock)
- wthread_static_mutex_lock (&gImpl.myLock);
+ wthread_static_mutex_lock (&gImpl.myLock);

bridgeLib = openSharedLib (bridgeImplName, NULL);

@@ -1806,8 +1835,7 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
"Could not open payload bridge library [%s] [%s]",
bridgeImplName ? bridgeImplName : "",
getLibError());
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

@@ -1825,27 +1853,25 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
bridgeImplName ? bridgeImplName : "");
closeSharedLib (bridgeLib);

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return MAMA_STATUS_NO_BRIDGE_IMPL;
}

if (MAMA_STATUS_OK != (status = initFunc (impl, &payloadChar)))
{
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return status;
}

- if (!impl)
+ if (!*impl)
{
mama_log (MAMA_LOG_LEVEL_ERROR,
- "mama_loadPayloadBridge(): Error in [%s] ", initFuncName);
+ "mama_loadPayloadBridge():Failed to load %s payload bridge from library [%s] ",
+ payloadName, bridgeImplName);

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return MAMA_STATUS_NO_BRIDGE_IMPL;
}
@@ -1857,8 +1883,7 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
"Payload bridge %s already loaded",
payloadName);

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return MAMA_STATUS_OK;
}
@@ -1876,19 +1901,11 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
"Sucessfully loaded %s payload bridge from library [%s]",
payloadName, bridgeImplName);

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return MAMA_STATUS_OK;
}

-mama_status
-mama_loadPayloadBridge (mamaPayloadBridge* impl,
- const char* payloadName)
-{
- return mama_loadPayloadBridgeInternal (impl, payloadName, 1);
-}
-
int
mamaInternal_generateLbmStats ()
{
@@ -1906,15 +1923,12 @@ mama_loadBridge (mamaBridge* impl,
mama_status
mama_loadBridgeWithPathInternal (mamaBridge* impl,
const char* middlewareName,
- const char* path,
- uint8_t lock)
+ const char* path)
{
char bridgeImplName [256];
char initFuncName [256];
LIB_HANDLE bridgeLib = NULL;
bridge_createImpl initFunc = NULL;
- char* payloadName = NULL;
- char payloadId = '\0';
mama_status result = MAMA_STATUS_OK;
mamaMiddleware middleware = 0;

@@ -1930,15 +1944,13 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,
middlewareName);
}

- if (lock)
- wthread_static_mutex_lock (&gImpl.myLock);
+ wthread_static_mutex_lock (&gImpl.myLock);

/* Check if a bridge has already been initialized for the middleware */
if (gImpl.myBridges [middleware])
{
*impl = gImpl.myBridges [middleware];
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_OK;
}

@@ -1966,8 +1978,7 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,
bridgeImplName ? bridgeImplName : "",
getLibError());
}
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

@@ -1984,8 +1995,7 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,
initFuncName ? initFuncName : "",
bridgeImplName ? bridgeImplName : "");
closeSharedLib (bridgeLib);
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

@@ -1995,8 +2005,7 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,
{
mama_log (MAMA_LOG_LEVEL_ERROR,
"mama_loadBridge(): Error in [%s] ", initFuncName);
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

@@ -2010,25 +2019,14 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,

if (MAMA_STATUS_OK != result)
{
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return result;
}

- if (((mamaBridgeImpl*)(*impl))->bridgeGetDefaultPayloadId(&payloadName, &payloadId) == MAMA_STATUS_OK)
- {
- if (!gImpl.myPayloads [(uint8_t)payloadId])
- {
- mamaPayloadBridge payloadImpl;
- mama_loadPayloadBridgeInternal (&payloadImpl,payloadName,0);
- }
- }
-
gImpl.myBridges [middleware] = *impl;
gImpl.myBridgeLibraries [middleware] = bridgeLib;

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_OK;
}

@@ -2037,7 +2035,7 @@ mama_loadBridgeWithPath (mamaBridge* impl,
const char* middlewareName,
const char* path)
{
- return mama_loadBridgeWithPathInternal(impl, middlewareName, path, 1);
+ return mama_loadBridgeWithPathInternal(impl, middlewareName, path);
}

/*
--
1.7.7.6


[PATCH 21/50] [mama] mamaTransport_forceClientDisconnect

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Add method to MAMA API that allows connection based middlewares (TCP) to
disconnect poorly behaved clients.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/transport.c | 14 ++++++++++++++
mama/c_cpp/src/c/transportimpl.h | 15 +++++++++++++++
2 files changed, 29 insertions(+), 0 deletions(-)

diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index f059280..d51d12d 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -1964,6 +1964,20 @@ mamaTransport_findConnection (mamaTransport transport,
}

mama_status
+mamaTransportImpl_forceClientDisconnect (mamaTransport transport,
+ const char* ipAddress,
+ uint16_t port)
+{
+ if (!self || ipAddress == NULL) return MAMA_STATUS_NULL_ARG;
+
+ return self->mBridgeImpl->bridgeMamaTransportForceClientDisconnect (
+ self->mTransports,
+ self->mNumTransports,
+ ipAddress,
+ port);
+}
+
+mama_status
mamaTransport_getAllConnections (mamaTransport transport,
mamaConnection** result,
uint32_t* len)
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index 18c3a28..80bf98a 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -280,5 +280,20 @@ mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(
mamaTransport transport,
short cause,
const void *platformInfo);
+/**
+ * Disconnect a client with the specified IP Address and port. This information
+ * may be retrieved from a mamaConnection object or out of band.
+ *
+ * For middleware that does not provide this functionality (non WMW middleware),
+ * the method returns MAMA_STATUS_NOT_IMPL.
+ */
+MAMAExpDLL
+extern mama_status
+mamaTransportImpl_forceClientDisconnect (mamaTransport transport,
+ const char* ipAddress,
+ uint16_t port);
+#if defined(__cplusplus)
+}
+#endif

#endif /* TransportImplH__ */
--
1.7.7.6


[PATCH 20/50] [mama] transport write queue high and low watermarks

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Allow applications to set high and low watermarks for internal write queues if
the underlying middleware queues writes. Socket based middlewares that use
scatter/gather IO and a dedicated IO thread often coalesce writes in a separate
queue for each connected client. A larger queue where that trigger a high
watermark often indicates a slow consumer.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/mama/transport.h | 16 +++++++++++++-
mama/c_cpp/src/c/transport.c | 40 +++++++++++++++++++++++++++++++-----
mama/c_cpp/src/c/transportimpl.h | 6 +++++
3 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/transport.h b/mama/c_cpp/src/c/mama/transport.h
index d66905c..e1a29ac 100644
--- a/mama/c_cpp/src/c/mama/transport.h
+++ b/mama/c_cpp/src/c/mama/transport.h
@@ -76,7 +76,9 @@ typedef enum
MAMA_TRANSPORT_PUBLISHER_DISCONNECT,
MAMA_TRANSPORT_QUALITY,
MAMA_TRANSPORT_NAMING_SERVICE_CONNECT,
- MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT
+ MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT,
+ MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK,
+ MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK
} mamaTransportEvent;

/**
@@ -186,6 +188,18 @@ extern mama_status
mamaTransport_setTransportCallback (mamaTransport transport,
mamaTransportCB callback,
void* closure);
+/**
+ * Set the transport write queue high and low water mark values. The
+ * MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK and
+ * MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK events will be delivered though
+ * the transport callback when the respective number of items are outstanding on
+ * a clients write queue.
+ */
+MAMAExpDLL
+extern mama_status
+mamaTransport_setWriteQueueWatermarks (mamaTransport transport,
+ uint32_t highWater,
+ uint32_t lowWater);

/**
* Set the transport topic callback. It receives advisories when a client
diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 7e583d5..f059280 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -89,6 +89,8 @@ typedef struct transportImpl_
mamaSymbolMapFunc mMapFunc;
void* mMapFuncClosure;

+ uint32_t mWriteQueueHighWatermark;
+ uint32_t mWriteQueueLowWatermark;
/* These members are only needed for the market data transport */
wList mListeners;

@@ -160,12 +162,12 @@ typedef struct transportImpl_

int mGroupSizeHint;

- uint8_t mDisableDisconnectCb;
- uint8_t mDisableRefresh;
+ uint8_t mDisableRefresh;
dqStartegyScheme mDQStratScheme;
dqftStrategyScheme mFTStratScheme;

uint8_t mInternal;
+ uint8_t mDisableDisconnectCb;
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -445,15 +447,15 @@ static void setFtStrategy (mamaTransport transport)
}
}

+void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)
+{
+ self->mDisableRefresh=disable;
+}
/**
* Check property to disable refresh messages. Undocumented.
*
* Return non-zero to disable refresh messages.
*/
-void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)
-{
- self->mDisableRefresh=disable;
-}

static int mamaTransportInternal_disableRefreshes(const char* transportName)
{
@@ -1130,6 +1132,7 @@ mamaTransport_destroy (mamaTransport transport)
if (self->mRefreshTransport)
{
refreshTransport_destroy (self->mRefreshTransport);
+ self->mRefreshTransport = NULL;
}

if (self->mThrottle)
@@ -1610,6 +1613,8 @@ mamaTransportEvent_toString (mamaTransportEvent event)
case MAMA_TRANSPORT_QUALITY: return "QUALITY";
case MAMA_TRANSPORT_NAMING_SERVICE_CONNECT: return "NAMING_SERVICE_CONNECT";
case MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT: return "NAMING_SERVICE_DISCONNECT";
+ case MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK: return "MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK";
+ case MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK: return "MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK";
default: return "UNKNOWN";
}
}
@@ -1625,6 +1630,29 @@ mamaTransport_setTransportCallback (mamaTransport transport,
return MAMA_STATUS_OK;
}

+mama_status
+mamaTransport_setWriteQueueWatermarks (mamaTransport transport,
+ uint32_t high,
+ uint32_t low)
+{
+ if (!self)
+ return MAMA_STATUS_NULL_ARG;
+ if (high < low || low == 0)
+ return MAMA_STATUS_INVALID_ARG;
+
+ self->mWriteQueueHighWatermark = high;
+ self->mWriteQueueLowWatermark = low;
+ return MAMA_STATUS_OK;
+}
+
+void
+mamaTransportImpl_getWriteQueueWatermarks (mamaTransport transport,
+ uint32_t* high,
+ uint32_t* low)
+{
+ *high = self->mWriteQueueHighWatermark;
+ *low = self->mWriteQueueLowWatermark;
+}

mama_status
mamaTransport_setTransportTopicCallback (mamaTransport transport,
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index a253e3f..18c3a28 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -139,6 +139,12 @@ mamaTransportImpl_getTransportTopicCallback (mamaTransport transport,

MAMAExpDLL
extern void
+mamaTransportImpl_getWriteQueueWatermarks (mamaTransport transport,
+ uint32_t* high,
+ uint32_t* low);
+
+MAMAExpDLL
+extern void
mamaTransportImpl_resetRefreshForListener (mamaTransport tport, void *handle);

MAMAExpDLL
--
1.7.7.6


[PATCH 19/50] [mama] Create an Internal Transport for Monitoring

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

OpenMAMA creates a dedicated internal transport for publishing statistics for
monitoring and other internal communication. This transport disables some
features of application level transports including the CM responder.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/mama.c | 2 +-
mama/c_cpp/src/c/transport.c | 204 ++++++++++++++++++++++---------------
mama/c_cpp/src/c/transportimpl.h | 31 +++---
3 files changed, 136 insertions(+), 101 deletions(-)

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index 206892d..dc6dcb5 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -389,7 +389,7 @@ mamaInternal_createStatsPublisher ()
statsLogTportName = "statslogger";
}

- result = mamaTransport_allocate (&statsLogTport);
+ result = mamaTransportImpl_allocateInternalTransport (&statsLogTport);
if( result != MAMA_STATUS_OK )
return result;

diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 0a4adcd..7e583d5 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -165,6 +165,7 @@ typedef struct transportImpl_
dqStartegyScheme mDQStratScheme;
dqftStrategyScheme mFTStratScheme;

+ uint8_t mInternal;
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -282,60 +283,68 @@ mamaTransport_allocate (mamaTransport* result)
*
* Return non-zero if object should be created, otherwise return zero
*/
-static int mamaTransportInternal_cmResponderEnabled (const char* transportName,
+static int mamaTransportInternal_cmResponderEnabled (transportImpl *impl,
+ const char* transportName,
const char* middleware)
{
const char* propValue;
char propString[MAX_PROP_STRING];
char propStringMw[MAX_PROP_STRING];
int retVal;
+ /* Returns. */
+ int ret = 0;

- /* Check for mama.middleware.transport.transportname first */
- retVal=snprintf (propStringMw, MAX_PROP_STRING,
- "mama.%s.transport.%s.%s", middleware,
- transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
-
- if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ /* The CM responder will not be created for an internal transport. */
+ if(impl->mInternal == 0)
{
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
-
- propValue = properties_Get (mamaInternal_getProperties (), propStringMw);
-
- if (NULL==propValue)
- {
- /* We might have specified mama.transport.transportname -
- only look for this after we've tried with middleware */
- retVal = snprintf (propString, MAX_PROP_STRING,
- "mama.transport.%s.%s",
- transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
-
- if ((retVal<0) || (retVal>=MAX_PROP_STRING))
- {
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
-
- propValue = properties_Get (mamaInternal_getProperties (), propString);
- }
-
- /* Return default if we have specified neither mama.middleware.transport...nor
- mama.transport... */
- if (NULL==propValue)
- {
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
- else if (properties_GetPropertyValueAsBoolean (propValue))
- {
- return 1;
- }
- else
- {
- return 0;
+ /* Check for mama.middleware.transport.transportname first */
+ retVal=snprintf (propStringMw, MAX_PROP_STRING,
+ "mama.%s.transport.%s.%s", middleware,
+ transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
+
+ if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+
+ propValue = properties_Get (mamaInternal_getProperties (), propStringMw);
+
+ if (NULL==propValue)
+ {
+ /* We might have specified mama.transport.transportname -
+ only look for this after we've tried with middleware */
+ retVal = snprintf (propString, MAX_PROP_STRING,
+ "mama.transport.%s.%s",
+ transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
+
+ if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+
+ propValue = properties_Get (mamaInternal_getProperties (), propString);
+ }
+
+ /* Return default if we have specified neither mama.middleware.transport...nor
+ mama.transport... */
+ if (NULL==propValue)
+ {
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+ else if (properties_GetPropertyValueAsBoolean (propValue))
+ {
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
+ return ret;
}

static void setPreInitialStrategy (mamaTransport transport)
@@ -452,7 +461,8 @@ static int mamaTransportInternal_disableRefreshes(const char* transportName)
char propString[MAX_PROP_STRING];
int retVal;

- retVal=snprintf(propString, MAX_PROP_STRING, "mama.transport.%s.%s",
+ retVal=snprintf (propString, MAX_PROP_STRING,
+ "mama.transport.%s.%s",
transportName ? transportName : "", PROP_NAME_DISABLE_REFRESH);

if ((retVal<0) || (retVal>=MAX_PROP_STRING))
@@ -586,6 +596,7 @@ mamaTransport_create (mamaTransport transport,
const char* throttleInt = NULL;
if (!transport) return MAMA_STATUS_NULL_ARG;
if (!bridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;
+ mama_log(MAMA_LOG_LEVEL_FINER, "Entering mamaTransport_create for transport (%p) with name %s", transport, name);

self->mBridgeImpl = (mamaBridgeImpl*)bridgeImpl;

@@ -822,7 +833,7 @@ mamaTransport_create (mamaTransport transport,

setGroupSizeHint (transport, middleware);

- rval = init (self, mamaTransportInternal_cmResponderEnabled (name, middleware));
+ rval = init (self, mamaTransportInternal_cmResponderEnabled (self, name, middleware));
if (rval != MAMA_STATUS_OK) return rval;


@@ -1083,6 +1094,7 @@ mamaTransport_destroy (mamaTransport transport)
{
int i;
int allTransportsValid;
+ mama_log(MAMA_LOG_LEVEL_FINER, "Entering mamaTransport_destroy for transport (%p)", transport);

if (!self) return MAMA_STATUS_NULL_ARG;
if (!self->mBridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;
@@ -1099,6 +1111,10 @@ mamaTransport_destroy (mamaTransport transport)
}
if (allTransportsValid)
{
+ if(NULL != self->mCmResponder)
+ {
+ mamaCmResponder_destroy (self->mCmResponder);
+ }
/* Inform all listeners that the transport is about to be destroyed. */
mamaTransportImpl_clearTransportWithListeners (self);

@@ -2052,6 +2068,49 @@ mamaTransportImpl_unsetAllPossiblyStale (mamaTransport transport)
}
}

+/**
+ * Return the cause and platform info for the last message processed on the
+ * transport.
+ *
+ * @param transport The transport.
+ * @param cause To return the cause.
+ * @param platformInfo To return the bridge specific info, under no circumstances
+ * should the returned object be deleted.
+ */
+void
+mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport transport,
+ short* cause,
+ const void** platformInfo)
+{
+ if (!self)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (): "
+ "NULL transport.");
+ return;
+ }
+
+ *cause = self->mCause;
+ *platformInfo = self->mPlatformInfo;
+}
+
+void mamaTransportImpl_setAdvisoryCauseAndPlatformInfo (mamaTransport transport, short cause, const void *platformInfo)
+{
+ /* Get the impl. */
+ transportImpl *impl = (transportImpl *)transport;
+ if(NULL != impl)
+ {
+ /* Set the cause. */
+ impl->mCause = cause;
+ impl->mPlatformInfo = platformInfo;
+ }
+
+ /* Otherwise write an error log. */
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(): NULL transport.");
+ }
+}
void
mamaTransportImpl_getTransportIndex (mamaTransport transport,
int* transportIndex)
@@ -2429,44 +2488,23 @@ mama_status mamaTransport_removePublisher (mamaTransport transport, void *handle
return MAMA_STATUS_OK;
}

-/* *************************************************** */
-/* Internal Functions. */
-/* *************************************************** */
-
-void mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(mamaTransport transport,
- short *cause, const void **platformInfo)
+mama_status mamaTransportImpl_allocateInternalTransport(mamaTransport *transport)
{
- /* Get the impl. */
- transportImpl *impl = (transportImpl *)transport;
- if(NULL != impl)
- {
- /* Return the cause. */
- *cause = impl->mCause;
- *platformInfo = impl->mPlatformInfo;
- }
- else
- {
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(): NULL "
- "transport.");
- }
-}
+ /* Returns. */
+ mama_status ret = MAMA_STATUS_NULL_ARG;

-void mamaTransportImpl_setAdvisoryCauseAndPlatformInfo (mamaTransport transport,
- short cause, const void *platformInfo)
-{
- /* Get the impl. */
- transportImpl *impl = (transportImpl *)transport;
- if(NULL != impl)
- {
- /* Set the cause. */
- impl->mCause = cause;
- impl->mPlatformInfo = (void*)platformInfo;
- }
- else
+ if(NULL != transport)
{
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(): NULL "
- "transport.");
+ /* Allocate the transport as normal. */
+ ret = mamaTransport_allocate(transport);
+ if(MAMA_STATUS_OK == ret)
+ {
+ /* Get the impl. */
+ transportImpl *impl = (transportImpl *)*transport;
+
+ /* Set the internal flag. */
+ impl->mInternal = 1;
+ }
}
+ return ret;
}
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index 5ae83c6..a253e3f 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -158,6 +158,11 @@ MAMAExpDLL
extern void
mamaTransportImpl_unsetAllPossiblyStale (mamaTransport tport);

+MAMAExpDLL
+extern void
+mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport tport,
+ short* cause,
+ const void** platformInfo);
/*
Get the bridge impl associated with the specified transport.
This will be how other objects gain access to the bridge.
@@ -241,27 +246,19 @@ mama_status mamaTransport_addPublisher(mamaTransport transport, mamaPublisher pu
mama_status mamaTransport_removePublisher(mamaTransport transport, void *handle);
preInitialScheme mamaTransportImpl_getPreInitialScheme (mamaTransport transport);

-#if defined(__cplusplus)
-}
-#endif
-
-
/**
- * This function will return the cause and platform info for the last message
- * processed on the transport.
+ * This function will allocate an internal transport for use with the internal event queue, this sort
+ * of transport is limited and does not support certain features, including
+ * The CM Responder.
*
- * @param[in] transport The transport.
- * @param[out] cause To return the cause.
- * @param[out] platformInfo To return the bridge specific info, under no
- * circumstances should the returned object be deleted.
+ * @param[out] transport To return the transport.
*
+ * @returns mama_status value can be one of:
+ * MAMA_STATUS_NOMEM
+ * MAMA_STATUS_NULL_ARG
+ * MAMA_STATUS_OK
*/
-MAMAExpDLL
-extern void
-mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(
- mamaTransport tport,
- short *cause,
- const void **platformInfo);
+mama_status mamaTransportImpl_allocateInternalTransport(mamaTransport *transport);

/**
* This function will set the cause and platform info for the transport.
--
1.7.7.6


[PATCH 18/50] [mama] Subscription: don't put invalid actions on the throttle queue

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Added code to avoid race conditions when enqueing shutdown logic on the
throttle.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/mama/subscription.h | 7 +------
mama/c_cpp/src/c/subscription.c | 19 +++++++++++--------
2 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/subscription.h b/mama/c_cpp/src/c/mama/subscription.h
index 9eb1f0c..27978eb 100644
--- a/mama/c_cpp/src/c/mama/subscription.h
+++ b/mama/c_cpp/src/c/mama/subscription.h
@@ -106,12 +106,7 @@ typedef enum
/* The subscription has been de-allocated, this state is only supported so that the log entry will whenever the subscription
* has finally been freed.
*/
- MAMA_SUBSCRIPTION_DEALLOCATED = 10,
-
- /* The subscription is being re-activated, this state can only occur if the mamaSubscription_activate has been called while
- * the subscription is being deactivated, (i.e. its state is MAMA_SUBSCRIPTION_DEACTIVATING.
- */
- MAMA_SUBSCRIPTION_REACTIVATING = 11
+ MAMA_SUBSCRIPTION_DEALLOCATED = 10

} mamaSubscriptionState;

diff --git a/mama/c_cpp/src/c/subscription.c b/mama/c_cpp/src/c/subscription.c
index 6c73978..8c2497b 100644
--- a/mama/c_cpp/src/c/subscription.c
+++ b/mama/c_cpp/src/c/subscription.c
@@ -1772,7 +1772,7 @@ mama_status mamaSubscriptionImpl_deactivate(mamaSubscriptionImpl *impl)
mamaTransport_removeListener(impl->mTransport, impl->mSubscHandle);

/* If there is a create action on the throttle it must be removed. */
- if(NULL != impl->mAction)
+ if((NULL != throttle) && (NULL != impl->mAction))
{
wombatThrottle_removeAction(throttle, impl->mAction);
}
@@ -2195,7 +2195,7 @@ void MAMACALLTYPE mamaSubscriptionImpl_onSubscriptionDestroyed(mamaSubscription
break;

/* The subscription must be de-activated then re-activated. */
- case MAMA_SUBSCRIPTION_REACTIVATING:
+ case MAMA_SUBSCRIPTION_ACTIVATING:

/* Change the state. */
mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_DEACTIVATED);
@@ -2233,7 +2233,7 @@ void MAMACALLTYPE mamaSubscriptionImpl_onSubscriptionDestroyed(mamaSubscription
mama_status mamaSubscriptionImpl_removeFromThrottle(mamaSubscriptionImpl *impl)
{
/* Returns. */
- mama_status ret = MAMA_STATUS_OK;
+ mama_status ret = MAMA_STATUS_SUBSCRIPTION_INVALID_STATE;

/* Acquire the lock before anything else is done. */
wlock_lock(impl->mCreateDestroyLock);
@@ -2262,7 +2262,8 @@ mama_status mamaSubscriptionImpl_removeFromThrottle(mamaSubscriptionImpl *impl)
* if the subscription is still in the activating state.
*/
ret = MAMA_STATUS_SUBSCRIPTION_INVALID_STATE;
- if(MAMA_SUBSCRIPTION_ACTIVATING == wInterlocked_read(&impl->mState))
+ if(MAMA_SUBSCRIPTION_ACTIVATING == wInterlocked_read(&impl->mState) &&
+ NULL != impl->mAction)
{
/* Remove the subscription from the throttle. */
wombatThrottle_removeAction(throttle, impl->mAction);
@@ -2514,6 +2515,7 @@ mama_status mamaSubscription_allocate(mamaSubscription *subscription)
impl->mPreInitialCacheSize = MAMA_SUBSCRIPTION_DEFAULT_PREINITIALCACHESIZE;

/* Set the initial state of the subscription now that the memory has been allocated. */
+ wInterlocked_initialize(&impl->mState);
mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_ALLOCATED);

/* The function has succeeded. */
@@ -2570,7 +2572,7 @@ mama_status mamaSubscription_activate(mamaSubscription subscription)
case MAMA_SUBSCRIPTION_DEACTIVATING:

/* Set the state to indicate that the subscription will be reactivated. */
- mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_REACTIVATING);
+ mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_ACTIVATING);

ret = MAMA_STATUS_OK;
break;
@@ -2914,7 +2916,8 @@ mama_status mamaSubscription_deactivate(mamaSubscription subscription)
/* If invalid state is returned by this function then the subscription has become active
* while waiting for it to be removed from the throttle.
*/
- if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret)
+ if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret &&
+ MAMA_SUBSCRIPTION_ACTIVATED == wInterlocked_read(&impl->mState))
{
/* Invoke this function recursively to process the correct state. */
ret = mamaSubscription_deactivate(subscription);
@@ -3043,7 +3046,8 @@ mama_status mamaSubscription_destroy(mamaSubscription subscription)
/* If invalid state is returned by this function then the subscription has become active
* while waiting for it to be removed from the throttle.
*/
- if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret)
+ if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret &&
+ MAMA_SUBSCRIPTION_ACTIVATED == wInterlocked_read(&impl->mState))
{
/* Invoke this function recursively to process the correct state. */
ret = mamaSubscription_destroy(subscription);
@@ -4251,7 +4255,6 @@ const char* mamaSubscription_stringForState(mamaSubscriptionState state)
case MAMA_SUBSCRIPTION_DESTROYED: return "MAMA_SUBSCRIPTION_DESTROYED";
case MAMA_SUBSCRIPTION_DEALLOCATING: return "MAMA_SUBSCRIPTION_DEALLOCATING";
case MAMA_SUBSCRIPTION_DEALLOCATED: return "MAMA_SUBSCRIPTION_DEALLOCATED";
- case MAMA_SUBSCRIPTION_REACTIVATING: return "MAMA_SUBSCRIPTION_REACTIVATING";
}

return "State not recognised";
--
1.7.7.6


[PATCH 17/50] [mama] Configurable Schemes for data quality

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

The dqStratgyScheme deterines whether or not OpenMAMA ignores duplicate
messages. Valid values are:
DQ_SCHEME_DELIVER_ALL deliver all messages including duplicats
DQ_SCHEME_IGNORE_DUPS do not deliver duplicate messages

The dqftStrategyScheme determines how a subscription behaves when OpenMAMA
detects a fault tolerant take over (the sender id changes for a subscription):
DQ_FT_DO_NOT_WAIT_FOR_RECAP continue to process STALE messages
DQ_FT_WAIT_FOR_RECAP discard stale messages

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/dqstrategy.c | 46 ++++++++++++++--
mama/c_cpp/src/c/dqstrategy.h | 5 ++-
mama/c_cpp/src/c/listenermsgcallback.c | 22 +++++++-
mama/c_cpp/src/c/mama/subscription.h | 12 ++++
mama/c_cpp/src/c/transport.c | 92 ++++++++++++++++++++++++++++++++
5 files changed, 170 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/dqstrategy.c b/mama/c_cpp/src/c/dqstrategy.c
index 8f7ccda..1e495b6 100644
--- a/mama/c_cpp/src/c/dqstrategy.c
+++ b/mama/c_cpp/src/c/dqstrategy.c
@@ -141,7 +141,8 @@ handleFTTakeover (dqStrategy strategy,
int msgType,
mamaDqContext* ctx,
mama_seqnum_t seqNum,
- mama_u64_t senderId)
+ mama_u64_t senderId,
+ int recoverOnRecap)
{
const char* symbol = NULL;
mamaSubscription_getSymbol (self->mSubscription, &symbol);
@@ -151,11 +152,19 @@ handleFTTakeover (dqStrategy strategy,
"Previous SeqNum: %u. New SeqNum: %u. [%s]",
ctx->mSenderId, senderId, ctx->mSeqNum, seqNum, symbol);

+ if (recoverOnRecap)
+ {
+ ctx->mSeqNum = senderId;
+ ctx->mDQState = DQ_STATE_WAITING_FOR_RECAP_AFTER_FT;
+ }
+ else
+ {
resetDqState (strategy, ctx);

/*In all cases we reset the data quality context*/
dqStrategyImpl_resetDqContext (ctx, seqNum, senderId);

+ }
return MAMA_STATUS_OK;
}

@@ -183,6 +192,7 @@ dqStrategy_checkSeqNum (dqStrategy strategy,

mamaMsg_getSeqNum (msg, &seqNum);

+ ctx->mDoNotForward = 0;
if (mamaMsg_getU64 (msg, MamaFieldSenderId.mName, MamaFieldSenderId.mFid,
&senderId) != MAMA_STATUS_OK)
{
@@ -207,7 +217,14 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
mamaStatsCollector_incrementStat (*(mamaInternal_getGlobalStatsCollector()),
MamaStatFtTakeovers.mFid);
}
- return handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId);
+ if (DQ_FT_WAIT_FOR_RECAP==mamaTransportImpl_getFtStrategyScheme(tport))
+ {
+ handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId, 1);
+ }
+ else
+ {
+ return handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId, 0);
+ }
}

if (gMamaLogLevel >= MAMA_LOG_LEVEL_FINER)
@@ -243,7 +260,8 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
if (((ctxDqState == DQ_STATE_NOT_ESTABLISHED) ||
(seqNum == 0) ||
(seqNum == (ctxSeqNum + conflateCnt))) &&
- (ctxDqState != DQ_STATE_WAITING_FOR_RECAP))
+ ((ctxDqState != DQ_STATE_WAITING_FOR_RECAP) ||
+ (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)))
{
/* No gap */
if (self->mTryToFillGap)
@@ -268,7 +286,19 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
return MAMA_STATUS_OK;
}

- if (seqNum == ctxSeqNum)
+ /* For late joins or middlewares that support a publish cache, it is possible that you will get old updates
+ in this case take no action */
+ if (DQ_SCHEME_INGORE_DUPS==mamaTransportImpl_getDqStrategyScheme(tport))
+ {
+ if ((seqNum <= ctxSeqNum) && ((ctxDqState != DQ_STATE_WAITING_FOR_RECAP) ||
+ (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)))
+ {
+ ctx->mDoNotForward = 1;
+ return MAMA_STATUS_OK;
+ }
+ }
+
+ if ((seqNum == ctxSeqNum) && (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT))
{
/* Duplicate data - set DQQuality to DUPLICATE, invoke quality callback */
ctx->mDQState = DQ_STATE_DUPLICATE;
@@ -290,6 +320,13 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
return MAMA_STATUS_OK;
}

+ if (ctxDqState == DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)
+ {
+ ctx->mDoNotForward = 1;
+ return MAMA_STATUS_OK;
+ }
+ else
+ {
/* If we get here, we missed a sequence number. */
if ((PRE_INITIAL_SCHEME_ON_GAP==mamaTransportImpl_getPreInitialScheme(tport))
&&(self->mTryToFillGap))
@@ -323,6 +360,7 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
}

handleStaleData (self, msg, ctx);
+ }
break;
case MAMA_MSG_TYPE_INITIAL :
case MAMA_MSG_TYPE_BOOK_INITIAL :
diff --git a/mama/c_cpp/src/c/dqstrategy.h b/mama/c_cpp/src/c/dqstrategy.h
index a7ff6a7..d550e95 100644
--- a/mama/c_cpp/src/c/dqstrategy.h
+++ b/mama/c_cpp/src/c/dqstrategy.h
@@ -50,7 +50,9 @@ typedef enum dqState_
* In the case of a stale initial, we do not want
* a recap because it may also be stale data.
*/
- DQ_STATE_STALE_NO_RECAP = 5
+ DQ_STATE_STALE_NO_RECAP = 5,
+
+ DQ_STATE_WAITING_FOR_RECAP_AFTER_FT = 6
} dqState;

typedef struct
@@ -64,6 +66,7 @@ typedef struct
imageRequest mRecapRequest;
mama_u64_t mSenderId;

+ uint8_t mDoNotForward;
} mamaDqContext;

typedef struct dqStrategy_* dqStrategy;
diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c
index dca5474..cbe1c90 100644
--- a/mama/c_cpp/src/c/listenermsgcallback.c
+++ b/mama/c_cpp/src/c/listenermsgcallback.c
@@ -449,7 +449,16 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
case MAMA_MSG_TYPE_QUOTE:
case MAMA_MSG_TYPE_TRADE:
mamaSubscription_checkSeqNum(subscription, msg, msgType, ctx);
- mamaSubscription_forwardMsg(subscription, msg);
+ if (!ctx->mDqContext.mDoNotForward)
+ {
+ mamaSubscription_forwardMsg(subscription, msg);
+ }
+ else
+ {
+ mamaSubscription_getSymbol (subscription, &userSymbol);
+ mama_log (MAMA_LOG_LEVEL_FINER, "Subscription for %s not forwarded"
+ " as message seqnum is before seqnum expecting", userSymbol);
+ }
break;
case MAMA_MSG_TYPE_REFRESH:
mamaSubscription_respondToRefreshMessage(subscription);
@@ -468,7 +477,16 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
break;
default:
mamaSubscription_checkSeqNum(subscription, msg, msgType, ctx);
- mamaSubscription_forwardMsg(subscription, msg);
+ if (!ctx->mDqContext.mDoNotForward)
+ {
+ mamaSubscription_forwardMsg(subscription, msg);
+ }
+ else
+ {
+ mamaSubscription_getSymbol (subscription, &userSymbol);
+ mama_log (MAMA_LOG_LEVEL_FINER, "Subscription for %s not forwarded"
+ " as message seqnum is before seqnum expecting", userSymbol);
+ }
}
}

diff --git a/mama/c_cpp/src/c/mama/subscription.h b/mama/c_cpp/src/c/mama/subscription.h
index 058a41c..9eb1f0c 100644
--- a/mama/c_cpp/src/c/mama/subscription.h
+++ b/mama/c_cpp/src/c/mama/subscription.h
@@ -115,6 +115,18 @@ typedef enum

} mamaSubscriptionState;

+typedef enum
+{
+ DQ_SCHEME_DELIVER_ALL,
+ DQ_SCHEME_INGORE_DUPS
+} dqStartegyScheme;
+
+
+typedef enum
+{
+ DQ_FT_DO_NOT_WAIT_FOR_RECAP,
+ DQ_FT_WAIT_FOR_RECAP
+}dqftStrategyScheme;
/* *************************************************** */
/* Type Defines. */
/* *************************************************** */
diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 20ee0cc..0a4adcd 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -162,6 +162,9 @@ typedef struct transportImpl_

uint8_t mDisableDisconnectCb;
uint8_t mDisableRefresh;
+ dqStartegyScheme mDQStratScheme;
+ dqftStrategyScheme mFTStratScheme;
+
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -177,6 +180,8 @@ init (transportImpl* transport, int createResponder)
self->mCause = 0;
self->mPlatformInfo = NULL;
self->mPreInitialScheme = PRE_INITIAL_SCHEME_ON_GAP;
+ self->mDQStratScheme = DQ_SCHEME_DELIVER_ALL;
+ self->mFTStratScheme = DQ_FT_DO_NOT_WAIT_FOR_RECAP;


mama_log (MAMA_LOG_LEVEL_FINEST,
@@ -365,6 +370,72 @@ static void setPreInitialStrategy (mamaTransport transport)
"%s: Using default preinitial strategy: ON_GAP", self->mName);
}
}
+static void setDQStrategy (mamaTransport transport)
+{
+ const char* propValue = NULL;
+ char propNameBuf[256];
+
+ if (!self) return;
+
+ snprintf (propNameBuf, 256, "mama.transport.%s.dqstrategy", self->mName);
+
+ propValue = properties_Get(mamaInternal_getProperties(),
+ propNameBuf);
+
+ if (NULL!=propValue)
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL, "Setting %s=%s",
+ propNameBuf, propValue);
+
+ if (0==strcmp (propValue, "ignoredups"))
+ {
+ self->mDQStratScheme = DQ_SCHEME_INGORE_DUPS;
+ }
+ else
+ {
+ self->mDQStratScheme = DQ_SCHEME_DELIVER_ALL;
+ }
+ }
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL,
+ "%s: Using default dq strategy: DQ_SCHEME_DELIVER_ALL", self->mName);
+ }
+}
+
+static void setFtStrategy (mamaTransport transport)
+{
+ const char* propValue = NULL;
+ char propNameBuf[256];
+
+ if (!self) return;
+
+ snprintf (propNameBuf, 256, "mama.transport.%s.ftstrategy", self->mName);
+
+ propValue = properties_Get(mamaInternal_getProperties(),
+ propNameBuf);
+
+ if (NULL!=propValue)
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL, "Setting %s=%s",
+ propNameBuf, propValue);
+
+ if (0==strcmp (propValue, "waitforrecap"))
+ {
+ self->mFTStratScheme = DQ_FT_WAIT_FOR_RECAP;
+ }
+ else
+ {
+ self->mFTStratScheme = DQ_FT_DO_NOT_WAIT_FOR_RECAP;
+ }
+ }
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL,
+ "%s: Using default ft strategy: DQ_FT_DO_NOT_WAIT_FOR_RECAP", self->mName);
+ }
+}
+
/**
* Check property to disable refresh messages. Undocumented.
*
@@ -756,6 +827,8 @@ mamaTransport_create (mamaTransport transport,


setPreInitialStrategy ((mamaTransport)self);
+ setDQStrategy (self);
+ setFtStrategy (self);

if (mamaTransportImpl_disableDisconnectCb (name))
{
@@ -1584,6 +1657,25 @@ mamaTransportImpl_getPreInitialScheme (mamaTransport transport)
return PRE_INITIAL_SCHEME_ON_GAP;
}

+dqStartegyScheme
+mamaTransportImpl_getDqStrategyScheme (mamaTransport transport)
+{
+ if (self)
+ {
+ return self->mDQStratScheme;
+ }
+ return DQ_SCHEME_DELIVER_ALL;
+}
+
+dqftStrategyScheme
+mamaTransportImpl_getFtStrategyScheme (mamaTransport transport)
+{
+ if (self)
+ {
+ return self->mFTStratScheme;
+ }
+ return DQ_FT_DO_NOT_WAIT_FOR_RECAP;
+}
/* Process an advisory message and invokes callbacks
* on all listeners.
* @param transport The transport.
--
1.7.7.6


[PATCH 16/50] [mama] dqpublisher destroy

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Added proper destroy implementation for dqpublisher to allow applications to
clean up and shutdown properly.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/dqpublishermanager.c | 48 +++++++++++++++++++++++++++++++++
1 files changed, 48 insertions(+), 0 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index ecf6ced..156b586 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -285,11 +285,59 @@ mama_status mamaDQPublisherManager_create (

void mamaDQPublisherManager_destroy (mamaDQPublisherManager manager)
{
+ /* Get the impl. */
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;
+ if(NULL != impl)
+ {
+ /* Destroy the publisher. */
+ if(NULL != impl->mPublisher)
+ {
+ mamaPublisher_destroy(impl->mPublisher);
+ }
+
+ /* Destroy the subscription. */
+ if(NULL != impl->mSubscription)
+ {
+ mamaSubscription_destroy(impl->mSubscription);
+ mamaSubscription_deallocate(impl->mSubscription);
+ }
+
+ /* Destroy the inbox. */
+ if(NULL != impl->mInbox)
+ {
+ mamaInbox_destroy(impl->mInbox);
+ }

+ /* Destroy the re-usable messages. */
+ if(NULL != impl->mRefreshResponseMsg)
+ {
+ mamaMsg_destroy(impl->mRefreshResponseMsg);
+ }
+ if(NULL != impl->mNoSubscribersMsg)
+ {
+ mamaMsg_destroy(impl->mNoSubscribersMsg);
+ }
+ if(NULL != impl->mSyncRequestMsg)
+ {
+ mamaMsg_destroy(impl->mSyncRequestMsg);
+ }
+
+ /* Free the namespace. */
+ if(NULL != impl->mNameSpace)
+ {
+ free(impl->mNameSpace);
+ }
+
+ /* Destroy the publisher table. */
+ if(NULL != impl->mPublisherMap)
+ {
wtable_destroy ( impl->mPublisherMap );
}

+ /* Free the impl itself. */
+ free(impl);
+ }
+}
mama_status mamaDQPublisherManager_addPublisher (
mamaDQPublisherManager manager,
const char *symbol,
--
1.7.7.6


[PATCH 15/50] [mama] dqpublisher manager bug fixes

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Only invoke the application on create callback if it is not NULL. This avoids
core dumps if the application does not provide a callback.

The manager was not maintaining the publisher map correctly. It allocated new
entries when an entry already existed, and did not remove entries properly.

Create the CM subscription after the other objects are created. Creating it
early could result in CM requests invoking callbacks before the manager fully
initializes.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/dqpublishermanager.c | 28 ++++++++++++----------------
1 files changed, 12 insertions(+), 16 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index 0ee62de..ecf6ced 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -114,7 +114,9 @@ dqPublisherImplCreateCb (mamaSubscription subsc, void* closure)
{
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) (closure);

- impl->mUserCallbacks.onCreate ((mamaDQPublisherManager)impl);
+ if(NULL != impl)
+ if(NULL != impl->mUserCallbacks.onCreate)
+ impl->mUserCallbacks.onCreate ((mamaDQPublisherManager)impl);
}


@@ -263,8 +265,14 @@ mama_status mamaDQPublisherManager_create (
impl->mNameSpace = strdup(sourcename);
strcat(topic, ".");
strcat(topic, sourcename);
+ impl->mPublisherMap = wtable_create (topic, NUM_BUCKETS);

mamaSubscription_allocate (&impl->mSubscription);
+ mamaPublisher_create (&impl->mPublisher,
+ transport,
+ MAMA_CM_TOPIC,
+ NULL,
+ NULL);
mamaSubscription_createBasic (impl->mSubscription,
transport,
queue,
@@ -272,14 +280,6 @@ mama_status mamaDQPublisherManager_create (
topic,
impl);

- mamaPublisher_create (&impl->mPublisher,
- transport,
- MAMA_CM_TOPIC,
- NULL,
- NULL);
-
- impl->mPublisherMap = wtable_create (topic, NUM_BUCKETS);
-
return MAMA_STATUS_OK;
}

@@ -298,15 +298,10 @@ mama_status mamaDQPublisherManager_addPublisher (
{
mamaPublishTopic* newTopic = NULL;
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;
-
- if (wtable_lookup (impl->mPublisherMap , ( char* )symbol))
- return (MAMA_STATUS_INVALID_ARG);
-
-
newTopic = (mamaPublishTopic*) wtable_lookup (impl->mPublisherMap, (char*)symbol);


- if (newTopic)
+ if (!newTopic)
{
newTopic = (mamaPublishTopic*) calloc (1, sizeof (mamaPublishTopic));

@@ -408,10 +403,11 @@ mama_status mamaDQPublisherManager_destroyPublisher (
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;
mamaPublishTopic* newTopic = NULL;

- if ((newTopic = wtable_lookup (impl->mPublisherMap , ( char* )symbol)))
+ if (!(newTopic = wtable_lookup (impl->mPublisherMap , ( char* )symbol)))
return (MAMA_STATUS_INVALID_ARG);

mamaDQPublisher_destroy(newTopic->pub);
+ wtable_remove (impl->mPublisherMap, symbol);

free ((void*)newTopic->symbol);
free ((void*)newTopic);
--
1.7.7.6


[PATCH 14/50] [mama] Add cache and closure to dqpublisher

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

These are both application supplied callbacks. The closure is intended for
application specific context for the publisher and the cache is for caching
data that the publisher may send. In practice the application may use them as
they see fit.

The follow accessors get and set these values:
mamaDQPublisher_setClosure()
mamaDQPublisher_getClosure()
mamaDQPublisher_setCache()
mamaDQPublisher_getCache()

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/dqpublisher.c | 37 +++++++++++++++++++++++++++++++++
mama/c_cpp/src/c/dqpublishermanager.c | 4 +++
mama/c_cpp/src/c/mama/dqpublisher.h | 15 +++++++++++++
3 files changed, 56 insertions(+), 0 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublisher.c b/mama/c_cpp/src/c/dqpublisher.c
index 4363a1f..d20cc1f 100644
--- a/mama/c_cpp/src/c/dqpublisher.c
+++ b/mama/c_cpp/src/c/dqpublisher.c
@@ -33,6 +33,8 @@ typedef struct mamaDQPublisherImpl_
mamaMsgStatus mStatus;
uint64_t mSenderId;
mama_seqnum_t mSeqNum;
+ void* mClosure;
+ void* mCache;
} mamaDQPublisherImpl;


@@ -300,3 +302,38 @@ void mamaDQPublisher_setSeqNum (mamaDQPublisher pub, mama_seqnum_t num)
}


+void mamaDQPublisher_setClosure (mamaDQPublisher pub, void* closure)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+ if (impl)
+ impl->mClosure = closure;
+}
+
+
+void* mamaDQPublisher_getClosure (mamaDQPublisher pub)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+
+ if (impl)
+ return impl->mClosure;
+ else
+ return NULL;
+}
+
+void mamaDQPublisher_setCache (mamaDQPublisher pub, void* cache)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+ if (impl)
+ impl->mCache = cache;
+}
+
+
+void* mamaDQPublisher_getCache (mamaDQPublisher pub)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+
+ if (impl)
+ return impl->mCache;
+ else
+ return NULL;
+}
diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index c1bd7d0..0ee62de 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -381,6 +381,10 @@ mama_status mamaDQPublisherManager_createPublisher (
return status;
}

+ mamaDQPublisher_setCache(*newPublisher, cache);
+ mamaDQPublisher_setSenderId(*newPublisher, impl->mSenderId);
+ mamaDQPublisher_setStatus(*newPublisher, impl->mStatus);
+ mamaDQPublisher_setSeqNum(*newPublisher, impl->mSeqNum);
if (wtable_insert (impl->mPublisherMap, (char*)symbol, newTopic) != 1)
{
mamaDQPublisher_destroy(*newPublisher);
diff --git a/mama/c_cpp/src/c/mama/dqpublisher.h b/mama/c_cpp/src/c/mama/dqpublisher.h
index c8ae155..0f62030 100644
--- a/mama/c_cpp/src/c/mama/dqpublisher.h
+++ b/mama/c_cpp/src/c/mama/dqpublisher.h
@@ -85,6 +85,7 @@ extern mama_status
mamaDQPublisher_sendReplyWithHandle (mamaDQPublisher pub,
mamaMsgReply replyAddress,
mamaMsg reply);
+
MAMAExpDLL
extern void
mamaDQPublisher_destroy (mamaDQPublisher pub);
@@ -103,7 +104,21 @@ MAMAExpDLL
extern void
mamaDQPublisher_setSeqNum (mamaDQPublisher pub, mama_seqnum_t num);

+MAMAExpDLL
+extern void
+mamaDQPublisher_setClosure (mamaDQPublisher pub, void* closure);
+
+MAMAExpDLL
+extern void*
+mamaDQPublisher_getClosure (mamaDQPublisher pub);

+MAMAExpDLL
+extern void
+mamaDQPublisher_setCache (mamaDQPublisher pub, void* cache);
+
+MAMAExpDLL
+extern void*
+mamaDQPublisher_getCache (mamaDQPublisher pub);

#if defined( __cplusplus )
}
--
1.7.7.6


[PATCH 13/50] [mama] mamaDQPublisher_sendReplyWithHandle

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

This method allows applications to use a reply handle rather than detaching and
holding on to the original message. Since OpenMAMA attempts to reuse messages
this may improve performance for some applications.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/dqpublisher.c | 105 ++++++++++++++++++++++++++++++++++-
mama/c_cpp/src/c/mama/dqpublisher.h | 5 ++
2 files changed, 107 insertions(+), 3 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublisher.c b/mama/c_cpp/src/c/dqpublisher.c
index a875399..4363a1f 100644
--- a/mama/c_cpp/src/c/dqpublisher.c
+++ b/mama/c_cpp/src/c/dqpublisher.c
@@ -157,17 +157,116 @@ mama_status mamaDQPublisher_sendReply (mamaDQPublisher pub,
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);

- mamaMsg_updateU8(reply, NULL, MamaFieldMsgStatus.mFid, impl->mStatus);
+ if(MAMA_STATUS_OK != mamaMsg_updateU8(reply,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(reply,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }

if (impl->mSenderId != 0)
- mamaMsg_updateU64(reply, NULL, MamaFieldSenderId.mFid, impl->mSenderId);
+ {
+ mamaMsgField senderIdField = NULL;
+ if (MAMA_STATUS_OK == mamaMsg_getField(reply,
+ MamaFieldSenderId.mName, MamaFieldSenderId.mFid,
+ &senderIdField))
+ {
+ mamaFieldType senderIdType = MAMA_FIELD_TYPE_UNKNOWN;
+ if (MAMA_STATUS_OK == mamaMsgField_getType(senderIdField,
+ &senderIdType))
+ {
+ switch(senderIdType)
+ {
+ case MAMA_FIELD_TYPE_U16:
+ mamaMsgField_updateU16(senderIdField,
+ (mama_u16_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U32:
+ mamaMsgField_updateU32(senderIdField,
+ (mama_u32_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U64:
+ default:
+ mamaMsgField_updateU64(senderIdField,
+ impl->mSenderId);
+ break;
+ }
+ }
+ }
+ else
+ mamaMsg_addU64(reply, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, impl->mSenderId);
+ }

if (impl->mSeqNum != 0)
- mamaMsg_updateU32(reply, NULL, MamaFieldSeqNum.mFid, impl->mSeqNum);
+ {
+ mamaMsg_updateU32(reply, MamaFieldSeqNum.mName, MamaFieldSeqNum.mFid,
+ impl->mSeqNum);
+ }

return (mamaPublisher_sendReplyToInbox (impl->mPublisher, request, reply));
}

+mama_status mamaDQPublisher_sendReplyWithHandle (mamaDQPublisher pub,
+ mamaMsgReply replyAddress,
+ mamaMsg reply)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+
+ if(MAMA_STATUS_OK != mamaMsg_updateU8(reply,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(reply,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }
+
+ if (impl->mSenderId != 0)
+ {
+ mamaMsgField senderIdField = NULL;
+ if (MAMA_STATUS_OK == mamaMsg_getField(reply,
+ MamaFieldSenderId.mName, MamaFieldSenderId.mFid,
+ &senderIdField))
+ {
+ mamaFieldType senderIdType = MAMA_FIELD_TYPE_UNKNOWN;
+ if (MAMA_STATUS_OK == mamaMsgField_getType(senderIdField,
+ &senderIdType))
+ {
+ switch(senderIdType)
+ {
+ case MAMA_FIELD_TYPE_U16:
+ mamaMsgField_updateU16(senderIdField,
+ (mama_u16_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U32:
+ mamaMsgField_updateU32(senderIdField,
+ (mama_u32_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U64:
+ default:
+ mamaMsgField_updateU64(senderIdField,
+ impl->mSenderId);
+ break;
+ }
+ }
+ }
+ else
+ mamaMsg_addU64(reply, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, impl->mSenderId);
+ }
+
+ if (impl->mSeqNum != 0)
+ {
+ mamaMsg_updateU32(reply, MamaFieldSeqNum.mName, MamaFieldSeqNum.mFid,
+ impl->mSeqNum);
+ }
+
+ return (mamaPublisher_sendReplyToInboxHandle (impl->mPublisher,
+ replyAddress, reply));
+}

void mamaDQPublisher_destroy (mamaDQPublisher pub)
{
diff --git a/mama/c_cpp/src/c/mama/dqpublisher.h b/mama/c_cpp/src/c/mama/dqpublisher.h
index f792656..c8ae155 100644
--- a/mama/c_cpp/src/c/mama/dqpublisher.h
+++ b/mama/c_cpp/src/c/mama/dqpublisher.h
@@ -81,6 +81,11 @@ mamaDQPublisher_sendReply (mamaDQPublisher pub, mamaMsg request,


MAMAExpDLL
+extern mama_status
+mamaDQPublisher_sendReplyWithHandle (mamaDQPublisher pub,
+ mamaMsgReply replyAddress,
+ mamaMsg reply);
+MAMAExpDLL
extern void
mamaDQPublisher_destroy (mamaDQPublisher pub);

--
1.7.7.6


[PATCH 12/50] [mama][dqpublisher] Improved Message Handling

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Since the caller may pass messages with the reserved files (ie. senderid,
seqno, etc.) populated, the dq publisher should try to be robust and update the
fields if the type is reasonable. Prior to this change they would fail if the
types did not match the expected type exactly. Now if the status, for example,
is a 16 bit integer, it will succeed even though we expect and 8 bit integer.

Also fixed formatting and whitespace.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/dqpublisher.c | 86 ++++++++++++++++++++++++---------
mama/c_cpp/src/c/dqpublishermanager.c | 10 +++-
2 files changed, 71 insertions(+), 25 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublisher.c b/mama/c_cpp/src/c/dqpublisher.c
index 677c117..a875399 100644
--- a/mama/c_cpp/src/c/dqpublisher.c
+++ b/mama/c_cpp/src/c/dqpublisher.c
@@ -29,9 +29,9 @@

typedef struct mamaDQPublisherImpl_
{
- mamaPublisher mPublisher;
- mamaMsgStatus mStatus;
- uint64_t mSenderId;
+ mamaPublisher mPublisher;
+ mamaMsgStatus mStatus;
+ uint64_t mSenderId;
mama_seqnum_t mSeqNum;
} mamaDQPublisherImpl;

@@ -45,11 +45,11 @@ mama_status mamaDQPublisher_allocate (mamaDQPublisher* result)

*result = impl;

- return MAMA_STATUS_OK;
+ return MAMA_STATUS_OK;
}

mama_status mamaDQPublisher_create (mamaDQPublisher pub, mamaTransport transport,
- const char* topic)
+ const char* topic)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
mama_status status = MAMA_STATUS_OK;
@@ -61,7 +61,7 @@ mama_status mamaDQPublisher_create (mamaDQPublisher pub, mamaTransport transport
NULL,
NULL);

- if (status == MAMA_STATUS_OK)
+ if (status == MAMA_STATUS_OK)
{
impl->mSenderId = mamaSenderId_getSelf ();
impl->mStatus = MAMA_MSG_STATUS_OK;
@@ -76,36 +76,79 @@ mama_status mamaDQPublisher_send (mamaDQPublisher pub, mamaMsg msg)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);

- if (impl->mSeqNum != 0)
+ if (impl->mSeqNum != 0)
{
switch (mamaMsgType_typeForMsg (msg))
{
- case MAMA_MSG_TYPE_REFRESH :
- case MAMA_MSG_TYPE_SYNC_REQUEST :
- case MAMA_MSG_TYPE_MISC :
- case MAMA_MSG_TYPE_NOT_PERMISSIONED :
- case MAMA_MSG_TYPE_NOT_FOUND :
- break;
-
+ case MAMA_MSG_TYPE_REFRESH :
+ case MAMA_MSG_TYPE_SYNC_REQUEST :
+ case MAMA_MSG_TYPE_MISC :
+ case MAMA_MSG_TYPE_NOT_PERMISSIONED :
+ case MAMA_MSG_TYPE_NOT_FOUND :
+ break;
case MAMA_MSG_TYPE_INITIAL :
case MAMA_MSG_TYPE_BOOK_INITIAL :
case MAMA_MSG_TYPE_RECAP :
case MAMA_MSG_TYPE_BOOK_RECAP :
- mamaMsg_updateU8(msg,NULL, MamaFieldMsgStatus.mFid, impl->mStatus);
+ if(MAMA_STATUS_OK !=
+ mamaMsg_updateU8(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }
break;

default:
- mamaMsg_updateU8(msg,NULL, MamaFieldMsgStatus.mFid, impl->mStatus);
+ if(MAMA_STATUS_OK !=
+ mamaMsg_updateU8(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }
impl->mSeqNum++;
break;
}
- mamaMsg_updateU32(msg, NULL, MamaFieldSeqNum.mFid, impl->mSeqNum);
+ mamaMsg_updateU32(msg, MamaFieldSeqNum.mName, MamaFieldSeqNum.mFid,
+ impl->mSeqNum);
}

if (impl->mSenderId != 0)
- mamaMsg_updateU64(msg, NULL, MamaFieldSenderId.mFid, impl->mSenderId);
+ {
+ mamaMsgField senderIdField = NULL;
+ if (MAMA_STATUS_OK == mamaMsg_getField(msg, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, &senderIdField))
+ {
+ mamaFieldType senderIdType = MAMA_FIELD_TYPE_UNKNOWN;
+ if (MAMA_STATUS_OK == mamaMsgField_getType(senderIdField,
+ &senderIdType))
+ {
+ switch(senderIdType)
+ {
+ case MAMA_FIELD_TYPE_U16:
+ mamaMsgField_updateU16(senderIdField,
+ (mama_u16_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U32:
+ mamaMsgField_updateU32(senderIdField,
+ (mama_u32_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U64:
+ default:
+ mamaMsgField_updateU64(senderIdField, impl->mSenderId);
+ break;
+ }
+ }
+ }
+ else
+ mamaMsg_addU64(msg, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, impl->mSenderId);
+ }

- return (mamaPublisher_send (impl->mPublisher, msg));
+ return (mamaPublisher_send (impl->mPublisher, msg));
}

mama_status mamaDQPublisher_sendReply (mamaDQPublisher pub,
@@ -142,21 +185,18 @@ void mamaDQPublisher_destroy (mamaDQPublisher pub)
void mamaDQPublisher_setStatus (mamaDQPublisher pub, mamaMsgStatus status)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
-
impl->mStatus = status;
}

void mamaDQPublisher_setSenderId (mamaDQPublisher pub, uint64_t senderid)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
-
- impl->mSenderId = senderid;
+ impl->mSenderId = senderid;
}

void mamaDQPublisher_setSeqNum (mamaDQPublisher pub, mama_seqnum_t num)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
-
impl->mSeqNum=num;
}

diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index 71026ed..c1bd7d0 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -215,6 +215,9 @@ mama_status mamaDQPublisherManager_allocate(mamaDQPublisherManager* result )
calloc (1, sizeof (mamaDQPublisherManagerImpl));

if (!impl) return MAMA_STATUS_NOMEM;
+ impl->mSenderId = mamaSenderId_getSelf ();
+ impl->mStatus = MAMA_MSG_STATUS_OK;
+ impl->mSeqNum = 1;

*result = impl;

@@ -225,7 +228,10 @@ void* mamaDQPublisherManager_getClosure (mamaDQPublisherManager manager)
{
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;

- return impl->mClosure;
+ if(NULL != impl)
+ return impl->mClosure;
+ else
+ return NULL;
}


@@ -353,7 +359,7 @@ mama_status mamaDQPublisherManager_createPublisher (
mama_status status = MAMA_STATUS_OK;
char topic[80];

- newTopic = (mamaPublishTopic*) wtable_lookup (impl->mPublisherMap, (char*)symbol);
+ newTopic = (mamaPublishTopic*)wtable_lookup (impl->mPublisherMap, (char*)symbol);

if (!newTopic)
{
--
1.7.7.6


[PATCH 11/50] [mama] minor datetime fixes

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

When formatting a date time with a time, separate the date and time with a
space.

Added parameter checking to makeTime to handle invalid values sanely: set the
year to 1970 if prior to beginning of epoch, default month to 0 (Jan) for
invalid values, and set invalid days to 0.

Also converted tabs to spaces. Use "git diff -b" to ignore white space.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/datetime.c | 155 ++++++++++++++++++++++++-------------------
1 files changed, 88 insertions(+), 67 deletions(-)

diff --git a/mama/c_cpp/src/c/datetime.c b/mama/c_cpp/src/c/datetime.c
index 1cb8cde..267bd96 100644
--- a/mama/c_cpp/src/c/datetime.c
+++ b/mama/c_cpp/src/c/datetime.c
@@ -662,11 +662,11 @@ mamaDateTime_addMicroseconds(mamaDateTime dateTime,
{
if(addMicroseconds >= -1000000)
{
- mamaDateTime_addWholeSeconds (dateTime, -1);
+ mamaDateTime_addWholeSeconds (dateTime, -1);
}
else
{
- mamaDateTime_addWholeSeconds (dateTime, addMicroseconds/1000000 - 1);
+ mamaDateTime_addWholeSeconds (dateTime, addMicroseconds/1000000 - 1);
}
addMicroseconds %= 1000000;
if(addMicroseconds < 0) addMicroseconds += 1000000;
@@ -796,72 +796,72 @@ mamaDateTime_getEpochTimeSeconds(const mamaDateTime dateTime,

mama_status mamaDateTime_addTodayToDateTime(mamaDateTime destination)
{
- mama_status ret = MAMA_STATUS_SYSTEM_ERROR;
-
- /* Create a new date time. */
- mama_time_t time;
- mamaDateTimeImpl_clear(time);
- {
- /* Get the current time of day. */
- struct timeval now;
- memset(&now, 0, sizeof(now));
- if(gettimeofday(&now, NULL) == 0)
- {
- /* Initialise the time structure with the date. */
- mamaDateTimeImpl_setSeconds (time, now.tv_sec);
- mamaDateTimeImpl_setMicroSeconds (time, now.tv_usec);
- mamaDateTimeImpl_setPrecision (time, 6);
- mamaDateTimeImpl_setHasDate (time);
- mamaDateTimeImpl_setHasTime (time);
- {
- /* Get existing number of seconds and remove any full-day seconds
- * from the destination.
- */
- mama_u32_t tmpSeconds = mamaDateTimeImpl_getSeconds(*destination) % SECONDS_IN_A_DAY;
-
- /* Append the date from today. */
- tmpSeconds += (mamaDateTimeImpl_getSeconds(time) / SECONDS_IN_A_DAY) * SECONDS_IN_A_DAY;
-
- /* Set the result in the destination. */
- mamaDateTimeImpl_setSeconds(*destination, tmpSeconds);
- mamaDateTimeImpl_setHasDate(*destination);
-
- /* Function succeeded. */
- ret = MAMA_STATUS_OK;
- }
- }
- }
-
- return ret;
+ mama_status ret = MAMA_STATUS_SYSTEM_ERROR;
+
+ /* Create a new date time. */
+ mama_time_t time;
+ mamaDateTimeImpl_clear(time);
+ {
+ /* Get the current time of day. */
+ struct timeval now;
+ memset(&now, 0, sizeof(now));
+ if(gettimeofday(&now, NULL) == 0)
+ {
+ /* Initialise the time structure with the date. */
+ mamaDateTimeImpl_setSeconds (time, now.tv_sec);
+ mamaDateTimeImpl_setMicroSeconds (time, now.tv_usec);
+ mamaDateTimeImpl_setPrecision (time, 6);
+ mamaDateTimeImpl_setHasDate (time);
+ mamaDateTimeImpl_setHasTime (time);
+ {
+ /* Get existing number of seconds and remove any full-day seconds
+ * from the destination.
+ */
+ mama_u32_t tmpSeconds = mamaDateTimeImpl_getSeconds(*destination) % SECONDS_IN_A_DAY;
+
+ /* Append the date from today. */
+ tmpSeconds += (mamaDateTimeImpl_getSeconds(time) / SECONDS_IN_A_DAY) * SECONDS_IN_A_DAY;
+
+ /* Set the result in the destination. */
+ mamaDateTimeImpl_setSeconds(*destination, tmpSeconds);
+ mamaDateTimeImpl_setHasDate(*destination);
+
+ /* Function succeeded. */
+ ret = MAMA_STATUS_OK;
+ }
+ }
+ }
+
+ return ret;
}

mama_status
mamaDateTime_getEpochTimeSecondsWithCheck(const mamaDateTime dateTime,
mama_f64_t* seconds)
{
- mama_status ret = MAMA_STATUS_NULL_ARG;
- if((dateTime != NULL) && (seconds != NULL))
- {
- /* Check to see if the date time has got a date value set. */
- uint8_t hasDate = mamaDateTimeImpl_getHasDate(*dateTime);
-
- /* No date value present, add today's date to the time value. */
- if(hasDate == 0)
- {
- ret = mamaDateTime_addTodayToDateTime(dateTime);
- if(ret == MAMA_STATUS_OK)
- {
- /* Get the seconds as normal. */
- ret = mamaDateTime_getEpochTimeSecondsWithTz(dateTime, seconds, NULL);
- }
- }
-
- /* Date value present, get the seconds as normal. */
- else
- {
- ret = mamaDateTime_getEpochTimeSecondsWithTz(dateTime, seconds, NULL);
- }
- }
+ mama_status ret = MAMA_STATUS_NULL_ARG;
+ if((dateTime != NULL) && (seconds != NULL))
+ {
+ /* Check to see if the date time has got a date value set. */
+ uint8_t hasDate = mamaDateTimeImpl_getHasDate(*dateTime);
+
+ /* No date value present, add today's date to the time value. */
+ if(hasDate == 0)
+ {
+ ret = mamaDateTime_addTodayToDateTime(dateTime);
+ if(ret == MAMA_STATUS_OK)
+ {
+ /* Get the seconds as normal. */
+ ret = mamaDateTime_getEpochTimeSecondsWithTz(dateTime, seconds, NULL);
+ }
+ }
+
+ /* Date value present, get the seconds as normal. */
+ else
+ {
+ ret = mamaDateTime_getEpochTimeSecondsWithTz(dateTime, seconds, NULL);
+ }
+ }

return ret;
}
@@ -973,6 +973,7 @@ mama_status mamaDateTime_getAsString (const mamaDateTime dateTime,
struct tm tmValue;
size_t bytesUsed;
size_t precision;
+ uint8_t hasTime = 0;

if (!dateTime || !buf)
return MAMA_STATUS_INVALID_ARG;
@@ -980,16 +981,20 @@ mama_status mamaDateTime_getAsString (const mamaDateTime dateTime,
seconds = (time_t) mamaDateTimeImpl_getSeconds(*dateTime);
utcTm (&tmValue, seconds);
buf[0] = '\0';
+ hasTime = mamaDateTimeImpl_getHasTime (*dateTime);
if (mamaDateTimeImpl_getHasDate(*dateTime))
{
- bytesUsed = strftime (buf, bufMaxLen, "%Y-%m-%d ", &tmValue);
+ if (hasTime)
+ bytesUsed = strftime (buf, bufMaxLen, "%Y-%m-%d ", &tmValue);
+ else
+ bytesUsed = strftime (buf, bufMaxLen, "%Y-%m-%d", &tmValue);
if (bytesUsed > 0)
{
buf += bytesUsed;
bufMaxLen -= bytesUsed;
}
}
- if (mamaDateTimeImpl_getHasTime(*dateTime))
+ if (hasTime)
{
bytesUsed = strftime (buf, bufMaxLen, "%H:%M:%S", &tmValue);
if (bytesUsed > 0)
@@ -1645,9 +1650,25 @@ unsigned long makeTime (
{
struct tm timeInfo;

- timeInfo.tm_year = year - 1900;
- timeInfo.tm_mon = mon - 1;
- timeInfo.tm_mday = day;
+ /* tm_year stores the years since 1900
+ * must not be less than 70 */
+ if (year > 1970)
+ timeInfo.tm_year = year - 1900;
+ else
+ timeInfo.tm_year = 70;
+
+ /* tm_mon stores the months since Jan (0 to 11) */
+ if (mon > 0)
+ timeInfo.tm_mon = mon - 1;
+ else
+ timeInfo.tm_mon = 0;
+
+ /* day of the month (1 to 31) */
+ if (day > 0)
+ timeInfo.tm_mday = day;
+ else
+ timeInfo.tm_mday = 1;
+
timeInfo.tm_hour = hour;
timeInfo.tm_min = min;
timeInfo.tm_sec = sec;
--
1.7.7.6


[PATCH 10/50] [mama] Free CM Subscription in Destroy Callback

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

The CM subscription can not be freed until it destruction completes and invokes
the onDestroy() callback. This avoids potential core dumps on cleanup.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/clientmanageresponder.c | 33 ++++++++++++++++-------------
1 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/mama/c_cpp/src/c/clientmanageresponder.c b/mama/c_cpp/src/c/clientmanageresponder.c
index f307fff..4d7251d 100644
--- a/mama/c_cpp/src/c/clientmanageresponder.c
+++ b/mama/c_cpp/src/c/clientmanageresponder.c
@@ -74,6 +74,8 @@ errorCB( mamaSubscription subscription, mama_status status,
static void MAMACALLTYPE
msgCB( mamaSubscription subscription, mamaMsg msg, void* closure,
void* itemClosure);
+static void MAMACALLTYPE
+destroyCB( mamaSubscription subscription, void* closure);

/**
* Destroy the pending command
@@ -123,14 +125,11 @@ mama_status mamaCmResponder_destroy(mamaCmResponder responder)
{
/* Destroy the subscription, note that the first failure return code will be preserved.
*/
- mama_status sd = mamaSubscription_destroy(*nextSubscription);
+ mama_status sd = mamaSubscription_destroyEx(*nextSubscription);
if(ret == MAMA_STATUS_OK)
{
ret = sd;
}
-
- /* The memory will not be released until the subscription is deallocated. */
- mamaSubscription_deallocate(*nextSubscription);
}
}

@@ -160,6 +159,7 @@ mama_status mamaCmResponder_destroy(mamaCmResponder responder)

/* Free the array of publishers. */
free(impl->publishers);
+ impl->publishers = NULL;
}

/* Destroy all pending commands. */
@@ -187,6 +187,7 @@ mama_status populateCmResponder(mamaCmResponderImpl *impl)
{
/* Returns */
mama_status ret = MAMA_STATUS_NO_BRIDGE_IMPL;
+ mamaQueue internalQueue = NULL;

/* Get the default event queue from the bridgeImpl. */
mamaBridgeImpl *bridgeImpl = mamaTransportImpl_getBridgeImpl(impl->mTransport);
@@ -200,9 +201,10 @@ mama_status populateCmResponder(mamaCmResponderImpl *impl)
callbacks.onCreate = createCB;
callbacks.onError = errorCB;
callbacks.onMsg = msgCB;
+ callbacks.onDestroy = destroyCB;

- /* Reset the return code for the enumeration */
- ret = MAMA_STATUS_OK;
+ ret = mamaBridgeImpl_getInternalEventQueue((mamaBridge)bridgeImpl, &internalQueue);
+ if (ret == MAMA_STATUS_OK)
{
/* Enumerate all the sub transport bridges in the transport. */
int nextTransportIndex = 0;
@@ -234,7 +236,7 @@ mama_status populateCmResponder(mamaCmResponderImpl *impl)
ret = mamaSubscription_createBasic(
*nextSubscription,
impl->mTransport,
- bridgeImpl->mDefaultEventQueue,
+ internalQueue,
&callbacks,
MAMA_CM_TOPIC,
impl);
@@ -359,14 +361,6 @@ msgCB (mamaSubscription subscription, mamaMsg msg, void* closure,

/* Get the transport index in use by the supplied subscription. */
int transportIndex = 0;
- mamaSubscription_getTransportIndex(subscription, &transportIndex);
-
- /**
- * TODO: We need to add the element to the list. This is a bug, but I
- * don't want to fix it without testing.
- */
-
- /* list_push_back (impl->mPendingCommands, command); */

/*
* The command ID field tells us what command to execute.
@@ -384,7 +378,9 @@ msgCB (mamaSubscription subscription, mamaMsg msg, void* closure,
{
mama_log (MAMA_LOG_LEVEL_FINE, "mamaCmResponder::msgCb(): "
"CM SYNC Received" );
+ mamaSubscription_getTransportIndex(subscription, &transportIndex);
mamaSyncCommand_create (command, msg, impl->mTransport, transportIndex, impl->publishers[transportIndex], endCB, impl);
+ list_push_back(impl->mPendingCommands, command);
mamaSyncCommand_run (command);
}
break;
@@ -394,3 +390,10 @@ msgCB (mamaSubscription subscription, mamaMsg msg, void* closure,
"Bad CM command: %d", command);
}
}
+
+static void MAMACALLTYPE
+destroyCB (mamaSubscription subscription, void* closure)
+{
+ /* Deallocate the subscription. */
+ mamaSubscription_deallocate(subscription);
+}
--
1.7.7.6


[PATCH 09/50] [bridge] Added mamaTransport_forceClientDisconnect()

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

For TCP socket based middleware, allow transports to disconnect individual
clients. This is usefull for eliminating slow consumers or other applications
that may impact the entier system.

Also fixed compiler warnings in Avis bridge files.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/bridge.h | 11 +++-
mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h | 7 ++-
mama/c_cpp/src/c/bridge/avis/bridge.c | 36 +++++++-----
mama/c_cpp/src/c/bridge/avis/transportbridge.c | 59 ++++++++++++-------
4 files changed, 73 insertions(+), 40 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge.h b/mama/c_cpp/src/c/bridge.h
index 5ab7707..9ead2bd 100644
--- a/mama/c_cpp/src/c/bridge.h
+++ b/mama/c_cpp/src/c/bridge.h
@@ -116,6 +116,8 @@ do \
implIdentifier ## BridgeMamaTransport_destroy; \
bridgeImpl->bridgeMamaTransportCreate = \
implIdentifier ## BridgeMamaTransport_create; \
+ bridgeImpl->bridgeMamaTransportForceClientDisconnect = \
+ implIdentifier ## BridgeMamaTransport_forceClientDisconnect;\
bridgeImpl->bridgeMamaTransportFindConnection = \
implIdentifier ## BridgeMamaTransport_findConnection; \
bridgeImpl->bridgeMamaTransportGetAllConnections = \
@@ -269,7 +271,7 @@ typedef mama_status (*bridge_stop)(mamaQueue defaultEventQueue);
/*Called by mama_getVersion()*/
typedef const char* (*bridge_getVersion)(void);
typedef const char* (*bridge_getName)(void);
-typedef mama_status (*bridge_getDefaultPayloadId)(char**name, char* id);
+typedef mama_status (*bridge_getDefaultPayloadId)(char***name, char** id);

/*===================================================================
= mamaQueue bridge function pointers =
@@ -330,6 +332,11 @@ typedef mama_status (*bridgeMamaTransport_destroy)(transportBridge transport);
typedef mama_status (*bridgeMamaTransport_create)(transportBridge *result,
const char* name,
mamaTransport parent);
+typedef mama_status (*bridgeMamaTransport_forceClientDisconnect)
+ (transportBridge* transports,
+ int numTransports,
+ const char* ipAddress,
+ uint16_t port);
/* Find a connection with specified IP Address and Port. If the port is 0, the
* call returns the first connection with the specified IP Address. If a
* connection is not found the method returns MAMA_STATUS_NOT_FOUND and
@@ -735,6 +742,8 @@ typedef struct mamaBridgeImpl
bridgeMamaTransport_isValid bridgeMamaTransportIsValid;
bridgeMamaTransport_destroy bridgeMamaTransportDestroy;
bridgeMamaTransport_create bridgeMamaTransportCreate;
+ bridgeMamaTransport_forceClientDisconnect
+ bridgeMamaTransportForceClientDisconnect;
bridgeMamaTransport_findConnection bridgeMamaTransportFindConnection;
bridgeMamaTransport_getAllConnections
bridgeMamaTransportGetAllConnections;
diff --git a/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h b/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h
index 94c4a42..0dd6f8e 100644
--- a/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h
+++ b/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h
@@ -43,7 +43,7 @@ extern const char*
avisBridge_getVersion (void);

mama_status
-avisBridge_getDefaultPayloadId (char**name, char* id);
+avisBridge_getDefaultPayloadId (char***name, char** id);

extern mama_status
avisBridge_open (mamaBridge bridgeImpl);
@@ -125,6 +125,11 @@ avisBridgeMamaTransport_create (transportBridge* result,
const char* name,
mamaTransport parent);
extern mama_status
+avisBridgeMamaTransport_forceClientDisconnect (transportBridge* transports,
+ int numTransports,
+ const char* ipAddress,
+ uint16_t port);
+extern mama_status
avisBridgeMamaTransport_findConnection (transportBridge* transports,
int numTransports,
mamaConnection* result,
diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index 24c369a..4b5343a 100755
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -35,10 +35,12 @@ timerHeap gTimerHeap;
/*Responsible for creating the bridge impl structure*/
void avisBridge_createImpl (mamaBridge* result)
{
+ avisBridgeImpl* avisBridge = NULL;
+ mamaBridgeImpl* impl = NULL;
if (!result) return;
*result = NULL;

- mamaBridgeImpl* impl = (mamaBridgeImpl*)calloc (1, sizeof (mamaBridgeImpl));
+ impl = (mamaBridgeImpl*)calloc (1, sizeof (mamaBridgeImpl));
if (!impl)
{
mama_log (MAMA_LOG_LEVEL_SEVERE, "avisBridge_createImpl(): "
@@ -46,7 +48,7 @@ void avisBridge_createImpl (mamaBridge* result)
return;
}

- avisBridgeImpl* avisBridge = (avisBridgeImpl*) calloc(1, sizeof(avisBridgeImpl));
+ avisBridge = (avisBridgeImpl*) calloc(1, sizeof(avisBridgeImpl));

/*Populate the bridge impl structure with the function pointers*/
INITIALIZE_BRIDGE (impl, avis);
@@ -59,7 +61,7 @@ void avisBridge_createImpl (mamaBridge* result)
const char*
avisBridge_getVersion (void)
{
- return (const char*) VERSION;
+ return (const char*) "Unable to get version number";
}

const char*
@@ -68,14 +70,14 @@ avisBridge_getName (void)
return "avis";
}

-#define DEFAULT_PAYLOAD_NAME "avismsg"
-#define DEFAULT_PAYLOAD_ID MAMA_PAYLOAD_AVIS
+static const char* PAYLOAD_NAMES[] = {"avismsg",NULL};
+static const char PAYLOAD_IDS[] = {MAMA_PAYLOAD_AVIS,NULL};

mama_status
-avisBridge_getDefaultPayloadId (char**name, char* id)
+avisBridge_getDefaultPayloadId (char***name, char** id)
{
- *name=DEFAULT_PAYLOAD_NAME;
- *id=DEFAULT_PAYLOAD_ID;
+ *name = PAYLOAD_NAMES;
+ *id = PAYLOAD_IDS;

return MAMA_STATUS_OK;
}
@@ -87,6 +89,7 @@ avisBridge_open (mamaBridge bridgeImpl)
mama_status status = MAMA_STATUS_OK;
mamaBridgeImpl* impl = (mamaBridgeImpl*)bridgeImpl;

+ wsocketstartup();
mama_log (MAMA_LOG_LEVEL_FINEST, "avisBridge_open(): Entering.");

if (MAMA_STATUS_OK !=
@@ -123,10 +126,11 @@ avisBridge_open (mamaBridge bridgeImpl)
mama_status
avisBridge_close (mamaBridge bridgeImpl)
{
+ mama_status status = MAMA_STATUS_OK;
+ mamaBridgeImpl* impl = NULL;
mama_log (MAMA_LOG_LEVEL_FINEST, "avisBridge_close(): Entering.");

- mama_status status = MAMA_STATUS_OK;
- mamaBridgeImpl* impl = (mamaBridgeImpl*)bridgeImpl;
+ impl = (mamaBridgeImpl*)bridgeImpl;


if (0 != destroyHeap (gTimerHeap))
@@ -139,6 +143,7 @@ avisBridge_close (mamaBridge bridgeImpl)
mamaQueue_destroyWait(impl->mDefaultEventQueue);

free (impl);
+ wsocketcleanup();
return status;
}

@@ -146,12 +151,12 @@ avisBridge_close (mamaBridge bridgeImpl)
mama_status
avisBridge_start(mamaQueue defaultEventQueue)
{
- mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_start(): Start dispatching on default event queue.");
-
mama_status status = MAMA_STATUS_OK;
+ avisBridgeImpl* avisBridge = NULL;
+
+ mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_start(): Start dispatching on default event queue.");

// start Avis event loop(s)
- avisBridgeImpl* avisBridge;
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) mamaQueueImpl_getBridgeImpl(defaultEventQueue), (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_start(): Could not get Elvin object");
return status;
@@ -168,11 +173,10 @@ avisBridge_start(mamaQueue defaultEventQueue)
mama_status
avisBridge_stop(mamaQueue defaultEventQueue)
{
- mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_stop(): Stopping bridge.");
-
mama_status status = MAMA_STATUS_OK;
+ avisBridgeImpl* avisBridge = NULL;

- avisBridgeImpl* avisBridge;
+ mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_stop(): Stopping bridge.");
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) mamaQueueImpl_getBridgeImpl(defaultEventQueue), (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_stop(): Could not get Elvin object");
return status;
diff --git a/mama/c_cpp/src/c/bridge/avis/transportbridge.c b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
index c994879..de10b61 100755
--- a/mama/c_cpp/src/c/bridge/avis/transportbridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
@@ -29,6 +29,7 @@
#include <mama/types.h>
#include <transportimpl.h>
#include <timers.h>
+#include <errno.h>
#include "transportbridge.h"
#include "avisbridgefunctions.h"
#include "avisdefs.h"
@@ -59,14 +60,12 @@ void log_avis_error(MamaLogLevel logLevel, Elvin* avis)

void closeListener(Elvin* avis, CloseReason reason, const char* message, void* closure)
{
+ const char* errMsg;
if (avisBridge(closure) == NULL) {
mama_log (MAMA_LOG_LEVEL_FINE, "Avis closeListener: could not get Avis bridge");
return;
}

- // TODO -- serialize access across multiple threads
-
- const char* errMsg;
switch( reason )
{
case REASON_CLIENT_SHUTDOWN: errMsg = "Avis client shutdown"; break;
@@ -82,16 +81,19 @@ void closeListener(Elvin* avis, CloseReason reason, const char* message, void* c
static const char*
getURL( const char *name )
{
+ int len = 0;
+ char* buff = NULL;
+ const char* property = NULL;
if (name == NULL)
return NULL;

mama_log (MAMA_LOG_LEVEL_FINE, "initializing Avis transport: %s", name);
- int len = strlen(name) + strlen( TPORT_PREFIX ) + strlen(TPORT_PARAM) + 4;
- char* buff = (char *)alloca( len );
+ len = strlen(name) + strlen( TPORT_PREFIX ) + strlen(TPORT_PARAM) + 4;
+ buff = (char *)alloca( len );
memset(buff, '\0', len);
snprintf( buff, len, "%s.%s.%s", TPORT_PREFIX, name, TPORT_PARAM );

- const char* property = properties_Get( mamaInternal_getProperties(), buff );
+ property = properties_Get( mamaInternal_getProperties(), buff );
if ( property == NULL )
return DEFAULT_URL;

@@ -123,13 +125,13 @@ void* avisDispatchThread(void* closure)

mama_status avisTransportBridge_start(avisTransportBridge* transportBridge)
{
- CHECK_TRANSPORT(transportBridge);
-
// stop Avis event loop
- pthread_t tid;
+ wthread_t tid;
int rc;
- if (0 != (rc = pthread_create(&tid, NULL, avisDispatchThread, transportBridge))) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "pthread_create returned %d", rc);
+ CHECK_TRANSPORT(transportBridge);
+
+ if (0 != (rc = wthread_create(&tid, NULL, avisDispatchThread, transportBridge))) {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "wthread_create returned %d", rc);
return MAMA_STATUS_SYSTEM_ERROR;
}

@@ -193,22 +195,24 @@ avisBridgeMamaTransport_create (transportBridge* result,
const char* name,
mamaTransport mamaTport )
{
- avisTransportBridge* transport =
- (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
+ mama_status status;
+ avisBridgeImpl* avisBridge = NULL;
+ avisTransportBridge* transport = NULL;
+ mamaBridgeImpl* bridgeImpl = NULL;
+ const char* url = NULL;
+
+ transport = (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
if (transport == NULL)
return MAMA_STATUS_NOMEM;

transport->mTransport = (mamaTransport) mamaTport;

- // TODO -- serialize access across multiple threads
- mamaBridgeImpl* bridgeImpl = mamaTransportImpl_getBridgeImpl(mamaTport);
+ bridgeImpl = mamaTransportImpl_getBridgeImpl(mamaTport);
if (!bridgeImpl) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get bridge");
free(transport);
return MAMA_STATUS_PLATFORM;
}
- mama_status status;
- avisBridgeImpl* avisBridge;
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) bridgeImpl, (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get Avis bridge object");
free(transport);
@@ -229,7 +233,7 @@ avisBridgeMamaTransport_create (transportBridge* result,
}

// open the server connection
- const char* url = getURL(name);
+ url = getURL(name);
if (url == NULL) {
mama_log (MAMA_LOG_LEVEL_NORMAL, "No %s property defined for transport : %s", TPORT_PARAM, name);
return MAMA_STATUS_INVALID_ARG;
@@ -253,15 +257,17 @@ avisBridgeMamaTransport_create (transportBridge* result,
mama_status
avisBridgeMamaTransport_destroy (transportBridge transport)
{
- // TODO -- serialize access across multiple threads
- mamaBridgeImpl* bridgeImpl = mamaTransportImpl_getBridgeImpl(avisTransport(transport)->mTransport);
+ mama_status status;
+ avisBridgeImpl* avisBridge = NULL;
+ mamaBridgeImpl* bridgeImpl = NULL;
+
+
+ bridgeImpl = mamaTransportImpl_getBridgeImpl(avisTransport(transport)->mTransport);
if (!bridgeImpl) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get bridge");
free(transport);
return MAMA_STATUS_PLATFORM;
}
- mama_status status;
- avisBridgeImpl* avisBridge;
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) bridgeImpl, (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get Avis bridge object");
free(transport);
@@ -286,6 +292,15 @@ avisBridgeMamaTransport_isValid (transportBridge transport)
}

mama_status
+avisBridgeMamaTransport_forceClientDisconnect (transportBridge* transports,
+ int numTransports,
+ const char* ipAddress,
+ uint16_t port)
+{
+ return MAMA_STATUS_NOT_IMPLEMENTED;
+}
+
+mama_status
avisBridgeMamaTransport_findConnection (transportBridge* transports,
int numTransports,
mamaConnection* result,
--
1.7.7.6


[PATCH 08/50] [bridge] Timout for Stopping Internal Queue

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

When shutting down, don't wait forever for the internal queue to stop. If the
queue does not stop within the default queue timeout
(mama.defaultqueue.timeout property), continue to shutdown. The default value
is 2000 milliseconds. This prevents hanging at shutdown if the distpach thread
is blocked.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/bridge.c | 5 ++++-
1 files changed, 4 insertions(+), 1 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge.c b/mama/c_cpp/src/c/bridge.c
index 6970c8c..94c5caa 100644
--- a/mama/c_cpp/src/c/bridge.c
+++ b/mama/c_cpp/src/c/bridge.c
@@ -121,6 +121,8 @@ mamaBridgeImpl_stopInternalEventQueue (mamaBridge bridgeImpl)
if (impl->mInternalEventQueue)
{

+ /* Get the queue timeout value. */
+ int defaultQueueTimeout = mamaBridgeImpl_getDefaultQueueTimeout();
if (MAMA_STATUS_OK != mamaDispatcher_destroy (impl->mInternalDispatcher))
{
mama_log (MAMA_LOG_LEVEL_WARN, "mamaBridgeImpl_stopInternalEventQueue(): "
@@ -128,7 +130,8 @@ mamaBridgeImpl_stopInternalEventQueue (mamaBridge bridgeImpl)
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

- if (MAMA_STATUS_OK != mamaQueue_destroy (impl->mInternalEventQueue ))
+ /* Destroy the queue waiting for the appropriate time value. */
+ if (MAMA_STATUS_OK != mamaQueue_destroyTimedWait (impl->mInternalEventQueue, defaultQueueTimeout))
{
mama_log (MAMA_LOG_LEVEL_WARN, "mamaBridgeImpl_stopInternalEventQueue(): "
"Could not destroy internal queue");
--
1.7.7.6