Date   

[PATCH 34/50] [examples] Put C examples in .../examples/c

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

For consistency with the source tree and to keep the examples
for different languages separate.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/examples/c/Makefile.am | 2 +-
mama/c_cpp/src/examples/c/Makefile.sample | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/mama/c_cpp/src/examples/c/Makefile.am b/mama/c_cpp/src/examples/c/Makefile.am
index 9a71db8..ff9eb1c 100644
--- a/mama/c_cpp/src/examples/c/Makefile.am
+++ b/mama/c_cpp/src/examples/c/Makefile.am
@@ -62,7 +62,7 @@ dist_mamamultisubscriberc_SOURCES = mamamultisubscriberc.c
dist_mamaftmemberc_SOURCES = mamaftmemberc.c

# Additional installation directory for sample source code
-exampledir = ${prefix}/examples/mama
+exampledir = ${prefix}/examples/mama/c

# Rule to install the example programs:
example_DATA = \
diff --git a/mama/c_cpp/src/examples/c/Makefile.sample b/mama/c_cpp/src/examples/c/Makefile.sample
index 718195e..513fcb0 100644
--- a/mama/c_cpp/src/examples/c/Makefile.sample
+++ b/mama/c_cpp/src/examples/c/Makefile.sample
@@ -13,7 +13,7 @@
# SOLARIS/Forte: The makefile will need modified to use the Solaris compiler.
# See example below.
#=============================================================================
-API_HOME=../../
+API_HOME=../../../

## Solaris/Forte compiler:
#CXX = CC
--
1.7.7.6


[PATCH 33/50] [examples] Changes for Windows

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Use correct calling convention for callbacks and a few other minor
changes.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/examples/c/mamaftmemberc.c | 4 ++--
mama/c_cpp/src/examples/c/mamainboxc.c | 4 ++++
mama/c_cpp/src/examples/c/mamalistenc.c | 1 -
mama/c_cpp/src/examples/c/mamamultisubscriberc.c | 14 +++++++++++---
mama/c_cpp/src/examples/c/mamaproxyc.c | 4 ++++
mama/c_cpp/src/examples/c/mamapublisherc.c | 2 +-
6 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/examples/c/mamaftmemberc.c b/mama/c_cpp/src/examples/c/mamaftmemberc.c
index 8ba53c2..e2e3aa9 100644
--- a/mama/c_cpp/src/examples/c/mamaftmemberc.c
+++ b/mama/c_cpp/src/examples/c/mamaftmemberc.c
@@ -51,14 +51,14 @@ static void parseCommandLine (int argc, const char **argv);
static void initializeMama (void);
static void createFtMember (void);

-static void onFtStateChangeCb (mamaFtMember ftMember,
+static void MAMACALLTYPE onFtStateChangeCb (mamaFtMember ftMember,
const char* groupName,
mamaFtState state,
void* closure);

static void usage (int exitStatus);

-static void onFtStateChangeCb (mamaFtMember ftMember,
+static void MAMACALLTYPE onFtStateChangeCb (mamaFtMember ftMember,
const char* groupName,
mamaFtState state,
void* closure)
diff --git a/mama/c_cpp/src/examples/c/mamainboxc.c b/mama/c_cpp/src/examples/c/mamainboxc.c
index 4e2b9b7..48d50a7 100644
--- a/mama/c_cpp/src/examples/c/mamainboxc.c
+++ b/mama/c_cpp/src/examples/c/mamainboxc.c
@@ -198,6 +198,10 @@ static void sendRequest (void)
exit (status);
}

+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "mamaPublisher_sendFromInboxWithThrottle(): "
+ " %s",
+ mamaMsg_toString(msg));
/*The message can be destroyed once the sendCompleteCb has been invoked.*/
status = mamaPublisher_sendFromInboxWithThrottle (gPublisher, gInbox, msg,
sendCompleteCb, NULL);
diff --git a/mama/c_cpp/src/examples/c/mamalistenc.c b/mama/c_cpp/src/examples/c/mamalistenc.c
index 80ca789..be270d1 100644
--- a/mama/c_cpp/src/examples/c/mamalistenc.c
+++ b/mama/c_cpp/src/examples/c/mamalistenc.c
@@ -763,7 +763,6 @@ transportCb (mamaTransport tport,
void initializeMama (void)
{
mama_status status = MAMA_STATUS_OK;
- mamaPayloadBridge payBridge = NULL;

/*
mama_setApplicationName should be called before mama_open().
diff --git a/mama/c_cpp/src/examples/c/mamamultisubscriberc.c b/mama/c_cpp/src/examples/c/mamamultisubscriberc.c
index 2364ffb..c017dc9 100644
--- a/mama/c_cpp/src/examples/c/mamamultisubscriberc.c
+++ b/mama/c_cpp/src/examples/c/mamamultisubscriberc.c
@@ -21,7 +21,6 @@

#include "mama/mama.h"
#include "string.h"
-#include <unistd.h>
#define MAX_BRIDGES 10

static const char * gTopic = "MAMA_TOPIC";
@@ -52,21 +51,24 @@ NULL
static void parseCommandLine (int argc, const char **argv);
static void initialize (void);
static void createSubscriber (mamaBridge bridge, mamaTransport transport);
-static void startCB ( mama_status status );
+static void MAMACALLTYPE startCB ( mama_status status );
static void start (void);
static void displayMsg (mamaMsg msg);
static void usage (int exitStatus);

static void
+MAMACALLTYPE
displayCb (const mamaMsg msg,
const mamaMsgField field,
void * closure);

static void
+MAMACALLTYPE
createCb (mamaSubscription subscription,
void * closure);

static void
+MAMACALLTYPE
errorCb (mamaSubscription subscription,
mama_status status,
void* platformError,
@@ -74,6 +76,7 @@ errorCb (mamaSubscription subscription,
void* closure);

static void
+MAMACALLTYPE
msgCb (mamaSubscription subscription,
mamaMsg msg,
void * closure,
@@ -189,7 +192,8 @@ static void createSubscriber (mamaBridge bridge, mamaTransport transport)
}
}

-void startCB ( mama_status status )
+void MAMACALLTYPE
+startCB ( mama_status status )
{}

void start ()
@@ -206,6 +210,7 @@ void start ()
}

static void
+MAMACALLTYPE
createCb (mamaSubscription subscription, void *closure)
{
if (gQuietLevel < 2)
@@ -215,6 +220,7 @@ createCb (mamaSubscription subscription, void *closure)
}

static void
+MAMACALLTYPE
errorCb (mamaSubscription subscription,
mama_status status,
void* platformError,
@@ -227,6 +233,7 @@ errorCb (mamaSubscription subscription,
}

static void
+MAMACALLTYPE
msgCb (mamaSubscription subscription,
mamaMsg msg,
void* closure,
@@ -240,6 +247,7 @@ msgCb (mamaSubscription subscription,
}

void
+MAMACALLTYPE
displayCb (const mamaMsg msg,
const mamaMsgField field,
void* closure)
diff --git a/mama/c_cpp/src/examples/c/mamaproxyc.c b/mama/c_cpp/src/examples/c/mamaproxyc.c
index dfb5aa0..e50fb84 100644
--- a/mama/c_cpp/src/examples/c/mamaproxyc.c
+++ b/mama/c_cpp/src/examples/c/mamaproxyc.c
@@ -182,6 +182,10 @@ static void createPublisher ()
"_MD",
NULL);

+ // Turn off updateing of the messages as incoming messages
+ // from FH already have seqnum and senderid
+ mamaDQPublisherManager_setSeqNum(gDQPubManager, 0);
+ mamaDQPublisherManager_setSenderId(gDQPubManager, 0);
if (gSendSync)
{
mamaTimer_create (&gSyncTimer, gPubDefaultQueue, syncCallback, 15, gDQPubManager);
diff --git a/mama/c_cpp/src/examples/c/mamapublisherc.c b/mama/c_cpp/src/examples/c/mamapublisherc.c
index e00ea9d..a5fce51 100644
--- a/mama/c_cpp/src/examples/c/mamapublisherc.c
+++ b/mama/c_cpp/src/examples/c/mamapublisherc.c
@@ -305,7 +305,7 @@ inboundMsgCb (mamaSubscription subscription,

if (gQuietLevel < 2)
{
- printf ("Recieved inbound msg. Sending response\n");
+ printf ("Recieved inbound msg. (%s) Sending response\n", mamaMsg_toString (msg));
}

if (!mamaMsg_isFromInbox (msg))
--
1.7.7.6


[PATCH 32/50] [avis-payload] Fixed compiler warnings

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Primarily moved declarations to beginning of scope for non-C99 compliant compilers

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/payload/avismsg/avispayload.c | 103 +++++++++++++----------
mama/c_cpp/src/c/payload/avismsg/avispayload.h | 2 +
mama/c_cpp/src/c/payload/avismsg/msgfield.c | 2 +-
3 files changed, 61 insertions(+), 46 deletions(-)

diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index 28cca7a..f3c044a 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -131,15 +131,17 @@ avismsgPayload_getType ()
mama_status
avismsgPayload_create (msgPayload* msg)
{
- if (!msg) return MAMA_STATUS_NULL_ARG;

Attributes* avisMsg = attributes_create();
+ avisPayloadImpl* newPayload = NULL;
+
+ if (!msg) return MAMA_STATUS_NULL_ARG;

if (!avisMsg) {
mama_log (MAMA_LOG_LEVEL_ERROR, "attributes_create() failed. Could not create Avis msg");
return MAMA_STATUS_PLATFORM;
}
- avisPayloadImpl* newPayload = (avisPayloadImpl*)calloc (1, sizeof(avisPayloadImpl));
+ newPayload = (avisPayloadImpl*)calloc (1, sizeof(avisPayloadImpl));
newPayload->mAvisMsg=avisMsg;

*msg = newPayload;
@@ -194,8 +196,7 @@ mama_status
avismsgPayload_destroy (msgPayload msg)
{
CHECK_PAYLOAD(msg);
- attributes_free(avisPayload(msg));
- free(avisPayload(msg));
+ attributes_destroy(avisPayload(msg));

avismsgPayloadIter_destroy(avisPayloadImpl(msg)->mIterator);
free(avisPayloadImpl(msg)->mAvisField);
@@ -205,7 +206,7 @@ avismsgPayload_destroy (msgPayload msg)
}

mama_status
-avisPayload_setParent (msgPayload msg,
+avismsgPayload_setParent (msgPayload msg,
const mamaMsg parent)
{
avisPayloadImpl* impl = (avisPayloadImpl*) msg;
@@ -236,15 +237,15 @@ avismsgPayload_unSerialize (const msgPayload msg,
int64_t tempint64;
real64_t tempreal64;

+ uint32_t currLen = 1;
+ uint16_t len =0;
+ uint8_t * buffPos = (void*)buffer;

if (!impl->mAvisMsg)
impl->mAvisMsg = attributes_create();
else
attributes_clear(impl->mAvisMsg);

- uint32_t currLen = 1;
- uint16_t len =0;
- void * buffPos = (void*)buffer;
buffPos+=1; // Skip payload identifier
while (currLen < bufferLength)
{
@@ -307,7 +308,12 @@ avismsgPayload_serialize (const msgPayload msg,
mama_size_t* bufferLength)
{
avisPayloadImpl* impl = (avisPayloadImpl*) msg;
- mama_status status = MAMA_STATUS_OK;
+ uint8_t * buffPos = NULL;
+ mama_status status = MAMA_STATUS_OK;
+ uint16_t len = 0;
+ uint32_t currLen = 0;
+ avisFieldPayload* currField = NULL;
+
if (!impl->mIterator)
{
status = avismsgPayloadIter_create((msgPayloadIter*) &impl->mIterator, msg);
@@ -333,10 +339,8 @@ avismsgPayload_serialize (const msgPayload msg,
impl->mBufferLen = 200;
}

- void * buffPos = impl->mBuffer;
- uint16_t len =0;
- uint32_t currLen = 0;
- avisFieldPayload* currField = avismsgPayloadIter_begin(impl->mIterator, (msgFieldPayload) impl->mAvisField, msg);
+ buffPos = impl->mBuffer;
+ currField = avismsgPayloadIter_begin(impl->mIterator, (msgFieldPayload) impl->mAvisField, msg);

*(int8_t *)(buffPos) = MAMA_PAYLOAD_AVIS;
buffPos+=1; currLen+=1;
@@ -346,12 +350,13 @@ avismsgPayload_serialize (const msgPayload msg,
switch (currField->mValue->type)
{
case TYPE_INT32:
- len=strlen(currField->mName);
+ len=(uint16_t)strlen(currField->mName);
if (impl->mBufferLen < currLen+3+len+sizeof(int32_t))
{
void*vp=realloc (impl->mBuffer, impl->mBufferLen+200);
impl->mBuffer = vp;
- buffPos=&impl->mBuffer[currLen];
+ buffPos=(uint8_t *)impl->mBuffer;
+ buffPos+=currLen;
impl->mBufferLen+=200;
}
*(int8_t *)(buffPos) = 1; buffPos+=1; currLen+=1;
@@ -365,7 +370,8 @@ avismsgPayload_serialize (const msgPayload msg,
{
void*vp=realloc (impl->mBuffer, impl->mBufferLen+200);
impl->mBuffer = vp;
- buffPos=&impl->mBuffer[currLen];
+ buffPos=&impl->mBuffer;
+ buffPos+=currLen;
impl->mBufferLen+=200;
}
*(int8_t *)(buffPos) = 2; buffPos+=1; currLen+=1;
@@ -379,7 +385,8 @@ avismsgPayload_serialize (const msgPayload msg,
{
void*vp=realloc (impl->mBuffer, impl->mBufferLen+200);
impl->mBuffer = vp;
- buffPos=&impl->mBuffer[currLen];
+ buffPos=&impl->mBuffer;
+ buffPos+=currLen;
impl->mBufferLen+=200;
}
*(int8_t *)(buffPos) = 3; buffPos+=1; currLen+=1;
@@ -393,7 +400,8 @@ avismsgPayload_serialize (const msgPayload msg,
{
void*vp=realloc (impl->mBuffer, impl->mBufferLen+200);
impl->mBuffer = vp;
- buffPos=&impl->mBuffer[currLen];
+ buffPos=&impl->mBuffer;
+ buffPos+=currLen;
impl->mBufferLen+=200;
}
*(int8_t *)(buffPos) = 4; buffPos+=1; currLen+=1;
@@ -420,8 +428,8 @@ avismsgPayload_getByteBuffer (const msgPayload msg,
const void** buffer,
mama_size_t* bufferLength)
{
- CHECK_PAYLOAD(msg);
avisPayloadImpl* impl = (avisPayloadImpl*)msg;
+ CHECK_PAYLOAD(msg);

*buffer = impl->mAvisMsg;

@@ -435,8 +443,8 @@ avismsgPayload_setByteBuffer (const msgPayload msg,
const void* buffer,
mama_size_t bufferLength)
{
- CHECK_PAYLOAD(msg);
avisPayloadImpl* impl = (avisPayloadImpl*)msg;
+ CHECK_PAYLOAD(msg);

impl->mAvisMsg=(Attributes*) buffer;

@@ -467,6 +475,10 @@ avismsgPayload_toString (const msgPayload msg)
{
avisPayloadImpl* impl = (avisPayloadImpl*)msg;
mama_status status = MAMA_STATUS_OK;
+ char *strpos = NULL;
+ bool comma = false;
+ uint16_t curlen = 1;
+ avisFieldPayload* currField = NULL;
if (!impl->mIterator)
{
status = avismsgPayloadIter_create((msgPayloadIter*) &impl->mIterator, msg);
@@ -491,12 +503,12 @@ avismsgPayload_toString (const msgPayload msg)
impl->mStringBufferLen = 200;
}

- char *strpos =impl->mStringBuffer;
+ strpos =impl->mStringBuffer;
sprintf (strpos, "%s", "{");strpos++;
- bool comma = false;
- uint16_t curlen = 1;
+ comma = false;
+ curlen = 1;

- avisFieldPayload* currField = avismsgPayloadIter_begin(impl->mIterator, (msgFieldPayload) impl->mAvisField, msg);
+ currField = avismsgPayloadIter_begin(impl->mIterator, (msgFieldPayload) impl->mAvisField, msg);
while (currField != NULL) {
char valueString[512];
avisValue_getFieldAsString(currField->mValue, NULL, 0, valueString, sizeof(valueString));
@@ -533,6 +545,7 @@ avismsgPayload_iterateFields (const msgPayload msg,
{
avisPayloadImpl* impl = (avisPayloadImpl*)msg;
mama_status status = MAMA_STATUS_OK;
+ avisFieldPayload* currField = NULL;

if (!impl->mIterator)
{
@@ -552,7 +565,7 @@ avismsgPayload_iterateFields (const msgPayload msg,
return status;
}

- avisFieldPayload* currField = avismsgPayloadIter_begin(impl->mIterator, (msgFieldPayload) impl->mAvisField, msg);
+ currField = avismsgPayloadIter_begin(impl->mIterator, (msgFieldPayload) impl->mAvisField, msg);
while (currField != NULL) {
mamaMsgFieldImpl_setPayload (field, currField);
(cb)(msg, field, closure);
@@ -578,13 +591,12 @@ mama_status
avismsgPayload_apply (msgPayload dest,
const msgPayload src)
{
+ avisPayloadImpl* implSrc = (avisPayloadImpl*)src;
+ mama_status status = MAMA_STATUS_OK;
+ avisFieldPayload* currField = NULL;
CHECK_PAYLOAD(dest);
CHECK_PAYLOAD(src);

- avisPayloadImpl* implSrc = (avisPayloadImpl*)src;
-
-
- mama_status status = MAMA_STATUS_OK;

if (!implSrc->mIterator)
{
@@ -604,7 +616,7 @@ avismsgPayload_apply (msgPayload dest,
return status;
}

- avisFieldPayload* currField = avismsgPayloadIter_begin(implSrc->mIterator, (msgFieldPayload) implSrc->mAvisField, src);
+ currField = avismsgPayloadIter_begin(implSrc->mIterator, (msgFieldPayload) implSrc->mAvisField, src);
while (currField != NULL) {
switch (currField->mValue->type)
{
@@ -1544,23 +1556,23 @@ avismsgPayload_getField (const msgPayload msg,
mama_fid_t fid,
msgFieldPayload* result)
{
+ avisPayloadImpl* impl = (avisPayloadImpl*)msg;
+ char tempName[64];
+ char* id = (char*) name;
+ Value* pValue = NULL;
CHECK_PAYLOAD(msg);
CHECK_NAME(name, fid);

- avisPayloadImpl* impl = (avisPayloadImpl*)msg;
-
if ( (!impl->mAvisField) && (avismsgFieldPayload_create((msgFieldPayload*) &impl->mAvisField) != MAMA_STATUS_OK) ) {
return MAMA_STATUS_PLATFORM;
}

- char tempName[64];
- char* id = (char*) name;
if (fid!=0)
{
snprintf (tempName, 63, "%d", fid);
id=tempName;
}
- Value* pValue = attributes_get(avisPayload(msg), id);
+ pValue = attributes_get(avisPayload(msg), id);
if ((!pValue) &&(name))
{
pValue = attributes_get(avisPayload(msg), name);
@@ -1816,9 +1828,10 @@ avismsgPayloadIter_create (msgPayloadIter* iter,
msgPayload msg)
{
mama_status status = MAMA_STATUS_OK;
+ avisIterator* impl = NULL;
CHECK_PAYLOAD(msg);

- avisIterator* impl = calloc (1, sizeof (avisIterator));
+ impl = calloc (1, sizeof (avisIterator));
if (!impl) return (MAMA_STATUS_NOMEM);

impl->mMsgIterator = (AttributesIter*) avis_emalloc(sizeof(AttributesIter));
@@ -1843,16 +1856,15 @@ avismsgPayloadIter_get (msgPayloadIter iter,
msgFieldPayload field,
msgPayload msg)
{
+ avisIterator* impl = (avisIterator*) iter;
if (!iter || !msg || !field) return NULL;

- avisIterator* impl = (avisIterator*) iter;
-
- avisField(field)->mName = attributes_iter_name(impl->mMsgIterator);
- avisField(field)->mValue = attributes_iter_value(impl->mMsgIterator);
+ avisField(field)->mName = attributes_iter_name(impl->mMsgIterator);
+ avisField(field)->mValue = attributes_iter_value(impl->mMsgIterator);

- if ((strcmp(SUBJECT_FIELD_NAME, avisField(field)->mName) == 0) ||
- (strcmp(INBOX_FIELD_NAME, avisField(field)->mName)== 0))
- return (avismsgPayloadIter_next(iter,field,msg));
+ if ((strcmp(SUBJECT_FIELD_NAME, avisField(field)->mName) == 0) ||
+ (strcmp(INBOX_FIELD_NAME, avisField(field)->mName)== 0))
+ return (avismsgPayloadIter_next(iter,field,msg));

return field;
}
@@ -1862,9 +1874,9 @@ avismsgPayloadIter_next (msgPayloadIter iter,
msgFieldPayload field,
msgPayload msg)
{
+ avisIterator* impl = (avisIterator*) iter;
if (!iter || !msg || !field) return NULL;

- avisIterator* impl = (avisIterator*) iter;
if (!attributes_iter_next(impl->mMsgIterator))
return NULL;

@@ -1904,8 +1916,9 @@ mama_status
avismsgPayloadIter_associate (msgPayloadIter iter,
msgPayload msg)
{
- CHECK_PAYLOAD(msg);
avisIterator* impl = (avisIterator*) iter;
+ CHECK_PAYLOAD(msg);
+
if (!impl) return MAMA_STATUS_NULL_ARG;

attributes_iter_init(impl->mMsgIterator, avisPayload(msg));
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.h b/mama/c_cpp/src/c/payload/avismsg/avispayload.h
index a2c1388..2e87888 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.h
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.h
@@ -30,9 +30,11 @@ extern "C" {
#endif


+MAMAExpDLL
extern mama_status
avismsgPayload_destroyImpl (mamaPayloadBridge mamaPayloadBridge);

+MAMAExpDLL
extern mama_status
avismsgPayload_createImpl (mamaPayloadBridge* result, char* identifier);

diff --git a/mama/c_cpp/src/c/payload/avismsg/msgfield.c b/mama/c_cpp/src/c/payload/avismsg/msgfield.c
index eb0c081..474055b 100755
--- a/mama/c_cpp/src/c/payload/avismsg/msgfield.c
+++ b/mama/c_cpp/src/c/payload/avismsg/msgfield.c
@@ -86,9 +86,9 @@ mamaMsgField_getDescriptor(
const mamaMsgField msgField,
mamaFieldDescriptor* result)
{
+ mama_status status = MAMA_STATUS_OK;
CHECK_FIELD(msgField);

- mama_status status = MAMA_STATUS_OK;
/* The FD may have already been obtained for this field from
* getName, getFid etc.*/
if (avisField->mFieldDesc)
--
1.7.7.6


[PATCH 31/50] [avis] Windows Changes

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Callbacks must be declared with the correct calling convention for Windows.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/bridge/avis/msg.c | 3 +--
mama/c_cpp/src/c/bridge/avis/queue.c | 2 +-
mama/c_cpp/src/c/bridge/avis/sub.c | 4 ++--
mama/c_cpp/src/c/bridge/avis/subinitial.c | 8 ++++----
mama/c_cpp/src/c/bridge/avis/timer.c | 4 ++--
5 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/msg.c b/mama/c_cpp/src/c/bridge/avis/msg.c
index b422543..c0553af 100644
--- a/mama/c_cpp/src/c/bridge/avis/msg.c
+++ b/mama/c_cpp/src/c/bridge/avis/msg.c
@@ -84,8 +84,7 @@ mama_status
avisBridgeMamaMsg_destroyMiddlewareMsg (msgBridge msg)
{
CHECK_MSG(msg);
- attributes_free(avisMsg(msg)->mAvisMsg);
- free(avisMsg(msg)->mAvisMsg);
+ attributes_destroy(avisMsg(msg)->mAvisMsg);
avisMsg(msg)->mAvisMsg = NULL;

return MAMA_STATUS_OK;
diff --git a/mama/c_cpp/src/c/bridge/avis/queue.c b/mama/c_cpp/src/c/bridge/avis/queue.c
index fd8c629..1e7263c 100644
--- a/mama/c_cpp/src/c/bridge/avis/queue.c
+++ b/mama/c_cpp/src/c/bridge/avis/queue.c
@@ -174,7 +174,7 @@ avisBridgeMamaQueue_dispatchEvent (queueBridge queue)
return MAMA_STATUS_OK;
}

-static void queueCb (void *ignored, void* closure)
+static void MAMACALLTYPE queueCb (void *ignored, void* closure)
{
avisQueueClosure* cl = (avisQueueClosure*)closure;
if (NULL ==cl) return;
diff --git a/mama/c_cpp/src/c/bridge/avis/sub.c b/mama/c_cpp/src/c/bridge/avis/sub.c
index 08fc697..5d0752d 100644
--- a/mama/c_cpp/src/c/bridge/avis/sub.c
+++ b/mama/c_cpp/src/c/bridge/avis/sub.c
@@ -72,7 +72,7 @@ const char* makeAvisSubject(const char* subject)
}


-static void
+static void MAMACALLTYPE
destroy_callback(void* subscriber, void* closure)
{
// cant do anything without a subscriber
@@ -92,7 +92,7 @@ destroy_callback(void* subscriber, void* closure)
* @param data The Avis Attributes* clone (must be freed)
* @param closure The subscriber
*/
-static void
+static void MAMACALLTYPE
avis_queue_callback (void* data, void* closure)
{
mama_status status;
diff --git a/mama/c_cpp/src/c/bridge/avis/subinitial.c b/mama/c_cpp/src/c/bridge/avis/subinitial.c
index 95c8ff3..7f58441 100644
--- a/mama/c_cpp/src/c/bridge/avis/subinitial.c
+++ b/mama/c_cpp/src/c/bridge/avis/subinitial.c
@@ -50,7 +50,7 @@ typedef struct avisInboxImpl
} while(0)


-static void
+static void MAMACALLTYPE
avisInbox_onMsg(
mamaSubscription subscription,
mamaMsg msg,
@@ -63,14 +63,14 @@ avisInbox_onMsg(
(avisInbox(closure)->mMsgCB)(msg, avisInbox(closure)->mClosure);
}

-static void
+static void MAMACALLTYPE
avisInbox_onCreate(
mamaSubscription subscription,
void* closure)
{
}

-static void
+static void MAMACALLTYPE
avisInbox_onDestroy(
mamaSubscription subscription,
void* closure)
@@ -81,7 +81,7 @@ avisInbox_onDestroy(
(avisInbox(closure)->mOnInboxDestroyed)(avisInbox(closure)->mParent, avisInbox(closure)->mClosure);
}

-static void
+static void MAMACALLTYPE
avisInbox_onError(
mamaSubscription subscription,
mama_status status,
diff --git a/mama/c_cpp/src/c/bridge/avis/timer.c b/mama/c_cpp/src/c/bridge/avis/timer.c
index 18aad31..4489ac6 100755
--- a/mama/c_cpp/src/c/bridge/avis/timer.c
+++ b/mama/c_cpp/src/c/bridge/avis/timer.c
@@ -42,7 +42,7 @@ typedef struct avisTimerImpl_
/* TODO: add queue */
} avisTimerImpl;

-static void
+static void MAMACALLTYPE
destroy_callback(void* timer, void* closure)
{
avisTimerImpl* impl = (avisTimerImpl*)timer;
@@ -51,7 +51,7 @@ destroy_callback(void* timer, void* closure)
free (impl);
}

-static void
+static void MAMACALLTYPE
timerQueueCb (void* data, void* closure)
{
avisTimerImpl* impl = (avisTimerImpl*)data;
--
1.7.7.6


[PATCH 30/50] [avis] Fixed Compiler Warnings

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Moved variable declarations to the beginning of blocks as intermingling
decarations and code is not allowed pre-C99. Also fixed a few other minor
warnings.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/bridge/avis/msg.c | 11 +++++------
mama/c_cpp/src/c/bridge/avis/publisher.c | 30 ++++++++++++++++++------------
mama/c_cpp/src/c/bridge/avis/queue.c | 18 ++++++++++--------
mama/c_cpp/src/c/bridge/avis/sub.c | 6 +++---
mama/c_cpp/src/c/bridge/avis/timer.c | 6 +++---
5 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/msg.c b/mama/c_cpp/src/c/bridge/avis/msg.c
index 8cc97eb..b422543 100644
--- a/mama/c_cpp/src/c/bridge/avis/msg.c
+++ b/mama/c_cpp/src/c/bridge/avis/msg.c
@@ -53,10 +53,11 @@ typedef struct avisMsgImpl
mama_status
avisBridgeMamaMsg_create (msgBridge* msg, mamaMsg parent)
{
+ avisMsgImpl* impl;
if (avisMsg(msg) == NULL) return MAMA_STATUS_NULL_ARG;
*msg = NULL;

- avisMsgImpl* impl = (avisMsgImpl*) calloc(1, sizeof(avisMsgImpl));
+ impl = (avisMsgImpl*) calloc(1, sizeof(avisMsgImpl));
if (!impl) return MAMA_STATUS_NOMEM;

mamaMsg_getNativeMsg(parent, (void**)&impl->mAvisMsg);
@@ -109,8 +110,8 @@ avisBridgeMamaMsg_getPlatformError (msgBridge msg, void** error)
mama_status
avisBridgeMamaMsgImpl_setReplyHandle (msgBridge msg, void* result)
{
- CHECK_MSG(msg);
mama_status status = MAMA_STATUS_OK;
+ CHECK_MSG(msg);
if (MAMA_STATUS_OK != (status = mamaMsg_updateString(avisMsg(msg)->mParent, INBOX_FIELD_NAME, 0, (const char*) result))) {
return status;
}
@@ -137,10 +138,9 @@ avisBridgeMamaMsg_setSendSubject (msgBridge msg,
const char* symbol,
const char* subject)
{
+ mama_status status = MAMA_STATUS_OK;
CHECK_MSG(msg);

- // TODO -- symbol?
- mama_status status = MAMA_STATUS_OK;
if (MAMA_STATUS_OK != (status = mamaMsg_updateString(avisMsg(msg)->mParent, SUBJECT_FIELD_NAME, 0, subject))) {
return status;
}
@@ -164,10 +164,9 @@ avisBridgeMamaMsg_getNativeHandle (msgBridge msg, void** result)
mama_status
avisBridgeMamaMsg_duplicateReplyHandle (msgBridge msg, void** result)
{
- CHECK_MSG(msg);
-
const char* replyAddr;
mama_status status = MAMA_STATUS_OK;
+ CHECK_MSG(msg);
if (MAMA_STATUS_OK != (status = mamaMsg_getString(avisMsg(msg)->mParent, INBOX_FIELD_NAME, 0, &replyAddr))) {
return status;
}
diff --git a/mama/c_cpp/src/c/bridge/avis/publisher.c b/mama/c_cpp/src/c/bridge/avis/publisher.c
index bc416fa..faaceb7 100644
--- a/mama/c_cpp/src/c/bridge/avis/publisher.c
+++ b/mama/c_cpp/src/c/bridge/avis/publisher.c
@@ -67,12 +67,14 @@ avisBridgeMamaPublisher_createByIndex (publisherBridge* result,
void* nativeQueueHandle,
mamaPublisher parent)
{
- if (!result || !tport) return MAMA_STATUS_NULL_ARG;
Elvin* avis = getAvis(tport);
+ avisPublisherBridge* publisher = NULL;
+
+ if (!result || !tport) return MAMA_STATUS_NULL_ARG;
CHECK_AVIS(avis);

*result = NULL;
- avisPublisherBridge* publisher = (avisPublisherBridge*) calloc (1, sizeof(avisPublisherBridge));
+ publisher = (avisPublisherBridge*) calloc (1, sizeof(avisPublisherBridge));
if (publisher == NULL)
return MAMA_STATUS_NOMEM;
publisher->mTransport = tport;
@@ -145,10 +147,11 @@ avisBridgeMamaPublisherImpl_buildSendSubject (avisPublisherBridge* impl)
mama_status
avisBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg)
{
- CHECK_PUBLISHER(publisher);
mama_size_t dataLen;
mama_status status;
- Attributes* attributes;
+ Attributes* attributes = NULL;
+
+ CHECK_PUBLISHER(publisher);

status = mamaMsgImpl_getPayloadBuffer (msg, (const void**)&attributes, &dataLen);
if (attributes == NULL)
@@ -173,18 +176,20 @@ avisBridgeMamaPublisher_sendReplyToInbox (publisherBridge publisher,
mamaMsg request,
mamaMsg reply)
{
- CHECK_PUBLISHER(publisher);
Attributes* requestMsg = NULL;
Attributes* replyMsg = NULL;
+ const char* replyAddr = NULL;
mama_size_t dataLen;
+ mama_status status;
+
+ CHECK_PUBLISHER(publisher);

mamaMsg_getNativeHandle(request, (void**) &requestMsg);
mamaMsgImpl_getPayloadBuffer (reply, (const void**)&replyMsg, &dataLen);

if (!requestMsg || !replyMsg) return MAMA_STATUS_NULL_ARG;

- const char* replyAddr = NULL;
- mama_status status = mamaMsg_getString(request, INBOX_FIELD_NAME, 0, &replyAddr);
+ status = mamaMsg_getString(request, INBOX_FIELD_NAME, 0, &replyAddr);
if ((status != MAMA_STATUS_OK) || (replyAddr == NULL) || (strlen(replyAddr) == 0)) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaPublisher_sendReplyToInbox(): "
"No reply address in message.");
@@ -223,6 +228,8 @@ avisBridgeMamaPublisher_sendFromInboxByIndex (publisherBridge publisher,
mamaInbox inbox,
mamaMsg msg)
{
+ const char* replyAddr = NULL;
+ mama_status status;
// CHECK_PUBLISHER(publisher);
if (avisPublisher(publisher) == 0)
return MAMA_STATUS_NULL_ARG;
@@ -231,13 +238,11 @@ avisBridgeMamaPublisher_sendFromInboxByIndex (publisherBridge publisher,
if (!elvin_is_open(avisPublisher(publisher)->mAvis))
return MAMA_STATUS_INVALID_ARG;

-
-
// get reply address from inbox
- const char* replyAddr = avisInboxImpl_getReplySubject(mamaInboxImpl_getInboxBridge(inbox));
+ replyAddr = avisInboxImpl_getReplySubject(mamaInboxImpl_getInboxBridge(inbox));

// set reply address in msg
- mama_status status = mamaMsg_updateString(msg, INBOX_FIELD_NAME, 0, replyAddr);
+ status = mamaMsg_updateString(msg, INBOX_FIELD_NAME, 0, replyAddr);
if (status != MAMA_STATUS_OK)
return status;

@@ -258,9 +263,10 @@ avisBridgeMamaPublisher_sendReplyToInboxHandle (publisherBridge publisher,
void * inbox,
mamaMsg reply)
{
+ mama_status status;
CHECK_PUBLISHER(publisher);

- mama_status status = mamaMsg_updateString(reply, SUBJECT_FIELD_NAME, 0, (const char*) inbox);
+ status = mamaMsg_updateString(reply, SUBJECT_FIELD_NAME, 0, (const char*) inbox);
if (status != MAMA_STATUS_OK)
return status;

diff --git a/mama/c_cpp/src/c/bridge/avis/queue.c b/mama/c_cpp/src/c/bridge/avis/queue.c
index 24ee8d1..fd8c629 100644
--- a/mama/c_cpp/src/c/bridge/avis/queue.c
+++ b/mama/c_cpp/src/c/bridge/avis/queue.c
@@ -51,11 +51,12 @@ mama_status
avisBridgeMamaQueue_create (queueBridge* queue,
mamaQueue parent)
{
+ avisQueueBridge* avisQueue = NULL;
if (queue == NULL)
return MAMA_STATUS_NULL_ARG;
*queue = NULL;

- avisQueueBridge* avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
+ avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
if (avisQueue == NULL)
return MAMA_STATUS_NOMEM;

@@ -74,11 +75,12 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue,
mamaQueue parent,
void* nativeQueue)
{
+ avisQueueBridge* avisQueue = NULL;
if (queue == NULL)
return MAMA_STATUS_NULL_ARG;
*queue = NULL;

- avisQueueBridge* avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
+ avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
if (avisQueue == NULL)
return MAMA_STATUS_NOMEM;

@@ -104,9 +106,9 @@ avisBridgeMamaQueue_destroy (queueBridge queue)
mama_status
avisBridgeMamaQueue_dispatch (queueBridge queue)
{
- CHECK_QUEUE(queue);
wombatQueueStatus status;

+ CHECK_QUEUE(queue);
do
{
/* 500 is .5 seconds */
@@ -132,8 +134,8 @@ avisBridgeMamaQueue_dispatch (queueBridge queue)
mama_status
avisBridgeMamaQueue_timedDispatch (queueBridge queue, uint64_t timeout)
{
- CHECK_QUEUE(queue);
wombatQueueStatus status;
+ CHECK_QUEUE(queue);

status = wombatQueue_timedDispatch (avisQueue(queue)->mQueue,
NULL, NULL, timeout);
@@ -154,8 +156,8 @@ avisBridgeMamaQueue_timedDispatch (queueBridge queue, uint64_t timeout)
mama_status
avisBridgeMamaQueue_dispatchEvent (queueBridge queue)
{
- CHECK_QUEUE(queue);
wombatQueueStatus status;
+ CHECK_QUEUE(queue);

status = wombatQueue_dispatch (avisQueue(queue)->mQueue,
NULL, NULL);
@@ -186,10 +188,10 @@ avisBridgeMamaQueue_enqueueEvent (queueBridge queue,
void* closure)
{
wombatQueueStatus status;
+ avisQueueClosure* cl = NULL;
CHECK_QUEUE(queue);

- avisQueueClosure* cl =
- (avisQueueClosure*)calloc(1, sizeof(avisQueueClosure));
+ cl = (avisQueueClosure*)calloc(1, sizeof(avisQueueClosure));
if (NULL == cl) return MAMA_STATUS_NOMEM;

cl->mImpl = avisQueue(queue);
@@ -208,8 +210,8 @@ avisBridgeMamaQueue_enqueueEvent (queueBridge queue,
mama_status
avisBridgeMamaQueue_stopDispatch (queueBridge queue)
{
- CHECK_QUEUE(queue);
wombatQueueStatus status;
+ CHECK_QUEUE(queue);

if (queue == NULL)
return MAMA_STATUS_NULL_ARG;
diff --git a/mama/c_cpp/src/c/bridge/avis/sub.c b/mama/c_cpp/src/c/bridge/avis/sub.c
index edc026b..08fc697 100644
--- a/mama/c_cpp/src/c/bridge/avis/sub.c
+++ b/mama/c_cpp/src/c/bridge/avis/sub.c
@@ -153,7 +153,6 @@ avis_queue_callback (void* data, void* closure)

attributes_free ((Attributes*)data);
free ((Attributes*)data);
-
}

static void
@@ -199,10 +198,11 @@ avisBridgeMamaSubscription_create (subscriptionBridge* subscriber,
mamaSubscription subscription,
void* closure)
{
+ avisSubscription* impl = NULL;
if (!subscriber || !subscription || !transport )
return MAMA_STATUS_NULL_ARG;

- avisSubscription* impl = (avisSubscription*)calloc (1, sizeof(avisSubscription));
+ impl = (avisSubscription*)calloc (1, sizeof(avisSubscription));
if (impl == NULL)
return MAMA_STATUS_NOMEM;

@@ -269,9 +269,9 @@ avisBridgeMamaSubscription_createWildCard (
mama_status
avisBridgeMamaSubscription_destroy (subscriptionBridge subscriber)
{
- CHECK_SUBSCRIBER(subscriber);
mama_status status = MAMA_STATUS_OK;
wombatQueue queue = NULL;
+ CHECK_SUBSCRIBER(subscriber);

elvin_subscription_remove_listener(avisSub(subscriber)->mAvisSubscription, avis_callback);

diff --git a/mama/c_cpp/src/c/bridge/avis/timer.c b/mama/c_cpp/src/c/bridge/avis/timer.c
index 7a69051..18aad31 100755
--- a/mama/c_cpp/src/c/bridge/avis/timer.c
+++ b/mama/c_cpp/src/c/bridge/avis/timer.c
@@ -72,7 +72,7 @@ timerCb (timerElement timer,

/* Mama timers are repeating */
timeout.tv_sec = (time_t)impl->mInterval;
- timeout.tv_usec = (suseconds_t)((impl->mInterval- timeout.tv_sec) * 1000000.0);
+ timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0);
if (0 != createTimer (&impl->mTimerElement,
gTimerHeap,
timerCb,
@@ -122,7 +122,7 @@ avisBridgeMamaTimer_create (timerBridge* result,
*result = (timerBridge)impl;

timeout.tv_sec = (time_t)interval;
- timeout.tv_usec = (suseconds_t)((interval-timeout.tv_sec) * 1000000.0);
+ timeout.tv_usec = ((interval-timeout.tv_sec) * 1000000.0);
if (0 != createTimer (&impl->mTimerElement,
gTimerHeap,
timerCb,
@@ -180,7 +180,7 @@ avisBridgeMamaTimer_reset (timerBridge timer)
impl = (avisTimerImpl*)timer;

timeout.tv_sec = (time_t)impl->mInterval;
- timeout.tv_usec = (suseconds_t)((impl->mInterval-timeout.tv_sec) * 1000000.0);
+ timeout.tv_usec = ((impl->mInterval-timeout.tv_sec) * 1000000.0);

if (timer == NULL)
return MAMA_STATUS_NULL_ARG;
--
1.7.7.6


[PATCH 29/50] [avis] Use Platform Independent UUID from Common

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Windows does not support libuuid (DCE Compatible Universally Unique
Identifier Library). The OpenMAMA common library provides a compatibility
layer.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/bridge/avis/Makefile.am | 2 +-
mama/c_cpp/src/c/bridge/avis/subinitial.c | 18 +++++++++---------
2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/Makefile.am b/mama/c_cpp/src/c/bridge/avis/Makefile.am
index 8cc5913..4385ff0 100644
--- a/mama/c_cpp/src/c/bridge/avis/Makefile.am
+++ b/mama/c_cpp/src/c/bridge/avis/Makefile.am
@@ -40,7 +40,7 @@ CFLAGS += -Wimplicit -Wno-long-long -Wmissing-prototypes -Wstrict-prototypes -Wa
CPPFLAGS += -Wno-long-long -Wimplicit -Wno-long-long -Wmissing-prototypes -Wstrict-prototypes -Wall
endif

-LIBS = -lavis -luuid -lmama -lm -lwombatcommon
+LIBS = -lavis -lmama -lm -lwombatcommon -luuid

libmamaavisimpl_la_SOURCES = \
bridge.c \
diff --git a/mama/c_cpp/src/c/bridge/avis/subinitial.c b/mama/c_cpp/src/c/bridge/avis/subinitial.c
index 7567732..95c8ff3 100644
--- a/mama/c_cpp/src/c/bridge/avis/subinitial.c
+++ b/mama/c_cpp/src/c/bridge/avis/subinitial.c
@@ -19,7 +19,7 @@
* 02110-1301 USA
*/

-#include <uuid/uuid.h>
+#include <wombat/wUuid.h>

#include <mama/mama.h>
#include <bridge.h>
@@ -107,27 +107,27 @@ avisBridgeMamaInbox_createByIndex (inboxBridge* bridge,
void* closure,
mamaInbox parent)
{
+ wUuid t_uuid;
+ char t_str[37];
+ mamaMsgCallbacks cb;
+ avisInboxImpl* impl = NULL;
+ mama_status status = MAMA_STATUS_OK;
if (!bridge || !transport || !queue || !msgCB) return MAMA_STATUS_NULL_ARG;
- avisInboxImpl* impl = (avisInboxImpl*)calloc(1, sizeof(avisInboxImpl));
+ impl = (avisInboxImpl*)calloc(1, sizeof(avisInboxImpl));
if (!impl)
return MAMA_STATUS_NOMEM;

- mama_status status = MAMA_STATUS_OK;
if (MAMA_STATUS_OK != (status = mamaSubscription_allocate(&impl->mSubscription))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaInbox_createByIndex(): Failed to allocate subscription ");
free(impl);
return status;
}

- uuid_t t_uuid;
// NOTE: uuid_generate is very expensive, so we use cheaper uuid_generate_time
- //uuid_generate(uuid);
- uuid_generate_time(t_uuid);
- char t_str[uuidStringLen+1];
- uuid_unparse(t_uuid, t_str);
+ wUuid_generate_time(t_uuid);
+ wUuid_unparse(t_uuid, t_str);
snprintf(impl->mInbox, sizeof(impl->mInbox) -1, "_INBOX.%s", t_str);

- mamaMsgCallbacks cb;
cb.onCreate = &avisInbox_onCreate;
cb.onError = &avisInbox_onError;
cb.onMsg = &avisInbox_onMsg;
--
1.7.7.6


[PATCH 28/50] [mama] Minor Header Adjustments

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Moved #includes outside of #extern "C" in types.h to avoid issues on Windows
where standard headers may contain C++ templates.

Include port.h from middleware.c.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama/types.h | 9 +++++----
mama/c_cpp/src/c/middleware.c | 1 +
2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/types.h b/mama/c_cpp/src/c/mama/types.h
index d26c206..e0227ff 100644
--- a/mama/c_cpp/src/c/mama/types.h
+++ b/mama/c_cpp/src/c/mama/types.h
@@ -22,15 +22,14 @@
#ifndef MamaTypesH__
#define MamaTypesH__

-#if defined(__cplusplus)
-extern "C" {
-#endif

#include "wombat/port.h"
#include <stdlib.h>
#include <string.h>

-#define MAMA_QUANTITY_EPSILON ((mama_f64_t)0.00000000001)
+#if defined(__cplusplus)
+extern "C" {
+#endif

typedef int8_t mama_i8_t;
typedef uint8_t mama_u8_t;
@@ -48,6 +47,8 @@ typedef uint16_t mama_fid_t;
typedef size_t mama_size_t;
typedef uint32_t mama_seqnum_t;

+#define MAMA_QUANTITY_EPSILON ((mama_f64_t)0.00000000001)
+
/**
* Macro to determine if a quantity is zero
*/
diff --git a/mama/c_cpp/src/c/middleware.c b/mama/c_cpp/src/c/middleware.c
index ce80c4f..ad288ad 100644
--- a/mama/c_cpp/src/c/middleware.c
+++ b/mama/c_cpp/src/c/middleware.c
@@ -19,6 +19,7 @@
* 02110-1301 USA
*/

+#include "wombat/port.h"
#include <mama/middleware.h>
#include <string.h>

--
1.7.7.6


[PATCH 27/50] [mama] Log Messages with MAMA_MSG_STATUS_UNKNOWN

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

On receiving a message with a status of MAMA_MSG_STATUS_UNKNOWN, log the message
if at FINEST and track it update the statistics for the transport, queue and
application.

Also tidied up the file a little: converted tabs to space, reformatted long
lines, etc.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/listenermsgcallback.c | 122 +++++++++++++++++---------------
1 files changed, 64 insertions(+), 58 deletions(-)

diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c
index cbe1c90..532a433 100644
--- a/mama/c_cpp/src/c/listenermsgcallback.c
+++ b/mama/c_cpp/src/c/listenermsgcallback.c
@@ -45,33 +45,12 @@ extern int gGenerateTransportStats;
extern int gGenerateGlobalStats;
extern int gGenerateQueueStats;

-/* *************************************************** */
-/* Private Function Prototypes. */
-/* *************************************************** */
-
-/**
- * This function will invoke the subscription's onError callback passing in a particular error code.
- *
- * @param[in] callback The impl.
- * @param[in] ctx The subscription context.
- * @param[in] mamaStatus The status that will be passed to the error callback.
- * @param[in] subscription The subscription.
- * @param[in] userSymbol The symbol.
- */
-static void
-listenerMsgCallbackImpl_invokeErrorCallback(listenerMsgCallback callback,
+/* Function prototypes. */
+void listenerMsgCallback_invokeErrorCallback(listenerMsgCallback callback,
SubjectContext *ctx, mama_status mamaStatus, mamaSubscription
subscription, const char *userSymbol);

-/**
- * This function will write a log message if an unknown message status is detected.
- *
- * @param[in] ctx The subscription context.
- * @param[in] status The message status.
- * @param[in] subscription The subscription.
- */
-static void
-listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status,
+void listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status,
mamaSubscription subscription);

/**
@@ -195,7 +174,8 @@ static void processPointToPointMessage (msgCallback* callback,

/* Mark the subscription as inactive if we are not expecting
* any more updates. */
- if (!mamaSubscription_isExpectingUpdates (self->mSubscription))
+ if (!mamaSubscription_isExpectingUpdates (self->mSubscription) &&
+ !mamaSubscription_getAcceptMultipleInitials (self->mSubscription))
{
mamaSubscription_deactivate (self->mSubscription);
}
@@ -223,6 +203,33 @@ static void processPointToPointMessage (msgCallback* callback,
* may have been destroyed in the callback! */
}

+/* Description : This function will invoke the subscription's onError callback
+ * passing in a particular error code.
+ */
+void listenerMsgCallback_invokeErrorCallback(listenerMsgCallback callback,
+ SubjectContext *ctx, mama_status mamaStatus, mamaSubscription
+ subscription, const char *userSymbol)
+{
+ /* Local variables. */
+ void *closure = NULL;
+
+ /* Get the callback object from the subscription. */
+ mamaMsgCallbacks *cbs = mamaSubscription_getUserCallbacks (subscription);
+
+ /* Wait for a response. */
+ mamaSubscription_stopWaitForResponse(subscription, ctx);
+
+ /* Get the closure from the subscription. */
+ mamaSubscription_getClosure (subscription, &closure);
+
+ mama_setLastError (MAMA_ERROR_DEFAULT);
+ cbs->onError (subscription,
+ mamaStatus,
+ NULL,
+ userSymbol,
+ closure);
+}
+
static int isInitialMessageOrRecap (msgCallback *callback, int msgType)
{
return msgType == MAMA_MSG_TYPE_INITIAL ||
@@ -247,7 +254,7 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
mamaTransport transport;
mamaStatsCollector* queueStatsCollector = NULL;
mamaStatsCollector* tportStatsCollector = NULL;
- const char* userSymbol = NULL;
+ const char* userSymbol = NULL;

if (!ctx)
{
@@ -283,22 +290,25 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
MamaStatNumMessages.mFid);

/* Get the user symbol from the subscription. */
- mamaSubscription_getSymbol(subscription, &userSymbol);
+ mamaSubscription_getSymbol(subscription, &userSymbol);

if (status != MAMA_MSG_STATUS_OK)
{
switch (status)
{
case MAMA_MSG_STATUS_NOT_PERMISSIONED:
- listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_PERMISSIONED, subscription, userSymbol);
+ listenerMsgCallback_invokeErrorCallback(callback, ctx,
+ MAMA_STATUS_NOT_PERMISSIONED, subscription, userSymbol);
return;

case MAMA_MSG_STATUS_BAD_SYMBOL:
- listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_BAD_SYMBOL, subscription, userSymbol);
+ listenerMsgCallback_invokeErrorCallback(callback, ctx,
+ MAMA_STATUS_BAD_SYMBOL, subscription, userSymbol);
return;

case MAMA_MSG_STATUS_NOT_FOUND:
- listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_FOUND, subscription, userSymbol);
+ listenerMsgCallback_invokeErrorCallback(callback, ctx,
+ MAMA_STATUS_NOT_FOUND, subscription, userSymbol);
return;

case MAMA_MSG_STATUS_NO_SUBSCRIBERS:
@@ -338,6 +348,30 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
listenerMsgCallbackImpl_logUnknownStatus(ctx, status, subscription);
break;
}
+ case MAMA_MSG_STATUS_UNKNOWN:
+ {
+ listenerMsgCallbackImpl_logUnknownStatus(ctx, status, subscription);
+ mamaSubscription_setPossiblyStale(subscription);
+
+ if (queueStatsCollector)
+ {
+ mamaStatsCollector_incrementStat (*queueStatsCollector,
+ MamaStatUnknownMsgs.mFid);
+ }
+ if (tportStatsCollector)
+ {
+ mamaStatsCollector_incrementStat (*tportStatsCollector,
+ MamaStatUnknownMsgs.mFid);
+ }
+ if (mamaInternal_getGlobalStatsCollector())
+ {
+ mamaStatsCollector_incrementStat
+ (*(mamaInternal_getGlobalStatsCollector()),
+ MamaStatUnknownMsgs.mFid);
+ }
+ return; //throw away msg
+ break;
+ }
default:
{
/* Log the fact we have received an unknown message. */
@@ -587,34 +621,6 @@ checkEntitlement( msgCallback *callback, mamaMsg msg, SubjectContext* ctx )
#endif /* WITH_ENTITLEMENTS */
}

-/* *************************************************** */
-/* Private Functions. */
-/* *************************************************** */
-
-/* Description: This function will invoke the subscription's onError callback passing in a particular error code.
- */
-void listenerMsgCallbackImpl_invokeErrorCallback(listenerMsgCallback callback, SubjectContext *ctx, mama_status mamaStatus, mamaSubscription subscription, const char *userSymbol)
-{
- /* Local variables. */
- void *closure = NULL;
-
- /* Get the callback object from the subscription. */
- mamaMsgCallbacks *cbs = mamaSubscription_getUserCallbacks (subscription);
-
- /* Wait for a response. */
- mamaSubscription_stopWaitForResponse(subscription, ctx);
-
- /* Get the closure from the subscription. */
- mamaSubscription_getClosure (subscription, &closure);
-
- mama_setLastError (MAMA_ERROR_DEFAULT);
- cbs->onError (subscription,
- mamaStatus,
- NULL,
- userSymbol,
- closure);
-}
-
void listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status,
mamaSubscription subscription)
{
--
1.7.7.6


[PATCH 26/50] [mama] Do not modify attached messages

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

For some middlewares with "zero-copy" semantics like shared memory or RDMA, applications share message payloads. Others may re-use buffers. In these cases, modifying a message potentially causes undesirable affects on other applications. For this reason, applications must call mamaMsg_detach() to take ownership of or copy the payload prior to updating or adding fields. This change enforces the read-only semantics prior to detaching the message.

This change also removes the unnecessary mamaMsg_createNative() method. The concept of a native payload is arbitrary and redundant since we already have the notion of a default payload.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama/status.h | 6 +-
mama/c_cpp/src/c/msg.c | 139 ++++++++++++++++++++++++++++++---------
mama/c_cpp/src/c/msgfield.c | 123 +++++++++++++++++++++++++++++-----
mama/c_cpp/src/c/msgfieldimpl.h | 2 +-
mama/c_cpp/src/c/msgimpl.h | 3 +
mama/c_cpp/src/c/status.c | 2 +-
6 files changed, 219 insertions(+), 56 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/status.h b/mama/c_cpp/src/c/mama/status.h
index fa0c5f5..1c48c00 100644
--- a/mama/c_cpp/src/c/mama/status.h
+++ b/mama/c_cpp/src/c/mama/status.h
@@ -92,6 +92,8 @@ typedef enum
MAMA_STATUS_NO_BRIDGE_IMPL = 26,
/* Invalid queue */
MAMA_STATUS_INVALID_QUEUE = 27,
+ /* Not modifiable */
+ MAMA_STATUS_NOT_MODIFIABLE = 28,
/* Not permissioned for the subject */
MAMA_STATUS_NOT_PERMISSIONED = 4001,
/* Subscription is in an invalid state. */
@@ -99,9 +101,7 @@ typedef enum
/* Queue has open objects. */
MAMA_STATUS_QUEUE_OPEN_OBJECTS = 5002,
/* The function isn't supported for this type of subscription. */
- MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE = 5003,
- /* The underlying transport saw a gap. */
- MAMA_STATUS_SUBSCRIPTION_GAP = 5004
+ MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE = 5003

#ifdef WITH_ENTITLEMENTS
/* Out of memory */
diff --git a/mama/c_cpp/src/c/msg.c b/mama/c_cpp/src/c/msg.c
index af51d25..d3c0e80 100644
--- a/mama/c_cpp/src/c/msg.c
+++ b/mama/c_cpp/src/c/msg.c
@@ -67,6 +67,7 @@ typedef struct mamaMsgImpl_
char mSubject[MAX_SUBJECT];

msgPayload mPayload;
+ msgPayload mPayloads[MAMA_PAYLOAD_MAX];
/* Set of get/set/update methods to use for a non wmsg payload */
mamaPayloadBridgeImpl* mPayloadBridge;

@@ -358,6 +359,18 @@ mamaMsg_getPayloadType (mamaMsg msg, mamaPayloadType* payloadType)
return MAMA_STATUS_OK;
}

+mama_status
+mamaMsgImpl_getPayload (const mamaMsg msg, msgPayload* payload)
+{
+ mamaMsgImpl* impl = (mamaMsgImpl*)msg;
+
+ if (!impl || !payload) return MAMA_STATUS_NULL_ARG;
+
+ *payload = impl->mPayload;
+
+ return MAMA_STATUS_OK;
+}
+
const char*
mamaPayload_convertToString (mamaPayloadType payloadType)
{
@@ -465,6 +478,7 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
mama_status status = MAMA_STATUS_OK;
msgPayload payload = NULL;
+ mamaPayloadBridgeImpl* newPayloadBridge = NULL;
if (impl == NULL)
{
mama_log (MAMA_LOG_LEVEL_WARN,
@@ -502,9 +516,10 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
impl->mMessageOwner = 0;

if (id == '\0')
- id = (char) ((const char*)data) [0];
+ id = (char) ((const char*)data) [0];

impl->mPayloadBridge = mamaInternal_findPayload(id);
+ impl->mPayload = impl->mPayloads[(uint8_t)id];

if (!impl->mPayloadBridge) return MAMA_STATUS_NO_BRIDGE_IMPL;

@@ -519,6 +534,7 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
{
return status;
}
+ impl->mPayloads[(uint8_t)id] = payload;
/* The middleware does not own this message */
return mamaMsgImpl_setPayload (msg, payload, 0);
}
@@ -709,13 +725,14 @@ mamaMsg_create (mamaMsg* msg)
mamaPayloadBridge bridge = mamaInternal_getDefaultPayload ();
msgPayload payload = NULL;

- if (MAMA_STATUS_OK !=
- (status = bridge->msgPayloadCreate (&payload)))
+ if (bridge)
{
- *msg = NULL;
- return status;
+ if (MAMA_STATUS_OK != (status = bridge->msgPayloadCreate (&payload)))
+ {
+ *msg = NULL;
+ return status;
+ }
}
-
return mamaMsgImpl_createForPayload (msg,
payload,
bridge,
@@ -723,24 +740,6 @@ mamaMsg_create (mamaMsg* msg)
}

mama_status
-mamaMsg_createNative (mamaMsg* msg, mamaBridge bridge)
-{
- mamaBridgeImpl* impl = (mamaBridgeImpl*) bridge;
- if (impl)
- {
- char*payloadName;
- char payloadId;
-
- if (impl->bridgeGetDefaultPayloadId(&payloadName, &payloadId) == MAMA_STATUS_OK)
- {
- return (mamaMsg_createForPayload (msg, payloadId));
- }
- }
- return MAMA_STATUS_INVALID_ARG;
-}
-
-
-mama_status
mamaMsg_getSendSubject (const mamaMsg msg, const char** subject)
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
@@ -797,10 +796,12 @@ mamaMsgImpl_getStatusFromMsg (mamaMsg msg)
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
int32_t result = MAMA_MSG_STATUS_UNKNOWN;
- mamaMsg_getI32 (msg,
+ if (mamaMsg_getI32 (msg,
MamaFieldMsgStatus.mName,
MamaFieldMsgStatus.mFid,
- &result);
+ &result) != MAMA_STATUS_OK)
+ result = MAMA_MSG_STATUS_UNKNOWN;
+
impl->mStatus = (mamaMsgStatus) result;
return (mamaMsgStatus) result;
}
@@ -851,6 +852,7 @@ mamaMsg_addBool(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddBool (impl->mPayload,
name,
@@ -868,6 +870,7 @@ mamaMsg_addChar(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddChar (impl->mPayload,
name,
@@ -885,6 +888,7 @@ mamaMsg_addI8(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddI8 (impl->mPayload,
name,
@@ -902,6 +906,7 @@ mamaMsg_addU8(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddU8 (impl->mPayload,
name,
@@ -919,6 +924,7 @@ mamaMsg_addI16(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddI16 (impl->mPayload,
name,
@@ -936,6 +942,7 @@ mamaMsg_addU16(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddU16 (impl->mPayload,
name,
@@ -955,6 +962,7 @@ mamaMsg_addI32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddI32 (impl->mPayload,
name,
@@ -972,6 +980,7 @@ mamaMsg_addU32(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddU32 (impl->mPayload,
name,
@@ -989,6 +998,7 @@ mamaMsg_addI64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddI64 (impl->mPayload,
name,
@@ -1006,6 +1016,7 @@ mamaMsg_addU64(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddU64 (impl->mPayload,
name,
@@ -1023,6 +1034,7 @@ mamaMsg_addF32(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddF32 (impl->mPayload,
name,
@@ -1040,6 +1052,7 @@ mamaMsg_addF64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddF64 (impl->mPayload,
name,
@@ -1057,6 +1070,7 @@ mamaMsg_addString (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddString (impl->mPayload,
name,
@@ -1075,6 +1089,7 @@ mamaMsg_addOpaque(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddOpaque (impl->mPayload,
name,
@@ -1093,6 +1108,7 @@ mamaMsg_addDateTime(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddDateTime (impl->mPayload,
name,
@@ -1110,6 +1126,7 @@ mamaMsg_addPrice(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddPrice (impl->mPayload,
name,
@@ -1128,6 +1145,7 @@ mamaMsg_addMsg(
mamaMsgImpl* subMsg = (mamaMsgImpl*)value;
if (!impl || !subMsg || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;

+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddMsg (impl->mPayload,
name,
@@ -1148,6 +1166,7 @@ mamaMsg_addVectorBool (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorBool (impl->mPayload,
name,
@@ -1167,6 +1186,7 @@ mamaMsg_addVectorChar (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorChar (impl->mPayload,
name,
@@ -1186,6 +1206,7 @@ mamaMsg_addVectorI8 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorI8 (impl->mPayload,
name,
@@ -1205,6 +1226,7 @@ mamaMsg_addVectorU8 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorU8 (impl->mPayload,
name,
@@ -1224,6 +1246,7 @@ mamaMsg_addVectorI16 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorI16 (impl->mPayload,
name,
@@ -1243,6 +1266,7 @@ mamaMsg_addVectorU16 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorU16 (impl->mPayload,
name,
@@ -1262,6 +1286,7 @@ mamaMsg_addVectorI32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorI32 (impl->mPayload,
name,
@@ -1281,6 +1306,7 @@ mamaMsg_addVectorU32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorU32 (impl->mPayload,
name,
@@ -1300,6 +1326,7 @@ mamaMsg_addVectorI64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorI64 (impl->mPayload,
name,
@@ -1319,6 +1346,7 @@ mamaMsg_addVectorU64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorU64 (impl->mPayload,
name,
@@ -1338,6 +1366,7 @@ mamaMsg_addVectorF32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorF32 (impl->mPayload,
name,
@@ -1357,6 +1386,7 @@ mamaMsg_addVectorF64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorF64 (impl->mPayload,
name,
@@ -1376,6 +1406,7 @@ mamaMsg_addVectorString (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorString (impl->mPayload,
name,
@@ -1395,6 +1426,7 @@ mamaMsg_addVectorDateTime (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorDateTime (
impl->mPayload,
@@ -1414,6 +1446,7 @@ mamaMsg_addVectorPrice (
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadAddVectorPrice (impl->mPayload,
name,
@@ -1438,6 +1471,7 @@ mamaMsg_addVectorMsg(
return MAMA_STATUS_NULL_ARG;
}

+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;
return impl->mPayloadBridge->msgPayloadAddVectorMsg (impl->mPayload,
name,
fid,
@@ -1722,7 +1756,7 @@ mamaMsg_getField(
}
impl->mCurrentField->myPayloadBridge = impl->mPayloadBridge;
impl->mCurrentField->myPayload = impl->mFieldPayload;
- impl->mCurrentField->myMsgPayload = impl->mPayload;
+ impl->mCurrentField->myMsg = impl;

mamaField = (mamaMsgFieldImpl*)impl->mCurrentField;
mamaField->myDictionary = NULL;
@@ -1805,6 +1839,7 @@ mamaMsg_updateBool(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_OK;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1826,6 +1861,7 @@ mamaMsg_updateChar(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1847,6 +1883,7 @@ mamaMsg_updateI8(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1868,6 +1905,7 @@ mamaMsg_updateU8(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1889,6 +1927,7 @@ mamaMsg_updateI16(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1910,6 +1949,7 @@ mamaMsg_updateU16(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1931,6 +1971,7 @@ mamaMsg_updateI32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1952,6 +1993,7 @@ mamaMsg_updateU32(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

if (impl->mPayloadBridge)
{
@@ -1973,6 +2015,7 @@ mamaMsg_updateI64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateU64 (impl->mPayload,
name,
@@ -1990,6 +2033,7 @@ mamaMsg_updateU64(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateU64 (impl->mPayload,
name,
@@ -2007,6 +2051,7 @@ mamaMsg_updateF32(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateF32 (impl->mPayload,
name,
@@ -2024,6 +2069,7 @@ mamaMsg_updateF64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateF64 (impl->mPayload,
name,
@@ -2041,6 +2087,7 @@ mamaMsg_updateString (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateString (impl->mPayload,
name,
@@ -2055,6 +2102,7 @@ mamaMsg_applyMsg (mamaMsg msg, mamaMsg src)
mamaMsgImpl* source = (mamaMsgImpl*)src;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadApply (impl->mPayload,
source->mPayload);
@@ -2080,6 +2128,7 @@ mamaMsg_updateSubMsg (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateSubMsg (impl->mPayload,
name,
@@ -2098,6 +2147,7 @@ mamaMsg_updateOpaque (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateOpaque (impl->mPayload,
name,
@@ -2116,6 +2166,7 @@ mamaMsg_updateDateTime(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateDateTime (impl->mPayload,
name,
@@ -2133,6 +2184,7 @@ mamaMsg_updatePrice(
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdatePrice (impl->mPayload,
name,
@@ -2151,6 +2203,7 @@ mamaMsg_updateVectorMsg (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorMsg (impl->mPayload,
name,
@@ -2170,6 +2223,7 @@ mamaMsg_updateVectorString (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorString (
impl->mPayload,
@@ -2190,6 +2244,7 @@ mamaMsg_updateVectorBool (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorBool (
impl->mPayload,
@@ -2210,6 +2265,7 @@ mamaMsg_updateVectorChar (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorChar (
impl->mPayload,
@@ -2230,6 +2286,7 @@ mamaMsg_updateVectorI8 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorI8 (impl->mPayload,
name,
@@ -2249,6 +2306,7 @@ mamaMsg_updateVectorU8 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorU8 (impl->mPayload,
name,
@@ -2269,6 +2327,7 @@ mamaMsg_updateVectorI16 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorI16 (impl->mPayload,
name,
@@ -2288,6 +2347,7 @@ mamaMsg_updateVectorU16 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorU16 (impl->mPayload,
name,
@@ -2307,6 +2367,7 @@ mamaMsg_updateVectorI32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorI32 (impl->mPayload,
name,
@@ -2326,6 +2387,7 @@ mamaMsg_updateVectorU32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorU32 (impl->mPayload,
name,
@@ -2345,6 +2407,7 @@ mamaMsg_updateVectorI64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorI64 (impl->mPayload,
name,
@@ -2364,6 +2427,7 @@ mamaMsg_updateVectorU64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorU64 (impl->mPayload,
name,
@@ -2383,6 +2447,7 @@ mamaMsg_updateVectorF32 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorF32 (impl->mPayload,
name,
@@ -2402,6 +2467,7 @@ mamaMsg_updateVectorF64 (
mamaMsgImpl* impl = (mamaMsgImpl*)msg;

if (!impl || !impl->mPayloadBridge) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mMessageOwner) return MAMA_STATUS_NOT_MODIFIABLE;

return impl->mPayloadBridge->msgPayloadUpdateVectorF64 (impl->mPayload,
name,
@@ -2542,7 +2608,7 @@ mamaMsg_iterateFields (const mamaMsg msg,
if (impl->mPayloadBridge)
{
impl->mCurrentField->myPayloadBridge = impl->mPayloadBridge;
- impl->mCurrentField->myMsgPayload = impl->mPayload;
+ impl->mCurrentField->myMsg = impl;
return (impl->mPayloadBridge->msgPayloadIterateFields (impl->mPayload,
msg,
impl->mCurrentField,
@@ -3312,7 +3378,7 @@ mamaMsgIterator_associate (mamaMsgIterator iterator, mamaMsg msg)
if (msgImpl->mPayloadBridge)
{
itrImpl->mCurrentField->myPayloadBridge = msgImpl->mPayloadBridge;
- itrImpl->mCurrentField->myMsgPayload = msgImpl->mPayload;
+ itrImpl->mCurrentField->myMsg = msgImpl;
itrImpl->mPayloadBridge = msgImpl->mPayloadBridge;

/* Create the native payload iter if it hasn't been created already */
@@ -3393,12 +3459,15 @@ mamaMsgIterator_next (mamaMsgIterator iterator)
if (impl->mPayloadBridge)
{
msgFieldPayload msgField = NULL;
+ msgPayload payload = NULL;
+
+ mamaMsgImpl_getPayload (currentField->myMsg, &payload);

if (NULL == (msgField =
(impl->mPayloadBridge->msgPayloadIterNext (
impl->mPayloadIter,
currentField->myPayload,
- currentField->myMsgPayload))))
+ payload))))
{
return NULL;
}
@@ -3422,9 +3491,12 @@ mamaMsgIterator_hasNext (mamaMsgIterator iterator)

if (impl->mPayloadBridge)
{
+ msgPayload payload = NULL;
+
+ mamaMsgImpl_getPayload (currentField->myMsg, &payload);
return impl->mPayloadBridge->msgPayloadIterHasNext (
impl->mPayloadIter,
- currentField->myMsgPayload);
+ payload);

}
return (0);
@@ -3442,12 +3514,15 @@ mamaMsgIterator_begin (mamaMsgIterator iterator)
if (impl->mPayloadBridge)
{
msgFieldPayload msgField = NULL;
+ msgPayload payload = NULL;
+
+ mamaMsgImpl_getPayload (currentField->myMsg, &payload);

if (NULL == (msgField =
(impl->mPayloadBridge->msgPayloadIterBegin (
impl->mPayloadIter,
currentField->myPayload,
- currentField->myMsgPayload))))
+ payload))))
{
return NULL;
}
diff --git a/mama/c_cpp/src/c/msgfield.c b/mama/c_cpp/src/c/msgfield.c
index ac5fee3..50ae29c 100644
--- a/mama/c_cpp/src/c/msgfield.c
+++ b/mama/c_cpp/src/c/msgfield.c
@@ -801,19 +801,20 @@ mamaMsgField_getAsString (
char * buf,
size_t len)
{
- mamaMsgFieldImpl* impl =
- (mamaMsgFieldImpl*)(msgField);
+ mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)(msgField);
+ msgPayload payload = NULL;
if (!impl) return MAMA_STATUS_INVALID_ARG;
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);

- if (impl->myPayloadBridge)
+ if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadGetAsString (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
buf, len-1);

}
- return MAMA_STATUS_NULL_ARG;
+ return MAMA_STATUS_NULL_ARG;
}

mama_status
@@ -822,14 +823,20 @@ mamaMsgField_updateBool (
mama_bool_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateBool (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -842,14 +849,20 @@ mamaMsgField_updateChar (
char value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateChar (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -862,14 +875,20 @@ mamaMsgField_updateI8 (
mama_i8_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI8 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -882,14 +901,20 @@ mamaMsgField_updateU8 (
mama_u8_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI8 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -902,14 +927,20 @@ mamaMsgField_updateI16 (
mama_i16_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI16 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -922,14 +953,20 @@ mamaMsgField_updateU16 (
mama_u16_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateU16 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -942,14 +979,20 @@ mamaMsgField_updateI32 (
mama_i32_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI32 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);
}
return MAMA_STATUS_NULL_ARG;
@@ -961,14 +1004,20 @@ mamaMsgField_updateU32 (
mama_u32_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateU32 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -981,14 +1030,20 @@ mamaMsgField_updateI64 (
mama_i64_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateI64 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -1001,14 +1056,20 @@ mamaMsgField_updateU64 (
mama_u64_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateU64 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -1021,14 +1082,20 @@ mamaMsgField_updateF32 (
mama_f32_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateF32 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -1041,14 +1108,20 @@ mamaMsgField_updateF64 (
mama_f64_t value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateF64 (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
@@ -1061,14 +1134,20 @@ mamaMsgField_updateDateTime (
const mamaDateTime value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdateDateTime (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);
}
return MAMA_STATUS_NULL_ARG;
@@ -1080,14 +1159,20 @@ mamaMsgField_updatePrice (
const mamaPrice value)
{
mamaMsgFieldImpl* impl = (mamaMsgFieldImpl*)field;
+ short owner = 0;
+ msgPayload payload = NULL;

if (!impl) return MAMA_STATUS_NULL_ARG;

+ mamaMsgImpl_getMessageOwner (impl->myMsg, &owner);
+ if (!owner) return MAMA_STATUS_NOT_MODIFIABLE;
+
+ mamaMsgImpl_getPayload (impl->myMsg, &payload);
if (impl->myPayloadBridge)
{
return impl->myPayloadBridge->msgFieldPayloadUpdatePrice (
impl->myPayload,
- impl->myMsgPayload,
+ payload,
value);

}
diff --git a/mama/c_cpp/src/c/msgfieldimpl.h b/mama/c_cpp/src/c/msgfieldimpl.h
index 0ee5f5b..f5b3a3e 100644
--- a/mama/c_cpp/src/c/msgfieldimpl.h
+++ b/mama/c_cpp/src/c/msgfieldimpl.h
@@ -38,7 +38,7 @@ typedef struct mamaMsgFieldImpl_
size_t myLastVectorPayloadMsgLen;
mamaMsg mySubMsg;
msgFieldPayload myPayload;
- msgPayload myMsgPayload;
+ mamaMsg myMsg;
mamaPayloadBridge myPayloadBridge;
} mamaMsgFieldImpl;

diff --git a/mama/c_cpp/src/c/msgimpl.h b/mama/c_cpp/src/c/msgimpl.h
index 95bdacf..88e29ec 100644
--- a/mama/c_cpp/src/c/msgimpl.h
+++ b/mama/c_cpp/src/c/msgimpl.h
@@ -82,6 +82,9 @@ mamaMsgImpl_getPayloadBuffer(const mamaMsg msg,
const void** buffer,
mama_size_t* bufferLength);

+/*Get the underlying payload parent mamaMsg. */
+MAMAExpDLL extern mama_status
+mamaMsgImpl_getPayload (const mamaMsg msg, msgPayload* payload);

MAMAExpDLL
extern mama_status
diff --git a/mama/c_cpp/src/c/status.c b/mama/c_cpp/src/c/status.c
index 7e7ff91..e8c1c31 100644
--- a/mama/c_cpp/src/c/status.c
+++ b/mama/c_cpp/src/c/status.c
@@ -57,11 +57,11 @@ mamaStatus_stringForStatus (mama_status status)
case MAMA_STATUS_NOT_INSTALLED : return "NOT_INSTALLED";
case MAMA_STATUS_NO_BRIDGE_IMPL : return "NO_BRIDGE_IMPL";
case MAMA_STATUS_INVALID_QUEUE : return "INVALID_QUEUE";
+ case MAMA_STATUS_NOT_MODIFIABLE : return "NOT_MODIFIABLE";
case MAMA_STATUS_NOT_PERMISSIONED : return "MAMA_STATUS_NOT_PERMISSIONED";
case MAMA_STATUS_SUBSCRIPTION_INVALID_STATE: return "MAMA_STATUS_SUBSCRIPTION_INVALID_STATE";
case MAMA_STATUS_QUEUE_OPEN_OBJECTS: return "MAMA_STATUS_QUEUE_OPEN_OBJECTS";
case MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE: return "MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE";
- case MAMA_STATUS_SUBSCRIPTION_GAP: return "MAMA_STATUS_SUBSCRIPTION_GAP";

#ifdef WITH_ENTITLEMENTS
case MAMA_ENTITLE_STATUS_NOMEM : return "ENTITLE_STATUS_NOMEM";
--
1.7.7.6


[PATCH 25/50] [mama] Add closure to mamaQueue

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Added a closure to mamaQueue to allow applcations to store queue specific
context.

The new methods are:
mamaQueue_setClosure()
mamaQueue_getClosure()

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama/queue.h | 7 +++++++
mama/c_cpp/src/c/queue.c | 27 ++++++++++++++++++++++++++-
mama/c_cpp/src/c/queueimpl.h | 6 ------
3 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/queue.h b/mama/c_cpp/src/c/mama/queue.h
index f04efd6..bc3362a 100644
--- a/mama/c_cpp/src/c/mama/queue.h
+++ b/mama/c_cpp/src/c/mama/queue.h
@@ -464,6 +464,13 @@ MAMAExpDLL
extern mama_status
mamaDispatcher_destroy (mamaDispatcher dispatcher);

+MAMAExpDLL
+extern mama_status
+mamaQueue_getClosure (mamaQueue queue, void** closure);
+
+MAMAExpDLL
+extern mama_status
+mamaQueue_setClosure (mamaQueue queue, void* closure);
#if defined(__cplusplus)
}
#endif
diff --git a/mama/c_cpp/src/c/queue.c b/mama/c_cpp/src/c/queue.c
index 4026a1a..f7cbb0b 100644
--- a/mama/c_cpp/src/c/queue.c
+++ b/mama/c_cpp/src/c/queue.c
@@ -99,6 +99,7 @@ typedef struct mamaQueueImpl_

/* This flag indicates whether object locking and unlocking will be tracked by the queue. */
int mTrackObjectLocks;
+ void* mClosure;
} mamaQueueImpl;

/*Main structure for the mamaDispatcher*/
@@ -116,11 +117,35 @@ typedef struct mamaDisptacherImpl_


mama_status
+mamaQueue_setClosure ( mamaQueue queue, void* closure)
+{
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ if (!impl) return MAMA_STATUS_NULL_ARG;
+
+ impl->mClosure = closure;
+
+ return MAMA_STATUS_OK;
+}
+
+mama_status
+mamaQueue_getClosure ( mamaQueue queue, void** closure)
+{
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!closure) return MAMA_STATUS_INVALID_ARG;
+
+ *closure = impl->mClosure;
+
+ return MAMA_STATUS_OK;
+}
+
+
+mama_status
mamaQueue_createReuseableMsg (mamaQueueImpl* impl)
{
mama_status status = MAMA_STATUS_OK;
/*Create the reuseable cached mamaMsg for this queue*/
- if (MAMA_STATUS_OK != (status = mamaMsgImpl_createForPayload (&(impl->mMsg), NULL,NULL,1)))
+ if (MAMA_STATUS_OK != (status = mamaMsgImpl_createForPayload (&(impl->mMsg), NULL,NULL,0)))
{
mama_log (MAMA_LOG_LEVEL_ERROR, "mamaQueue_create(): "
"Could not create message for queue.");
diff --git a/mama/c_cpp/src/c/queueimpl.h b/mama/c_cpp/src/c/queueimpl.h
index 0fa8a15..70247ef 100644
--- a/mama/c_cpp/src/c/queueimpl.h
+++ b/mama/c_cpp/src/c/queueimpl.h
@@ -103,12 +103,6 @@ MAMAExpDLL
extern mama_status
mamaQueueImpl_lowWatermarkExceeded (mamaQueue queue, size_t size);

-#ifdef WITH_FASTMSG
-MAMAExpDLL
-extern mama_status
-mamaQueueImpl_getFastBridge (mamaQueue queue, payloadBridge* bridge);
-#endif
-
MAMAExpDLL
extern mamaStatsCollector*
mamaQueueImpl_getStatsCollector (mamaQueue queue);
--
1.7.7.6


[PATCH 24/50] [mama] mama.c cleanup

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Moved MAMA_PAYLOAD_MAX from mama.c to mamainternal.h.

Removed uncessary comments and conditionally compiled debug blocks.

Moved #includes out extern "C" block in mama.h. This causes problems on Windows
as some of the standard headers contain C++ macros.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama.c | 15 -----------
mama/c_cpp/src/c/mama/mama.h | 53 +++------------------------------------
mama/c_cpp/src/c/mamainternal.h | 1 +
3 files changed, 5 insertions(+), 64 deletions(-)

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index 277eada..1d989a6 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -127,8 +127,6 @@ mamaStat gWombatMsgsStat;
mamaStat gFastMsgsStat;
mamaStat gRvMsgsStat;

-#define MAMA_PAYLOAD_MAX CHAR_MAX
-
static mamaPayloadBridge gDefaultPayload = NULL;

static wthread_key_t last_err_key;
@@ -146,9 +144,6 @@ typedef struct mamaAppContext_
/**
* This structure contains data needed to control starting and stopping of
* mama.
- *
- * TODO: Access to this structure will ultimately be protected by a reference
- * count and a lock.
*/
typedef struct mamaImpl_
{
@@ -699,16 +694,6 @@ mama_openWithPropertiesCount (const char* path,
}
/* Code after this point is one-time initialization */

-#ifdef WITH_INACTIVE_CHECK
- mama_log (MAMA_LOG_LEVEL_WARN,
- "********************************************************");
- mama_log (MAMA_LOG_LEVEL_WARN, "WARNING!!! - In inactive subscription check mode."
- " Do not release!!!");
- mama_log (MAMA_LOG_LEVEL_WARN,
- "********************************************************");
-#endif
-
-
#ifdef DEV_RELEASE
mama_log (MAMA_LOG_LEVEL_WARN,
"\n********************************************************************************\n"
diff --git a/mama/c_cpp/src/c/mama/mama.h b/mama/c_cpp/src/c/mama/mama.h
index c6cbe1c..341e635 100644
--- a/mama/c_cpp/src/c/mama/mama.h
+++ b/mama/c_cpp/src/c/mama/mama.h
@@ -22,10 +22,6 @@
#ifndef MamaH__
#define MamaH__

-#if defined(__cplusplus)
-extern "C"
-{
-#endif
#include "mama/config.h"
#include <mama/log.h>
#include <mama/error.h>
@@ -55,6 +51,10 @@ extern "C"
#include <mama/quality.h>
#include <mama/ft.h>

+#if defined(__cplusplus)
+extern "C"
+{
+#endif

#define MAMA_OPEN_MD ((uint32_t)(0x00000001))
#define MAMA_OPEN_PUB_SUB ((uint32_t)(0x00000002))
@@ -220,40 +220,6 @@ extern "C"
extern mama_status
mama_openWithProperties (const char* path,
const char* filename);
- /**
- * Initialize MAMA.
- *
- * Allows users of the API to override the default behavior of mama_open()
- * where a file mama.properties is required to be located in the directory
- * specified by \$WOMBAT_PATH.
- *
- * The properties file must have the same structure as a standard
- * mama.properties file.
- *
- * If null is passed as the path the API will look for the properties file on
- * the \$WOMBAT_PATH.
- *
- * If null is passed as the filename the API will look for the default
- * filename of mama.properties.
- *
- * The count value on return will be the number of times that mama_openXxx()
- * has ben invoked successfully. Applicatiins can use this to perform
- * one-time initialization when the value is 1 and the return is
- * MAMA_STATUS_OK
- *
- * @param path Fully qualified path to the directory containing the properties
- * file
- * @param filename The name of the file containing MAMA properties.
- * @param count The number of times mama_OpenXXX() has been called
- * successfully.
- *
- * @return mama_status Whether the call was successful or not.
- */
- MAMAExpDLL
- extern mama_status
- mama_openWithPropertiesCount (const char* path,
- const char* filename,
- unsigned int* count);

/**
* Set a specific property for the API.
@@ -325,17 +291,6 @@ extern "C"
MAMAExpDLL
extern mama_status
mama_close (void);
-
- /**
- * Close MAMA and free all associated resource.
- *
- * @param count Filled with the number of times mama has been opened
- * successfully. Applications can perform global one-time cleanup when this
- * value is 0 and the return value is MAMA_STATUS_OK.
- */
- MAMAExpDLL
- extern mama_status
- mama_closeCount (unsigned int* count);

/**
* Return the version information for the library.
diff --git a/mama/c_cpp/src/c/mamainternal.h b/mama/c_cpp/src/c/mamainternal.h
index 23f23c7..6b901b3 100644
--- a/mama/c_cpp/src/c/mamainternal.h
+++ b/mama/c_cpp/src/c/mamainternal.h
@@ -30,6 +30,7 @@ extern "C"
{
#endif

+#define MAMA_PAYLOAD_MAX CHAR_MAX
/**
* Check whether Callbacks are run in 'debug' catch exceptions mode
*/
--
1.7.7.6


[PATCH 23/50] [mama] Cleaup Statistics Handling in mama_close

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Properly cleanup the StatsGenerator, StatsCollector and StatsPublisher to avoid
resource leaks.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama.c | 20 ++++++++++++++++++++
mama/c_cpp/src/c/statsgenerator.c | 24 ++++++++++++++++++++++++
mama/c_cpp/src/c/statsgeneratorinternal.h | 14 +++++++++++++-
3 files changed, 57 insertions(+), 1 deletions(-)

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index 885763f..277eada 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -1153,6 +1153,26 @@ mama_closeCount (unsigned int* count)

wthread_key_delete(last_err_key);

+ if (gStatsGenerator)
+ {
+ mamaStatsGenerator_stopReportTimer(gStatsGenerator);
+ }
+
+ if (gGlobalStatsCollector)
+ {
+ if (gStatsGenerator)
+ {
+ mamaStatsGenerator_removeStatsCollector (gStatsGenerator, gGlobalStatsCollector);
+ }
+ mamaStatsCollector_destroy (*gGlobalStatsCollector);
+ gGlobalStatsCollector = NULL;
+ }
+
+ if (gStatsPublisher)
+ {
+ mamaStatsLogger_destroy (gStatsPublisher);
+ gStatsPublisher = NULL;
+ }
for (middleware = 0; middleware != MAMA_MIDDLEWARE_MAX; ++middleware)
{
mamaBridge bridge = gImpl.myBridges[middleware];
diff --git a/mama/c_cpp/src/c/statsgenerator.c b/mama/c_cpp/src/c/statsgenerator.c
index 8e47268..508167a 100644
--- a/mama/c_cpp/src/c/statsgenerator.c
+++ b/mama/c_cpp/src/c/statsgenerator.c
@@ -74,6 +74,8 @@ mamaStatsGenerator_destroy (mamaStatsGenerator statsGenerator)
{
mamaStatsGeneratorImpl* impl = (mamaStatsGeneratorImpl*)statsGenerator;

+ if(impl)
+ {
impl->mStatsLogger = NULL;

if (impl->mStatMsg != NULL)
@@ -89,6 +91,7 @@ mamaStatsGenerator_destroy (mamaStatsGenerator statsGenerator)

free (impl);

+ }
return MAMA_STATUS_OK;
}

@@ -237,3 +240,24 @@ mamaStatsGenerator_allocateStatsCollector (mamaStatsGenerator statsGenerator)

return list_allocate_element (impl->mStatsCollectors);
}
+
+mama_status mamaStatsGenerator_stopReportTimer(mamaStatsGenerator statsGenerator)
+{
+ /* Returns. */
+ mama_status ret = MAMA_STATUS_NULL_ARG;
+
+ /* Get the impl. */
+ mamaStatsGeneratorImpl *impl = (mamaStatsGeneratorImpl*)statsGenerator;
+ if(NULL != impl)
+ {
+ /* Destroy the timer. */
+ ret = MAMA_STATUS_OK;
+ if(NULL != impl->mReportTimer)
+ {
+ ret = mamaTimer_destroy(impl->mReportTimer);
+ impl->mReportTimer = NULL;
+ }
+ }
+
+ return ret;
+}
diff --git a/mama/c_cpp/src/c/statsgeneratorinternal.h b/mama/c_cpp/src/c/statsgeneratorinternal.h
index 8f27dc6..97bf717 100644
--- a/mama/c_cpp/src/c/statsgeneratorinternal.h
+++ b/mama/c_cpp/src/c/statsgeneratorinternal.h
@@ -33,6 +33,18 @@ MAMAExpDLL
extern mama_status
mamaStatsGenerator_create (mamaStatsGenerator* statsGenerator, mama_f64_t reportInterval);

+/**
+ * This function should be called to stop the stats report timer before the internal event
+ * queue has been destroyed.
+ *
+ * @param[in] statsGenerator The stats generator.
+ * @returns mama_status can be one of
+ * MAMA_STATUS_NULL_ARG
+ * MAMA_STATUS_OK
+ */
+MAMAExpDLL
+extern mama_status
+mamaStatsGenerator_stopReportTimer(mamaStatsGenerator statsGenerator);
MAMAExpDLL
extern mama_status
mamaStatsGenerator_destroy (mamaStatsGenerator statsGenerator);
@@ -47,7 +59,7 @@ mamaStatsGenerator_setLogStats (mamaStatsGenerator statsGenerator, int logStats)

MAMAExpDLL
extern mama_status
-mamaStatsGenerator_setStatsLogger (mamaStatsGenerator statsGenerator, mamaStatsLogger* usageLogger);
+mamaStatsGenerator_setStatsLogger (mamaStatsGenerator statsGenerator, mamaStatsLogger* statsLogger);

MAMAExpDLL
extern void
--
1.7.7.6


[PATCH 22/50] [mama] Change static mutexes to be recursive

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

This change avoids rare instances of deadlock when starting, stoping, opening
and closing OpenMAMA.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
common/c_cpp/src/c/linux/port.h | 2 +-
mama/c_cpp/src/c/mama.c | 114 +++++++++++++++++++--------------------
2 files changed, 57 insertions(+), 59 deletions(-)

diff --git a/common/c_cpp/src/c/linux/port.h b/common/c_cpp/src/c/linux/port.h
index b3a928d..67ae0c9 100644
--- a/common/c_cpp/src/c/linux/port.h
+++ b/common/c_cpp/src/c/linux/port.h
@@ -57,7 +57,7 @@ extern "C"
#endif
/* PTHREAD static locks are easy */
typedef pthread_mutex_t wthread_static_mutex_t;
-#define WSTATIC_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
+#define WSTATIC_MUTEX_INITIALIZER PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
#define wthread_static_mutex_lock(x) pthread_mutex_lock((x))
#define wthread_static_mutex_unlock(x) pthread_mutex_unlock((x))

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index dc6dcb5..885763f 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -172,8 +172,11 @@ static mamaImpl gImpl = {{0}, {0}, {0}, {0}, 0, WSTATIC_MUTEX_INITIALIZER};
static mama_status
mama_loadBridgeWithPathInternal (mamaBridge* impl,
const char* middlewareName,
- const char* path,
- uint8_t lock);
+ const char* path);
+
+mama_status
+mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
+ const char* payloadName);

/* Description : This function will free any memory associated with a
* mamaApplicationContext object but will not free the
@@ -323,8 +326,7 @@ static mama_status mamaInternal_loadStatsPublisher ()
if (MAMA_STATUS_OK !=
(status = mama_loadBridgeWithPathInternal (&bridge,
statsLogMiddlewareName,
- NULL,
- 0)))
+ NULL)))
{
mama_log (MAMA_LOG_LEVEL_ERROR,
"mamaInternal_loadStatsLogger(): ",
@@ -644,6 +646,8 @@ mama_openWithPropertiesCount (const char* path,
const char* appString = NULL;
const char* statsLogging = "false";
const char* catchCallbackExceptions = NULL;
+ char** payloadName;
+ char* payloadId;

wthread_static_mutex_lock (&gImpl.myLock);

@@ -720,6 +724,27 @@ mama_openWithPropertiesCount (const char* path,

/* Do not call mamaInternal_loadStatsPublisher here.
It only needs to be called if we are publishing */
+ /* Look for a bridge for each of the middlewares and open them */
+ for (middleware = 0; middleware != MAMA_MIDDLEWARE_MAX; ++middleware)
+ {
+ mamaBridgeImpl* impl = (mamaBridgeImpl*) gImpl.myBridges [middleware];
+ if (impl)
+ {
+ if (impl->bridgeGetDefaultPayloadId(&payloadName, &payloadId) == MAMA_STATUS_OK)
+ {
+ uint8_t i=0;
+ while (payloadId[i] != NULL)
+ {
+ if (!gImpl.myPayloads [(uint8_t)payloadId[i]])
+ {
+ mamaPayloadBridge payloadImpl;
+ mama_loadPayloadBridgeInternal (&payloadImpl,payloadName[i]);
+ }
+ i++;
+ }
+ }
+ }
+ }

catchCallbackExceptions = properties_Get (gProperties, "mama.catchcallbackexceptions.enable");
if (catchCallbackExceptions != NULL && strtobool(catchCallbackExceptions))
@@ -1776,16 +1801,21 @@ mama_setDefaultPayload (char id)
}

mama_status
+mama_loadPayloadBridge (mamaPayloadBridge* impl,
+ const char* payloadName)
+{
+ return mama_loadPayloadBridgeInternal (impl, payloadName);
+}
+mama_status
mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
- const char* payloadName,
- uint8_t lock)
+ const char* payloadName)
{
char bridgeImplName [256];
char initFuncName [256];
LIB_HANDLE bridgeLib = NULL;
msgPayload_createImpl initFunc = NULL;
mama_status status = MAMA_STATUS_OK;
- char payloadChar;
+ char payloadChar ='/0';

if (!impl || !payloadName)
return MAMA_STATUS_NULL_ARG;
@@ -1793,8 +1823,7 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
snprintf (bridgeImplName, 256, "mama%simpl",
payloadName);

- if (lock)
- wthread_static_mutex_lock (&gImpl.myLock);
+ wthread_static_mutex_lock (&gImpl.myLock);

bridgeLib = openSharedLib (bridgeImplName, NULL);

@@ -1806,8 +1835,7 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
"Could not open payload bridge library [%s] [%s]",
bridgeImplName ? bridgeImplName : "",
getLibError());
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

@@ -1825,27 +1853,25 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
bridgeImplName ? bridgeImplName : "");
closeSharedLib (bridgeLib);

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return MAMA_STATUS_NO_BRIDGE_IMPL;
}

if (MAMA_STATUS_OK != (status = initFunc (impl, &payloadChar)))
{
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return status;
}

- if (!impl)
+ if (!*impl)
{
mama_log (MAMA_LOG_LEVEL_ERROR,
- "mama_loadPayloadBridge(): Error in [%s] ", initFuncName);
+ "mama_loadPayloadBridge():Failed to load %s payload bridge from library [%s] ",
+ payloadName, bridgeImplName);

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return MAMA_STATUS_NO_BRIDGE_IMPL;
}
@@ -1857,8 +1883,7 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
"Payload bridge %s already loaded",
payloadName);

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return MAMA_STATUS_OK;
}
@@ -1876,19 +1901,11 @@ mama_loadPayloadBridgeInternal (mamaPayloadBridge* impl,
"Sucessfully loaded %s payload bridge from library [%s]",
payloadName, bridgeImplName);

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);

return MAMA_STATUS_OK;
}

-mama_status
-mama_loadPayloadBridge (mamaPayloadBridge* impl,
- const char* payloadName)
-{
- return mama_loadPayloadBridgeInternal (impl, payloadName, 1);
-}
-
int
mamaInternal_generateLbmStats ()
{
@@ -1906,15 +1923,12 @@ mama_loadBridge (mamaBridge* impl,
mama_status
mama_loadBridgeWithPathInternal (mamaBridge* impl,
const char* middlewareName,
- const char* path,
- uint8_t lock)
+ const char* path)
{
char bridgeImplName [256];
char initFuncName [256];
LIB_HANDLE bridgeLib = NULL;
bridge_createImpl initFunc = NULL;
- char* payloadName = NULL;
- char payloadId = '\0';
mama_status result = MAMA_STATUS_OK;
mamaMiddleware middleware = 0;

@@ -1930,15 +1944,13 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,
middlewareName);
}

- if (lock)
- wthread_static_mutex_lock (&gImpl.myLock);
+ wthread_static_mutex_lock (&gImpl.myLock);

/* Check if a bridge has already been initialized for the middleware */
if (gImpl.myBridges [middleware])
{
*impl = gImpl.myBridges [middleware];
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_OK;
}

@@ -1966,8 +1978,7 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,
bridgeImplName ? bridgeImplName : "",
getLibError());
}
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

@@ -1984,8 +1995,7 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,
initFuncName ? initFuncName : "",
bridgeImplName ? bridgeImplName : "");
closeSharedLib (bridgeLib);
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

@@ -1995,8 +2005,7 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,
{
mama_log (MAMA_LOG_LEVEL_ERROR,
"mama_loadBridge(): Error in [%s] ", initFuncName);
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_NO_BRIDGE_IMPL;
}

@@ -2010,25 +2019,14 @@ mama_loadBridgeWithPathInternal (mamaBridge* impl,

if (MAMA_STATUS_OK != result)
{
- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return result;
}

- if (((mamaBridgeImpl*)(*impl))->bridgeGetDefaultPayloadId(&payloadName, &payloadId) == MAMA_STATUS_OK)
- {
- if (!gImpl.myPayloads [(uint8_t)payloadId])
- {
- mamaPayloadBridge payloadImpl;
- mama_loadPayloadBridgeInternal (&payloadImpl,payloadName,0);
- }
- }
-
gImpl.myBridges [middleware] = *impl;
gImpl.myBridgeLibraries [middleware] = bridgeLib;

- if (lock)
- wthread_static_mutex_unlock (&gImpl.myLock);
+ wthread_static_mutex_unlock (&gImpl.myLock);
return MAMA_STATUS_OK;
}

@@ -2037,7 +2035,7 @@ mama_loadBridgeWithPath (mamaBridge* impl,
const char* middlewareName,
const char* path)
{
- return mama_loadBridgeWithPathInternal(impl, middlewareName, path, 1);
+ return mama_loadBridgeWithPathInternal(impl, middlewareName, path);
}

/*
--
1.7.7.6


[PATCH 21/50] [mama] mamaTransport_forceClientDisconnect

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Add method to MAMA API that allows connection based middlewares (TCP) to
disconnect poorly behaved clients.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/transport.c | 14 ++++++++++++++
mama/c_cpp/src/c/transportimpl.h | 15 +++++++++++++++
2 files changed, 29 insertions(+), 0 deletions(-)

diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index f059280..d51d12d 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -1964,6 +1964,20 @@ mamaTransport_findConnection (mamaTransport transport,
}

mama_status
+mamaTransportImpl_forceClientDisconnect (mamaTransport transport,
+ const char* ipAddress,
+ uint16_t port)
+{
+ if (!self || ipAddress == NULL) return MAMA_STATUS_NULL_ARG;
+
+ return self->mBridgeImpl->bridgeMamaTransportForceClientDisconnect (
+ self->mTransports,
+ self->mNumTransports,
+ ipAddress,
+ port);
+}
+
+mama_status
mamaTransport_getAllConnections (mamaTransport transport,
mamaConnection** result,
uint32_t* len)
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index 18c3a28..80bf98a 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -280,5 +280,20 @@ mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(
mamaTransport transport,
short cause,
const void *platformInfo);
+/**
+ * Disconnect a client with the specified IP Address and port. This information
+ * may be retrieved from a mamaConnection object or out of band.
+ *
+ * For middleware that does not provide this functionality (non WMW middleware),
+ * the method returns MAMA_STATUS_NOT_IMPL.
+ */
+MAMAExpDLL
+extern mama_status
+mamaTransportImpl_forceClientDisconnect (mamaTransport transport,
+ const char* ipAddress,
+ uint16_t port);
+#if defined(__cplusplus)
+}
+#endif

#endif /* TransportImplH__ */
--
1.7.7.6


[PATCH 20/50] [mama] transport write queue high and low watermarks

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Allow applications to set high and low watermarks for internal write queues if
the underlying middleware queues writes. Socket based middlewares that use
scatter/gather IO and a dedicated IO thread often coalesce writes in a separate
queue for each connected client. A larger queue where that trigger a high
watermark often indicates a slow consumer.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama/transport.h | 16 +++++++++++++-
mama/c_cpp/src/c/transport.c | 40 +++++++++++++++++++++++++++++++-----
mama/c_cpp/src/c/transportimpl.h | 6 +++++
3 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/transport.h b/mama/c_cpp/src/c/mama/transport.h
index d66905c..e1a29ac 100644
--- a/mama/c_cpp/src/c/mama/transport.h
+++ b/mama/c_cpp/src/c/mama/transport.h
@@ -76,7 +76,9 @@ typedef enum
MAMA_TRANSPORT_PUBLISHER_DISCONNECT,
MAMA_TRANSPORT_QUALITY,
MAMA_TRANSPORT_NAMING_SERVICE_CONNECT,
- MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT
+ MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT,
+ MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK,
+ MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK
} mamaTransportEvent;

/**
@@ -186,6 +188,18 @@ extern mama_status
mamaTransport_setTransportCallback (mamaTransport transport,
mamaTransportCB callback,
void* closure);
+/**
+ * Set the transport write queue high and low water mark values. The
+ * MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK and
+ * MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK events will be delivered though
+ * the transport callback when the respective number of items are outstanding on
+ * a clients write queue.
+ */
+MAMAExpDLL
+extern mama_status
+mamaTransport_setWriteQueueWatermarks (mamaTransport transport,
+ uint32_t highWater,
+ uint32_t lowWater);

/**
* Set the transport topic callback. It receives advisories when a client
diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 7e583d5..f059280 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -89,6 +89,8 @@ typedef struct transportImpl_
mamaSymbolMapFunc mMapFunc;
void* mMapFuncClosure;

+ uint32_t mWriteQueueHighWatermark;
+ uint32_t mWriteQueueLowWatermark;
/* These members are only needed for the market data transport */
wList mListeners;

@@ -160,12 +162,12 @@ typedef struct transportImpl_

int mGroupSizeHint;

- uint8_t mDisableDisconnectCb;
- uint8_t mDisableRefresh;
+ uint8_t mDisableRefresh;
dqStartegyScheme mDQStratScheme;
dqftStrategyScheme mFTStratScheme;

uint8_t mInternal;
+ uint8_t mDisableDisconnectCb;
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -445,15 +447,15 @@ static void setFtStrategy (mamaTransport transport)
}
}

+void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)
+{
+ self->mDisableRefresh=disable;
+}
/**
* Check property to disable refresh messages. Undocumented.
*
* Return non-zero to disable refresh messages.
*/
-void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)
-{
- self->mDisableRefresh=disable;
-}

static int mamaTransportInternal_disableRefreshes(const char* transportName)
{
@@ -1130,6 +1132,7 @@ mamaTransport_destroy (mamaTransport transport)
if (self->mRefreshTransport)
{
refreshTransport_destroy (self->mRefreshTransport);
+ self->mRefreshTransport = NULL;
}

if (self->mThrottle)
@@ -1610,6 +1613,8 @@ mamaTransportEvent_toString (mamaTransportEvent event)
case MAMA_TRANSPORT_QUALITY: return "QUALITY";
case MAMA_TRANSPORT_NAMING_SERVICE_CONNECT: return "NAMING_SERVICE_CONNECT";
case MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT: return "NAMING_SERVICE_DISCONNECT";
+ case MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK: return "MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK";
+ case MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK: return "MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK";
default: return "UNKNOWN";
}
}
@@ -1625,6 +1630,29 @@ mamaTransport_setTransportCallback (mamaTransport transport,
return MAMA_STATUS_OK;
}

+mama_status
+mamaTransport_setWriteQueueWatermarks (mamaTransport transport,
+ uint32_t high,
+ uint32_t low)
+{
+ if (!self)
+ return MAMA_STATUS_NULL_ARG;
+ if (high < low || low == 0)
+ return MAMA_STATUS_INVALID_ARG;
+
+ self->mWriteQueueHighWatermark = high;
+ self->mWriteQueueLowWatermark = low;
+ return MAMA_STATUS_OK;
+}
+
+void
+mamaTransportImpl_getWriteQueueWatermarks (mamaTransport transport,
+ uint32_t* high,
+ uint32_t* low)
+{
+ *high = self->mWriteQueueHighWatermark;
+ *low = self->mWriteQueueLowWatermark;
+}

mama_status
mamaTransport_setTransportTopicCallback (mamaTransport transport,
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index a253e3f..18c3a28 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -139,6 +139,12 @@ mamaTransportImpl_getTransportTopicCallback (mamaTransport transport,

MAMAExpDLL
extern void
+mamaTransportImpl_getWriteQueueWatermarks (mamaTransport transport,
+ uint32_t* high,
+ uint32_t* low);
+
+MAMAExpDLL
+extern void
mamaTransportImpl_resetRefreshForListener (mamaTransport tport, void *handle);

MAMAExpDLL
--
1.7.7.6


[PATCH 19/50] [mama] Create an Internal Transport for Monitoring

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

OpenMAMA creates a dedicated internal transport for publishing statistics for
monitoring and other internal communication. This transport disables some
features of application level transports including the CM responder.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama.c | 2 +-
mama/c_cpp/src/c/transport.c | 204 ++++++++++++++++++++++---------------
mama/c_cpp/src/c/transportimpl.h | 31 +++---
3 files changed, 136 insertions(+), 101 deletions(-)

diff --git a/mama/c_cpp/src/c/mama.c b/mama/c_cpp/src/c/mama.c
index 206892d..dc6dcb5 100644
--- a/mama/c_cpp/src/c/mama.c
+++ b/mama/c_cpp/src/c/mama.c
@@ -389,7 +389,7 @@ mamaInternal_createStatsPublisher ()
statsLogTportName = "statslogger";
}

- result = mamaTransport_allocate (&statsLogTport);
+ result = mamaTransportImpl_allocateInternalTransport (&statsLogTport);
if( result != MAMA_STATUS_OK )
return result;

diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 0a4adcd..7e583d5 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -165,6 +165,7 @@ typedef struct transportImpl_
dqStartegyScheme mDQStratScheme;
dqftStrategyScheme mFTStratScheme;

+ uint8_t mInternal;
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -282,60 +283,68 @@ mamaTransport_allocate (mamaTransport* result)
*
* Return non-zero if object should be created, otherwise return zero
*/
-static int mamaTransportInternal_cmResponderEnabled (const char* transportName,
+static int mamaTransportInternal_cmResponderEnabled (transportImpl *impl,
+ const char* transportName,
const char* middleware)
{
const char* propValue;
char propString[MAX_PROP_STRING];
char propStringMw[MAX_PROP_STRING];
int retVal;
+ /* Returns. */
+ int ret = 0;

- /* Check for mama.middleware.transport.transportname first */
- retVal=snprintf (propStringMw, MAX_PROP_STRING,
- "mama.%s.transport.%s.%s", middleware,
- transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
-
- if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ /* The CM responder will not be created for an internal transport. */
+ if(impl->mInternal == 0)
{
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
-
- propValue = properties_Get (mamaInternal_getProperties (), propStringMw);
-
- if (NULL==propValue)
- {
- /* We might have specified mama.transport.transportname -
- only look for this after we've tried with middleware */
- retVal = snprintf (propString, MAX_PROP_STRING,
- "mama.transport.%s.%s",
- transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
-
- if ((retVal<0) || (retVal>=MAX_PROP_STRING))
- {
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
-
- propValue = properties_Get (mamaInternal_getProperties (), propString);
- }
-
- /* Return default if we have specified neither mama.middleware.transport...nor
- mama.transport... */
- if (NULL==propValue)
- {
- return DEFAULT_WANT_AUTO_CM_CREATE;
- }
- else if (properties_GetPropertyValueAsBoolean (propValue))
- {
- return 1;
- }
- else
- {
- return 0;
+ /* Check for mama.middleware.transport.transportname first */
+ retVal=snprintf (propStringMw, MAX_PROP_STRING,
+ "mama.%s.transport.%s.%s", middleware,
+ transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
+
+ if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+
+ propValue = properties_Get (mamaInternal_getProperties (), propStringMw);
+
+ if (NULL==propValue)
+ {
+ /* We might have specified mama.transport.transportname -
+ only look for this after we've tried with middleware */
+ retVal = snprintf (propString, MAX_PROP_STRING,
+ "mama.transport.%s.%s",
+ transportName ? transportName : "", PROP_NAME_WANT_AUTO_CM_CREATE);
+
+ if ((retVal<0) || (retVal>=MAX_PROP_STRING))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "Error reading %s from properties file", PROP_NAME_WANT_AUTO_CM_CREATE);
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+
+ propValue = properties_Get (mamaInternal_getProperties (), propString);
+ }
+
+ /* Return default if we have specified neither mama.middleware.transport...nor
+ mama.transport... */
+ if (NULL==propValue)
+ {
+ return DEFAULT_WANT_AUTO_CM_CREATE;
+ }
+ else if (properties_GetPropertyValueAsBoolean (propValue))
+ {
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
+ return ret;
}

static void setPreInitialStrategy (mamaTransport transport)
@@ -452,7 +461,8 @@ static int mamaTransportInternal_disableRefreshes(const char* transportName)
char propString[MAX_PROP_STRING];
int retVal;

- retVal=snprintf(propString, MAX_PROP_STRING, "mama.transport.%s.%s",
+ retVal=snprintf (propString, MAX_PROP_STRING,
+ "mama.transport.%s.%s",
transportName ? transportName : "", PROP_NAME_DISABLE_REFRESH);

if ((retVal<0) || (retVal>=MAX_PROP_STRING))
@@ -586,6 +596,7 @@ mamaTransport_create (mamaTransport transport,
const char* throttleInt = NULL;
if (!transport) return MAMA_STATUS_NULL_ARG;
if (!bridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;
+ mama_log(MAMA_LOG_LEVEL_FINER, "Entering mamaTransport_create for transport (%p) with name %s", transport, name);

self->mBridgeImpl = (mamaBridgeImpl*)bridgeImpl;

@@ -822,7 +833,7 @@ mamaTransport_create (mamaTransport transport,

setGroupSizeHint (transport, middleware);

- rval = init (self, mamaTransportInternal_cmResponderEnabled (name, middleware));
+ rval = init (self, mamaTransportInternal_cmResponderEnabled (self, name, middleware));
if (rval != MAMA_STATUS_OK) return rval;


@@ -1083,6 +1094,7 @@ mamaTransport_destroy (mamaTransport transport)
{
int i;
int allTransportsValid;
+ mama_log(MAMA_LOG_LEVEL_FINER, "Entering mamaTransport_destroy for transport (%p)", transport);

if (!self) return MAMA_STATUS_NULL_ARG;
if (!self->mBridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;
@@ -1099,6 +1111,10 @@ mamaTransport_destroy (mamaTransport transport)
}
if (allTransportsValid)
{
+ if(NULL != self->mCmResponder)
+ {
+ mamaCmResponder_destroy (self->mCmResponder);
+ }
/* Inform all listeners that the transport is about to be destroyed. */
mamaTransportImpl_clearTransportWithListeners (self);

@@ -2052,6 +2068,49 @@ mamaTransportImpl_unsetAllPossiblyStale (mamaTransport transport)
}
}

+/**
+ * Return the cause and platform info for the last message processed on the
+ * transport.
+ *
+ * @param transport The transport.
+ * @param cause To return the cause.
+ * @param platformInfo To return the bridge specific info, under no circumstances
+ * should the returned object be deleted.
+ */
+void
+mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport transport,
+ short* cause,
+ const void** platformInfo)
+{
+ if (!self)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (): "
+ "NULL transport.");
+ return;
+ }
+
+ *cause = self->mCause;
+ *platformInfo = self->mPlatformInfo;
+}
+
+void mamaTransportImpl_setAdvisoryCauseAndPlatformInfo (mamaTransport transport, short cause, const void *platformInfo)
+{
+ /* Get the impl. */
+ transportImpl *impl = (transportImpl *)transport;
+ if(NULL != impl)
+ {
+ /* Set the cause. */
+ impl->mCause = cause;
+ impl->mPlatformInfo = platformInfo;
+ }
+
+ /* Otherwise write an error log. */
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(): NULL transport.");
+ }
+}
void
mamaTransportImpl_getTransportIndex (mamaTransport transport,
int* transportIndex)
@@ -2429,44 +2488,23 @@ mama_status mamaTransport_removePublisher (mamaTransport transport, void *handle
return MAMA_STATUS_OK;
}

-/* *************************************************** */
-/* Internal Functions. */
-/* *************************************************** */
-
-void mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(mamaTransport transport,
- short *cause, const void **platformInfo)
+mama_status mamaTransportImpl_allocateInternalTransport(mamaTransport *transport)
{
- /* Get the impl. */
- transportImpl *impl = (transportImpl *)transport;
- if(NULL != impl)
- {
- /* Return the cause. */
- *cause = impl->mCause;
- *platformInfo = impl->mPlatformInfo;
- }
- else
- {
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(): NULL "
- "transport.");
- }
-}
+ /* Returns. */
+ mama_status ret = MAMA_STATUS_NULL_ARG;

-void mamaTransportImpl_setAdvisoryCauseAndPlatformInfo (mamaTransport transport,
- short cause, const void *platformInfo)
-{
- /* Get the impl. */
- transportImpl *impl = (transportImpl *)transport;
- if(NULL != impl)
- {
- /* Set the cause. */
- impl->mCause = cause;
- impl->mPlatformInfo = (void*)platformInfo;
- }
- else
+ if(NULL != transport)
{
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(): NULL "
- "transport.");
+ /* Allocate the transport as normal. */
+ ret = mamaTransport_allocate(transport);
+ if(MAMA_STATUS_OK == ret)
+ {
+ /* Get the impl. */
+ transportImpl *impl = (transportImpl *)*transport;
+
+ /* Set the internal flag. */
+ impl->mInternal = 1;
+ }
}
+ return ret;
}
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index 5ae83c6..a253e3f 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -158,6 +158,11 @@ MAMAExpDLL
extern void
mamaTransportImpl_unsetAllPossiblyStale (mamaTransport tport);

+MAMAExpDLL
+extern void
+mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport tport,
+ short* cause,
+ const void** platformInfo);
/*
Get the bridge impl associated with the specified transport.
This will be how other objects gain access to the bridge.
@@ -241,27 +246,19 @@ mama_status mamaTransport_addPublisher(mamaTransport transport, mamaPublisher pu
mama_status mamaTransport_removePublisher(mamaTransport transport, void *handle);
preInitialScheme mamaTransportImpl_getPreInitialScheme (mamaTransport transport);

-#if defined(__cplusplus)
-}
-#endif
-
-
/**
- * This function will return the cause and platform info for the last message
- * processed on the transport.
+ * This function will allocate an internal transport for use with the internal event queue, this sort
+ * of transport is limited and does not support certain features, including
+ * The CM Responder.
*
- * @param[in] transport The transport.
- * @param[out] cause To return the cause.
- * @param[out] platformInfo To return the bridge specific info, under no
- * circumstances should the returned object be deleted.
+ * @param[out] transport To return the transport.
*
+ * @returns mama_status value can be one of:
+ * MAMA_STATUS_NOMEM
+ * MAMA_STATUS_NULL_ARG
+ * MAMA_STATUS_OK
*/
-MAMAExpDLL
-extern void
-mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(
- mamaTransport tport,
- short *cause,
- const void **platformInfo);
+mama_status mamaTransportImpl_allocateInternalTransport(mamaTransport *transport);

/**
* This function will set the cause and platform info for the transport.
--
1.7.7.6


[PATCH 18/50] [mama] Subscription: don't put invalid actions on the throttle queue

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Added code to avoid race conditions when enqueing shutdown logic on the
throttle.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama/subscription.h | 7 +------
mama/c_cpp/src/c/subscription.c | 19 +++++++++++--------
2 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/subscription.h b/mama/c_cpp/src/c/mama/subscription.h
index 9eb1f0c..27978eb 100644
--- a/mama/c_cpp/src/c/mama/subscription.h
+++ b/mama/c_cpp/src/c/mama/subscription.h
@@ -106,12 +106,7 @@ typedef enum
/* The subscription has been de-allocated, this state is only supported so that the log entry will whenever the subscription
* has finally been freed.
*/
- MAMA_SUBSCRIPTION_DEALLOCATED = 10,
-
- /* The subscription is being re-activated, this state can only occur if the mamaSubscription_activate has been called while
- * the subscription is being deactivated, (i.e. its state is MAMA_SUBSCRIPTION_DEACTIVATING.
- */
- MAMA_SUBSCRIPTION_REACTIVATING = 11
+ MAMA_SUBSCRIPTION_DEALLOCATED = 10

} mamaSubscriptionState;

diff --git a/mama/c_cpp/src/c/subscription.c b/mama/c_cpp/src/c/subscription.c
index 6c73978..8c2497b 100644
--- a/mama/c_cpp/src/c/subscription.c
+++ b/mama/c_cpp/src/c/subscription.c
@@ -1772,7 +1772,7 @@ mama_status mamaSubscriptionImpl_deactivate(mamaSubscriptionImpl *impl)
mamaTransport_removeListener(impl->mTransport, impl->mSubscHandle);

/* If there is a create action on the throttle it must be removed. */
- if(NULL != impl->mAction)
+ if((NULL != throttle) && (NULL != impl->mAction))
{
wombatThrottle_removeAction(throttle, impl->mAction);
}
@@ -2195,7 +2195,7 @@ void MAMACALLTYPE mamaSubscriptionImpl_onSubscriptionDestroyed(mamaSubscription
break;

/* The subscription must be de-activated then re-activated. */
- case MAMA_SUBSCRIPTION_REACTIVATING:
+ case MAMA_SUBSCRIPTION_ACTIVATING:

/* Change the state. */
mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_DEACTIVATED);
@@ -2233,7 +2233,7 @@ void MAMACALLTYPE mamaSubscriptionImpl_onSubscriptionDestroyed(mamaSubscription
mama_status mamaSubscriptionImpl_removeFromThrottle(mamaSubscriptionImpl *impl)
{
/* Returns. */
- mama_status ret = MAMA_STATUS_OK;
+ mama_status ret = MAMA_STATUS_SUBSCRIPTION_INVALID_STATE;

/* Acquire the lock before anything else is done. */
wlock_lock(impl->mCreateDestroyLock);
@@ -2262,7 +2262,8 @@ mama_status mamaSubscriptionImpl_removeFromThrottle(mamaSubscriptionImpl *impl)
* if the subscription is still in the activating state.
*/
ret = MAMA_STATUS_SUBSCRIPTION_INVALID_STATE;
- if(MAMA_SUBSCRIPTION_ACTIVATING == wInterlocked_read(&impl->mState))
+ if(MAMA_SUBSCRIPTION_ACTIVATING == wInterlocked_read(&impl->mState) &&
+ NULL != impl->mAction)
{
/* Remove the subscription from the throttle. */
wombatThrottle_removeAction(throttle, impl->mAction);
@@ -2514,6 +2515,7 @@ mama_status mamaSubscription_allocate(mamaSubscription *subscription)
impl->mPreInitialCacheSize = MAMA_SUBSCRIPTION_DEFAULT_PREINITIALCACHESIZE;

/* Set the initial state of the subscription now that the memory has been allocated. */
+ wInterlocked_initialize(&impl->mState);
mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_ALLOCATED);

/* The function has succeeded. */
@@ -2570,7 +2572,7 @@ mama_status mamaSubscription_activate(mamaSubscription subscription)
case MAMA_SUBSCRIPTION_DEACTIVATING:

/* Set the state to indicate that the subscription will be reactivated. */
- mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_REACTIVATING);
+ mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_ACTIVATING);

ret = MAMA_STATUS_OK;
break;
@@ -2914,7 +2916,8 @@ mama_status mamaSubscription_deactivate(mamaSubscription subscription)
/* If invalid state is returned by this function then the subscription has become active
* while waiting for it to be removed from the throttle.
*/
- if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret)
+ if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret &&
+ MAMA_SUBSCRIPTION_ACTIVATED == wInterlocked_read(&impl->mState))
{
/* Invoke this function recursively to process the correct state. */
ret = mamaSubscription_deactivate(subscription);
@@ -3043,7 +3046,8 @@ mama_status mamaSubscription_destroy(mamaSubscription subscription)
/* If invalid state is returned by this function then the subscription has become active
* while waiting for it to be removed from the throttle.
*/
- if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret)
+ if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret &&
+ MAMA_SUBSCRIPTION_ACTIVATED == wInterlocked_read(&impl->mState))
{
/* Invoke this function recursively to process the correct state. */
ret = mamaSubscription_destroy(subscription);
@@ -4251,7 +4255,6 @@ const char* mamaSubscription_stringForState(mamaSubscriptionState state)
case MAMA_SUBSCRIPTION_DESTROYED: return "MAMA_SUBSCRIPTION_DESTROYED";
case MAMA_SUBSCRIPTION_DEALLOCATING: return "MAMA_SUBSCRIPTION_DEALLOCATING";
case MAMA_SUBSCRIPTION_DEALLOCATED: return "MAMA_SUBSCRIPTION_DEALLOCATED";
- case MAMA_SUBSCRIPTION_REACTIVATING: return "MAMA_SUBSCRIPTION_REACTIVATING";
}

return "State not recognised";
--
1.7.7.6


[PATCH 17/50] [mama] Configurable Schemes for data quality

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

The dqStratgyScheme deterines whether or not OpenMAMA ignores duplicate
messages. Valid values are:
DQ_SCHEME_DELIVER_ALL deliver all messages including duplicats
DQ_SCHEME_IGNORE_DUPS do not deliver duplicate messages

The dqftStrategyScheme determines how a subscription behaves when OpenMAMA
detects a fault tolerant take over (the sender id changes for a subscription):
DQ_FT_DO_NOT_WAIT_FOR_RECAP continue to process STALE messages
DQ_FT_WAIT_FOR_RECAP discard stale messages

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqstrategy.c | 46 ++++++++++++++--
mama/c_cpp/src/c/dqstrategy.h | 5 ++-
mama/c_cpp/src/c/listenermsgcallback.c | 22 +++++++-
mama/c_cpp/src/c/mama/subscription.h | 12 ++++
mama/c_cpp/src/c/transport.c | 92 ++++++++++++++++++++++++++++++++
5 files changed, 170 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/dqstrategy.c b/mama/c_cpp/src/c/dqstrategy.c
index 8f7ccda..1e495b6 100644
--- a/mama/c_cpp/src/c/dqstrategy.c
+++ b/mama/c_cpp/src/c/dqstrategy.c
@@ -141,7 +141,8 @@ handleFTTakeover (dqStrategy strategy,
int msgType,
mamaDqContext* ctx,
mama_seqnum_t seqNum,
- mama_u64_t senderId)
+ mama_u64_t senderId,
+ int recoverOnRecap)
{
const char* symbol = NULL;
mamaSubscription_getSymbol (self->mSubscription, &symbol);
@@ -151,11 +152,19 @@ handleFTTakeover (dqStrategy strategy,
"Previous SeqNum: %u. New SeqNum: %u. [%s]",
ctx->mSenderId, senderId, ctx->mSeqNum, seqNum, symbol);

+ if (recoverOnRecap)
+ {
+ ctx->mSeqNum = senderId;
+ ctx->mDQState = DQ_STATE_WAITING_FOR_RECAP_AFTER_FT;
+ }
+ else
+ {
resetDqState (strategy, ctx);

/*In all cases we reset the data quality context*/
dqStrategyImpl_resetDqContext (ctx, seqNum, senderId);

+ }
return MAMA_STATUS_OK;
}

@@ -183,6 +192,7 @@ dqStrategy_checkSeqNum (dqStrategy strategy,

mamaMsg_getSeqNum (msg, &seqNum);

+ ctx->mDoNotForward = 0;
if (mamaMsg_getU64 (msg, MamaFieldSenderId.mName, MamaFieldSenderId.mFid,
&senderId) != MAMA_STATUS_OK)
{
@@ -207,7 +217,14 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
mamaStatsCollector_incrementStat (*(mamaInternal_getGlobalStatsCollector()),
MamaStatFtTakeovers.mFid);
}
- return handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId);
+ if (DQ_FT_WAIT_FOR_RECAP==mamaTransportImpl_getFtStrategyScheme(tport))
+ {
+ handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId, 1);
+ }
+ else
+ {
+ return handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId, 0);
+ }
}

if (gMamaLogLevel >= MAMA_LOG_LEVEL_FINER)
@@ -243,7 +260,8 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
if (((ctxDqState == DQ_STATE_NOT_ESTABLISHED) ||
(seqNum == 0) ||
(seqNum == (ctxSeqNum + conflateCnt))) &&
- (ctxDqState != DQ_STATE_WAITING_FOR_RECAP))
+ ((ctxDqState != DQ_STATE_WAITING_FOR_RECAP) ||
+ (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)))
{
/* No gap */
if (self->mTryToFillGap)
@@ -268,7 +286,19 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
return MAMA_STATUS_OK;
}

- if (seqNum == ctxSeqNum)
+ /* For late joins or middlewares that support a publish cache, it is possible that you will get old updates
+ in this case take no action */
+ if (DQ_SCHEME_INGORE_DUPS==mamaTransportImpl_getDqStrategyScheme(tport))
+ {
+ if ((seqNum <= ctxSeqNum) && ((ctxDqState != DQ_STATE_WAITING_FOR_RECAP) ||
+ (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)))
+ {
+ ctx->mDoNotForward = 1;
+ return MAMA_STATUS_OK;
+ }
+ }
+
+ if ((seqNum == ctxSeqNum) && (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT))
{
/* Duplicate data - set DQQuality to DUPLICATE, invoke quality callback */
ctx->mDQState = DQ_STATE_DUPLICATE;
@@ -290,6 +320,13 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
return MAMA_STATUS_OK;
}

+ if (ctxDqState == DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)
+ {
+ ctx->mDoNotForward = 1;
+ return MAMA_STATUS_OK;
+ }
+ else
+ {
/* If we get here, we missed a sequence number. */
if ((PRE_INITIAL_SCHEME_ON_GAP==mamaTransportImpl_getPreInitialScheme(tport))
&&(self->mTryToFillGap))
@@ -323,6 +360,7 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
}

handleStaleData (self, msg, ctx);
+ }
break;
case MAMA_MSG_TYPE_INITIAL :
case MAMA_MSG_TYPE_BOOK_INITIAL :
diff --git a/mama/c_cpp/src/c/dqstrategy.h b/mama/c_cpp/src/c/dqstrategy.h
index a7ff6a7..d550e95 100644
--- a/mama/c_cpp/src/c/dqstrategy.h
+++ b/mama/c_cpp/src/c/dqstrategy.h
@@ -50,7 +50,9 @@ typedef enum dqState_
* In the case of a stale initial, we do not want
* a recap because it may also be stale data.
*/
- DQ_STATE_STALE_NO_RECAP = 5
+ DQ_STATE_STALE_NO_RECAP = 5,
+
+ DQ_STATE_WAITING_FOR_RECAP_AFTER_FT = 6
} dqState;

typedef struct
@@ -64,6 +66,7 @@ typedef struct
imageRequest mRecapRequest;
mama_u64_t mSenderId;

+ uint8_t mDoNotForward;
} mamaDqContext;

typedef struct dqStrategy_* dqStrategy;
diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c
index dca5474..cbe1c90 100644
--- a/mama/c_cpp/src/c/listenermsgcallback.c
+++ b/mama/c_cpp/src/c/listenermsgcallback.c
@@ -449,7 +449,16 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
case MAMA_MSG_TYPE_QUOTE:
case MAMA_MSG_TYPE_TRADE:
mamaSubscription_checkSeqNum(subscription, msg, msgType, ctx);
- mamaSubscription_forwardMsg(subscription, msg);
+ if (!ctx->mDqContext.mDoNotForward)
+ {
+ mamaSubscription_forwardMsg(subscription, msg);
+ }
+ else
+ {
+ mamaSubscription_getSymbol (subscription, &userSymbol);
+ mama_log (MAMA_LOG_LEVEL_FINER, "Subscription for %s not forwarded"
+ " as message seqnum is before seqnum expecting", userSymbol);
+ }
break;
case MAMA_MSG_TYPE_REFRESH:
mamaSubscription_respondToRefreshMessage(subscription);
@@ -468,7 +477,16 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
break;
default:
mamaSubscription_checkSeqNum(subscription, msg, msgType, ctx);
- mamaSubscription_forwardMsg(subscription, msg);
+ if (!ctx->mDqContext.mDoNotForward)
+ {
+ mamaSubscription_forwardMsg(subscription, msg);
+ }
+ else
+ {
+ mamaSubscription_getSymbol (subscription, &userSymbol);
+ mama_log (MAMA_LOG_LEVEL_FINER, "Subscription for %s not forwarded"
+ " as message seqnum is before seqnum expecting", userSymbol);
+ }
}
}

diff --git a/mama/c_cpp/src/c/mama/subscription.h b/mama/c_cpp/src/c/mama/subscription.h
index 058a41c..9eb1f0c 100644
--- a/mama/c_cpp/src/c/mama/subscription.h
+++ b/mama/c_cpp/src/c/mama/subscription.h
@@ -115,6 +115,18 @@ typedef enum

} mamaSubscriptionState;

+typedef enum
+{
+ DQ_SCHEME_DELIVER_ALL,
+ DQ_SCHEME_INGORE_DUPS
+} dqStartegyScheme;
+
+
+typedef enum
+{
+ DQ_FT_DO_NOT_WAIT_FOR_RECAP,
+ DQ_FT_WAIT_FOR_RECAP
+}dqftStrategyScheme;
/* *************************************************** */
/* Type Defines. */
/* *************************************************** */
diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 20ee0cc..0a4adcd 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -162,6 +162,9 @@ typedef struct transportImpl_

uint8_t mDisableDisconnectCb;
uint8_t mDisableRefresh;
+ dqStartegyScheme mDQStratScheme;
+ dqftStrategyScheme mFTStratScheme;
+
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -177,6 +180,8 @@ init (transportImpl* transport, int createResponder)
self->mCause = 0;
self->mPlatformInfo = NULL;
self->mPreInitialScheme = PRE_INITIAL_SCHEME_ON_GAP;
+ self->mDQStratScheme = DQ_SCHEME_DELIVER_ALL;
+ self->mFTStratScheme = DQ_FT_DO_NOT_WAIT_FOR_RECAP;


mama_log (MAMA_LOG_LEVEL_FINEST,
@@ -365,6 +370,72 @@ static void setPreInitialStrategy (mamaTransport transport)
"%s: Using default preinitial strategy: ON_GAP", self->mName);
}
}
+static void setDQStrategy (mamaTransport transport)
+{
+ const char* propValue = NULL;
+ char propNameBuf[256];
+
+ if (!self) return;
+
+ snprintf (propNameBuf, 256, "mama.transport.%s.dqstrategy", self->mName);
+
+ propValue = properties_Get(mamaInternal_getProperties(),
+ propNameBuf);
+
+ if (NULL!=propValue)
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL, "Setting %s=%s",
+ propNameBuf, propValue);
+
+ if (0==strcmp (propValue, "ignoredups"))
+ {
+ self->mDQStratScheme = DQ_SCHEME_INGORE_DUPS;
+ }
+ else
+ {
+ self->mDQStratScheme = DQ_SCHEME_DELIVER_ALL;
+ }
+ }
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL,
+ "%s: Using default dq strategy: DQ_SCHEME_DELIVER_ALL", self->mName);
+ }
+}
+
+static void setFtStrategy (mamaTransport transport)
+{
+ const char* propValue = NULL;
+ char propNameBuf[256];
+
+ if (!self) return;
+
+ snprintf (propNameBuf, 256, "mama.transport.%s.ftstrategy", self->mName);
+
+ propValue = properties_Get(mamaInternal_getProperties(),
+ propNameBuf);
+
+ if (NULL!=propValue)
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL, "Setting %s=%s",
+ propNameBuf, propValue);
+
+ if (0==strcmp (propValue, "waitforrecap"))
+ {
+ self->mFTStratScheme = DQ_FT_WAIT_FOR_RECAP;
+ }
+ else
+ {
+ self->mFTStratScheme = DQ_FT_DO_NOT_WAIT_FOR_RECAP;
+ }
+ }
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL,
+ "%s: Using default ft strategy: DQ_FT_DO_NOT_WAIT_FOR_RECAP", self->mName);
+ }
+}
+
/**
* Check property to disable refresh messages. Undocumented.
*
@@ -756,6 +827,8 @@ mamaTransport_create (mamaTransport transport,


setPreInitialStrategy ((mamaTransport)self);
+ setDQStrategy (self);
+ setFtStrategy (self);

if (mamaTransportImpl_disableDisconnectCb (name))
{
@@ -1584,6 +1657,25 @@ mamaTransportImpl_getPreInitialScheme (mamaTransport transport)
return PRE_INITIAL_SCHEME_ON_GAP;
}

+dqStartegyScheme
+mamaTransportImpl_getDqStrategyScheme (mamaTransport transport)
+{
+ if (self)
+ {
+ return self->mDQStratScheme;
+ }
+ return DQ_SCHEME_DELIVER_ALL;
+}
+
+dqftStrategyScheme
+mamaTransportImpl_getFtStrategyScheme (mamaTransport transport)
+{
+ if (self)
+ {
+ return self->mFTStratScheme;
+ }
+ return DQ_FT_DO_NOT_WAIT_FOR_RECAP;
+}
/* Process an advisory message and invokes callbacks
* on all listeners.
* @param transport The transport.
--
1.7.7.6


[PATCH 16/50] [mama] dqpublisher destroy

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Added proper destroy implementation for dqpublisher to allow applications to
clean up and shutdown properly.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqpublishermanager.c | 48 +++++++++++++++++++++++++++++++++
1 files changed, 48 insertions(+), 0 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index ecf6ced..156b586 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -285,11 +285,59 @@ mama_status mamaDQPublisherManager_create (

void mamaDQPublisherManager_destroy (mamaDQPublisherManager manager)
{
+ /* Get the impl. */
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;
+ if(NULL != impl)
+ {
+ /* Destroy the publisher. */
+ if(NULL != impl->mPublisher)
+ {
+ mamaPublisher_destroy(impl->mPublisher);
+ }
+
+ /* Destroy the subscription. */
+ if(NULL != impl->mSubscription)
+ {
+ mamaSubscription_destroy(impl->mSubscription);
+ mamaSubscription_deallocate(impl->mSubscription);
+ }
+
+ /* Destroy the inbox. */
+ if(NULL != impl->mInbox)
+ {
+ mamaInbox_destroy(impl->mInbox);
+ }

+ /* Destroy the re-usable messages. */
+ if(NULL != impl->mRefreshResponseMsg)
+ {
+ mamaMsg_destroy(impl->mRefreshResponseMsg);
+ }
+ if(NULL != impl->mNoSubscribersMsg)
+ {
+ mamaMsg_destroy(impl->mNoSubscribersMsg);
+ }
+ if(NULL != impl->mSyncRequestMsg)
+ {
+ mamaMsg_destroy(impl->mSyncRequestMsg);
+ }
+
+ /* Free the namespace. */
+ if(NULL != impl->mNameSpace)
+ {
+ free(impl->mNameSpace);
+ }
+
+ /* Destroy the publisher table. */
+ if(NULL != impl->mPublisherMap)
+ {
wtable_destroy ( impl->mPublisherMap );
}

+ /* Free the impl itself. */
+ free(impl);
+ }
+}
mama_status mamaDQPublisherManager_addPublisher (
mamaDQPublisherManager manager,
const char *symbol,
--
1.7.7.6


[PATCH 15/50] [mama] dqpublisher manager bug fixes

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Only invoke the application on create callback if it is not NULL. This avoids
core dumps if the application does not provide a callback.

The manager was not maintaining the publisher map correctly. It allocated new
entries when an entry already existed, and did not remove entries properly.

Create the CM subscription after the other objects are created. Creating it
early could result in CM requests invoking callbacks before the manager fully
initializes.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqpublishermanager.c | 28 ++++++++++++----------------
1 files changed, 12 insertions(+), 16 deletions(-)

diff --git a/mama/c_cpp/src/c/dqpublishermanager.c b/mama/c_cpp/src/c/dqpublishermanager.c
index 0ee62de..ecf6ced 100644
--- a/mama/c_cpp/src/c/dqpublishermanager.c
+++ b/mama/c_cpp/src/c/dqpublishermanager.c
@@ -114,7 +114,9 @@ dqPublisherImplCreateCb (mamaSubscription subsc, void* closure)
{
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) (closure);

- impl->mUserCallbacks.onCreate ((mamaDQPublisherManager)impl);
+ if(NULL != impl)
+ if(NULL != impl->mUserCallbacks.onCreate)
+ impl->mUserCallbacks.onCreate ((mamaDQPublisherManager)impl);
}


@@ -263,8 +265,14 @@ mama_status mamaDQPublisherManager_create (
impl->mNameSpace = strdup(sourcename);
strcat(topic, ".");
strcat(topic, sourcename);
+ impl->mPublisherMap = wtable_create (topic, NUM_BUCKETS);

mamaSubscription_allocate (&impl->mSubscription);
+ mamaPublisher_create (&impl->mPublisher,
+ transport,
+ MAMA_CM_TOPIC,
+ NULL,
+ NULL);
mamaSubscription_createBasic (impl->mSubscription,
transport,
queue,
@@ -272,14 +280,6 @@ mama_status mamaDQPublisherManager_create (
topic,
impl);

- mamaPublisher_create (&impl->mPublisher,
- transport,
- MAMA_CM_TOPIC,
- NULL,
- NULL);
-
- impl->mPublisherMap = wtable_create (topic, NUM_BUCKETS);
-
return MAMA_STATUS_OK;
}

@@ -298,15 +298,10 @@ mama_status mamaDQPublisherManager_addPublisher (
{
mamaPublishTopic* newTopic = NULL;
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;
-
- if (wtable_lookup (impl->mPublisherMap , ( char* )symbol))
- return (MAMA_STATUS_INVALID_ARG);
-
-
newTopic = (mamaPublishTopic*) wtable_lookup (impl->mPublisherMap, (char*)symbol);


- if (newTopic)
+ if (!newTopic)
{
newTopic = (mamaPublishTopic*) calloc (1, sizeof (mamaPublishTopic));

@@ -408,10 +403,11 @@ mama_status mamaDQPublisherManager_destroyPublisher (
mamaDQPublisherManagerImpl* impl = (mamaDQPublisherManagerImpl*) manager;
mamaPublishTopic* newTopic = NULL;

- if ((newTopic = wtable_lookup (impl->mPublisherMap , ( char* )symbol)))
+ if (!(newTopic = wtable_lookup (impl->mPublisherMap , ( char* )symbol)))
return (MAMA_STATUS_INVALID_ARG);

mamaDQPublisher_destroy(newTopic->pub);
+ wtable_remove (impl->mPublisherMap, symbol);

free ((void*)newTopic->symbol);
free ((void*)newTopic);
--
1.7.7.6

2161 - 2180 of 2305