[PATCH 27/50] [mama] Log Messages with MAMA_MSG_STATUS_UNKNOWN


Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

On receiving a message with a status of MAMA_MSG_STATUS_UNKNOWN, log the message
if at FINEST and track it update the statistics for the transport, queue and
application.

Also tidied up the file a little: converted tabs to space, reformatted long
lines, etc.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/listenermsgcallback.c | 122 +++++++++++++++++---------------
1 files changed, 64 insertions(+), 58 deletions(-)

diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c
index cbe1c90..532a433 100644
--- a/mama/c_cpp/src/c/listenermsgcallback.c
+++ b/mama/c_cpp/src/c/listenermsgcallback.c
@@ -45,33 +45,12 @@ extern int gGenerateTransportStats;
extern int gGenerateGlobalStats;
extern int gGenerateQueueStats;

-/* *************************************************** */
-/* Private Function Prototypes. */
-/* *************************************************** */
-
-/**
- * This function will invoke the subscription's onError callback passing in a particular error code.
- *
- * @param[in] callback The impl.
- * @param[in] ctx The subscription context.
- * @param[in] mamaStatus The status that will be passed to the error callback.
- * @param[in] subscription The subscription.
- * @param[in] userSymbol The symbol.
- */
-static void
-listenerMsgCallbackImpl_invokeErrorCallback(listenerMsgCallback callback,
+/* Function prototypes. */
+void listenerMsgCallback_invokeErrorCallback(listenerMsgCallback callback,
SubjectContext *ctx, mama_status mamaStatus, mamaSubscription
subscription, const char *userSymbol);

-/**
- * This function will write a log message if an unknown message status is detected.
- *
- * @param[in] ctx The subscription context.
- * @param[in] status The message status.
- * @param[in] subscription The subscription.
- */
-static void
-listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status,
+void listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status,
mamaSubscription subscription);

/**
@@ -195,7 +174,8 @@ static void processPointToPointMessage (msgCallback* callback,

/* Mark the subscription as inactive if we are not expecting
* any more updates. */
- if (!mamaSubscription_isExpectingUpdates (self->mSubscription))
+ if (!mamaSubscription_isExpectingUpdates (self->mSubscription) &&
+ !mamaSubscription_getAcceptMultipleInitials (self->mSubscription))
{
mamaSubscription_deactivate (self->mSubscription);
}
@@ -223,6 +203,33 @@ static void processPointToPointMessage (msgCallback* callback,
* may have been destroyed in the callback! */
}

+/* Description : This function will invoke the subscription's onError callback
+ * passing in a particular error code.
+ */
+void listenerMsgCallback_invokeErrorCallback(listenerMsgCallback callback,
+ SubjectContext *ctx, mama_status mamaStatus, mamaSubscription
+ subscription, const char *userSymbol)
+{
+ /* Local variables. */
+ void *closure = NULL;
+
+ /* Get the callback object from the subscription. */
+ mamaMsgCallbacks *cbs = mamaSubscription_getUserCallbacks (subscription);
+
+ /* Wait for a response. */
+ mamaSubscription_stopWaitForResponse(subscription, ctx);
+
+ /* Get the closure from the subscription. */
+ mamaSubscription_getClosure (subscription, &closure);
+
+ mama_setLastError (MAMA_ERROR_DEFAULT);
+ cbs->onError (subscription,
+ mamaStatus,
+ NULL,
+ userSymbol,
+ closure);
+}
+
static int isInitialMessageOrRecap (msgCallback *callback, int msgType)
{
return msgType == MAMA_MSG_TYPE_INITIAL ||
@@ -247,7 +254,7 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
mamaTransport transport;
mamaStatsCollector* queueStatsCollector = NULL;
mamaStatsCollector* tportStatsCollector = NULL;
- const char* userSymbol = NULL;
+ const char* userSymbol = NULL;

if (!ctx)
{
@@ -283,22 +290,25 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
MamaStatNumMessages.mFid);

/* Get the user symbol from the subscription. */
- mamaSubscription_getSymbol(subscription, &userSymbol);
+ mamaSubscription_getSymbol(subscription, &userSymbol);

if (status != MAMA_MSG_STATUS_OK)
{
switch (status)
{
case MAMA_MSG_STATUS_NOT_PERMISSIONED:
- listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_PERMISSIONED, subscription, userSymbol);
+ listenerMsgCallback_invokeErrorCallback(callback, ctx,
+ MAMA_STATUS_NOT_PERMISSIONED, subscription, userSymbol);
return;

case MAMA_MSG_STATUS_BAD_SYMBOL:
- listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_BAD_SYMBOL, subscription, userSymbol);
+ listenerMsgCallback_invokeErrorCallback(callback, ctx,
+ MAMA_STATUS_BAD_SYMBOL, subscription, userSymbol);
return;

case MAMA_MSG_STATUS_NOT_FOUND:
- listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_FOUND, subscription, userSymbol);
+ listenerMsgCallback_invokeErrorCallback(callback, ctx,
+ MAMA_STATUS_NOT_FOUND, subscription, userSymbol);
return;

case MAMA_MSG_STATUS_NO_SUBSCRIBERS:
@@ -338,6 +348,30 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
listenerMsgCallbackImpl_logUnknownStatus(ctx, status, subscription);
break;
}
+ case MAMA_MSG_STATUS_UNKNOWN:
+ {
+ listenerMsgCallbackImpl_logUnknownStatus(ctx, status, subscription);
+ mamaSubscription_setPossiblyStale(subscription);
+
+ if (queueStatsCollector)
+ {
+ mamaStatsCollector_incrementStat (*queueStatsCollector,
+ MamaStatUnknownMsgs.mFid);
+ }
+ if (tportStatsCollector)
+ {
+ mamaStatsCollector_incrementStat (*tportStatsCollector,
+ MamaStatUnknownMsgs.mFid);
+ }
+ if (mamaInternal_getGlobalStatsCollector())
+ {
+ mamaStatsCollector_incrementStat
+ (*(mamaInternal_getGlobalStatsCollector()),
+ MamaStatUnknownMsgs.mFid);
+ }
+ return; //throw away msg
+ break;
+ }
default:
{
/* Log the fact we have received an unknown message. */
@@ -587,34 +621,6 @@ checkEntitlement( msgCallback *callback, mamaMsg msg, SubjectContext* ctx )
#endif /* WITH_ENTITLEMENTS */
}

-/* *************************************************** */
-/* Private Functions. */
-/* *************************************************** */
-
-/* Description: This function will invoke the subscription's onError callback passing in a particular error code.
- */
-void listenerMsgCallbackImpl_invokeErrorCallback(listenerMsgCallback callback, SubjectContext *ctx, mama_status mamaStatus, mamaSubscription subscription, const char *userSymbol)
-{
- /* Local variables. */
- void *closure = NULL;
-
- /* Get the callback object from the subscription. */
- mamaMsgCallbacks *cbs = mamaSubscription_getUserCallbacks (subscription);
-
- /* Wait for a response. */
- mamaSubscription_stopWaitForResponse(subscription, ctx);
-
- /* Get the closure from the subscription. */
- mamaSubscription_getClosure (subscription, &closure);
-
- mama_setLastError (MAMA_ERROR_DEFAULT);
- cbs->onError (subscription,
- mamaStatus,
- NULL,
- userSymbol,
- closure);
-}
-
void listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status,
mamaSubscription subscription)
{
--
1.7.7.6

Join Openmama-dev@lists.openmama.org to automatically receive all group messages.