Date   

[PATCH 08/14] AVIS: Added opaque data type serialization and deserialization

Frank Quinn <fquinn.ni@...>
 

There was already support for opaque data types but they could
not be serialized or deserialized. This patch should remedy
this.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/payload/avismsg/avispayload.c | 53 ++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index 0089fdc..e935e15 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -306,7 +306,27 @@ avismsgPayload_unSerialize (const msgPayload    msg,
                 avisMsg_setString(impl->mAvisMsg, tempName, 0, impl->mStringBuffer);
                 break;
             case TYPE_OPAQUE:
+            {
+                uint64_t dataSize = 0;
+                /* Pull out the field id length */
+                len=*(int16_t *)(buffPos);
+                buffPos+=sizeof(int16_t);
+                currLen+=sizeof(int16_t);
+                /* Pull out the field id and NULL terminate */
+                memcpy (tempName, buffPos, len);
+                buffPos+=len;
+                currLen+=len;
+                tempName[len]='\0';
+                /* Pull out number of bytes in opaque data */
+                dataSize=*(uint64_t*)(buffPos);
+                buffPos+=sizeof(uint64_t);
+                currLen+=sizeof(uint64_t);
+                /* Use raw data directly */
+                avisMsg_setOpaque(impl->mAvisMsg, tempName, 0, buffPos, dataSize);
+                buffPos+=dataSize;
+                currLen+=dataSize;
                 break;
+            }
         }
     }
 
@@ -428,6 +448,39 @@ avismsgPayload_serialize     (const msgPayload    msg,
                 buffPos+=strlen(currField->mValue->value.str); currLen+=strlen(currField->mValue->value.str);
                 break;
             case TYPE_OPAQUE:
+                len=+ strlen(currField->mName);
+                /* Current length + 1 for type, 2 for name size + length of name
+                 * + 8 bytes for data size + data size */
+                if (impl->mBufferLen < currLen+3+len+8+currField->mValue->value.bytes.item_count)
+                {
+                    void*vp=realloc (impl->mBuffer, impl->mBufferLen+200);
+                    impl->mBuffer = vp;
+                    buffPos=impl->mBuffer;
+                    buffPos+=currLen;
+                    impl->mBufferLen+=200;
+                }
+                /* Copy data type */
+                *(int8_t *)(buffPos) = 5;
+                buffPos+=1;
+                currLen+=1;
+                /* Copy field id length */
+                *(int16_t *)(buffPos) = len;
+                buffPos+=2;
+                currLen+=2;
+                /* Copy field id itself */
+                memcpy (buffPos, currField->mName, len);
+                buffPos+=len;
+                currLen+=len;
+                /* Copy number of bytes in data */
+                *(uint64_t *)(buffPos) = (uint64_t)currField->mValue->value.bytes.item_count;
+                buffPos+=sizeof(uint64_t);
+                currLen+=sizeof(uint64_t);
+                /* Copy the opaque data itself */
+                memcpy (buffPos,
+                        currField->mValue->value.bytes.items,
+                        currField->mValue->value.bytes.item_count);
+                buffPos+=currField->mValue->value.bytes.item_count;
+                currLen+=currField->mValue->value.bytes.item_count;
                 break;
 
         }
--
2.4.3


[PATCH 07/14] UNITTEST: Added some working getSendSubject unit tests

Frank Quinn <fquinn.ni@...>
 

The previous tests pretty much assumed it wasn't implemented in the
bridge. This fix adds some basic testing around this functionality
for self-describing messages.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 .../src/gunittest/c/mamamsg/msggeneraltests.cpp    | 10 ++++++---
 .../gunittest/c/payload/payloadgeneraltests.cpp    | 25 ++++++++++++++++++----
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp b/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp
index a7bc7a9..408dc02 100644
--- a/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp
+++ b/mama/c_cpp/src/gunittest/c/mamamsg/msggeneraltests.cpp
@@ -531,18 +531,22 @@ TEST_F (MsgGeneralTestsC, DISABLED_msgImplSetDqStrategyContextInValidDq)
 */
 TEST_F (MsgGeneralTestsC, msgGetSendSubjectValid)
 {
-    const char*       subject    = "";
+    const char*       subjectIn  = "subject";
+    const char*       subjectOut = NULL;
     mama_status       status;
   
     //add fields to msg
     mamaMsg_addString (mMsg, "name0", 101, "test0");
     mamaMsg_addString (mMsg, "name1", 102, "test1");
-   
-    status = mamaMsg_getSendSubject(mMsg, &subject);
+
+    mamaMsgImpl_setBridgeImpl (mMsg, mMiddlewareBridge);
+    mamaMsgImpl_setSubscInfo (mMsg, NULL, NULL, subjectIn, 1);
+    status = mamaMsg_getSendSubject(mMsg, &subjectOut);
 
     CHECK_NON_IMPLEMENTED_OPTIONAL(status);
 
     ASSERT_EQ (status, MAMA_STATUS_OK);
+    EXPECT_STREQ(subjectIn, subjectOut);
 }
 
 TEST_F (MsgGeneralTestsC, msgGetSendSubjectInValidMsg)
diff --git a/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp b/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp
index bf67cb0..199e2ad 100644
--- a/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp
+++ b/mama/c_cpp/src/gunittest/c/payload/payloadgeneraltests.cpp
@@ -40,11 +40,13 @@ protected:
     virtual void TearDown(void);
 
     mamaPayloadBridge   aBridge;
+    mamaBridge          mMiddlewareBridge;
     mama_status         result;
 };
 
 PayloadGeneralTests::PayloadGeneralTests(void)
     : aBridge (NULL)
+    , mMiddlewareBridge (NULL)
     , result (MAMA_STATUS_OK)
 {
 }
@@ -56,6 +58,7 @@ PayloadGeneralTests::~PayloadGeneralTests(void)
 void PayloadGeneralTests::SetUp(void)
 {
     mama_loadPayloadBridge (&aBridge,getPayload());
+    mama_loadBridge (&mMiddlewareBridge, getMiddleware());
 }
 
 void PayloadGeneralTests::TearDown(void)
@@ -534,12 +537,23 @@ TEST_F(PayloadGeneralTests, GetNumberFieldsInValidNumFields)
 TEST_F(PayloadGeneralTests, GetSendSubjectValid)
 {
     msgPayload          testPayload = NULL;
+    const char*         subjectOut  = NULL;
+    const char*         subjectIn   = "testsubj";
+    msgBridge           bridgeMsg   = NULL;
+    mamaMsg             parentMsg   = NULL;
 
-    result = aBridge->msgPayloadCreate(&testPayload);
+    //result = aBridge->msgPayloadCreate(&testPayload);
+    mamaMsg_createForPayloadBridge(&parentMsg, aBridge);
+    mamaMsgImpl_getPayload(parentMsg, &testPayload);
     EXPECT_EQ (MAMA_STATUS_OK, result);
 
-    result = aBridge->msgPayloadGetSendSubject(testPayload, NULL);
-    EXPECT_EQ (MAMA_STATUS_NOT_IMPLEMENTED, result);
+    mamaMsgImpl_setBridgeImpl (parentMsg, mMiddlewareBridge);
+    mamaMsgImpl_setSubscInfo (parentMsg, NULL, NULL, subjectIn, 1);
+
+    result = aBridge->msgPayloadGetSendSubject(testPayload, &subjectOut);
+    CHECK_NON_IMPLEMENTED_OPTIONAL(result);
+
+    EXPECT_EQ (MAMA_STATUS_OK, result);
 }
 
 
@@ -562,7 +576,10 @@ TEST_F(PayloadGeneralTests, GetSendSubjectInValidSubject)
     EXPECT_EQ (MAMA_STATUS_OK, result);
 
     result = aBridge->msgPayloadGetSendSubject(testPayload, NULL);
-    EXPECT_EQ (MAMA_STATUS_NOT_IMPLEMENTED, result);
+
+    CHECK_NON_IMPLEMENTED_OPTIONAL(result);
+
+    EXPECT_EQ (MAMA_STATUS_NULL_ARG, result);
 }
 
 TEST_F(PayloadGeneralTests, ToStringValid)
--
2.4.3


[PATCH 06/14] AVIS: Modified date time serialization to include flags

Frank Quinn <fquinn.ni@...>
 

This was done with a simple assignment. Will need revised if MAMA
ever decide to change their datetime format.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
index b179dbf..0e01966 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
@@ -973,11 +973,11 @@ mama_status avisValue_getDateTime(const Value* pValue, mamaDateTime result)
     if (!pValue) return MAMA_STATUS_NULL_ARG;
     switch (pValue->type)
     {
-        case TYPE_STRING:  mamaDateTime_setFromString (result, pValue->value.str); break;
-        case TYPE_REAL64:  mamaDateTime_setEpochTimeF64 (result, pValue->value.real64); break;
-        case TYPE_INT64:  mamaDateTime_setEpochTimeMilliseconds (result, pValue->value.int64); break;
-        default: return MAMA_STATUS_WRONG_FIELD_TYPE; break;
-}
+        case TYPE_STRING: mamaDateTime_setFromString (result, pValue->value.str); break;
+        case TYPE_REAL64: mamaDateTime_setEpochTimeF64 (result, pValue->value.real64); break;
+        case TYPE_INT64:  *result = pValue->value.int64; break;
+        default: return MAMA_STATUS_WRONG_FIELD_TYPE; break;
+    }
     return MAMA_STATUS_OK;
 }
 
@@ -988,9 +988,7 @@ avisMsg_setDateTime(
         mama_fid_t          fid,
         const mamaDateTime  value)
 {
-    uint64_t tempu64;
-    mamaDateTime_getEpochTimeMicroseconds(value, &tempu64);
-    return avisMsg_setU64(attributes, name, fid, tempu64);
+    return avisMsg_setU64(attributes, name, fid, *value);
 }
 
 mama_status
--
2.4.3


[PATCH 05/14] AVIS: Added avis implementation for muteCurrentTopic

Frank Quinn <fquinn.ni@...>
 

Like qpid, this is simply a call to the internal bridge function
to mute the subscription's topic.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/sub.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/sub.c b/mama/c_cpp/src/c/bridge/avis/sub.c
index 0418634..39f0541 100644
--- a/mama/c_cpp/src/c/bridge/avis/sub.c
+++ b/mama/c_cpp/src/c/bridge/avis/sub.c
@@ -462,7 +462,7 @@ avisBridgeMamaSubscription_setTopicClosure (subscriptionBridge subscriber,
 mama_status
 avisBridgeMamaSubscription_muteCurrentTopic (subscriptionBridge subscriber)
 {
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    return avisBridgeMamaSubscription_mute (subscriber);
 }
 
 int
--
2.4.3


[PATCH 04/14] AVIS: Added several value validation steps that unit tests require

Frank Quinn <fquinn.ni@...>
 

Several unit tests expect MAMA bridges to return standard error
codes in various scenarios. This set of updates should bring avis
into line with these expectations.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/msg.c             |  4 +++-
 mama/c_cpp/src/c/bridge/avis/sub.c             |  1 +
 mama/c_cpp/src/c/bridge/avis/transportbridge.c |  8 +++----
 mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c | 22 ++++++++++--------
 mama/c_cpp/src/c/payload/avismsg/avispayload.c | 32 +++++++++++++++++++++-----
 5 files changed, 46 insertions(+), 21 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/msg.c b/mama/c_cpp/src/c/bridge/avis/msg.c
index 5e02673..9840cf6 100644
--- a/mama/c_cpp/src/c/bridge/avis/msg.c
+++ b/mama/c_cpp/src/c/bridge/avis/msg.c
@@ -55,7 +55,7 @@ mama_status
 avisBridgeMamaMsg_create (msgBridge* msg, mamaMsg parent)
 {
     avisMsgImpl* impl;
-    if (avisMsg(msg) == NULL) return MAMA_STATUS_NULL_ARG;
+    if (avisMsg(msg) == NULL || parent == NULL) return MAMA_STATUS_NULL_ARG;
     *msg = NULL;
 
     impl = (avisMsgImpl*) calloc(1, sizeof(avisMsgImpl));
@@ -144,6 +144,7 @@ avisBridgeMamaMsg_isFromInbox (msgBridge msg)
 {
     const char* dummy;
 
+    if (NULL == msg) return -1;
     CHECK_MSG(msg);
 
     return (MAMA_STATUS_OK == avismsgPayload_getString(
@@ -213,6 +214,7 @@ mama_status
 avisBridgeMamaMsg_destroyReplyHandle (void* result)
 {
     char* replyAddr = (char*) result;
+    if (NULL == result) return MAMA_STATUS_NULL_ARG;
     free(replyAddr);
     return MAMA_STATUS_OK;
 }
diff --git a/mama/c_cpp/src/c/bridge/avis/sub.c b/mama/c_cpp/src/c/bridge/avis/sub.c
index 4a3a60d..0418634 100644
--- a/mama/c_cpp/src/c/bridge/avis/sub.c
+++ b/mama/c_cpp/src/c/bridge/avis/sub.c
@@ -446,6 +446,7 @@ mama_status
 avisBridgeMamaSubscription_getPlatformError (subscriptionBridge subscriber,
                                              void**             error)
 {
+    if (NULL == error) return MAMA_STATUS_NULL_ARG;
     CHECK_SUBSCRIBER(subscriber);
     *error = &(avisSub(subscriber)->mAvis->error);
     return MAMA_STATUS_OK;
diff --git a/mama/c_cpp/src/c/bridge/avis/transportbridge.c b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
index f456f7e..3289189 100644
--- a/mama/c_cpp/src/c/bridge/avis/transportbridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
@@ -251,7 +251,7 @@ avisBridgeMamaTransport_getNumLoadBalanceAttributes (
                                           const char* name,
                                           int*        numLoadBalanceAttributes)
 {
-    if (!numLoadBalanceAttributes) return MAMA_STATUS_NULL_ARG;
+    if (!numLoadBalanceAttributes || !name) return MAMA_STATUS_NULL_ARG;
     *numLoadBalanceAttributes = 0;
     return MAMA_STATUS_OK;
 }
@@ -261,7 +261,7 @@ avisBridgeMamaTransport_getLoadBalanceSharedObjectName (
                                       const char*  name,
                                       const char** loadBalanceSharedObjectName)
 {
-    if (!loadBalanceSharedObjectName) return MAMA_STATUS_NULL_ARG;
+    if (!loadBalanceSharedObjectName || !name) return MAMA_STATUS_NULL_ARG;
     *loadBalanceSharedObjectName = NULL;
     return MAMA_STATUS_OK;
 }
@@ -271,7 +271,7 @@ avisBridgeMamaTransport_getLoadBalanceScheme (
                                     const char*    name,
                                     tportLbScheme* scheme)
 {
-    if (!scheme) return MAMA_STATUS_NULL_ARG;
+    if (!scheme || !name) return MAMA_STATUS_NULL_ARG;
     *scheme = TPORT_LB_SCHEME_STATIC;
     return MAMA_STATUS_OK;
 }
@@ -287,7 +287,7 @@ avisBridgeMamaTransport_create (transportBridge* result,
     mamaBridgeImpl*      bridgeImpl = NULL;
     const char*          url        = NULL;
 
-    if (!result || !name) return MAMA_STATUS_NULL_ARG;
+    if (!result || !name || !mamaTport) return MAMA_STATUS_NULL_ARG;
    
     transport = (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
     if (transport == NULL)
diff --git a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
index 0664c4b..b179dbf 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
@@ -952,17 +952,19 @@ avisMsg_getDateTime(
         mama_fid_t     fid,
         mamaDateTime   result)
 {
-    char tempName[64];
-    Value* pValue = NULL;
-    if(fid != 0)
-    {
-        snprintf (tempName, 63, "%d", fid);
-        pValue = attributes_get(attributes, tempName);
-    }
+    char tempName[64];
+    Value* pValue = NULL;
+    if (NULL == result)
+        return MAMA_STATUS_NULL_ARG;
+    if(fid != 0)
+    {
+        snprintf (tempName, 63, "%d", fid);
+        pValue = attributes_get(attributes, tempName);
+    }
     if ((!pValue) && (name))
-        pValue = attributes_get(attributes, name);
+        pValue = attributes_get(attributes, name);
     if (!pValue)
-      return MAMA_STATUS_NOT_FOUND;
+        return MAMA_STATUS_NOT_FOUND;
     return avisValue_getDateTime(pValue, result);
 }
 
@@ -1064,7 +1066,7 @@ mama_status avisValue_getPrice(const Value* pValue, mamaPrice result)
           return MAMA_STATUS_WRONG_FIELD_TYPE;
 
     mamaPrice_setWithHints (result, pValue->value.real64, MAMA_PRICE_HINTS_NONE);
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    return MAMA_STATUS_OK;
 }
 
 
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index f02920e..0089fdc 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -52,7 +52,7 @@
 #define CHECK_NAME(name,fid) \
         do {  \
            if ((fid == 0) && (name == 0)) return MAMA_STATUS_NULL_ARG; \
-           if ((fid == 0) && (strlen(name)== 0)) return MAMA_STATUS_INVALID_ARG; \
+           if ((fid == 0) && (strlen(name) == 0)) return MAMA_STATUS_INVALID_ARG; \
          } while(0)
 
 #define CHECK_ITER(iter) \
@@ -165,8 +165,8 @@ mama_status
 avismsgPayload_createFromByteBuffer(msgPayload* msg, mamaPayloadBridge bridge, const void* buffer, mama_size_t bufferLength)
 {
     CHECK_NULL(msg);
-    CHECK_NULL(bridge);
     CHECK_NULL(buffer);
+    if (0 == bufferLength) return MAMA_STATUS_INVALID_ARG;
    
     avisPayloadImpl* newPayload = (avisPayloadImpl*)calloc (1, sizeof(avisPayloadImpl));
 
@@ -226,12 +226,12 @@ avismsgPayload_setParent (msgPayload          msg,
 
 mama_status
 avismsgPayload_getByteSize       (const msgPayload    msg,
-                                mama_size_t*        size)
+                                  mama_size_t*        size)
 {
+    const void* buffer = NULL;
     CHECK_PAYLOAD(msg);
     CHECK_NULL(size);
-    *size = 0;
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    return avismsgPayload_serialize (msg, &buffer, size);
 }
 
 mama_status
@@ -251,6 +251,7 @@ avismsgPayload_unSerialize (const msgPayload    msg,
 
     CHECK_PAYLOAD (msg);
     CHECK_NULL(buffer);
+    CHECK_NULL(bufferLength);
 
     if (!impl->mAvisMsg)
         impl->mAvisMsg = attributes_create();
@@ -462,6 +463,8 @@ avismsgPayload_setByteBuffer     (const msgPayload    msg,
 {
     avisPayloadImpl* impl = (avisPayloadImpl*)msg;
     CHECK_PAYLOAD(msg);
+    CHECK_NULL(buffer);
+    CHECK_NULL(bufferLength);
 
     impl->mAvisMsg=(Attributes*) buffer;
 
@@ -568,6 +571,11 @@ avismsgPayload_iterateFields (const msgPayload        msg,
     mama_status status = MAMA_STATUS_OK;
     avisFieldPayload* currField = NULL;
 
+    if (!parent || !cb || !field)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
     if (!impl->mIterator)
     {
         status = avismsgPayloadIter_create((msgPayloadIter*) &impl->mIterator, msg);
@@ -604,7 +612,9 @@ avismsgPayload_getFieldAsString  (const msgPayload    msg,
                                 mama_size_t         len)
 {
     CHECK_PAYLOAD(msg);
+    CHECK_NULL(buf);
     CHECK_NAME(name, fid);
+    if (0 == len) return MAMA_STATUS_INVALID_ARG;
     return avisMsg_getFieldAsString(avisPayload(msg), name, fid, buf, len);
 }
 
@@ -857,6 +867,7 @@ avismsgPayload_addDateTime       (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(value);
     return avisMsg_setDateTime(avisPayload(msg), name, fid, value);
 }
 
@@ -868,6 +879,7 @@ avismsgPayload_addPrice          (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(value);
     return avisMsg_setPrice(avisPayload(msg), name, fid, value);
 }
 
@@ -1246,6 +1258,7 @@ avismsgPayload_updateDateTime    (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(value);
     return avisMsg_setDateTime(avisPayload(msg), name, fid, value);
 }
 
@@ -1257,6 +1270,7 @@ avismsgPayload_updatePrice       (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(value);
     return avisMsg_setPrice(avisPayload(msg), name, fid, value);
 }
 
@@ -1688,6 +1702,7 @@ avismsgPayload_getDateTime       (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getDateTime(avisPayload(msg), name, fid, result);
 }
 
@@ -1699,6 +1714,7 @@ avismsgPayload_getPrice          (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getPrice(avisPayload(msg), name, fid, result);
 }
 
@@ -1991,7 +2007,7 @@ avismsgPayloadIter_hasNext       (msgPayloadIter iter,
                                 msgPayload     msg)
 {
     avisIterator* impl = (avisIterator*) iter;
-    if (!impl) return false;
+    if (!impl || !msg) return false;
 
     return attributes_iter_has_next(impl->mMsgIterator);
 }
@@ -2485,6 +2501,7 @@ avismsgFieldPayload_getDateTime  (const msgFieldPayload   field,
                                 mamaDateTime            result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getDateTime(avisField(field)->mValue, result);
 }
 
@@ -2493,6 +2510,7 @@ avismsgFieldPayload_getPrice     (const msgFieldPayload   field,
                                 mamaPrice               result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getPrice(avisField(field)->mValue, result);
 }
 
@@ -2642,5 +2660,7 @@ avismsgFieldPayload_getAsString (
     char*         buf,
     mama_size_t   len)
 {
+    CHECK_NULL(buf);
+    CHECK_NULL(len);
     return avisValue_getFieldAsString(avisField(field)->mValue, avisField(field)->mName, 0, buf, len);
 }
--
2.4.3


[PATCH 03/14] AVIS: Pulled Queue and Timer implementation from qpid over to avis

Frank Quinn <fquinn.ni@...>
 

In future, we may want to make this code central. In the meantime,
at least making the code identical with qpid should make it easier
to maintain.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/bridge.c |   8 +-
 mama/c_cpp/src/c/bridge/avis/queue.c  | 473 ++++++++++++++++++++++++----------
 mama/c_cpp/src/c/bridge/avis/timer.c  | 346 +++++++++++++++----------
 3 files changed, 557 insertions(+), 270 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index e975474..9728046 100644
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -30,7 +30,7 @@
 #include "avisdefs.h"
 #include "transportbridge.h"
 
-timerHeap gTimerHeap;
+timerHeap gAvisTimerHeap;
 
 /*Responsible for creating the bridge impl structure*/
 void avisBridge_createImpl (mamaBridge* result)
@@ -110,14 +110,14 @@ avisBridge_open (mamaBridge bridgeImpl)
     mama_log (MAMA_LOG_LEVEL_NORMAL,
               "avisBridge_open(): Successfully created Avis queue");
 
-    if (0 != createTimerHeap (&gTimerHeap))
+    if (0 != createTimerHeap (&gAvisTimerHeap))
     {
         mama_log (MAMA_LOG_LEVEL_NORMAL,
                 "avisBridge_open(): Failed to initialize timers.");
         return MAMA_STATUS_PLATFORM;
     }
 
-    if (0 != startDispatchTimerHeap (gTimerHeap))
+    if (0 != startDispatchTimerHeap (gAvisTimerHeap))
     {
         mama_log (MAMA_LOG_LEVEL_NORMAL,
                 "avisBridge_open(): Failed to start timer thread.");
@@ -142,7 +142,7 @@ avisBridge_close (mamaBridge bridgeImpl)
     impl =  (mamaBridgeImpl*)bridgeImpl;
 
 
-    if (0 != destroyHeap (gTimerHeap))
+    if (0 != destroyHeap (gAvisTimerHeap))
     {
         mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_close():"
                 "Failed to destroy Avis timer heap.");
diff --git a/mama/c_cpp/src/c/bridge/avis/queue.c b/mama/c_cpp/src/c/bridge/avis/queue.c
index e32845f..c0aab67 100644
--- a/mama/c_cpp/src/c/bridge/avis/queue.c
+++ b/mama/c_cpp/src/c/bridge/avis/queue.c
@@ -1,7 +1,7 @@
 /* $Id$
  *
  * OpenMAMA: The open middleware agnostic messaging API
- * Copyright (C) 2011 NYSE Technologies, Inc.
+ * Copyright (C) 2011 NYSE Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -19,58 +19,131 @@
  * 02110-1301 USA
  */
 
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
 #include <mama/mama.h>
+#include <wombat/queue.h>
 #include <bridge.h>
 #include "queueimpl.h"
 #include "avisbridgefunctions.h"
-#include <wombat/queue.h>
 
-#include <avis/elvin.h>
+
+/*=========================================================================
+  =                Typedefs, structs, enums and globals                   =
+  =========================================================================*/
 
 typedef struct avisQueueBridge {
     mamaQueue          mParent;
     wombatQueue        mQueue;
-    mamaQueueEnqueueCB mEnqueueCb;
+    uint8_t            mHighWaterFired;
+    size_t             mHighWatermark;
+    size_t             mLowWatermark;
+    uint8_t            mIsDispatching;
+    mamaQueueEnqueueCB mEnqueueCallback;
     void*              mEnqueueClosure;
-    uint8_t            mIsNative;
+    wthread_mutex_t    mDispatchLock;
 } avisQueueBridge;
 
-typedef struct avisQueueClosure {
-    avisQueueBridge* mImpl;
-    mamaQueueEventCB mCb;
-    void*            mUserClosure;
-} avisQueueClosure;
 
-#define avisQueue(queue) ((avisQueueBridge*) queue)
-#define CHECK_QUEUE(queue) \
-        do {  \
-           if (avisQueue(queue) == 0) return MAMA_STATUS_NULL_ARG; \
-           if (avisQueue(queue)->mQueue == NULL) return MAMA_STATUS_NULL_ARG; \
-         } while(0)
+/*=========================================================================
+  =                              Macros                                   =
+  =========================================================================*/
+
+#define     CHECK_QUEUE(IMPL)                                                  \
+    do {                                                                       \
+        if (IMPL == NULL)              return MAMA_STATUS_NULL_ARG;            \
+        if (IMPL->mQueue == NULL)      return MAMA_STATUS_NULL_ARG;            \
+    } while(0)
+
+/* Timeout is in milliseconds */
+#define     AVIS_QUEUE_DISPATCH_TIMEOUT     500
+#define     AVIS_QUEUE_MAX_SIZE             WOMBAT_QUEUE_MAX_SIZE
+#define     AVIS_QUEUE_CHUNK_SIZE           WOMBAT_QUEUE_CHUNK_SIZE
+#define     AVIS_QUEUE_INITIAL_SIZE         WOMBAT_QUEUE_CHUNK_SIZE
+
 
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
+
+/**
+ * This funcion is called to check the current queue size against configured
+ * watermarks to determine whether or not it should call the watermark callback
+ * functions. If it determines that it should, it invokes the relevant callback
+ * itself.
+ *
+ * @param impl The avis queue bridge implementation to check.
+ */
+static void
+avisBridgeMamaQueueImpl_checkWatermarks (avisQueueBridge* impl);
+
+
+/*=========================================================================
+  =               Public interface implementation functions               =
+  =========================================================================*/
 
 mama_status
 avisBridgeMamaQueue_create (queueBridge* queue,
                             mamaQueue    parent)
 {
-    avisQueueBridge* avisQueue = NULL;
+    /* Null initialize the queue to be created */
+    avisQueueBridge*    impl                = NULL;
+    wombatQueueStatus   underlyingStatus    = WOMBAT_QUEUE_OK;
 
-    if (queue == NULL)
+    if (queue == NULL || parent == NULL)
+    {
         return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Null initialize the queueBridge */
     *queue = NULL;
 
-    avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
-    if (avisQueue == NULL)
+    /* Allocate memory for the avis queue implementation */
+    impl = (avisQueueBridge*) calloc (1, sizeof (avisQueueBridge));
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create (): "
+                  "Failed to allocate memory for queue.");
         return MAMA_STATUS_NOMEM;
+    }
 
-    avisQueue->mParent         = parent;
-    avisQueue->mEnqueueCb      = NULL;
-    avisQueue->mEnqueueClosure = NULL;
+    /* Initialize the dispatch lock */
+    wthread_mutex_init (&impl->mDispatchLock, NULL);
 
-    wombatQueue_allocate (&avisQueue->mQueue);
-    wombatQueue_create (avisQueue->mQueue, 0, 0, 0);
+    /* Back-reference the parent for future use in the implementation struct */
+    impl->mParent = parent;
 
-    *queue = (queueBridge) avisQueue;
+    /* Allocate and create the wombat queue */
+    underlyingStatus = wombatQueue_allocate (&impl->mQueue);
+    if (WOMBAT_QUEUE_OK != underlyingStatus)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create (): "
+                  "Failed to allocate memory for underlying queue.");
+        free (impl);
+        return MAMA_STATUS_NOMEM;
+    }
+
+    underlyingStatus = wombatQueue_create (impl->mQueue,
+                                           AVIS_QUEUE_MAX_SIZE,
+                                           AVIS_QUEUE_INITIAL_SIZE,
+                                           AVIS_QUEUE_CHUNK_SIZE);
+    if (WOMBAT_QUEUE_OK != underlyingStatus)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create (): "
+                  "Failed to create underlying queue.");
+        wombatQueue_deallocate (impl->mQueue);
+        free (impl);
+        return MAMA_STATUS_PLATFORM;
+    }
+
+    /* Populate the queueBridge pointer with the implementation for return */
+    *queue = (queueBridge) impl;
 
     return MAMA_STATUS_OK;
 }
@@ -80,23 +153,33 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue,
                                         mamaQueue    parent,
                                         void*        nativeQueue)
 {
-    avisQueueBridge* avisQueue = NULL;
-   
-    if (queue == NULL)
+    avisQueueBridge* impl = NULL;
+    if (NULL == queue || NULL == parent || NULL == nativeQueue)
+    {
         return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Null initialize the queueBridge to be returned */
     *queue = NULL;
 
-    avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
-    if (avisQueue == NULL)
+    /* Allocate memory for the avis bridge implementation */
+    impl = (avisQueueBridge*) calloc (1, sizeof (avisQueueBridge));
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create_usingNative (): "
+                  "Failed to allocate memory for queue.");
         return MAMA_STATUS_NOMEM;
+    }
+
+    /* Back-reference the parent for future use in the implementation struct */
+    impl->mParent = parent;
 
-    avisQueue->mParent         = parent;
-    avisQueue->mEnqueueCb      = NULL;
-    avisQueue->mEnqueueClosure = NULL;
-    avisQueue->mQueue          = (wombatQueue)nativeQueue;
-    avisQueue->mIsNative       = 1;
+    /* Wombat queue has already been created, so simply reference it here */
+    impl->mQueue = (wombatQueue) nativeQueue;
 
-    *queue = (queueBridge) avisQueue;
+    /* Populate the queueBridge pointer with the implementation for return */
+    *queue = (queueBridge) impl;
 
     return MAMA_STATUS_OK;
 }
@@ -104,35 +187,98 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue,
 mama_status
 avisBridgeMamaQueue_destroy (queueBridge queue)
 {
-    CHECK_QUEUE(queue);
-    if (!avisQueue(queue)->mIsNative)
-        wombatQueue_destroy (avisQueue(queue)->mQueue);
-    free(avisQueue(queue));
+    wombatQueueStatus   status  = WOMBAT_QUEUE_OK;
+    avisQueueBridge*    impl    = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Destroy the underlying wombatQueue - can be called from any thread*/
+    wthread_mutex_lock              (&impl->mDispatchLock);
+    status = wombatQueue_destroy    (impl->mQueue);
+    wthread_mutex_unlock            (&impl->mDispatchLock);
+
+    /* Free the avisQueueImpl container struct */
+    free (impl);
+
+    if (WOMBAT_QUEUE_OK != status)
+    {
+        mama_log (MAMA_LOG_LEVEL_WARN,
+                  "avisBridgeMamaQueue_destroy (): "
+                  "Failed to destroy wombat queue (%d).",
+                  status);
+        return MAMA_STATUS_PLATFORM;
+    }
+
+    return MAMA_STATUS_OK;
+}
+
+mama_status
+avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count)
+{
+    avisQueueBridge* impl       = (avisQueueBridge*) queue;
+    int              countInt   = 0;
+
+    if (NULL == count)
+        return MAMA_STATUS_NULL_ARG;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Initialize count to zero */
+    *count = 0;
+
+    /* Get the wombatQueue size */
+    wombatQueue_getSize (impl->mQueue, &countInt);
+    *count = (size_t)countInt;
+
     return MAMA_STATUS_OK;
 }
 
 mama_status
 avisBridgeMamaQueue_dispatch (queueBridge queue)
 {
-    wombatQueueStatus status;
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Lock for dispatching */
+    wthread_mutex_lock (&impl->mDispatchLock);
 
-    CHECK_QUEUE(queue);
+    impl->mIsDispatching = 1;
 
+    /*
+     * Continually dispatch as long as the calling application wants dispatching
+     * to be done and no errors are encountered
+     */
     do
     {
-        /* 500 is .5 seconds */
-        status = wombatQueue_timedDispatch (avisQueue(queue)->mQueue,
-                     NULL, NULL, 500);
+        /* Check the watermarks to see if thresholds have been breached */
+        avisBridgeMamaQueueImpl_checkWatermarks (impl);
+
+        /*
+         * Perform a dispatch with a timeout to allow the dispatching process
+         * to be interrupted by the calling application between iterations
+         */
+        status = wombatQueue_timedDispatch (impl->mQueue,
+                                            NULL,
+                                            NULL,
+                                            AVIS_QUEUE_DISPATCH_TIMEOUT);
     }
-    while ((status == WOMBAT_QUEUE_OK ||
-            status == WOMBAT_QUEUE_TIMEOUT) &&
-            mamaQueueImpl_isDispatching (avisQueue(queue)->mParent));
+    while ( (WOMBAT_QUEUE_OK == status || WOMBAT_QUEUE_TIMEOUT == status)
+            && impl->mIsDispatching);
 
-    if (status != WOMBAT_QUEUE_OK && status != WOMBAT_QUEUE_TIMEOUT)
+    /* Unlock the dispatch lock */
+    wthread_mutex_unlock (&impl->mDispatchLock);
+
+    /* Timeout is encountered after each dispatch and so is expected here */
+    if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "Failed to dispatch Avis Middleware queue. %d",
-                  "mamaQueue_dispatch ():",
+                  "avisBridgeMamaQueue_dispatch (): "
+                  "Failed to dispatch Avis Middleware queue (%d). ",
                   status);
         return MAMA_STATUS_PLATFORM;
     }
@@ -143,41 +289,56 @@ avisBridgeMamaQueue_dispatch (queueBridge queue)
 mama_status
 avisBridgeMamaQueue_timedDispatch (queueBridge queue, uint64_t timeout)
 {
-    wombatQueueStatus status;
-   
-    CHECK_QUEUE(queue);
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl        = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_timedDispatch (avisQueue(queue)->mQueue,
-                     NULL, NULL, timeout);
-    if (status == WOMBAT_QUEUE_TIMEOUT) return MAMA_STATUS_TIMEOUT;
+    /* Check the watermarks to see if thresholds have been breached */
+    avisBridgeMamaQueueImpl_checkWatermarks (impl);
 
-    if (status != WOMBAT_QUEUE_OK)
+    /* Attempt to dispatch the queue with a timeout once */
+    status = wombatQueue_timedDispatch (impl->mQueue,
+                                        NULL,
+                                        NULL,
+                                        timeout);
+
+    /* If dispatch failed, report here */
+    if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "Failed to dispatch Avis Middleware queue. %d",
-                  "mamaQueue_dispatch ():",
+                  "avisBridgeMamaQueue_timedDispatch (): "
+                  "Failed to dispatch Avis Middleware queue (%d).",
                   status);
         return MAMA_STATUS_PLATFORM;
     }
 
     return MAMA_STATUS_OK;
+
 }
 
 mama_status
 avisBridgeMamaQueue_dispatchEvent (queueBridge queue)
 {
-    wombatQueueStatus status;
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl = (avisQueueBridge*) queue;
 
-    CHECK_QUEUE(queue);
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_dispatch (avisQueue(queue)->mQueue,
-                     NULL, NULL);
+    /* Check the watermarks to see if thresholds have been breached */
+    avisBridgeMamaQueueImpl_checkWatermarks (impl);
 
-    if (status != WOMBAT_QUEUE_OK)
+    /* Attempt to dispatch the queue with a timeout once */
+    status = wombatQueue_dispatch (impl->mQueue, NULL, NULL);
+
+    /* If dispatch failed, report here */
+    if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "Failed to dispatch Avis Middleware queue. %d",
-                  "mamaQueue_dispatch ():",
+                  "avisBridgeMamaQueue_dispatchEvent (): "
+                  "Failed to dispatch Avis Middleware queue (%d).",
                   status);
         return MAMA_STATUS_PLATFORM;
     }
@@ -185,42 +346,40 @@ avisBridgeMamaQueue_dispatchEvent (queueBridge queue)
     return MAMA_STATUS_OK;
 }
 
-static void MAMACALLTYPE queueCb (void *ignored, void* closure)
-{
-    avisQueueClosure* cl = (avisQueueClosure*)closure;
-    if (NULL ==cl) return;
-    cl->mCb (cl->mImpl->mParent, cl->mUserClosure);
-    free (cl);
-}
-
 mama_status
 avisBridgeMamaQueue_enqueueEvent (queueBridge        queue,
                                   mamaQueueEventCB   callback,
                                   void*              closure)
 {
-    wombatQueueStatus status;
-    avisQueueClosure* cl = NULL;
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl = (avisQueueBridge*) queue;
 
-    if (!callback) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-
-    cl = (avisQueueClosure*)calloc(1, sizeof(avisQueueClosure));
-    if (NULL == cl) return MAMA_STATUS_NOMEM;
+    if (NULL == callback)
+        return MAMA_STATUS_NULL_ARG;
 
-    cl->mImpl = avisQueue(queue);
-    cl->mCb    = callback;
-    cl->mUserClosure = closure;
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_enqueue (avisQueue(queue)->mQueue,
-                queueCb, NULL, cl);
+    /* Call the underlying wombatQueue_enqueue method */
+    status = wombatQueue_enqueue (impl->mQueue,
+                                  (wombatQueueCb) callback,
+                                  impl->mParent,
+                                  closure);
 
-    if (status != WOMBAT_QUEUE_OK)
-        return MAMA_STATUS_PLATFORM;
+    /* Call the enqueue callback if provided */
+    if (NULL != impl->mEnqueueCallback)
+    {
+        impl->mEnqueueCallback (impl->mParent, impl->mEnqueueClosure);
+    }
 
-    if (avisQueue(queue)->mEnqueueCb)
+    /* If dispatch failed, report here */
+    if (WOMBAT_QUEUE_OK != status)
     {
-        avisQueue(queue)->mEnqueueCb (avisQueue(queue)->mParent,
-                                      avisQueue(queue)->mEnqueueClosure);
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_enqueueEvent (): "
+                  "Failed to enqueueEvent (%d). Callback: %p; Closure: %p",
+                  status, callback, closure);
+        return MAMA_STATUS_PLATFORM;
     }
 
     return MAMA_STATUS_OK;
@@ -229,20 +388,13 @@ avisBridgeMamaQueue_enqueueEvent (queueBridge        queue,
 mama_status
 avisBridgeMamaQueue_stopDispatch (queueBridge queue)
 {
-    wombatQueueStatus status;
-    CHECK_QUEUE(queue);
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
 
-    if (queue == NULL)
-        return MAMA_STATUS_NULL_ARG;
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_unblock (avisQueue(queue)->mQueue);
-    if (status != WOMBAT_QUEUE_OK)
-    {
-        mama_log (MAMA_LOG_LEVEL_ERROR,
-                  " Failed to stop dispatching Avis Middleware queue.",
-                  "wmwMamaQueue_stopDispatch ():");
-        return MAMA_STATUS_PLATFORM;
-    }
+    /* Tell this implementation to stop dispatching */
+    impl->mIsDispatching = 0;
 
     return MAMA_STATUS_OK;
 }
@@ -252,11 +404,18 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge        queue,
                                         mamaQueueEnqueueCB callback,
                                         void*              closure)
 {
-    if (!callback) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
+    avisQueueBridge* impl   = (avisQueueBridge*) queue;
 
-    avisQueue(queue)->mEnqueueCb      = callback;
-    avisQueue(queue)->mEnqueueClosure = closure;
+    if (NULL == callback)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Set the enqueue callback and closure */
+    impl->mEnqueueCallback  = callback;
+    impl->mEnqueueClosure   = closure;
 
     return MAMA_STATUS_OK;
 }
@@ -264,21 +423,35 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge        queue,
 mama_status
 avisBridgeMamaQueue_removeEnqueueCallback (queueBridge queue)
 {
-    CHECK_QUEUE(queue);
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    avisQueue(queue)->mEnqueueCb      = NULL;
-    avisQueue(queue)->mEnqueueClosure = NULL;
+    /* Set the enqueue callback to NULL */
+    impl->mEnqueueCallback  = NULL;
+    impl->mEnqueueClosure   = NULL;
 
     return MAMA_STATUS_OK;
 }
 
 mama_status
 avisBridgeMamaQueue_getNativeHandle (queueBridge queue,
-                                     void**      result)
+                                     void**      nativeHandle)
 {
-    if (!result) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    *result = avisQueue(queue)->mQueue;
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    if (NULL == nativeHandle)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Return the handle to the native queue */
+    *nativeHandle = queue;
+
     return MAMA_STATUS_OK;
 }
 
@@ -286,26 +459,64 @@ mama_status
 avisBridgeMamaQueue_setHighWatermark (queueBridge queue,
                                       size_t      highWatermark)
 {
-    if (!highWatermark) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    if (0 == highWatermark)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Set the high water mark */
+    impl->mHighWatermark = highWatermark;
+
+    return MAMA_STATUS_OK;
 }
 
 mama_status
-avisBridgeMamaQueue_setLowWatermark (queueBridge queue,
-                                     size_t lowWatermark)
+avisBridgeMamaQueue_setLowWatermark (queueBridge    queue,
+                                     size_t         lowWatermark)
 {
-    if (!lowWatermark) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    if (0 == lowWatermark)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Set the low water mark */
+    impl->mLowWatermark = lowWatermark;
+
+    return MAMA_STATUS_OK;
 }
 
-mama_status
-avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count)
+
+/*=========================================================================
+  =                  Private implementation functions                     =
+  =========================================================================*/
+
+void
+avisBridgeMamaQueueImpl_checkWatermarks (avisQueueBridge* impl)
 {
-    if (!count) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    *count = 0;
-    wombatQueue_getSize (avisQueue(queue)->mQueue, (int*)count);
-    return MAMA_STATUS_OK;
+    size_t              eventCount      =  0;
+
+    /* Get the current size of the wombat impl */
+    avisBridgeMamaQueue_getEventCount      ((queueBridge) impl, &eventCount);
+
+    /* If the high watermark had been fired but the event count is now down */
+    if (0 != impl->mHighWaterFired && eventCount == impl->mLowWatermark)
+    {
+        impl->mHighWaterFired = 0;
+        mamaQueueImpl_lowWatermarkExceeded (impl->mParent, eventCount);
+    }
+    /* If the high watermark is not currently fired and now above threshold */
+    else if (0 == impl->mHighWaterFired && eventCount >= impl->mHighWatermark)
+    {
+        impl->mHighWaterFired = 1;
+        mamaQueueImpl_highWatermarkExceeded (impl->mParent, eventCount);
+    }
 }
+
diff --git a/mama/c_cpp/src/c/bridge/avis/timer.c b/mama/c_cpp/src/c/bridge/avis/timer.c
index 61cb01f..e5ec0e1 100644
--- a/mama/c_cpp/src/c/bridge/avis/timer.c
+++ b/mama/c_cpp/src/c/bridge/avis/timer.c
@@ -1,7 +1,7 @@
 /* $Id$
  *
  * OpenMAMA: The open middleware agnostic messaging API
- * Copyright (C) 2011 NYSE Technologies, Inc.
+ * Copyright (C) 2011 NYSE Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -19,79 +19,88 @@
  * 02110-1301 USA
  */
 
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
 #include <mama/mama.h>
 #include <mama/timer.h>
 #include <timers.h>
 #include "avisbridgefunctions.h"
 #include <wombat/queue.h>
 
-extern timerHeap gTimerHeap;
+
+/*=========================================================================
+  =                Typedefs, structs, enums and globals                   =
+  =========================================================================*/
+
+extern timerHeap gAvisTimerHeap;
 
 typedef struct avisTimerImpl_
 {
-    timerElement  mTimerElement;
-    double        mInterval;
-    mamaTimerCb   mAction;
-    void*         mClosure;
-    mamaTimer     mParent;
-    mamaQueue     mQueue;
-
-    /* This callback will be invoked whenever the timer has been completely destroyed. */
+    timerElement    mTimerElement;
+    double          mInterval;
+    void*           mClosure;
+    mamaTimer       mParent;
+    void*           mQueue;
+    uint8_t         mDestroying;
+    /* This callback will be invoked whenever the timer has been destroyed. */
     mamaTimerCb     mOnTimerDestroyed;
-
+    /* This callback will be invoked on each timer firing */
+    mamaTimerCb     mAction;
 } avisTimerImpl;
 
-static void MAMACALLTYPE
-destroy_callback (mamaQueue queue, void* closure)
-{
-    avisTimerImpl* impl  = (avisTimerImpl*) closure;
-
-    (*impl->mOnTimerDestroyed)(impl->mParent, impl->mClosure);
 
-    free (impl);
-}
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
 
+/**
+ * Due to the fact that timed events may still be on the event queue, the
+ * timer's destroy function does not destroy the implementation immediately.
+ * Instead, it sets an implementation specific flag to stop further callbacks
+ * from being enqueued from this timer, and then enqueues this function as a
+ * callback on the queue to perform the actual destruction. This function also
+ * calls the application developer's destroy callback function.
+ *
+ * @param queue   MAMA queue from which this callback was fired.
+ * @param closure In this instance, the closure will contain the avis timer
+ *                implementation.
+ */
 static void MAMACALLTYPE
-timerQueueCb (mamaQueue queue, void* closure)
-{
-    avisTimerImpl* impl = (avisTimerImpl*)closure;
-
-    if (impl->mAction)
-        impl->mAction (impl->mParent, impl->mClosure);
+avisBridgeMamaTimerImpl_destroyCallback (mamaQueue queue, void* closure);
 
-}
+/**
+ * When a timer fires, it enqueues this callback for execution. This is where
+ * the action callback provided in the timer's create function gets fired.
+ *
+ * @param queue   MAMA queue from which this callback was fired.
+ * @param closure In this instance, the closure will contain the avis timer
+ *                implementation.
+ */
+static void MAMACALLTYPE
+avisBridgeMamaTimerImpl_queueCallback (mamaQueue queue, void* closure);
 
+/**
+ * Every time the timer fires, it calls this timer callback which adds
+ * avisBridgeMamaTimerImpl_queueCallback to the queue as long as the timer's
+ * mDestroying flag is not currently set.
+ *
+ * @param timer   The underlying timer element which has just fired (not used).
+ * @param closure In this instance, the closure will contain the avis timer
+ *                implementation.
+ */
 static void
-timerCb (timerElement  timer,
-         void*         closure)
-{
-    avisTimerImpl* impl = (avisTimerImpl*)closure;
-    struct timeval timeout;
-
-    if (impl == NULL) return;
-
-    /* Mama timers are repeating */
-    timeout.tv_sec = (time_t)impl->mInterval;
-    timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0);
+avisBridgeMamaTimerImpl_timerCallback (timerElement timer, void* closure);
 
-    if (0 != createTimer (&impl->mTimerElement,
-                          gTimerHeap,
-                          timerCb,
-                          &timeout,
-                          impl))
-    {
-        mama_log (MAMA_LOG_LEVEL_ERROR,
-              "%s Failed to create Avis timer.",
-              "mamaTimer_create ():");
-    }
 
-    mamaQueue_enqueueEvent (impl->mQueue,
-                            timerQueueCb,
-                            (void*)impl);
-}
+/*=========================================================================
+  =               Public interface implementation functions               =
+  =========================================================================*/
 
 mama_status
-avisBridgeMamaTimer_create (timerBridge* result,
+avisBridgeMamaTimer_create (timerBridge*  result,
                            void*         nativeQueueHandle,
                            mamaTimerCb   action,
                            mamaTimerCb   onTimerDestroyed,
@@ -99,77 +108,92 @@ avisBridgeMamaTimer_create (timerBridge* result,
                            mamaTimer     parent,
                            void*         closure)
 {
-    avisTimerImpl* impl      =   NULL;
-    struct timeval timeout;
 
-    if (result == NULL) return MAMA_STATUS_NULL_ARG;
+    avisTimerImpl*              impl            = NULL;
+    int                         timerResult     = 0;
+    struct timeval              timeout;
 
-    mama_log (MAMA_LOG_LEVEL_FINEST,
-              "%s Entering with interval [%f].",
-              "avisMamaTimer_create ():",
-              interval);
+    if (NULL == result || NULL == nativeQueueHandle
+            || NULL == action
+            || NULL == parent )
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
 
+    /* Null initialize the timer bridge supplied */
     *result = NULL;
 
-    impl = (avisTimerImpl*)calloc (1, sizeof (avisTimerImpl));
-    if (impl == NULL) return MAMA_STATUS_NOMEM;
-
-    impl->mQueue    = NULL;
-    impl->mParent   = parent;
-    impl->mAction   = action;
-    impl->mClosure  = closure;
-    impl->mInterval = interval;
-
-    mamaTimer_getQueue (parent, &impl->mQueue);
-
-    impl->mOnTimerDestroyed = onTimerDestroyed;
-
-    *result = (timerBridge)impl;
+    /* Allocate the timer implementation and set up */
+    impl = (avisTimerImpl*) calloc (1, sizeof (avisTimerImpl));
+    if (NULL == impl)
+    {
+        return MAMA_STATUS_NOMEM;
+    }
 
-    timeout.tv_sec = (time_t)interval;
+    *result                     = (timerBridge) impl;
+    impl->mQueue                = nativeQueueHandle;
+    impl->mParent               = parent;
+    impl->mAction               = action;
+    impl->mClosure              = closure;
+    impl->mInterval             = interval;
+    impl->mOnTimerDestroyed     = onTimerDestroyed;
+    impl->mDestroying           = 0;
+
+    /* Determine when the next timer should fire */
+    timeout.tv_sec  = (time_t) interval;
     timeout.tv_usec = ((interval-timeout.tv_sec) * 1000000.0);
 
-    if (0 != createTimer (&impl->mTimerElement,
-                          gTimerHeap,
-                          timerCb,
-                          &timeout,
-                          impl))
+    /* Create the first single fire timer */
+    timerResult = createTimer (&impl->mTimerElement,
+                               gAvisTimerHeap,
+                               avisBridgeMamaTimerImpl_timerCallback,
+                               &timeout,
+                               impl);
+    if (0 != timerResult)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to create Avis timer.",
-                  "mamaTimer_create ():");
-        return MAMA_STATUS_TIMER_FAILURE;
+                  "Failed to create Avis underlying timer [%d].", timerResult);
+        return MAMA_STATUS_PLATFORM;
     }
 
     return MAMA_STATUS_OK;
 }
 
+/* This call should always come from MAMA queue thread */
 mama_status
 avisBridgeMamaTimer_destroy (timerBridge timer)
 {
-    mama_status    returnStatus = MAMA_STATUS_OK;
-    avisTimerImpl* impl = NULL;
+    avisTimerImpl*  impl            = NULL;
+    mama_status     returnStatus    = MAMA_STATUS_OK;
+    int             timerResult     = 0;
 
-    if (timer == NULL)
+    if (NULL == timer)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
-    impl->mAction = NULL;
-    mama_log (MAMA_LOG_LEVEL_FINEST,
-              "%s Entering for 0x%x",
-              "avisMamaTimer_destroy ():", impl);
+    /* Nullify the callback and set destroy flag */
+    impl                            = (avisTimerImpl*) timer;
+    impl->mDestroying               = 1;
+    impl->mAction                   = NULL;
 
-    if (0 != destroyTimer (gTimerHeap, impl->mTimerElement))
+    /* Destroy the timer element */
+    timerResult = destroyTimer (gAvisTimerHeap, impl->mTimerElement);
+    if (0 != timerResult)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to destroy Avis timer.",
-                  "avisMamaTimer_destroy ():");
+                  "Failed to destroy Avis underlying timer [%d].",
+                  timerResult);
         returnStatus = MAMA_STATUS_PLATFORM;
     }
 
-    mamaQueue_enqueueEvent (impl->mQueue,
-                            destroy_callback,
-                            (void*)impl);
+    /*
+     * Put the impl free at the back of the queue to be executed when all
+     * pending timer events have been completed
+     */
+    avisBridgeMamaQueue_enqueueEvent ((queueBridge) impl->mQueue,
+                                      avisBridgeMamaTimerImpl_destroyCallback,
+                                      (void*) impl);
 
     return returnStatus;
 }
@@ -177,60 +201,52 @@ avisBridgeMamaTimer_destroy (timerBridge timer)
 mama_status
 avisBridgeMamaTimer_reset (timerBridge timer)
 {
-    mama_status status      = MAMA_STATUS_OK;
-    avisTimerImpl* impl  = NULL;
-    struct timeval timeout;
+    avisTimerImpl*      impl            = (avisTimerImpl*) timer;
+    int                 timerResult     = 0;
+    struct timeval      timeout;
 
-    if (timer == NULL)
+    if (NULL == impl)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
-    timeout.tv_sec = (time_t)impl->mInterval;
-    timeout.tv_usec = ((impl->mInterval-timeout.tv_sec) * 1000000.0);
+    /* Destroy the existing timer element */
+    destroyTimer (gAvisTimerHeap, impl->mTimerElement);
 
-    if (timer == NULL)
-        return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    /* Calculate next time interval */
+    timeout.tv_sec  = (time_t) impl->mInterval;
+    timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0);
 
-    if (0 != destroyTimer (gTimerHeap, impl->mTimerElement))
+    /* Create the timer for the next firing */
+    timerResult = createTimer (&impl->mTimerElement,
+                               gAvisTimerHeap,
+                               avisBridgeMamaTimerImpl_timerCallback,
+                               &timeout,
+                               impl);
+    if (0 != timerResult)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to destroy Avis timer.",
-                  "avisMamaTimer_destroy ():");
-        avisBridgeMamaTimer_destroy (timer);
-        status = MAMA_STATUS_PLATFORM;
-    }
-    else
-    {
-        if (0 != createTimer (&impl->mTimerElement,
-                              gTimerHeap,
-                              timerCb,
-                              &timeout,
-                              impl))
-        {
-            mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to create Avis timer.",
-                  "mamaTimer_create ():");
-            status = MAMA_STATUS_PLATFORM;
-        }
+                  "Failed to reset Avis underlying timer [%d].", timerResult);
+        return MAMA_STATUS_PLATFORM;
     }
 
-    return status;
+    return MAMA_STATUS_OK;
+
 }
 
 mama_status
 avisBridgeMamaTimer_setInterval (timerBridge  timer,
-                                mama_f64_t   interval)
+                                 mama_f64_t   interval)
 {
-    avisTimerImpl* impl  = NULL;
-
-    if (timer == NULL)
+    avisTimerImpl* impl  = (avisTimerImpl*) timer;
+    if (NULL == timer)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
     impl->mInterval = interval;
 
-   return  avisBridgeMamaTimer_reset (timer);
+    return  avisBridgeMamaTimer_reset (timer);
 }
 
 mama_status
@@ -238,11 +254,71 @@ avisBridgeMamaTimer_getInterval (timerBridge    timer,
                                 mama_f64_t*    interval)
 {
     avisTimerImpl* impl  = NULL;
-
-    if (timer == NULL)
+    if (NULL == timer || NULL == interval)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
+    impl = (avisTimerImpl*) timer;
     *interval = impl->mInterval;
+
     return MAMA_STATUS_OK;
 }
+
+
+/*=========================================================================
+  =                  Private implementation functions                     =
+  =========================================================================*/
+
+/* This callback is invoked by the avis bridge's destroy event */
+static void MAMACALLTYPE
+avisBridgeMamaTimerImpl_destroyCallback (mamaQueue queue, void* closure)
+{
+    avisTimerImpl* impl = (avisTimerImpl*) closure;
+    (*impl->mOnTimerDestroyed)(impl->mParent, impl->mClosure);
+
+    /* Free the implementation memory here */
+    free (impl);
+}
+
+/* This callback is invoked by the avis bridge's timer event */
+static void MAMACALLTYPE
+avisBridgeMamaTimerImpl_queueCallback (mamaQueue queue, void* closure)
+{
+    avisTimerImpl* impl = (avisTimerImpl*) closure;
+    if (impl->mAction)
+    {
+        impl->mAction (impl->mParent, impl->mClosure);
+    }
+}
+
+/* This callback is invoked by the common timer's dispatch thread */
+static void
+avisBridgeMamaTimerImpl_timerCallback (timerElement  timer,
+                                       void*         closure)
+{
+
+    avisTimerImpl* impl = (avisTimerImpl*) closure;
+
+    if (NULL == impl)
+    {
+        return;
+    }
+
+    /*
+     * Only enqueue further timer callbacks the timer is not currently getting
+     * destroyed
+     */
+    if (0 == impl->mDestroying)
+    {
+        /* Set the timer for the next firing */
+        avisBridgeMamaTimer_reset ((timerBridge) closure);
+
+        /* Enqueue the callback for handling */
+        avisBridgeMamaQueue_enqueueEvent ((queueBridge) impl->mQueue,
+                                          avisBridgeMamaTimerImpl_queueCallback,
+                                          closure);
+    }
+}
+
+
--
2.4.3


[PATCH 02/14] AVIS: Pulled IO implementation from qpid over to avis

Frank Quinn <fquinn.ni@...>
 

In future, we may want to make this code central. In the meantime,
at least making the code identical with qpid should make it easier
to maintain.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/SConscript |   4 +-
 mama/c_cpp/src/c/bridge/avis/bridge.c   |  16 +-
 mama/c_cpp/src/c/bridge/avis/io.c       | 274 +++++++++++++++++++++++++++++---
 mama/c_cpp/src/c/bridge/avis/io.h       |  51 ++++++
 4 files changed, 320 insertions(+), 25 deletions(-)
 create mode 100644 mama/c_cpp/src/c/bridge/avis/io.h

diff --git a/mama/c_cpp/src/c/bridge/avis/SConscript b/mama/c_cpp/src/c/bridge/avis/SConscript
index 526612f..475c924 100644
--- a/mama/c_cpp/src/c/bridge/avis/SConscript
+++ b/mama/c_cpp/src/c/bridge/avis/SConscript
@@ -17,10 +17,10 @@ incPath.append('#mama/c_cpp/src/c')
 env['CCFLAGS'] = [x for x in env['CCFLAGS'] if x != '-pedantic-errors']
 
 if env['host']['os'] == 'Darwin':
-    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'wombatcommon'],
+    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'event', 'wombatcommon'],
                LIBPATH=libPath, CPPPATH=incPath)
 else:
-    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'wombatcommon', 'uuid'],
+    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'event', 'wombatcommon', 'uuid'],
                LIBPATH=libPath, CPPPATH=incPath)
 
 conf = Configure(env, config_h='./config.h', log_file='./config.log')
diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index 33dfc17..e975474 100644
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -124,6 +124,9 @@ avisBridge_open (mamaBridge bridgeImpl)
         return MAMA_STATUS_PLATFORM;
     }
 
+    /* Start the io thread */
+    avisBridgeMamaIoImpl_start ();
+
     return MAMA_STATUS_OK;
 }
 
@@ -150,16 +153,19 @@ avisBridge_close (mamaBridge bridgeImpl)
 
     mamaBridgeImpl_getClosure(impl, &avisBridge);
 
-    if (avisBridge)
-    {
-        free (avisBridge);
-    }
-
     mamaQueue_destroyWait(impl->mDefaultEventQueue);
 
     free (impl);
    
     wsocketcleanup();
+
+    /* Stop and destroy the io thread */
+    avisBridgeMamaIoImpl_stop ();
+
+    if (avisBridge)
+    {
+        free (avisBridge);
+    }
     return status;
 }
 
diff --git a/mama/c_cpp/src/c/bridge/avis/io.c b/mama/c_cpp/src/c/bridge/avis/io.c
index 91e7d22..8630c98 100644
--- a/mama/c_cpp/src/c/bridge/avis/io.c
+++ b/mama/c_cpp/src/c/bridge/avis/io.c
@@ -1,7 +1,7 @@
 /* $Id$
  *
  * OpenMAMA: The open middleware agnostic messaging API
- * Copyright (C) 2011 NYSE Technologies, Inc.
+ * Copyright (C) 2011 NYSE Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -19,36 +19,274 @@
  * 02110-1301 USA
  */
 
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
 #include <mama/mama.h>
 #include <mama/io.h>
-#include <bridge.h>
+#include <wombat/port.h>
 #include "avisbridgefunctions.h"
+#include "io.h"
+#include <event.h>
+
+/*=========================================================================
+  =                Typedefs, structs, enums and globals                   =
+  =========================================================================*/
+
+typedef struct avisIoImpl
+{
+    struct event_base*  mEventBase;
+    wthread_t           mDispatchThread;
+    uint8_t             mActive;
+    uint8_t             mEventsRegistered;
+    wsem_t              mResumeDispatching;
+} avisIoImpl;
+
+typedef struct avisIoEventImpl
+{
+    uint32_t            mDescriptor;
+    mamaIoCb            mAction;
+    mamaIoType          mIoType;
+    mamaIo              mParent;
+    void*               mClosure;
+    struct event        mEvent;
+} avisIoEventImpl;
+
+/*
+ * Global static container to hold instance-wide information not otherwise
+ * available in this interface.
+ */
+static avisIoImpl       gAvisIoContainer;
+
+
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
+
+void*
+avisBridgeMamaIoImpl_dispatchThread (void* closure);
 
+void
+avisBridgeMamaIoImpl_libeventIoCallback (int fd, short type, void* closure);
+
+
+/*=========================================================================
+  =                   Public implementation functions                     =
+  =========================================================================*/
+
+/* Not implemented in the avis bridge */
 mama_status
-avisBridgeMamaIo_create(ioBridge*  result,
-                         void*      nativeQueueHandle,
-                         uint32_t   descriptor,
-                         mamaIoCb   action,
-                         mamaIoType ioType,
-                         mamaIo     parent,
-                         void*      closure)
-{
-    if (!result) return MAMA_STATUS_NULL_ARG;
+avisBridgeMamaIo_create         (ioBridge*   result,
+                                 void*       nativeQueueHandle,
+                                 uint32_t    descriptor,
+                                 mamaIoCb    action,
+                                 mamaIoType  ioType,
+                                 mamaIo      parent,
+                                 void*       closure)
+{
+    avisIoEventImpl*    impl    = NULL;
+    short               evtType = 0;
+
+    if (NULL == result)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
     *result = 0;
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+
+    /* Check for supported types so we don't prematurely allocate */
+    switch (ioType)
+    {
+    case MAMA_IO_READ:
+        evtType = EV_READ;
+        break;
+    case MAMA_IO_WRITE:
+        evtType = EV_WRITE;
+        break;
+    case MAMA_IO_ERROR:
+        evtType = EV_READ | EV_WRITE;
+        break;
+    case MAMA_IO_CONNECT:
+    case MAMA_IO_ACCEPT:
+    case MAMA_IO_CLOSE:
+    case MAMA_IO_EXCEPT:
+    default:
+        return MAMA_STATUS_UNSUPPORTED_IO_TYPE;
+        break;
+    }
+
+    impl = (avisIoEventImpl*) calloc (1, sizeof (avisIoEventImpl));
+    if (NULL == impl)
+    {
+        return MAMA_STATUS_NOMEM;
+    }
+
+    impl->mDescriptor           = descriptor;
+    impl->mAction               = action;
+    impl->mIoType               = ioType;
+    impl->mParent               = parent;
+    impl->mClosure              = closure;
+
+    event_set (&impl->mEvent,
+               impl->mDescriptor,
+               evtType,
+               avisBridgeMamaIoImpl_libeventIoCallback,
+               impl);
+
+    event_add (&impl->mEvent, NULL);
+
+    event_base_set (gAvisIoContainer.mEventBase, &impl->mEvent);
+
+    /* If this is the first event since base was emptied or created */
+    if (0 == gAvisIoContainer.mEventsRegistered)
+    {
+        wsem_post (&gAvisIoContainer.mResumeDispatching);
+    }
+    gAvisIoContainer.mEventsRegistered++;
+
+    *result = (ioBridge)impl;
+
+    return MAMA_STATUS_OK;
 }
 
+/* Not implemented in the avis bridge */
 mama_status
-avisBridgeMamaIo_destroy (ioBridge io)
+avisBridgeMamaIo_destroy        (ioBridge io)
 {
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisIoEventImpl* impl = (avisIoEventImpl*) io;
+    if (NULL == io)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    event_del (&impl->mEvent);
+
+    free (impl);
+    gAvisIoContainer.mEventsRegistered--;
+
+    return MAMA_STATUS_OK;
 }
 
+/* Not implemented in the avis bridge */
 mama_status
-avisBridgeMamaIo_getDescriptor (ioBridge io, uint32_t *result)
+avisBridgeMamaIo_getDescriptor  (ioBridge    io,
+                                 uint32_t*   result)
 {
-    if (!result) return MAMA_STATUS_NULL_ARG;
-    *result = 0;
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisIoEventImpl* impl = (avisIoEventImpl*) io;
+    if (NULL == io || NULL == result)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
+    *result = impl->mDescriptor;
+
+    return MAMA_STATUS_OK;
+}
+
+/*=========================================================================
+  =                  Public implementation prototypes                     =
+  =========================================================================*/
+
+mama_status
+avisBridgeMamaIoImpl_start ()
+{
+    int threadResult                        = 0;
+    gAvisIoContainer.mEventsRegistered      = 0;
+    gAvisIoContainer.mActive                = 1;
+    gAvisIoContainer.mEventBase             = event_init ();
+
+    wsem_init (&gAvisIoContainer.mResumeDispatching, 0, 0);
+    threadResult = wthread_create (&gAvisIoContainer.mDispatchThread,
+                                   NULL,
+                                   avisBridgeMamaIoImpl_dispatchThread,
+                                   gAvisIoContainer.mEventBase);
+    if (0 != threadResult)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaIoImpl_initialize(): "
+                  "wthread_create returned %d", threadResult);
+        return MAMA_STATUS_PLATFORM;
+    }
+    return MAMA_STATUS_OK;
+}
+
+mama_status
+avisBridgeMamaIoImpl_stop ()
+{
+    gAvisIoContainer.mActive = 0;
+
+    /* Alert the semaphore so the dispatch loop can exit */
+    wsem_post (&gAvisIoContainer.mResumeDispatching);
+
+    /* Tell the event loop to exit */
+    event_base_loopexit (gAvisIoContainer.mEventBase, NULL);
+
+    /* Join with the dispatch thread - it should exit shortly */
+    wthread_join (gAvisIoContainer.mDispatchThread, NULL);
+    wsem_destroy (&gAvisIoContainer.mResumeDispatching);
+
+    /* Free the main event base */
+    event_base_free (gAvisIoContainer.mEventBase);
+
+    return MAMA_STATUS_OK;
 }
 
+
+
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
+
+void*
+avisBridgeMamaIoImpl_dispatchThread (void* closure)
+{
+    int             dispatchResult = 0;
+
+    /* Wait on the first event to register before starting dispatching */
+    wsem_wait (&gAvisIoContainer.mResumeDispatching);
+
+    while (0 != gAvisIoContainer.mActive)
+    {
+        dispatchResult = event_base_loop (gAvisIoContainer.mEventBase,
+                                          EVLOOP_NONBLOCK | EVLOOP_ONCE);
+
+        /* If no events are currently registered */
+        if (1 == dispatchResult)
+        {
+            /* Wait until they are */
+            gAvisIoContainer.mEventsRegistered = 0;
+            wsem_wait (&gAvisIoContainer.mResumeDispatching);
+        }
+    }
+    return NULL;
+}
+
+void
+avisBridgeMamaIoImpl_libeventIoCallback (int fd, short type, void* closure)
+{
+    avisIoEventImpl* impl = (avisIoEventImpl*) closure;
+
+    /* Timeout is the only error detectable with libevent */
+    if (EV_TIMEOUT == type)
+    {
+        /* If this is an error IO type, fire the callback */
+        if (impl->mIoType == MAMA_IO_ERROR && NULL != impl->mAction)
+        {
+            (impl->mAction)(impl->mParent, impl->mIoType, impl->mClosure);
+        }
+        /* If this is not an error IO type, do nothing */
+        else
+        {
+            return;
+        }
+    }
+
+    /* Call the action callback if defined */
+    if (NULL != impl->mAction)
+    {
+        (impl->mAction)(impl->mParent, impl->mIoType, impl->mClosure);
+    }
+
+    /* Enqueue for the next time */
+    event_add (&impl->mEvent, NULL);
+}
diff --git a/mama/c_cpp/src/c/bridge/avis/io.h b/mama/c_cpp/src/c/bridge/avis/io.h
new file mode 100644
index 0000000..a14e794
--- /dev/null
+++ b/mama/c_cpp/src/c/bridge/avis/io.h
@@ -0,0 +1,51 @@
+/* $Id$
+ *
+ * OpenMAMA: The open middleware agnostic messaging API
+ * Copyright (C) 2011 NYSE Technologies, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#ifndef MAMA_BRIDGE_AVIS_IO_H__
+#define MAMA_BRIDGE_AVIS_IO_H__
+
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+#include <mama/mama.h>
+
+/*=========================================================================
+  =                  Public implementation functions                      =
+  =========================================================================*/
+
+mama_status
+avisBridgeMamaIoImpl_start (void);
+
+mama_status
+avisBridgeMamaIoImpl_stop  (void);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* MAMA_BRIDGE_AVIS_IO_H__ */
--
2.4.3


[PATCH 01/14] AVIS: Added NULL checks for all tests which caused crashing

Frank Quinn <fquinn.ni@...>
 

This is with the exception of the timer implementation which
has several bugs which have already been fixed in the qpid
queue implementation so the qpid timer should instead simply
be converted into common code to share between avis and qpid
rather than fix the Timer crash we see in MamaMiddlewareC.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/bridge.c          |  6 ++++--
 mama/c_cpp/src/c/bridge/avis/io.c              |  2 ++
 mama/c_cpp/src/c/bridge/avis/msg.c             |  4 ++++
 mama/c_cpp/src/c/bridge/avis/publisher.c       | 12 ++++++++++--
 mama/c_cpp/src/c/bridge/avis/queue.c           |  8 +++++++-
 mama/c_cpp/src/c/bridge/avis/transportbridge.c |  9 +++++++++
 mama/c_cpp/src/c/payload/avismsg/avispayload.c |  1 +
 7 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index 97cbb08..33dfc17 100644
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -77,8 +77,10 @@ static const char PAYLOAD_IDS[] = {MAMA_PAYLOAD_AVIS,NULL};
 mama_status
 avisBridge_getDefaultPayloadId (char***name, char** id)
 {
-    *name = PAYLOAD_NAMES;
-    *id = PAYLOAD_IDS;
+    if (!name) return MAMA_STATUS_NULL_ARG;
+    if (!id) return MAMA_STATUS_NULL_ARG;
+    *name = PAYLOAD_NAMES;
+    *id = PAYLOAD_IDS;
 
     return MAMA_STATUS_OK;
 }
diff --git a/mama/c_cpp/src/c/bridge/avis/io.c b/mama/c_cpp/src/c/bridge/avis/io.c
index f5d2544..91e7d22 100644
--- a/mama/c_cpp/src/c/bridge/avis/io.c
+++ b/mama/c_cpp/src/c/bridge/avis/io.c
@@ -33,6 +33,7 @@ avisBridgeMamaIo_create(ioBridge*  result,
                          mamaIo     parent,
                          void*      closure)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     *result = 0;
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
@@ -46,6 +47,7 @@ avisBridgeMamaIo_destroy (ioBridge io)
 mama_status
 avisBridgeMamaIo_getDescriptor (ioBridge io, uint32_t *result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     *result = 0;
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
diff --git a/mama/c_cpp/src/c/bridge/avis/msg.c b/mama/c_cpp/src/c/bridge/avis/msg.c
index 675ef6d..5e02673 100644
--- a/mama/c_cpp/src/c/bridge/avis/msg.c
+++ b/mama/c_cpp/src/c/bridge/avis/msg.c
@@ -119,6 +119,7 @@ mama_status
 avisBridgeMamaMsgImpl_setReplyHandle (msgBridge msg, void* result)
 {
     mama_status status = MAMA_STATUS_OK;
+    if (!result) return MAMA_STATUS_NULL_ARG;
 
     CHECK_MSG(msg);
 
@@ -155,6 +156,7 @@ avisBridgeMamaMsg_setSendSubject (msgBridge   msg,
                                   const char* subject)
 {
     mama_status status = MAMA_STATUS_OK;
+    if (!symbol) return MAMA_STATUS_NULL_ARG;
 
     CHECK_MSG(msg);
 
@@ -173,6 +175,7 @@ avisBridgeMamaMsg_setSendSubject (msgBridge   msg,
 mama_status
 avisBridgeMamaMsg_getNativeHandle (msgBridge msg, void** result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     CHECK_MSG(msg);
     *result = avisMsg(msg)->mAvisMsg;
     return MAMA_STATUS_OK;
@@ -201,6 +204,7 @@ mama_status
 avisBridgeMamaMsg_copyReplyHandle (void* src, void** dest)
 {
     const char* replyAddr = (const char*) src;
+    if (!src || !dest) return MAMA_STATUS_NULL_ARG;
     *dest = (void*) strdup(replyAddr);
     return MAMA_STATUS_OK;
 }
diff --git a/mama/c_cpp/src/c/bridge/avis/publisher.c b/mama/c_cpp/src/c/bridge/avis/publisher.c
index 2e25524..364aa1d 100644
--- a/mama/c_cpp/src/c/bridge/avis/publisher.c
+++ b/mama/c_cpp/src/c/bridge/avis/publisher.c
@@ -233,11 +233,10 @@ avisBridgeMamaPublisher_createByIndex (publisherBridge* result,
                                         void*            nativeQueueHandle,
                                         mamaPublisher    parent)
 {
+    if (!result || !tport || !parent) return MAMA_STATUS_NULL_ARG;
     Elvin* avis = getAvis(tport);
     avisPublisherBridge* publisher = NULL;
 
-    if (!result || !tport) return MAMA_STATUS_NULL_ARG;
-
     CHECK_AVIS(avis);
 
     *result = NULL;
@@ -314,6 +313,8 @@ avisBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg)
 {
     mama_status ret = MAMA_STATUS_OK;
 
+    if (!msg) return MAMA_STATUS_NULL_ARG;
+
     CHECK_PUBLISHER(publisher);
 
     ret = avisBridgeMamaPublisherImpl_prepareMessage (&msg);
@@ -348,6 +349,8 @@ avisBridgeMamaPublisher_sendReplyToInbox (publisherBridge  publisher,
     const char*  replyAddr   = NULL;
     mama_status  status;
    
+    if (!request || !reply) return MAMA_STATUS_NULL_ARG;
+
     CHECK_PUBLISHER(publisher);
 
     mamaMsg_getNativeHandle(request, (void**) &requestMsg);
@@ -402,6 +405,7 @@ avisBridgeMamaPublisher_sendReplyToInbox (publisherBridge  publisher,
 mama_status
 avisBridgeMamaPublisher_destroy (publisherBridge publisher)
 {
+    if (!publisher) return MAMA_STATUS_NULL_ARG;
     free ((char*)avisPublisher(publisher)->mSource);
     free ((char*)avisPublisher(publisher)->mTopic);
     free ((char*)avisPublisher(publisher)->mRoot);
@@ -420,6 +424,8 @@ avisBridgeMamaPublisher_sendFromInboxByIndex (publisherBridge publisher,
     const char* replyAddr = NULL;
     mama_status status;
 
+    if (!inbox || !msg) return MAMA_STATUS_NULL_ARG;
+
     CHECK_PUBLISHER(publisher);
 
     status = avisBridgeMamaPublisherImpl_prepareMessage (&msg);
@@ -475,6 +481,8 @@ avisBridgeMamaPublisher_sendReplyToInboxHandle (publisherBridge publisher,
 {
     mama_status status;
 
+    if (!inbox | !reply) return MAMA_STATUS_NULL_ARG;
+
     CHECK_PUBLISHER(publisher);
 
     status = avisBridgeMamaPublisherImpl_prepareMessage (&reply);
diff --git a/mama/c_cpp/src/c/bridge/avis/queue.c b/mama/c_cpp/src/c/bridge/avis/queue.c
index 6416751..e32845f 100644
--- a/mama/c_cpp/src/c/bridge/avis/queue.c
+++ b/mama/c_cpp/src/c/bridge/avis/queue.c
@@ -200,7 +200,8 @@ avisBridgeMamaQueue_enqueueEvent (queueBridge        queue,
 {
     wombatQueueStatus status;
     avisQueueClosure* cl = NULL;
-   
+
+    if (!callback) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
 
     cl = (avisQueueClosure*)calloc(1, sizeof(avisQueueClosure));
@@ -251,6 +252,7 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge        queue,
                                         mamaQueueEnqueueCB callback,
                                         void*              closure)
 {
+    if (!callback) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
 
     avisQueue(queue)->mEnqueueCb      = callback;
@@ -274,6 +276,7 @@ mama_status
 avisBridgeMamaQueue_getNativeHandle (queueBridge queue,
                                      void**      result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
     *result = avisQueue(queue)->mQueue;
     return MAMA_STATUS_OK;
@@ -283,6 +286,7 @@ mama_status
 avisBridgeMamaQueue_setHighWatermark (queueBridge queue,
                                       size_t      highWatermark)
 {
+    if (!highWatermark) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
@@ -291,6 +295,7 @@ mama_status
 avisBridgeMamaQueue_setLowWatermark (queueBridge queue,
                                      size_t lowWatermark)
 {
+    if (!lowWatermark) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
@@ -298,6 +303,7 @@ avisBridgeMamaQueue_setLowWatermark (queueBridge queue,
 mama_status
 avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count)
 {
+    if (!count) return MAMA_STATUS_NULL_ARG;
     CHECK_QUEUE(queue);
     *count = 0;
     wombatQueue_getSize (avisQueue(queue)->mQueue, (int*)count);
diff --git a/mama/c_cpp/src/c/bridge/avis/transportbridge.c b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
index 9b89d65..f456f7e 100644
--- a/mama/c_cpp/src/c/bridge/avis/transportbridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
@@ -251,6 +251,7 @@ avisBridgeMamaTransport_getNumLoadBalanceAttributes (
                                           const char* name,
                                           int*        numLoadBalanceAttributes)
 {
+    if (!numLoadBalanceAttributes) return MAMA_STATUS_NULL_ARG;
     *numLoadBalanceAttributes = 0;
     return MAMA_STATUS_OK;
 }
@@ -260,6 +261,7 @@ avisBridgeMamaTransport_getLoadBalanceSharedObjectName (
                                       const char*  name,
                                       const char** loadBalanceSharedObjectName)
 {
+    if (!loadBalanceSharedObjectName) return MAMA_STATUS_NULL_ARG;
     *loadBalanceSharedObjectName = NULL;
     return MAMA_STATUS_OK;
 }
@@ -269,6 +271,7 @@ avisBridgeMamaTransport_getLoadBalanceScheme (
                                     const char*    name,
                                     tportLbScheme* scheme)
 {
+    if (!scheme) return MAMA_STATUS_NULL_ARG;
     *scheme = TPORT_LB_SCHEME_STATIC;
     return MAMA_STATUS_OK;
 }
@@ -283,6 +286,8 @@ avisBridgeMamaTransport_create (transportBridge* result,
     avisTransportBridge* transport  = NULL;
     mamaBridgeImpl*      bridgeImpl = NULL;
     const char*          url        = NULL;
+
+    if (!result || !name) return MAMA_STATUS_NULL_ARG;
    
     transport = (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
     if (transport == NULL)
@@ -353,6 +358,8 @@ avisBridgeMamaTransport_destroy (transportBridge transport)
     avisTransportBridge* transportBridge = (avisTransportBridge*) transport;
     avisBridgeImpl*      avisBridge      = NULL;
     mamaBridgeImpl*      bridgeImpl      = NULL;
+
+    if (!transport) return MAMA_STATUS_NULL_ARG;
    
     bridgeImpl = mamaTransportImpl_getBridgeImpl(
         avisTransport(transport)->mTransport);
@@ -506,6 +513,7 @@ mama_status
 avisBridgeMamaTransport_isConnectionIntercepted (mamaConnection connection,
                                                   uint8_t* result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     *result = 0;
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
@@ -523,6 +531,7 @@ mama_status
 avisBridgeMamaTransport_getNativeTransport (transportBridge transport,
                                              void**          result)
 {
+    if (!result) return MAMA_STATUS_NULL_ARG;
     CHECK_TRANSPORT(transport);
     *result = avisTransport(transport);
     return MAMA_STATUS_OK;
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index 6da49f5..f02920e 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -2324,6 +2324,7 @@ avismsgFieldPayload_updateDateTime
 {
     CHECK_FIELD(field);
     CHECK_PAYLOAD(msg);
+    CHECK_NULL(value);
     return avisMsg_setDateTime(avisPayload(msg), avisField(field)->mName, 0, value);
 }
 
--
2.4.3


Dropping support for Qpid Proton <= 0.6

Frank Quinn <fquinn.ni@...>
 

Hi Folks,

I am planning on dropping support for Qpid Proton 0.5 and 0.6 in the next OpenMAMA release and instead sticking to versions 0.7+. Those guys move quickly and make small changes to interfaces and header names fairly regularly so keeping up with new releases without breaking older versions is fairly tricky, and something I'd like to at least limit.

Also, they had some compiler warnings in one of their headers when our strict parsing was enabled in 0.5 that wasn't fixed until 0.7 (see https://issues.apache.org/jira/browse/PROTON-420), so dropping support for all versions prior to 0.7 will mean we can turn -Werror back on without breaking older builds.

If anyone has any objections to this, please respond.

Cheers,
Frank


Re: QPID Proton bridge: multiple subscribers, one publisher

Frank Quinn <fquinn.ni@...>
 

Hi Nestor,

If you modify each new subscriber to use a different incoming URL and reply URL, you can actually do this today, though as you can imagine it becomes a bit of a brute to configure very quickly.

The better solution though is probably the currently experimental feature-qpid-broker branch that I'm hoping will make it into OpenMAMA 2.3.4. It allows for a much easier to manage pub / sub infrastructure where in terms of transport configuration, all you do is point it to the broker - there's an example mama.properties on that branch to give you an idea: http://git.openmama.org/?p=OpenMAMA.git;a=blob;f=mama/c_cpp/src/examples/mama.properties;h=86c7b1ce3933a10b0022b89837094bdb5904b974;hb=feature-qpid-broker

Cheers,
Frank


On Tue, 21 Jul 2015 16:23 Macrux <kmacrux@...> wrote:
Hi guys,

I'd like to know if there was or there will be some change to the Qpid Proton Bridge such that I can have a publish/subscribe connection in which only one advanced publisher can have multiple subscribers, because according to the wiki, "the current QPID Proton bridge is a point-to-point implementation" and I've had to run multiple publishers (always the same) each one with a different transport, and to subscribe each client with only one publisher.

Maybe, if someone knows some technique to achieve this behavior, it would be very helpful for me.

Thanks in advance.

Kind regards,


Nestor.
_______________________________________________
Openmama-dev mailing list
Openmama-dev@...
https://lists.openmama.org/mailman/listinfo/openmama-dev


QPID Proton bridge: multiple subscribers, one publisher

Macrux <kmacrux@...>
 

Hi guys,

I'd like to know if there was or there will be some change to the Qpid Proton Bridge such that I can have a publish/subscribe connection in which only one advanced publisher can have multiple subscribers, because according to the wiki, "the current QPID Proton bridge is a point-to-point implementation" and I've had to run multiple publishers (always the same) each one with a different transport, and to subscribe each client with only one publisher.

Maybe, if someone knows some technique to achieve this behavior, it would be very helpful for me.

Thanks in advance.

Kind regards,


Nestor.


[PATCH] AVIS: Made some basic changes to eliminate crashes on CI tests

Frank Quinn <fquinn.ni@...>
 

Pushed this change earlier on but sending around for transparency. Avis tests still fail but at least they finish running (and a lot of the failures are expected due to lack of support in the avis bridge / middleware for certain functionality).

This was mostly just NULL checking and changing an EXPECT to an
ASSERT in the unit tests inside msgvectortests.cpp.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/payload/avismsg/avispayload.c     | 79 ++++++++++++++--------
 .../src/gunittest/c/mamamsg/msgvectortests.cpp     |  2 +-
 2 files changed, 53 insertions(+), 28 deletions(-)

diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index 5a92b43..6da49f5 100644
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -102,7 +102,7 @@ avismsgPayload_createImpl (mamaPayloadBridge* result, char* identifier)
     mamaPayloadBridgeImpl*       impl    = NULL;
     mama_status             resultStatus = MAMA_STATUS_OK;
 
-    CHECK_NULL (result);
+    CHECK_NULL(result);
 
     impl = (mamaPayloadBridgeImpl*)calloc (1, sizeof (mamaPayloadBridgeImpl));
     if (!impl)
@@ -164,9 +164,9 @@ avismsgPayload_createForTemplate (msgPayload*         msg,
 mama_status
 avismsgPayload_createFromByteBuffer(msgPayload* msg, mamaPayloadBridge bridge, const void* buffer, mama_size_t bufferLength)
 {
-    CHECK_NULL (msg);
-    CHECK_NULL (bridge);
-    CHECK_NULL (buffer);
+    CHECK_NULL(msg);
+    CHECK_NULL(bridge);
+    CHECK_NULL(buffer);
    
     avisPayloadImpl* newPayload = (avisPayloadImpl*)calloc (1, sizeof(avisPayloadImpl));
 
@@ -217,7 +217,7 @@ avismsgPayload_setParent (msgPayload          msg,
                        const mamaMsg       parent)
 {
     avisPayloadImpl* impl = (avisPayloadImpl*) msg;
-    CHECK_NULL (impl);
+    CHECK_NULL(impl);
 
     impl->mParent = parent;
    
@@ -250,7 +250,7 @@ avismsgPayload_unSerialize (const msgPayload    msg,
     uint8_t *    buffPos = (void*)buffer;
 
     CHECK_PAYLOAD (msg);
-    CHECK_NULL (buffer);
+    CHECK_NULL(buffer);
 
     if (!impl->mAvisMsg)
         impl->mAvisMsg = attributes_create();
@@ -326,8 +326,8 @@ avismsgPayload_serialize     (const msgPayload    msg,
     avisFieldPayload*    currField    = NULL;
 
     CHECK_PAYLOAD (msg);
-    CHECK_NULL (buffer);
-    CHECK_NULL (bufferLength);
+    CHECK_NULL(buffer);
+    CHECK_NULL(bufferLength);
 
     if (!impl->mIterator)
     {
@@ -445,8 +445,8 @@ avismsgPayload_getByteBuffer     (const msgPayload    msg,
 {
     avisPayloadImpl* impl = (avisPayloadImpl*)msg;
     CHECK_PAYLOAD(msg);
-    CHECK_NULL (buffer);
-    CHECK_NULL (bufferLength);
+    CHECK_NULL(buffer);
+    CHECK_NULL(bufferLength);
 
     *buffer = impl->mAvisMsg;
 
@@ -659,7 +659,7 @@ avismsgPayload_getNativeMsg     (const msgPayload    msg,
                                void**              nativeMsg)
 {
     CHECK_PAYLOAD(msg);
-    CHECK_NULL (nativeMsg);
+    CHECK_NULL(nativeMsg);
     *nativeMsg = msg;
     return MAMA_STATUS_OK;
 }
@@ -832,7 +832,7 @@ avismsgPayload_addString         (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
-    CHECK_NULL (value);
+    CHECK_NULL(value);
     return avisMsg_setString(avisPayload(msg), name, fid, value);
 }
 
@@ -845,7 +845,7 @@ avismsgPayload_addOpaque         (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
-    CHECK_NULL (value);
+    CHECK_NULL(value);
     return avisMsg_setOpaque(avisPayload(msg), name, fid, value, size);
 }
 
@@ -1221,7 +1221,7 @@ avismsgPayload_updateString      (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
-    CHECK_NULL (value);
+    CHECK_NULL(value);
     return avisMsg_setString(avisPayload(msg), name, fid, value);
 }
 
@@ -1234,7 +1234,7 @@ avismsgPayload_updateOpaque      (msgPayload          msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
-    CHECK_NULL (value);
+    CHECK_NULL(value);
     return avisMsg_setOpaque(avisPayload(msg), name, fid, value, size);
 }
 
@@ -1266,8 +1266,6 @@ avismsgPayload_updateSubMsg      (msgPayload          msg,
                                 mama_fid_t          fid,
                                 const msgPayload    value)
 {
-    CHECK_PAYLOAD(msg);
-    CHECK_NAME(name,fid);
     return MAMA_STATUS_NOT_IMPLEMENTED;
 }
 
@@ -1480,6 +1478,7 @@ avismsgPayload_getBool           (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(mamaResult);
     return avisMsg_getBool(avisPayload(msg), name, fid, mamaResult);
 }
 
@@ -1491,6 +1490,7 @@ avismsgPayload_getChar           (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getChar(avisPayload(msg), name, fid, result);
 }
 
@@ -1502,6 +1502,7 @@ avismsgPayload_getI8             (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getI8(avisPayload(msg), name, fid, result);
 }
 
@@ -1513,6 +1514,7 @@ avismsgPayload_getU8             (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getU8(avisPayload(msg), name, fid, result);
 }
 
@@ -1524,6 +1526,7 @@ avismsgPayload_getI16            (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getI16(avisPayload(msg), name, fid, result);
 }
 
@@ -1535,6 +1538,7 @@ avismsgPayload_getU16            (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getU16(avisPayload(msg), name, fid, result);
 }
 
@@ -1546,6 +1550,7 @@ avismsgPayload_getI32            (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getI32(avisPayload(msg), name, fid, result);
 }
 
@@ -1557,6 +1562,7 @@ avismsgPayload_getU32            (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getU32(avisPayload(msg), name, fid, result);
 }
 
@@ -1568,6 +1574,7 @@ avismsgPayload_getI64            (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(mamaResult);
     return avisMsg_getI64(avisPayload(msg), name, fid, mamaResult);
 }
 
@@ -1579,6 +1586,7 @@ avismsgPayload_getU64            (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(mamaResult);
     return avisMsg_getU64(avisPayload(msg), name, fid, mamaResult);
 }
 
@@ -1590,6 +1598,7 @@ avismsgPayload_getF32            (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getF32(avisPayload(msg), name, fid, result);
 }
 
@@ -1601,6 +1610,7 @@ avismsgPayload_getF64            (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
+    CHECK_NULL(result);
     return avisMsg_getF64(avisPayload(msg), name, fid, result);
 }
 
@@ -1612,7 +1622,7 @@ avismsgPayload_getString         (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
-    CHECK_NULL (result);
+    CHECK_NULL(result);
     return avisMsg_getString(avisPayload(msg), name, fid, result);
 }
 
@@ -1625,7 +1635,7 @@ avismsgPayload_getOpaque         (const msgPayload    msg,
 {
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name,fid);
-    CHECK_NULL (result);
+    CHECK_NULL(result);
     return avisMsg_getOpaque(avisPayload(msg), name, fid, result, size);
 }
 
@@ -1641,7 +1651,7 @@ avismsgPayload_getField          (const msgPayload    msg,
     Value* pValue = NULL;
     CHECK_PAYLOAD(msg);
     CHECK_NAME(name, fid);
-    CHECK_NULL (result);
+    CHECK_NULL(result);
 
     if ( (!impl->mAvisField) && (avismsgFieldPayload_create((msgFieldPayload*) &impl->mAvisField) != MAMA_STATUS_OK) ) {
         return MAMA_STATUS_PLATFORM;
@@ -1910,7 +1920,7 @@ avismsgPayloadIter_create        (msgPayloadIter* iter,
     mama_status status = MAMA_STATUS_OK;
     avisIterator* impl = NULL;
     CHECK_PAYLOAD(msg);
-    CHECK_NULL (iter);
+    CHECK_NULL(iter);
 
     impl = calloc (1, sizeof (avisIterator));
     if (!impl) return (MAMA_STATUS_NOMEM);
@@ -2012,7 +2022,7 @@ avismsgPayloadIter_associate      (msgPayloadIter iter,
     avisIterator* impl = (avisIterator*) iter;
     CHECK_PAYLOAD(msg);
 
-    CHECK_NULL (impl);
+    CHECK_NULL(impl);
 
     attributes_iter_init(impl->mMsgIterator, avisPayload(msg));
     impl->mAvisMsg = avisPayload(msg);
@@ -2023,7 +2033,7 @@ mama_status
 avismsgPayloadIter_destroy       (msgPayloadIter iter)
 {
     avisIterator* impl = (avisIterator*) iter;
-    CHECK_NULL (impl);
+    CHECK_NULL(impl);
 
     if (impl->mMsgIterator)
        attributes_iter_destroy(impl->mMsgIterator);
@@ -2068,6 +2078,7 @@ avismsgFieldPayload_getName      (const msgFieldPayload   field,
                                 const char**            result)
 {
     uint16_t fid =0;
+    CHECK_NULL(result);
     CHECK_FIELD(field);
 
     fid = atoi(avisField(field)->mName);
@@ -2113,6 +2124,7 @@ avismsgFieldPayload_getFid       (const msgFieldPayload   field,
 {
     uint16_t fid =0;
     CHECK_FIELD(field);
+    CHECK_NULL(result);
 
     fid = atoi(avisField(field)->mName);
     if (fid==0)
@@ -2170,6 +2182,7 @@ mama_status
 avismsgFieldPayload_getType      (msgFieldPayload         field,
                                 mamaFieldType*          result)
 {
+    CHECK_NULL(result);
     CHECK_FIELD(field);
     if (avisField(field)->mValue == 0)
         return MAMA_STATUS_INVALID_ARG;
@@ -2331,7 +2344,7 @@ avismsgFieldPayload_updateString  (msgFieldPayload         field,
 {
     CHECK_FIELD(field);
     CHECK_PAYLOAD(msg);
-    CHECK_NULL (value);
+    CHECK_NULL(value);
     return avisMsg_setString(avisPayload(msg), avisField(field)->mName, 0, value);
 }
 
@@ -2343,6 +2356,7 @@ avismsgFieldPayload_getBool      (const msgFieldPayload   field,
                                 mama_bool_t*            result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getBool(avisField(field)->mValue, result);
 }
 
@@ -2351,6 +2365,7 @@ avismsgFieldPayload_getChar      (const msgFieldPayload   field,
                                 char*                   result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getChar(avisField(field)->mValue, result);
 }
 
@@ -2359,6 +2374,7 @@ avismsgFieldPayload_getI8        (const msgFieldPayload   field,
                                 mama_i8_t*              result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getI8(avisField(field)->mValue, result);
 }
 
@@ -2367,6 +2383,7 @@ avismsgFieldPayload_getU8        (const msgFieldPayload   field,
                                 mama_u8_t*              result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getU8(avisField(field)->mValue, result);
 }
 
@@ -2375,6 +2392,7 @@ avismsgFieldPayload_getI16       (const msgFieldPayload   field,
                                 mama_i16_t*             result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getI16(avisField(field)->mValue, result);
 }
 
@@ -2383,6 +2401,7 @@ avismsgFieldPayload_getU16       (const msgFieldPayload   field,
                                 mama_u16_t*             result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getU16(avisField(field)->mValue, result);
 }
 
@@ -2391,6 +2410,7 @@ avismsgFieldPayload_getI32       (const msgFieldPayload   field,
                                 mama_i32_t*             result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getI32(avisField(field)->mValue, result);
 }
 
@@ -2399,6 +2419,7 @@ avismsgFieldPayload_getU32       (const msgFieldPayload   field,
                                 mama_u32_t*             result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getU32(avisField(field)->mValue, result);
 }
 
@@ -2407,6 +2428,7 @@ avismsgFieldPayload_getI64       (const msgFieldPayload   field,
                                 mama_i64_t*             result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getI64(avisField(field)->mValue, result);
 }
 
@@ -2415,6 +2437,7 @@ avismsgFieldPayload_getU64       (const msgFieldPayload   field,
                                 mama_u64_t*             result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getU64(avisField(field)->mValue, result);
 }
 
@@ -2423,6 +2446,7 @@ avismsgFieldPayload_getF32       (const msgFieldPayload   field,
                                 mama_f32_t*             result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getF32(avisField(field)->mValue, result);
 }
 
@@ -2431,6 +2455,7 @@ avismsgFieldPayload_getF64       (const msgFieldPayload   field,
                                 mama_f64_t*             result)
 {
     CHECK_FIELD(field);
+    CHECK_NULL(result);
     return avisValue_getF64(avisField(field)->mValue, result);
 }
 
@@ -2439,7 +2464,7 @@ avismsgFieldPayload_getString    (const msgFieldPayload   field,
                                 const char**            result)
 {
     CHECK_FIELD(field);
-    CHECK_NULL (result);
+    CHECK_NULL(result);
     return avisValue_getString(avisField(field)->mValue, result);
 }
 
@@ -2449,8 +2474,8 @@ avismsgFieldPayload_getOpaque    (const msgFieldPayload   field,
                                 mama_size_t*            size)
 {
     CHECK_FIELD(field);
-    CHECK_NULL (result);
-    CHECK_NULL (size);
+    CHECK_NULL(result);
+    CHECK_NULL(size);
     return avisValue_getOpaque(avisField(field)->mValue, result, size);
 }
 
diff --git a/mama/c_cpp/src/gunittest/c/mamamsg/msgvectortests.cpp b/mama/c_cpp/src/gunittest/c/mamamsg/msgvectortests.cpp
index b72fa68..c291e44 100644
--- a/mama/c_cpp/src/gunittest/c/mamamsg/msgvectortests.cpp
+++ b/mama/c_cpp/src/gunittest/c/mamamsg/msgvectortests.cpp
@@ -116,7 +116,7 @@ TEST_F(MsgVectorBoolTestsC, AddVectorBool)
                                      mIn,
                                      VECTOR_SIZE);
 
-    EXPECT_EQ(mStatus, MAMA_STATUS_OK);
+    ASSERT_EQ(mStatus, MAMA_STATUS_OK);
 
     mStatus = mamaMsg_getVectorBool (mMsg,
                                      NULL,
--
2.4.3


OpenMAMA 2.3.3 Released

Frank Quinn <fquinn.ni@...>
 

Hi Folks,

It is with great pleasure that we can announce the release of OpenMAMA 2.3.3. The latest release contains several minor bugfixes since 2.3.2 and the following new features:

* Support added for Qpid Proton > 0.8
* Deferred entitlements now supported to allow middleware bridges to manage entitlements
* Basic MAMA Plugin implementation added
* Fedora 21, 22 and CentOS 7 RPMs now supported

Note that due to deadlocking issues encountered with Qpid Proton, you are recommended to avoid 0.9.x releases of Qpid Proton and instead either stick to 0.8, or else try out their 0.10.x beta versions ahead of their official 0.10 release which contains a fix for the deadlock issue which we hit during testing. See https://issues.apache.org/jira/browse/PROTON-907 for more details.

We've also updated our continuous integration over at http://ci.openmama.org to help us support the multiple versions of qpid proton which people seem to be using at the moment (though we will need to start dropping support for some of those older versions soon).

Full details of the release including binary and package downloads can be found on the releases page of the website over at http://www.openmama.org/downloads/releases.

You can also as usual view everything included in this release over on the source control browser: http://git.openmama.org/?p=OpenMAMA.git;a=shortlog;h=refs/tags/OpenMAMA-2.3.3-release

Any problems with the release, please let us know through the usual channels - mailing list / IRC etc.

Cheers,
Frank


[PATCH 2/2] RPM: Added RPM builds for CentOS 7, FC21, FC22

Frank Quinn <fquinn.ni@...>
 

Also deprecated support for EOL distros FC19 and FC20.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 release_scripts/openmama-rpm.sh | 15 ++++++++-------
 release_scripts/openmama.spec   |  4 ++--
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/release_scripts/openmama-rpm.sh b/release_scripts/openmama-rpm.sh
index 191678c..96054f8 100755
--- a/release_scripts/openmama-rpm.sh
+++ b/release_scripts/openmama-rpm.sh
@@ -260,8 +260,9 @@ if [ $MOCK_BUILD -eq 1 ] && [ $RETURN_CODE -eq 0 ]; then
     try cd ${BUILD_DIR}/SRPMS
     try /usr/bin/mock -r epel-6-i386 --define 'BUILD_VERSION '${VERSION} --define 'BUILD_NUMBER '${BUILD_NUMBER} openmama-${VERSION}-${BUILD_NUMBER}.*.src.rpm > ${BUILD_DIR}/mock-el6-i386.log 2>&1
     try /usr/bin/mock -r epel-6-x86_64 --define 'BUILD_VERSION '${VERSION} --define 'BUILD_NUMBER '${BUILD_NUMBER} openmama-${VERSION}-${BUILD_NUMBER}.*.src.rpm > ${BUILD_DIR}/mock-el6-x64.log 2>&1
-    try /usr/bin/mock -r fedora-19-x86_64 --define 'BUILD_VERSION '${VERSION} --define 'BUILD_NUMBER '${BUILD_NUMBER} openmama-${VERSION}-${BUILD_NUMBER}.*.src.rpm > ${BUILD_DIR}/mock-f19-x64.log 2>&1
-    try /usr/bin/mock -r fedora-20-x86_64 --define 'BUILD_VERSION '${VERSION} --define 'BUILD_NUMBER '${BUILD_NUMBER} openmama-${VERSION}-${BUILD_NUMBER}.*.src.rpm > ${BUILD_DIR}/mock-f20-x64.log 2>&1
+    try /usr/bin/mock -r epel-7-x86_64 --define 'BUILD_VERSION '${VERSION} --define 'BUILD_NUMBER '${BUILD_NUMBER} openmama-${VERSION}-${BUILD_NUMBER}.*.src.rpm > ${BUILD_DIR}/mock-el7-x64.log 2>&1
+    try /usr/bin/mock -r fedora-21-x86_64 --define 'BUILD_VERSION '${VERSION} --define 'BUILD_NUMBER '${BUILD_NUMBER} openmama-${VERSION}-${BUILD_NUMBER}.*.src.rpm > ${BUILD_DIR}/mock-f21-x64.log 2>&1
+    try /usr/bin/mock -r fedora-22-x86_64 --define 'BUILD_VERSION '${VERSION} --define 'BUILD_NUMBER '${BUILD_NUMBER} openmama-${VERSION}-${BUILD_NUMBER}.*.src.rpm > ${BUILD_DIR}/mock-f22-x64.log 2>&1
     next
     RETURN_CODE=$?
 fi
@@ -281,13 +282,13 @@ if [ $PACKAGE_RELEASE -eq 1 ] && [ $RETURN_CODE -eq 0 ]; then
     # Copy in the Source RPM
     try cp ${BUILD_DIR}/SRPMS/openmama-${VERSION}-${BUILD_NUMBER}.*.src.rpm ${RELEASE_DIR}
 
-    # These paths are hard coded, since they should always remain there. We may
-    # want to update them periodically, especially with EL7, and newer Fedora
-    # versions.
+    # These paths are hard coded, since they should always remain there. We need
+    # to update these periodically.
     try cp /var/lib/mock/epel-6-i386/result/openmama-${VERSION}-${BUILD_NUMBER}.el6.i686.rpm ${RELEASE_DIR}
     try cp /var/lib/mock/epel-6-x86_64/result/openmama-${VERSION}-${BUILD_NUMBER}.el6.x86_64.rpm ${RELEASE_DIR}
-    try cp /var/lib/mock/fedora-19-x86_64/result/openmama-${VERSION}-${BUILD_NUMBER}.fc19.x86_64.rpm ${RELEASE_DIR}
-    try cp /var/lib/mock/fedora-20-x86_64/result/openmama-${VERSION}-${BUILD_NUMBER}.fc20.x86_64.rpm ${RELEASE_DIR}
+    try cp /var/lib/mock/epel-7-x86_64/result/openmama-${VERSION}-${BUILD_NUMBER}.el7.*.x86_64.rpm ${RELEASE_DIR}
+    try cp /var/lib/mock/fedora-21-x86_64/result/openmama-${VERSION}-${BUILD_NUMBER}.fc21.x86_64.rpm ${RELEASE_DIR}
+    try cp /var/lib/mock/fedora-22-x86_64/result/openmama-${VERSION}-${BUILD_NUMBER}.fc22.x86_64.rpm ${RELEASE_DIR}
 
     # Build and tar the binary release
     if [ -d ${BUILD_DIR}/binary ]; then
diff --git a/release_scripts/openmama.spec b/release_scripts/openmama.spec
index d5ec35d..40ae53b 100644
--- a/release_scripts/openmama.spec
+++ b/release_scripts/openmama.spec
@@ -12,8 +12,8 @@ BuildRequires: libtool autoconf automake ant libuuid-devel flex doxygen qpid-pro
 Requires: libuuid qpid-proton-c libevent ncurses
 
 %if 0%{?fedora}
-BuildRequires: java-1.7.0-openjdk-devel
-Requires: java-1.7.0-openjdk
+BuildRequires: java-1.8.0-openjdk-devel
+Requires: java-1.8.0-openjdk
 %define java_home /usr/lib/jvm/java/
 %endif
 
--
2.4.3


[PATCH 1/2] SCONS: Fixed issue with builds not working on 32 bit Linux

Frank Quinn <fquinn.ni@...>
 

Added i686 and a few other similar ancestors for good measure.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 site_scons/community/command_line.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/site_scons/community/command_line.py b/site_scons/community/command_line.py
index 4bbacc5..1f7f278 100644
--- a/site_scons/community/command_line.py
+++ b/site_scons/community/command_line.py
@@ -65,7 +65,7 @@ def get_command_line_opts( host, products, VERSIONS ):
                          #mamda all is a windows only build
                          allowed_values=( [ x for x in products if x != "mamdaall" ] )),
             EnumVariable('target_arch', 'Specifies if the build should target 32 or 64 bit architectures.',
-                          host['arch'], allowed_values=['x86', 'x86_64']),
+                          host['arch'], allowed_values=['i386', 'i586', 'i686', 'x86', 'x86_64']),
             EnumVariable( 'compiler', 'Compiler to use for building OpenMAMA',
                          'default', allowed_values=('default', 'gcc', 'clang', 'clang-analyzer')),
         )
--
2.4.3


[PATCH 2/2] QPID: Fixed build error introduced in proton 0.9

fquinn.ni@...
 

From: Frank Quinn <fquinn.ni@...>

---
mama/c_cpp/src/c/bridge/qpid/qpiddefs.h | 5 ++++-
1 files changed, 4 insertions(+), 1 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h
index cd52e96..79ddbc4 100644
--- a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h
+++ b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h
@@ -32,7 +32,6 @@
#include <list.h>

/* Qpid include files */
-#include <proton/driver.h>
#include <proton/message.h>
#include <proton/messenger.h>

@@ -87,6 +86,10 @@ typedef enum qpidMsgType_
#if (PN_VERSION_MAJOR == 0 && PN_VERSION_MINOR <= 7)
#include <proton/util.h>
#endif
+#if (PN_VERSION_MAJOR == 0 && PN_VERSION_MINOR <= 8)
+/* The proton header driver.h was removed in version 0.9 */
+#include <proton/driver.h>
+#endif

/* Place other version specific macros here */

--
1.7.1


[PATCH 1/2] QPID: Fixed warnings introduced since proton 0.8

fquinn.ni@...
 

From: Frank Quinn <fquinn.ni@...>

---
mama/c_cpp/src/c/bridge/qpid/transport.c | 2 +-
mama/c_cpp/src/c/payload/qpidmsg/qpidcommon.c | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/qpid/transport.c b/mama/c_cpp/src/c/bridge/qpid/transport.c
index 8ca9550..aa84fd7 100644
--- a/mama/c_cpp/src/c/bridge/qpid/transport.c
+++ b/mama/c_cpp/src/c/bridge/qpid/transport.c
@@ -1433,7 +1433,7 @@ void* qpidBridgeMamaTransportImpl_dispatchThread (void* closure)
case QPID_MSG_SUB_REQUEST:
{
pn_data_t* data = pn_message_body (msgNode->mMsg);
- char* topic = NULL;
+ const char* topic = NULL;
const char* replyTo = NULL;

/* Move to the content which will contain the topic */
diff --git a/mama/c_cpp/src/c/payload/qpidmsg/qpidcommon.c b/mama/c_cpp/src/c/payload/qpidmsg/qpidcommon.c
index 8e91c2d..78d5123 100644
--- a/mama/c_cpp/src/c/payload/qpidmsg/qpidcommon.c
+++ b/mama/c_cpp/src/c/payload/qpidmsg/qpidcommon.c
@@ -210,7 +210,7 @@ qpidmsgPayloadInternal_elementToString (pn_data_t* payload,
case PN_BINARY:
{
mama_size_t i = 0;
- char* bytePos = NULL;
+ const char* bytePos = NULL;
pn_bytes_t bytes;

bytes = atom.u.as_bytes;
--
1.7.1


Re: Java MamaPublisher destroy

Adrienne Ambrose <a.ambrose@...>
 

Hi Reed,

 

I have raised Bugzilla Bug-203 for tracking of this issue, if you wish to submit a patch for this.

 

Thanks,

Adrienne

 

From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Stuart Beattie
Sent: 10 July 2015 16:33
To: Alpert, Reed; openmama-dev@...
Subject: Re: [Openmama-dev] Java MamaPublisher destroy

 

We are now tracking this internally in order to investigate further and schedule it for a future release.  If you like, you can raise a Bugzilla ticket to track it there too.

 

Thanks

Stuart

 

From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Stuart Beattie
Sent: 10 July 2015 16:24
To: Alpert, Reed; openmama-dev@...
Subject: Re: [Openmama-dev] Java MamaPublisher destroy

 

Hi Reed,

 

No, the missing destroy call looks like a bug.

 

Thanks

Stuart

 

From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Alpert, Reed
Sent: 10 July 2015 13:58
To:
openmama-dev@...
Subject: [Openmama-dev] Java MamaPublisher destroy

 

Hi,

 

For Java MamaPublisher I don’t see a destroy or finalize method, are the C resources being freed in another way I don’t see?

 

Thanks,

 

Reed.

 


Reed Alpert | Corporate & Investment Bank | Market Data Services | J.P. Morgan | 4 Metrotech Center, 23rd Floor, Brooklyn, NY 11245 | T: 718.242.5198  | M: 917.414.4613 | reed.alpert@...

Alternate Contact:  CIB PIM Trading Technology Solutions NA | CIB_PIM_Trading_Technology_Solutions_NA@...

 

This communication is for informational purposes only. It is not intended as an offer or solicitation for the purchase or sale of any financial instrument or as an official confirmation of any transaction. All market prices, data and other information are not warranted as to completeness or accuracy and are subject to change without notice. Any comments or statements made herein do not necessarily reflect those of JPMorgan Chase & Co., its subsidiaries and affiliates (collectively, "JPMC"). This transmission may contain information that is proprietary, privileged, confidential and/or exempt from disclosure under applicable law. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution, or use of the information contained herein (including any reliance thereon) is STRICTLY PROHIBITED. If you received this transmission in error, please immediately contact the sender and destroy the material in its entirety, whether in electronic or hard copy format. Although this transmission and any attachments are believed to be free of any virus or other defect that might affect any computer system into which it is received and opened, it is the responsibility of the recipient to ensure that it is virus free and no responsibility is accepted by JPMC for any loss or damage arising in any way from its use. Please note that any electronic communication that is conducted within or through JPMC's systems is subject to interception, monitoring, review, retention and external production in accordance with JPMC's policy and local laws, rules and regulations; may be stored or otherwise processed in countries other than the country in which you are located; and will be treated in accordance with JPMC policies and applicable laws and regulations. Please refer to http://www.jpmorgan.com/pages/disclosures for disclosures relating to European legal entities.


Re: Java MamaPublisher destroy

Stuart Beattie <s.beattie@...>
 

We are now tracking this internally in order to investigate further and schedule it for a future release.  If you like, you can raise a Bugzilla ticket to track it there too.

 

Thanks

Stuart

 

From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Stuart Beattie
Sent: 10 July 2015 16:24
To: Alpert, Reed; openmama-dev@...
Subject: Re: [Openmama-dev] Java MamaPublisher destroy

 

Hi Reed,

 

No, the missing destroy call looks like a bug.

 

Thanks

Stuart

 

From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Alpert, Reed
Sent: 10 July 2015 13:58
To:
openmama-dev@...
Subject: [Openmama-dev] Java MamaPublisher destroy

 

Hi,

 

For Java MamaPublisher I don’t see a destroy or finalize method, are the C resources being freed in another way I don’t see?

 

Thanks,

 

Reed.

 


Reed Alpert | Corporate & Investment Bank | Market Data Services | J.P. Morgan | 4 Metrotech Center, 23rd Floor, Brooklyn, NY 11245 | T: 718.242.5198  | M: 917.414.4613 | reed.alpert@...

Alternate Contact:  CIB PIM Trading Technology Solutions NA | CIB_PIM_Trading_Technology_Solutions_NA@...

 

This communication is for informational purposes only. It is not intended as an offer or solicitation for the purchase or sale of any financial instrument or as an official confirmation of any transaction. All market prices, data and other information are not warranted as to completeness or accuracy and are subject to change without notice. Any comments or statements made herein do not necessarily reflect those of JPMorgan Chase & Co., its subsidiaries and affiliates (collectively, "JPMC"). This transmission may contain information that is proprietary, privileged, confidential and/or exempt from disclosure under applicable law. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution, or use of the information contained herein (including any reliance thereon) is STRICTLY PROHIBITED. If you received this transmission in error, please immediately contact the sender and destroy the material in its entirety, whether in electronic or hard copy format. Although this transmission and any attachments are believed to be free of any virus or other defect that might affect any computer system into which it is received and opened, it is the responsibility of the recipient to ensure that it is virus free and no responsibility is accepted by JPMC for any loss or damage arising in any way from its use. Please note that any electronic communication that is conducted within or through JPMC's systems is subject to interception, monitoring, review, retention and external production in accordance with JPMC's policy and local laws, rules and regulations; may be stored or otherwise processed in countries other than the country in which you are located; and will be treated in accordance with JPMC policies and applicable laws and regulations. Please refer to http://www.jpmorgan.com/pages/disclosures for disclosures relating to European legal entities.


Re: Help with bookpublisher

Stuart Beattie <s.beattie@...>
 

Hi Nestor,

 

The key command line arguments for MamdaBookPublisherJava are:

 

-SP – the source name on which book data is published.

-s Symbol to publish (can be passed multiple times)

-p Participant ID (optional)

 

As well as standard options like –tport, -m etc.

 

Any market data subscriber example application, eg MamaListen, can be used to listen to the dummy orderbook data that MamdaBookPublisherJava publishes.

 

Example command line for MamdaBookPublisherJava:

 

java -cp com.wombat.mamda.examples.MamdaBookPublisher -SP STUARTB2 -s STUBOOK -m wmw -tport pub

 

Corresponding MamaListen command:

 

java com.wombat.mama.examples.MamaListen -S STUARTB2 -s STUBOOK -m wmw -tport sub

 

MamaListen output:

 

STUBOOK Type: BOOK_INITIAL Status: OK

   MdMsgType            | 1    | U8         | 16

    wNumLevels           | 651  | I16        | 0

    MamaSenderId         | 20   | U64        | 5841862551486352289

    MdSeqNum             | 10   | U32        | 1

    MdMsgStatus          | 2    | U8         | 0

TRANSPORT CONNECTED!

STUBOOK Type: BOOK_UPDATE Status: OK

   MdMsgType            | 1    | U8         | 17

    wPriceLevels         | 699  | VECTOR_MSG |

   {

      wPlPrice             | 653  | F64        | 100.0

       wPlSize              | 655  | U32        | 1000

       wPlSizeChange        | 656  | U32        | 1000

       wPlTime              | 658  | TIME       | 2015-07-10 15:49:27.208294

       wPlNumAttach         | 659  | U32        | 0

    }

   {

      wPlPrice             | 653  | F64        | 101.0

       wPlSize              | 655  | U32        | 2000

       wPlSizeChange        | 656  | U32        | 2000

       wPlTime              | 658  | TIME       | 2015-07-10 15:49:27.212162

       wPlNumAttach         | 659  | U32        | 0

    }

   wNumLevels           | 651  | U32        | 2

    MdMsgStatus          | 2    | U8         | 0

    MdSeqNum             | 10   | U32        | 2

    MamaSenderId         | 20   | U64        | 5841862551486352289

 

Note that for MamaBookPublisherJava to work, it will also need to subscribe to another source which is publishing a MAMA dictionary.

 

Thanks

Stuart

 

From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Macrux
Sent: 02 July 2015 15:59
To: openmama-dev@...
Subject: [Openmama-dev] Help with bookpublisher

 

Hi there,

Could someone, please, give me a hand with the MamdaBookPublisher.java example, I haven't been able to run it, I mean, I don't know which are the arguments I've to pass. Thank you all.

kind regards,

Nestor

761 - 780 of 2314