Date   

Re: QPID Proton bridge: multiple subscribers, one publisher

macrux
 

Hi Frank,

Just to solve a similar question, is it possible to have the opposite behaviour in which multiple applications (separated instances) publish data into the same topic (or source), and only one subscriber receives all that data?

Thanks in advance,


On 21 July 2015 at 13:27, Frank Quinn <fquinn.ni@...> wrote:

Hi Nestor,

If you modify each new subscriber to use a different incoming URL and reply URL, you can actually do this today, though as you can imagine it becomes a bit of a brute to configure very quickly.

The better solution though is probably the currently experimental feature-qpid-broker branch that I'm hoping will make it into OpenMAMA 2.3.4. It allows for a much easier to manage pub / sub infrastructure where in terms of transport configuration, all you do is point it to the broker - there's an example mama.properties on that branch to give you an idea: http://git.openmama.org/?p=OpenMAMA.git;a=blob;f=mama/c_cpp/src/examples/mama.properties;h=86c7b1ce3933a10b0022b89837094bdb5904b974;hb=feature-qpid-broker

Cheers,
Frank


On Tue, 21 Jul 2015 16:23 Macrux <kmacrux@...> wrote:
Hi guys,

I'd like to know if there was or there will be some change to the Qpid Proton Bridge such that I can have a publish/subscribe connection in which only one advanced publisher can have multiple subscribers, because according to the wiki, "the current QPID Proton bridge is a point-to-point implementation" and I've had to run multiple publishers (always the same) each one with a different transport, and to subscribe each client with only one publisher.

Maybe, if someone knows some technique to achieve this behavior, it would be very helpful for me.

Thanks in advance.

Kind regards,


Nestor.
_______________________________________________
Openmama-dev mailing list
Openmama-dev@...
https://lists.openmama.org/mailman/listinfo/openmama-dev


SNAPSHOTS Messages

eduardo noe rodriguez franco <enoerodriguezf@...>
 

Hi everyone,

Could you help me with some questions, please?

1.- If one client or broker house made the connection when the market had been working for a short while, I mean, the client made the connection lately to the open of the market. can the client ask for a SNAPSHOT or summary of the operations? I need to build one instance to attend that kind of requests.

2.- In a multicast connection, when the client ask for a re-transmission or recap, that is point to point, but how its works? I mean, what i need to do to attend that kind of request.

Thanks in advance, and I am glad to be part of your group.

Regards

Eduardo Rodriguez


Re: Dropping support for Qpid Proton <= 0.6

Damian Maguire <d.maguire@...>
 

No objections here, removing support for the older versions makes sense and should help reduce the overhead of supporting the middleware. I also think we should be pretty aggressive in deprecating support in the future, especially for versions which are known to have major issues.

Cheers, 

D

Damian Maguire
Senior Sales Engineer

SR.LABS Proven High Speed Electronic Trading Solutions

Adelaide Exchange | 24-26 Adelaide Street | Belfast | UK |BT2 8GD

d.maguire@...  

+44 7835 844770



From: <openmama-dev-bounces@...> on behalf of Frank Quinn
Reply-To: "fquinn.ni@..."
Date: Tuesday, July 28, 2015 at 9:45 AM
To: openmama-dev
Subject: Re: [Openmama-dev] Dropping support for Qpid Proton <= 0.6

Hi Folks,

Last call on this - if I hear no objections within a week, I'll be removing 0.5 and 0.6 support from CI and removing the backwards compatibility macros from the code.

Cheers,
Frank

On Wed, Jul 22, 2015 at 9:28 AM, Frank Quinn <fquinn.ni@...> wrote:
Hi Folks,

I am planning on dropping support for Qpid Proton 0.5 and 0.6 in the next OpenMAMA release and instead sticking to versions 0.7+. Those guys move quickly and make small changes to interfaces and header names fairly regularly so keeping up with new releases without breaking older versions is fairly tricky, and something I'd like to at least limit.

Also, they had some compiler warnings in one of their headers when our strict parsing was enabled in 0.5 that wasn't fixed until 0.7 (see https://issues.apache.org/jira/browse/PROTON-420), so dropping support for all versions prior to 0.7 will mean we can turn -Werror back on without breaking older builds.

If anyone has any objections to this, please respond.

Cheers,
Frank


Re: Dropping support for Qpid Proton <= 0.6

Frank Quinn <fquinn.ni@...>
 

Hi Folks,

Last call on this - if I hear no objections within a week, I'll be removing 0.5 and 0.6 support from CI and removing the backwards compatibility macros from the code.

Cheers,
Frank

On Wed, Jul 22, 2015 at 9:28 AM, Frank Quinn <fquinn.ni@...> wrote:
Hi Folks,

I am planning on dropping support for Qpid Proton 0.5 and 0.6 in the next OpenMAMA release and instead sticking to versions 0.7+. Those guys move quickly and make small changes to interfaces and header names fairly regularly so keeping up with new releases without breaking older versions is fairly tricky, and something I'd like to at least limit.

Also, they had some compiler warnings in one of their headers when our strict parsing was enabled in 0.5 that wasn't fixed until 0.7 (see https://issues.apache.org/jira/browse/PROTON-420), so dropping support for all versions prior to 0.7 will mean we can turn -Werror back on without breaking older builds.

If anyone has any objections to this, please respond.

Cheers,
Frank


Re: OpenMAMA to OpenMAMDA Proxy App

Frank Quinn <f.quinn@...>
 

Hi Nestor,

 

Order book instruments are typically prefixed with a lower case ‘b’, for an example, see the bookticker example in http://www.openmama.org/blog/getting-started-part-2

 

Cheers,

Frank

 

From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Macrux
Sent: 27 July 2015 16:22
To: openmama-dev@...
Subject: [Openmama-dev] OpenMAMA to OpenMAMDA Proxy App

 

Hello everybody,

I'm trying to figure this: I want to built an OpenMAMA proxy application, similar to MamaProxy, which listen to a source for data and then publish that data in an  topic again, but the difference is that I want to publish that data as OpenMAMDA messages, I mean, as an orderbook, just as the MamdaBookPublisher example, but I haven't been able to build the orderbook from the OpenMAMA data.

Let me to explain my setup:

I run the capture replay as in the example:

capturereplayc -S TEST -m qpid -tport pub  -dictionary data/data.dict -f data/openmama_utpcasheuro_capture.5000.10.qpid.mplay -r

Then, I run an application (similar to the MamaProxy listener) to listen to:

-S TEST -s PTBIZJYE0064.EUR.ENXL -tport sub -m qpid

And I have been trying to extract the information to build the book, but I can't find the data because I don't know the fields to extract from or the way to do it, the only thing I've recognized are the two first prices, times and sizes from each side. I'm working with the Java API.

I apologize if it's a silly question, but I'm stuck on this problem.

Thanks in advance,


Néstor.


OpenMAMA to OpenMAMDA Proxy App

macrux
 

Hello everybody,

I'm trying to figure this: I want to built an OpenMAMA proxy application, similar to MamaProxy, which listen to a source for data and then publish that data in an  topic again, but the difference is that I want to publish that data as OpenMAMDA messages, I mean, as an orderbook, just as the MamdaBookPublisher example, but I haven't been able to build the orderbook from the OpenMAMA data.

Let me to explain my setup:

I run the capture replay as in the example:

capturereplayc -S TEST -m qpid -tport pub  -dictionary data/data.dict -f data/openmama_utpcasheuro_capture.5000.10.qpid.mplay -r

Then, I run an application (similar to the MamaProxy listener) to listen to:

-S TEST -s PTBIZJYE0064.EUR.ENXL -tport sub -m qpid

And I have been trying to extract the information to build the book, but I can't find the data because I don't know the fields to extract from or the way to do it, the only thing I've recognized are the two first prices, times and sizes from each side. I'm working with the Java API.

I apologize if it's a silly question, but I'm stuck on this problem.

Thanks in advance,


Néstor.


[PATCH 14/14] AVIS: Fixed corruption bug in realloc

Frank Quinn <fquinn.ni@...>
 

There was a realloc bug in AVIS when serializing / deserializing
that should be fixed with this patch. Also added a getByteBuffer
implementation.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/payload/avismsg/avispayload.c | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index 20063f5..82aca1a 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -406,14 +406,16 @@ avismsgPayload_serialize     (const msgPayload    msg,
                 {
                     void*vp=realloc (impl->mBuffer, impl->mBufferLen+200);
                     impl->mBuffer = vp;
-                    buffPos=&impl->mBuffer;
+                    buffPos=impl->mBuffer;
                     buffPos+=currLen;
                     impl->mBufferLen+=200;
                 }
                 *(int8_t *)(buffPos) = 2;   buffPos+=1;     currLen+=1;
                 *(int16_t *)(buffPos) = len; buffPos+=2; currLen+=2;
                 memcpy (buffPos, currField->mName, len); buffPos+=len; currLen+=len;
-                *(int64_t *)(buffPos) = currField->mValue->value.int64; buffPos+=sizeof(int64_t); currLen+=sizeof(int64_t);
+                *(int64_t *)(buffPos) = (int64_t)currField->mValue->value.int64;
+                buffPos+=sizeof(int64_t);
+                currLen+=sizeof(int64_t);
                 break;
             case TYPE_REAL64:
                 len=+ strlen(currField->mName);;
@@ -421,7 +423,7 @@ avismsgPayload_serialize     (const msgPayload    msg,
                 {
                     void*vp=realloc (impl->mBuffer, impl->mBufferLen+200);
                     impl->mBuffer = vp;
-                    buffPos=&impl->mBuffer;
+                    buffPos=impl->mBuffer;
                     buffPos+=currLen;
                     impl->mBufferLen+=200;
                 }
@@ -436,11 +438,11 @@ avismsgPayload_serialize     (const msgPayload    msg,
                 {
                     void*vp=realloc (impl->mBuffer, impl->mBufferLen+200);
                     impl->mBuffer = vp;
-                    buffPos=&impl->mBuffer;
+                    buffPos=impl->mBuffer;
                     buffPos+=currLen;
                     impl->mBufferLen+=200;
                 }
-                *(int8_t *)(buffPos) = 4;   buffPos+=1;     currLen+=1;
+                *(int8_t *)(buffPos) = TYPE_STRING;   buffPos+=1;     currLen+=1;
                 *(int16_t *)(buffPos) = len; buffPos+=2; currLen+=2;
                 memcpy (buffPos, currField->mName, len); buffPos+=len; currLen+=len;
                 *(int16_t *)(buffPos) = strlen(currField->mValue->value.str); buffPos+=2; currLen+=2;
@@ -503,7 +505,7 @@ avismsgPayload_getByteBuffer     (const msgPayload    msg,
     CHECK_NULL(bufferLength);
 
     *buffer = impl->mAvisMsg;
-
+    *bufferLength = sizeof(impl->mAvisMsg);
 
     return MAMA_STATUS_OK;
 }
@@ -2034,6 +2036,7 @@ avismsgPayloadIter_get          (msgPayloadIter  iter,
         return NULL;
     }
 
+    /* If this is a special meta field, do not consider during iteration */
     if ((strcmp(SUBJECT_FIELD_NAME, avisField(field)->mName) == 0) ||
         (strcmp(INBOX_FIELD_NAME, avisField(field)->mName)== 0))
             return (avismsgPayloadIter_next(iter,field,msg));
--
2.4.3


[PATCH 13/14] UNITTEST: Remove presumptuous isTportDisconnected test

Frank Quinn <fquinn.ni@...>
 

I don't think it's reasonable to expect a unit test around OpenMAMA
to support this function call as the nature of transport connectivity
will vary across all middlewares. In the instance of Avis, it is
never really in a disconnected state and there are doubtless other
middleware implementations that are not connection oriented at all
and for which this test doesn't really make any sense, so I'm
taking it out.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 .../c/middleware/middlewareSubscriptionTests.cpp       | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/mama/c_cpp/src/gunittest/c/middleware/middlewareSubscriptionTests.cpp b/mama/c_cpp/src/gunittest/c/middleware/middlewareSubscriptionTests.cpp
index 6f19c2e..e6eccbf 100644
--- a/mama/c_cpp/src/gunittest/c/middleware/middlewareSubscriptionTests.cpp
+++ b/mama/c_cpp/src/gunittest/c/middleware/middlewareSubscriptionTests.cpp
@@ -424,24 +424,6 @@ TEST_F (MiddlewareSubscriptionTests, getPlatformErrorInvalidSubBridge)
                status);
 }
 
-TEST_F (MiddlewareSubscriptionTests, isTportDisconnected)
-{
-    int res = NULL;
-    ASSERT_EQ(MAMA_STATUS_OK,
-              mamaSubscription_create(parent, queue, &callbacks, source, sourceName, closure));
-
-    ASSERT_EQ(MAMA_STATUS_OK,
-              mBridge->bridgeMamaSubscriptionCreate(&subscriber, sourceName, symbol,
-                                                    tport, queue, callbacks,
-                                                    parent, closure));
-
-    res=mBridge->bridgeMamaSubscriptionIsTportDisconnected(subscriber);
-    ASSERT_TRUE(res != NULL);
-
-    ASSERT_EQ(MAMA_STATUS_OK,
-              mBridge->bridgeMamaSubscriptionDestroy(subscriber));
-}
-
 TEST_F (MiddlewareSubscriptionTests, isTportDisconnectedInvalid)
 {
     ASSERT_EQ (MAMA_STATUS_NULL_ARG,
--
2.4.3


[PATCH 12/14] UNITTEST: Removed duplicateReplyHandle test

Frank Quinn <fquinn.ni@...>
 

This test doesn't really make sense without the use of an inbox
so it serves no useful function at the moment, therefore I'm taking
it out.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 .../src/gunittest/c/middleware/middlewareMsgTests.cpp | 19 -------------------
 1 file changed, 19 deletions(-)

diff --git a/mama/c_cpp/src/gunittest/c/middleware/middlewareMsgTests.cpp b/mama/c_cpp/src/gunittest/c/middleware/middlewareMsgTests.cpp
index fc12c55..8630bb1 100644
--- a/mama/c_cpp/src/gunittest/c/middleware/middlewareMsgTests.cpp
+++ b/mama/c_cpp/src/gunittest/c/middleware/middlewareMsgTests.cpp
@@ -290,25 +290,6 @@ TEST_F (MiddlewareMsgTests, getNativeHandleInvalidResult)
                mBridge->bridgeMamaMsgGetNativeHandle(msg,NULL));
 }
 
-TEST_F (MiddlewareMsgTests, duplicateReplyHandle)
-{
-    msgBridge msg        = NULL;
-    mamaMsg   parent     = NULL;
-    void*     result     = NULL;
-    int       destroyMsg = 1;
-
-    ASSERT_EQ(MAMA_STATUS_OK,mamaMsg_create(&parent));
-
-    ASSERT_EQ(MAMA_STATUS_OK,
-              mBridge->bridgeMamaMsgCreate(&msg,parent));
-
-    ASSERT_EQ(MAMA_STATUS_OK,
-              mBridge->bridgeMamaMsgDuplicateReplyHandle(msg,&result));
-
-    ASSERT_EQ(MAMA_STATUS_OK,
-              mBridge->bridgeMamaMsgDestroy(msg,destroyMsg));
-}
-
 TEST_F (MiddlewareMsgTests, duplicateReplyHandleInvalidMsgBridge)
 {
     void* result = NOT_NULL;
--
2.4.3



[PATCH 11/14] AVIS: Modified iterator to align with MAMA C++ Expectations

Frank Quinn <fquinn.ni@...>
 

MAMA expects iterators to rewind when begin is called, and
remain pointing to the first entry after the *first* time
that next is called. This change aligns with that requirement.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h |  1 +
 mama/c_cpp/src/c/payload/avismsg/avispayload.c | 26 +++++++++++++++++++++-----
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
index f83a1cb..b9b5ef2 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
+++ b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
@@ -55,6 +55,7 @@ typedef struct avisIterator
     AttributesIter*     mMsgIterator;
     Attributes*         mAvisMsg;
     avisFieldPayload*   mAvisField;
+    uint64_t            mIndex;
 } avisIterator;
 
 
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index e935e15..20063f5 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -2046,15 +2046,30 @@ avismsgPayloadIter_next          (msgPayloadIter  iter,
                                 msgFieldPayload field,
                                 msgPayload      msg)
 {
-    avisIterator* impl = (avisIterator*) iter;
-    if (!iter || !msg || !field) return NULL;
+    avisIterator*   impl = (avisIterator*) iter;
+    msgFieldPayload ret = NULL;
 
-    if (!attributes_iter_next(impl->mMsgIterator))
-        return NULL;
+    if (!iter || !msg) return NULL;
+    /* Only advance iterator if not first run */
+    if (impl->mIndex > 0)
+    {
+        impl->mIndex++;
+        if (!attributes_iter_next(impl->mMsgIterator))
+            return NULL;
+    }
 
-    return avismsgPayloadIter_get(iter, impl->mAvisField, msg);
+    ret = avismsgPayloadIter_get(iter, impl->mAvisField, msg);
+    if (NULL != ret)
+    {
+        impl->mIndex++;
+    }
+
+    return ret;
 }
 
+/* This isn't really supported on Avis. HasNext on avis is more like "i've
+ * just moved to next, do I have anything?"
+ */
 mama_bool_t
 avismsgPayloadIter_hasNext       (msgPayloadIter iter,
                                 msgPayload     msg)
@@ -2073,6 +2088,7 @@ avismsgPayloadIter_begin         (msgPayloadIter  iter,
     avisIterator* impl = (avisIterator*) iter;
     if (!impl) return NULL;
 
+    impl->mIndex = 0;
     attributes_iter_init(impl->mMsgIterator, impl->mAvisMsg);
     return avismsgPayloadIter_get(iter, impl->mAvisField, msg);
 }
--
2.4.3



[PATCH 10/14] UNITTEST: Fixed assumption of order in iteration

Frank Quinn <fquinn.ni@...>
 

The unit tests assumed that the order of entry into a message
field will be identical to the order of extraction while iterating.
This proved not to be the case with avis and possibly other payload
implementations so these changes will remove that assumption
without losing out on any other existing test strictness.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 .../src/gunittest/c/mamamsg/msgiterationtests.cpp  | 101 ++++++++++-----------
 .../gunittest/c/payload/payloadgeneraltests.cpp    |  24 +++--
 2 files changed, 67 insertions(+), 58 deletions(-)

diff --git a/mama/c_cpp/src/gunittest/c/mamamsg/msgiterationtests.cpp b/mama/c_cpp/src/gunittest/c/mamamsg/msgiterationtests.cpp
index b8785d4..40dc60e 100644
--- a/mama/c_cpp/src/gunittest/c/mamamsg/msgiterationtests.cpp
+++ b/mama/c_cpp/src/gunittest/c/mamamsg/msgiterationtests.cpp
@@ -26,8 +26,10 @@
 #include "mama/msg.h"
 #include <gtest/gtest.h>
 #include <cstdlib>
+#include <stdlib.h>
 #include "MainUnitTestC.h"
 #include <iostream>
+#include <map>
 #include "bridge.h"
 #include "mama/types.h"
 
@@ -249,6 +251,7 @@ protected:
     mamaMsgIterator iterator;
     mamaDictionary  dict;
     mamaMsgField    field;
+    std::map<mama_fid_t, uint64_t> values;
 };
 
 MsgNewIteratorTestC::MsgNewIteratorTestC(void)
@@ -271,11 +274,14 @@ void MsgNewIteratorTestC::SetUp(void)
 
     /* add a fields to the message. */
     mamaMsg_create    (&msg);
-    mamaMsg_addU8     (msg, "u8", 101, 8);
-    mamaMsg_addString (msg, "string", 102, "This is an iteration test.");
-    mamaMsg_addU16    (msg, "u16", 103, 16);
-    mamaMsg_addU32    (msg, "u32", 104, 32);
-    mamaMsg_addU64    (msg, "u64", 105, 64);
+    for (mama_fid_t f = 101; f < 106; f++)
+    {
+        char buf[64];
+        sprintf (buf, "field_%u", f);
+        int val = rand();
+        mamaMsg_addU64 (msg, buf, f, val);
+        values.insert(std::pair<mama_fid_t, uint64_t>(f, val));
+    }
 
     /* Build the MAMA Dictionary from our test message. */
     mamaDictionary_create (&dict);
@@ -408,16 +414,15 @@ TEST_F (MsgNewIteratorTestC, IteratorAssociateNullMsg)
 TEST_F (MsgNewIteratorTestC, IteratorBegin)
 {
     mama_fid_t      fid      = 0;
-    mama_u8_t       content  = 0;
+    mama_u64_t      content  = 0;
 
     field = mamaMsgIterator_begin (iterator);
 
     mamaMsgField_getFid (field, &fid);
-    mamaMsgField_getU8  (field, &content);
+    mamaMsgField_getU64 (field, &content);
 
     /* Check the contents of the field: */
-    ASSERT_EQ (101, fid);
-    ASSERT_EQ (8, content);
+    ASSERT_EQ(values.at(fid), content);
 }
 
 /*  Description:     Check that when passed a NULL iterator, begin returns a NULL
@@ -435,45 +440,44 @@ TEST_F (MsgNewIteratorTestC, DISABLED_IteratorBeginNullIter)
 
 /*  Description:     Call begin, check that the first field is returned, then
  *                   call next, and check that the first field is returned again,
- *                   then call next again, and ensure that the second field is
- *                   returned.
- *
- *  Expected Result: First  Check: fid - 101, value - 8
- *                   Second Check: fid - 101, value - 8
- *                   Third  Check: fid - 102, value - "This is an iteration test."
+ *                   then call next again, until all fields are confirmed.
  */
 TEST_F (MsgNewIteratorTestC, IteratorBeginNext)
 {
     mama_fid_t      fid      = 0;
-    mama_u8_t       content  = 0;
-    const char*     strContent;
+    mama_u64_t      content  = 0;
 
     field = mamaMsgIterator_begin (iterator);
 
     mamaMsgField_getFid (field, &fid);
-    mamaMsgField_getU8 (field, &content);
+    mamaMsgField_getU64 (field, &content);
 
-    /* Check the contents of the field: */
-    ASSERT_EQ (101, fid);
-    ASSERT_EQ (8, content);
+    /* Check the contents of the field match legal entry */
+    ASSERT_EQ(values.at(fid), content);
 
     field = mamaMsgIterator_next (iterator);
 
     mamaMsgField_getFid (field, &fid);
-    mamaMsgField_getU8 (field, &content);
 
     /* Ensure we return the first field again: */
-    ASSERT_EQ (101, fid);
-    ASSERT_EQ (8, content);
+    ASSERT_EQ(values[fid], content);
 
-    field = mamaMsgIterator_next (iterator);
+    /* Remove from reference map */
+    values.erase(fid);
 
-    mamaMsgField_getFid      (field, &fid);
-    mamaMsgField_getString   (field, &strContent);
+    /* For the 4 remaining fields, check contents */
+    for (int i = 0; i < 4; i++)
+    {
+        field = mamaMsgIterator_next (iterator);
+        mamaMsgField_getFid (field, &fid);
+        mamaMsgField_getU64 (field, &content);
+        ASSERT_EQ(values[fid], content);
+        /* Remove from reference map */
+        values.erase(fid);
+    }
 
-    /* Ensure we return the first field again: */
-    ASSERT_EQ (102, fid);
-    ASSERT_STREQ ("This is an iteration test.", strContent);
+    /* Gotta catch 'em all */
+    EXPECT_EQ(0, values.size());
 }
 
 /*  Description:     Step into the message, determine if it has a next value,
@@ -484,24 +488,21 @@ TEST_F (MsgNewIteratorTestC, IteratorBeginNext)
 TEST_F (MsgNewIteratorTestC, IteratorHasNext)
 {
     mama_fid_t      fid      = 0;
-    mama_u8_t       content  = 0;
+    mama_u64_t      content  = 0;
     mama_bool_t     hasNext  = 0;
-    const char*     strContent;
 
     field = mamaMsgIterator_begin (iterator);
 
     mamaMsgField_getFid (field, &fid);
-    mamaMsgField_getU8 (field, &content);
+    mamaMsgField_getU64 (field, &content);
 
     /* Check the contents of the field: */
-    ASSERT_EQ (101, fid);
-    ASSERT_EQ (8, content);
+    ASSERT_EQ(values[fid], content);
 
     field = mamaMsgIterator_next (iterator);
 
     /* Ensure we return the first field again: */
-    ASSERT_EQ (101, fid);
-    ASSERT_EQ (8, content);
+    ASSERT_EQ(values[fid], content);
 
     /* Check if we have a next value: */
     hasNext = mamaMsgIterator_hasNext (iterator);
@@ -510,12 +511,11 @@ TEST_F (MsgNewIteratorTestC, IteratorHasNext)
 
     /* Get the next value */
     field = mamaMsgIterator_next (iterator);
-    mamaMsgField_getFid      (field, &fid);
-    mamaMsgField_getString   (field, &strContent);
+    mamaMsgField_getFid (field, &fid);
+    mamaMsgField_getU64 (field, &content);
 
     /* Ensure we return the first field again: */
-    ASSERT_EQ (102, fid);
-    ASSERT_STREQ ("This is an iteration test.", strContent);
+    ASSERT_EQ(values[fid], content);
 }
 
 /*  Description:     Step to the end of the message, check if it has a next,
@@ -526,23 +526,20 @@ TEST_F (MsgNewIteratorTestC, IteratorHasNext)
 TEST_F (MsgNewIteratorTestC, IteratorHasNoNext)
 {
     mama_fid_t      fid      = 0;
-    mama_u8_t       content  = 0;
-    mama_bool_t     hasNext  = 0;
+    mama_u64_t      content  = 0;
 
     field = mamaMsgIterator_begin (iterator);
 
     mamaMsgField_getFid (field, &fid);
-    mamaMsgField_getU8 (field, &content);
+    mamaMsgField_getU64 (field, &content);
 
     /* Check the contents of the field: */
-    ASSERT_EQ (101, fid);
-    ASSERT_EQ (8, content);
+    ASSERT_EQ(values[fid], content);
 
     field = mamaMsgIterator_next (iterator);
 
     /* Ensure we return the first field again: */
-    ASSERT_EQ (101, fid);
-    ASSERT_EQ (8, content);
+    ASSERT_EQ(values[fid], content);
 
     /* Move to the last message: */
     field = mamaMsgIterator_next (iterator);
@@ -550,10 +547,12 @@ TEST_F (MsgNewIteratorTestC, IteratorHasNoNext)
     field = mamaMsgIterator_next (iterator);
     field = mamaMsgIterator_next (iterator);
 
-    /* Check if we have a next value: */
-    hasNext = mamaMsgIterator_hasNext (iterator);
+    /* Move past last message - should not crash */
+    field = mamaMsgIterator_next (iterator);
+    ASSERT_EQ(NULL, field);
 
-    ASSERT_EQ (0, hasNext);
+    /* Check if we have a next value: */
+    ASSERT_EQ (0, mamaMsgIterator_hasNext (iterator));
 }
 
 /*  Description:     Attempt to check hasNext for a NULL iterator.
diff --git a/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp b/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp
index 199e2ad..7b9eca8 100644
--- a/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp
+++ b/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp
@@ -1891,28 +1891,38 @@ TEST_F(PayloadGeneralTests, IterAssociateValid)
 
 TEST_F(PayloadGeneralTests, IterAssociateTwiceValid)
 {
-    msgPayload          testPayload = NULL;
-    msgPayload          testPayload2 = NULL;
-    msgPayloadIter      testIter = NULL;
-    msgFieldPayload     testField = NULL;
-    mama_fid_t          test_fid = 0;
+    msgPayload           testPayload = NULL;
+    msgPayload           testPayload2 = NULL;
+    msgPayloadIter       testIter = NULL;
+    msgFieldPayload      testField = NULL;
+    mama_fid_t           test_fid = 0;
+    std::set<mama_fid_t> pl1_fids;
+    std::set<mama_fid_t> pl2_fids;
 
     result = aBridge->msgPayloadCreate(&testPayload);
     EXPECT_EQ (MAMA_STATUS_OK, result);
 
     // Create 2 payloads
     aBridge->msgPayloadAddString (testPayload, "name2", 102, "Unit");
+    pl1_fids.insert(102);
     aBridge->msgPayloadAddString (testPayload, "name3", 103, "Testing");
+    pl1_fids.insert(103);
     aBridge->msgPayloadAddString (testPayload, "name4", 104, "Is");
+    pl1_fids.insert(104);
     aBridge->msgPayloadAddString (testPayload, "name5", 105, "Fun");
+    pl1_fids.insert(105);
 
     result = aBridge->msgPayloadCreate(&testPayload2);
     EXPECT_EQ (MAMA_STATUS_OK, result);
 
     aBridge->msgPayloadAddString (testPayload2, "name2", 202, "Repeating");
+    pl2_fids.insert(202);
     aBridge->msgPayloadAddString (testPayload2, "name3", 203, "Things");
+    pl2_fids.insert(203);
     aBridge->msgPayloadAddString (testPayload2, "name4", 204, "Is");
+    pl2_fids.insert(204);
     aBridge->msgPayloadAddString (testPayload2, "name5", 205, "Great");
+    pl2_fids.insert(205);
 
     // Create iterator
     result = aBridge->msgPayloadIterCreate(&testIter, testPayload);
@@ -1931,7 +1941,7 @@ TEST_F(PayloadGeneralTests, IterAssociateTwiceValid)
     testField = aBridge->msgPayloadIterNext(testIter,testField,testPayload);
 
     result = aBridge->msgFieldPayloadGetFid(testField,NULL,NULL, &test_fid);
-    EXPECT_EQ (103, test_fid);
+    EXPECT_NE (pl1_fids.find(test_fid), pl1_fids.end());
 
     // reuse iterator with new payload
     result = aBridge->msgPayloadIterAssociate(testIter, testPayload2);
@@ -1946,7 +1956,7 @@ TEST_F(PayloadGeneralTests, IterAssociateTwiceValid)
     testField = aBridge->msgPayloadIterNext(testIter,testField,testPayload2);
 
     result = aBridge->msgFieldPayloadGetFid(testField,NULL,NULL, &test_fid);
-    EXPECT_EQ (203, test_fid);
+    EXPECT_NE (pl2_fids.find(test_fid), pl2_fids.end());
 
     result = aBridge->msgPayloadIterDestroy(testIter);
     EXPECT_EQ (MAMA_STATUS_OK, result);
--
2.4.3


[PATCH 09/14] UNITTEST: Fixed msgGetDateTimeMSecInValidName to behave correctly

Frank Quinn <fquinn.ni@...>
 

This unit test was testing for the wrong thing. Really all it
should have been testing was that the methods work OK when a NULL
name is provided, but a fid is non-NULL.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp b/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp
index 408dc02..cc8ff50 100644
--- a/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp
+++ b/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp
@@ -697,8 +697,8 @@ TEST_F (MsgGeneralTestsC, DISABLED_msgGetDateTimeMSecInValidMsg)
 
 TEST_F (MsgGeneralTestsC, msgGetDateTimeMSecInValidName)
 {
-    mama_fid_t           fid          = 0;
-    mama_u64_t*          milliseconds = 0;
+    mama_fid_t           fid          = 102;
+    mama_u64_t           milliseconds = 0;
     mamaDateTime         dateTime     = NULL;
     mamaDateTime         m_out        = NULL;
 
@@ -707,10 +707,11 @@ TEST_F (MsgGeneralTestsC, msgGetDateTimeMSecInValidName)
     mamaDateTime_create(&m_out);
     mamaDateTime_setToNow(dateTime);
    
-    mamaMsg_addDateTime(mMsg, NULL, 102, dateTime);
-    mamaMsg_getDateTime(mMsg, NULL, 102, m_out);
+    mamaMsg_addDateTime(mMsg, NULL, fid, dateTime);
+    mamaMsg_getDateTime(mMsg, NULL, fid, m_out);
 
-    ASSERT_EQ (mamaMsg_getDateTimeMSec(mMsg, NULL, fid, milliseconds), MAMA_STATUS_INVALID_ARG);
+    /* Invalid name is ok, as long as fid is specified */
+    ASSERT_EQ (MAMA_STATUS_OK, mamaMsg_getDateTimeMSec(mMsg, NULL, fid, &milliseconds));
 }
 
 TEST_F (MsgGeneralTestsC, msgGetDateTimeMSecInValidFid)
--
2.4.3


[PATCH 08/14] AVIS: Added opaque data type serialization and deserialization

Frank Quinn <fquinn.ni@...>
 

There was already support for opaque data types but they could
not be serialized or deserialized. This patch should remedy
this.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/payload/avismsg/avispayload.c | 53 ++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index 0089fdc..e935e15 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -306,7 +306,27 @@ avismsgPayload_unSerialize (const msgPayload    msg,
                 avisMsg_setString(impl->mAvisMsg, tempName, 0, impl->mStringBuffer);
                 break;
             case TYPE_OPAQUE:
+            {
+                uint64_t dataSize = 0;
+                /* Pull out the field id length */
+                len=*(int16_t *)(buffPos);
+                buffPos+=sizeof(int16_t);
+                currLen+=sizeof(int16_t);
+                /* Pull out the field id and NULL terminate */
+                memcpy (tempName, buffPos, len);
+                buffPos+=len;
+                currLen+=len;
+                tempName[len]='\0';
+                /* Pull out number of bytes in opaque data */
+                dataSize=*(uint64_t*)(buffPos);
+                buffPos+=sizeof(uint64_t);
+                currLen+=sizeof(uint64_t);
+                /* Use raw data directly */
+                avisMsg_setOpaque(impl->mAvisMsg, tempName, 0, buffPos, dataSize);
+                buffPos+=dataSize;
+                currLen+=dataSize;
                 break;
+            }
         }
     }
 
@@ -428,6 +448,39 @@ avismsgPayload_serialize     (const msgPayload    msg,
                 buffPos+=strlen(currField->mValue->value.str); currLen+=strlen(currField->mValue->value.str);
                 break;
             case TYPE_OPAQUE:
+                len=+ strlen(currField->mName);
+                /* Current length + 1 for type, 2 for name size + length of name
+                 * + 8 bytes for data size + data size */
+                if (impl->mBufferLen < currLen+3+len+8+currField->mValue->value.bytes.item_count)
+                {
+                    void*vp=realloc (impl->mBuffer, impl->mBufferLen+200);
+                    impl->mBuffer = vp;
+                    buffPos=impl->mBuffer;
+                    buffPos+=currLen;
+                    impl->mBufferLen+=200;
+                }
+                /* Copy data type */
+                *(int8_t *)(buffPos) = 5;
+                buffPos+=1;
+                currLen+=1;
+                /* Copy field id length */
+                *(int16_t *)(buffPos) = len;
+                buffPos+=2;
+                currLen+=2;
+                /* Copy field id itself */
+                memcpy (buffPos, currField->mName, len);
+                buffPos+=len;
+                currLen+=len;
+                /* Copy number of bytes in data */
+                *(uint64_t *)(buffPos) = (uint64_t)currField->mValue->value.bytes.item_count;
+                buffPos+=sizeof(uint64_t);
+                currLen+=sizeof(uint64_t);
+                /* Copy the opaque data itself */
+                memcpy (buffPos,
+                        currField->mValue->value.bytes.items,
+                        currField->mValue->value.bytes.item_count);
+                buffPos+=currField->mValue->value.bytes.item_count;
+                currLen+=currField->mValue->value.bytes.item_count;
                 break;
 
         }
--
2.4.3


[PATCH 07/14] UNITTEST: Added some working getSendSubject unit tests

Frank Quinn <fquinn.ni@...>
 

The previous tests pretty much assumed it wasn't implemented in the
bridge. This fix adds some basic testing around this functionality
for self-describing messages.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 .../src/gunittest/c/mamamsg/msggeneraltests.cpp    | 10 ++++++---
 .../gunittest/c/payload/payloadgeneraltests.cpp    | 25 ++++++++++++++++++----
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp b/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp
index a7bc7a9..408dc02 100644
--- a/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp
+++ b/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp
@@ -531,18 +531,22 @@ TEST_F (MsgGeneralTestsC, DISABLED_msgImplSetDqStrategyContextInValidDq)
 */
 TEST_F (MsgGeneralTestsC, msgGetSendSubjectValid)
 {
-    const char*       subject    = "";
+    const char*       subjectIn  = "subject";
+    const char*       subjectOut = NULL;
     mama_status       status;
   
     //add fields to msg
     mamaMsg_addString (mMsg, "name0", 101, "test0");
     mamaMsg_addString (mMsg, "name1", 102, "test1");
-   
-    status = mamaMsg_getSendSubject(mMsg, &subject);
+
+    mamaMsgImpl_setBridgeImpl (mMsg, mMiddlewareBridge);
+    mamaMsgImpl_setSubscInfo (mMsg, NULL, NULL, subjectIn, 1);
+    status = mamaMsg_getSendSubject(mMsg, &subjectOut);
 
     CHECK_NON_IMPLEMENTED_OPTIONAL(status);
 
     ASSERT_EQ (status, MAMA_STATUS_OK);
+    EXPECT_STREQ(subjectIn, subjectOut);
 }
 
 TEST_F (MsgGeneralTestsC, msgGetSendSubjectInValidMsg)
diff --git a/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp b/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp
index bf67cb0..199e2ad 100644
--- a/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp
+++ b/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp
@@ -40,11 +40,13 @@ protected:
     virtual void TearDown(void);
 
     mamaPayloadBridge   aBridge;
+    mamaBridge          mMiddlewareBridge;
     mama_status         result;
 };
 
 PayloadGeneralTests::PayloadGeneralTests(void)
     : aBridge (NULL)
+    , mMiddlewareBridge (NULL)
     , result (MAMA_STATUS_OK)
 {
 }
@@ -56,6 +58,7 @@ PayloadGeneralTests::~PayloadGeneralTests(void)
 void PayloadGeneralTests::SetUp(void)
 {
     mama_loadPayloadBridge (&aBridge,getPayload());
+    mama_loadBridge (&mMiddlewareBridge, getMiddleware());
 }
 
 void PayloadGeneralTests::TearDown(void)
@@ -534,12 +537,23 @@ TEST_F(PayloadGeneralTests, GetNumberFieldsInValidNumFields)
 TEST_F(PayloadGeneralTests, GetSendSubjectValid)
 {
     msgPayload          testPayload = NULL;
+    const char*         subjectOut  = NULL;
+    const char*         subjectIn   = "testsubj";
+    msgBridge           bridgeMsg   = NULL;
+    mamaMsg             parentMsg   = NULL;
 
-    result = aBridge->msgPayloadCreate(&testPayload);
+    //result = aBridge->msgPayloadCreate(&testPayload);
+    mamaMsg_createForPayloadBridge(&parentMsg, aBridge);
+    mamaMsgImpl_getPayload(parentMsg, &testPayload);
     EXPECT_EQ (MAMA_STATUS_OK, result);
 
-    result = aBridge->msgPayloadGetSendSubject(testPayload, NULL);
-    EXPECT_EQ (MAMA_STATUS_NOT_IMPLEMENTED, result);
+    mamaMsgImpl_setBridgeImpl (parentMsg, mMiddlewareBridge);
+    mamaMsgImpl_setSubscInfo (parentMsg, NULL, NULL, subjectIn, 1);
+
+    result = aBridge->msgPayloadGetSendSubject(testPayload, &subjectOut);
+    CHECK_NON_IMPLEMENTED_OPTIONAL(result);
+
+    EXPECT_EQ (MAMA_STATUS_OK, result);
 }
 
 
@@ -562,7 +576,10 @@ TEST_F(PayloadGeneralTests, GetSendSubjectInValidSubject)
     EXPECT_EQ (MAMA_STATUS_OK, result);
 
     result = aBridge->msgPayloadGetSendSubject(testPayload, NULL);
-    EXPECT_EQ (MAMA_STATUS_NOT_IMPLEMENTED, result);
+
+    CHECK_NON_IMPLEMENTED_OPTIONAL(result);
+
+    EXPECT_EQ (MAMA_STATUS_NULL_ARG, result);
 }
 
 TEST_F(PayloadGeneralTests, ToStringValid)
--
2.4.3


[PATCH 06/14] AVIS: Modified date time serialization to include flags

Frank Quinn <fquinn.ni@...>
 

This was done with a simple assignment. Will need revised if MAMA
ever decide to change their datetime format.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
index b179dbf..0e01966 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
@@ -973,11 +973,11 @@ mama_status avisValue_getDateTime(const Value* pValue, mamaDateTime result)
     if (!pValue) return MAMA_STATUS_NULL_ARG;
     switch (pValue->type)
     {
-        case TYPE_STRING:  mamaDateTime_setFromString (result, pValue->value.str); break;
-        case TYPE_REAL64:  mamaDateTime_setEpochTimeF64 (result, pValue->value.real64); break;
-        case TYPE_INT64:  mamaDateTime_setEpochTimeMilliseconds (result, pValue->value.int64); break;
-        default: return MAMA_STATUS_WRONG_FIELD_TYPE; break;
-}
+        case TYPE_STRING: mamaDateTime_setFromString (result, pValue->value.str); break;
+        case TYPE_REAL64: mamaDateTime_setEpochTimeF64 (result, pValue->value.real64); break;
+        case TYPE_INT64:  *result = pValue->value.int64; break;
+        default: return MAMA_STATUS_WRONG_FIELD_TYPE; break;
+    }
     return MAMA_STATUS_OK;
 }
 
@@ -988,9 +988,7 @@ avisMsg_setDateTime(
         mama_fid_t          fid,
         const mamaDateTime  value)
 {
-    uint64_t tempu64;
-    mamaDateTime_getEpochTimeMicroseconds(value, &tempu64);
-    return avisMsg_setU64(attributes, name, fid, tempu64);
+    return avisMsg_setU64(attributes, name, fid, *value);
 }
 
 mama_status
--
2.4.3


[PATCH 05/14] AVIS: Added avis implementation for muteCurrentTopic

Frank Quinn <fquinn.ni@...>
 

Like qpid, this is simply a call to the internal bridge function
to mute the subscription's topic.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/sub.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/sub.c b/mama/c_cpp/src/c/bridge/avis/sub.c
index 0418634..39f0541 100644
--- a/mama/c_cpp/src/c/bridge/avis/sub.c
+++ b/mama/c_cpp/src/c/bridge/avis/sub.c
@@ -462,7 +462,7 @@ avisBridgeMamaSubscription_setTopicClosure (subscriptionBridge subscriber,
 mama_status
 avisBridgeMamaSubscription_muteCurrentTopic (subscriptionBridge subscriber)
 {
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    return avisBridgeMamaSubscription_mute (subscriber);
 }
 
 int
--
2.4.3


[PATCH 04/14] AVIS: Added several value validation steps that unit tests require

Frank Quinn <fquinn.ni@...>
 

Several unit tests expect MAMA bridges to return standard error
codes in various scenarios. This set of updates should bring avis
into line with these expectations.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/msg.c             |  4 +++-
 mama/c_cpp/src/c/bridge/avis/sub.c             |  1 +
 mama/c_cpp/src/c/bridge/avis/transportbridge.c |  8 +++----
 mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c | 22 ++++++++++--------
 mama/c_cpp/src/c/payload/avismsg/avispayload.c | 32 +++++++++++++++++++++-----
 5 files changed, 46 insertions(+), 21 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/msg.c b/mama/c_cpp/src/c/bridge/avis/msg.c
index 5e02673..9840cf6 100644
--- a/mama/c_cpp/src/c/bridge/avis/msg.c
+++ b/mama/c_cpp/src/c/bridge/avis/msg.c
@@ -55,7 +55,7 @@ mama_status
 avisBridgeMamaMsg_create (msgBridge* msg, mamaMsg parent)
 {
     avisMsgImpl* impl;
-    if (avisMsg(msg) == NULL) return MAMA_STATUS_NULL_ARG;
+    if (avisMsg(msg) == NULL || parent == NULL) return MAMA_STATUS_NULL_ARG;
     *msg = NULL;
 
     impl = (avisMsgImpl*) calloc(1, sizeof(avisMsgImpl));
@@ -144,6 +144,7 @@ avisBridgeMamaMsg_isFromInbox (msgBridge msg)
 {
     const char* dummy;
 
+    if (NULL == msg) return -1;
     CHECK_MSG(msg);
 
     return (MAMA_STATUS_OK == avismsgPayload_getString(
@@ -213,6 +214,7 @@ mama_status
 avisBridgeMamaMsg_destroyReplyHandle (void* result)
 {
     char* replyAddr = (char*) result;
+    if (NULL == result) return MAMA_STATUS_NULL_ARG;
     free(replyAddr);
     return MAMA_STATUS_OK;
 }
diff --git a/mama/c_cpp/src/c/bridge/avis/sub.c b/mama/c_cpp/src/c/bridge/avis/sub.c
index 4a3a60d..0418634 100644
--- a/mama/c_cpp/src/c/bridge/avis/sub.c
+++ b/mama/c_cpp/src/c/bridge/avis/sub.c
@@ -446,6 +446,7 @@ mama_status
 avisBridgeMamaSubscription_getPlatformError (subscriptionBridge subscriber,
                                              void**             error)
 {
+    if (NULL == error) return MAMA_STATUS_NULL_ARG;
     CHECK_SUBSCRIBER(subscriber);
     *error = &(avisSub(subscriber)->mAvis->error);
     return MAMA_STATUS_OK;
diff --git a/mama/c_cpp/src/c/bridge/avis/transportbridge.c b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
index f456f7e..3289189 100644
--- a/mama/c_cpp/src/c/bridge/avis/transportbridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
@@ -251,7 +251,7 @@ avisBridgeMamaTransport_getNumLoadBalanceAttributes (
                                           const char* name,
                                           int*        numLoadBalanceAttributes)
 {
-    if (!numLoadBalanceAttributes) return MAMA_STATUS_NULL_ARG;
+    if (!numLoadBalanceAttributes || !name) return MAMA_STATUS_NULL_ARG;
     *numLoadBalanceAttributes = 0;
     return MAMA_STATUS_OK;
 }
@@ -261,7 +261,7 @@ avisBridgeMamaTransport_getLoadBalanceSharedObjectName (
                                       const char*  name,
                                       const char** loadBalanceSharedObjectName)
 {
-    if (!loadBalanceSharedObjectName) return MAMA_STATUS_NULL_ARG;
+    if (!loadBalanceSharedObjectName || !name) return MAMA_STATUS_NULL_ARG;
     *loadBalanceSharedObjectName = NULL;
     return MAMA_STATUS_OK;
 }
@@ -271,7 +271,7 @@ avisBridgeMamaTransport_getLoadBalanceScheme (
                                     const char*    name,
                                     tportLbScheme* scheme)
 {
-    if (!scheme) return MAMA_STATUS_NULL_ARG;
+    if (!scheme || !name) return MAMA_STATUS_NULL_ARG;
     *scheme = TPORT_LB_SCHEME_STATIC;
     return MAMA_STATUS_OK;
 }
@@ -287,7 +287,7 @@ avisBridgeMamaTransport_create (transportBridge* result,
     mamaBridgeImpl*      bridgeImpl = NULL;
     const char*          url        = NULL;
 
-    if (!result || !name) return MAMA_STATUS_NULL_ARG;
+    if (!result || !name || !mamaTport) return MAMA_STATUS_NULL_ARG;
    
     transport = (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
     if (transport == NULL)
diff --git a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
index 0664c4b..b179dbf 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
@@ -952,17 +952,19 @@ avisMsg_getDateTime(
         mama_fid_t     fid,
         mamaDateTime   result)
 {
-    char tempName[64];
-    Value* pValue = NULL;
-    if(fid != 0)
-    {
-        snprintf (tempName, 63, "%d", fid);
-        pValue = attributes_get(attributes, tempName);
-    }
+    char tempName[64];
+    Value* pValue = NULL;
+    if (NULL == result)
+        return MAMA_STATUS_NULL_ARG;
+    if(fid != 0)
+    {
+        snprintf (tempName, 63, "%d", fid);
+        pValue = attributes_get(attributes, tempName);
+    }
     if ((!pValue) && (name))
-        pValue = attributes_get(attributes, name);
+        pValue = attributes_get(attributes, name);
     if (!pValue)
-      return MAMA_STATUS_NOT_FOUND;
+        return MAMA_STATUS_NOT_FOUND;
     return avisValue_getDateTime(pValue, result);
 }
 
@@ -1064,7 +1066,7 @@ mama_status avisValue_getPrice(const Value* pValue, mamaPrice result)
           return MAMA_STATUS_WRONG_FIELD_TYPE;
 
     mamaPrice_setWithHints (result, pValue->value.real64, MAMA_PRICE_HINTS_NONE);
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    return MAMA_STATUS_OK;
 }
 
 
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index f02920e..0089fdc 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -52,7 +52,7 @@
 #define CHECK_NAME(name,fid) \
         do {  \
            if ((fid == 0) && (name == 0)) return MAMA_STATUS_NULL_ARG; \
-           if ((fid == 0) && (strlen(name)== 0)) return MAMA_STATUS_INVALID_ARG; \
+           if ((fid == 0) && (strlen(name) == 0)) return MAMA_STATUS_INVALID_ARG; \
          } while(0)
 
 #define CHECK_ITER(iter) \
@@ -165,8 +165,8 @@ mama_status
 avismsgPayload_createFromByteBuffer(msgPayload* msg, mamaPayloadBridge bridge, const void* buffer, mama_size_t bufferLength)
 {
     CHECK_NULL(msg);
-    CHECK_NULL(bridge);
     CHECK_NULL(buffer);
+    if (0 == bufferLength) return MAMA_STATUS_INVALID_ARG;
    
     avisPayloadImpl* newPayload = (avisPayloadImpl*)calloc (1, sizeof(avisPayloadImpl));
 
@@ -226,12 +226,12 @@ avismsgPayload_setParent (msgPayload          msg,
 
 mama_status
 avismsgPayload_getByteSize       (const msgPayload    msg,
-                                mama_size_t*        size)
+                                  mama_size_t*        size)
 {
+    const void* buffer = NULL;
     CHECK_PAYLOAD(msg);
     CHECK_NULL(size);
-    *size = 0;
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    return avismsgPayload_serialize (msg, &buffer, size);
 }
 
 mama_status
@@ -251,6 +251,7 @@ avismsgPayload_unSerialize (const msgPayload    msg,
 
     CHECK_PAYLOAD (msg);
     CHECK_NULL(buffer);
+    CHECK_NULL(bufferLength);
 
     if (!impl->mAvisMsg)
         impl->mAvisMsg = attributes_create();
@@ -462,6 +463,8 @@ avismsgPayload_setByteBuffer     (const msgPayload    msg,
 {
     avisPayloadImpl* impl = (avisPayloadImpl*)msg;
     CHECK_PAYLOAD(msg);
+    CHECK_NULL(buffer);
+    CHECK_NULL(bufferLength);
 
     impl->mAvisMsg=(Attributes*) buffer;
 
@@ -568,6 +571,11 @@ avismsgPayload_iterateFields (const msgPayload        msg,
     mama_status status = MAMA_STATUS_OK;
     avisFieldPayload* currField = NULL;
 
+    if (!parent || !cb || !field)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
     if (!impl->mIterator)
     {
         status = avismsgPayloadIter_create((msgPayloadIter*) &impl->mIterator, msg);
@@ -604,7 +612,9 @@ avismsgPayload_getFieldAsString  (const msgPayload    msg,
                                 mama_size_t         len)
 {
     CHECK_PAYLOAD(msg);
+    CHECK_NULL(buf);
     CHECK_NAME(name, fid);
+    if (0 == len) return MAMA_STATUS_INVALID_ARG;
     return avisMsg_getFieldAsString(avisPayload(msg), name, fid, buf, len);
 }
 
@@ -857,6 +867,7 @@ avismsgPayload_addDateTime       (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(value);
     return avisMsg_setDateTime(avisPayload(msg), name, fid, value);
 }
 
@@ -868,6 +879,7 @@ avismsgPayload_addPrice          (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(value);
     return avisMsg_setPrice(avisPayload(msg), name, fid, value);
 }
 
@@ -1246,6 +1258,7 @@ avismsgPayload_updateDateTime    (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(value);
     return avisMsg_setDateTime(avisPayload(msg), name, fid, value);
 }
 
@@ -1257,6 +1270,7 @@ avismsgPayload_updatePrice       (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(value);
     return avisMsg_setPrice(avisPayload(msg), name, fid, value);
 }
 
@@ -1688,6 +1702,7 @@ avismsgPayload_getDateTime       (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getDateTime(avisPayload(msg), name, fid, result);
 }
 
@@ -1699,6 +1714,7 @@ avismsgPayload_getPrice          (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getPrice(avisPayload(msg), name, fid, result);
 }
 
@@ -1991,7 +2007,7 @@ avismsgPayloadIter_hasNext       (msgPayloadIter iter,
                                 msgPayload     msg)
 {
     avisIterator* impl = (avisIterator*) iter;
-    if (!impl) return false;
+    if (!impl || !msg) return false;
 
     return attributes_iter_has_next(impl->mMsgIterator);
 }
@@ -2485,6 +2501,7 @@ avismsgFieldPayload_getDateTime  (const msgFieldPayload   field,
                                 mamaDateTime            result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getDateTime(avisField(field)->mValue, result);
 }
 
@@ -2493,6 +2510,7 @@ avismsgFieldPayload_getPrice     (const msgFieldPayload   field,
                                 mamaPrice               result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getPrice(avisField(field)->mValue, result);
 }
 
@@ -2642,5 +2660,7 @@ avismsgFieldPayload_getAsString (
     char*         buf,
     mama_size_t   len)
 {
+    CHECK_NULL(buf);
+    CHECK_NULL(len);
     return avisValue_getFieldAsString(avisField(field)->mValue, avisField(field)->mName, 0, buf, len);
 }
--
2.4.3


[PATCH 03/14] AVIS: Pulled Queue and Timer implementation from qpid over to avis

Frank Quinn <fquinn.ni@...>
 

In future, we may want to make this code central. In the meantime,
at least making the code identical with qpid should make it easier
to maintain.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/bridge.c |   8 +-
 mama/c_cpp/src/c/bridge/avis/queue.c  | 473 ++++++++++++++++++++++++----------
 mama/c_cpp/src/c/bridge/avis/timer.c  | 346 +++++++++++++++----------
 3 files changed, 557 insertions(+), 270 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index e975474..9728046 100644
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -30,7 +30,7 @@
 #include "avisdefs.h"
 #include "transportbridge.h"
 
-timerHeap gTimerHeap;
+timerHeap gAvisTimerHeap;
 
 /*Responsible for creating the bridge impl structure*/
 void avisBridge_createImpl (mamaBridge* result)
@@ -110,14 +110,14 @@ avisBridge_open (mamaBridge bridgeImpl)
     mama_log (MAMA_LOG_LEVEL_NORMAL,
               "avisBridge_open(): Successfully created Avis queue");
 
-    if (0 != createTimerHeap (&gTimerHeap))
+    if (0 != createTimerHeap (&gAvisTimerHeap))
     {
         mama_log (MAMA_LOG_LEVEL_NORMAL,
                 "avisBridge_open(): Failed to initialize timers.");
         return MAMA_STATUS_PLATFORM;
     }
 
-    if (0 != startDispatchTimerHeap (gTimerHeap))
+    if (0 != startDispatchTimerHeap (gAvisTimerHeap))
     {
         mama_log (MAMA_LOG_LEVEL_NORMAL,
                 "avisBridge_open(): Failed to start timer thread.");
@@ -142,7 +142,7 @@ avisBridge_close (mamaBridge bridgeImpl)
     impl =  (mamaBridgeImpl*)bridgeImpl;
 
 
-    if (0 != destroyHeap (gTimerHeap))
+    if (0 != destroyHeap (gAvisTimerHeap))
     {
         mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_close():"
                 "Failed to destroy Avis timer heap.");
diff --git a/mama/c_cpp/src/c/bridge/avis/queue.c b/mama/c_cpp/src/c/bridge/avis/queue.c
index e32845f..c0aab67 100644
--- a/mama/c_cpp/src/c/bridge/avis/queue.c
+++ b/mama/c_cpp/src/c/bridge/avis/queue.c
@@ -1,7 +1,7 @@
 /* $Id$
  *
  * OpenMAMA: The open middleware agnostic messaging API
- * Copyright (C) 2011 NYSE Technologies, Inc.
+ * Copyright (C) 2011 NYSE Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -19,58 +19,131 @@
  * 02110-1301 USA
  */
 
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
 #include <mama/mama.h>
+#include <wombat/queue.h>
 #include <bridge.h>
 #include "queueimpl.h"
 #include "avisbridgefunctions.h"
-#include <wombat/queue.h>
 
-#include <avis/elvin.h>
+
+/*=========================================================================
+  =                Typedefs, structs, enums and globals                   =
+  =========================================================================*/
 
 typedef struct avisQueueBridge {
     mamaQueue          mParent;
     wombatQueue        mQueue;
-    mamaQueueEnqueueCB mEnqueueCb;
+    uint8_t            mHighWaterFired;
+    size_t             mHighWatermark;
+    size_t             mLowWatermark;
+    uint8_t            mIsDispatching;
+    mamaQueueEnqueueCB mEnqueueCallback;
     void*              mEnqueueClosure;
-    uint8_t            mIsNative;
+    wthread_mutex_t    mDispatchLock;
 } avisQueueBridge;
 
-typedef struct avisQueueClosure {
-    avisQueueBridge* mImpl;
-    mamaQueueEventCB mCb;
-    void*            mUserClosure;
-} avisQueueClosure;
 
-#define avisQueue(queue) ((avisQueueBridge*) queue)
-#define CHECK_QUEUE(queue) \
-        do {  \
-           if (avisQueue(queue) == 0) return MAMA_STATUS_NULL_ARG; \
-           if (avisQueue(queue)->mQueue == NULL) return MAMA_STATUS_NULL_ARG; \
-         } while(0)
+/*=========================================================================
+  =                              Macros                                   =
+  =========================================================================*/
+
+#define     CHECK_QUEUE(IMPL)                                                  \
+    do {                                                                       \
+        if (IMPL == NULL)              return MAMA_STATUS_NULL_ARG;            \
+        if (IMPL->mQueue == NULL)      return MAMA_STATUS_NULL_ARG;            \
+    } while(0)
+
+/* Timeout is in milliseconds */
+#define     AVIS_QUEUE_DISPATCH_TIMEOUT     500
+#define     AVIS_QUEUE_MAX_SIZE             WOMBAT_QUEUE_MAX_SIZE
+#define     AVIS_QUEUE_CHUNK_SIZE           WOMBAT_QUEUE_CHUNK_SIZE
+#define     AVIS_QUEUE_INITIAL_SIZE         WOMBAT_QUEUE_CHUNK_SIZE
+
 
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
+
+/**
+ * This funcion is called to check the current queue size against configured
+ * watermarks to determine whether or not it should call the watermark callback
+ * functions. If it determines that it should, it invokes the relevant callback
+ * itself.
+ *
+ * @param impl The avis queue bridge implementation to check.
+ */
+static void
+avisBridgeMamaQueueImpl_checkWatermarks (avisQueueBridge* impl);
+
+
+/*=========================================================================
+  =               Public interface implementation functions               =
+  =========================================================================*/
 
 mama_status
 avisBridgeMamaQueue_create (queueBridge* queue,
                             mamaQueue    parent)
 {
-    avisQueueBridge* avisQueue = NULL;
+    /* Null initialize the queue to be created */
+    avisQueueBridge*    impl                = NULL;
+    wombatQueueStatus   underlyingStatus    = WOMBAT_QUEUE_OK;
 
-    if (queue == NULL)
+    if (queue == NULL || parent == NULL)
+    {
         return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Null initialize the queueBridge */
     *queue = NULL;
 
-    avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
-    if (avisQueue == NULL)
+    /* Allocate memory for the avis queue implementation */
+    impl = (avisQueueBridge*) calloc (1, sizeof (avisQueueBridge));
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create (): "
+                  "Failed to allocate memory for queue.");
         return MAMA_STATUS_NOMEM;
+    }
 
-    avisQueue->mParent         = parent;
-    avisQueue->mEnqueueCb      = NULL;
-    avisQueue->mEnqueueClosure = NULL;
+    /* Initialize the dispatch lock */
+    wthread_mutex_init (&impl->mDispatchLock, NULL);
 
-    wombatQueue_allocate (&avisQueue->mQueue);
-    wombatQueue_create (avisQueue->mQueue, 0, 0, 0);
+    /* Back-reference the parent for future use in the implementation struct */
+    impl->mParent = parent;
 
-    *queue = (queueBridge) avisQueue;
+    /* Allocate and create the wombat queue */
+    underlyingStatus = wombatQueue_allocate (&impl->mQueue);
+    if (WOMBAT_QUEUE_OK != underlyingStatus)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create (): "
+                  "Failed to allocate memory for underlying queue.");
+        free (impl);
+        return MAMA_STATUS_NOMEM;
+    }
+
+    underlyingStatus = wombatQueue_create (impl->mQueue,
+                                           AVIS_QUEUE_MAX_SIZE,
+                                           AVIS_QUEUE_INITIAL_SIZE,
+                                           AVIS_QUEUE_CHUNK_SIZE);
+    if (WOMBAT_QUEUE_OK != underlyingStatus)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create (): "
+                  "Failed to create underlying queue.");
+        wombatQueue_deallocate (impl->mQueue);
+        free (impl);
+        return MAMA_STATUS_PLATFORM;
+    }
+
+    /* Populate the queueBridge pointer with the implementation for return */
+    *queue = (queueBridge) impl;
 
     return MAMA_STATUS_OK;
 }
@@ -80,23 +153,33 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue,
                                         mamaQueue    parent,
                                         void*        nativeQueue)
 {
-    avisQueueBridge* avisQueue = NULL;
-   
-    if (queue == NULL)
+    avisQueueBridge* impl = NULL;
+    if (NULL == queue || NULL == parent || NULL == nativeQueue)
+    {
         return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Null initialize the queueBridge to be returned */
     *queue = NULL;
 
-    avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
-    if (avisQueue == NULL)
+    /* Allocate memory for the avis bridge implementation */
+    impl = (avisQueueBridge*) calloc (1, sizeof (avisQueueBridge));
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create_usingNative (): "
+                  "Failed to allocate memory for queue.");
         return MAMA_STATUS_NOMEM;
+    }
+
+    /* Back-reference the parent for future use in the implementation struct */
+    impl->mParent = parent;
 
-    avisQueue->mParent         = parent;
-    avisQueue->mEnqueueCb      = NULL;
-    avisQueue->mEnqueueClosure = NULL;
-    avisQueue->mQueue          = (wombatQueue)nativeQueue;
-    avisQueue->mIsNative       = 1;
+    /* Wombat queue has already been created, so simply reference it here */
+    impl->mQueue = (wombatQueue) nativeQueue;
 
-    *queue = (queueBridge) avisQueue;
+    /* Populate the queueBridge pointer with the implementation for return */
+    *queue = (queueBridge) impl;
 
     return MAMA_STATUS_OK;
 }
@@ -104,35 +187,98 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue,
 mama_status
 avisBridgeMamaQueue_destroy (queueBridge queue)
 {
-    CHECK_QUEUE(queue);
-    if (!avisQueue(queue)->mIsNative)
-        wombatQueue_destroy (avisQueue(queue)->mQueue);
-    free(avisQueue(queue));
+    wombatQueueStatus   status  = WOMBAT_QUEUE_OK;
+    avisQueueBridge*    impl    = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Destroy the underlying wombatQueue - can be called from any thread*/
+    wthread_mutex_lock              (&impl->mDispatchLock);
+    status = wombatQueue_destroy    (impl->mQueue);
+    wthread_mutex_unlock            (&impl->mDispatchLock);
+
+    /* Free the avisQueueImpl container struct */
+    free (impl);
+
+    if (WOMBAT_QUEUE_OK != status)
+    {
+        mama_log (MAMA_LOG_LEVEL_WARN,
+                  "avisBridgeMamaQueue_destroy (): "
+                  "Failed to destroy wombat queue (%d).",
+                  status);
+        return MAMA_STATUS_PLATFORM;
+    }
+
+    return MAMA_STATUS_OK;
+}
+
+mama_status
+avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count)
+{
+    avisQueueBridge* impl       = (avisQueueBridge*) queue;
+    int              countInt   = 0;
+
+    if (NULL == count)
+        return MAMA_STATUS_NULL_ARG;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Initialize count to zero */
+    *count = 0;
+
+    /* Get the wombatQueue size */
+    wombatQueue_getSize (impl->mQueue, &countInt);
+    *count = (size_t)countInt;
+
     return MAMA_STATUS_OK;
 }
 
 mama_status
 avisBridgeMamaQueue_dispatch (queueBridge queue)
 {
-    wombatQueueStatus status;
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Lock for dispatching */
+    wthread_mutex_lock (&impl->mDispatchLock);
 
-    CHECK_QUEUE(queue);
+    impl->mIsDispatching = 1;
 
+    /*
+     * Continually dispatch as long as the calling application wants dispatching
+     * to be done and no errors are encountered
+     */
     do
     {
-        /* 500 is .5 seconds */
-        status = wombatQueue_timedDispatch (avisQueue(queue)->mQueue,
-                     NULL, NULL, 500);
+        /* Check the watermarks to see if thresholds have been breached */
+        avisBridgeMamaQueueImpl_checkWatermarks (impl);
+
+        /*
+         * Perform a dispatch with a timeout to allow the dispatching process
+         * to be interrupted by the calling application between iterations
+         */
+        status = wombatQueue_timedDispatch (impl->mQueue,
+                                            NULL,
+                                            NULL,
+                                            AVIS_QUEUE_DISPATCH_TIMEOUT);
     }
-    while ((status == WOMBAT_QUEUE_OK ||
-            status == WOMBAT_QUEUE_TIMEOUT) &&
-            mamaQueueImpl_isDispatching (avisQueue(queue)->mParent));
+    while ( (WOMBAT_QUEUE_OK == status || WOMBAT_QUEUE_TIMEOUT == status)
+            && impl->mIsDispatching);
 
-    if (status != WOMBAT_QUEUE_OK && status != WOMBAT_QUEUE_TIMEOUT)
+    /* Unlock the dispatch lock */
+    wthread_mutex_unlock (&impl->mDispatchLock);
+
+    /* Timeout is encountered after each dispatch and so is expected here */
+    if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "Failed to dispatch Avis Middleware queue. %d",
-                  "mamaQueue_dispatch ():",
+                  "avisBridgeMamaQueue_dispatch (): "
+                  "Failed to dispatch Avis Middleware queue (%d). ",
                   status);
         return MAMA_STATUS_PLATFORM;
     }
@@ -143,41 +289,56 @@ avisBridgeMamaQueue_dispatch (queueBridge queue)
 mama_status
 avisBridgeMamaQueue_timedDispatch (queueBridge queue, uint64_t timeout)
 {
-    wombatQueueStatus status;
-   
-    CHECK_QUEUE(queue);
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl        = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_timedDispatch (avisQueue(queue)->mQueue,
-                     NULL, NULL, timeout);
-    if (status == WOMBAT_QUEUE_TIMEOUT) return MAMA_STATUS_TIMEOUT;
+    /* Check the watermarks to see if thresholds have been breached */
+    avisBridgeMamaQueueImpl_checkWatermarks (impl);
 
-    if (status != WOMBAT_QUEUE_OK)
+    /* Attempt to dispatch the queue with a timeout once */
+    status = wombatQueue_timedDispatch (impl->mQueue,
+                                        NULL,
+                                        NULL,
+                                        timeout);
+
+    /* If dispatch failed, report here */
+    if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "Failed to dispatch Avis Middleware queue. %d",
-                  "mamaQueue_dispatch ():",
+                  "avisBridgeMamaQueue_timedDispatch (): "
+                  "Failed to dispatch Avis Middleware queue (%d).",
                   status);
         return MAMA_STATUS_PLATFORM;
     }
 
     return MAMA_STATUS_OK;
+
 }
 
 mama_status
 avisBridgeMamaQueue_dispatchEvent (queueBridge queue)
 {
-    wombatQueueStatus status;
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl = (avisQueueBridge*) queue;
 
-    CHECK_QUEUE(queue);
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_dispatch (avisQueue(queue)->mQueue,
-                     NULL, NULL);
+    /* Check the watermarks to see if thresholds have been breached */
+    avisBridgeMamaQueueImpl_checkWatermarks (impl);
 
-    if (status != WOMBAT_QUEUE_OK)
+    /* Attempt to dispatch the queue with a timeout once */
+    status = wombatQueue_dispatch (impl->mQueue, NULL, NULL);
+
+    /* If dispatch failed, report here */
+    if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "Failed to dispatch Avis Middleware queue. %d",
-                  "mamaQueue_dispatch ():",
+                  "avisBridgeMamaQueue_dispatchEvent (): "
+                  "Failed to dispatch Avis Middleware queue (%d).",
                   status);
         return MAMA_STATUS_PLATFORM;
     }
@@ -185,42 +346,40 @@ avisBridgeMamaQueue_dispatchEvent (queueBridge queue)
     return MAMA_STATUS_OK;
 }
 
-static void MAMACALLTYPE queueCb (void *ignored, void* closure)
-{
-    avisQueueClosure* cl = (avisQueueClosure*)closure;
-    if (NULL ==cl) return;
-    cl->mCb (cl->mImpl->mParent, cl->mUserClosure);
-    free (cl);
-}
-
 mama_status
 avisBridgeMamaQueue_enqueueEvent (queueBridge        queue,
                                   mamaQueueEventCB   callback,
                                   void*              closure)
 {
-    wombatQueueStatus status;
-    avisQueueClosure* cl = NULL;
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl = (avisQueueBridge*) queue;
 
-    if (!callback) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-
-    cl = (avisQueueClosure*)calloc(1, sizeof(avisQueueClosure));
-    if (NULL == cl) return MAMA_STATUS_NOMEM;
+    if (NULL == callback)
+        return MAMA_STATUS_NULL_ARG;
 
-    cl->mImpl = avisQueue(queue);
-    cl->mCb    = callback;
-    cl->mUserClosure = closure;
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_enqueue (avisQueue(queue)->mQueue,
-                queueCb, NULL, cl);
+    /* Call the underlying wombatQueue_enqueue method */
+    status = wombatQueue_enqueue (impl->mQueue,
+                                  (wombatQueueCb) callback,
+                                  impl->mParent,
+                                  closure);
 
-    if (status != WOMBAT_QUEUE_OK)
-        return MAMA_STATUS_PLATFORM;
+    /* Call the enqueue callback if provided */
+    if (NULL != impl->mEnqueueCallback)
+    {
+        impl->mEnqueueCallback (impl->mParent, impl->mEnqueueClosure);
+    }
 
-    if (avisQueue(queue)->mEnqueueCb)
+    /* If dispatch failed, report here */
+    if (WOMBAT_QUEUE_OK != status)
     {
-        avisQueue(queue)->mEnqueueCb (avisQueue(queue)->mParent,
-                                      avisQueue(queue)->mEnqueueClosure);
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_enqueueEvent (): "
+                  "Failed to enqueueEvent (%d). Callback: %p; Closure: %p",
+                  status, callback, closure);
+        return MAMA_STATUS_PLATFORM;
     }
 
     return MAMA_STATUS_OK;
@@ -229,20 +388,13 @@ avisBridgeMamaQueue_enqueueEvent (queueBridge        queue,
 mama_status
 avisBridgeMamaQueue_stopDispatch (queueBridge queue)
 {
-    wombatQueueStatus status;
-    CHECK_QUEUE(queue);
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
 
-    if (queue == NULL)
-        return MAMA_STATUS_NULL_ARG;
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_unblock (avisQueue(queue)->mQueue);
-    if (status != WOMBAT_QUEUE_OK)
-    {
-        mama_log (MAMA_LOG_LEVEL_ERROR,
-                  " Failed to stop dispatching Avis Middleware queue.",
-                  "wmwMamaQueue_stopDispatch ():");
-        return MAMA_STATUS_PLATFORM;
-    }
+    /* Tell this implementation to stop dispatching */
+    impl->mIsDispatching = 0;
 
     return MAMA_STATUS_OK;
 }
@@ -252,11 +404,18 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge        queue,
                                         mamaQueueEnqueueCB callback,
                                         void*              closure)
 {
-    if (!callback) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
+    avisQueueBridge* impl   = (avisQueueBridge*) queue;
 
-    avisQueue(queue)->mEnqueueCb      = callback;
-    avisQueue(queue)->mEnqueueClosure = closure;
+    if (NULL == callback)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Set the enqueue callback and closure */
+    impl->mEnqueueCallback  = callback;
+    impl->mEnqueueClosure   = closure;
 
     return MAMA_STATUS_OK;
 }
@@ -264,21 +423,35 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge        queue,
 mama_status
 avisBridgeMamaQueue_removeEnqueueCallback (queueBridge queue)
 {
-    CHECK_QUEUE(queue);
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    avisQueue(queue)->mEnqueueCb      = NULL;
-    avisQueue(queue)->mEnqueueClosure = NULL;
+    /* Set the enqueue callback to NULL */
+    impl->mEnqueueCallback  = NULL;
+    impl->mEnqueueClosure   = NULL;
 
     return MAMA_STATUS_OK;
 }
 
 mama_status
 avisBridgeMamaQueue_getNativeHandle (queueBridge queue,
-                                     void**      result)
+                                     void**      nativeHandle)
 {
-    if (!result) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    *result = avisQueue(queue)->mQueue;
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    if (NULL == nativeHandle)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Return the handle to the native queue */
+    *nativeHandle = queue;
+
     return MAMA_STATUS_OK;
 }
 
@@ -286,26 +459,64 @@ mama_status
 avisBridgeMamaQueue_setHighWatermark (queueBridge queue,
                                       size_t      highWatermark)
 {
-    if (!highWatermark) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    if (0 == highWatermark)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Set the high water mark */
+    impl->mHighWatermark = highWatermark;
+
+    return MAMA_STATUS_OK;
 }
 
 mama_status
-avisBridgeMamaQueue_setLowWatermark (queueBridge queue,
-                                     size_t lowWatermark)
+avisBridgeMamaQueue_setLowWatermark (queueBridge    queue,
+                                     size_t         lowWatermark)
 {
-    if (!lowWatermark) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    if (0 == lowWatermark)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Set the low water mark */
+    impl->mLowWatermark = lowWatermark;
+
+    return MAMA_STATUS_OK;
 }
 
-mama_status
-avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count)
+
+/*=========================================================================
+  =                  Private implementation functions                     =
+  =========================================================================*/
+
+void
+avisBridgeMamaQueueImpl_checkWatermarks (avisQueueBridge* impl)
 {
-    if (!count) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    *count = 0;
-    wombatQueue_getSize (avisQueue(queue)->mQueue, (int*)count);
-    return MAMA_STATUS_OK;
+    size_t              eventCount      =  0;
+
+    /* Get the current size of the wombat impl */
+    avisBridgeMamaQueue_getEventCount      ((queueBridge) impl, &eventCount);
+
+    /* If the high watermark had been fired but the event count is now down */
+    if (0 != impl->mHighWaterFired && eventCount == impl->mLowWatermark)
+    {
+        impl->mHighWaterFired = 0;
+        mamaQueueImpl_lowWatermarkExceeded (impl->mParent, eventCount);
+    }
+    /* If the high watermark is not currently fired and now above threshold */
+    else if (0 == impl->mHighWaterFired && eventCount >= impl->mHighWatermark)
+    {
+        impl->mHighWaterFired = 1;
+        mamaQueueImpl_highWatermarkExceeded (impl->mParent, eventCount);
+    }
 }
+
diff --git a/mama/c_cpp/src/c/bridge/avis/timer.c b/mama/c_cpp/src/c/bridge/avis/timer.c
index 61cb01f..e5ec0e1 100644
--- a/mama/c_cpp/src/c/bridge/avis/timer.c
+++ b/mama/c_cpp/src/c/bridge/avis/timer.c
@@ -1,7 +1,7 @@
 /* $Id$
  *
  * OpenMAMA: The open middleware agnostic messaging API
- * Copyright (C) 2011 NYSE Technologies, Inc.
+ * Copyright (C) 2011 NYSE Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -19,79 +19,88 @@
  * 02110-1301 USA
  */
 
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
 #include <mama/mama.h>
 #include <mama/timer.h>
 #include <timers.h>
 #include "avisbridgefunctions.h"
 #include <wombat/queue.h>
 
-extern timerHeap gTimerHeap;
+
+/*=========================================================================
+  =                Typedefs, structs, enums and globals                   =
+  =========================================================================*/
+
+extern timerHeap gAvisTimerHeap;
 
 typedef struct avisTimerImpl_
 {
-    timerElement  mTimerElement;
-    double        mInterval;
-    mamaTimerCb   mAction;
-    void*         mClosure;
-    mamaTimer     mParent;
-    mamaQueue     mQueue;
-
-    /* This callback will be invoked whenever the timer has been completely destroyed. */
+    timerElement    mTimerElement;
+    double          mInterval;
+    void*           mClosure;
+    mamaTimer       mParent;
+    void*           mQueue;
+    uint8_t         mDestroying;
+    /* This callback will be invoked whenever the timer has been destroyed. */
     mamaTimerCb     mOnTimerDestroyed;
-
+    /* This callback will be invoked on each timer firing */
+    mamaTimerCb     mAction;
 } avisTimerImpl;
 
-static void MAMACALLTYPE
-destroy_callback (mamaQueue queue, void* closure)
-{
-    avisTimerImpl* impl  = (avisTimerImpl*) closure;
-
-    (*impl->mOnTimerDestroyed)(impl->mParent, impl->mClosure);
 
-    free (impl);
-}
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
 
+/**
+ * Due to the fact that timed events may still be on the event queue, the
+ * timer's destroy function does not destroy the implementation immediately.
+ * Instead, it sets an implementation specific flag to stop further callbacks
+ * from being enqueued from this timer, and then enqueues this function as a
+ * callback on the queue to perform the actual destruction. This function also
+ * calls the application developer's destroy callback function.
+ *
+ * @param queue   MAMA queue from which this callback was fired.
+ * @param closure In this instance, the closure will contain the avis timer
+ *                implementation.
+ */
 static void MAMACALLTYPE
-timerQueueCb (mamaQueue queue, void* closure)
-{
-    avisTimerImpl* impl = (avisTimerImpl*)closure;
-
-    if (impl->mAction)
-        impl->mAction (impl->mParent, impl->mClosure);
+avisBridgeMamaTimerImpl_destroyCallback (mamaQueue queue, void* closure);
 
-}
+/**
+ * When a timer fires, it enqueues this callback for execution. This is where
+ * the action callback provided in the timer's create function gets fired.
+ *
+ * @param queue   MAMA queue from which this callback was fired.
+ * @param closure In this instance, the closure will contain the avis timer
+ *                implementation.
+ */
+static void MAMACALLTYPE
+avisBridgeMamaTimerImpl_queueCallback (mamaQueue queue, void* closure);
 
+/**
+ * Every time the timer fires, it calls this timer callback which adds
+ * avisBridgeMamaTimerImpl_queueCallback to the queue as long as the timer's
+ * mDestroying flag is not currently set.
+ *
+ * @param timer   The underlying timer element which has just fired (not used).
+ * @param closure In this instance, the closure will contain the avis timer
+ *                implementation.
+ */
 static void
-timerCb (timerElement  timer,
-         void*         closure)
-{
-    avisTimerImpl* impl = (avisTimerImpl*)closure;
-    struct timeval timeout;
-
-    if (impl == NULL) return;
-
-    /* Mama timers are repeating */
-    timeout.tv_sec = (time_t)impl->mInterval;
-    timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0);
+avisBridgeMamaTimerImpl_timerCallback (timerElement timer, void* closure);
 
-    if (0 != createTimer (&impl->mTimerElement,
-                          gTimerHeap,
-                          timerCb,
-                          &timeout,
-                          impl))
-    {
-        mama_log (MAMA_LOG_LEVEL_ERROR,
-              "%s Failed to create Avis timer.",
-              "mamaTimer_create ():");
-    }
 
-    mamaQueue_enqueueEvent (impl->mQueue,
-                            timerQueueCb,
-                            (void*)impl);
-}
+/*=========================================================================
+  =               Public interface implementation functions               =
+  =========================================================================*/
 
 mama_status
-avisBridgeMamaTimer_create (timerBridge* result,
+avisBridgeMamaTimer_create (timerBridge*  result,
                            void*         nativeQueueHandle,
                            mamaTimerCb   action,
                            mamaTimerCb   onTimerDestroyed,
@@ -99,77 +108,92 @@ avisBridgeMamaTimer_create (timerBridge* result,
                            mamaTimer     parent,
                            void*         closure)
 {
-    avisTimerImpl* impl      =   NULL;
-    struct timeval timeout;
 
-    if (result == NULL) return MAMA_STATUS_NULL_ARG;
+    avisTimerImpl*              impl            = NULL;
+    int                         timerResult     = 0;
+    struct timeval              timeout;
 
-    mama_log (MAMA_LOG_LEVEL_FINEST,
-              "%s Entering with interval [%f].",
-              "avisMamaTimer_create ():",
-              interval);
+    if (NULL == result || NULL == nativeQueueHandle
+            || NULL == action
+            || NULL == parent )
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
 
+    /* Null initialize the timer bridge supplied */
     *result = NULL;
 
-    impl = (avisTimerImpl*)calloc (1, sizeof (avisTimerImpl));
-    if (impl == NULL) return MAMA_STATUS_NOMEM;
-
-    impl->mQueue    = NULL;
-    impl->mParent   = parent;
-    impl->mAction   = action;
-    impl->mClosure  = closure;
-    impl->mInterval = interval;
-
-    mamaTimer_getQueue (parent, &impl->mQueue);
-
-    impl->mOnTimerDestroyed = onTimerDestroyed;
-
-    *result = (timerBridge)impl;
+    /* Allocate the timer implementation and set up */
+    impl = (avisTimerImpl*) calloc (1, sizeof (avisTimerImpl));
+    if (NULL == impl)
+    {
+        return MAMA_STATUS_NOMEM;
+    }
 
-    timeout.tv_sec = (time_t)interval;
+    *result                     = (timerBridge) impl;
+    impl->mQueue                = nativeQueueHandle;
+    impl->mParent               = parent;
+    impl->mAction               = action;
+    impl->mClosure              = closure;
+    impl->mInterval             = interval;
+    impl->mOnTimerDestroyed     = onTimerDestroyed;
+    impl->mDestroying           = 0;
+
+    /* Determine when the next timer should fire */
+    timeout.tv_sec  = (time_t) interval;
     timeout.tv_usec = ((interval-timeout.tv_sec) * 1000000.0);
 
-    if (0 != createTimer (&impl->mTimerElement,
-                          gTimerHeap,
-                          timerCb,
-                          &timeout,
-                          impl))
+    /* Create the first single fire timer */
+    timerResult = createTimer (&impl->mTimerElement,
+                               gAvisTimerHeap,
+                               avisBridgeMamaTimerImpl_timerCallback,
+                               &timeout,
+                               impl);
+    if (0 != timerResult)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to create Avis timer.",
-                  "mamaTimer_create ():");
-        return MAMA_STATUS_TIMER_FAILURE;
+                  "Failed to create Avis underlying timer [%d].", timerResult);
+        return MAMA_STATUS_PLATFORM;
     }
 
     return MAMA_STATUS_OK;
 }
 
+/* This call should always come from MAMA queue thread */
 mama_status
 avisBridgeMamaTimer_destroy (timerBridge timer)
 {
-    mama_status    returnStatus = MAMA_STATUS_OK;
-    avisTimerImpl* impl = NULL;
+    avisTimerImpl*  impl            = NULL;
+    mama_status     returnStatus    = MAMA_STATUS_OK;
+    int             timerResult     = 0;
 
-    if (timer == NULL)
+    if (NULL == timer)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
-    impl->mAction = NULL;
-    mama_log (MAMA_LOG_LEVEL_FINEST,
-              "%s Entering for 0x%x",
-              "avisMamaTimer_destroy ():", impl);
+    /* Nullify the callback and set destroy flag */
+    impl                            = (avisTimerImpl*) timer;
+    impl->mDestroying               = 1;
+    impl->mAction                   = NULL;
 
-    if (0 != destroyTimer (gTimerHeap, impl->mTimerElement))
+    /* Destroy the timer element */
+    timerResult = destroyTimer (gAvisTimerHeap, impl->mTimerElement);
+    if (0 != timerResult)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to destroy Avis timer.",
-                  "avisMamaTimer_destroy ():");
+                  "Failed to destroy Avis underlying timer [%d].",
+                  timerResult);
         returnStatus = MAMA_STATUS_PLATFORM;
     }
 
-    mamaQueue_enqueueEvent (impl->mQueue,
-                            destroy_callback,
-                            (void*)impl);
+    /*
+     * Put the impl free at the back of the queue to be executed when all
+     * pending timer events have been completed
+     */
+    avisBridgeMamaQueue_enqueueEvent ((queueBridge) impl->mQueue,
+                                      avisBridgeMamaTimerImpl_destroyCallback,
+                                      (void*) impl);
 
     return returnStatus;
 }
@@ -177,60 +201,52 @@ avisBridgeMamaTimer_destroy (timerBridge timer)
 mama_status
 avisBridgeMamaTimer_reset (timerBridge timer)
 {
-    mama_status status      = MAMA_STATUS_OK;
-    avisTimerImpl* impl  = NULL;
-    struct timeval timeout;
+    avisTimerImpl*      impl            = (avisTimerImpl*) timer;
+    int                 timerResult     = 0;
+    struct timeval      timeout;
 
-    if (timer == NULL)
+    if (NULL == impl)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
-    timeout.tv_sec = (time_t)impl->mInterval;
-    timeout.tv_usec = ((impl->mInterval-timeout.tv_sec) * 1000000.0);
+    /* Destroy the existing timer element */
+    destroyTimer (gAvisTimerHeap, impl->mTimerElement);
 
-    if (timer == NULL)
-        return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    /* Calculate next time interval */
+    timeout.tv_sec  = (time_t) impl->mInterval;
+    timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0);
 
-    if (0 != destroyTimer (gTimerHeap, impl->mTimerElement))
+    /* Create the timer for the next firing */
+    timerResult = createTimer (&impl->mTimerElement,
+                               gAvisTimerHeap,
+                               avisBridgeMamaTimerImpl_timerCallback,
+                               &timeout,
+                               impl);
+    if (0 != timerResult)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to destroy Avis timer.",
-                  "avisMamaTimer_destroy ():");
-        avisBridgeMamaTimer_destroy (timer);
-        status = MAMA_STATUS_PLATFORM;
-    }
-    else
-    {
-        if (0 != createTimer (&impl->mTimerElement,
-                              gTimerHeap,
-                              timerCb,
-                              &timeout,
-                              impl))
-        {
-            mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to create Avis timer.",
-                  "mamaTimer_create ():");
-            status = MAMA_STATUS_PLATFORM;
-        }
+                  "Failed to reset Avis underlying timer [%d].", timerResult);
+        return MAMA_STATUS_PLATFORM;
     }
 
-    return status;
+    return MAMA_STATUS_OK;
+
 }
 
 mama_status
 avisBridgeMamaTimer_setInterval (timerBridge  timer,
-                                mama_f64_t   interval)
+                                 mama_f64_t   interval)
 {
-    avisTimerImpl* impl  = NULL;
-
-    if (timer == NULL)
+    avisTimerImpl* impl  = (avisTimerImpl*) timer;
+    if (NULL == timer)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
     impl->mInterval = interval;
 
-   return  avisBridgeMamaTimer_reset (timer);
+    return  avisBridgeMamaTimer_reset (timer);
 }
 
 mama_status
@@ -238,11 +254,71 @@ avisBridgeMamaTimer_getInterval (timerBridge    timer,
                                 mama_f64_t*    interval)
 {
     avisTimerImpl* impl  = NULL;
-
-    if (timer == NULL)
+    if (NULL == timer || NULL == interval)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
+    impl = (avisTimerImpl*) timer;
     *interval = impl->mInterval;
+
     return MAMA_STATUS_OK;
 }
+
+
+/*=========================================================================
+  =                  Private implementation functions                     =
+  =========================================================================*/
+
+/* This callback is invoked by the avis bridge's destroy event */
+static void MAMACALLTYPE
+avisBridgeMamaTimerImpl_destroyCallback (mamaQueue queue, void* closure)
+{
+    avisTimerImpl* impl = (avisTimerImpl*) closure;
+    (*impl->mOnTimerDestroyed)(impl->mParent, impl->mClosure);
+
+    /* Free the implementation memory here */
+    free (impl);
+}
+
+/* This callback is invoked by the avis bridge's timer event */
+static void MAMACALLTYPE
+avisBridgeMamaTimerImpl_queueCallback (mamaQueue queue, void* closure)
+{
+    avisTimerImpl* impl = (avisTimerImpl*) closure;
+    if (impl->mAction)
+    {
+        impl->mAction (impl->mParent, impl->mClosure);
+    }
+}
+
+/* This callback is invoked by the common timer's dispatch thread */
+static void
+avisBridgeMamaTimerImpl_timerCallback (timerElement  timer,
+                                       void*         closure)
+{
+
+    avisTimerImpl* impl = (avisTimerImpl*) closure;
+
+    if (NULL == impl)
+    {
+        return;
+    }
+
+    /*
+     * Only enqueue further timer callbacks the timer is not currently getting
+     * destroyed
+     */
+    if (0 == impl->mDestroying)
+    {
+        /* Set the timer for the next firing */
+        avisBridgeMamaTimer_reset ((timerBridge) closure);
+
+        /* Enqueue the callback for handling */
+        avisBridgeMamaQueue_enqueueEvent ((queueBridge) impl->mQueue,
+                                          avisBridgeMamaTimerImpl_queueCallback,
+                                          closure);
+    }
+}
+
+
--
2.4.3


[PATCH 02/14] AVIS: Pulled IO implementation from qpid over to avis

Frank Quinn <fquinn.ni@...>
 

In future, we may want to make this code central. In the meantime,
at least making the code identical with qpid should make it easier
to maintain.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/SConscript |   4 +-
 mama/c_cpp/src/c/bridge/avis/bridge.c   |  16 +-
 mama/c_cpp/src/c/bridge/avis/io.c       | 274 +++++++++++++++++++++++++++++---
 mama/c_cpp/src/c/bridge/avis/io.h       |  51 ++++++
 4 files changed, 320 insertions(+), 25 deletions(-)
 create mode 100644 mama/c_cpp/src/c/bridge/avis/io.h

diff --git a/mama/c_cpp/src/c/bridge/avis/SConscript b/mama/c_cpp/src/c/bridge/avis/SConscript
index 526612f..475c924 100644
--- a/mama/c_cpp/src/c/bridge/avis/SConscript
+++ b/mama/c_cpp/src/c/bridge/avis/SConscript
@@ -17,10 +17,10 @@ incPath.append('#mama/c_cpp/src/c')
 env['CCFLAGS'] = [x for x in env['CCFLAGS'] if x != '-pedantic-errors']
 
 if env['host']['os'] == 'Darwin':
-    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'wombatcommon'],
+    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'event', 'wombatcommon'],
                LIBPATH=libPath, CPPPATH=incPath)
 else:
-    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'wombatcommon', 'uuid'],
+    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'event', 'wombatcommon', 'uuid'],
                LIBPATH=libPath, CPPPATH=incPath)
 
 conf = Configure(env, config_h='./config.h', log_file='./config.log')
diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index 33dfc17..e975474 100644
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -124,6 +124,9 @@ avisBridge_open (mamaBridge bridgeImpl)
         return MAMA_STATUS_PLATFORM;
     }
 
+    /* Start the io thread */
+    avisBridgeMamaIoImpl_start ();
+
     return MAMA_STATUS_OK;
 }
 
@@ -150,16 +153,19 @@ avisBridge_close (mamaBridge bridgeImpl)
 
     mamaBridgeImpl_getClosure(impl, &avisBridge);
 
-    if (avisBridge)
-    {
-        free (avisBridge);
-    }
-
     mamaQueue_destroyWait(impl->mDefaultEventQueue);
 
     free (impl);
    
     wsocketcleanup();
+
+    /* Stop and destroy the io thread */
+    avisBridgeMamaIoImpl_stop ();
+
+    if (avisBridge)
+    {
+        free (avisBridge);
+    }
     return status;
 }
 
diff --git a/mama/c_cpp/src/c/bridge/avis/io.c b/mama/c_cpp/src/c/bridge/avis/io.c
index 91e7d22..8630c98 100644
--- a/mama/c_cpp/src/c/bridge/avis/io.c
+++ b/mama/c_cpp/src/c/bridge/avis/io.c
@@ -1,7 +1,7 @@
 /* $Id$
  *
  * OpenMAMA: The open middleware agnostic messaging API
- * Copyright (C) 2011 NYSE Technologies, Inc.
+ * Copyright (C) 2011 NYSE Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -19,36 +19,274 @@
  * 02110-1301 USA
  */
 
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
 #include <mama/mama.h>
 #include <mama/io.h>
-#include <bridge.h>
+#include <wombat/port.h>
 #include "avisbridgefunctions.h"
+#include "io.h"
+#include <event.h>
+
+/*=========================================================================
+  =                Typedefs, structs, enums and globals                   =
+  =========================================================================*/
+
+typedef struct avisIoImpl
+{
+    struct event_base*  mEventBase;
+    wthread_t           mDispatchThread;
+    uint8_t             mActive;
+    uint8_t             mEventsRegistered;
+    wsem_t              mResumeDispatching;
+} avisIoImpl;
+
+typedef struct avisIoEventImpl
+{
+    uint32_t            mDescriptor;
+    mamaIoCb            mAction;
+    mamaIoType          mIoType;
+    mamaIo              mParent;
+    void*               mClosure;
+    struct event        mEvent;
+} avisIoEventImpl;
+
+/*
+ * Global static container to hold instance-wide information not otherwise
+ * available in this interface.
+ */
+static avisIoImpl       gAvisIoContainer;
+
+
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
+
+void*
+avisBridgeMamaIoImpl_dispatchThread (void* closure);
 
+void
+avisBridgeMamaIoImpl_libeventIoCallback (int fd, short type, void* closure);
+
+
+/*=========================================================================
+  =                   Public implementation functions                     =
+  =========================================================================*/
+
+/* Not implemented in the avis bridge */
 mama_status
-avisBridgeMamaIo_create(ioBridge*  result,
-                         void*      nativeQueueHandle,
-                         uint32_t   descriptor,
-                         mamaIoCb   action,
-                         mamaIoType ioType,
-                         mamaIo     parent,
-                         void*      closure)
-{
-    if (!result) return MAMA_STATUS_NULL_ARG;
+avisBridgeMamaIo_create         (ioBridge*   result,
+                                 void*       nativeQueueHandle,
+                                 uint32_t    descriptor,
+                                 mamaIoCb    action,
+                                 mamaIoType  ioType,
+                                 mamaIo      parent,
+                                 void*       closure)
+{
+    avisIoEventImpl*    impl    = NULL;
+    short               evtType = 0;
+
+    if (NULL == result)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
     *result = 0;
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+
+    /* Check for supported types so we don't prematurely allocate */
+    switch (ioType)
+    {
+    case MAMA_IO_READ:
+        evtType = EV_READ;
+        break;
+    case MAMA_IO_WRITE:
+        evtType = EV_WRITE;
+        break;
+    case MAMA_IO_ERROR:
+        evtType = EV_READ | EV_WRITE;
+        break;
+    case MAMA_IO_CONNECT:
+    case MAMA_IO_ACCEPT:
+    case MAMA_IO_CLOSE:
+    case MAMA_IO_EXCEPT:
+    default:
+        return MAMA_STATUS_UNSUPPORTED_IO_TYPE;
+        break;
+    }
+
+    impl = (avisIoEventImpl*) calloc (1, sizeof (avisIoEventImpl));
+    if (NULL == impl)
+    {
+        return MAMA_STATUS_NOMEM;
+    }
+
+    impl->mDescriptor           = descriptor;
+    impl->mAction               = action;
+    impl->mIoType               = ioType;
+    impl->mParent               = parent;
+    impl->mClosure              = closure;
+
+    event_set (&impl->mEvent,
+               impl->mDescriptor,
+               evtType,
+               avisBridgeMamaIoImpl_libeventIoCallback,
+               impl);
+
+    event_add (&impl->mEvent, NULL);
+
+    event_base_set (gAvisIoContainer.mEventBase, &impl->mEvent);
+
+    /* If this is the first event since base was emptied or created */
+    if (0 == gAvisIoContainer.mEventsRegistered)
+    {
+        wsem_post (&gAvisIoContainer.mResumeDispatching);
+    }
+    gAvisIoContainer.mEventsRegistered++;
+
+    *result = (ioBridge)impl;
+
+    return MAMA_STATUS_OK;
 }
 
+/* Not implemented in the avis bridge */
 mama_status
-avisBridgeMamaIo_destroy (ioBridge io)
+avisBridgeMamaIo_destroy        (ioBridge io)
 {
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisIoEventImpl* impl = (avisIoEventImpl*) io;
+    if (NULL == io)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    event_del (&impl->mEvent);
+
+    free (impl);
+    gAvisIoContainer.mEventsRegistered--;
+
+    return MAMA_STATUS_OK;
 }
 
+/* Not implemented in the avis bridge */
 mama_status
-avisBridgeMamaIo_getDescriptor (ioBridge io, uint32_t *result)
+avisBridgeMamaIo_getDescriptor  (ioBridge    io,
+                                 uint32_t*   result)
 {
-    if (!result) return MAMA_STATUS_NULL_ARG;
-    *result = 0;
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisIoEventImpl* impl = (avisIoEventImpl*) io;
+    if (NULL == io || NULL == result)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
+    *result = impl->mDescriptor;
+
+    return MAMA_STATUS_OK;
+}
+
+/*=========================================================================
+  =                  Public implementation prototypes                     =
+  =========================================================================*/
+
+mama_status
+avisBridgeMamaIoImpl_start ()
+{
+    int threadResult                        = 0;
+    gAvisIoContainer.mEventsRegistered      = 0;
+    gAvisIoContainer.mActive                = 1;
+    gAvisIoContainer.mEventBase             = event_init ();
+
+    wsem_init (&gAvisIoContainer.mResumeDispatching, 0, 0);
+    threadResult = wthread_create (&gAvisIoContainer.mDispatchThread,
+                                   NULL,
+                                   avisBridgeMamaIoImpl_dispatchThread,
+                                   gAvisIoContainer.mEventBase);
+    if (0 != threadResult)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaIoImpl_initialize(): "
+                  "wthread_create returned %d", threadResult);
+        return MAMA_STATUS_PLATFORM;
+    }
+    return MAMA_STATUS_OK;
+}
+
+mama_status
+avisBridgeMamaIoImpl_stop ()
+{
+    gAvisIoContainer.mActive = 0;
+
+    /* Alert the semaphore so the dispatch loop can exit */
+    wsem_post (&gAvisIoContainer.mResumeDispatching);
+
+    /* Tell the event loop to exit */
+    event_base_loopexit (gAvisIoContainer.mEventBase, NULL);
+
+    /* Join with the dispatch thread - it should exit shortly */
+    wthread_join (gAvisIoContainer.mDispatchThread, NULL);
+    wsem_destroy (&gAvisIoContainer.mResumeDispatching);
+
+    /* Free the main event base */
+    event_base_free (gAvisIoContainer.mEventBase);
+
+    return MAMA_STATUS_OK;
 }
 
+
+
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
+
+void*
+avisBridgeMamaIoImpl_dispatchThread (void* closure)
+{
+    int             dispatchResult = 0;
+
+    /* Wait on the first event to register before starting dispatching */
+    wsem_wait (&gAvisIoContainer.mResumeDispatching);
+
+    while (0 != gAvisIoContainer.mActive)
+    {
+        dispatchResult = event_base_loop (gAvisIoContainer.mEventBase,
+                                          EVLOOP_NONBLOCK | EVLOOP_ONCE);
+
+        /* If no events are currently registered */
+        if (1 == dispatchResult)
+        {
+            /* Wait until they are */
+            gAvisIoContainer.mEventsRegistered = 0;
+            wsem_wait (&gAvisIoContainer.mResumeDispatching);
+        }
+    }
+    return NULL;
+}
+
+void
+avisBridgeMamaIoImpl_libeventIoCallback (int fd, short type, void* closure)
+{
+    avisIoEventImpl* impl = (avisIoEventImpl*) closure;
+
+    /* Timeout is the only error detectable with libevent */
+    if (EV_TIMEOUT == type)
+    {
+        /* If this is an error IO type, fire the callback */
+        if (impl->mIoType == MAMA_IO_ERROR && NULL != impl->mAction)
+        {
+            (impl->mAction)(impl->mParent, impl->mIoType, impl->mClosure);
+        }
+        /* If this is not an error IO type, do nothing */
+        else
+        {
+            return;
+        }
+    }
+
+    /* Call the action callback if defined */
+    if (NULL != impl->mAction)
+    {
+        (impl->mAction)(impl->mParent, impl->mIoType, impl->mClosure);
+    }
+
+    /* Enqueue for the next time */
+    event_add (&impl->mEvent, NULL);
+}
diff --git a/mama/c_cpp/src/c/bridge/avis/io.h b/mama/c_cpp/src/c/bridge/avis/io.h
new file mode 100644
index 0000000..a14e794
--- /dev/null
+++ b/mama/c_cpp/src/c/bridge/avis/io.h
@@ -0,0 +1,51 @@
+/* $Id$
+ *
+ * OpenMAMA: The open middleware agnostic messaging API
+ * Copyright (C) 2011 NYSE Technologies, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#ifndef MAMA_BRIDGE_AVIS_IO_H__
+#define MAMA_BRIDGE_AVIS_IO_H__
+
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+#include <mama/mama.h>
+
+/*=========================================================================
+  =                  Public implementation functions                      =
+  =========================================================================*/
+
+mama_status
+avisBridgeMamaIoImpl_start (void);
+
+mama_status
+avisBridgeMamaIoImpl_stop  (void);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* MAMA_BRIDGE_AVIS_IO_H__ */
--
2.4.3


[PATCH 01/14] AVIS: Added NULL checks for all tests which caused crashing

Frank Quinn <fquinn.ni@...>
 

This is with the exception of the timer implementation which
has several bugs which have already been fixed in the qpid
queue implementation so the qpid timer should instead simply
be converted into common code to share between avis and qpid
rather than fix the Timer crash we see in MamaMiddlewareC.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/bridge.c          |  6 ++++--
 mama/c_cpp/src/c/bridge/avis/io.c              |  2 ++
 mama/c_cpp/src/c/bridge/avis/msg.c             |  4 ++++
 mama/c_cpp/src/c/bridge/avis/publisher.c       | 12 ++++++++++--
 mama/c_cpp/src/c/bridge/avis/queue.c           |  8 +++++++-
 mama/c_cpp/src/c/bridge/avis/transportbridge.c |  9 +++++++++
 mama/c_cpp/src/c/payload/avismsg/avispayload.c |  1 +
 7 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index 97cbb08..33dfc17 100644
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -77,8 +77,10 @@ static const char PAYLOAD_IDS[] = {MAMA_PAYLOAD_AVIS,NULL};
 mama_status
 avisBridge_getDefaultPayloadId (char***name, char** id)
 {
-    *name = PAYLOAD_NAMES;
-    *id = PAYLOAD_IDS;
+    if (!name) return MAMA_STATUS_NULL_ARG;
+    if (!id) return MAMA_STATUS_NULL_ARG;
+    *name = PAYLOAD_NAMES;
+    *id = PAYLOAD_IDS;
 
     return MAMA_STATUS_OK;
 }
diff --git a/mama/c_cpp/src/c/bridge/avis/io.c b/mama/c_cpp/src/c/bridge/avis/io.c
index f5d2544..91e7d22 100644
--- a/mama/c_cpp/src/c/bridge/avis/io.c
+++ b/mama/c_cpp/src/c/bridge/avis/io.c
@@ -33,6 +33,7 @@ avisBridgeMamaIo_create(ioBridge*  result,
                          mamaIo     parent,
                          void*      closure)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     *result = 0;
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
@@ -46,6 +47,7 @@ avisBridgeMamaIo_destroy (ioBridge io)
 mama_status
 avisBridgeMamaIo_getDescriptor (ioBridge io, uint32_t *result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     *result = 0;
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
diff --git a/mama/c_cpp/src/c/bridge/avis/msg.c b/mama/c_cpp/src/c/bridge/avis/msg.c
index 675ef6d..5e02673 100644
--- a/mama/c_cpp/src/c/bridge/avis/msg.c
+++ b/mama/c_cpp/src/c/bridge/avis/msg.c
@@ -119,6 +119,7 @@ mama_status
 avisBridgeMamaMsgImpl_setReplyHandle (msgBridge msg, void* result)
 {
     mama_status status = MAMA_STATUS_OK;
+    if (!result) return MAMA_STATUS_NULL_ARG;
 
     CHECK_MSG(msg);
 
@@ -155,6 +156,7 @@ avisBridgeMamaMsg_setSendSubject (msgBridge   msg,
                                   const char* subject)
 {
     mama_status status = MAMA_STATUS_OK;
+    if (!symbol) return MAMA_STATUS_NULL_ARG;
 
     CHECK_MSG(msg);
 
@@ -173,6 +175,7 @@ avisBridgeMamaMsg_setSendSubject (msgBridge   msg,
 mama_status
 avisBridgeMamaMsg_getNativeHandle (msgBridge msg, void** result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     CHECK_MSG(msg);
     *result = avisMsg(msg)->mAvisMsg;
     return MAMA_STATUS_OK;
@@ -201,6 +204,7 @@ mama_status
 avisBridgeMamaMsg_copyReplyHandle (void* src, void** dest)
 {
     const char* replyAddr = (const char*) src;
+    if (!src || !dest) return MAMA_STATUS_NULL_ARG;
     *dest = (void*) strdup(replyAddr);
     return MAMA_STATUS_OK;
 }
diff --git a/mama/c_cpp/src/c/bridge/avis/publisher.c b/mama/c_cpp/src/c/bridge/avis/publisher.c
index 2e25524..364aa1d 100644
--- a/mama/c_cpp/src/c/bridge/avis/publisher.c
+++ b/mama/c_cpp/src/c/bridge/avis/publisher.c
@@ -233,11 +233,10 @@ avisBridgeMamaPublisher_createByIndex (publisherBridge* result,
                                         void*            nativeQueueHandle,
                                         mamaPublisher    parent)
 {
+    if (!result || !tport || !parent) return MAMA_STATUS_NULL_ARG;
     Elvin* avis = getAvis(tport);
     avisPublisherBridge* publisher = NULL;
 
-    if (!result || !tport) return MAMA_STATUS_NULL_ARG;
-
     CHECK_AVIS(avis);
 
     *result = NULL;
@@ -314,6 +313,8 @@ avisBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg)
 {
     mama_status ret = MAMA_STATUS_OK;
 
+    if (!msg) return MAMA_STATUS_NULL_ARG;
+
     CHECK_PUBLISHER(publisher);
 
     ret = avisBridgeMamaPublisherImpl_prepareMessage (&msg);
@@ -348,6 +349,8 @@ avisBridgeMamaPublisher_sendReplyToInbox (publisherBridge  publisher,
     const char*  replyAddr   = NULL;
     mama_status  status;
    
+    if (!request || !reply) return MAMA_STATUS_NULL_ARG;
+
     CHECK_PUBLISHER(publisher);
 
     mamaMsg_getNativeHandle(request, (void**) &requestMsg);
@@ -402,6 +405,7 @@ avisBridgeMamaPublisher_sendReplyToInbox (publisherBridge  publisher,
 mama_status
 avisBridgeMamaPublisher_destroy (publisherBridge publisher)
 {
+    if (!publisher) return MAMA_STATUS_NULL_ARG;
     free ((char*)avisPublisher(publisher)->mSource);
     free ((char*)avisPublisher(publisher)->mTopic);
     free ((char*)avisPublisher(publisher)->mRoot);
@@ -420,6 +424,8 @@ avisBridgeMamaPublisher_sendFromInboxByIndex (publisherBridge publisher,
     const char* replyAddr = NULL;
     mama_status status;
 
+    if (!inbox || !msg) return MAMA_STATUS_NULL_ARG;
+
     CHECK_PUBLISHER(publisher);
 
     status = avisBridgeMamaPublisherImpl_prepareMessage (&msg);
@@ -475,6 +481,8 @@ avisBridgeMamaPublisher_sendReplyToInboxHandle (publisherBridge publisher,
 {
     mama_status status;
 
+    if (!inbox | !reply) return MAMA_STATUS_NULL_ARG;
+
     CHECK_PUBLISHER(publisher);
 
     status = avisBridgeMamaPublisherImpl_prepareMessage (&reply);
diff --git a/mama/c_cpp/src/c/bridge/avis/queue.c b/mama/c_cpp/src/c/bridge/avis/queue.c
index 6416751..e32845f 100644
--- a/mama/c_cpp/src/c/bridge/avis/queue.c
+++ b/mama/c_cpp/src/c/bridge/avis/queue.c
@@ -200,7 +200,8 @@ avisBridgeMamaQueue_enqueueEvent (queueBridge        queue,
 {
     wombatQueueStatus status;
     avisQueueClosure* cl = NULL;
-   
+
+    if (!callback) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
 
     cl = (avisQueueClosure*)calloc(1, sizeof(avisQueueClosure));
@@ -251,6 +252,7 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge        queue,
                                         mamaQueueEnqueueCB callback,
                                         void*              closure)
 {
+    if (!callback) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
 
     avisQueue(queue)->mEnqueueCb      = callback;
@@ -274,6 +276,7 @@ mama_status
 avisBridgeMamaQueue_getNativeHandle (queueBridge queue,
                                      void**      result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
     *result = avisQueue(queue)->mQueue;
     return MAMA_STATUS_OK;
@@ -283,6 +286,7 @@ mama_status
 avisBridgeMamaQueue_setHighWatermark (queueBridge queue,
                                       size_t      highWatermark)
 {
+    if (!highWatermark) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
@@ -291,6 +295,7 @@ mama_status
 avisBridgeMamaQueue_setLowWatermark (queueBridge queue,
                                      size_t lowWatermark)
 {
+    if (!lowWatermark) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
@@ -298,6 +303,7 @@ avisBridgeMamaQueue_setLowWatermark (queueBridge queue,
 mama_status
 avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count)
 {
+    if (!count) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
     *count = 0;
     wombatQueue_getSize (avisQueue(queue)->mQueue, (int*)count);
diff --git a/mama/c_cpp/src/c/bridge/avis/transportbridge.c b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
index 9b89d65..f456f7e 100644
--- a/mama/c_cpp/src/c/bridge/avis/transportbridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
@@ -251,6 +251,7 @@ avisBridgeMamaTransport_getNumLoadBalanceAttributes (
                                           const char* name,
                                           int*        numLoadBalanceAttributes)
 {
+    if (!numLoadBalanceAttributes) return MAMA_STATUS_NULL_ARG;
     *numLoadBalanceAttributes = 0;
     return MAMA_STATUS_OK;
 }
@@ -260,6 +261,7 @@ avisBridgeMamaTransport_getLoadBalanceSharedObjectName (
                                       const char*  name,
                                       const char** loadBalanceSharedObjectName)
 {
+    if (!loadBalanceSharedObjectName) return MAMA_STATUS_NULL_ARG;
     *loadBalanceSharedObjectName = NULL;
     return MAMA_STATUS_OK;
 }
@@ -269,6 +271,7 @@ avisBridgeMamaTransport_getLoadBalanceScheme (
                                     const char*    name,
                                     tportLbScheme* scheme)
 {
+    if (!scheme) return MAMA_STATUS_NULL_ARG;
     *scheme = TPORT_LB_SCHEME_STATIC;
     return MAMA_STATUS_OK;
 }
@@ -283,6 +286,8 @@ avisBridgeMamaTransport_create (transportBridge* result,
     avisTransportBridge* transport  = NULL;
     mamaBridgeImpl*      bridgeImpl = NULL;
     const char*          url        = NULL;
+
+    if (!result || !name) return MAMA_STATUS_NULL_ARG;
    
     transport = (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
     if (transport == NULL)
@@ -353,6 +358,8 @@ avisBridgeMamaTransport_destroy (transportBridge transport)
     avisTransportBridge* transportBridge = (avisTransportBridge*) transport;
     avisBridgeImpl*      avisBridge      = NULL;
     mamaBridgeImpl*      bridgeImpl      = NULL;
+
+    if (!transport) return MAMA_STATUS_NULL_ARG;
    
     bridgeImpl = mamaTransportImpl_getBridgeImpl(
         avisTransport(transport)->mTransport);
@@ -506,6 +513,7 @@ mama_status
 avisBridgeMamaTransport_isConnectionIntercepted (mamaConnection connection,
                                                   uint8_t* result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     *result = 0;
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
@@ -523,6 +531,7 @@ mama_status
 avisBridgeMamaTransport_getNativeTransport (transportBridge transport,
                                              void**          result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     CHECK_TRANSPORT(transport);
     *result = avisTransport(transport);
     return MAMA_STATUS_OK;
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index 6da49f5..f02920e 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -2324,6 +2324,7 @@ avismsgFieldPayload_updateDateTime
 {
     CHECK_FIELD(field);
     CHECK_PAYLOAD(msg);
+    CHECK_NULL(value);
     return avisMsg_setDateTime(avisPayload(msg), avisField(field)->mName, 0, value);
 }
 
--
2.4.3

741 - 760 of 2306