[PATCH 19/50] [mama] Create an Internal Transport for Monitoring


Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

OpenMAMA creates a dedicated internal transport for publishing statistics for
monitoring and other internal communication. This transport disables some
features of application-level transports, including the CM responder.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/mama.c | 2 +-
mama/c_cpp/src/c/transport.c | 204 ++++++++++++++++++++++---------------
mama/c_cpp/src/c/transportimpl.h | 31 +++---
3 files changed, 136 insertions(+), 101 deletions(-)

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index 206892d..dc6dcb5 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -389,7 +389,7 @@ mamaInternal_createStatsPublisher ()
statsLogTportName = "statslogger";
}

- result = mamaTransport_allocate (&statsLogTport);
+ result = mamaTransportImpl_allocateInternalTransport (&statsLogTport);
if( result != MAMA_STATUS_OK )
return result;

diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 0a4adcd..7e583d5 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -165,6 +165,7 @@ typedef struct transportImpl_
dqStartegyScheme mDQStratScheme;
dqftStrategyScheme mFTStratScheme;

+ uint8_t mInternal;
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -282,60 +283,68 @@ mamaTransport_allocate (mamaTransport* result)
*
* Return non-zero if object should be created, otherwise return zero
*/
-static int mamaTransportInternal_cmResponderEnabled (const char* transportName,
+static int mamaTransportInternal_cmResponderEnabled (transportImpl *impl,
+ const char* transportName,
const char* middleware)
{
const char* propValue;
char propString[MAX_PROP_STRING];
char propStringMw[MAX_PROP_STRING];
int retVal;
+ /* Returns. */
+ int ret = 0;

- /* Check for mama.middleware.transport.transportname first */
- retVal=snprintf (propStringMw, MAX_PROP_STRING,
- "mama.%s.transport.%s.%s", middleware,
- transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
-
- if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ /* The CM responder will not be created for an internal transport. */
+ if(impl->mInternal == 0)
{
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
-
- propValue = properties_Get (mamaInternal_getProperties (), propStringMw);
-
- if (NULL==propValue)
- {
- /* We might have specified mama.transport.transportname -
- only look for this after we've tried with middleware */
- retVal = snprintf (propString, MAX_PROP_STRING,
- "mama.transport.%s.%s",
- transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
-
- if ((retVal<0) || (retVal>=MAX_PROP_STRING))
- {
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
-
- propValue = properties_Get (mamaInternal_getProperties (), propString);
- }
-
- /* Return default if we have specified neither mama.middleware.transport...nor
- mama.transport... */
- if (NULL==propValue)
- {
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
- else if (properties_GetPropertyValueAsBoolean (propValue))
- {
- return 1;
- }
- else
- {
- return 0;
+ /* Check for mama.middleware.transport.transportname first */
+ retVal=snprintf (propStringMw, MAX_PROP_STRING,
+ "mama.%s.transport.%s.%s", middleware,
+ transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
+
+ if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+
+ propValue = properties_Get (mamaInternal_getProperties (), propStringMw);
+
+ if (NULL==propValue)
+ {
+ /* We might have specified mama.transport.transportname -
+ only look for this after we've tried with middleware */
+ retVal = snprintf (propString, MAX_PROP_STRING,
+ "mama.transport.%s.%s",
+ transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
+
+ if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+
+ propValue = properties_Get (mamaInternal_getProperties (), propString);
+ }
+
+ /* Return default if we have specified neither mama.middleware.transport...nor
+ mama.transport... */
+ if (NULL==propValue)
+ {
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+ else if (properties_GetPropertyValueAsBoolean (propValue))
+ {
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
+ return ret;
}

static void setPreInitialStrategy (mamaTransport transport)
@@ -452,7 +461,8 @@ static int mamaTransportInternal_disableRefreshes(const char* transportName)
char propString[MAX_PROP_STRING];
int retVal;

- retVal=snprintf(propString, MAX_PROP_STRING, "mama.transport.%s.%s",
+ retVal=snprintf (propString, MAX_PROP_STRING,
+ "mama.transport.%s.%s",
transportName ? transportName : "", PROP_NAME_DISABLE_REFRESH);

if ((retVal<0) || (retVal>=MAX_PROP_STRING))
@@ -586,6 +596,7 @@ mamaTransport_create (mamaTransport transport,
const char* throttleInt = NULL;
if (!transport) return MAMA_STATUS_NULL_ARG;
if (!bridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;
+ mama_log(MAMA_LOG_LEVEL_FINER, "Entering mamaTransport_create for transport (%p) with name %s", transport, name);

self->mBridgeImpl = (mamaBridgeImpl*)bridgeImpl;

@@ -822,7 +833,7 @@ mamaTransport_create (mamaTransport transport,

setGroupSizeHint (transport, middleware);

- rval = init (self, mamaTransportInternal_cmResponderEnabled (name, middleware));
+ rval = init (self, mamaTransportInternal_cmResponderEnabled (self, name, middleware));
if (rval != MAMA_STATUS_OK) return rval;


@@ -1083,6 +1094,7 @@ mamaTransport_destroy (mamaTransport transport)
{
int i;
int allTransportsValid;
+ mama_log(MAMA_LOG_LEVEL_FINER, "Entering mamaTransport_destroy for transport (%p)", transport);

if (!self) return MAMA_STATUS_NULL_ARG;
if (!self->mBridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;
@@ -1099,6 +1111,10 @@ mamaTransport_destroy (mamaTransport transport)
}
if (allTransportsValid)
{
+ if(NULL != self->mCmResponder)
+ {
+ mamaCmResponder_destroy (self->mCmResponder);
+ }
/* Inform all listeners that the transport is about to be destroyed. */
mamaTransportImpl_clearTransportWithListeners (self);

@@ -2052,6 +2068,49 @@ mamaTransportImpl_unsetAllPossiblyStale (mamaTransport transport)
}
}

+/**
+ * Return the cause and platform info for the last message processed on the
+ * transport.
+ *
+ * @param transport The transport.
+ * @param cause To return the cause.
+ * @param platformInfo To return the bridge specific info, under no circumstances
+ * should the returned object be deleted.
+ */
+void
+mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport transport,
+ short* cause,
+ const void** platformInfo)
+{
+ if (!self)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (): "
+ "NULL transport.");
+ return;
+ }
+
+ *cause = self->mCause;
+ *platformInfo = self->mPlatformInfo;
+}
+
+void mamaTransportImpl_setAdvisoryCauseAndPlatformInfo (mamaTransport transport, short cause, const void *platformInfo)
+{
+ /* Get the impl. */
+ transportImpl *impl = (transportImpl *)transport;
+ if(NULL != impl)
+ {
+ /* Set the cause. */
+ impl->mCause = cause;
+ impl->mPlatformInfo = platformInfo;
+ }
+
+ /* Otherwise write an error log. */
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(): NULL transport.");
+ }
+}
void
mamaTransportImpl_getTransportIndex (mamaTransport transport,
int* transportIndex)
@@ -2429,44 +2488,23 @@ mama_status mamaTransport_removePublisher (mamaTransport transport, void *handle
return MAMA_STATUS_OK;
}

-/* *************************************************** */
-/* Internal Functions. */
-/* *************************************************** */
-
-void mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(mamaTransport transport,
- short *cause, const void **platformInfo)
+mama_status mamaTransportImpl_allocateInternalTransport(mamaTransport *transport)
{
- /* Get the impl. */
- transportImpl *impl = (transportImpl *)transport;
- if(NULL != impl)
- {
- /* Return the cause. */
- *cause = impl->mCause;
- *platformInfo = impl->mPlatformInfo;
- }
- else
- {
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(): NULL "
- "transport.");
- }
-}
+ /* Returns. */
+ mama_status ret = MAMA_STATUS_NULL_ARG;

-void mamaTransportImpl_setAdvisoryCauseAndPlatformInfo (mamaTransport transport,
- short cause, const void *platformInfo)
-{
- /* Get the impl. */
- transportImpl *impl = (transportImpl *)transport;
- if(NULL != impl)
- {
- /* Set the cause. */
- impl->mCause = cause;
- impl->mPlatformInfo = (void*)platformInfo;
- }
- else
+ if(NULL != transport)
{
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(): NULL "
- "transport.");
+ /* Allocate the transport as normal. */
+ ret = mamaTransport_allocate(transport);
+ if(MAMA_STATUS_OK == ret)
+ {
+ /* Get the impl. */
+ transportImpl *impl = (transportImpl *)*transport;
+
+ /* Set the internal flag. */
+ impl->mInternal = 1;
+ }
}
+ return ret;
}
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index 5ae83c6..a253e3f 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -158,6 +158,11 @@ MAMAExpDLL
extern void
mamaTransportImpl_unsetAllPossiblyStale (mamaTransport tport);

+MAMAExpDLL
+extern void
+mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport tport,
+ short* cause,
+ const void** platformInfo);
/*
Get the bridge impl associated with the specified transport.
This will be how other objects gain access to the bridge.
@@ -241,27 +246,19 @@ mama_status mamaTransport_addPublisher(mamaTransport transport, mamaPublisher pu
mama_status mamaTransport_removePublisher(mamaTransport transport, void *handle);
preInitialScheme mamaTransportImpl_getPreInitialScheme (mamaTransport transport);

-#if defined(__cplusplus)
-}
-#endif
-
-
/**
- * This function will return the cause and platform info for the last message
- * processed on the transport.
+ * This function will allocate an internal transport for use with the internal event queue. This sort
+ * of transport is limited and does not support certain features, including
+ * the CM Responder.
*
- * @param[in] transport The transport.
- * @param[out] cause To return the cause.
- * @param[out] platformInfo To return the bridge specific info, under no
- * circumstances should the returned object be deleted.
+ * @param[out] transport To return the transport.
*
+ * @returns mama_status value can be one of:
+ * MAMA_STATUS_NOMEM
+ * MAMA_STATUS_NULL_ARG
+ * MAMA_STATUS_OK
*/
-MAMAExpDLL
-extern void
-mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(
- mamaTransport tport,
- short *cause,
- const void **platformInfo);
+mama_status mamaTransportImpl_allocateInternalTransport(mamaTransport *transport);

/**
* This function will set the cause and platform info for the transport.
--
1.7.7.6