Date   

[PATCH 20/50] [mama] transport write queue high and low watermarks

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Allow applications to set high and low watermarks for internal write queues if
the underlying middleware queues writes. Socket based middlewares that use
scatter/gather IO and a dedicated IO thread often coalesce writes in a separate
queue for each connected client. A large queue that triggers the high
watermark often indicates a slow consumer.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama/transport.h | 16 +++++++++++++-
mama/c_cpp/src/c/transport.c | 40 +++++++++++++++++++++++++++++++-----
mama/c_cpp/src/c/transportimpl.h | 6 +++++
3 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/transport.h b/mama/c_cpp/src/c/mama/transport.h
index d66905c..e1a29ac 100644
--- a/mama/c_cpp/src/c/mama/transport.h
+++ b/mama/c_cpp/src/c/mama/transport.h
@@ -76,7 +76,9 @@ typedef enum
MAMA_TRANSPORT_PUBLISHER_DISCONNECT,
MAMA_TRANSPORT_QUALITY,
MAMA_TRANSPORT_NAMING_SERVICE_CONNECT,
- MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT
+ MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT,
+ MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK,
+ MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK
} mamaTransportEvent;

/**
@@ -186,6 +188,18 @@ extern mama_status
mamaTransport_setTransportCallback (mamaTransport transport,
mamaTransportCB callback,
void* closure);
+/**
+ * Set the transport write queue high and low water mark values. The
+ * MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK and
+ * MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK events will be delivered through
+ * the transport callback when the respective number of items are outstanding on
+ * a client's write queue.
+ */
+MAMAExpDLL
+extern mama_status
+mamaTransport_setWriteQueueWatermarks (mamaTransport transport,
+ uint32_t highWater,
+ uint32_t lowWater);

/**
* Set the transport topic callback. It receives advisories when a client
diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 7e583d5..f059280 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -89,6 +89,8 @@ typedef struct transportImpl_
mamaSymbolMapFunc mMapFunc;
void* mMapFuncClosure;

+ uint32_t mWriteQueueHighWatermark;
+ uint32_t mWriteQueueLowWatermark;
/* These members are only needed for the market data transport */
wList mListeners;

@@ -160,12 +162,12 @@ typedef struct transportImpl_

int mGroupSizeHint;

- uint8_t mDisableDisconnectCb;
- uint8_t mDisableRefresh;
+ uint8_t mDisableRefresh;
dqStartegyScheme mDQStratScheme;
dqftStrategyScheme mFTStratScheme;

uint8_t mInternal;
+ uint8_t mDisableDisconnectCb;
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -445,15 +447,15 @@ static void setFtStrategy (mamaTransport transport)
}
}

+void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)
+{
+ self->mDisableRefresh=disable;
+}
/**
* Check property to disable refresh messages. Undocumented.
*
* Return non-zero to disable refresh messages.
*/
-void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)
-{
- self->mDisableRefresh=disable;
-}

static int mamaTransportInternal_disableRefreshes(const char* transportName)
{
@@ -1130,6 +1132,7 @@ mamaTransport_destroy (mamaTransport transport)
if (self->mRefreshTransport)
{
refreshTransport_destroy (self->mRefreshTransport);
+ self->mRefreshTransport = NULL;
}

if (self->mThrottle)
@@ -1610,6 +1613,8 @@ mamaTransportEvent_toString (mamaTransportEvent event)
case MAMA_TRANSPORT_QUALITY: return "QUALITY";
case MAMA_TRANSPORT_NAMING_SERVICE_CONNECT: return "NAMING_SERVICE_CONNECT";
case MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT: return "NAMING_SERVICE_DISCONNECT";
+ case MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK: return "MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK";
+ case MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK: return "MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK";
default: return "UNKNOWN";
}
}
@@ -1625,6 +1630,29 @@ mamaTransport_setTransportCallback (mamaTransport transport,
return MAMA_STATUS_OK;
}

+mama_status
+mamaTransport_setWriteQueueWatermarks (mamaTransport transport,
+ uint32_t high,
+ uint32_t low)
+{
+ if (!self)
+ return MAMA_STATUS_NULL_ARG;
+ if (high < low || low == 0)
+ return MAMA_STATUS_INVALID_ARG;
+
+ self->mWriteQueueHighWatermark = high;
+ self->mWriteQueueLowWatermark = low;
+ return MAMA_STATUS_OK;
+}
+
+void
+mamaTransportImpl_getWriteQueueWatermarks (mamaTransport transport,
+ uint32_t* high,
+ uint32_t* low)
+{
+ *high = self->mWriteQueueHighWatermark;
+ *low = self->mWriteQueueLowWatermark;
+}

mama_status
mamaTransport_setTransportTopicCallback (mamaTransport transport,
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index a253e3f..18c3a28 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -139,6 +139,12 @@ mamaTransportImpl_getTransportTopicCallback (mamaTransport transport,

MAMAExpDLL
extern void
+mamaTransportImpl_getWriteQueueWatermarks (mamaTransport transport,
+ uint32_t* high,
+ uint32_t* low);
+
+MAMAExpDLL
+extern void
mamaTransportImpl_resetRefreshForListener (mamaTransport tport, void *handle);

MAMAExpDLL
--
1.7.7.6


[PATCH 19/50] [mama] Create an Internal Transport for Monitoring

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

OpenMAMA creates a dedicated internal transport for publishing statistics for
monitoring and other internal communication. This transport disables some
features of application level transports including the CM responder.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama.c | 2 +-
mama/c_cpp/src/c/transport.c | 204 ++++++++++++++++++++++---------------
mama/c_cpp/src/c/transportimpl.h | 31 +++---
3 files changed, 136 insertions(+), 101 deletions(-)

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index 206892d..dc6dcb5 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -389,7 +389,7 @@ mamaInternal_createStatsPublisher ()
statsLogTportName = "statslogger";
}

- result = mamaTransport_allocate (&statsLogTport);
+ result = mamaTransportImpl_allocateInternalTransport (&statsLogTport);
if( result != MAMA_STATUS_OK )
return result;

diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 0a4adcd..7e583d5 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -165,6 +165,7 @@ typedef struct transportImpl_
dqStartegyScheme mDQStratScheme;
dqftStrategyScheme mFTStratScheme;

+ uint8_t mInternal;
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -282,60 +283,68 @@ mamaTransport_allocate (mamaTransport* result)
*
* Return non-zero if object should be created, otherwise return zero
*/
-static int mamaTransportInternal_cmResponderEnabled (const char* transportName,
+static int mamaTransportInternal_cmResponderEnabled (transportImpl *impl,
+ const char* transportName,
const char* middleware)
{
const char* propValue;
char propString[MAX_PROP_STRING];
char propStringMw[MAX_PROP_STRING];
int retVal;
+ /* Returns. */
+ int ret = 0;

- /* Check for mama.middleware.transport.transportname first */
- retVal=snprintf (propStringMw, MAX_PROP_STRING,
- "mama.%s.transport.%s.%s", middleware,
- transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
-
- if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ /* The CM responder will not be created for an internal transport. */
+ if(impl->mInternal == 0)
{
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
-
- propValue = properties_Get (mamaInternal_getProperties (), propStringMw);
-
- if (NULL==propValue)
- {
- /* We might have specified mama.transport.transportname -
- only look for this after we've tried with middleware */
- retVal = snprintf (propString, MAX_PROP_STRING,
- "mama.transport.%s.%s",
- transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
-
- if ((retVal<0) || (retVal>=MAX_PROP_STRING))
- {
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
-
- propValue = properties_Get (mamaInternal_getProperties (), propString);
- }
-
- /* Return default if we have specified neither mama.middleware.transport...nor
- mama.transport... */
- if (NULL==propValue)
- {
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
- else if (properties_GetPropertyValueAsBoolean (propValue))
- {
- return 1;
- }
- else
- {
- return 0;
+ /* Check for mama.middleware.transport.transportname first */
+ retVal=snprintf (propStringMw, MAX_PROP_STRING,
+ "mama.%s.transport.%s.%s", middleware,
+ transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
+
+ if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+
+ propValue = properties_Get (mamaInternal_getProperties (), propStringMw);
+
+ if (NULL==propValue)
+ {
+ /* We might have specified mama.transport.transportname -
+ only look for this after we've tried with middleware */
+ retVal = snprintf (propString, MAX_PROP_STRING,
+ "mama.transport.%s.%s",
+ transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
+
+ if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+
+ propValue = properties_Get (mamaInternal_getProperties (), propString);
+ }
+
+ /* Return default if we have specified neither mama.middleware.transport...nor
+ mama.transport... */
+ if (NULL==propValue)
+ {
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+ else if (properties_GetPropertyValueAsBoolean (propValue))
+ {
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
+ return ret;
}

static void setPreInitialStrategy (mamaTransport transport)
@@ -452,7 +461,8 @@ static int mamaTransportInternal_disableRefreshes(const char* transportName)
char propString[MAX_PROP_STRING];
int retVal;

- retVal=snprintf(propString, MAX_PROP_STRING, "mama.transport.%s.%s",
+ retVal=snprintf (propString, MAX_PROP_STRING,
+ "mama.transport.%s.%s",
transportName ? transportName : "", PROP_NAME_DISABLE_REFRESH);

if ((retVal<0) || (retVal>=MAX_PROP_STRING))
@@ -586,6 +596,7 @@ mamaTransport_create (mamaTransport transport,
const char* throttleInt = NULL;
if (!transport) return MAMA_STATUS_NULL_ARG;
if (!bridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;
+ mama_log(MAMA_LOG_LEVEL_FINER, "Entering mamaTransport_create for transport (%p) with name %s", transport, name);

self->mBridgeImpl = (mamaBridgeImpl*)bridgeImpl;

@@ -822,7 +833,7 @@ mamaTransport_create (mamaTransport transport,

setGroupSizeHint (transport, middleware);

- rval = init (self, mamaTransportInternal_cmResponderEnabled (name, middleware));
+ rval = init (self, mamaTransportInternal_cmResponderEnabled (self, name, middleware));
if (rval != MAMA_STATUS_OK) return rval;


@@ -1083,6 +1094,7 @@ mamaTransport_destroy (mamaTransport transport)
{
int i;
int allTransportsValid;
+ mama_log(MAMA_LOG_LEVEL_FINER, "Entering mamaTransport_destroy for transport (%p)", transport);

if (!self) return MAMA_STATUS_NULL_ARG;
if (!self->mBridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;
@@ -1099,6 +1111,10 @@ mamaTransport_destroy (mamaTransport transport)
}
if (allTransportsValid)
{
+ if(NULL != self->mCmResponder)
+ {
+ mamaCmResponder_destroy (self->mCmResponder);
+ }
/* Inform all listeners that the transport is about to be destroyed. */
mamaTransportImpl_clearTransportWithListeners (self);

@@ -2052,6 +2068,49 @@ mamaTransportImpl_unsetAllPossiblyStale (mamaTransport transport)
}
}

+/**
+ * Return the cause and platform info for the last message processed on the
+ * transport.
+ *
+ * @param transport The transport.
+ * @param cause To return the cause.
+ * @param platformInfo To return the bridge specific info, under no circumstances
+ * should the returned object be deleted.
+ */
+void
+mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport transport,
+ short* cause,
+ const void** platformInfo)
+{
+ if (!self)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (): "
+ "NULL transport.");
+ return;
+ }
+
+ *cause = self->mCause;
+ *platformInfo = self->mPlatformInfo;
+}
+
+void mamaTransportImpl_setAdvisoryCauseAndPlatformInfo (mamaTransport transport, short cause, const void *platformInfo)
+{
+ /* Get the impl. */
+ transportImpl *impl = (transportImpl *)transport;
+ if(NULL != impl)
+ {
+ /* Set the cause. */
+ impl->mCause = cause;
+ impl->mPlatformInfo = platformInfo;
+ }
+
+ /* Otherwise write an error log. */
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(): NULL transport.");
+ }
+}
void
mamaTransportImpl_getTransportIndex (mamaTransport transport,
int* transportIndex)
@@ -2429,44 +2488,23 @@ mama_status mamaTransport_removePublisher (mamaTransport transport, void *handle
return MAMA_STATUS_OK;
}

-/* *************************************************** */
-/* Internal Functions. */
-/* *************************************************** */
-
-void mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(mamaTransport transport,
- short *cause, const void **platformInfo)
+mama_status mamaTransportImpl_allocateInternalTransport(mamaTransport *transport)
{
- /* Get the impl. */
- transportImpl *impl = (transportImpl *)transport;
- if(NULL != impl)
- {
- /* Return the cause. */
- *cause = impl->mCause;
- *platformInfo = impl->mPlatformInfo;
- }
- else
- {
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(): NULL "
- "transport.");
- }
-}
+ /* Returns. */
+ mama_status ret = MAMA_STATUS_NULL_ARG;

-void mamaTransportImpl_setAdvisoryCauseAndPlatformInfo (mamaTransport transport,
- short cause, const void *platformInfo)
-{
- /* Get the impl. */
- transportImpl *impl = (transportImpl *)transport;
- if(NULL != impl)
- {
- /* Set the cause. */
- impl->mCause = cause;
- impl->mPlatformInfo = (void*)platformInfo;
- }
- else
+ if(NULL != transport)
{
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(): NULL "
- "transport.");
+ /* Allocate the transport as normal. */
+ ret = mamaTransport_allocate(transport);
+ if(MAMA_STATUS_OK == ret)
+ {
+ /* Get the impl. */
+ transportImpl *impl = (transportImpl *)*transport;
+
+ /* Set the internal flag. */
+ impl->mInternal = 1;
+ }
}
+ return ret;
}
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index 5ae83c6..a253e3f 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -158,6 +158,11 @@ MAMAExpDLL
extern void
mamaTransportImpl_unsetAllPossiblyStale (mamaTransport tport);

+MAMAExpDLL
+extern void
+mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport tport,
+ short* cause,
+ const void** platformInfo);
/*
Get the bridge impl associated with the specified transport.
This will be how other objects gain access to the bridge.
@@ -241,27 +246,19 @@ mama_status mamaTransport_addPublisher(mamaTransport transport, mamaPublisher pu
mama_status mamaTransport_removePublisher(mamaTransport transport, void *handle);
preInitialScheme mamaTransportImpl_getPreInitialScheme (mamaTransport transport);

-#if defined(__cplusplus)
-}
-#endif
-
-
/**
- * This function will return the cause and platform info for the last message
- * processed on the transport.
+ * This function will allocate an internal transport for use with the internal event queue. This sort
+ * of transport is limited and does not support certain features, including
+ * the CM Responder.
*
- * @param[in] transport The transport.
- * @param[out] cause To return the cause.
- * @param[out] platformInfo To return the bridge specific info, under no
- * circumstances should the returned object be deleted.
+ * @param[out] transport To return the transport.
*
+ * @returns mama_status value can be one of:
+ * MAMA_STATUS_NOMEM
+ * MAMA_STATUS_NULL_ARG
+ * MAMA_STATUS_OK
*/
-MAMAExpDLL
-extern void
-mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(
- mamaTransport tport,
- short *cause,
- const void **platformInfo);
+mama_status mamaTransportImpl_allocateInternalTransport(mamaTransport *transport);

/**
* This function will set the cause and platform info for the transport.
--
1.7.7.6


[PATCH 18/50] [mama] Subscription: don't put invalid actions on the throttle queue

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Added code to avoid race conditions when enqueueing shutdown logic on the
throttle.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama/subscription.h | 7 +------
mama/c_cpp/src/c/subscription.c | 19 +++++++++++--------
2 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/subscription.h b/mama/c_cpp/src/c/mama/subscription.h
index 9eb1f0c..27978eb 100644
--- a/mama/c_cpp/src/c/mama/subscription.h
+++ b/mama/c_cpp/src/c/mama/subscription.h
@@ -106,12 +106,7 @@ typedef enum
/* The subscription has been de-allocated, this state is only supported so that the log entry will whenever the subscription
* has finally been freed.
*/
- MAMA_SUBSCRIPTION_DEALLOCATED = 10,
-
- /* The subscription is being re-activated, this state can only occur if the mamaSubscription_activate has been called while
- * the subscription is being deactivated, (i.e. its state is MAMA_SUBSCRIPTION_DEACTIVATING.
- */
- MAMA_SUBSCRIPTION_REACTIVATING = 11
+ MAMA_SUBSCRIPTION_DEALLOCATED = 10

} mamaSubscriptionState;

diff --git a/mama/c_cpp/src/c/subscription.c b/mama/c_cpp/src/c/subscription.c
index 6c73978..8c2497b 100644
--- a/mama/c_cpp/src/c/subscription.c
+++ b/mama/c_cpp/src/c/subscription.c
@@ -1772,7 +1772,7 @@ mama_status mamaSubscriptionImpl_deactivate(mamaSubscriptionImpl *impl)
mamaTransport_removeListener(impl->mTransport, impl->mSubscHandle);

/* If there is a create action on the throttle it must be removed. */
- if(NULL != impl->mAction)
+ if((NULL != throttle) && (NULL != impl->mAction))
{
wombatThrottle_removeAction(throttle, impl->mAction);
}
@@ -2195,7 +2195,7 @@ void MAMACALLTYPE mamaSubscriptionImpl_onSubscriptionDestroyed(mamaSubscription
break;

/* The subscription must be de-activated then re-activated. */
- case MAMA_SUBSCRIPTION_REACTIVATING:
+ case MAMA_SUBSCRIPTION_ACTIVATING:

/* Change the state. */
mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_DEACTIVATED);
@@ -2233,7 +2233,7 @@ void MAMACALLTYPE mamaSubscriptionImpl_onSubscriptionDestroyed(mamaSubscription
mama_status mamaSubscriptionImpl_removeFromThrottle(mamaSubscriptionImpl *impl)
{
/* Returns. */
- mama_status ret = MAMA_STATUS_OK;
+ mama_status ret = MAMA_STATUS_SUBSCRIPTION_INVALID_STATE;

/* Acquire the lock before anything else is done. */
wlock_lock(impl->mCreateDestroyLock);
@@ -2262,7 +2262,8 @@ mama_status mamaSubscriptionImpl_removeFromThrottle(mamaSubscriptionImpl *impl)
* if the subscription is still in the activating state.
*/
ret = MAMA_STATUS_SUBSCRIPTION_INVALID_STATE;
- if(MAMA_SUBSCRIPTION_ACTIVATING == wInterlocked_read(&impl->mState))
+ if(MAMA_SUBSCRIPTION_ACTIVATING == wInterlocked_read(&impl->mState) &&
+ NULL != impl->mAction)
{
/* Remove the subscription from the throttle. */
wombatThrottle_removeAction(throttle, impl->mAction);
@@ -2514,6 +2515,7 @@ mama_status mamaSubscription_allocate(mamaSubscription *subscription)
impl->mPreInitialCacheSize = MAMA_SUBSCRIPTION_DEFAULT_PREINITIALCACHESIZE;

/* Set the initial state of the subscription now that the memory has been allocated. */
+ wInterlocked_initialize(&impl->mState);
mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_ALLOCATED);

/* The function has succeeded. */
@@ -2570,7 +2572,7 @@ mama_status mamaSubscription_activate(mamaSubscription subscription)
case MAMA_SUBSCRIPTION_DEACTIVATING:

/* Set the state to indicate that the subscription will be reactivated. */
- mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_REACTIVATING);
+ mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_ACTIVATING);

ret = MAMA_STATUS_OK;
break;
@@ -2914,7 +2916,8 @@ mama_status mamaSubscription_deactivate(mamaSubscription subscription)
/* If invalid state is returned by this function then the subscription has become active
* while waiting for it to be removed from the throttle.
*/
- if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret)
+ if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret &&
+ MAMA_SUBSCRIPTION_ACTIVATED == wInterlocked_read(&impl->mState))
{
/* Invoke this function recursively to process the correct state. */
ret = mamaSubscription_deactivate(subscription);
@@ -3043,7 +3046,8 @@ mama_status mamaSubscription_destroy(mamaSubscription subscription)
/* If invalid state is returned by this function then the subscription has become active
* while waiting for it to be removed from the throttle.
*/
- if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret)
+ if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret &&
+ MAMA_SUBSCRIPTION_ACTIVATED == wInterlocked_read(&impl->mState))
{
/* Invoke this function recursively to process the correct state. */
ret = mamaSubscription_destroy(subscription);
@@ -4251,7 +4255,6 @@ const char* mamaSubscription_stringForState(mamaSubscriptionState state)
case MAMA_SUBSCRIPTION_DESTROYED: return "MAMA_SUBSCRIPTION_DESTROYED";
case MAMA_SUBSCRIPTION_DEALLOCATING: return "MAMA_SUBSCRIPTION_DEALLOCATING";
case MAMA_SUBSCRIPTION_DEALLOCATED: return "MAMA_SUBSCRIPTION_DEALLOCATED";
- case MAMA_SUBSCRIPTION_REACTIVATING: return "MAMA_SUBSCRIPTION_REACTIVATING";
}

return "State not recognised";
--
1.7.7.6


[PATCH 17/50] [mama] Configurable Schemes for data quality

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

The dqStartegyScheme (spelled as in the source) determines whether or not
OpenMAMA ignores duplicate messages. Valid values are:
DQ_SCHEME_DELIVER_ALL deliver all messages including duplicates
DQ_SCHEME_INGORE_DUPS do not deliver duplicate messages (spelled as in the source)

The dqftStrategyScheme determines how a subscription behaves when OpenMAMA
detects a fault tolerant take over (the sender id changes for a subscription):
DQ_FT_DO_NOT_WAIT_FOR_RECAP continue to process STALE messages
DQ_FT_WAIT_FOR_RECAP discard stale messages

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqstrategy.c | 46 ++++++++++++++--
mama/c_cpp/src/c/dqstrategy.h | 5 ++-
mama/c_cpp/src/c/listenermsgcallback.c | 22 +++++++-
mama/c_cpp/src/c/mama/subscription.h | 12 ++++
mama/c_cpp/src/c/transport.c | 92 ++++++++++++++++++++++++++++++++
5 files changed, 170 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/dqstrategy.c b/mama/c_cpp/src/c/dqstrategy.c
index 8f7ccda..1e495b6 100644
--- a/mama/c_cpp/src/c/dqstrategy.c
+++ b/mama/c_cpp/src/c/dqstrategy.c
@@ -141,7 +141,8 @@ handleFTTakeover (dqStrategy strategy,
int msgType,
mamaDqContext* ctx,
mama_seqnum_t seqNum,
- mama_u64_t senderId)
+ mama_u64_t senderId,
+ int recoverOnRecap)
{
const char* symbol = NULL;
mamaSubscription_getSymbol (self->mSubscription, &symbol);
@@ -151,11 +152,19 @@ handleFTTakeover (dqStrategy strategy,
"Previous SeqNum: %u. New SeqNum: %u. [%s]",
ctx->mSenderId, senderId, ctx->mSeqNum, seqNum, symbol);

+ if (recoverOnRecap)
+ {
+ ctx->mSeqNum = senderId;
+ ctx->mDQState = DQ_STATE_WAITING_FOR_RECAP_AFTER_FT;
+ }
+ else
+ {
resetDqState (strategy, ctx);

/*In all cases we reset the data quality context*/
dqStrategyImpl_resetDqContext (ctx, seqNum, senderId);

+ }
return MAMA_STATUS_OK;
}

@@ -183,6 +192,7 @@ dqStrategy_checkSeqNum (dqStrategy strategy,

mamaMsg_getSeqNum (msg, &seqNum);

+ ctx->mDoNotForward = 0;
if (mamaMsg_getU64 (msg, MamaFieldSenderId.mName, MamaFieldSenderId.mFid,
&senderId) != MAMA_STATUS_OK)
{
@@ -207,7 +217,14 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
mamaStatsCollector_incrementStat (*(mamaInternal_getGlobalStatsCollector()),
MamaStatFtTakeovers.mFid);
}
- return handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId);
+ if (DQ_FT_WAIT_FOR_RECAP==mamaTransportImpl_getFtStrategyScheme(tport))
+ {
+ handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId, 1);
+ }
+ else
+ {
+ return handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId, 0);
+ }
}

if (gMamaLogLevel >= MAMA_LOG_LEVEL_FINER)
@@ -243,7 +260,8 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
if (((ctxDqState == DQ_STATE_NOT_ESTABLISHED) ||
(seqNum == 0) ||
(seqNum == (ctxSeqNum + conflateCnt))) &&
- (ctxDqState != DQ_STATE_WAITING_FOR_RECAP))
+ ((ctxDqState != DQ_STATE_WAITING_FOR_RECAP) ||
+ (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)))
{
/* No gap */
if (self->mTryToFillGap)
@@ -268,7 +286,19 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
return MAMA_STATUS_OK;
}

- if (seqNum == ctxSeqNum)
+ /* For late joins or middlewares that support a publish cache, it is possible that you will get old updates
+ in this case take no action */
+ if (DQ_SCHEME_INGORE_DUPS==mamaTransportImpl_getDqStrategyScheme(tport))
+ {
+ if ((seqNum <= ctxSeqNum) && ((ctxDqState != DQ_STATE_WAITING_FOR_RECAP) ||
+ (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)))
+ {
+ ctx->mDoNotForward = 1;
+ return MAMA_STATUS_OK;
+ }
+ }
+
+ if ((seqNum == ctxSeqNum) && (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT))
{
/* Duplicate data - set DQQuality to DUPLICATE, invoke quality callback */
ctx->mDQState = DQ_STATE_DUPLICATE;
@@ -290,6 +320,13 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
return MAMA_STATUS_OK;
}

+ if (ctxDqState == DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)
+ {
+ ctx->mDoNotForward = 1;
+ return MAMA_STATUS_OK;
+ }
+ else
+ {
/* If we get here, we missed a sequence number. */
if ((PRE_INITIAL_SCHEME_ON_GAP==mamaTransportImpl_getPreInitialScheme(tport))
&&(self->mTryToFillGap))
@@ -323,6 +360,7 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
}

handleStaleData (self, msg, ctx);
+ }
break;
case MAMA_MSG_TYPE_INITIAL :
case MAMA_MSG_TYPE_BOOK_INITIAL :
diff --git a/mama/c_cpp/src/c/dqstrategy.h b/mama/c_cpp/src/c/dqstrategy.h
index a7ff6a7..d550e95 100644
--- a/mama/c_cpp/src/c/dqstrategy.h
+++ b/mama/c_cpp/src/c/dqstrategy.h
@@ -50,7 +50,9 @@ typedef enum dqState_
* In the case of a stale initial, we do not want
* a recap because it may also be stale data.
*/
- DQ_STATE_STALE_NO_RECAP = 5
+ DQ_STATE_STALE_NO_RECAP = 5,
+
+ DQ_STATE_WAITING_FOR_RECAP_AFTER_FT = 6
} dqState;

typedef struct
@@ -64,6 +66,7 @@ typedef struct
imageRequest mRecapRequest;
mama_u64_t mSenderId;

+ uint8_t mDoNotForward;
} mamaDqContext;

typedef struct dqStrategy_* dqStrategy;
diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c
index dca5474..cbe1c90 100644
--- a/mama/c_cpp/src/c/listenermsgcallback.c
+++ b/mama/c_cpp/src/c/listenermsgcallback.c
@@ -449,7 +449,16 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
case MAMA_MSG_TYPE_QUOTE:
case MAMA_MSG_TYPE_TRADE:
mamaSubscription_checkSeqNum(subscription, msg, msgType, ctx);
- mamaSubscription_forwardMsg(subscription, msg);
+ if (!ctx->mDqContext.mDoNotForward)
+ {
+ mamaSubscription_forwardMsg(subscription, msg);
+ }
+ else
+ {
+ mamaSubscription_getSymbol (subscription, &userSymbol);
+ mama_log (MAMA_LOG_LEVEL_FINER, "Subscription for %s not forwarded"
+ " as message seqnum is before seqnum expecting", userSymbol);
+ }
break;
case MAMA_MSG_TYPE_REFRESH:
mamaSubscription_respondToRefreshMessage(subscription);
@@ -468,7 +477,16 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
break;
default:
mamaSubscription_checkSeqNum(subscription, msg, msgType, ctx);
- mamaSubscription_forwardMsg(subscription, msg);
+ if (!ctx->mDqContext.mDoNotForward)
+ {
+ mamaSubscription_forwardMsg(subscription, msg);
+ }
+ else
+ {
+ mamaSubscription_getSymbol (subscription, &userSymbol);
+ mama_log (MAMA_LOG_LEVEL_FINER, "Subscription for %s not forwarded"
+ " as message seqnum is before seqnum expecting", userSymbol);
+ }
}
}

diff --git a/mama/c_cpp/src/c/mama/subscription.h b/mama/c_cpp/src/c/mama/subscription.h
index 058a41c..9eb1f0c 100644
--- a/mama/c_cpp/src/c/mama/subscription.h
+++ b/mama/c_cpp/src/c/mama/subscription.h
@@ -115,6 +115,18 @@ typedef enum

} mamaSubscriptionState;

+typedef enum
+{
+ DQ_SCHEME_DELIVER_ALL,
+ DQ_SCHEME_INGORE_DUPS
+} dqStartegyScheme;
+
+
+typedef enum
+{
+ DQ_FT_DO_NOT_WAIT_FOR_RECAP,
+ DQ_FT_WAIT_FOR_RECAP
+}dqftStrategyScheme;
/* *************************************************** */
/* Type Defines. */
/* *************************************************** */
diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 20ee0cc..0a4adcd 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -162,6 +162,9 @@ typedef struct transportImpl_

uint8_t mDisableDisconnectCb;
uint8_t mDisableRefresh;
+ dqStartegyScheme mDQStratScheme;
+ dqftStrategyScheme mFTStratScheme;
+
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -177,6 +180,8 @@ init (transportImpl* transport, int createResponder)
self->mCause = 0;
self->mPlatformInfo = NULL;
self->mPreInitialScheme = PRE_INITIAL_SCHEME_ON_GAP;
+ self->mDQStratScheme = DQ_SCHEME_DELIVER_ALL;
+ self->mFTStratScheme = DQ_FT_DO_NOT_WAIT_FOR_RECAP;


mama_log (MAMA_LOG_LEVEL_FINEST,
@@ -365,6 +370,72 @@ static void setPreInitialStrategy (mamaTransport transport)
"%s: Using default preinitial strategy: ON_GAP", self->mName);
}
}
+static void setDQStrategy (mamaTransport transport)
+{
+ const char* propValue = NULL;
+ char propNameBuf[256];
+
+ if (!self) return;
+
+ snprintf (propNameBuf, 256, "mama.transport.%s.dqstrategy", self->mName);
+
+ propValue = properties_Get(mamaInternal_getProperties(),
+ propNameBuf);
+
+ if (NULL!=propValue)
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL, "Setting %s=%s",
+ propNameBuf, propValue);
+
+ if (0==strcmp (propValue, "ignoredups"))
+ {
+ self->mDQStratScheme = DQ_SCHEME_INGORE_DUPS;
+ }
+ else
+ {
+ self->mDQStratScheme = DQ_SCHEME_DELIVER_ALL;
+ }
+ }
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL,
+ "%s: Using default dq strategy: DQ_SCHEME_DELIVER_ALL", self->mName);
+ }
+}
+
+static void setFtStrategy (mamaTransport transport)
+{
+ const char* propValue = NULL;
+ char propNameBuf[256];
+
+ if (!self) return;
+
+ snprintf (propNameBuf, 256, "mama.transport.%s.ftstrategy", self->mName);
+
+ propValue = properties_Get(mamaInternal_getProperties(),
+ propNameBuf);
+
+ if (NULL!=propValue)
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL, "Setting %s=%s",
+ propNameBuf, propValue);
+
+ if (0==strcmp (propValue, "waitforrecap"))
+ {
+ self->mFTStratScheme = DQ_FT_WAIT_FOR_RECAP;
+ }
+ else
+ {
+ self->mFTStratScheme = DQ_FT_DO_NOT_WAIT_FOR_RECAP;
+ }
+ }
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL,
+ "%s: Using default ft strategy: DQ_FT_DO_NOT_WAIT_FOR_RECAP", self->mName);
+ }
+}
+
/**
* Check property to disable refresh messages. Undocumented.
*
@@ -756,6 +827,8 @@ mamaTransport_create (mamaTransport transport,


setPreInitialStrategy ((mamaTransport)self);
+ setDQStrategy (self);
+ setFtStrategy (self);

if (mamaTransportImpl_disableDisconnectCb (name))
{
@@ -1584,6 +1657,25 @@ mamaTransportImpl_getPreInitialScheme (mamaTransport transport)
return PRE_INITIAL_SCHEME_ON_GAP;
}

+dqStartegyScheme
+mamaTransportImpl_getDqStrategyScheme (mamaTransport transport)
+{
+ if (self)
+ {
+ return self->mDQStratScheme;
+ }
+ return DQ_SCHEME_DELIVER_ALL;
+}
+
+dqftStrategyScheme
+mamaTransportImpl_getFtStrategyScheme (mamaTransport transport)
+{
+ if (self)
+ {
+ return self->mFTStratScheme;
+ }
+ return DQ_FT_DO_NOT_WAIT_FOR_RECAP;
+}
/* Process an advisory message and invokes callbacks
* on all listeners.
* @param transport The transport.
--
1.7.7.6


[PATCH 16/50] [mama] dqpublisher destroy

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Added proper destroy implementation for dqpublisher to allow applications to
clean up and shutdown properly.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqpublishermanager.c | 48 +++++++++++++++++++++++++++++++++
1 files changed, 48 insertions(+), 0 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index ecf6ced..156b586 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -285,11 +285,59 @@ mama_status mamaDQPublisherManager_create (

void mamaDQPublisherManager_destroy (mamaDQPublisherManager manager)
{
+ /* Get the impl. */
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;
+ if(NULL != impl)
+ {
+ /* Destroy the publisher. */
+ if(NULL != impl->mPublisher)
+ {
+ mamaPublisher_destroy(impl->mPublisher);
+ }
+
+ /* Destroy the subscription. */
+ if(NULL != impl->mSubscription)
+ {
+ mamaSubscription_destroy(impl->mSubscription);
+ mamaSubscription_deallocate(impl->mSubscription);
+ }
+
+ /* Destroy the inbox. */
+ if(NULL != impl->mInbox)
+ {
+ mamaInbox_destroy(impl->mInbox);
+ }

+ /* Destroy the re-usable messages. */
+ if(NULL != impl->mRefreshResponseMsg)
+ {
+ mamaMsg_destroy(impl->mRefreshResponseMsg);
+ }
+ if(NULL != impl->mNoSubscribersMsg)
+ {
+ mamaMsg_destroy(impl->mNoSubscribersMsg);
+ }
+ if(NULL != impl->mSyncRequestMsg)
+ {
+ mamaMsg_destroy(impl->mSyncRequestMsg);
+ }
+
+ /* Free the namespace. */
+ if(NULL != impl->mNameSpace)
+ {
+ free(impl->mNameSpace);
+ }
+
+ /* Destroy the publisher table. */
+ if(NULL != impl->mPublisherMap)
+ {
wtable_destroy ( impl->mPublisherMap );
}

+ /* Free the impl itself. */
+ free(impl);
+ }
+}
mama_status mamaDQPublisherManager_addPublisher (
mamaDQPublisherManager manager,
const char *symbol,
--
1.7.7.6


[PATCH 15/50] [mama] dqpublisher manager bug fixes

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Only invoke the application on create callback if it is not NULL. This avoids
core dumps if the application does not provide a callback.

The manager was not maintaining the publisher map correctly. It allocated new
entries when an entry already existed, and did not remove entries properly.

Create the CM subscription after the other objects are created. Creating it
early could result in CM requests invoking callbacks before the manager fully
initializes.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqpublishermanager.c | 28 ++++++++++++----------------
1 files changed, 12 insertions(+), 16 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index 0ee62de..ecf6ced 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -114,7 +114,9 @@ dqPublisherImplCreateCb (mamaSubscription subsc, void* closure)
{
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) (closure);

- impl->mUserCallbacks.onCreate ((mamaDQPublisherManager)impl);
+ if(NULL != impl)
+ if(NULL != impl->mUserCallbacks.onCreate)
+ impl->mUserCallbacks.onCreate ((mamaDQPublisherManager)impl);
}


@@ -263,8 +265,14 @@ mama_status mamaDQPublisherManager_create (
impl->mNameSpace = strdup(sourcename);
strcat(topic, ".");
strcat(topic, sourcename);
+ impl->mPublisherMap = wtable_create (topic, NUM_BUCKETS);

mamaSubscription_allocate (&impl->mSubscription);
+ mamaPublisher_create (&impl->mPublisher,
+ transport,
+ MAMA_CM_TOPIC,
+ NULL,
+ NULL);
mamaSubscription_createBasic (impl->mSubscription,
transport,
queue,
@@ -272,14 +280,6 @@ mama_status mamaDQPublisherManager_create (
topic,
impl);

- mamaPublisher_create (&impl->mPublisher,
- transport,
- MAMA_CM_TOPIC,
- NULL,
- NULL);
-
- impl->mPublisherMap = wtable_create (topic, NUM_BUCKETS);
-
return MAMA_STATUS_OK;
}

@@ -298,15 +298,10 @@ mama_status mamaDQPublisherManager_addPublisher (
{
mamaPublishTopic* newTopic = NULL;
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;
-
- if (wtable_lookup (impl->mPublisherMap , ( char* )symbol))
- return (MAMA_STATUS_INVALID_ARG);
-
-
newTopic = (mamaPublishTopic*) wtable_lookup (impl->mPublisherMap, (char*)symbol);


- if (newTopic)
+ if (!newTopic)
{
newTopic = (mamaPublishTopic*) calloc (1, sizeof (mamaPublishTopic));

@@ -408,10 +403,11 @@ mama_status mamaDQPublisherManager_destroyPublisher (
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;
mamaPublishTopic* newTopic = NULL;

- if ((newTopic = wtable_lookup (impl->mPublisherMap , ( char* )symbol)))
+ if (!(newTopic = wtable_lookup (impl->mPublisherMap , ( char* )symbol)))
return (MAMA_STATUS_INVALID_ARG);

mamaDQPublisher_destroy(newTopic->pub);
+ wtable_remove (impl->mPublisherMap, symbol);

free ((void*)newTopic->symbol);
free ((void*)newTopic);
--
1.7.7.6


[PATCH 14/50] [mama] Add cache and closure to dqpublisher

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

These are both application-supplied opaque pointers. The closure is intended
for application-specific context for the publisher and the cache is for caching
data that the publisher may send. In practice the application may use them as
it sees fit.

The following accessors get and set these values:
mamaDQPublisher_setClosure()
mamaDQPublisher_getClosure()
mamaDQPublisher_setCache()
mamaDQPublisher_getCache()

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqpublisher.c | 37 +++++++++++++++++++++++++++++++++
mama/c_cpp/src/c/dqpublishermanager.c | 4 +++
mama/c_cpp/src/c/mama/dqpublisher.h | 15 +++++++++++++
3 files changed, 56 insertions(+), 0 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublisher.c b/mama/c_cpp/src/c/dqpublisher.c
index 4363a1f..d20cc1f 100644
--- a/mama/c_cpp/src/c/dqpublisher.c
+++ b/mama/c_cpp/src/c/dqpublisher.c
@@ -33,6 +33,8 @@ typedef struct mamaDQPublisherImpl_
mamaMsgStatus mStatus;
uint64_t mSenderId;
mama_seqnum_t mSeqNum;
+ void* mClosure;
+ void* mCache;
} mamaDQPublisherImpl;


@@ -300,3 +302,38 @@ void mamaDQPublisher_setSeqNum (mamaDQPublisher pub, mama_seqnum_t num)
}


+void mamaDQPublisher_setClosure (mamaDQPublisher pub, void* closure)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+ if (impl)
+ impl->mClosure = closure;
+}
+
+
+void* mamaDQPublisher_getClosure (mamaDQPublisher pub)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+
+ if (impl)
+ return impl->mClosure;
+ else
+ return NULL;
+}
+
+void mamaDQPublisher_setCache (mamaDQPublisher pub, void* cache)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+ if (impl)
+ impl->mCache = cache;
+}
+
+
+void* mamaDQPublisher_getCache (mamaDQPublisher pub)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+
+ if (impl)
+ return impl->mCache;
+ else
+ return NULL;
+}
diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index c1bd7d0..0ee62de 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -381,6 +381,10 @@ mama_status mamaDQPublisherManager_createPublisher (
return status;
}

+ mamaDQPublisher_setCache(*newPublisher, cache);
+ mamaDQPublisher_setSenderId(*newPublisher, impl->mSenderId);
+ mamaDQPublisher_setStatus(*newPublisher, impl->mStatus);
+ mamaDQPublisher_setSeqNum(*newPublisher, impl->mSeqNum);
if (wtable_insert (impl->mPublisherMap, (char*)symbol, newTopic) != 1)
{
mamaDQPublisher_destroy(*newPublisher);
diff --git a/mama/c_cpp/src/c/mama/dqpublisher.h b/mama/c_cpp/src/c/mama/dqpublisher.h
index c8ae155..0f62030 100644
--- a/mama/c_cpp/src/c/mama/dqpublisher.h
+++ b/mama/c_cpp/src/c/mama/dqpublisher.h
@@ -85,6 +85,7 @@ extern mama_status
mamaDQPublisher_sendReplyWithHandle (mamaDQPublisher pub,
mamaMsgReply replyAddress,
mamaMsg reply);
+
MAMAExpDLL
extern void
mamaDQPublisher_destroy (mamaDQPublisher pub);
@@ -103,7 +104,21 @@ MAMAExpDLL
extern void
mamaDQPublisher_setSeqNum (mamaDQPublisher pub, mama_seqnum_t num);

+MAMAExpDLL
+extern void
+mamaDQPublisher_setClosure (mamaDQPublisher pub, void* closure);
+
+MAMAExpDLL
+extern void*
+mamaDQPublisher_getClosure (mamaDQPublisher pub);

+MAMAExpDLL
+extern void
+mamaDQPublisher_setCache (mamaDQPublisher pub, void* cache);
+
+MAMAExpDLL
+extern void*
+mamaDQPublisher_getCache (mamaDQPublisher pub);

#if defined( __cplusplus )
}
--
1.7.7.6


[PATCH 13/50] [mama] mamaDQPublisher_sendReplyWithHandle

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

This method allows applications to use a reply handle rather than detaching and
holding on to the original message. Since OpenMAMA attempts to reuse messages
this may improve performance for some applications.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqpublisher.c | 105 ++++++++++++++++++++++++++++++++++-
mama/c_cpp/src/c/mama/dqpublisher.h | 5 ++
2 files changed, 107 insertions(+), 3 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublisher.c b/mama/c_cpp/src/c/dqpublisher.c
index a875399..4363a1f 100644
--- a/mama/c_cpp/src/c/dqpublisher.c
+++ b/mama/c_cpp/src/c/dqpublisher.c
@@ -157,17 +157,116 @@ mama_status mamaDQPublisher_sendReply (mamaDQPublisher pub,
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);

- mamaMsg_updateU8(reply, NULL, MamaFieldMsgStatus.mFid, impl->mStatus);
+ if(MAMA_STATUS_OK != mamaMsg_updateU8(reply,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(reply,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }

if (impl->mSenderId != 0)
- mamaMsg_updateU64(reply, NULL, MamaFieldSenderId.mFid, impl->mSenderId);
+ {
+ mamaMsgField senderIdField = NULL;
+ if (MAMA_STATUS_OK == mamaMsg_getField(reply,
+ MamaFieldSenderId.mName, MamaFieldSenderId.mFid,
+ &senderIdField))
+ {
+ mamaFieldType senderIdType = MAMA_FIELD_TYPE_UNKNOWN;
+ if (MAMA_STATUS_OK == mamaMsgField_getType(senderIdField,
+ &senderIdType))
+ {
+ switch(senderIdType)
+ {
+ case MAMA_FIELD_TYPE_U16:
+ mamaMsgField_updateU16(senderIdField,
+ (mama_u16_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U32:
+ mamaMsgField_updateU32(senderIdField,
+ (mama_u32_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U64:
+ default:
+ mamaMsgField_updateU64(senderIdField,
+ impl->mSenderId);
+ break;
+ }
+ }
+ }
+ else
+ mamaMsg_addU64(reply, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, impl->mSenderId);
+ }

if (impl->mSeqNum != 0)
- mamaMsg_updateU32(reply, NULL, MamaFieldSeqNum.mFid, impl->mSeqNum);
+ {
+ mamaMsg_updateU32(reply, MamaFieldSeqNum.mName, MamaFieldSeqNum.mFid,
+ impl->mSeqNum);
+ }

return (mamaPublisher_sendReplyToInbox (impl->mPublisher, request, reply));
}

+mama_status mamaDQPublisher_sendReplyWithHandle (mamaDQPublisher pub,
+ mamaMsgReply replyAddress,
+ mamaMsg reply)
+{
+ mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
+
+ if(MAMA_STATUS_OK != mamaMsg_updateU8(reply,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(reply,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }
+
+ if (impl->mSenderId != 0)
+ {
+ mamaMsgField senderIdField = NULL;
+ if (MAMA_STATUS_OK == mamaMsg_getField(reply,
+ MamaFieldSenderId.mName, MamaFieldSenderId.mFid,
+ &senderIdField))
+ {
+ mamaFieldType senderIdType = MAMA_FIELD_TYPE_UNKNOWN;
+ if (MAMA_STATUS_OK == mamaMsgField_getType(senderIdField,
+ &senderIdType))
+ {
+ switch(senderIdType)
+ {
+ case MAMA_FIELD_TYPE_U16:
+ mamaMsgField_updateU16(senderIdField,
+ (mama_u16_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U32:
+ mamaMsgField_updateU32(senderIdField,
+ (mama_u32_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U64:
+ default:
+ mamaMsgField_updateU64(senderIdField,
+ impl->mSenderId);
+ break;
+ }
+ }
+ }
+ else
+ mamaMsg_addU64(reply, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, impl->mSenderId);
+ }
+
+ if (impl->mSeqNum != 0)
+ {
+ mamaMsg_updateU32(reply, MamaFieldSeqNum.mName, MamaFieldSeqNum.mFid,
+ impl->mSeqNum);
+ }
+
+ return (mamaPublisher_sendReplyToInboxHandle (impl->mPublisher,
+ replyAddress, reply));
+}

void mamaDQPublisher_destroy (mamaDQPublisher pub)
{
diff --git a/mama/c_cpp/src/c/mama/dqpublisher.h b/mama/c_cpp/src/c/mama/dqpublisher.h
index f792656..c8ae155 100644
--- a/mama/c_cpp/src/c/mama/dqpublisher.h
+++ b/mama/c_cpp/src/c/mama/dqpublisher.h
@@ -81,6 +81,11 @@ mamaDQPublisher_sendReply (mamaDQPublisher pub, mamaMsg request,


MAMAExpDLL
+extern mama_status
+mamaDQPublisher_sendReplyWithHandle (mamaDQPublisher pub,
+ mamaMsgReply replyAddress,
+ mamaMsg reply);
+MAMAExpDLL
extern void
mamaDQPublisher_destroy (mamaDQPublisher pub);

--
1.7.7.6


[PATCH 12/50] [mama][dqpublisher] Improved Message Handling

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Since the caller may pass messages with the reserved fields (i.e. senderid,
seqno, etc.) populated, the dq publisher should try to be robust and update the
fields if the type is reasonable. Prior to this change the updates would fail if
the types did not match the expected type exactly. Now if the status, for
example, is a 16-bit integer, it will succeed even though we expect an 8-bit
integer.

Also fixed formatting and whitespace.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqpublisher.c | 86 ++++++++++++++++++++++++---------
mama/c_cpp/src/c/dqpublishermanager.c | 10 +++-
2 files changed, 71 insertions(+), 25 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublisher.c b/mama/c_cpp/src/c/dqpublisher.c
index 677c117..a875399 100644
--- a/mama/c_cpp/src/c/dqpublisher.c
+++ b/mama/c_cpp/src/c/dqpublisher.c
@@ -29,9 +29,9 @@

typedef struct mamaDQPublisherImpl_
{
- mamaPublisher mPublisher;
- mamaMsgStatus mStatus;
- uint64_t mSenderId;
+ mamaPublisher mPublisher;
+ mamaMsgStatus mStatus;
+ uint64_t mSenderId;
mama_seqnum_t mSeqNum;
} mamaDQPublisherImpl;

@@ -45,11 +45,11 @@ mama_status mamaDQPublisher_allocate (mamaDQPublisher* result)

*result = impl;

- return MAMA_STATUS_OK;
+ return MAMA_STATUS_OK;
}

mama_status mamaDQPublisher_create (mamaDQPublisher pub, mamaTransport transport,
- const char* topic)
+ const char* topic)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
mama_status status = MAMA_STATUS_OK;
@@ -61,7 +61,7 @@ mama_status mamaDQPublisher_create (mamaDQPublisher pub, mamaTransport transport
NULL,
NULL);

- if (status == MAMA_STATUS_OK)
+ if (status == MAMA_STATUS_OK)
{
impl->mSenderId = mamaSenderId_getSelf ();
impl->mStatus = MAMA_MSG_STATUS_OK;
@@ -76,36 +76,79 @@ mama_status mamaDQPublisher_send (mamaDQPublisher pub, mamaMsg msg)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);

- if (impl->mSeqNum != 0)
+ if (impl->mSeqNum != 0)
{
switch (mamaMsgType_typeForMsg (msg))
{
- case MAMA_MSG_TYPE_REFRESH :
- case MAMA_MSG_TYPE_SYNC_REQUEST :
- case MAMA_MSG_TYPE_MISC :
- case MAMA_MSG_TYPE_NOT_PERMISSIONED :
- case MAMA_MSG_TYPE_NOT_FOUND :
- break;
-
+ case MAMA_MSG_TYPE_REFRESH :
+ case MAMA_MSG_TYPE_SYNC_REQUEST :
+ case MAMA_MSG_TYPE_MISC :
+ case MAMA_MSG_TYPE_NOT_PERMISSIONED :
+ case MAMA_MSG_TYPE_NOT_FOUND :
+ break;
case MAMA_MSG_TYPE_INITIAL :
case MAMA_MSG_TYPE_BOOK_INITIAL :
case MAMA_MSG_TYPE_RECAP :
case MAMA_MSG_TYPE_BOOK_RECAP :
- mamaMsg_updateU8(msg,NULL, MamaFieldMsgStatus.mFid, impl->mStatus);
+ if(MAMA_STATUS_OK !=
+ mamaMsg_updateU8(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }
break;

default:
- mamaMsg_updateU8(msg,NULL, MamaFieldMsgStatus.mFid, impl->mStatus);
+ if(MAMA_STATUS_OK !=
+ mamaMsg_updateU8(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus))
+ {
+ mamaMsg_updateI16(msg,MamaFieldMsgStatus.mName,
+ MamaFieldMsgStatus.mFid, impl->mStatus);
+ }
impl->mSeqNum++;
break;
}
- mamaMsg_updateU32(msg, NULL, MamaFieldSeqNum.mFid, impl->mSeqNum);
+ mamaMsg_updateU32(msg, MamaFieldSeqNum.mName, MamaFieldSeqNum.mFid,
+ impl->mSeqNum);
}

if (impl->mSenderId != 0)
- mamaMsg_updateU64(msg, NULL, MamaFieldSenderId.mFid, impl->mSenderId);
+ {
+ mamaMsgField senderIdField = NULL;
+ if (MAMA_STATUS_OK == mamaMsg_getField(msg, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, &senderIdField))
+ {
+ mamaFieldType senderIdType = MAMA_FIELD_TYPE_UNKNOWN;
+ if (MAMA_STATUS_OK == mamaMsgField_getType(senderIdField,
+ &senderIdType))
+ {
+ switch(senderIdType)
+ {
+ case MAMA_FIELD_TYPE_U16:
+ mamaMsgField_updateU16(senderIdField,
+ (mama_u16_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U32:
+ mamaMsgField_updateU32(senderIdField,
+ (mama_u32_t)impl->mSenderId);
+ break;
+
+ case MAMA_FIELD_TYPE_U64:
+ default:
+ mamaMsgField_updateU64(senderIdField, impl->mSenderId);
+ break;
+ }
+ }
+ }
+ else
+ mamaMsg_addU64(msg, MamaFieldSenderId.mName,
+ MamaFieldSenderId.mFid, impl->mSenderId);
+ }

- return (mamaPublisher_send (impl->mPublisher, msg));
+ return (mamaPublisher_send (impl->mPublisher, msg));
}

mama_status mamaDQPublisher_sendReply (mamaDQPublisher pub,
@@ -142,21 +185,18 @@ void mamaDQPublisher_destroy (mamaDQPublisher pub)
void mamaDQPublisher_setStatus (mamaDQPublisher pub, mamaMsgStatus status)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
-
impl->mStatus = status;
}

void mamaDQPublisher_setSenderId (mamaDQPublisher pub, uint64_t senderid)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
-
- impl->mSenderId = senderid;
+ impl->mSenderId = senderid;
}

void mamaDQPublisher_setSeqNum (mamaDQPublisher pub, mama_seqnum_t num)
{
mamaDQPublisherImpl* impl = (mamaDQPublisherImpl*) (pub);
-
impl->mSeqNum=num;
}

diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index 71026ed..c1bd7d0 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -215,6 +215,9 @@ mama_status mamaDQPublisherManager_allocate(mamaDQPublisherManager* result )
calloc (1, sizeof (mamaDQPublisherManagerImpl));

if (!impl) return MAMA_STATUS_NOMEM;
+ impl->mSenderId = mamaSenderId_getSelf ();
+ impl->mStatus = MAMA_MSG_STATUS_OK;
+ impl->mSeqNum = 1;

*result = impl;

@@ -225,7 +228,10 @@ void* mamaDQPublisherManager_getClosure (mamaDQPublisherManager manager)
{
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;

- return impl->mClosure;
+ if(NULL != impl)
+ return impl->mClosure;
+ else
+ return NULL;
}


@@ -353,7 +359,7 @@ mama_status mamaDQPublisherManager_createPublisher (
mama_status status = MAMA_STATUS_OK;
char topic[80];

- newTopic = (mamaPublishTopic*) wtable_lookup (impl->mPublisherMap, (char*)symbol);
+ newTopic = (mamaPublishTopic*)wtable_lookup (impl->mPublisherMap, (char*)symbol);

if (!newTopic)
{
--
1.7.7.6


[PATCH 11/50] [mama] minor datetime fixes

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

When formatting a date time with a time, separate the date and time with a
space.

Added parameter checking to makeTime to handle invalid values sanely: set the
year to 1970 if prior to beginning of epoch, default month to 0 (Jan) for
invalid values, and set invalid days to 1 (the first day of the month).

Also converted tabs to spaces. Use "git diff -b" to ignore white space.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/datetime.c | 155 ++++++++++++++++++++++++-------------------
1 files changed, 88 insertions(+), 67 deletions(-)

diff --git a/mama/c_cpp/src/c/datetime.c b/mama/c_cpp/src/c/datetime.c
index 1cb8cde..267bd96 100644
--- a/mama/c_cpp/src/c/datetime.c
+++ b/mama/c_cpp/src/c/datetime.c
@@ -662,11 +662,11 @@ mamaDateTime_addMicroseconds(mamaDateTime dateTime,
{
if(addMicroseconds >= -1000000)
{
- mamaDateTime_addWholeSeconds (dateTime, -1);
+ mamaDateTime_addWholeSeconds (dateTime, -1);
}
else
{
- mamaDateTime_addWholeSeconds (dateTime, addMicroseconds/1000000 - 1);
+ mamaDateTime_addWholeSeconds (dateTime, addMicroseconds/1000000 - 1);
}
addMicroseconds %= 1000000;
if(addMicroseconds < 0) addMicroseconds += 1000000;
@@ -796,72 +796,72 @@ mamaDateTime_getEpochTimeSeconds(const mamaDateTime dateTime,

mama_status mamaDateTime_addTodayToDateTime(mamaDateTime destination)
{
- mama_status ret = MAMA_STATUS_SYSTEM_ERROR;
-
- /* Create a new date time. */
- mama_time_t time;
- mamaDateTimeImpl_clear(time);
- {
- /* Get the current time of day. */
- struct timeval now;
- memset(&now, 0, sizeof(now));
- if(gettimeofday(&now, NULL) == 0)
- {
- /* Initialise the time structure with the date. */
- mamaDateTimeImpl_setSeconds (time, now.tv_sec);
- mamaDateTimeImpl_setMicroSeconds (time, now.tv_usec);
- mamaDateTimeImpl_setPrecision (time, 6);
- mamaDateTimeImpl_setHasDate (time);
- mamaDateTimeImpl_setHasTime (time);
- {
- /* Get existing number of seconds and remove any full-day seconds
- * from the destination.
- */
- mama_u32_t tmpSeconds = mamaDateTimeImpl_getSeconds(*destination) % SECONDS_IN_A_DAY;
-
- /* Append the date from today. */
- tmpSeconds += (mamaDateTimeImpl_getSeconds(time) / SECONDS_IN_A_DAY) * SECONDS_IN_A_DAY;
-
- /* Set the result in the destination. */
- mamaDateTimeImpl_setSeconds(*destination, tmpSeconds);
- mamaDateTimeImpl_setHasDate(*destination);
-
- /* Function succeeded. */
- ret = MAMA_STATUS_OK;
- }
- }
- }
-
- return ret;
+ mama_status ret = MAMA_STATUS_SYSTEM_ERROR;
+
+ /* Create a new date time. */
+ mama_time_t time;
+ mamaDateTimeImpl_clear(time);
+ {
+ /* Get the current time of day. */
+ struct timeval now;
+ memset(&now, 0, sizeof(now));
+ if(gettimeofday(&now, NULL) == 0)
+ {
+ /* Initialise the time structure with the date. */
+ mamaDateTimeImpl_setSeconds (time, now.tv_sec);
+ mamaDateTimeImpl_setMicroSeconds (time, now.tv_usec);
+ mamaDateTimeImpl_setPrecision (time, 6);
+ mamaDateTimeImpl_setHasDate (time);
+ mamaDateTimeImpl_setHasTime (time);
+ {
+ /* Get existing number of seconds and remove any full-day seconds
+ * from the destination.
+ */
+ mama_u32_t tmpSeconds = mamaDateTimeImpl_getSeconds(*destination) % SECONDS_IN_A_DAY;
+
+ /* Append the date from today. */
+ tmpSeconds += (mamaDateTimeImpl_getSeconds(time) / SECONDS_IN_A_DAY) * SECONDS_IN_A_DAY;
+
+ /* Set the result in the destination. */
+ mamaDateTimeImpl_setSeconds(*destination, tmpSeconds);
+ mamaDateTimeImpl_setHasDate(*destination);
+
+ /* Function succeeded. */
+ ret = MAMA_STATUS_OK;
+ }
+ }
+ }
+
+ return ret;
}

mama_status
mamaDateTime_getEpochTimeSecondsWithCheck(const mamaDateTime dateTime,
mama_f64_t* seconds)
{
- mama_status ret = MAMA_STATUS_NULL_ARG;
- if((dateTime != NULL) && (seconds != NULL))
- {
- /* Check to see if the date time has got a date value set. */
- uint8_t hasDate = mamaDateTimeImpl_getHasDate(*dateTime);
-
- /* No date value present, add today's date to the time value. */
- if(hasDate == 0)
- {
- ret = mamaDateTime_addTodayToDateTime(dateTime);
- if(ret == MAMA_STATUS_OK)
- {
- /* Get the seconds as normal. */
- ret = mamaDateTime_getEpochTimeSecondsWithTz(dateTime, seconds, NULL);
- }
- }
-
- /* Date value present, get the seconds as normal. */
- else
- {
- ret = mamaDateTime_getEpochTimeSecondsWithTz(dateTime, seconds, NULL);
- }
- }
+ mama_status ret = MAMA_STATUS_NULL_ARG;
+ if((dateTime != NULL) && (seconds != NULL))
+ {
+ /* Check to see if the date time has got a date value set. */
+ uint8_t hasDate = mamaDateTimeImpl_getHasDate(*dateTime);
+
+ /* No date value present, add today's date to the time value. */
+ if(hasDate == 0)
+ {
+ ret = mamaDateTime_addTodayToDateTime(dateTime);
+ if(ret == MAMA_STATUS_OK)
+ {
+ /* Get the seconds as normal. */
+ ret = mamaDateTime_getEpochTimeSecondsWithTz(dateTime, seconds, NULL);
+ }
+ }
+
+ /* Date value present, get the seconds as normal. */
+ else
+ {
+ ret = mamaDateTime_getEpochTimeSecondsWithTz(dateTime, seconds, NULL);
+ }
+ }

return ret;
}
@@ -973,6 +973,7 @@ mama_status mamaDateTime_getAsString (const mamaDateTime dateTime,
struct tm tmValue;
size_t bytesUsed;
size_t precision;
+ uint8_t hasTime = 0;

if (!dateTime || !buf)
return MAMA_STATUS_INVALID_ARG;
@@ -980,16 +981,20 @@ mama_status mamaDateTime_getAsString (const mamaDateTime dateTime,
seconds = (time_t) mamaDateTimeImpl_getSeconds(*dateTime);
utcTm (&tmValue, seconds);
buf[0] = '\0';
+ hasTime = mamaDateTimeImpl_getHasTime (*dateTime);
if (mamaDateTimeImpl_getHasDate(*dateTime))
{
- bytesUsed = strftime (buf, bufMaxLen, "%Y-%m-%d ", &tmValue);
+ if (hasTime)
+ bytesUsed = strftime (buf, bufMaxLen, "%Y-%m-%d ", &tmValue);
+ else
+ bytesUsed = strftime (buf, bufMaxLen, "%Y-%m-%d", &tmValue);
if (bytesUsed > 0)
{
buf += bytesUsed;
bufMaxLen -= bytesUsed;
}
}
- if (mamaDateTimeImpl_getHasTime(*dateTime))
+ if (hasTime)
{
bytesUsed = strftime (buf, bufMaxLen, "%H:%M:%S", &tmValue);
if (bytesUsed > 0)
@@ -1645,9 +1650,25 @@ unsigned long makeTime (
{
struct tm timeInfo;

- timeInfo.tm_year = year - 1900;
- timeInfo.tm_mon = mon - 1;
- timeInfo.tm_mday = day;
+ /* tm_year stores the years since 1900
+ * must not be less than 70 */
+ if (year > 1970)
+ timeInfo.tm_year = year - 1900;
+ else
+ timeInfo.tm_year = 70;
+
+ /* tm_mon stores the months since Jan (0 to 11) */
+ if (mon > 0)
+ timeInfo.tm_mon = mon - 1;
+ else
+ timeInfo.tm_mon = 0;
+
+ /* day of the month (1 to 31) */
+ if (day > 0)
+ timeInfo.tm_mday = day;
+ else
+ timeInfo.tm_mday = 1;
+
timeInfo.tm_hour = hour;
timeInfo.tm_min = min;
timeInfo.tm_sec = sec;
--
1.7.7.6


[PATCH 10/50] [mama] Free CM Subscription in Destroy Callback

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

The CM subscription cannot be freed until its destruction completes and invokes
the onDestroy() callback. This avoids potential core dumps on cleanup.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/clientmanageresponder.c | 33 ++++++++++++++++-------------
1 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/mama/c_cpp/src/c/clientmanageresponder.c b/mama/c_cpp/src/c/clientmanageresponder.c
index f307fff..4d7251d 100644
--- a/mama/c_cpp/src/c/clientmanageresponder.c
+++ b/mama/c_cpp/src/c/clientmanageresponder.c
@@ -74,6 +74,8 @@ errorCB( mamaSubscription subscription, mama_status status,
static void MAMACALLTYPE
msgCB( mamaSubscription subscription, mamaMsg msg, void* closure,
void* itemClosure);
+static void MAMACALLTYPE
+destroyCB( mamaSubscription subscription, void* closure);

/**
* Destroy the pending command
@@ -123,14 +125,11 @@ mama_status mamaCmResponder_destroy(mamaCmResponder responder)
{
/* Destroy the subscription, note that the first failure return code will be preserved.
*/
- mama_status sd = mamaSubscription_destroy(*nextSubscription);
+ mama_status sd = mamaSubscription_destroyEx(*nextSubscription);
if(ret == MAMA_STATUS_OK)
{
ret = sd;
}
-
- /* The memory will not be released until the subscription is deallocated. */
- mamaSubscription_deallocate(*nextSubscription);
}
}

@@ -160,6 +159,7 @@ mama_status mamaCmResponder_destroy(mamaCmResponder responder)

/* Free the array of publishers. */
free(impl->publishers);
+ impl->publishers = NULL;
}

/* Destroy all pending commands. */
@@ -187,6 +187,7 @@ mama_status populateCmResponder(mamaCmResponderImpl *impl)
{
/* Returns */
mama_status ret = MAMA_STATUS_NO_BRIDGE_IMPL;
+ mamaQueue internalQueue = NULL;

/* Get the default event queue from the bridgeImpl. */
mamaBridgeImpl *bridgeImpl = mamaTransportImpl_getBridgeImpl(impl->mTransport);
@@ -200,9 +201,10 @@ mama_status populateCmResponder(mamaCmResponderImpl *impl)
callbacks.onCreate = createCB;
callbacks.onError = errorCB;
callbacks.onMsg = msgCB;
+ callbacks.onDestroy = destroyCB;

- /* Reset the return code for the enumeration */
- ret = MAMA_STATUS_OK;
+ ret = mamaBridgeImpl_getInternalEventQueue((mamaBridge)bridgeImpl, &internalQueue);
+ if (ret == MAMA_STATUS_OK)
{
/* Enumerate all the sub transport bridges in the transport. */
int nextTransportIndex = 0;
@@ -234,7 +236,7 @@ mama_status populateCmResponder(mamaCmResponderImpl *impl)
ret = mamaSubscription_createBasic(
*nextSubscription,
impl->mTransport,
- bridgeImpl->mDefaultEventQueue,
+ internalQueue,
&callbacks,
MAMA_CM_TOPIC,
impl);
@@ -359,14 +361,6 @@ msgCB (mamaSubscription subscription, mamaMsg msg, void* closure,

/* Get the transport index in use by the supplied subscription. */
int transportIndex = 0;
- mamaSubscription_getTransportIndex(subscription, &transportIndex);
-
- /**
- * TODO: We need to add the element to the list. This is a bug, but I
- * don't want to fix it without testing.
- */
-
- /* list_push_back (impl->mPendingCommands, command); */

/*
* The command ID field tells us what command to execute.
@@ -384,7 +378,9 @@ msgCB (mamaSubscription subscription, mamaMsg msg, void* closure,
{
mama_log (MAMA_LOG_LEVEL_FINE, "mamaCmResponder::msgCb(): "
"CM SYNC Received" );
+ mamaSubscription_getTransportIndex(subscription, &transportIndex);
mamaSyncCommand_create (command, msg, impl->mTransport, transportIndex, impl->publishers[transportIndex], endCB, impl);
+ list_push_back(impl->mPendingCommands, command);
mamaSyncCommand_run (command);
}
break;
@@ -394,3 +390,10 @@ msgCB (mamaSubscription subscription, mamaMsg msg, void* closure,
"Bad CM command: %d", command);
}
}
+
+static void MAMACALLTYPE
+destroyCB (mamaSubscription subscription, void* closure)
+{
+ /* Deallocate the subscription. */
+ mamaSubscription_deallocate(subscription);
+}
--
1.7.7.6


[PATCH 09/50] [bridge] Added mamaTransport_forceClientDisconnect()

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

For TCP socket based middleware, allow transports to disconnect individual
clients. This is useful for eliminating slow consumers or other applications
that may impact the entire system.

Also fixed compiler warnings in Avis bridge files.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/bridge.h | 11 +++-
mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h | 7 ++-
mama/c_cpp/src/c/bridge/avis/bridge.c | 36 +++++++-----
mama/c_cpp/src/c/bridge/avis/transportbridge.c | 59 ++++++++++++-------
4 files changed, 73 insertions(+), 40 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge.h b/mama/c_cpp/src/c/bridge.h
index 5ab7707..9ead2bd 100644
--- a/mama/c_cpp/src/c/bridge.h
+++ b/mama/c_cpp/src/c/bridge.h
@@ -116,6 +116,8 @@ do \
implIdentifier ## BridgeMamaTransport_destroy; \
bridgeImpl->bridgeMamaTransportCreate = \
implIdentifier ## BridgeMamaTransport_create; \
+ bridgeImpl->bridgeMamaTransportForceClientDisconnect = \
+ implIdentifier ## BridgeMamaTransport_forceClientDisconnect;\
bridgeImpl->bridgeMamaTransportFindConnection = \
implIdentifier ## BridgeMamaTransport_findConnection; \
bridgeImpl->bridgeMamaTransportGetAllConnections = \
@@ -269,7 +271,7 @@ typedef mama_status (*bridge_stop)(mamaQueue defaultEventQueue);
/*Called by mama_getVersion()*/
typedef const char* (*bridge_getVersion)(void);
typedef const char* (*bridge_getName)(void);
-typedef mama_status (*bridge_getDefaultPayloadId)(char**name, char* id);
+typedef mama_status (*bridge_getDefaultPayloadId)(char***name, char** id);

/*===================================================================
= mamaQueue bridge function pointers =
@@ -330,6 +332,11 @@ typedef mama_status (*bridgeMamaTransport_destroy)(transportBridge transport);
typedef mama_status (*bridgeMamaTransport_create)(transportBridge *result,
const char* name,
mamaTransport parent);
+typedef mama_status (*bridgeMamaTransport_forceClientDisconnect)
+ (transportBridge* transports,
+ int numTransports,
+ const char* ipAddress,
+ uint16_t port);
/* Find a connection with specified IP Address and Port. If the port is 0, the
* call returns the first connection with the specified IP Address. If a
* connection is not found the method returns MAMA_STATUS_NOT_FOUND and
@@ -735,6 +742,8 @@ typedef struct mamaBridgeImpl
bridgeMamaTransport_isValid bridgeMamaTransportIsValid;
bridgeMamaTransport_destroy bridgeMamaTransportDestroy;
bridgeMamaTransport_create bridgeMamaTransportCreate;
+ bridgeMamaTransport_forceClientDisconnect
+ bridgeMamaTransportForceClientDisconnect;
bridgeMamaTransport_findConnection bridgeMamaTransportFindConnection;
bridgeMamaTransport_getAllConnections
bridgeMamaTransportGetAllConnections;
diff --git a/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h b/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h
index 94c4a42..0dd6f8e 100644
--- a/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h
+++ b/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h
@@ -43,7 +43,7 @@ extern const char*
avisBridge_getVersion (void);

mama_status
-avisBridge_getDefaultPayloadId (char**name, char* id);
+avisBridge_getDefaultPayloadId (char***name, char** id);

extern mama_status
avisBridge_open (mamaBridge bridgeImpl);
@@ -125,6 +125,11 @@ avisBridgeMamaTransport_create (transportBridge* result,
const char* name,
mamaTransport parent);
extern mama_status
+avisBridgeMamaTransport_forceClientDisconnect (transportBridge* transports,
+ int numTransports,
+ const char* ipAddress,
+ uint16_t port);
+extern mama_status
avisBridgeMamaTransport_findConnection (transportBridge* transports,
int numTransports,
mamaConnection* result,
diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index 24c369a..4b5343a 100755
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -35,10 +35,12 @@ timerHeap gTimerHeap;
/*Responsible for creating the bridge impl structure*/
void avisBridge_createImpl (mamaBridge* result)
{
+ avisBridgeImpl* avisBridge = NULL;
+ mamaBridgeImpl* impl = NULL;
if (!result) return;
*result = NULL;

- mamaBridgeImpl* impl = (mamaBridgeImpl*)calloc (1, sizeof (mamaBridgeImpl));
+ impl = (mamaBridgeImpl*)calloc (1, sizeof (mamaBridgeImpl));
if (!impl)
{
mama_log (MAMA_LOG_LEVEL_SEVERE, "avisBridge_createImpl(): "
@@ -46,7 +48,7 @@ void avisBridge_createImpl (mamaBridge* result)
return;
}

- avisBridgeImpl* avisBridge = (avisBridgeImpl*) calloc(1, sizeof(avisBridgeImpl));
+ avisBridge = (avisBridgeImpl*) calloc(1, sizeof(avisBridgeImpl));

/*Populate the bridge impl structure with the function pointers*/
INITIALIZE_BRIDGE (impl, avis);
@@ -59,7 +61,7 @@ void avisBridge_createImpl (mamaBridge* result)
const char*
avisBridge_getVersion (void)
{
- return (const char*) VERSION;
+ return (const char*) "Unable to get version number";
}

const char*
@@ -68,14 +70,14 @@ avisBridge_getName (void)
return "avis";
}

-#define DEFAULT_PAYLOAD_NAME "avismsg"
-#define DEFAULT_PAYLOAD_ID MAMA_PAYLOAD_AVIS
+static const char* PAYLOAD_NAMES[] = {"avismsg",NULL};
+static const char PAYLOAD_IDS[] = {MAMA_PAYLOAD_AVIS,'\0'}; /* NUL-terminated id list; a char slot cannot hold the pointer constant NULL */

mama_status
-avisBridge_getDefaultPayloadId (char**name, char* id)
+avisBridge_getDefaultPayloadId (char***name, char** id)
{
- *name=DEFAULT_PAYLOAD_NAME;
- *id=DEFAULT_PAYLOAD_ID;
+ *name = PAYLOAD_NAMES;
+ *id = PAYLOAD_IDS;

return MAMA_STATUS_OK;
}
@@ -87,6 +89,7 @@ avisBridge_open (mamaBridge bridgeImpl)
mama_status status = MAMA_STATUS_OK;
mamaBridgeImpl* impl = (mamaBridgeImpl*)bridgeImpl;

+ wsocketstartup();
mama_log (MAMA_LOG_LEVEL_FINEST, "avisBridge_open(): Entering.");

if (MAMA_STATUS_OK !=
@@ -123,10 +126,11 @@ avisBridge_open (mamaBridge bridgeImpl)
mama_status
avisBridge_close (mamaBridge bridgeImpl)
{
+ mama_status status = MAMA_STATUS_OK;
+ mamaBridgeImpl* impl = NULL;
mama_log (MAMA_LOG_LEVEL_FINEST, "avisBridge_close(): Entering.");

- mama_status status = MAMA_STATUS_OK;
- mamaBridgeImpl* impl = (mamaBridgeImpl*)bridgeImpl;
+ impl = (mamaBridgeImpl*)bridgeImpl;


if (0 != destroyHeap (gTimerHeap))
@@ -139,6 +143,7 @@ avisBridge_close (mamaBridge bridgeImpl)
mamaQueue_destroyWait(impl->mDefaultEventQueue);

free (impl);
+ wsocketcleanup();
return status;
}

@@ -146,12 +151,12 @@ avisBridge_close (mamaBridge bridgeImpl)
mama_status
avisBridge_start(mamaQueue defaultEventQueue)
{
- mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_start(): Start dispatching on default event queue.");
-
mama_status status = MAMA_STATUS_OK;
+ avisBridgeImpl* avisBridge = NULL;
+
+ mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_start(): Start dispatching on default event queue.");

// start Avis event loop(s)
- avisBridgeImpl* avisBridge;
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) mamaQueueImpl_getBridgeImpl(defaultEventQueue), (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_start(): Could not get Elvin object");
return status;
@@ -168,11 +173,10 @@ avisBridge_start(mamaQueue defaultEventQueue)
mama_status
avisBridge_stop(mamaQueue defaultEventQueue)
{
- mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_stop(): Stopping bridge.");
-
mama_status status = MAMA_STATUS_OK;
+ avisBridgeImpl* avisBridge = NULL;

- avisBridgeImpl* avisBridge;
+ mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_stop(): Stopping bridge.");
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) mamaQueueImpl_getBridgeImpl(defaultEventQueue), (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_stop(): Could not get Elvin object");
return status;
diff --git a/mama/c_cpp/src/c/bridge/avis/transportbridge.c b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
index c994879..de10b61 100755
--- a/mama/c_cpp/src/c/bridge/avis/transportbridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
@@ -29,6 +29,7 @@
#include <mama/types.h>
#include <transportimpl.h>
#include <timers.h>
+#include <errno.h>
#include "transportbridge.h"
#include "avisbridgefunctions.h"
#include "avisdefs.h"
@@ -59,14 +60,12 @@ void log_avis_error(MamaLogLevel logLevel, Elvin* avis)

void closeListener(Elvin* avis, CloseReason reason, const char* message, void* closure)
{
+ const char* errMsg;
if (avisBridge(closure) == NULL) {
mama_log (MAMA_LOG_LEVEL_FINE, "Avis closeListener: could not get Avis bridge");
return;
}

- // TODO -- serialize access across multiple threads
-
- const char* errMsg;
switch( reason )
{
case REASON_CLIENT_SHUTDOWN: errMsg = "Avis client shutdown"; break;
@@ -82,16 +81,19 @@ void closeListener(Elvin* avis, CloseReason reason, const char* message, void* c
static const char*
getURL( const char *name )
{
+ int len = 0;
+ char* buff = NULL;
+ const char* property = NULL;
if (name == NULL)
return NULL;

mama_log (MAMA_LOG_LEVEL_FINE, "initializing Avis transport: %s", name);
- int len = strlen(name) + strlen( TPORT_PREFIX ) + strlen(TPORT_PARAM) + 4;
- char* buff = (char *)alloca( len );
+ len = strlen(name) + strlen( TPORT_PREFIX ) + strlen(TPORT_PARAM) + 4;
+ buff = (char *)alloca( len );
memset(buff, '\0', len);
snprintf( buff, len, "%s.%s.%s", TPORT_PREFIX, name, TPORT_PARAM );

- const char* property = properties_Get( mamaInternal_getProperties(), buff );
+ property = properties_Get( mamaInternal_getProperties(), buff );
if ( property == NULL )
return DEFAULT_URL;

@@ -123,13 +125,13 @@ void* avisDispatchThread(void* closure)

mama_status avisTransportBridge_start(avisTransportBridge* transportBridge)
{
- CHECK_TRANSPORT(transportBridge);
-
// stop Avis event loop
- pthread_t tid;
+ wthread_t tid;
int rc;
- if (0 != (rc = pthread_create(&tid, NULL, avisDispatchThread, transportBridge))) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "pthread_create returned %d", rc);
+ CHECK_TRANSPORT(transportBridge);
+
+ if (0 != (rc = wthread_create(&tid, NULL, avisDispatchThread, transportBridge))) {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "wthread_create returned %d", rc);
return MAMA_STATUS_SYSTEM_ERROR;
}

@@ -193,22 +195,24 @@ avisBridgeMamaTransport_create (transportBridge* result,
const char* name,
mamaTransport mamaTport )
{
- avisTransportBridge* transport =
- (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
+ mama_status status;
+ avisBridgeImpl* avisBridge = NULL;
+ avisTransportBridge* transport = NULL;
+ mamaBridgeImpl* bridgeImpl = NULL;
+ const char* url = NULL;
+
+ transport = (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
if (transport == NULL)
return MAMA_STATUS_NOMEM;

transport->mTransport = (mamaTransport) mamaTport;

- // TODO -- serialize access across multiple threads
- mamaBridgeImpl* bridgeImpl = mamaTransportImpl_getBridgeImpl(mamaTport);
+ bridgeImpl = mamaTransportImpl_getBridgeImpl(mamaTport);
if (!bridgeImpl) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get bridge");
free(transport);
return MAMA_STATUS_PLATFORM;
}
- mama_status status;
- avisBridgeImpl* avisBridge;
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) bridgeImpl, (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get Avis bridge object");
free(transport);
@@ -229,7 +233,7 @@ avisBridgeMamaTransport_create (transportBridge* result,
}

// open the server connection
- const char* url = getURL(name);
+ url = getURL(name);
if (url == NULL) {
mama_log (MAMA_LOG_LEVEL_NORMAL, "No %s property defined for transport : %s", TPORT_PARAM, name);
return MAMA_STATUS_INVALID_ARG;
@@ -253,15 +257,17 @@ avisBridgeMamaTransport_create (transportBridge* result,
mama_status
avisBridgeMamaTransport_destroy (transportBridge transport)
{
- // TODO -- serialize access across multiple threads
- mamaBridgeImpl* bridgeImpl = mamaTransportImpl_getBridgeImpl(avisTransport(transport)->mTransport);
+ mama_status status;
+ avisBridgeImpl* avisBridge = NULL;
+ mamaBridgeImpl* bridgeImpl = NULL;
+
+
+ bridgeImpl = mamaTransportImpl_getBridgeImpl(avisTransport(transport)->mTransport);
if (!bridgeImpl) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get bridge");
free(transport);
return MAMA_STATUS_PLATFORM;
}
- mama_status status;
- avisBridgeImpl* avisBridge;
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) bridgeImpl, (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get Avis bridge object");
free(transport);
@@ -286,6 +292,15 @@ avisBridgeMamaTransport_isValid (transportBridge transport)
}

mama_status
+avisBridgeMamaTransport_forceClientDisconnect (transportBridge* transports,
+ int numTransports,
+ const char* ipAddress,
+ uint16_t port)
+{
+ return MAMA_STATUS_NOT_IMPLEMENTED;
+}
+
+mama_status
avisBridgeMamaTransport_findConnection (transportBridge* transports,
int numTransports,
mamaConnection* result,
--
1.7.7.6


[PATCH 08/50] [bridge] Timeout for Stopping Internal Queue

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

When shutting down, don't wait forever for the internal queue to stop. If the
queue does not stop within the default queue timeout
(mama.defaultqueue.timeout property), continue shutting down. The default value
is 2000 milliseconds. This prevents hanging at shutdown if the dispatch thread
is blocked.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/bridge.c | 5 ++++-
1 files changed, 4 insertions(+), 1 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge.c b/mama/c_cpp/src/c/bridge.c
index 6970c8c..94c5caa 100644
--- a/mama/c_cpp/src/c/bridge.c
+++ b/mama/c_cpp/src/c/bridge.c
@@ -121,6 +121,8 @@ mamaBridgeImpl_stopInternalEventQueue (mamaBridge bridgeImpl)
if (impl->mInternalEventQueue)
{

+ /* Get the queue timeout value. */
+ int defaultQueueTimeout = mamaBridgeImpl_getDefaultQueueTimeout();
if (MAMA_STATUS_OK != mamaDispatcher_destroy (impl->mInternalDispatcher))
{
mama_log (MAMA_LOG_LEVEL_WARN, "mamaBridgeImpl_stopInternalEventQueue(): "
@@ -128,7 +130,8 @@ mamaBridgeImpl_stopInternalEventQueue (mamaBridge bridgeImpl)
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

- if (MAMA_STATUS_OK != mamaQueue_destroy (impl->mInternalEventQueue ))
+ /* Destroy the queue waiting for the appropriate time value. */
+ if (MAMA_STATUS_OK != mamaQueue_destroyTimedWait (impl->mInternalEventQueue, defaultQueueTimeout))
{
mama_log (MAMA_LOG_LEVEL_WARN, "mamaBridgeImpl_stopInternalEventQueue(): "
"Could not destroy internal queue");
--
1.7.7.6


[PATCH 07/50] [windows] Macros for MSVC Version

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

These are necessary to accommodate different versions of Visual Studio.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
common/c_cpp/src/c/windows/wombat/targetsxs.h | 9 +++++++++
1 files changed, 9 insertions(+), 0 deletions(-)

diff --git a/common/c_cpp/src/c/windows/wombat/targetsxs.h b/common/c_cpp/src/c/windows/wombat/targetsxs.h
index a8d8280..2341000 100644
--- a/common/c_cpp/src/c/windows/wombat/targetsxs.h
+++ b/common/c_cpp/src/c/windows/wombat/targetsxs.h
@@ -1,6 +1,15 @@
#ifndef _WOMBAT_TARGETSXS_H
#define _WOMBAT_TARGETSXS_H

+#if (_MSC_VER >= 1600 && _MSC_VER < 1700)   /* Visual Studio 2010 */
+#define VC10 1
+#elif (_MSC_VER >= 1500 && _MSC_VER < 1600) /* Visual Studio 2008; the old range (>= 1600 && < 1500) could never match */
+#define VC9 1
+#elif (_MSC_VER >= 1400 && _MSC_VER < 1500) /* Visual Studio 2005 */
+#define VC8 1
+#elif (_MSC_VER >= 1300 && _MSC_VER < 1400) /* Visual Studio .NET 2002/2003; the old range (>= 1400 && < 1400) was empty */
+#define VC7 1
+#endif
#ifndef VC7

#ifdef VC8
--
1.7.7.6


[PATCH 06/50] Adding new files for C++ and Windows Support

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

These files include support for UUIDs, which Avis requires for request/reply. Also
adding wMessageStats for C++ and MAMDA.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
common/c_cpp/configure.ac | 6 +-
common/c_cpp/src/c/Makefile.am | 7 +-
common/c_cpp/src/c/linux/wMath.h | 32 +
common/c_cpp/src/c/linux/wUuid.h | 14 +
common/c_cpp/src/c/wMessageStats.c | 1001 +++++++++++++++++++++++++++++
common/c_cpp/src/c/windows/wombat/wMath.h | 31 +
common/c_cpp/src/c/windows/wombat/wUuid.h | 34 +
common/c_cpp/src/c/wombat/wMessageStats.h | 190 ++++++
8 files changed, 1311 insertions(+), 4 deletions(-)
create mode 100644 common/c_cpp/src/c/linux/wMath.h
create mode 100644 common/c_cpp/src/c/linux/wUuid.h
create mode 100644 common/c_cpp/src/c/wMessageStats.c
create mode 100644 common/c_cpp/src/c/windows/wombat/wMath.h
create mode 100644 common/c_cpp/src/c/windows/wombat/wUuid.h
create mode 100644 common/c_cpp/src/c/wombat/wMessageStats.h

diff --git a/common/c_cpp/configure.ac b/common/c_cpp/configure.ac
index e288769..aed5043 100755
--- a/common/c_cpp/configure.ac
+++ b/common/c_cpp/configure.ac
@@ -25,9 +25,9 @@
# VERSION INFO
#
#################################################
-m4_define([product_version_major], [4])
+m4_define([product_version_major], [6])
m4_define([product_version_minor], [0])
-m4_define([product_version_release], [4a])
+m4_define([product_version_release], [0rc15])
m4_define([product_full_version],[product_version_major.product_version_minor.product_version_release])


@@ -47,6 +47,8 @@ AC_CANONICAL_BUILD()
AC_MSG_NOTICE([Configuring symbolic links for $build_os in $srcdir])
case $build_os in
linux*-*) AC_CONFIG_LINKS([$srcdir/src/c/wombat/port.h:src/c/linux/port.h
+ $srcdir/src/c/wombat/wMath.h:src/c/linux/wMath.h
+ $srcdir/src/c/wombat/wUuid.h:src/c/linux/wUuid.h
$srcdir/src/c/wombat/wConfig.h:src/c/linux/wConfig.h
$srcdir/src/c/wombat/wInterlocked.h:src/c/linux/wInterlocked.h
$srcdir/src/c/machine_port.c:src/c/linux/machine.c
diff --git a/common/c_cpp/src/c/Makefile.am b/common/c_cpp/src/c/Makefile.am
index 9a42259..ee0c640 100644
--- a/common/c_cpp/src/c/Makefile.am
+++ b/common/c_cpp/src/c/Makefile.am
@@ -49,13 +49,15 @@ nobase_include_HEADERS = \
wombat/wInterlocked.h \
wombat/wSemaphore.h \
wombat/wincompat.h \
- wombat/wtable.h
+ wombat/wMessageStats.h \
+ wombat/wtable.h \
+ wombat/wUuid.h

if USE_GCC_FLAGS
CPPFLAGS += -pedantic -Wno-long-long -D_GNU_SOURCE -fPIC
endif

-LIBS += -ldl
+LIBS += -ldl -luuid

libwombatcommon_la_SOURCES = \
MRSWLock.c \
@@ -75,4 +77,5 @@ libwombatcommon_la_SOURCES = \
strutils.c \
wSemaphore.c \
wlock.c \
+ wMessageStats.c \
wtable.c
diff --git a/common/c_cpp/src/c/linux/wMath.h b/common/c_cpp/src/c/linux/wMath.h
new file mode 100644
index 0000000..bf2bdcf
--- /dev/null
+++ b/common/c_cpp/src/c/linux/wMath.h
@@ -0,0 +1,32 @@
+/* $Id: wMath.h,v 1.1.2.2 2012/03/20 11:15:17 emmapollock Exp $
+ *
+ * OpenMAMA: The open middleware agnostic messaging API
+ * Copyright (C) 2011 NYSE Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#ifndef WMATH_H__
+#define WMATH_H__
+
+#include <math.h>
+
+static int wIsnan (double n) /* static: defined in a header, so each includer needs its own private copy */
+{
+    return isnan(n); /* non-zero when n is NaN */
+}
+
+#endif /* WMATH_H__ */
diff --git a/common/c_cpp/src/c/linux/wUuid.h b/common/c_cpp/src/c/linux/wUuid.h
new file mode 100644
index 0000000..608eee7
--- /dev/null
+++ b/common/c_cpp/src/c/linux/wUuid.h
@@ -0,0 +1,14 @@
+
+
+#ifndef WUUID_H__
+#define WUUID_H__
+
+#include <uuid/uuid.h>
+
+typedef uuid_t wUuid;
+
+#define wUuid_generate_time uuid_generate_time
+
+#define wUuid_unparse uuid_unparse
+
+#endif /* WUUID_H__ */
diff --git a/common/c_cpp/src/c/wMessageStats.c b/common/c_cpp/src/c/wMessageStats.c
new file mode 100644
index 0000000..5ae85ce
--- /dev/null
+++ b/common/c_cpp/src/c/wMessageStats.c
@@ -0,0 +1,1001 @@
+
+#include "wombat/machine.h"
+#include "wombat/wMessageStats.h"
+#include "wombat/wtable.h"
+
+#include <limits.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <math.h>
+
+
+#include <sys/types.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <ctype.h>
+#include <sys/stat.h>
+#include <time.h>
+
+#ifndef WIN32
+#include <dirent.h>
+#include <unistd.h>
+#include <sys/resource.h>
+#include <sys/time.h>
+#ifndef VMS
+#ifndef __APPLE__
+#include <sys/vfs.h>
+#endif
+#endif
+#include <sys/times.h>
+#endif
+
+#ifdef WIN32
+/*#include "wombat/strptime.h"*/
+typedef struct tms {
+ long tms_utime;
+ long tms_stime;
+ long tms_cutime;
+ long tms_cstime;
+}tms;
+
+#endif
+
+
+#define MAX_SYMBOL_LENGTH 256
+#define MILLISECONDS_IN_HOUR (60*60*1000)
+#define SECONDSINHOUR 60
+
+const char* opHeaderPeriod = \
+ " Symbol Name ," \
+ "Elapsed Time ," \
+ "Msg Count," \
+ "Msg / sec," \
+ "KiloBytes," \
+ "KB / sec," \
+ "Min Lat ms," \
+ "Max Lat ms," \
+ "Ave Lat ms," \
+ "Std Deviation,\n";
+
+const char* opHeaderAcrossAll = \
+ "Msg Count," \
+ "Min Msg/sec," \
+ "Max Msg/sec," \
+ "Ave Msg/sec," \
+ "KiloBytes," \
+ "KB / sec," \
+ "Min Lat ms," \
+ "Max Lat ms," \
+ "Ave Lat ms, " \
+ "Std Dev,\n";
+
+const char* gStatsLevel1Header=\
+ " Global Count,Total Time,Interval,CPU utime,CPU stime, CPU% , Ave CPU% ," \
+ " Memory , Memory% ,\n";
+
+typedef struct statsCache_t
+{
+ perfData* mPData;
+ wtable_t mSymbolTable;
+ int mNumMsg;
+ int mHeader;
+ FILE* mOutfile;
+ long mNumMessages;
+ long mNumBytes;
+ double mLastUpdateTime;/* time in seconds of last update*/
+}statsCache_t;
+
+typedef struct perfData_t
+{
+ double mMinLatencyP; /* min latency for this capture PERIOD*/
+ double mMaxLatencyP; /* max latency for this capture PERIOD */
+ double mAveLatencyP; /* average latency for this capture PERIOD*/
+ double mMinLatencyA; /* min latency across ALL capture periods*/
+ double mMaxLatencyA; /* max latency across ALL capture periods*/
+ double mAveLatencyA; /* average latency across ALL capture periods*/
+
+ /* Separate message counter for latency in case
+ some message don't have latency timestamps
+ or cannto be parsed */
+
+ double mLatAccumP; /* latency accumlation for PERIOD*/
+ long mLatMsgCountP; /* latency message count for PERIOD*/
+ double mLatAccumA; /* latency accumlation ALL capture periods*/
+ long mLatMsgCountA; /* latency message across ALL capture periods*/
+
+ long mMsgCountP; /* rolling message count for this capture PERIOD*/
+ double mMsgPerSecP; /* average message rate for this capture PERIOD*/
+ long mMsgCountA; /* total message count ALL capture periods*/
+ double mMaxMsgPerSecA; /* max average message rate ALL capture periods*/
+ double mMinMsgPerSecA; /* min average message rate ALL capture periods*/
+ double mAveMsgPerSecA; /* average message rate ALL capture periods*/
+
+ double mByteCountP; /* rolling byte count for this capture PERIOD*/
+ double mBytePerSecP; /* average byte rate for this capture PERIOD*/
+ double mByteCountA; /* total byte count across ALL capture periods*/
+ double mMaxBytePerSecA; /* max ave message byte rate ALL capture periods*/
+ double mAveBytePerSecA; /* average message byte rate ALL capture periods*/
+
+ double mLastTime; /* this capture period*/
+ double mTotalTime; /* period since creation*/
+
+ /* Counters are used to collect stats to calculate standard deviation */
+ double mLatSquareP; /* latency squared PERIOD */
+ double mLatSquareA; /* latency squared ALL */
+ double mStdDeviationP; /* standard deviation values for capture PERIOD*/
+ double mStdDeviationA; /* standard deviation values for ALL periods */
+
+ FILE* mOutfile;
+ char mSymbol[MAX_SYMBOL_LENGTH];
+
+}perfData_t;
+
+/*Variables for testing latency stats*/
+long gCount = 0;
+double gMilliSecAdded = 0;
+double gStartTime = 0;
+double gLastTime = 0;
+double gTotalTime = 0;
+double gTotalTimeCalc = 0;
+double gTotalLatency = 0;
+double gLatencyPerThousand = 0;
+/*Variables for measuring cpu time*/
+double gCpuClockTicksPerSecond;
+
+struct cpuVals gLastCpuV={0.0,0.0,0.0,0.0};
+double gStartRealTime = 0;
+struct tms gStartProcTime;
+/*struct tms gCpuStartTime;
+struct tms gCpuEndTime;
+double gCpuStartTimeSeconds = 0;
+double gCpuEndTimeSeconds = 0;
+clock_t gProcStartTime= 0;
+struct timeval gProcessStart={0,0};*/
+
+static int gNumCpu=1;
+
+/*struct tms myAppStartClock;
+struct tms myAppEndClock;
+clock_t myGlobalStartClock;
+clock_t myGlobalEndClock;*/
+double gTotalPercent=0;
+double gRealTime = 0.0;
+
+/*internal function prototypes */
+
+/* Routines for calculating and printing performance data for each period */
+static void processWTableData (wtable_t table, void* data, const char* key, void* closure);
+static int printPerfData(perfData* pData,int headerFlag);
+static void calcPerfData(perfData* mPData,double interval,
+ performanceData* myPerformanceData);
+static void perfDataReportHeader(FILE* outfile);
+extern void getMemVals(int pid, memVals *memV);
+
+/*
+** mode "all" will calculate std deviation across all capture periods
+** mode "period" will calculate it across that period
+*/
+static double calcStdDeviation(perfData* mPData,const char* mode);
+/*static long getTotalSystemMem(void);*/
+/*reset period counters at end of time interval*/
+static void resetPeriodCounters(perfData* mPData);
+
+/* convert timeval structures to strings */
+/*void timeval2string(struct timeval *ts, char buffer[], int len);*/
+/*void ctime_r(long*, const char[],int);*/
+
+/* Routines for printing global/average stats on exiting application */
+static void wtableIteration(
+ wtable_t table, void* data,const char* key, void* closure);
+static void printPerfDataReport(perfData* pData,FILE* outfile);
+static void initPerfData(perfData* pData, FILE* outfile, const char* symbol);
+
+
+
+void startCpuTimer(void)
+{
+#ifndef WIN32
+ gCpuClockTicksPerSecond = (double)sysconf(_SC_CLK_TCK);
+ gStartRealTime = (double)times(&gStartProcTime)/(double)gCpuClockTicksPerSecond;
+ /*printf("initProcTable(getpid(),0)\n");*/
+ initProcTable(getpid(),0);
+ /*printf("getNumCpu()=%d\n",gNumCpu);*/
+#endif
+#ifdef WIN32
+ gCpuClockTicksPerSecond = CLOCKS_PER_SEC;
+ gStartRealTime = (double)clock()/(double)gCpuClockTicksPerSecond;
+#endif
+/* gProcStartTime = times(&gCpuStartTime);*/
+ gNumCpu=getNumCpu();
+}
+
+
+int createStatisticsCache(statsCache** sCache,int numMsgCategories,
+                          FILE* outfile,int header)
+{
+    /* Returns STATS_NO_MEMORY when an allocation fails, otherwise STATS_OK. */
+    struct timeval sTime;
+    /* Zero-initialised cache; handed back through *sCache once it is populated. */
+    statsCache* mysCache = (statsCache*)calloc(1,sizeof(statsCache));
+    if(!mysCache)
+    {
+        return STATS_NO_MEMORY; /* previously fell through and dereferenced NULL */
+    }
+    mysCache->mNumMsg = numMsgCategories;
+    mysCache->mHeader = header;
+    mysCache->mOutfile = outfile;
+    mysCache->mNumMessages = 0;
+    mysCache->mNumBytes = 0;
+
+    (void)gettimeofday(&sTime,NULL);
+    mysCache->mLastUpdateTime = ( (double) sTime.tv_sec ) +
+                                ( (double)sTime.tv_usec / 1000000 );
+
+    /* mysCache->mNumMsg flags whether calculating stats on per symbol
+     * basis or general stats on all symbols, if > 0 then a table is needed
+     * to store data on all symbols
+     */
+    if(mysCache->mNumMsg>0)
+    {
+        /*Per symbol/category*/
+        mysCache->mSymbolTable =
+            wtable_create("mSymbolTable",mysCache->mNumMsg);
+
+        if (!mysCache->mSymbolTable)
+        {
+            printf ("Error: wtable_create() failed.\n");
+            free (mysCache); /* was leaked; NOTE(review): STATS_OK is still returned and *sCache is left unset */
+        }
+        else
+        {
+            if(mysCache->mHeader == 1 && mysCache->mOutfile == stdout)
+            {
+                fprintf(mysCache->mOutfile,"%s",opHeaderPeriod);
+                mysCache->mHeader=0;
+            }
+
+            *sCache = mysCache;
+        }
+    }
+    else
+    {
+        /* Global statistics */
+        const char* dummySymbol = "Global";
+        mysCache->mPData =(perfData*)calloc(1,sizeof(perfData));
+        if(!mysCache->mPData) /* the original re-tested mysCache, so this failure went unnoticed */
+        {
+            free(mysCache);
+            return STATS_NO_MEMORY;
+        }
+        initPerfData(mysCache->mPData, outfile, dummySymbol);
+
+        if(mysCache->mPData)
+        {
+            if(mysCache->mHeader == 1 && mysCache->mOutfile == stdout)
+            {
+                fprintf(mysCache->mOutfile,"%s",opHeaderPeriod);
+                mysCache->mHeader=0;
+            }
+
+            *sCache = mysCache;
+        }
+    }
+    return STATS_OK;
+}
+
+
+void initPerfData(perfData* pData, FILE* outfile, const char* symbol)
+{
+ /*Initialise Perf Data Structure*/
+ pData->mMinLatencyP=999999999;
+ pData->mMaxLatencyP=0;
+ pData->mAveLatencyP=0.00;
+ pData->mMinLatencyA=999999999;
+ pData->mMaxLatencyA=0;
+ pData->mAveLatencyA=0.00;
+
+ pData->mLatAccumP=0;
+ pData->mLatMsgCountP=0;
+ pData->mLatAccumA=0;
+ pData->mLatMsgCountA=0;
+
+ pData->mMsgCountP=0;
+ pData->mMsgPerSecP=0.00;
+ pData->mMsgCountA=0;
+ pData->mMaxMsgPerSecA=0.00;
+ pData->mMinMsgPerSecA=999999999;
+ pData->mAveMsgPerSecA=0.00;
+
+ pData->mByteCountP=0;
+ pData->mBytePerSecP=0.00;
+ pData->mByteCountA=0;
+ pData->mMaxBytePerSecA=0.00; /* not used at present*/
+ pData->mAveBytePerSecA=0.00; /* not used at present*/
+
+ pData->mLastTime = 0;
+ pData->mTotalTime = 0;
+ pData->mOutfile=outfile;
+ snprintf(pData->mSymbol,MAX_SYMBOL_LENGTH,"%s",symbol);
+}
+
+void resetPeriodCounters(perfData* mPData)
+{
+ mPData->mMsgCountP = 0;
+ mPData->mByteCountP = 0;
+ mPData->mLatMsgCountP = 0;
+ mPData->mLatAccumP = 0;
+ mPData->mMinLatencyP = 999999999;
+ mPData->mMaxLatencyP = 0;
+ mPData->mAveLatencyP = 0;
+ mPData->mStdDeviationP = 0;
+ mPData->mLatSquareP = 0;
+}
+
+
+void statisticsCacheToFile(statsCache* sCache,
+ performanceData* myPerformanceData)
+{
+ double timeIntervalInSeconds;
+ struct timeval sTime;
+ double timeNow;
+ (void)gettimeofday(&sTime,NULL);
+ timeNow= ( (double) sTime.tv_sec ) +
+ ( (double)sTime.tv_usec/1000000 );
+ if(sCache)
+ {
+ timeIntervalInSeconds = timeNow - sCache->mLastUpdateTime;
+
+#ifdef WIN32
+ if(timeIntervalInSeconds<0)
+ timeIntervalInSeconds+=60;
+#endif
+ sCache->mLastUpdateTime = timeNow;
+ if(sCache->mNumMsg>0)
+ {
+ double paramArray[2];
+ paramArray[0]= timeIntervalInSeconds;
+ paramArray[1]= (double) sCache->mHeader;
+
+ /*iterate round wTable and update stats for each symbol*/
+ wtable_for_each (sCache->mSymbolTable,
+ processWTableData, paramArray);
+ }
+ else
+ {
+ /*calculate overall stats for subsCacheriptions*/
+ if(sCache->mPData)
+ {
+ calcPerfData(sCache->mPData,timeIntervalInSeconds,myPerformanceData);
+ if(!myPerformanceData)
+ {
+ /* Print out stats */
+ printPerfData (sCache->mPData,sCache->mHeader);
+ }
+ resetPeriodCounters(sCache->mPData);
+ }
+ }
+ sCache->mNumMessages=0;
+ sCache->mNumBytes=0;
+ }
+}
+
+double calcStdDeviation(perfData* mPData,const char* mode)
+{
+ /*
+ if the POPULATION formula is needed change the final denominator
+ from ( mPData->mLatMsgCountP -1 ) to ( mPData->mLatMsgCountP )
+ and ( mPData->mLatMsgCountA -1 ) to ( mPData->mLatMsgCountA )
+ */
+
+ double stdDeviation=0;
+ if(strcmp(mode,"period")==0)
+ {
+ if( mPData->mLatMsgCountP > 1 )
+ {
+ stdDeviation = sqrt ((mPData->mLatSquareP-
+ (mPData->mLatAccumP*mPData->mLatAccumP)/
+ mPData->mLatMsgCountP) /
+ (mPData->mLatMsgCountP - 1));
+
+ }
+ }
+ else if (strcmp(mode,"all")==0)
+ {
+ if( mPData->mLatMsgCountA > 1 )
+ {
+ stdDeviation =sqrt ((mPData->mLatSquareA-
+ (mPData->mLatAccumA*mPData->mLatAccumA)/
+ mPData->mLatMsgCountA) /
+ (mPData->mLatMsgCountA - 1));
+ }
+ }
+ return stdDeviation;
+}
+
+/*
+** this function is called each time
+** a new data item is retrieved from wtable
+*/
+void processWTableData (
+ wtable_t table, void* data, const char* key, void* closure)
+{
+ double* cpyParamArray = (double*) closure;
+ double interval = cpyParamArray[0];
+ double header = cpyParamArray[1];
+
+ perfData* mPData = NULL;
+ mPData = (perfData*) data;
+
+ if (mPData)
+ {
+ /*calculate overall stats for subscriptions*/
+ calcPerfData(mPData,interval,NULL);
+ /* Print out stats */
+ printPerfData (mPData,(int)header);
+ resetPeriodCounters(mPData);
+ }
+}
+
+
+int updateStatisticsCache(statsCache* sCache ,
+                          const char* msgCategoryName ,
+                          long numBytesRecieved,
+                          long timeSecs,
+                          long timeMicroSecs,
+                          double cLatency,
+                          struct timeval tv)
+{
+    int ret;
+    perfData *mPData = NULL;
+    double latency = 0;
+    /* Use a latency derived from the supplied timestamp when one is given, else cLatency. */
+    if(timeSecs != 0 || timeMicroSecs != 0)
+        calcLatency(timeSecs,timeMicroSecs,&latency,tv);
+    else
+        latency=cLatency;
+
+    if(sCache->mNumMsg>0 && sCache->mSymbolTable)
+    {
+        /* More than one category*/
+        mPData =
+            (perfData*) wtable_lookup (sCache->mSymbolTable,msgCategoryName);
+        /* If category doesn't already exist*/
+        if (!mPData)
+        {
+            /* Then create & init; initPerfData dereferences its argument, so stop on failure */
+            mPData = (perfData*)calloc(1,sizeof(perfData));
+            if (!mPData) return STATS_NO_MEMORY;
+            initPerfData(mPData,sCache->mOutfile,msgCategoryName);
+        }
+    }
+    else
+    {
+        /* No categories */
+        mPData = sCache->mPData;
+    }
+
+    /* Update statistics for period */
+    mPData->mMsgCountP++;
+    mPData->mByteCountP += (double)numBytesRecieved;
+
+    /* change to get rid of spurious results*/
+    /* if (latency > 0) */
+    if (/*latency >0 &&*/ latency<999999999)
+    {
+        mPData->mLatMsgCountP++;
+        mPData->mLatAccumP += latency;
+        if (latency < mPData->mMinLatencyP)
+        {
+            mPData->mMinLatencyP = latency;
+        }
+        if (latency > mPData->mMaxLatencyP)
+        {
+            mPData->mMaxLatencyP = latency;
+        }
+        mPData->mLatSquareP += (latency*latency);
+    }
+
+    /* Update table in structure if more than one category*/
+    if(sCache->mNumMsg>0)
+    {
+        ret=
+            wtable_insert(sCache->mSymbolTable,msgCategoryName, (void*) mPData);
+        if(ret>0)
+        {
+            /*error in insert*/
+        }
+    }
+
+    sCache->mNumMessages++;
+    sCache->mNumBytes+=numBytesRecieved;
+    return STATS_OK;
+}
+
+
+void destroyStatsCache(statsCache* sCache)
+{
+ if (sCache)
+ {
+ if (sCache->mPData)
+ {
+ free (sCache->mPData);
+ }
+ if (sCache->mSymbolTable)
+ {
+ wtable_destroy (sCache->mSymbolTable);
+ }
+ free (sCache);
+ }
+}
+
+latencyVals calcLatency(long timeSecs,
+ long timeMicroSecs,
+ double* latency,
+ struct timeval tv)
+{
+ static long cachedMaxLatencySecs = (LONG_MAX/1000000);
+ static latencyVals noLatVals = {0.0,0.0,0.0};
+ double latencySecs = 0;
+ double latencyUSecs = 0;
+ struct latencyVals latVals = {0.0,0.0,0.0};
+
+ /* Calculate now time */
+ latencySecs = ((double)tv.tv_sec - (double)timeSecs);
+
+ if(cachedMaxLatencySecs > latencySecs)
+ {
+ latencyUSecs =((double)tv.tv_usec - (double)timeMicroSecs);
+ *latency = ((latencySecs *1000000) + latencyUSecs)/1000.0;
+
+ latVals.pubTimeMilliSecs = latencySecs*1000.0;
+ latVals.recieveTimeMilliSecs = latencyUSecs/1000.0;
+ latVals.latencyMilliSecs = *latency;
+ return latVals;
+ }
+ return noLatVals;
+}
+
+latencyVals calcLatency1TimeStamp(const char* timeStamp,
+                                  const char* timeFormat,
+                                  double* latency,
+                                  struct timeval tv)
+{
+    static latencyVals noLatVals = {0.0,0.0,0.0};
+    time_t rawTime;
+    struct tm* currentTime;
+    struct tm myTm;
+    struct latencyVals latVals={0.0,0.0,0.0};
+    long millis = 0;
+    const char* pos1=NULL;
+    int lenTimeFormat = 0;
+    int lenTimeStamp = 0;
+    double latencySecs=0;
+    double latencyUSec=0;
+    int nextNumCharBack=3;
+#ifdef WIN32
+    int secs=0;
+    int mins=0;
+#endif
+    /* Latency in ms between local time now and the time of day parsed from timeStamp. */
+    /* Check the pointers before strlen, and require room for the trailing 3-digit milliseconds. */
+    if ((timeStamp == NULL) || (timeFormat== NULL)
+        || ((lenTimeFormat = (int)strlen(timeFormat)) == 0)
+        || ((lenTimeStamp = (int)strlen(timeStamp)) < nextNumCharBack))
+    {
+        fprintf(stderr,"Error - calcLatency\n");
+        return noLatVals;
+    }
+    /*printf("TimeStampStr[%s]",timeStamp);*/
+    /* Calculate now time */
+    time ( &rawTime );
+    currentTime = localtime(&rawTime);
+
+/*#ifndef WIN32*/
+    memset(&myTm, 0, sizeof(myTm)); /* strptime only fills the fields named in timeFormat */
+    pos1 = timeStamp+(lenTimeStamp-nextNumCharBack);
+    millis = strtol(pos1,NULL,10);
+    strptime(timeStamp,timeFormat,&myTm);
+    latencySecs = (((currentTime->tm_hour*60.0+currentTime->tm_min)*60.0)+currentTime->tm_sec) -
+                  (((myTm.tm_hour*60.0+myTm.tm_min)*60.0)+myTm.tm_sec);
+    /*printf("%d:%d:%d-%d:%d:%d=%f\n",currentTime->tm_hour,currentTime->tm_min,currentTime->tm_sec,myTm.tm_hour,myTm.tm_min,myTm.tm_sec,latencySecs);*/
+
+/*#else
+
+
+    if(lenTimeStamp>10)
+    {
+        millis=0;
+        pos1 = timeStamp+(lenTimeStamp-nextNumCharBack);
+        if(pos1[0]=='.' || pos1[0] ==':')
+        {
+            nextNumCharBack-=1;
+            pos1 = timeStamp+(lenTimeStamp-nextNumCharBack);
+            nextNumCharBack+=3;
+        }
+        else
+            nextNumCharBack+=3;
+
+        millis = strtol(pos1,NULL,10);
+    }
+    else
+    {
+        nextNumCharBack=2;
+    }
+    pos1 = timeStamp+(lenTimeStamp-nextNumCharBack);
+    nextNumCharBack+=3;
+    secs = (int)strtol(pos1,NULL,10);
+    pos1 = timeStamp+(lenTimeStamp-nextNumCharBack);
+    mins = (int)strtol(pos1,NULL,10);
+    latencySecs = ((currentTime->tm_min*60) + currentTime->tm_sec) -
+                  ((mins*60) + secs);
+#endif*/
+
+    latencyUSec = ((double)tv.tv_usec-(millis*1000.0));
+    /*printf("-------%f - %f=%f\n",(double)tv.tv_usec,millis*1000.0,latencyUSec);*/
+
+    /*if(latencySecs<0)
+    {
+        latencySecs+= SECONDSINHOUR;
+    }*/
+
+    *latency = ((latencySecs *1000000) + latencyUSec)/1000.0;
+    latVals.pubTimeMilliSecs = latencySecs*1000.0;
+    latVals.recieveTimeMilliSecs = latencyUSec/1000.0;
+    latVals.latencyMilliSecs = *latency;
+
+    return latVals;
+}
+
+/*
+ * Count one message and, on every numMsg-th call, sample CPU and memory
+ * usage and write one comma-separated line to graphData.
+ *
+ * When graphData is stdout the column header is printed on the first call
+ * and again every 30 reports, and the line includes the average CPU
+ * percentage. For any other non-NULL stream the average is omitted.
+ *
+ * numMsg is the reporting interval in calls. A non-positive value would be
+ * used as a divisor, so in that case the call is still counted but nothing
+ * is sampled or printed.
+ */
+void printStatsLevel1(int numMsg,FILE* graphData)
+{
+    struct cpuVals cpuV={0.0,0.0,0.0,0.0};
+    struct memVals memV={0,0,0.0};
+    gCount+=1;
+
+    if(gCount==1 && graphData == stdout)
+    {
+        printf("\n\n%s",gStatsLevel1Header);
+    }
+
+    /* guards the modulo/division by numMsg below */
+    if(numMsg <= 0)
+    {
+        return;
+    }
+
+    if( (gCount % numMsg) !=0)
+    {
+        return;
+    }
+
+    /* gCount is a multiple of numMsg here, so testing the report index
+     * is equivalent to gCount%(numMsg*30) without the int multiplication
+     * that could overflow for large intervals */
+    if((gCount/numMsg)%30==0 && graphData == stdout)
+    {
+        printf("\n\n%s",gStatsLevel1Header);
+    }
+    getCpuTimeVals(&cpuV,1);
+    gRealTime+=cpuV.realTime;
+    gTotalPercent+=cpuV.pCpu;
+    getMemVals(getpid(),&memV);
+    if(graphData == stdout)
+    {
+        fprintf(graphData,
+                ",%-12.0ld,%-10.4f,%-9.3f,%-9.4f,%-9.4f,%-7.2f%%,%-9.2f%%,%-8ld,%-8.2f%%,\n",
+                gCount,gRealTime,cpuV.realTime,cpuV.userTime,cpuV.sysTime,
+                cpuV.pCpu,(gTotalPercent/(gCount/numMsg)),
+                memV.rss,memV.memPercent);
+    }
+    else if(graphData != NULL)
+    {
+        /* this file version does not include ave cpu percentage */
+        fprintf(graphData,
+                "%-12.0ld,%-10.4f,%-9.3f,%-9.4f,%-9.4f,%-7.2f,%-8ld,%-8.2f\n",
+                gCount,gRealTime,cpuV.realTime,cpuV.userTime,
+                cpuV.sysTime,cpuV.pCpu,memV.rss,memV.memPercent);
+    }
+}
+
+/*clock_t utime=0;
+double total=0;
+struct cpuVals lastCpuV;*/
+/*
+ * Fill *cpuV with the CPU usage of this process since the stored baseline.
+ *
+ * The elapsed real time is measured from gStartRealTime; user and system
+ * time are obtained through getProcAndChildCpu() for the current pid. pCpu
+ * is the user+system time consumed since gLastCpuV, as a percentage of the
+ * elapsed real time, divided by gNumCpu. It is set to -1 when the elapsed
+ * real time is not above 1e-6.
+ *
+ * When isUpdate is non-zero the current readings become the new baseline
+ * (gStartRealTime, gStartProcTime, gLastCpuV).
+ *
+ * NOTE(review): in the WIN32 branch CurrentProcTime is never written before
+ * it is copied into gStartProcTime below - confirm that is harmless.
+ */
+void getCpuTimeVals(cpuVals* cpuV, int isUpdate)
+{
+ /**
+ * Same value as running top with Interactive command "I"(<shift> i)
+ *
+ * Extract from top man page for INTERACTIVE COMMAND "I":
+ * Toggle between Solaris (CPU percentage divided by total number of
+ * CPUs) and Irix (CPU percentage calculated solely by amount of
+ * time) views. This is a toggle switch that affects only SMP sys-
+ * tems.
+ */
+ double diffRealTime=0.0;
+ double diff = 1234.0;
+ struct tms CurrentProcTime;
+ double CurrentRealTime = 0.0;
+
+ /* Current real time, converted from clock ticks */
+#ifndef WIN32
+ CurrentRealTime = (double)times(&CurrentProcTime)/gCpuClockTicksPerSecond;
+#endif
+#ifdef WIN32
+ CurrentRealTime = clock()/(double)gCpuClockTicksPerSecond;
+#endif
+ diffRealTime=CurrentRealTime-gStartRealTime;
+ getProcAndChildCpu((int)getpid(),cpuV);
+ cpuV->realTime =diffRealTime;
+
+/*
+ although solaris cpu% is calculated(moving decaying?) by kernel,
+ it is not accurate in our scheme.
+*/
+
+ /* CPU time (user + system) consumed since the last stored sample */
+ diff = (cpuV->userTime+cpuV->sysTime)-
+ (gLastCpuV.userTime+gLastCpuV.sysTime);
+ if(cpuV->realTime > 1e-6)
+ {
+ cpuV->pCpu = diff/diffRealTime*100.0/gNumCpu;
+ /*printf("New : (utime + stime ) (%9f + %9f) = %9f |rtime = %9f\n",cpuV->userTime,
+ cpuV->sysTime,
+ cpuV->userTime+cpuV->sysTime,
+ CurrentRealTime);
+ printf("Last: (utime + stime ) (%9f + %9f) = %9f |rtime = %9f\n",gLastCpuV.userTime,
+ gLastCpuV.sysTime,
+ gLastCpuV.userTime+gLastCpuV.sysTime,
+ gStartRealTime);
+ printf("Diff: (utime + stime ) (%9f + %9f) = %9f |rtime = %9f\n",cpuV->userTime - gLastCpuV.userTime,
+ cpuV->sysTime - gLastCpuV.sysTime,
+ diff,
+ diffRealTime);
+ printf("getCpuTimeval : CPU %9f = %9f / (%9f * %d) \n", cpuV->pCpu, diff, diffRealTime, gNumCpu);*/
+ }
+ else
+ {
+ /* interval too short to divide by: report "no value" */
+ cpuV->pCpu = -1;
+ }
+
+ if(isUpdate)
+ {
+ /* make this sample the baseline for the next call */
+ gStartRealTime=CurrentRealTime;
+ gStartProcTime=CurrentProcTime;
+ gLastCpuV=*(cpuV);
+ }
+}
+
+
+/*
+ * Store in *cpuV the difference cpuV2 - cpuV1 of the user, system and real
+ * times, together with the CPU percentage over that interval divided by
+ * gNumCpu. pCpu is set to -1 when the real-time difference is not above
+ * 1e-6.
+ */
+void getCpuTimeValDiff(cpuVals cpuV1, cpuVals cpuV2, cpuVals* cpuV)
+{
+    cpuV->userTime = cpuV2.userTime - cpuV1.userTime;
+    cpuV->sysTime  = cpuV2.sysTime - cpuV1.sysTime;
+    cpuV->realTime = cpuV2.realTime - cpuV1.realTime;
+
+    if (!(cpuV->realTime > 1e-6))
+    {
+        /* interval too short to produce a meaningful percentage */
+        cpuV->pCpu = -1;
+        return;
+    }
+
+    cpuV->pCpu =(((cpuV->userTime + cpuV->sysTime)*100.0)/(cpuV->realTime*gNumCpu));
+}
+
+/*
+ * Close one reporting period for mPData.
+ *
+ * Fields ending in P hold values for the period just ended, fields ending
+ * in A hold values accumulated over all periods. The function:
+ *  - derives the period message and byte rates by dividing the period
+ *    counts by interval, and the period average latency;
+ *  - adds the period counters and interval to the accumulated totals;
+ *  - updates the accumulated min/max rates and latencies;
+ *  - recomputes the accumulated averages from the totals and mTotalTime;
+ *  - stores the period standard deviation from calcStdDeviation();
+ *  - if myPerformanceData is not NULL, copies the period results into it,
+ *    with the byte count and byte rate divided by 1000.
+ *
+ * NOTE(review): interval is used as a divisor without a zero check.
+ * NOTE(review): unlike mAveLatencyA, mAveLatencyP has no else branch, so it
+ * keeps its previous value when the period has no latency samples - confirm
+ * this is intended.
+ */
+void calcPerfData(perfData* mPData, double interval,
+ performanceData* myPerformanceData)
+{
+ /*Calculate averages for the period*/
+ if (mPData->mMsgCountP > 0)
+ {
+ mPData->mMsgPerSecP = ( (double) mPData->mMsgCountP/
+ (double) interval);
+ }
+ else
+ {
+ mPData->mMsgPerSecP = 0.00;
+ }
+ if (mPData->mByteCountP > 0)
+ {
+ mPData->mBytePerSecP = ( (double) mPData->mByteCountP /
+ (double) interval);
+ }
+ else
+ {
+ mPData->mBytePerSecP = 0.00;
+ }
+ if ( (mPData->mLatAccumP > 0) && (mPData->mLatMsgCountP > 0) )
+ {
+ mPData->mAveLatencyP = ( (double) mPData->mLatAccumP /
+ (double)mPData->mLatMsgCountP );
+ }
+ /*Update global counters*/
+ mPData->mMsgCountA += mPData->mMsgCountP;
+ mPData->mByteCountA += mPData->mByteCountP;
+ mPData->mLatAccumA += mPData->mLatAccumP;
+ mPData->mLatSquareA += mPData->mLatSquareP;
+ mPData->mLatMsgCountA += mPData->mLatMsgCountP;
+ mPData->mTotalTime += interval;
+
+/* sCache->numMessages += mPData->mMsgCountP;
+ sCache->mNumBytes += mPData->mByteCountP;
+*/
+
+ /*Update global Max & Mins*/
+ if (mPData->mMaxMsgPerSecA < mPData->mMsgPerSecP)
+ {
+ mPData->mMaxMsgPerSecA = mPData->mMsgPerSecP;
+ }
+ if (mPData->mMinMsgPerSecA > mPData->mMsgPerSecP)
+ {
+ mPData->mMinMsgPerSecA = mPData->mMsgPerSecP;
+ }
+ if (mPData->mMinLatencyP < mPData->mMinLatencyA)
+ {
+ mPData->mMinLatencyA = mPData->mMinLatencyP;
+ }
+ if (mPData->mMaxLatencyP > mPData->mMaxLatencyA)
+ {
+ mPData->mMaxLatencyA = mPData->mMaxLatencyP;
+ }
+ /* Update global averages*/
+ if (mPData->mMsgCountA > 0)
+ {
+ mPData->mAveMsgPerSecA = ( (double) mPData->mMsgCountA /
+ (double) mPData->mTotalTime);
+ }
+ else
+ {
+ mPData->mAveMsgPerSecA = 0.00;
+ }
+ if (mPData->mByteCountA > 0)
+ {
+ mPData->mAveBytePerSecA = ( (double) mPData->mByteCountA /
+ (double) mPData->mTotalTime);
+ }
+ else
+ {
+ mPData->mAveBytePerSecA = 0.00;
+ }
+ if ( (mPData->mLatAccumA > 0) && (mPData->mLatMsgCountA > 0) )
+ {
+ mPData->mAveLatencyA = ( (double) mPData->mLatAccumA /
+ (double) mPData->mLatMsgCountA);
+ }
+ else
+ {
+ mPData->mAveLatencyA = 0.00;
+ }
+
+ /* call to get standard deviation for this period*/
+ mPData->mStdDeviationP= calcStdDeviation(mPData,"period");
+ /* Optionally hand the period results back to the caller */
+ if(myPerformanceData)
+ {
+ myPerformanceData->mSymbol = mPData->mSymbol;
+ myPerformanceData->mTotalTime = mPData->mTotalTime;
+ myPerformanceData->mMsgCountP = mPData->mMsgCountP;
+ myPerformanceData->mMsgPerSecP = mPData->mMsgPerSecP;
+ myPerformanceData->mByteCountP = mPData->mByteCountP/1000;
+ myPerformanceData->mBytePerSecP = mPData->mBytePerSecP/1000;
+ myPerformanceData->mMinLatencyP = mPData->mMinLatencyP;
+ myPerformanceData->mMaxLatencyP = mPData->mMaxLatencyP;
+ myPerformanceData->mAveLatencyP = mPData->mAveLatencyP;
+ myPerformanceData->mStdDeviationP = mPData->mStdDeviationP;
+ }
+}
+
+
+
+
+
+/*
+ * Write the statistics of the period just ended for pData to
+ * pData->mOutfile as one comma-separated line.
+ *
+ * Nothing is written when the period message count is zero. When headerFlag
+ * is 1 the period header (opHeaderPeriod) is written before the data line.
+ *
+ * Returns STATS_OK when pData->mOutfile is set (whether or not a line was
+ * written) and STATS_NULL_PARAM when it is NULL.
+ *
+ * NOTE(review): pData itself is dereferenced without a NULL check.
+ */
+int printPerfData(perfData* pData,int headerFlag)
+{
+ if(pData->mOutfile != NULL)
+ {
+ /*only print out symbol details if they have changed*/
+ if( pData->mMsgCountP>0 )
+ {
+
+ if(headerFlag==1)
+ {
+ /* print stats header every time for all symbols */
+ /* NOTE(review): opHeaderPeriod is used as the format string itself;
+ presumably it is a fixed literal - confirm it holds no stray '%' */
+ fprintf(pData->mOutfile,opHeaderPeriod);
+ }
+ fprintf(pData->mOutfile," %-13s,",pData->mSymbol);
+ fprintf(pData->mOutfile,"%-13.2f,",pData->mTotalTime);
+ fprintf(pData->mOutfile,"%-9.0ld,",pData->mMsgCountP);
+ fprintf(pData->mOutfile,"%-9.2f,",pData->mMsgPerSecP);
+ /* NOTE(review): %f requires a double argument - confirm that
+ mByteCountP is not an integer type in perfData */
+ fprintf(pData->mOutfile,"%-9.2f,",(pData->mByteCountP/1000));
+ fprintf(pData->mOutfile,"%-8.2f,",(pData->mBytePerSecP/1000));
+ fprintf(pData->mOutfile,"%-10.3f,",pData->mMinLatencyP);
+ fprintf(pData->mOutfile,"%-10.3f,",pData->mMaxLatencyP);
+ fprintf(pData->mOutfile,"%-10.3f,",pData->mAveLatencyP);
+ fprintf(pData->mOutfile,"%-13.4f,\n",pData->mStdDeviationP);
+ }
+ return STATS_OK;
+ }
+ else
+ {
+ return STATS_NULL_PARAM;
+ }
+}
+
+
+/*
+ * Print the end-of-test report for sCache to outfile.
+ *
+ * The cache is first updated with the final interval through
+ * statisticsCacheToFile(). When the cache has per-symbol entries
+ * (mNumMsg > 0) one report line is written for every entry of the symbol
+ * table; otherwise a single line is written for sCache->mPData. The report
+ * header is written only when outfile is stdout.
+ *
+ * Returns STATS_OK on success, STATS_NULL_PARAM when sCache or outfile is
+ * NULL.
+ */
+int printPerfReport (statsCache* sCache,FILE* outfile)
+{
+    /* update stats using final interval; a NULL cache is not passed on,
+     * so the parameter check below can report it instead */
+    if (sCache != NULL)
+    {
+        statisticsCacheToFile(sCache,NULL);
+    }
+
+    if (sCache == NULL || outfile == NULL)
+    {
+        return STATS_NULL_PARAM;
+    }
+
+    if (outfile == stdout)
+    {
+        perfDataReportHeader(outfile);
+    }
+
+    if (sCache->mNumMsg > 0)
+    {
+        /* call to iterate around wTable*/
+        wtable_for_each (sCache->mSymbolTable,wtableIteration,outfile );
+    }
+    else
+    {
+        /* just print current perfData*/
+        sCache->mPData->mStdDeviationA =
+            calcStdDeviation(sCache->mPData,"all");
+        printPerfDataReport(sCache->mPData,outfile);
+    }
+    return STATS_OK;
+}
+
+/*
+ * wtable_for_each() callback used by printPerfReport(). It is invoked once
+ * per table entry: data is the entry's perfData and closure is the FILE*
+ * the report goes to. The accumulated standard deviation is refreshed and
+ * the entry's report line printed. Nothing happens when closure is NULL.
+ */
+void wtableIteration (
+    wtable_t table, void* data, const char* key, void* closure)
+{
+    FILE*     report = (FILE*) closure;
+    perfData* entry  = (perfData*) data;
+
+    if (NULL == report)
+    {
+        return;
+    }
+
+    entry->mStdDeviationA = calcStdDeviation(entry,"all");
+    printPerfDataReport(entry,report);
+}
+/*
+ * Write the title and column header of the end-of-test report to outfile.
+ * A NULL outfile is ignored.
+ */
+void perfDataReportHeader(FILE* outfile)
+{
+    if (NULL == outfile)
+    {
+        return;
+    }
+
+    fprintf(outfile,
+            "\n\n\n\t\t\t\t\t *** Statistics Summary For Execution ***\n");
+    fprintf(outfile, " Symbol Name ,");
+    fprintf(outfile,opHeaderAcrossAll);
+}
+/*
+ * Write one comma-separated end-of-test report line for pData to outfile,
+ * using the values accumulated over the whole run (the fields ending in A).
+ * A NULL outfile is ignored.
+ */
+void printPerfDataReport(perfData* pData,FILE* outfile)
+{
+    if (NULL == outfile)
+    {
+        return;
+    }
+
+    /* symbol and message counters */
+    fprintf(outfile," %-13s,",pData->mSymbol);
+    fprintf(outfile,"%-9.0ld,",pData->mMsgCountA);
+    fprintf(outfile,"%-11.2f,",pData->mMinMsgPerSecA);
+    fprintf(outfile,"%-11.2f,",pData->mMaxMsgPerSecA);
+    fprintf(outfile,"%-11.2f,",pData->mAveMsgPerSecA);
+
+    /* byte figures, divided by 1000 */
+    fprintf(outfile,"%-9.2f,",(pData->mByteCountA/1000));
+    fprintf(outfile,"%-8.2f,",(pData->mAveBytePerSecA/1000));
+
+    /* latency figures */
+    fprintf(outfile,"%-10.3f,",pData->mMinLatencyA);
+    fprintf(outfile,"%-10.3f,",pData->mMaxLatencyA);
+    fprintf(outfile,"%-10.3f,",pData->mAveLatencyA);
+    fprintf(outfile,"%-8.2f,\n",pData->mStdDeviationA);
+}
diff --git a/common/c_cpp/src/c/windows/wombat/wMath.h b/common/c_cpp/src/c/windows/wombat/wMath.h
new file mode 100644
index 0000000..406b5b5
--- /dev/null
+++ b/common/c_cpp/src/c/windows/wombat/wMath.h
@@ -0,0 +1,31 @@
+/* $Id: wMath.h,v 1.1.2.2 2012/03/20 11:15:17 emmapollock Exp $
+ *
+ * OpenMAMA: The open middleware agnostic messaging API
+ * Copyright (C) 2011 NYSE Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+#ifndef WMATH_H__
+#define WMATH_H__
+
+#include <math.h>
+
+/*
+ * NaN test for Windows builds: returns non-zero when n is a NaN.
+ *
+ * Defined static __inline because this is a header: an external function
+ * definition here would be emitted by every translation unit that includes
+ * it and clash at link time.
+ *
+ * NOTE(review): Microsoft documents _isnan under <float.h> for C; confirm
+ * that <math.h> alone declares it on every supported toolchain.
+ */
+static __inline int wIsnan (double n)
+{
+    return _isnan(n);
+}
+
+#endif /* WMATH_H__ */
diff --git a/common/c_cpp/src/c/windows/wombat/wUuid.h b/common/c_cpp/src/c/windows/wombat/wUuid.h
new file mode 100644
index 0000000..43f4b2c
--- /dev/null
+++ b/common/c_cpp/src/c/windows/wombat/wUuid.h
@@ -0,0 +1,34 @@
+/* $Id: wUuid.h,v 1.1.2.2 2012/03/20 11:15:17 emmapollock Exp $
+ *
+ * OpenMAMA: The open middleware agnostic messaging API
+ * Copyright (C) 2011 NYSE Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+#ifndef WUUID_H__
+#define WUUID_H__
+
+#include "wombat/port.h"
+
+/* Windows stand-in for a UUID value: a plain int. */
+typedef int wUuid;
+
+/*
+ * NOTE(review): presumably mirrors libuuid's uuid_generate_time(); myUuid
+ * is passed by value, so the caller's variable cannot be updated through
+ * this parameter - confirm against the implementation.
+ */
+COMMONExpDLL
+void wUuid_generate_time (wUuid myUuid);
+
+/*
+ * NOTE(review): presumably writes a textual form of myUuid into out; the
+ * required size of the out buffer is not visible here - confirm.
+ */
+COMMONExpDLL
+void wUuid_unparse (wUuid myUuid, char* out);
+
+#endif /* WUUID_H__ */
diff --git a/common/c_cpp/src/c/wombat/wMessageStats.h b/common/c_cpp/src/c/wombat/wMessageStats.h
new file mode 100644
index 0000000..4a8916d
--- /dev/null
+++ b/common/c_cpp/src/c/wombat/wMessageStats.h
@@ -0,0 +1,190 @@
+
+#ifndef _PERF_DATA_H__
+#define _PERF_DATA_H__
+
+#if defined(__cplusplus)
+ extern "C" {
+#endif /* __cplusplus */
+
+#include "machine.h"
+#include <stdio.h>
+#ifndef WIN32
+#include <sys/time.h>
+#endif
+
+#ifdef WIN32
+
+#include <winsock2.h>
+
+#endif
+
+/*******************************************************************************
+*******************************************************************************/
+
+
+struct perfData_t;
+typedef struct perfData_t perfData;
+
+struct statsCache_t;
+typedef struct statsCache_t statsCache;
+
+struct processInfo_t;
+typedef struct processInfo_t processInfo;
+
+/*
+ * Result of a latency calculation; every value is in milliseconds.
+ * The field descriptions follow what calcLatency() and
+ * calcLatency1TimeStamp() store.
+ */
+typedef struct latencyVals
+{
+ double pubTimeMilliSecs; /*whole-second part of (receive - publish), in milliseconds*/
+ double recieveTimeMilliSecs; /*sub-second part of (receive - publish), in milliseconds*/
+ double latencyMilliSecs; /*total latency: sum of the two previous vals*/
+}latencyVals;
+
+/*
+ * Per-period results handed back to the caller by statisticsCacheToFile()
+ * when a non-NULL performanceData pointer is supplied. The values are
+ * copied from the internal perfData at the end of a period.
+ */
+typedef struct performanceData
+{
+ const char* mSymbol; /* symbol / category name (pointer copied, not duplicated) */
+ double mTotalTime; /* sum of all intervals so far */
+ long mMsgCountP; /* messages counted in the period */
+ double mMsgPerSecP; /* period message count divided by the interval */
+ long mByteCountP; /* period byte count divided by 1000 */
+ double mBytePerSecP; /* period byte rate divided by 1000 */
+ double mMinLatencyP; /* minimum latency in the period */
+ double mMaxLatencyP; /* maximum latency in the period */
+ double mAveLatencyP; /* average latency in the period */
+ double mStdDeviationP; /* standard deviation for the period */
+}performanceData;
+
+/* Status codes returned by the statistics functions. */
+enum
+{
+ STATS_OK = 0, /* success */
+ STATS_FAILURE_GENERAL = -1, /* unspecified failure */
+ STATS_NULL_PARAM = 1, /* a required pointer or FILE* was NULL */
+ STATS_NO_MEMORY = 2 /* allocation failed */
+};
+
+/* used to create instance of struct to hold stats data
+** @param statsCache -- caches relevant statistics data
+**
+** @param numMsgCategories -- 0 if stats for all symbols needed or number of
+** symbols if stats are needed on per symbol basis
+**
+** @param outfile -- where stats are to be displayed
+**
+** @param header -- 1 to display header 0 for never
+**
+** returns STATS_NO_MEMORY or STATS_OK
+*/
+COMMONExpDLL
+int createStatisticsCache(statsCache** sCache,
+ int numMsgCategories,
+ FILE* outfile,
+ int header);
+
+
+
+/* used to gather stats at user specified intervals */
+/*@param myPerformanceData -- if user wants results in performanceData
+** structure pass in pointer to it, pass in null if
+** not, wMessageStats.c will then print results to
+** FILE* outfile that was passed in on creation of
+** statsCache.
+*/
+COMMONExpDLL
+void statisticsCacheToFile(statsCache* sCache,
+ performanceData* myPerformanceData);
+
+
+
+/* stores stats for individual symbols and counts
+** messages and bytes between intervals
+** @param msgCategoryName -- name by which message or symbol
+** is to be identified
+** @param numBytesRecieved -- size of message in Bytes
+** @param timeSecs -- time in secs from epoch until now
+** @param timeMicroSecs -- time in micro seconds from last second
+** @param cLatency -- if latency is already known pass it in,
+** set timeSecs + timeMicroSecs = 0
+** @param tv -- time on which latency will be calculated
+** i.e time message was recieved.
+*/
+COMMONExpDLL
+int updateStatisticsCache(statsCache* sCache,
+ const char* msgCategoryName,
+ long numBytesRecieved,
+ long timeSecs,
+ long timeMicroSecs,
+ double cLatency,
+ struct timeval tv);
+
+COMMONExpDLL
+void destroyStatsCache(statsCache*);
+
+/* prints end of test report to file */
+COMMONExpDLL
+int printPerfReport(statsCache* sCache, FILE*);
+
+
+
+/* function calculates latency between the publish time given by
+** timeSecs,timeMicroSecs and the receive time tv. returns latencyVals structure.
+** @param timeSecs -- publish time in seconds since the epoch
+** @param timeMicroSecs -- time past the last second in micro seconds
+** @param latency -- pointer to double where the latency value
+** (in milliseconds) is stored
+** @param tv -- time on which latency will be calculated
+** i.e. time the message was received.
+*/
+COMMONExpDLL
+latencyVals calcLatency(long timeSecs,
+ long timeMicroSecs,
+ double* latency,
+ struct timeval tv);
+
+
+/* function calculates latency on timestamp and puts result in double* latency
+** @param timeStamp -- timestamp string
+** @param timeFormat -- format of timeStamp string, e.g. "%d:%m:%Y:%H:%M:%S"
+** @param latency -- pointer to double where latency value
+** is to be stored
+** @param tv -- time on which latency will be calculated
+** i.e. time the message was received.
+*/
+COMMONExpDLL
+latencyVals calcLatency1TimeStamp(const char* timeStamp,
+ const char* timeFormat,
+ double* latency,
+ struct timeval tv);
+
+/* Function to print statsLevel 1 values
+** @param numMsg -- when using statsLevel 1, numMsg defines how often
+** (every numMsg calls) results are calculated.
+** @param graphData -- stream the data is written to, i.e.
+** stdout or an open file
+*/
+
+
+COMMONExpDLL
+void printStatsLevel1(int numMsg, FILE* graphData);
+/* function prints out to graphData:
+** if out to screen
+** |Global Count|Total Time|CPU utime|CPU stime|CPU%|Ave CPU%|Memory|Memory%|
+**
+** if out to file Ave CPU% is left out
+*/
+
+/* function starts timer for cpu ticks which is used by getCpuTimeVals
+** startCpuTimer() should be called before using getCpuTimeVals()
+**
+*/
+COMMONExpDLL
+void startCpuTimer(void);
+COMMONExpDLL
+void getCpuTimeVals(cpuVals *cpuV, int isUpdate);
+COMMONExpDLL
+void getCpuTimeValDiff(cpuVals cpuV1,cpuVals cpuV2,cpuVals* cpuV);
+/* function returns memory statistics */
+/*void getMemVals( int pid,memVals *memV);*/
+
+#if defined(__cplusplus)
+}
+#endif /* __cplusplus */
+#endif /*_PERF_DATA_H__*/
--
1.7.7.6


[PATCH 05/50] Make includes paths consistent

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

wConfig.h and port.h should be included as wombat/xxx.h. This simplifies the
builds and makes them consistent across Windows and Linux.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
common/c_cpp/src/c/windows/machine_win.c | 2 +-
common/c_cpp/src/c/windows/platform.c | 2 +-
common/c_cpp/src/c/windows/port.h | 7 ++++---
common/c_cpp/src/c/windows/wSemaphore.c | 2 +-
common/c_cpp/src/c/wombat/environment.h | 2 +-
5 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/common/c_cpp/src/c/windows/machine_win.c b/common/c_cpp/src/c/windows/machine_win.c
index f6f3ad5..3fce1cd 100644
--- a/common/c_cpp/src/c/windows/machine_win.c
+++ b/common/c_cpp/src/c/windows/machine_win.c
@@ -18,7 +18,7 @@
* 02110-1301 USA
*/

-#include "port.h"
+#include "wombat/port.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
diff --git a/common/c_cpp/src/c/windows/platform.c b/common/c_cpp/src/c/windows/platform.c
index f17e1f9..8b89525 100644
--- a/common/c_cpp/src/c/windows/platform.c
+++ b/common/c_cpp/src/c/windows/platform.c
@@ -21,7 +21,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
-#include "port.h"
+#include "wombat/port.h"
#include "platform.h"

#define PATH_SEPERATOR "\\"
diff --git a/common/c_cpp/src/c/windows/port.h b/common/c_cpp/src/c/windows/port.h
index df06b4f..ded944a 100644
--- a/common/c_cpp/src/c/windows/port.h
+++ b/common/c_cpp/src/c/windows/port.h
@@ -37,9 +37,10 @@
#include <tlhelp32.h>
#include <time.h>

-#include "wConfig.h"
-#include "lock.h"
-#include "mmap.h"
+#include "wombat/targetsxs.h"
+#include "wombat/wConfig.h"
+#include "windows/lock.h"
+#include "windows/mmap.h"

#if defined(__cplusplus)
extern "C" {
diff --git a/common/c_cpp/src/c/windows/wSemaphore.c b/common/c_cpp/src/c/windows/wSemaphore.c
index 6947ac4..f3dd8a6 100644
--- a/common/c_cpp/src/c/windows/wSemaphore.c
+++ b/common/c_cpp/src/c/windows/wSemaphore.c
@@ -18,7 +18,7 @@
* 02110-1301 USA
*/

-#include "port.h"
+#include "wombat/port.h"

#include "wombat/wSemaphore.h"

diff --git a/common/c_cpp/src/c/wombat/environment.h b/common/c_cpp/src/c/wombat/environment.h
index 8acafe5..98c8974 100644
--- a/common/c_cpp/src/c/wombat/environment.h
+++ b/common/c_cpp/src/c/wombat/environment.h
@@ -22,7 +22,7 @@
#ifndef _WOMBAT_ENVIRONMENT_H
#define _WOMBAT_ENVIRONMENT_H

-#include "wConfig.h"
+#include "wombat/wConfig.h"

/**
* This function will delete an environment varible.
--
1.7.7.6


[PATCH 04/50] Added WCACHEExpDLL macro for building NYSE wirecache

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

This is all part of DLL hell for Windows: when we build common we need to
indicate that the methods from common are exported from the DLL, and then we
need to indicate that they are imported when we build other products. This is
further complicated when products mix static and dynamic linking for different
components.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
common/c_cpp/src/c/linux/wConfig.h | 1 +
1 files changed, 1 insertions(+), 0 deletions(-)

diff --git a/common/c_cpp/src/c/linux/wConfig.h b/common/c_cpp/src/c/linux/wConfig.h
index 1d3d906..b817757 100644
--- a/common/c_cpp/src/c/linux/wConfig.h
+++ b/common/c_cpp/src/c/linux/wConfig.h
@@ -30,6 +30,7 @@
#define MAMDAExpDLL
#define MAMDAOPTExpDLL
#define MAMAExpBridgeDLL
+#define WCACHEExpDLL
#define MAMACALLTYPE

#define WCOMMONINLINE inline static
--
1.7.7.6


[PATCH 03/50] Fix timer implementation for Windows

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Address a subtle difference between Linux and Windows networking required by
timers. Since there are no pipes on Windows, the timer implementation uses a
loopback socket, which requires send and recv rather than read/write.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
common/c_cpp/src/c/linux/network.c | 1 +
common/c_cpp/src/c/linux/port.h | 19 +++++++++++--------
common/c_cpp/src/c/timers.c | 12 ++++--------
common/c_cpp/src/c/timers.h | 12 ++++++------
common/c_cpp/src/c/windows/network.c | 2 +-
common/c_cpp/src/c/windows/port.c | 28 ++++++++++++++++++++++------
common/c_cpp/src/c/windows/port.h | 9 +++++++--
7 files changed, 52 insertions(+), 31 deletions(-)

diff --git a/common/c_cpp/src/c/linux/network.c b/common/c_cpp/src/c/linux/network.c
index 9969ff9..5c32910 100644
--- a/common/c_cpp/src/c/linux/network.c
+++ b/common/c_cpp/src/c/linux/network.c
@@ -22,6 +22,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
+#include <net/if.h>
#include "wombat/wCommon.h"


diff --git a/common/c_cpp/src/c/linux/port.h b/common/c_cpp/src/c/linux/port.h
index c328479..b3a928d 100644
--- a/common/c_cpp/src/c/linux/port.h
+++ b/common/c_cpp/src/c/linux/port.h
@@ -28,15 +28,9 @@
#ifndef PORT_LINUX_H__
#define PORT_LINUX_H__

-#if defined (__cplusplus)
-extern "C"
-{
-#endif
-
#include <pthread.h>
#include <sys/types.h>
#include <sys/ioctl.h>
-#include <sys/mman.h>
#include <sys/vfs.h>
#include <sys/param.h>
#include <sys/resource.h>
@@ -44,9 +38,9 @@ extern "C"
#include <sys/times.h>
#include <sys/socket.h>
#include <sys/utsname.h>
+#include <sys/mman.h>
#include <arpa/inet.h>
#include <netinet/in.h>
-#include <net/if.h>
#include <netdb.h>
#include <semaphore.h>
#include <dirent.h>
@@ -57,6 +51,10 @@ extern "C"

#include "wConfig.h"

+#if defined (__cplusplus)
+extern "C"
+{
+#endif
/* PTHREAD static locks are easy */
typedef pthread_mutex_t wthread_static_mutex_t;
#define WSTATIC_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
@@ -108,12 +106,17 @@ typedef pthread_key_t wthread_key_t;
int wsem_timedwait (wsem_t* sem, unsigned int ts);

/* Windows does not support AF_UNIX sockets, socketpairs, etc */
+#define wsocketstartup()
+#define wsocketcleanup()
+
#define wsocketpair(dom, type, prot, pair) (socketpair((dom),(type),(prot),(pair)))
#define wsetnonblock(s) (fcntl((s), F_SETFL, fcntl((s), F_GETFL) | O_NONBLOCK))
+#define wread read
+#define wwrite write

#define CPU_AFFINITY_SET cpu_set_t

-/* User pthreads for linux */
+/* Use pthreads for linux */
#define INVALID_THREAD (-1)

#define wthread_mutex_t pthread_mutex_t
diff --git a/common/c_cpp/src/c/timers.c b/common/c_cpp/src/c/timers.c
index cb93ce8..87ae925 100644
--- a/common/c_cpp/src/c/timers.c
+++ b/common/c_cpp/src/c/timers.c
@@ -84,11 +84,7 @@ int createTimerHeap (timerHeap* heap)

wthread_mutex_init (&heapImpl->mEndingLock, NULL);

- if (wthread_cond_init (&heapImpl->mEndingCond, NULL) != 0)
- {
- free (heapImpl);
- return -1;
- }
+ wthread_cond_init (&heapImpl->mEndingCond, NULL);

RB_INIT (&heapImpl->mTimeTree);

@@ -152,7 +148,7 @@ void* dispatchEntry (void *closure)
int numRead = 0;
do
{
- numRead = read(heap->mSockPair[0], &buff, sizeof (buff));
+ numRead = wread(heap->mSockPair[0], &buff, sizeof (buff));
if (numRead < 0)
{
if (errno == EINTR)
@@ -221,7 +217,7 @@ int destroyHeap (timerHeap heap)
timerHeapImpl* heapImpl = (timerHeapImpl*)heap;

writeagain:
- if (write (heapImpl->mSockPair[1], "d", 1) < 0)
+ if (wwrite (heapImpl->mSockPair[1], "d", 1) < 0)
{
if ((errno == EINTR) || (errno == EAGAIN))
goto writeagain;
@@ -277,7 +273,7 @@ int createTimer (timerElement* timer, timerHeap heap, timerFireCb cb, struct tim
if (kickPipe)
{
writeagain:
- if (write (heapImpl->mSockPair[1], "w", 1) < 0)
+ if (wwrite (heapImpl->mSockPair[1], "w", 1) < 0)
{
if ((errno == EINTR) || (errno == EAGAIN))
goto writeagain;
diff --git a/common/c_cpp/src/c/timers.h b/common/c_cpp/src/c/timers.h
index 22989f3..962163b 100644
--- a/common/c_cpp/src/c/timers.h
+++ b/common/c_cpp/src/c/timers.h
@@ -29,12 +29,12 @@ typedef void* timerHeap;

typedef void (*timerFireCb)(timerElement timer, void* mClosure);

-int createTimerHeap (timerHeap* heap);
-int startDispatchTimerHeap (timerHeap heap);
-wthread_t timerHeapGetTid (timerHeap heap);
-int destroyHeap (timerHeap heap);
+COMMONExpDLL int createTimerHeap (timerHeap* heap);
+COMMONExpDLL int startDispatchTimerHeap (timerHeap heap);
+COMMONExpDLL wthread_t timerHeapGetTid (timerHeap heap);
+COMMONExpDLL int destroyHeap (timerHeap heap);

-int createTimer (timerElement* timer, timerHeap heap, timerFireCb cb, struct timeval* timeout, void* closure);
-int destroyTimer (timerHeap heap, timerElement timer);
+COMMONExpDLL int createTimer (timerElement* timer, timerHeap heap, timerFireCb cb, struct timeval* timeout, void* closure);
+COMMONExpDLL int destroyTimer (timerHeap heap, timerElement timer);

#endif
diff --git a/common/c_cpp/src/c/windows/network.c b/common/c_cpp/src/c/windows/network.c
index b80914e..ce467f6 100644
--- a/common/c_cpp/src/c/windows/network.c
+++ b/common/c_cpp/src/c/windows/network.c
@@ -18,7 +18,7 @@
* 02110-1301 USA
*/

-#include "port.h"
+#include "wombat/port.h"

struct in_addr wresolve_ip (const char * arg)
{
diff --git a/common/c_cpp/src/c/windows/port.c b/common/c_cpp/src/c/windows/port.c
index 12f6431..2b92c90 100644
--- a/common/c_cpp/src/c/windows/port.c
+++ b/common/c_cpp/src/c/windows/port.c
@@ -18,7 +18,7 @@
* 02110-1301 USA
*/

-#include "port.h"
+#include "wombat/port.h"

int gettimeofday( struct timeval *result, void *dummy )
{
@@ -133,10 +133,10 @@ DWORD wthread_cond_wait( HANDLE *event, LPCRITICAL_SECTION *cs )
{
DWORD rval;

- LeaveCriticalSection( *cs );
+ LeaveCriticalSection( cs );
rval = WaitForSingleObject( *event, INFINITE );
ResetEvent( *event );
- EnterCriticalSection( *cs );
+ EnterCriticalSection( cs );

return rval;
}
@@ -177,6 +177,24 @@ const char *getlogin()
}

int
+wsocketstartup ()
+{
+ /* Initialise Winsock, requesting version 2.2.
+ Returns 0 on success and -1 when WSAStartup reports an error. */
+ WSADATA wsaData;
+ int err =WSAStartup(MAKEWORD(2,2), &wsaData);
+ if (err)
+ {
+ return -1;
+ }
+ return 0;
+}
+
+/* Counterpart of wsocketstartup(): calls WSACleanup(); its result is ignored. */
+void
+wsocketcleanup ()
+{
+ WSACleanup( );
+}
+
+int
wsocketpair (int domain, int type, int protocol, int* pair)
{
struct sockaddr_in addr;
@@ -184,7 +202,7 @@ wsocketpair (int domain, int type, int protocol, int* pair)
int l;

l = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (INVALID_SOCKET == pair[0])
+ if (INVALID_SOCKET == l)
return -1;

ZeroMemory (&addr, sizeof(addr));
@@ -233,8 +251,6 @@ wsocketpair (int domain, int type, int protocol, int* pair)
return -1;
}

- _close (l);
-
return 0;
}

diff --git a/common/c_cpp/src/c/windows/port.h b/common/c_cpp/src/c/windows/port.h
index 92080fd..df06b4f 100644
--- a/common/c_cpp/src/c/windows/port.h
+++ b/common/c_cpp/src/c/windows/port.h
@@ -86,6 +86,11 @@ typedef __int64 w_i64_t;
#define PATH_DELIM ';'

#define PATHSEP "\\"
+COMMONExpDLL int
+wsocketstartup (void);
+
+COMMONExpDLL void
+wsocketcleanup (void);

/* Socket Pair and set non blocking */
COMMONExpDLL int
@@ -143,8 +148,8 @@ int wsem_getvalue (wsem_t*, int* items);
#define strdup _strdup
#define strncasecmp _strnicmp
#define strcasecmp _stricmp
-#define read _read
-#define write _write
+#define wread(x,y,z) recv((x),(y),(z),0)
+#define wwrite(x,y,z) send((x),(y),(z),0)
#define close _close
#define sleep(x) Sleep( (x)*1000)

--
1.7.7.6


[PATCH 02/50] c linkage for wlock_xxx() methods

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

These must be declared with extern "C" to link correctly with C++ applications.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
common/c_cpp/src/c/wlock.h | 8 ++++++++
1 files changed, 8 insertions(+), 0 deletions(-)

diff --git a/common/c_cpp/src/c/wlock.h b/common/c_cpp/src/c/wlock.h
index 199dad8..f43051f 100644
--- a/common/c_cpp/src/c/wlock.h
+++ b/common/c_cpp/src/c/wlock.h
@@ -22,11 +22,19 @@
#ifndef _WOMBAT_WLOCK_H
#define _WOMBAT_WLOCK_H

+#include <wombat/wConfig.h>
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
typedef void * wLock;

COMMONExpDLL wLock wlock_create( void );
COMMONExpDLL void wlock_destroy( wLock lock );
COMMONExpDLL void wlock_lock( wLock lock );
COMMONExpDLL void wlock_unlock( wLock lock );
+#if defined (__cplusplus)
+}
+#endif

#endif /* _WOMBAT_WLOCK_H */
--
1.7.7.6


[PATCH 01/50] Ownership of message buffers

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

The msg payload bridges now have a msgPayloadSetParent() call which is
used to set the parent mamaMsg object for a payload. This allows the
"ownership" of the mamaMsg to be interrogated from a payload bridge
i.e. if a message can be modified or not. The use of the message owner
term has also been normalised.

Signed-off-by: Glenn McClements <gmcclements@nyx.com>
Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
common/c_cpp/src/c/wombat/wConfig.h | 2 +-
mama/c_cpp/src/c/msg.c | 106 +++++++++++++++---------
mama/c_cpp/src/c/msgfield.c | 6 +-
mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h | 1 +
mama/c_cpp/src/c/payload/avismsg/avispayload.c | 12 +++
mama/c_cpp/src/c/payload/avismsg/avispayload.h | 3 +
mama/c_cpp/src/c/payloadbridge.h | 12 ++-
7 files changed, 94 insertions(+), 48 deletions(-)

diff --git a/common/c_cpp/src/c/wombat/wConfig.h b/common/c_cpp/src/c/wombat/wConfig.h
index 3414bff..e7aeede 120000
--- a/common/c_cpp/src/c/wombat/wConfig.h
+++ b/common/c_cpp/src/c/wombat/wConfig.h
@@ -1 +1 @@
-../../.././src/c/linux/wConfig.h
\ No newline at end of file
+../../../src/c/linux/wConfig.h
\ No newline at end of file
diff --git a/mama/c_cpp/src/c/msg.c b/mama/c_cpp/src/c/msg.c
index ca17bf0..af51d25 100644
--- a/mama/c_cpp/src/c/msg.c
+++ b/mama/c_cpp/src/c/msg.c
@@ -92,7 +92,7 @@ typedef struct mamaMsgImpl_
msgBridge mBridgeMessage;
/*If we have detached the middleware message we will own it
and are responsible for destroying it*/
- int mMiddlewareMessageOwner;
+ int mMessageOwner;

/*The context if this is a msg from the dqStrategy cache*/
mamaDqContext* mDqStrategyContext;
@@ -121,7 +121,7 @@ mamaMsg_destroy (mamaMsg msg)
mamaMsgImpl_destroyLastVectorMsg (impl);
}

- if (impl->mPayloadBridge && impl->mMiddlewareMessageOwner)
+ if (impl->mPayloadBridge && impl->mMessageOwner)
{
if (impl->mFieldPayload)
{
@@ -138,9 +138,9 @@ mamaMsg_destroy (mamaMsg msg)

impl->mPayloadBridge = NULL;

- /*set mMiddlewareMessageOwner to zero now the payload has been destroyed to prevent
+ /*set mMessageOwner to zero now the payload has been destroyed to prevent
us destroying the underlying message again in the bridge specific function*/
- impl->mMiddlewareMessageOwner = 0;
+ impl->mMessageOwner = 0;
}

if (impl->mNestedMessages != NULL)
@@ -159,7 +159,7 @@ mamaMsg_destroy (mamaMsg msg)
{
/*Invoke the bridge specific destroy function*/
impl->mBridgeImpl->bridgeMamaMsgDestroy (
- impl->mBridgeMessage, impl->mMiddlewareMessageOwner);
+ impl->mBridgeMessage, impl->mMessageOwner);
}
else
{
@@ -228,8 +228,10 @@ mamaMsg_detach (mamaMsg msg)
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
mama_status status = MAMA_STATUS_OK;
- if (!impl) return MAMA_STATUS_NULL_ARG;
- if (!impl->mQueue) return MAMA_STATUS_INVALID_QUEUE;
+ msgPayload payload = NULL;
+
+ if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mQueue) return MAMA_STATUS_INVALID_QUEUE;
if (!impl->mBridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;

if (MAMA_STATUS_OK!=(status=mamaQueueImpl_detachMsg (impl->mQueue, msg)))
@@ -237,8 +239,8 @@ mamaMsg_detach (mamaMsg msg)
/*Message already logged in mamaQueueImpl_detachMsg*/
return status;
}
-
- /*We also need to detach the bridge specific message*/
+
+ /*We also need to detach the middleware bridge specific message*/
if (MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaMsgDetach
(impl->mBridgeMessage)))
{
@@ -246,10 +248,24 @@ mamaMsg_detach (mamaMsg msg)
"Could not detach bridge message.");
return status;
}
+ /* Copy the payload */
+ if (MAMA_STATUS_OK != (status =
+ (msg->mPayloadBridge->msgPayloadCopy (impl->mPayload,
+ &payload))))
+ {
+ mama_log(MAMA_LOG_LEVEL_ERROR,
+ "mamaMsg_detach() Failed. "
+ "Could not copy native payload [%d]", status);
+ return status;
+ }

+ msg->mPayload = payload;
+ msg->mPayloadBridge->msgPayloadSetParent (impl->mPayload, msg);
+
/*If this is a dqStrategy cache message*/
if (impl->mDqStrategyContext)
- {
+ {
+
if (MAMA_STATUS_OK!=(status=dqStrategyImpl_detachMsg (impl->mDqStrategyContext, msg)))
{
mama_log (MAMA_LOG_LEVEL_ERROR, "mamaMsg_detach(): "
@@ -259,7 +275,7 @@ mamaMsg_detach (mamaMsg msg)
}

/*We are now responsible for destroying the middleware message*/
- impl->mMiddlewareMessageOwner = 1;
+ impl->mMessageOwner = 1;

return MAMA_STATUS_OK;
}
@@ -383,7 +399,7 @@ mamaMsgImpl_setPayloadBridge (mamaMsg msg, mamaPayloadBridgeImpl* payloadBridge)
}

mama_status
-mamaMsgImpl_setPayload (mamaMsg msg, msgPayload payload, short ownsMsg)
+mamaMsgImpl_setPayload (mamaMsg msg, msgPayload payload, short owner)
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
if (!impl) return MAMA_STATUS_NULL_ARG;
@@ -400,8 +416,10 @@ mamaMsgImpl_setPayload (mamaMsg msg, msgPayload payload, short ownsMsg)
/* Do not destroy the list. We can reuse the memory! */
}

- impl->mPayload = payload;
- impl->mMiddlewareMessageOwner = ownsMsg;
+ impl->mPayload = payload;
+ impl->mMessageOwner = owner;
+ impl->mPayloadBridge->msgPayloadSetParent (impl->mPayload, msg);
+
return MAMA_STATUS_OK;
}

@@ -434,7 +452,7 @@ mamaMsg_createForPayloadBridge (mamaMsg* msg, mamaPayloadBridge payloadBridge)
return mamaMsgImpl_createForPayload (msg,
payload,
payloadBridge,
- 0);
+ 1);
}


@@ -469,7 +487,7 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
/*If there is an existing message buffer which we own destroy it*/
if (impl->mBridgeImpl &&
impl->mBridgeMessage &&
- impl->mMiddlewareMessageOwner)
+ impl->mMessageOwner)
{
if (MAMA_STATUS_OK!=
impl->mBridgeImpl->bridgeMamaMsgDestroyMiddlewareMsg (
@@ -481,8 +499,7 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
}
impl->mBridgeMessage = NULL;
}
-
- impl->mMiddlewareMessageOwner = 0;
+ impl->mMessageOwner = 0;

if (id == '\0')
id = (char) ((const char*)data) [0];
@@ -550,6 +567,7 @@ mamaMsgImpl_createNestedForPayload (mamaMsg* result,
impl->mPayloadBridge = parent->mPayloadBridge;
impl->mPayload = payload;
impl->mParent = parent;
+ impl->mPayloadBridge->msgPayloadSetParent (impl->mPayload, impl);

return MAMA_STATUS_OK;
}
@@ -562,20 +580,20 @@ mamaMsgImpl_setMessageOwner (mamaMsg msg,
mama_status status = MAMA_STATUS_OK;

mamaMsgImpl* impl = (mamaMsgImpl*)msg;
- impl->mMiddlewareMessageOwner = owner;
+ impl->mMessageOwner = owner;

return status;
}

mama_status
mamaMsgImpl_getMessageOwner (mamaMsg msg,
- short* owner)
+ short* owner)
{
mama_status status = MAMA_STATUS_OK;

mamaMsgImpl* impl = (mamaMsgImpl*)msg;
- *owner = impl->mMiddlewareMessageOwner;
-
+ *owner = impl->mMessageOwner;
+
return status;
}

@@ -583,7 +601,7 @@ mama_status
mamaMsgImpl_createForPayload (mamaMsg* msg,
msgPayload payload,
mamaPayloadBridgeImpl* payloadBridge,
- short noDeletePayload)
+ short owner)
{
mama_status status = MAMA_STATUS_OK;
mamaMsgImpl* impl = NULL;
@@ -609,14 +627,20 @@ mamaMsgImpl_createForPayload (mamaMsg* msg,
}
*msg = (mamaMsg)impl;

- impl->mPayload = payload;
- impl->mPayloadBridge = payloadBridge;
- impl->mFieldPayload = NULL;
+ impl->mPayload = payload;
+ impl->mPayloadBridge = payloadBridge;
+ impl->mFieldPayload = NULL;
/*These will be set later if necessary*/
- impl->mBridgeImpl = NULL;
- impl->mBridgeMessage = NULL;
- impl->mMiddlewareMessageOwner = noDeletePayload == 1 ? 0 : 1;
- impl->mDqStrategyContext = NULL;
+ impl->mBridgeImpl = NULL;
+ impl->mBridgeMessage = NULL;
+ impl->mMessageOwner = owner;
+ impl->mDqStrategyContext = NULL;
+
+ /* The payloadBridge and payload are optional for this function */
+ if (payloadBridge)
+ {
+ impl->mPayloadBridge->msgPayloadSetParent (impl->mPayload, impl);
+ }

return status;
}
@@ -650,7 +674,7 @@ mamaMsg_copy (mamaMsg src, mamaMsg* copy)
if (*copy == NULL)
{
if (MAMA_STATUS_OK != (status =
- mamaMsgImpl_createForPayload (copy, payload, source->mPayloadBridge, 0)))
+ mamaMsgImpl_createForPayload (copy, payload, source->mPayloadBridge, 1)))
{
mama_log(MAMA_LOG_LEVEL_ERROR,
"mamaMsg_copy Failed. "
@@ -695,7 +719,7 @@ mamaMsg_create (mamaMsg* msg)
return mamaMsgImpl_createForPayload (msg,
payload,
bridge,
- 0);
+ 1);
}

mama_status
@@ -2958,7 +2982,7 @@ mamaMsg_getVectorMsg (
impl->mPayloadBridge);
/*
* We do not detach the middle ware message so we do
- * not own it. NOTE Inverse sense to the create call below
+ * not own it.
*/
mamaMsgImpl_setPayload (impl->mLastVectorPayloadMsg[i],
payloadVector[i],
@@ -2969,12 +2993,12 @@ mamaMsg_getVectorMsg (
{
/*
* We create from payload so payload owns
- * We set noDelete to true to not delete the payload
+ * We set owner to false to not delete the payload
*/
mamaMsgImpl_createForPayload (&(impl->mLastVectorPayloadMsg[i]),
payloadVector[i],
impl->mPayloadBridge,
- 1);
+ 0);
}

/*Store the maxumim vector we have encountered*/
@@ -3052,9 +3076,9 @@ mamaMsgImpl_getPayloadBuffer (
if (impl->mPayloadBridge)
{
return
- impl->mPayloadBridge->msgPayloadGetByteBuffer (impl->mPayload,
- buffer,
- bufferLength);
+ impl->mPayloadBridge->msgPayloadGetByteBuffer (impl->mPayload,
+ buffer,
+ bufferLength);
}
return MAMA_STATUS_NULL_ARG;
}
@@ -3072,9 +3096,9 @@ mamaMsg_setNewBuffer (mamaMsg msg, void* buffer,
if (impl->mPayloadBridge)
{
return
- impl->mPayloadBridge->msgPayloadUnSerialize (impl->mPayload,
- buffer,
- size);
+ impl->mPayloadBridge->msgPayloadUnSerialize (impl->mPayload,
+ buffer,
+ size);
}

return MAMA_STATUS_NULL_ARG;
diff --git a/mama/c_cpp/src/c/msgfield.c b/mama/c_cpp/src/c/msgfield.c
index 747f983..ac5fee3 100644
--- a/mama/c_cpp/src/c/msgfield.c
+++ b/mama/c_cpp/src/c/msgfield.c
@@ -755,7 +755,7 @@ mamaMsgField_getVectorMsg (
impl->myPayloadBridge);
/*
* We do not detach the middleware message so we do
- * not own it. NOTE Inverse sense to the create call below
+ * not own it.
*/
mamaMsgImpl_setPayload (impl->myLastVectorPayloadMsg[i],
payloadVector[i],
@@ -766,12 +766,12 @@ mamaMsgField_getVectorMsg (
{
/*
* We create from payload so payload owns
- * We set noDelete to true to not delete the payload
+ * We set owner to false to not delete the payload
*/
mamaMsgImpl_createForPayload (&(impl->myLastVectorPayloadMsg[i]),
payloadVector[i],
impl->myPayloadBridge,
- 1);
+ 0);
}

/*Store the maxumim vector we have encountered*/
diff --git a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
index c68c28e..ade9b0b 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
+++ b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
@@ -33,6 +33,7 @@ typedef struct avisPayload
{
Attributes* mAvisMsg;
struct avisFieldPayload* mAvisField;
+ mamaMsg mParent;

void * mBuffer;
uint16_t mBufferLen;
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index e0f9df6..28cca7a 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -205,6 +205,18 @@ avismsgPayload_destroy (msgPayload msg)
}

mama_status
+avismsgPayload_setParent (msgPayload msg,
+ const mamaMsg parent)
+{
+ avisPayloadImpl* impl = (avisPayloadImpl*) msg;
+ if (!impl) return MAMA_STATUS_NULL_ARG;
+ /* Record the parent mamaMsg; name must match the avismsgPayload_ prefix. */
+ impl->mParent = parent;
+
+ return MAMA_STATUS_OK;
+}
+
+mama_status
avismsgPayload_getByteSize (const msgPayload msg,
mama_size_t* size)
{
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.h b/mama/c_cpp/src/c/payload/avismsg/avispayload.h
index 4bb7f37..a2c1388 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.h
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.h
@@ -54,6 +54,9 @@ avismsgPayload_clear (msgPayload msg);
extern mama_status
avismsgPayload_destroy (msgPayload msg);
extern mama_status
+avismsgPayload_setParent (msgPayload msg,
+ const mamaMsg parent);
+extern mama_status
avismsgPayload_getByteSize (const msgPayload msg,
mama_size_t* size);

diff --git a/mama/c_cpp/src/c/payloadbridge.h b/mama/c_cpp/src/c/payloadbridge.h
index 97396f9..6d851ae 100644
--- a/mama/c_cpp/src/c/payloadbridge.h
+++ b/mama/c_cpp/src/c/payloadbridge.h
@@ -55,6 +55,8 @@ do \
= identifier ## Payload_clear; \
msgPayloadImpl->msgPayloadDestroy \
= identifier ## Payload_destroy; \
+ msgPayloadImpl->msgPayloadSetParent \
+ = identifier ## Payload_setParent; \
msgPayloadImpl->msgPayloadGetByteSize \
= identifier ## Payload_getByteSize; \
msgPayloadImpl->msgPayloadGetNumFields \
@@ -437,6 +439,9 @@ typedef mama_status
typedef mama_status
(*msgPayload_destroy) (msgPayload msg);
typedef mama_status
+(*msgPayload_setParent) (msgPayload msg,
+ const mamaMsg parent);
+typedef mama_status
(*msgPayload_getByteSize) (msgPayload msg,
mama_size_t* size);
typedef mama_status
@@ -456,14 +461,14 @@ typedef mama_status
void* closure);

typedef mama_status
-(*msgPayload_serialize) (const msgPayload msg,
+(*msgPayload_serialize) (const msgPayload msg,
const void** buffer,
mama_size_t* bufferLength);

typedef mama_status
-(*msgPayload_unSerialize) (const msgPayload msg,
+(*msgPayload_unSerialize) (const msgPayload msg,
const void** buffer,
- mama_size_t bufferLength);
+ mama_size_t bufferLength);

typedef mama_status
(*msgPayload_getByteBuffer) (const msgPayload msg,
@@ -1303,6 +1308,7 @@ typedef struct mamaPayloadBridgeImpl_
msgPayload_copy msgPayloadCopy;
msgPayload_clear msgPayloadClear;
msgPayload_destroy msgPayloadDestroy;
+ msgPayload_setParent msgPayloadSetParent;
msgPayload_getByteSize msgPayloadGetByteSize;
msgPayload_getNumFields msgPayloadGetNumFields;
msgPayload_getSendSubject msgPayloadGetSendSubject;
--
1.7.7.6

2181 - 2200 of 2311