[PATCH] AVIS: Refactoring avis middleware and payload bridges.


Lee Skillen <lskillen@...>
 

Most of this work was conducted before QPID was fully implemented,
but this might be useful to make Avis a viable alternative (or as a
second reference bridge for testing purposes).

Primary changes :-

- Significant rewrite of how the transport bridge operates and also
allow more than one transport to run at once (e.g. for separate pub
dict_pub transports, similar to qpid).

- Add support for inbox-to-inbox functionality. Details on this to
follow in a separate patch for Mama. This can be applied without
support being "active" there yet. Essentially it uses a new meta-
attribute on an Avis payload to achieve it.

- Add support for enqueue callback functionality. Avis also now
utilises Mama queue for message processing rather than Wombat queue
so that things like enqueue callbacks are properly invoked.

- Add support for payload wrapping (i.e. form of agnosticism) in the
capability to send any payload through Avis. This also enables the
use of a payload that supports field names to allow Mama dictionary
support.

- Fix for onDestroy not being invoked for subscription cleanup.

- Various fixes for memory leaks and some race conditions between
receiving and sending messages.

- Build fix to allow avis to build against the payload bridge.

- Add support to copy attributes between Avis messages properly to
avoid memory leaks and half-copies.

- Cleanup memory for attributes in Avis messages when destroying.

Notes :-

- Latest development snapshots of Avis required for the opaque
functionality to work correctly. Might work on latest release but it
was buggy in the previous suggested version of Avis.

Signed-off-by: Lee Skillen <lskillen@vulcanft.com>
---
mama/c_cpp/src/c/bridge/avis/SConscript | 2 +-
mama/c_cpp/src/c/bridge/avis/avisdefs.h | 15 +-
mama/c_cpp/src/c/bridge/avis/bridge.c | 29 +-
mama/c_cpp/src/c/bridge/avis/msg.c | 55 +++-
mama/c_cpp/src/c/bridge/avis/publisher.c | 432 ++++++++++++++++++++++---
mama/c_cpp/src/c/bridge/avis/queue.c | 34 +-
mama/c_cpp/src/c/bridge/avis/sub.c | 263 +++++++++++----
mama/c_cpp/src/c/bridge/avis/subinitial.c | 19 +-
mama/c_cpp/src/c/bridge/avis/timer.c | 37 ++-
mama/c_cpp/src/c/bridge/avis/transportbridge.c | 173 ++++++----
mama/c_cpp/src/c/bridge/avis/transportbridge.h | 1 +
mama/c_cpp/src/c/payload/avismsg/SConscript | 1 -
mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c | 5 +-
mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h | 8 +
mama/c_cpp/src/c/payload/avismsg/avispayload.c | 32 +-
15 files changed, 876 insertions(+), 230 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/SConscript b/mama/c_cpp/src/c/bridge/avis/SConscript
index fe6157f..f313df4 100644
--- a/mama/c_cpp/src/c/bridge/avis/SConscript
+++ b/mama/c_cpp/src/c/bridge/avis/SConscript
@@ -16,7 +16,7 @@ incPath.append('#mama/c_cpp/src/c')

env['CCFLAGS'] = [x for x in env['CCFLAGS'] if x != '-pedantic-errors']

-env.Append(LIBS=['avis', 'mama', 'm', 'wombatcommon', 'uuid'],
+env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'wombatcommon', 'uuid'],
LIBPATH=libPath, CPPPATH=incPath)

conf = Configure(env, config_h='./config.h', log_file='./config.log')
diff --git a/mama/c_cpp/src/c/bridge/avis/avisdefs.h b/mama/c_cpp/src/c/bridge/avis/avisdefs.h
index 50ef5b5..d0b42ac 100644
--- a/mama/c_cpp/src/c/bridge/avis/avisdefs.h
+++ b/mama/c_cpp/src/c/bridge/avis/avisdefs.h
@@ -24,6 +24,7 @@

#include <avis/elvin.h>
#include <wombat/wSemaphore.h>
+#include "wombat/wInterlocked.h"

#if defined(__cplusplus)
extern "C" {
@@ -36,21 +37,25 @@ extern "C" {
if (!elvin_is_open(avis)) return MAMA_STATUS_INVALID_ARG; \
} while(0)

-#define SUBJECT_FIELD_NAME "__subj"
-#define INBOX_FIELD_NAME "__inbx"
-#define MAX_SUBJECT_LENGTH 256
+#define SUBJECT_FIELD_NAME "__subj"
+#define INBOX_FIELD_NAME "__inbx"
+#define ENCLOSED_MSG_FIELD_NAME "__emsg"
+
+#define MAX_SUBJECT_LENGTH 256
+
+typedef struct avisBridgeImpl avisBridgeImpl;

typedef struct avisTransportBridge
{
Elvin* mAvis;
mamaTransport mTransport;
wsem_t mAvisDispatchSem;
+ wthread_t mThreadId;
+ wInterlockedInt mDispatching;
} avisTransportBridge;

-
typedef struct avisBridgeImpl
{
- avisTransportBridge* mTransportBridge;
} avisBridgeImpl;
#define avisBridge(bridgeImpl) ((avisBridgeImpl*) bridgeImpl)

diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index 20c8e43..97cbb08 100755
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -130,6 +130,7 @@ avisBridge_close (mamaBridge bridgeImpl)
{
mama_status status = MAMA_STATUS_OK;
mamaBridgeImpl* impl = NULL;
+ avisBridgeImpl* avisBridge = NULL;

mama_log (MAMA_LOG_LEVEL_FINEST, "avisBridge_close(): Entering.");

@@ -143,6 +144,15 @@ avisBridge_close (mamaBridge bridgeImpl)
status = MAMA_STATUS_PLATFORM;
}

+ wlock_destroy (impl->mLock);
+
+ mamaBridgeImpl_getClosure(impl, &avisBridge);
+
+ if (avisBridge)
+ {
+ free (avisBridge);
+ }
+
mamaQueue_destroyWait(impl->mDefaultEventQueue);

free (impl);
@@ -160,16 +170,6 @@ avisBridge_start(mamaQueue defaultEventQueue)

mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_start(): Start dispatching on default event queue.");

- // start Avis event loop(s)
- if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) mamaQueueImpl_getBridgeImpl(defaultEventQueue), (void**) &avisBridge))) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_start(): Could not get Elvin object");
- return status;
- }
- if (MAMA_STATUS_OK != (status = avisTransportBridge_start(avisBridge->mTransportBridge))) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_start(): Could not start dispatching on Avis");
- return status;
- }
-
// start Mama event loop
return mamaQueue_dispatch(defaultEventQueue);
}
@@ -182,15 +182,6 @@ avisBridge_stop(mamaQueue defaultEventQueue)

mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_stop(): Stopping bridge.");

- if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) mamaQueueImpl_getBridgeImpl(defaultEventQueue), (void**) &avisBridge))) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_stop(): Could not get Elvin object");
- return status;
- }
- if (MAMA_STATUS_OK != (status = avisTransportBridge_stop(avisBridge->mTransportBridge))) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_stop(): Could not stop dispatching on Avis %d", status);
- return status;
- }
-
// stop Mama event loop
status = mamaQueue_stopDispatch (defaultEventQueue);
if (status != MAMA_STATUS_OK)
diff --git a/mama/c_cpp/src/c/bridge/avis/msg.c b/mama/c_cpp/src/c/bridge/avis/msg.c
index 346598e..675ef6d 100644
--- a/mama/c_cpp/src/c/bridge/avis/msg.c
+++ b/mama/c_cpp/src/c/bridge/avis/msg.c
@@ -29,13 +29,14 @@
#include <msgimpl.h>
#include "avisdefs.h"
#include "avisbridgefunctions.h"
-
+#include "../../payload/avismsg/avispayload.h"
+#include "../../payload/avismsg/avismsgimpl.h"

typedef struct avisMsgImpl
{
- Attributes* mAvisMsg;
- mamaMsg mParent;
- bool mSecure;
+ msgPayload mAvisMsg;
+ mamaMsg mParent;
+ bool mSecure;
} avisMsgImpl;

#define avisMsg(msg) ((avisMsgImpl*)(msg))
@@ -60,9 +61,13 @@ avisBridgeMamaMsg_create (msgBridge* msg, mamaMsg parent)
impl = (avisMsgImpl*) calloc(1, sizeof(avisMsgImpl));
if (!impl) return MAMA_STATUS_NOMEM;

- mamaMsg_getNativeMsg(parent, (void**)&impl->mAvisMsg);
- impl->mParent = parent;
+ /* mamaMsg_getNativeMsg(parent, (void**)&impl->mAvisMsg); */
+ avismsgPayload_create (&impl->mAvisMsg);
+
+ impl->mParent = parent;
+
*msg = (msgBridge) impl;
+
return MAMA_STATUS_OK;
}

@@ -71,10 +76,12 @@ mama_status
avisBridgeMamaMsg_destroy (msgBridge msg, int destroyMsg)
{
CHECK_MSG(msg);
+
if (destroyMsg)
{
avisBridgeMamaMsg_destroyMiddlewareMsg(msg);
}
+
free(avisMsg(msg));

return MAMA_STATUS_OK;
@@ -84,7 +91,8 @@ mama_status
avisBridgeMamaMsg_destroyMiddlewareMsg (msgBridge msg)
{
CHECK_MSG(msg);
- attributes_destroy(avisMsg(msg)->mAvisMsg);
+
+ avismsgPayload_destroy (avisMsg(msg)->mAvisMsg);
avisMsg(msg)->mAvisMsg = NULL;

return MAMA_STATUS_OK;
@@ -94,7 +102,8 @@ mama_status
avisBridgeMamaMsg_detach (msgBridge msg)
{
CHECK_MSG(msg);
- avisMsg(msg)->mAvisMsg = attributes_clone(avisMsg(msg)->mAvisMsg);
+
+ avismsgPayload_copy (avisMsg(msg)->mAvisMsg, &avisMsg(msg)->mAvisMsg);

return MAMA_STATUS_OK;
}
@@ -110,8 +119,12 @@ mama_status
avisBridgeMamaMsgImpl_setReplyHandle (msgBridge msg, void* result)
{
mama_status status = MAMA_STATUS_OK;
+
CHECK_MSG(msg);
- if (MAMA_STATUS_OK != (status = mamaMsg_updateString(avisMsg(msg)->mParent, INBOX_FIELD_NAME, 0, (const char*) result))) {
+
+ if (MAMA_STATUS_OK != (status = avismsgPayload_updateString (
+ avisMsg(msg)->mAvisMsg, INBOX_FIELD_NAME, 0, (const char*) result)))
+ {
return status;
}

@@ -129,7 +142,11 @@ int
avisBridgeMamaMsg_isFromInbox (msgBridge msg)
{
const char* dummy;
- return (mamaMsg_getString(avisMsg(msg)->mParent, INBOX_FIELD_NAME, 0, &dummy) == MAMA_STATUS_OK) ? 1 : 0;
+
+ CHECK_MSG(msg);
+
+ return (MAMA_STATUS_OK == avismsgPayload_getString(
+ avisMsg(msg)->mAvisMsg, INBOX_FIELD_NAME, 0, &dummy));
}

mama_status
@@ -138,6 +155,7 @@ avisBridgeMamaMsg_setSendSubject (msgBridge msg,
const char* subject)
{
mama_status status = MAMA_STATUS_OK;
+
CHECK_MSG(msg);

if (MAMA_STATUS_OK != (status = mamaMsg_updateString(avisMsg(msg)->mParent, SUBJECT_FIELD_NAME, 0, subject))) {
@@ -145,9 +163,9 @@ avisBridgeMamaMsg_setSendSubject (msgBridge msg,
}

mamaMsg_updateString (avisMsg(msg)->mParent,
- MamaFieldSubscSymbol.mName,
- MamaFieldSubscSymbol.mFid,
- symbol);
+ MamaFieldSubscSymbol.mName,
+ MamaFieldSubscSymbol.mFid,
+ symbol);

return MAMA_STATUS_OK;
}
@@ -167,11 +185,15 @@ avisBridgeMamaMsg_duplicateReplyHandle (msgBridge msg, void** result)
mama_status status = MAMA_STATUS_OK;

CHECK_MSG(msg);
- if (MAMA_STATUS_OK != (status = mamaMsg_getString(avisMsg(msg)->mParent, INBOX_FIELD_NAME, 0, &replyAddr))) {
+
+ if (MAMA_STATUS_OK != (status = avismsgPayload_getString(
+ avisMsg(msg)->mAvisMsg, INBOX_FIELD_NAME, 0, &replyAddr)))
+ {
return status;
}

*result = (void*) strdup(replyAddr);
+
return MAMA_STATUS_OK;
}

@@ -193,7 +215,10 @@ avisBridgeMamaMsg_destroyReplyHandle (void* result)
mama_status
avisBridgeMamaMsgImpl_setAttributesAndSecure (msgBridge msg, void* attributes, uint8_t secure)
{
- avisMsg(msg)->mAvisMsg=(Attributes*)attributes;
+ CHECK_MSG(msg);
+
+ avismsgPayload_setAttributes (avisMsg(msg)->mAvisMsg, (Attributes*)attributes);
avisMsg(msg)->mSecure=secure;
+
return MAMA_STATUS_OK;
}
diff --git a/mama/c_cpp/src/c/bridge/avis/publisher.c b/mama/c_cpp/src/c/bridge/avis/publisher.c
index 1f6d598..e1d3e40 100644
--- a/mama/c_cpp/src/c/bridge/avis/publisher.c
+++ b/mama/c_cpp/src/c/bridge/avis/publisher.c
@@ -22,9 +22,11 @@
#include <string.h>

#include <mama/mama.h>
+#include <mama/msg.h>
#include <mama/inbox.h>
#include <mama/publisher.h>
#include <bridge.h>
+#include <msgimpl.h>
#include <inboximpl.h>
#include "subinitial.h"
#include "avisbridgefunctions.h"
@@ -50,12 +52,175 @@ typedef struct avisPublisherBridge
if (!elvin_is_open(avisPublisher(publisher)->mAvis)) return MAMA_STATUS_INVALID_ARG; \
} while(0)

+static
+void sendAvisMessage (Elvin* avis,
+ void* closure)
+{
+ mamaMsg msg = (mamaMsg) closure;
+ Attributes* attr = NULL;
+ mama_size_t dataLen = 0;
+
+ mamaMsgImpl_getPayloadBuffer (msg, (const void**)&attr, &dataLen);
+
+ if (!elvin_send (avis, attr))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "sendAvisMessage(): "
+ "Could not send message - %s", avis->error.message);
+ }

+ if (gMamaLogLevel >= MAMA_LOG_LEVEL_FINEST)
+ {
+ mama_log (MAMA_LOG_LEVEL_FINEST, "sendAvisMessage(): "
+ "Send message: %s", mamaMsg_toString (msg));
+ }

+ mamaMsg_destroy (msg);
+}

static mama_status
avisBridgeMamaPublisherImpl_buildSendSubject (avisPublisherBridge* impl);

+static
+mama_status
+avisBridgeMamaPublisherImpl_prepareMessage (mamaMsg* msg)
+{
+ mama_status ret = MAMA_STATUS_OK;
+ mamaPayloadType payload = MAMA_PAYLOAD_UNKNOWN;
+
+ ret = mamaMsg_getPayloadType (*msg, &payload);
+
+ if (MAMA_STATUS_OK != ret)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisherImpl_prepareMessage(): "
+ "Could not get message payload type. [%d]", ret);
+
+ return ret;
+ }
+
+ if (MAMA_PAYLOAD_AVIS == payload)
+ {
+ /* Avis message, just perform a copy and detach so that it
+ * can be enqueued on the avis dispatch thread. */
+ mamaMsg copyMsg = NULL;
+
+ ret = mamaMsg_copy (*msg, &copyMsg);
+
+ if (MAMA_STATUS_OK != ret)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisherImpl_prepareMessage(): "
+ "Could not copy message. [%d]", ret);
+
+ return ret;
+ }
+
+ *msg = copyMsg;
+ }
+ else
+ {
+ /* Non-Avis message - Strategy is to wrap it within the enclosed field
+ * of a new Avis message. If the existing message had a bridged message
+ * within it, then this forms the basis for the new message. Otherwise,
+ * a brand new message is constructed. */
+ mamaMsg encMsg = NULL;
+ msgBridge bridgeMsg = NULL;
+ const void* buf = NULL;
+ mama_size_t bufSize = 0;
+
+ if (gMamaLogLevel >= MAMA_LOG_LEVEL_FINEST)
+ {
+ mama_log (MAMA_LOG_LEVEL_FINEST,
+ "sendAvisMessage(): "
+ "Enclosing message: %s", mamaMsg_toString (*msg));
+ }
+
+ ret = mamaMsg_getByteBuffer (*msg, &buf, &bufSize);
+
+ if (MAMA_STATUS_OK != ret)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisherImpl_prepareMessage(): "
+ "Could not get byte buffer for message. [%d]", ret);
+
+ return ret;
+ }
+
+ mamaMsgImpl_getBridgeMsg (*msg, &bridgeMsg);
+
+ if (bridgeMsg)
+ {
+ msgPayload payloadMsg = NULL;
+
+ avisBridgeMamaMsg_getNativeHandle (bridgeMsg, (void**) &payloadMsg);
+
+ if (payloadMsg)
+ {
+ const void* buf = NULL;
+ mama_size_t bufSize = 0;
+ Attributes* attributes = NULL;
+
+ avismsgPayload_getByteBuffer (payloadMsg, &buf, &bufSize);
+
+ buf = (const void*)attributes_clone ((Attributes*)buf);
+
+ mamaMsg_create (&encMsg);
+ mamaMsgImpl_setMsgBuffer (encMsg, buf, bufSize, MAMA_PAYLOAD_AVIS);
+ mamaMsgImpl_setMessageOwner (encMsg, 1);
+ }
+ }
+
+ if (!encMsg)
+ {
+ ret = mamaMsg_createForPayload (&encMsg, MAMA_PAYLOAD_AVIS);
+
+ if (MAMA_STATUS_OK != ret)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisherImpl_prepareMessage(): "
+ "Could not create enclosed message. [%d]", ret);
+
+ return ret;
+ }
+ }
+
+ ret = mamaMsg_addOpaque (encMsg, ENCLOSED_MSG_FIELD_NAME, 0,
+ buf, bufSize);
+
+ if (MAMA_STATUS_OK != ret)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisherImpl_prepareMessage(): "
+ "Could not add enclosed message as opaque field. [%d]", ret);
+
+ return ret;
+ }
+
+ *msg = encMsg;
+ }
+
+ return MAMA_STATUS_OK;
+}
+
+
+static
+mama_status
+avisBridgeMamaPublisherImpl_sendMessage (publisherBridge publisher, mamaMsg msg)
+{
+ if (!elvin_invoke (avisPublisher(publisher)->mAvis, &sendAvisMessage, msg))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisherImpl_sendMessage(): "
+ "Could not send message.");
+
+ mamaMsg_destroy (msg);
+
+ return MAMA_STATUS_PLATFORM;
+ }
+
+ return MAMA_STATUS_OK;
+}

mama_status
avisBridgeMamaPublisher_createByIndex (publisherBridge* result,
@@ -148,66 +313,90 @@ avisBridgeMamaPublisherImpl_buildSendSubject (avisPublisherBridge* impl)
mama_status
avisBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg)
{
- mama_size_t dataLen;
- mama_status status;
- Attributes* attributes = NULL;
-
+ mama_status ret = MAMA_STATUS_OK;
+
CHECK_PUBLISHER(publisher);

- status = mamaMsgImpl_getPayloadBuffer (msg, (const void**)&attributes, &dataLen);
- if (attributes == NULL)
- return MAMA_STATUS_INVALID_ARG;
+ ret = avisBridgeMamaPublisherImpl_prepareMessage (&msg);

- mamaMsg_updateString(msg, SUBJECT_FIELD_NAME, 0, avisPublisher(publisher)->mSubject);
+ if (MAMA_STATUS_OK != ret)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisher_send(): "
+ "Could not prepare message. [%d]", ret);

- if (!elvin_send(avisPublisher(publisher)->mAvis, attributes)) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaPublisher_send(): "
- "Could not send message.");
- return MAMA_STATUS_PLATFORM;
+ return ret;
}

- mama_log (MAMA_LOG_LEVEL_FINEST, "avisBridgeMamaPublisher_send(): "
- "Send message. %s", mamaMsg_toString(msg));
- return MAMA_STATUS_OK;
+ ret = mamaMsg_updateString(msg, SUBJECT_FIELD_NAME, 0, avisPublisher(publisher)->mSubject);
+
+ if (MAMA_STATUS_OK != ret)
+ {
+ return ret;
+ }
+
+ return avisBridgeMamaPublisherImpl_sendMessage (publisher, msg);
}

/* Send reply to inbox. */
mama_status
avisBridgeMamaPublisher_sendReplyToInbox (publisherBridge publisher,
- mamaMsg request,
- mamaMsg reply)
+ mamaMsg request,
+ mamaMsg reply)
{
- Attributes* requestMsg = NULL;
- Attributes* replyMsg = NULL;
- const char* replyAddr = NULL;
- mama_size_t dataLen;
+ msgPayload requestMsg = NULL;
+ mamaMsgReply replyHandle = NULL;
+ const char* replyAddr = NULL;
mama_status status;

CHECK_PUBLISHER(publisher);

mamaMsg_getNativeHandle(request, (void**) &requestMsg);
- mamaMsgImpl_getPayloadBuffer (reply, (const void**)&replyMsg, &dataLen);

- if (!requestMsg || !replyMsg) return MAMA_STATUS_NULL_ARG;
+ if (!requestMsg) return MAMA_STATUS_NULL_ARG;
+
+ status = avisBridgeMamaPublisherImpl_prepareMessage (&reply);
+
+ if (MAMA_STATUS_OK != status)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisher_sendReplyToInbox(): "
+ "Could not prepare message. [%d]", status);
+
+ return status;
+ }
+
+ status = mamaMsg_getReplyHandle (request, &replyHandle);
+
+ if (replyHandle)
+ {
+ replyAddr = ((mamaMsgReplyImpl*)replyHandle)->replyHandle;
+ }
+
+ if (status != MAMA_STATUS_OK || (replyAddr == NULL) || replyAddr[0] == '\0')
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisher_sendReplyToInbox(): "
+ "No reply address in message. [%d]", status);
+
+ mamaMsg_destroyReplyHandle (replyHandle);
+ mamaMsg_destroy (reply);

- status = mamaMsg_getString(request, INBOX_FIELD_NAME, 0, &replyAddr);
- if ((status != MAMA_STATUS_OK) || (replyAddr == NULL) || (strlen(replyAddr) == 0)) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaPublisher_sendReplyToInbox(): "
- "No reply address in message.");
return MAMA_STATUS_INVALID_ARG;
}

status = mamaMsg_updateString(reply, SUBJECT_FIELD_NAME, 0, replyAddr);
+
+ mamaMsg_destroyReplyHandle (replyHandle);
+
if (status != MAMA_STATUS_OK)
- return status;
+ {
+ mamaMsg_destroy (reply);

- if (!elvin_send(avisPublisher(publisher)->mAvis, replyMsg)) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaPublisher_send(): "
- "Could not send message.");
- return MAMA_STATUS_PLATFORM;
+ return status;
}

- return MAMA_STATUS_OK;
+ return avisBridgeMamaPublisherImpl_sendMessage (publisher, reply);
}

/* Destroy the publisher.*/
@@ -232,23 +421,43 @@ avisBridgeMamaPublisher_sendFromInboxByIndex (publisherBridge publisher,
const char* replyAddr = NULL;
mama_status status;

- // CHECK_PUBLISHER(publisher);
- if (avisPublisher(publisher) == 0)
- return MAMA_STATUS_NULL_ARG;
- if (avisPublisher(publisher)->mAvis == 0)
- return MAMA_STATUS_INVALID_ARG;
- if (!elvin_is_open(avisPublisher(publisher)->mAvis))
- return MAMA_STATUS_INVALID_ARG;
+ CHECK_PUBLISHER(publisher);
+
+ status = avisBridgeMamaPublisherImpl_prepareMessage (&msg);
+
+ if (MAMA_STATUS_OK != status)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisher_sendFromInboxByIndex(): "
+ "Could not prepare message. [%d]", status);
+
+ return status;
+ }

// get reply address from inbox
replyAddr = avisInboxImpl_getReplySubject(mamaInboxImpl_getInboxBridge(inbox));

// set reply address in msg
status = mamaMsg_updateString(msg, INBOX_FIELD_NAME, 0, replyAddr);
+
if (status != MAMA_STATUS_OK)
+ {
+ mamaMsg_destroy (msg);
+
return status;
+ }
+
+ status = mamaMsg_updateString (msg, SUBJECT_FIELD_NAME, 0,
+ avisPublisher(publisher)->mSubject);
+
+ if (MAMA_STATUS_OK != status)
+ {
+ mamaMsg_destroy (msg);
+
+ return status;
+ }

- return avisBridgeMamaPublisher_send(publisher, msg);
+ return avisBridgeMamaPublisherImpl_sendMessage (publisher, msg);
}

mama_status
@@ -269,9 +478,148 @@ avisBridgeMamaPublisher_sendReplyToInboxHandle (publisherBridge publisher,

CHECK_PUBLISHER(publisher);

+ status = avisBridgeMamaPublisherImpl_prepareMessage (&reply);
+
+ if (MAMA_STATUS_OK != status)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisher_sendReplyToInboxHandle(): "
+ "Could not prepare message. [%d]", status);
+
+ return status;
+ }
+
status = mamaMsg_updateString(reply, SUBJECT_FIELD_NAME, 0, (const char*) inbox);
+
if (status != MAMA_STATUS_OK)
+ {
+ mamaMsg_destroy (reply);
+
return status;
+ }
+
+ return avisBridgeMamaPublisherImpl_sendMessage (publisher, reply);
+}
+
+/* Send reply to inbox. */
+mama_status
+avisBridgeMamaPublisher_sendReplyFromInboxToInbox (publisherBridge publisher,
+ mamaInbox inbox,
+ mamaMsg request,
+ mamaMsg reply)
+{
+ msgPayload requestMsg = NULL;
+ mamaMsgReply replyHandle = NULL;
+ const char* fromAddr = NULL;
+ const char* replyAddr = NULL;
+ mama_status status;
+
+ CHECK_PUBLISHER(publisher);
+
+ mamaMsg_getNativeHandle(request, (void**) &requestMsg);
+
+ if (!requestMsg) return MAMA_STATUS_NULL_ARG;
+
+ status = avisBridgeMamaPublisherImpl_prepareMessage (&reply);
+
+ if (MAMA_STATUS_OK != status)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisher_sendFromInboxToInbox(): "
+ "Could not prepare message. [%d]", status);
+
+ return status;
+ }
+
+ status = mamaMsg_getReplyHandle (request, &replyHandle);
+
+ if (replyHandle)
+ {
+ replyAddr = ((mamaMsgReplyImpl*)replyHandle)->replyHandle;
+ }
+
+ if ((status != MAMA_STATUS_OK) || (fromAddr == NULL) || (*fromAddr == '\0'))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisher_sendReplyFromInboxToInbox(): "
+ "No reply address in message. [%d]", status);
+
+ mamaMsg_destroyReplyHandle (replyHandle);
+ mamaMsg_destroy (reply);
+
+ return MAMA_STATUS_INVALID_ARG;
+ }
+
+ status = mamaMsg_updateString(reply, SUBJECT_FIELD_NAME, 0, fromAddr);
+
+ mamaMsg_destroyReplyHandle (replyHandle);
+
+ if (status != MAMA_STATUS_OK)
+ {
+ mamaMsg_destroy (reply);
+
+ return status;
+ }
+
+ // get reply address from inbox
+ replyAddr = avisInboxImpl_getReplySubject(mamaInboxImpl_getInboxBridge(inbox));
+
+ // set reply address in msg
+ status = mamaMsg_updateString(reply, INBOX_FIELD_NAME, 0, replyAddr);
+
+ if (status != MAMA_STATUS_OK)
+ {
+ mamaMsg_destroy (reply);
+
+ return status;
+ }
+
+ return avisBridgeMamaPublisherImpl_sendMessage (publisher, reply);
+}
+
+mama_status
+avisBridgeMamaPublisher_sendReplyFromInboxToInboxHandle (publisherBridge publisher,
+ mamaInbox inbox,
+ void* replyAddress,
+ mamaMsg reply)
+{
+ const char* fromAddr = NULL;
+ mama_status status;
+
+ CHECK_PUBLISHER(publisher);
+
+ status = avisBridgeMamaPublisherImpl_prepareMessage (&reply);
+
+ if (MAMA_STATUS_OK != status)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avisBridgeMamaPublisher_sendFromInboxToInbox(): "
+ "Could not prepare message. [%d]", status);
+
+ return status;
+ }
+
+ // get reply address from inbox
+ fromAddr = avisInboxImpl_getReplySubject(mamaInboxImpl_getInboxBridge(inbox));
+
+ // set reply address in msg
+ status = mamaMsg_updateString(reply, INBOX_FIELD_NAME, 0, fromAddr);
+
+ if (status != MAMA_STATUS_OK)
+ {
+ mamaMsg_destroy (reply);
+
+ return status;
+ }
+
+ status = mamaMsg_updateString(reply, SUBJECT_FIELD_NAME, 0, (const char*) replyAddress);
+
+ if (status != MAMA_STATUS_OK)
+ {
+ mamaMsg_destroy (reply);
+
+ return status;
+ }

- return avisBridgeMamaPublisher_sendFromInboxByIndex(publisher, 0, (mamaInbox) inbox, reply);
+ return avisBridgeMamaPublisherImpl_sendMessage (publisher, reply);
}
diff --git a/mama/c_cpp/src/c/bridge/avis/queue.c b/mama/c_cpp/src/c/bridge/avis/queue.c
index 952a644..6416751 100644
--- a/mama/c_cpp/src/c/bridge/avis/queue.c
+++ b/mama/c_cpp/src/c/bridge/avis/queue.c
@@ -30,6 +30,8 @@
typedef struct avisQueueBridge {
mamaQueue mParent;
wombatQueue mQueue;
+ mamaQueueEnqueueCB mEnqueueCb;
+ void* mEnqueueClosure;
uint8_t mIsNative;
} avisQueueBridge;

@@ -61,7 +63,9 @@ avisBridgeMamaQueue_create (queueBridge* queue,
if (avisQueue == NULL)
return MAMA_STATUS_NOMEM;

- avisQueue->mParent = parent;
+ avisQueue->mParent = parent;
+ avisQueue->mEnqueueCb = NULL;
+ avisQueue->mEnqueueClosure = NULL;

wombatQueue_allocate (&avisQueue->mQueue);
wombatQueue_create (avisQueue->mQueue, 0, 0, 0);
@@ -86,9 +90,11 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue,
if (avisQueue == NULL)
return MAMA_STATUS_NOMEM;

- avisQueue->mParent = parent;
- avisQueue->mQueue = (wombatQueue)nativeQueue;
- avisQueue->mIsNative = 1;
+ avisQueue->mParent = parent;
+ avisQueue->mEnqueueCb = NULL;
+ avisQueue->mEnqueueClosure = NULL;
+ avisQueue->mQueue = (wombatQueue)nativeQueue;
+ avisQueue->mIsNative = 1;

*queue = (queueBridge) avisQueue;

@@ -99,7 +105,7 @@ mama_status
avisBridgeMamaQueue_destroy (queueBridge queue)
{
CHECK_QUEUE(queue);
- if (avisQueue(queue)->mIsNative)
+ if (!avisQueue(queue)->mIsNative)
wombatQueue_destroy (avisQueue(queue)->mQueue);
free(avisQueue(queue));
return MAMA_STATUS_OK;
@@ -210,6 +216,12 @@ avisBridgeMamaQueue_enqueueEvent (queueBridge queue,
if (status != WOMBAT_QUEUE_OK)
return MAMA_STATUS_PLATFORM;

+ if (avisQueue(queue)->mEnqueueCb)
+ {
+ avisQueue(queue)->mEnqueueCb (avisQueue(queue)->mParent,
+ avisQueue(queue)->mEnqueueClosure);
+ }
+
return MAMA_STATUS_OK;
}

@@ -240,14 +252,22 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge queue,
void* closure)
{
CHECK_QUEUE(queue);
- return MAMA_STATUS_NOT_IMPLEMENTED;
+
+ avisQueue(queue)->mEnqueueCb = callback;
+ avisQueue(queue)->mEnqueueClosure = closure;
+
+ return MAMA_STATUS_OK;
}

mama_status
avisBridgeMamaQueue_removeEnqueueCallback (queueBridge queue)
{
CHECK_QUEUE(queue);
- return MAMA_STATUS_NOT_IMPLEMENTED;
+
+ avisQueue(queue)->mEnqueueCb = NULL;
+ avisQueue(queue)->mEnqueueClosure = NULL;
+
+ return MAMA_STATUS_OK;
}

mama_status
diff --git a/mama/c_cpp/src/c/bridge/avis/sub.c b/mama/c_cpp/src/c/bridge/avis/sub.c
index a670cc3..ec25185 100644
--- a/mama/c_cpp/src/c/bridge/avis/sub.c
+++ b/mama/c_cpp/src/c/bridge/avis/sub.c
@@ -26,6 +26,7 @@
#include <msgimpl.h>
#include <queueimpl.h>
#include <wombat/queue.h>
+#include <wombat/wSemaphore.h>

#include "avisbridgefunctions.h"
#include "transportbridge.h"
@@ -50,8 +51,16 @@ typedef struct avisSubscription
int mIsNotMuted;
int mIsValid;

+ wsem_t mCreateDestroySem;
+
} avisSubscription;

+typedef struct avisCallbackContext
+{
+ void* attributes;
+ void* subscriber;
+} avisCallbackContext;
+
#define avisSub(subscriber) ((avisSubscription*)(subscriber))
#define CHECK_SUBSCRIBER(subscriber) \
do { \
@@ -72,18 +81,48 @@ const char* makeAvisSubject(const char* subject)
}


-static void MAMACALLTYPE
-destroy_callback(void* subscriber, void* closure)
+static
+void subscribeAvis (Elvin* avis,
+ void* closure)
{
- // cant do anything without a subscriber
- if (!avisSub(subscriber)) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avis_callback(): called with NULL subscriber!");
+ avisSubscription* impl = (avisSubscription*) closure;
+
+ impl->mAvisSubscription = elvin_subscribe(impl->mAvis, makeAvisSubject(impl->mSubject));
+
+ if (impl->mAvisSubscription == NULL)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "subscribeAvis(): "
+ "Failed to subscribe to: %s", impl->mSubject);
+
+ wsem_post (&impl->mCreateDestroySem);
+
return;
}

- (*avisSub(subscriber)->mMamaCallback.onDestroy)(avisSub(subscriber)->mMamaSubscription, avisSub(subscriber)->mClosure);
+ elvin_subscription_add_listener (impl->mAvisSubscription, avis_callback, impl);

- free(avisSub(subscriber));
+ mama_log (MAMA_LOG_LEVEL_FINER,
+ "Made Avis subscription to: %s", impl->mSubject);
+
+ wsem_post (&impl->mCreateDestroySem);
+}
+
+static
+void unsubscribeAvis (Elvin* avis,
+ void* closure)
+{
+ Subscription* sub = (Subscription*) closure;
+
+ elvin_subscription_remove_listener (sub, avis_callback);
+
+ if (!elvin_unsubscribe(avis, sub))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "unsubscribeAvis(): "
+ "Failed to unsubscribe to: %s",
+ sub->subscription_expr);
+ }
+
+ free (sub);
}

/**
@@ -93,28 +132,49 @@ destroy_callback(void* subscriber, void* closure)
* @param closure The subscriber
*/
static void MAMACALLTYPE
-avis_queue_callback (void* data, void* closure)
+avis_queue_callback (mamaQueue queue,
+ void* closure)
{
- mama_status status;
- mamaMsg tmpMsg;
- msgBridge bridgeMsg;
+ mama_status status = MAMA_STATUS_OK;
+ mamaMsg tmpMsg = NULL;
+ msgBridge bridgeMsg = NULL;
+ const void* buf = NULL;
+ mama_size_t bufSize = 0;
+
+ avisCallbackContext* ctx = (avisCallbackContext*) closure;
+
+ void* data = ctx->attributes;
+ void* subscriber = ctx->subscriber;

/* cant do anything without a subscriber */
- if (!avisSub(closure)) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avis_callback(): called with NULL subscriber!");
+ if (!subscriber)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avis_callback(): called with NULL subscriber!");
+
+ attributes_destroy (data);
+ free (ctx);
return;
}

/*Make sure that the subscription is processing messages*/
- if ((!avisSub(closure)->mIsNotMuted) || (!avisSub(closure)->mIsValid)) return;
-
+ if ((!avisSub(subscriber)->mIsNotMuted) ||
+ (!avisSub(subscriber)->mIsValid))
+ {
+ attributes_destroy (data);
+ free (ctx);
+ return;
+ }

/*This is the reuseable message stored on the associated MamaQueue*/
- tmpMsg = mamaQueueImpl_getMsg(avisSub(closure)->mQueue);
+ tmpMsg = mamaQueueImpl_getMsg(avisSub(subscriber)->mQueue);
if (!tmpMsg)
{
mama_log (MAMA_LOG_LEVEL_ERROR, "avis_callback(): "
"Could not get cached mamaMsg from event queue.");
+
+ attributes_destroy (data);
+ free (ctx);
return;
}

@@ -125,12 +185,13 @@ avis_queue_callback (void* data, void* closure)
mama_log (MAMA_LOG_LEVEL_ERROR, "avis_callback(): "
"Could not get bridge message from cached"
" queue mamaMsg [%d]", status);
+
+ attributes_destroy (data);
+ free (ctx);
return;
}

/*Set the buffer and the reply handle on the bridge message structure*/
- avisBridgeMamaMsgImpl_setAttributesAndSecure (bridgeMsg, data, 0);
-
if (MAMA_STATUS_OK!=(status=mamaMsgImpl_setMsgBuffer (tmpMsg,
data,
0, MAMA_PAYLOAD_AVIS)))
@@ -138,21 +199,79 @@ avis_queue_callback (void* data, void* closure)
mama_log (MAMA_LOG_LEVEL_ERROR,
"avis_callback(): mamaMsgImpl_setMsgBuffer() failed. [%d]",
status);
+
+ attributes_destroy (data);
+ free (ctx);
return;
}

- /*Process the message as normal*/
- if (MAMA_STATUS_OK != (status=mamaSubscription_processMsg
- (avisSub(closure)->mMamaSubscription, tmpMsg)))
+ if (MAMA_STATUS_OK == mamaMsg_getOpaque (
+ tmpMsg, ENCLOSED_MSG_FIELD_NAME, 0, &buf, &bufSize) &&
+ bufSize > 0)
{
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "avis_callback(): "
- "mamaSubscription_processMsg() failed. [%d]",
- status);
+ mamaMsg encMsg = NULL;
+ mamaBridgeImpl* bridge =
+ mamaSubscription_getBridgeImpl (avisSub(subscriber)->mMamaSubscription);
+
+ status = mamaMsg_createFromByteBuffer (&encMsg, buf, bufSize);
+
+ if (MAMA_STATUS_OK != status)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avis_callback(): "
+ "Could not create message from enclosed byte buffer. [%d]",
+ status);
+
+ attributes_destroy (data);
+ free (ctx);
+ return;
+ }
+
+ mamaMsgImpl_useBridgePayload (encMsg, bridge);
+ mamaMsgImpl_getBridgeMsg (encMsg, &bridgeMsg);
+
+ /*Set the buffer and the reply handle on the bridge message structure*/
+ avisBridgeMamaMsgImpl_setAttributesAndSecure (bridgeMsg, data, 0);
+
+ if (gMamaLogLevel >= MAMA_LOG_LEVEL_FINEST)
+ {
+ mama_log (MAMA_LOG_LEVEL_FINEST, "avis_callback(): "
+ "Received enclosed message: %s", mamaMsg_toString (encMsg));
+ }
+
+ if (MAMA_STATUS_OK != (status=mamaSubscription_processMsg
+ (avisSub(subscriber)->mMamaSubscription, encMsg)))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avis_callback(): "
+ "mamaSubscription_processMsg() failed. [%d]",
+ status);
+ }
+
+ mamaMsg_destroy (encMsg);
+ }
+ else
+ {
+ avisBridgeMamaMsgImpl_setAttributesAndSecure (bridgeMsg, data, 0);
+
+ if (gMamaLogLevel >= MAMA_LOG_LEVEL_FINEST)
+ {
+ mama_log (MAMA_LOG_LEVEL_FINEST, "avis_callback(): "
+ "Received message: %s", mamaMsg_toString (tmpMsg));
+ }
+
+ /*Process the message as normal*/
+ if (MAMA_STATUS_OK != (status=mamaSubscription_processMsg
+ (avisSub(subscriber)->mMamaSubscription, tmpMsg)))
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR,
+ "avis_callback(): "
+ "mamaSubscription_processMsg() failed. [%d]",
+ status);
+ }
}

- attributes_free ((Attributes*)data);
- free ((Attributes*)data);
+ free (ctx);
}

static void
@@ -162,8 +281,6 @@ avis_callback(
bool secure,
void* subscriber)
{
- wombatQueue queue = NULL;
-
/* cant do anything without a subscriber */
if (!avisSub(subscriber)) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avis_callback(): called with NULL subscriber!");
@@ -173,21 +290,14 @@ avis_callback(
/*Make sure that the subscription is processing messages*/
if ((!avisSub(subscriber)->mIsNotMuted) || (!avisSub(subscriber)->mIsValid)) return;

- mamaQueue_getNativeHandle(avisSub(subscriber)->mQueue, &queue);
- if (!queue)
- {
- mama_log (MAMA_LOG_LEVEL_ERROR, "avis_callback(): "
- "Could not get event queue.");
- return;
- }
-
- wombatQueue_enqueue (queue, avis_queue_callback,
- attributes_clone(attributes), subscriber);
+ avisCallbackContext* ctx = (avisCallbackContext*) malloc (sizeof (avisCallbackContext));
+ ctx->attributes = attributes_clone (attributes);
+ ctx->subscriber = subscriber;

- return;
+ mamaQueue_enqueueEvent (avisSub (subscriber)->mQueue,
+ avis_queue_callback, ctx);
}

-
mama_status
avisBridgeMamaSubscription_create (subscriptionBridge* subscriber,
const char* source,
@@ -198,7 +308,8 @@ avisBridgeMamaSubscription_create (subscriptionBridge* subscriber,
mamaSubscription subscription,
void* closure)
{
- avisSubscription* impl = NULL;
+ avisSubscription* impl = NULL;
+ avisTransportBridge* avisTransport = NULL;

if (!subscriber || !subscription || !transport )
return MAMA_STATUS_NULL_ARG;
@@ -208,9 +319,16 @@ avisBridgeMamaSubscription_create (subscriptionBridge* subscriber,
return MAMA_STATUS_NOMEM;

impl->mAvis = getAvis(transport);
+
if (!impl->mAvis)
return MAMA_STATUS_INVALID_ARG;

+ mamaTransport_getBridgeTransport (
+ transport, (void*) &avisTransport);
+
+ if (!avisTransport)
+ return MAMA_STATUS_INVALID_ARG;
+
if (source != NULL && source[0])
{
impl->mSource = source;
@@ -240,16 +358,22 @@ avisBridgeMamaSubscription_create (subscriptionBridge* subscriber,
impl->mClosure = closure;
impl->mIsNotMuted = 1;
impl->mIsValid = 1;
+ impl->mAvisSubscription = NULL;

- impl->mAvisSubscription = elvin_subscribe(impl->mAvis, makeAvisSubject(impl->mSubject));
- if (impl->mAvisSubscription == NULL)
- return MAMA_STATUS_PLATFORM;
-
- elvin_subscription_add_listener(impl->mAvisSubscription, avis_callback, impl);
- mama_log (MAMA_LOG_LEVEL_FINER,
- "Made Avis subscription to: %s", impl->mSubject);
+ wsem_init(&impl->mCreateDestroySem, 0, 0);

*subscriber = (subscriptionBridge) impl;
+
+ if (avisTransportBridge_isDispatching (avisTransport))
+ {
+ elvin_invoke (impl->mAvis, &subscribeAvis, impl);
+ wsem_wait (&impl->mCreateDestroySem);
+ }
+ else
+ {
+ subscribeAvis (impl->mAvis, impl);
+ }
+
return MAMA_STATUS_OK;
}

@@ -270,32 +394,47 @@ avisBridgeMamaSubscription_createWildCard (
mama_status
avisBridgeMamaSubscription_destroy (subscriptionBridge subscriber)
{
- mama_status status = MAMA_STATUS_OK;
- wombatQueue queue = NULL;
+ mama_status status = MAMA_STATUS_OK;
+ wombatQueue queue = NULL;
+ avisTransportBridge* avisTransport = NULL;

CHECK_SUBSCRIBER(subscriber);

- elvin_subscription_remove_listener(avisSub(subscriber)->mAvisSubscription, avis_callback);
+ mamaTransport_getBridgeTransport (
+ avisSub(subscriber)->mTransport, (void*) &avisTransport);

- if (!elvin_unsubscribe(avisSub(subscriber)->mAvis, avisSub(subscriber)->mAvisSubscription)) {
- // NOTE: elvin_unsubscribe sometimes returns failure for no apparent reason, so dont log errors here:
- // 2011-09-02 11:59:10: avis error code=2, error msg=Illegal frame size: 61
- //log_avis_error(MAMA_LOG_LEVEL_ERROR, avisSub(subscriber)->mAvis);
- //status = MAMA_STATUS_PLATFORM;
- }
+ if (!avisTransport)
+ return MAMA_STATUS_INVALID_ARG;

- free(avisSub(subscriber)->mAvisSubscription);
+ if (avisTransportBridge_isDispatching (avisTransport))
+ {
+ elvin_invoke (avisSub(subscriber)->mAvis, &unsubscribeAvis,
+ avisSub(subscriber)->mAvisSubscription);
+ }
+ else
+ {
+ unsubscribeAvis (avisSub(subscriber)->mAvis,
+ avisSub(subscriber)->mAvisSubscription);
+ }

+ avisSub(subscriber)->mAvisSubscription = NULL;
+
mamaQueue_getNativeHandle(avisSub(subscriber)->mQueue, &queue);
+
if (!queue)
{
- mama_log (MAMA_LOG_LEVEL_ERROR, "avis_callback(): "
+ mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaSubscription_destroy(): "
"Could not get event queue.");
return MAMA_STATUS_PLATFORM;
}
+
+ avisSub(subscriber)->mMamaCallback.onDestroy (
+ avisSub(subscriber)->mMamaSubscription,
+ avisSub(subscriber)->mClosure);

- wombatQueue_enqueue (queue, destroy_callback,
- (void*)subscriber, NULL);
+ wsem_destroy(&avisSub(subscriber)->mCreateDestroySem);
+
+ free(avisSub(subscriber));

return status;
}
diff --git a/mama/c_cpp/src/c/bridge/avis/subinitial.c b/mama/c_cpp/src/c/bridge/avis/subinitial.c
index b230f33..6a0b5af 100644
--- a/mama/c_cpp/src/c/bridge/avis/subinitial.c
+++ b/mama/c_cpp/src/c/bridge/avis/subinitial.c
@@ -78,6 +78,8 @@ avisInbox_onDestroy(

if (avisInbox(closure)->mOnInboxDestroyed)
(avisInbox(closure)->mOnInboxDestroyed)(avisInbox(closure)->mParent, avisInbox(closure)->mClosure);
+
+ free (avisInbox(closure));
}

static void MAMACALLTYPE
@@ -178,11 +180,18 @@ avisBridgeMamaInbox_create (inboxBridge* bridge,
mama_status
avisBridgeMamaInbox_destroy (inboxBridge inbox)
{
- CHECK_INBOX(inbox);
- mamaSubscription_destroy(avisInbox(inbox)->mSubscription);
- mamaSubscription_deallocate(avisInbox(inbox)->mSubscription);
- free(avisInbox(inbox));
- return MAMA_STATUS_OK;
+ mamaSubscription sub = NULL;
+
+ CHECK_INBOX(inbox);
+
+ /* Store subscription before callign destroy as the inbox might be
+ * freed before we get a chance to deallocate it. */
+ sub = avisInbox(inbox)->mSubscription;
+
+ mamaSubscription_destroy(sub);
+ mamaSubscription_deallocate(sub);
+
+ return MAMA_STATUS_OK;
}


diff --git a/mama/c_cpp/src/c/bridge/avis/timer.c b/mama/c_cpp/src/c/bridge/avis/timer.c
index 627a6ed..61cb01f 100755
--- a/mama/c_cpp/src/c/bridge/avis/timer.c
+++ b/mama/c_cpp/src/c/bridge/avis/timer.c
@@ -34,27 +34,27 @@ typedef struct avisTimerImpl_
mamaTimerCb mAction;
void* mClosure;
mamaTimer mParent;
- wombatQueue mQueue;
+ mamaQueue mQueue;

/* This callback will be invoked whenever the timer has been completely destroyed. */
mamaTimerCb mOnTimerDestroyed;

- /* TODO: add queue */
} avisTimerImpl;

static void MAMACALLTYPE
-destroy_callback(void* timer, void* closure)
+destroy_callback (mamaQueue queue, void* closure)
{
- avisTimerImpl* impl = (avisTimerImpl*)timer;
+ avisTimerImpl* impl = (avisTimerImpl*) closure;

(*impl->mOnTimerDestroyed)(impl->mParent, impl->mClosure);
+
free (impl);
}

static void MAMACALLTYPE
-timerQueueCb (void* data, void* closure)
+timerQueueCb (mamaQueue queue, void* closure)
{
- avisTimerImpl* impl = (avisTimerImpl*)data;
+ avisTimerImpl* impl = (avisTimerImpl*)closure;

if (impl->mAction)
impl->mAction (impl->mParent, impl->mClosure);
@@ -73,6 +73,7 @@ timerCb (timerElement timer,
/* Mama timers are repeating */
timeout.tv_sec = (time_t)impl->mInterval;
timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0);
+
if (0 != createTimer (&impl->mTimerElement,
gTimerHeap,
timerCb,
@@ -84,12 +85,13 @@ timerCb (timerElement timer,
"mamaTimer_create ():");
}

- wombatQueue_enqueue (impl->mQueue, timerQueueCb,
- (void*)impl, NULL);
+ mamaQueue_enqueueEvent (impl->mQueue,
+ timerQueueCb,
+ (void*)impl);
}

mama_status
-avisBridgeMamaTimer_create (timerBridge* result,
+avisBridgeMamaTimer_create (timerBridge* result,
void* nativeQueueHandle,
mamaTimerCb action,
mamaTimerCb onTimerDestroyed,
@@ -112,17 +114,21 @@ avisBridgeMamaTimer_create (timerBridge* result,
impl = (avisTimerImpl*)calloc (1, sizeof (avisTimerImpl));
if (impl == NULL) return MAMA_STATUS_NOMEM;

- impl->mQueue = (wombatQueue)nativeQueueHandle;
+ impl->mQueue = NULL;
impl->mParent = parent;
impl->mAction = action;
impl->mClosure = closure;
impl->mInterval = interval;
+
+ mamaTimer_getQueue (parent, &impl->mQueue);
+
impl->mOnTimerDestroyed = onTimerDestroyed;

*result = (timerBridge)impl;

timeout.tv_sec = (time_t)interval;
timeout.tv_usec = ((interval-timeout.tv_sec) * 1000000.0);
+
if (0 != createTimer (&impl->mTimerElement,
gTimerHeap,
timerCb,
@@ -150,8 +156,8 @@ avisBridgeMamaTimer_destroy (timerBridge timer)

impl->mAction = NULL;
mama_log (MAMA_LOG_LEVEL_FINEST,
- "%s Entering.",
- "avisMamaTimer_destroy ():");
+ "%s Entering for 0x%x",
+ "avisMamaTimer_destroy ():", impl);

if (0 != destroyTimer (gTimerHeap, impl->mTimerElement))
{
@@ -161,9 +167,9 @@ avisBridgeMamaTimer_destroy (timerBridge timer)
returnStatus = MAMA_STATUS_PLATFORM;
}

-
- wombatQueue_enqueue (impl->mQueue, destroy_callback,
- (void*)impl, NULL);
+ mamaQueue_enqueueEvent (impl->mQueue,
+ destroy_callback,
+ (void*)impl);

return returnStatus;
}
@@ -208,6 +214,7 @@ avisBridgeMamaTimer_reset (timerBridge timer)
status = MAMA_STATUS_PLATFORM;
}
}
+
return status;
}

diff --git a/mama/c_cpp/src/c/bridge/avis/transportbridge.c b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
index f274ee7..4217dff 100755
--- a/mama/c_cpp/src/c/bridge/avis/transportbridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
@@ -33,6 +33,7 @@
#include "transportbridge.h"
#include "avisbridgefunctions.h"
#include "avisdefs.h"
+#include "wombat/wInterlocked.h"

#define TPORT_PREFIX "mama.avis.transport"
#define TPORT_PARAM "url"
@@ -59,16 +60,24 @@ void log_avis_error(MamaLogLevel logLevel, Elvin* avis)
avis->error.message);
}

+static
+void closeNotification (Elvin* avis,
+ void* closure)
+{
+}
+
void closeListener(Elvin* avis,
CloseReason reason,
const char* message,
void* closure)
{
- const char* errMsg;
- if (avisBridge(closure) == NULL)
+ const char* errMsg = NULL;
+ avisTransportBridge* bridge = (avisTransportBridge*) closure;
+
+ if (bridge == NULL)
{
mama_log (MAMA_LOG_LEVEL_FINE,
- "Avis closeListener: could not get Avis bridge");
+ "Avis closeListener: could not get Avis transport bridge");
return;
}

@@ -89,12 +98,22 @@ void closeListener(Elvin* avis,
default:
errMsg = "Unknown Avis error";
}
+
+ mama_log (MAMA_LOG_LEVEL_FINE, "%s : %s", errMsg, message);
+
+ if (REASON_PROTOCOL_VIOLATION == reason)
+ {
+ /* Ignore protocol violations */
+ return;
+ }
+
mamaTransportImpl_disconnect(
- avisBridge(closure)->mTransportBridge->mTransport,
+ bridge->mTransport,
MAMA_TRANSPORT_DISCONNECT,
NULL,
NULL);
- mama_log (MAMA_LOG_LEVEL_FINE, "%s : %s", errMsg, message);
+
+ wInterlocked_set (0, &bridge->mDispatching);
}

static const char*
@@ -140,54 +159,89 @@ Elvin* getAvis(mamaTransport transport)
void* avisDispatchThread(void* closure)
{
avisTransportBridge* transportBridge = (avisTransportBridge*) closure;
- elvin_event_loop(transportBridge->mAvis);
- wsem_post(&transportBridge->mAvisDispatchSem);
+
+ while (1 == wInterlocked_read (&transportBridge->mDispatching))
+ {
+ elvin_poll (transportBridge->mAvis);
+
+ if (ELVIN_ERROR_TIMEOUT == transportBridge->mAvis->error.code)
+ {
+ /* Timeout is acceptable and expected */
+ elvin_error_reset (&transportBridge->mAvis->error);
+ }
+ }
+
+ wsem_post (&transportBridge->mAvisDispatchSem);
return NULL;
}


mama_status avisTransportBridge_start(avisTransportBridge* transportBridge)
{
- /* stop Avis event loop */
- wthread_t tid;
- int rc;
-
- CHECK_TRANSPORT(transportBridge);
-
- rc = wthread_create(&tid, NULL, avisDispatchThread, transportBridge);
- if (0 != rc)
- {
- mama_log (MAMA_LOG_LEVEL_ERROR, "wthread_create returned %d", rc);
- return MAMA_STATUS_SYSTEM_ERROR;
- }
-
- return MAMA_STATUS_OK;
-}
+ /* stop Avis event loop */
+ int rc;
+
+ CHECK_TRANSPORT(transportBridge);
+
+ if (1 == wInterlocked_read (&transportBridge->mDispatching))
+ {
+ mama_log (MAMA_LOG_LEVEL_WARN, "avisTransportBridge_start(): "
+ "Avis already dispatching");
+ log_avis_error (MAMA_LOG_LEVEL_WARN, transportBridge->mAvis);
+ return MAMA_STATUS_OK;
+ }

+ wInterlocked_set (1, &transportBridge->mDispatching);

+ rc = wthread_create (&transportBridge->mThreadId, NULL,
+ avisDispatchThread, transportBridge);
+
+ if (0 != rc)
+ {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "wthread_create returned %d", rc);
+ return MAMA_STATUS_SYSTEM_ERROR;
+ }
+
+ return MAMA_STATUS_OK;
+}

mama_status avisTransportBridge_stop(avisTransportBridge* transportBridge)
{
- CHECK_TRANSPORT(transportBridge);
-
- /* stop Avis event loop */
- elvin_remove_close_listener(transportBridge->mAvis, closeListener);
- if (!elvin_invoke_close(transportBridge->mAvis))
- {
- /* there appears to be a race condition in Avis libs where router socket
- * can sometimes be closed before we receive the disconnect reply -- log
- * it, and continue */
- log_avis_error(MAMA_LOG_LEVEL_FINE, transportBridge->mAvis);
- }
-
- while (-1 == wsem_wait(&transportBridge->mAvisDispatchSem))
- {
- if (errno != EINTR) return MAMA_STATUS_SYSTEM_ERROR;
- }
-
- return MAMA_STATUS_OK;
+ CHECK_TRANSPORT(transportBridge);
+
+ if (0 == wInterlocked_read (&transportBridge->mDispatching))
+ {
+ mama_log (MAMA_LOG_LEVEL_WARN, "avisTransportBridge_stop(): "
+ "Avis already stopped");
+ log_avis_error (MAMA_LOG_LEVEL_WARN, transportBridge->mAvis);
+ return MAMA_STATUS_OK;
+ }
+
+ wInterlocked_set (0, &transportBridge->mDispatching);
+
+ /* Dispatch a dummy notification to get the event polling to iterate
+ * another loop and examine the mDispatching state */
+ elvin_invoke (transportBridge->mAvis, &closeNotification, transportBridge);
+
+ while (-1 == wsem_wait(&transportBridge->mAvisDispatchSem))
+ {
+ if (errno != EINTR) return MAMA_STATUS_SYSTEM_ERROR;
+ }
+
+ wthread_join (transportBridge->mThreadId, NULL);
+
+ return MAMA_STATUS_OK;
}

+int
+avisTransportBridge_isDispatching (
+ avisTransportBridge* transportBridge)
+{
+ CHECK_TRANSPORT(transportBridge);
+
+ return wInterlocked_read (&transportBridge->mDispatching);
+}
+
/*=========================================================================
= Functions for the mamaTransport =
=========================================================================*/
@@ -255,14 +309,6 @@ avisBridgeMamaTransport_create (transportBridge* result,
return status;
}

- if (avisBridge->mTransportBridge != NULL)
- {
- mama_log (MAMA_LOG_LEVEL_ERROR,
- "avisBridgeMamaTransport_create(): Avis already connected");
- free(transport);
- return MAMA_STATUS_PLATFORM;
- }
-
/* create the Elvin object */
transport->mAvis = (Elvin*)calloc (1, sizeof (Elvin));
if (transport->mAvis == NULL)
@@ -290,22 +336,23 @@ avisBridgeMamaTransport_create (transportBridge* result,
return MAMA_STATUS_PLATFORM;
}

- avisBridge->mTransportBridge = transport;
- elvin_add_close_listener(transport->mAvis, closeListener, avisBridge);
+ wInterlocked_initialize (&transport->mDispatching);
+
+ elvin_add_close_listener(transport->mAvis, closeListener, transport);
wsem_init(&transport->mAvisDispatchSem, 0, 0);

*result = (transportBridge) transport;

- return MAMA_STATUS_OK;
+ return avisTransportBridge_start(transport);
}

mama_status
avisBridgeMamaTransport_destroy (transportBridge transport)
{
mama_status status;
- avisBridgeImpl* avisBridge = NULL;
- mamaBridgeImpl* bridgeImpl = NULL;
-
+ avisTransportBridge* transportBridge = (avisTransportBridge*) transport;
+ avisBridgeImpl* avisBridge = NULL;
+ mamaBridgeImpl* bridgeImpl = NULL;

bridgeImpl = mamaTransportImpl_getBridgeImpl(
avisTransport(transport)->mTransport);
@@ -327,11 +374,25 @@ avisBridgeMamaTransport_destroy (transportBridge transport)
return status;
}

+ if (1 == wInterlocked_read (&transportBridge->mDispatching))
+ {
+ avisTransportBridge_stop (transportBridge);
+ }
+
+ elvin_remove_close_listener (transportBridge->mAvis, closeListener);
+
+ if (!elvin_close (transportBridge->mAvis))
+ {
+ /* there appears to be a race condition in Avis libs where router socket
+ * can sometimes be closed before we receive the disconnect reply -- log
+ * it, and continue */
+ log_avis_error(MAMA_LOG_LEVEL_FINE, transportBridge->mAvis);
+ }
+
+ wInterlocked_destroy (&transportBridge->mDispatching);
wsem_destroy(&avisTransport(transport)->mAvisDispatchSem);
- elvin_error_free(&avisTransport(transport)->mAvis->error);
free(avisTransport(transport)->mAvis);
free(avisTransport(transport));
- avisBridge->mTransportBridge = NULL;
return MAMA_STATUS_OK;
}

diff --git a/mama/c_cpp/src/c/bridge/avis/transportbridge.h b/mama/c_cpp/src/c/bridge/avis/transportbridge.h
index 5e9a284..631243b 100755
--- a/mama/c_cpp/src/c/bridge/avis/transportbridge.h
+++ b/mama/c_cpp/src/c/bridge/avis/transportbridge.h
@@ -42,6 +42,7 @@ Elvin* getAvis(mamaTransport transport);
mama_status avisTransportBridge_start(avisTransportBridge* transportBridge);
mama_status avisTransportBridge_stop(avisTransportBridge* transportBridge);

+int avisTransportBridge_isDispatching (avisTransportBridge* transportBridge);

#if defined(__cplusplus)
}
diff --git a/mama/c_cpp/src/c/payload/avismsg/SConscript b/mama/c_cpp/src/c/payload/avismsg/SConscript
index 0811194..2f9b3ac 100644
--- a/mama/c_cpp/src/c/payload/avismsg/SConscript
+++ b/mama/c_cpp/src/c/payload/avismsg/SConscript
@@ -10,7 +10,6 @@ includePath = []
includePath.append('.')
includePath.append('../..')
includePath.append('#common/c_cpp/src/c')
-includePath.append('$avis_home/include')

libPath = []
libPath.append('$avis_home/lib')
diff --git a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
index 36d1d18..0664c4b 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.c
@@ -298,8 +298,11 @@ avisMsg_setOpaque(
snprintf (tempName, 63, "%d", fid);
id=tempName;
}
- pArray->items = avis_memdup(value, size);
+
+ memcpy (pArray->items, value, size);
+
attributes_set_opaque(attributes, id, *pArray);
+
return MAMA_STATUS_OK;
}

diff --git a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
index 4b8ef54..f83a1cb 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
+++ b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
@@ -618,6 +618,14 @@ mama_status avisValue_getMsg(const Value* pValue, mamaMsg* result);

mama_status avisValue_getFieldAsString(const Value* pValue, const char* name, mama_fid_t fid, char* buf, size_t len);

+mama_status
+avismsgPayload_setAttributes (msgPayload msg,
+ Attributes* attributes);
+
+mama_status
+avismsgPayload_copyAttributes (msgPayload msg,
+ Attributes* attributes);
+
#if defined(__cplusplus)
}
#endif
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index c738868..c61d498 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -35,6 +35,7 @@

#include "avispayload.h"
#include "avismsgimpl.h"
+#include "msgfield.h"
#include "msgfieldimpl.h"
#include "../../bridge/avis/avisdefs.h"
#include "platform.h"
@@ -110,7 +111,7 @@ avismsgPayload_createImpl (mamaPayloadBridge* result, char* identifier)

INITIALIZE_PAYLOAD_BRIDGE (impl, avismsg);

- impl->mClosure = NULL;
+ impl->mClosure = impl;

*result = (mamaPayloadBridge)impl;
*identifier = MAMA_PAYLOAD_AVIS;
@@ -641,6 +642,29 @@ avismsgPayload_getNativeMsg (const msgPayload msg,
return MAMA_STATUS_OK;
}

+mama_status
+avismsgPayload_setAttributes (msgPayload msg,
+ Attributes* attributes)
+{
+ CHECK_PAYLOAD(msg);
+
+ attributes_destroy (avisPayload(msg));
+ avisPayload(msg) = attributes;
+
+ return MAMA_STATUS_OK;
+}
+
+mama_status
+avismsgPayload_copyAttributes (msgPayload msg,
+ Attributes* attributes)
+{
+ CHECK_PAYLOAD(msg);
+
+ attributes_copy (avisPayload(msg), attributes);
+
+ return MAMA_STATUS_OK;
+}
+
/******************************************************************************
* add functions
*******************************************************************************/
@@ -1933,6 +1957,12 @@ avismsgPayloadIter_destroy (msgPayloadIter iter)

if (impl->mMsgIterator)
attributes_iter_destroy(impl->mMsgIterator);
+
+ if (impl->mAvisField)
+ {
+ avismsgFieldPayload_destroy (impl->mAvisField);
+ }
+
free(impl);

return MAMA_STATUS_OK;
--
1.9.0


Damian Maguire <DMaguire@...>
 

Hey Lee, 

Changes with this looked fine, with a few tweaks to a couple of formatting things. Pushed them up to the next branch, commit ID: d398643d7fcd2d2d36907c44e19dcb1b4738ad9c

Cheers, 

Damian

From: Lee Skillen <lskillen@...>
Date: Monday, March 3, 2014 3:08 PM
To: "openmama-dev@..." <openmama-dev@...>
Subject: [Openmama-dev] [PATCH] AVIS: Refactoring avis middleware and payload bridges.
Most of this work was conducted before QPID was fully implemented,
but this might be useful to make Avis a viable alternative (or as a
second reference bridge for testing purposes).

Primary changes :-

- Significant rewrite of how the transport bridge operates and also
  allow more than one transport to run at once (e.g. for separate pub
  dict_pub transports, similar to qpid).

- Add support for inbox-to-inbox functionality.  Details on this to
  follow in a separate patch for Mama.

This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of IntercontinentalExchange Group, Inc. (ICE), NYSE Euronext or any of their subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.


Lee Skillen <lskillen@...>
 

Hey Damian,

Fantastic - Thanks for assisting with integrating it, it isn't an insignificant/small change!

Looking through the issue I can see that the subject construction part isn't included yet, but I've also noticed it's missing a header file commit (sub.h), so I'll add that now.

Great to see some traction on the patches - We're hoping to help address the other outstanding RFCs on the patches next week to help move things along. :-)

Cheers,
Lee


On 25 April 2014 09:59, Damian Maguire <DMaguire@...> wrote:
Hey Lee, 

Changes with this looked fine, with a few tweaks to a couple of formatting things. Pushed them up to the next branch, commit ID: d398643d7fcd2d2d36907c44e19dcb1b4738ad9c

Cheers, 

Damian

From: Lee Skillen <lskillen@...>
Date: Monday, March 3, 2014 3:08 PM
To: "openmama-dev@..." <openmama-dev@...>
Subject: [Openmama-dev] [PATCH] AVIS: Refactoring avis middleware and payload bridges.
Most of this work was conducted before QPID was fully implemented,
but this might be useful to make Avis a viable alternative (or as a
second reference bridge for testing purposes).

Primary changes :-

- Significant rewrite of how the transport bridge operates and also
  allow more than one transport to run at once (e.g. for separate pub
  dict_pub transports, similar to qpid).

- Add support for inbox-to-inbox functionality.  Details on this to
  follow in a separate patch for Mama.

This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of IntercontinentalExchange Group, Inc. (ICE), NYSE Euronext or any of their subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.



--
Lee Skillen

Vulcan Financial Technologies
51 Malone Road, Belfast, BT9 6RY

Office:  +44 (0)28 95 817888
Mobile:  +44 (0)78 41 425152
Web:     www.vulcanft.com 


Damian Maguire <DMaguire@...>
 

No problem Lee, I see the updated patch on BZ81, so I'll give it a shot now. 

Cheers, 

D

From: Lee Skillen <lskillen@...>
Date: Friday, April 25, 2014 10:34 AM
To: Damian Maguire <dmaguire@...>
Cc: "openmama-dev@..." <openmama-dev@...>
Subject: Re: [Openmama-dev] [PATCH] AVIS: Refactoring avis middleware and payload bridges.

Hey Damian,

Fantastic - Thanks for assisting with integrating it, it isn't an insignificant/small change!

Looking through the issue I can see that the subject construction part isn't included yet, but I've also noticed it's missing a header file commit (sub.h), so I'll add that now.

Great to see some traction on the patches - We're hoping to help address the other outstanding RFCs on the patches next week to help move things along. :-)

Cheers,
Lee


On 25 April 2014 09:59, Damian Maguire <DMaguire@...> wrote:
Hey Lee, 

Changes with this looked fine, with a few tweaks to a couple of formatting things. Pushed them up to the next branch, commit ID: d398643d7fcd2d2d36907c44e19dcb1b4738ad9c

Cheers, 

Damian

From: Lee Skillen <lskillen@...>
Date: Monday, March 3, 2014 3:08 PM
To: "openmama-dev@..." <openmama-dev@...>
Subject: [Openmama-dev] [PATCH] AVIS: Refactoring avis middleware and payload bridges.
Most of this work was conducted before QPID was fully implemented,
but this might be useful to make Avis a viable alternative (or as a
second reference bridge for testing purposes).

Primary changes :-

- Significant rewrite of how the transport bridge operates and also
  allow more than one transport to run at once (e.g. for separate pub
  dict_pub transports, similar to qpid).

- Add support for inbox-to-inbox functionality.  Details on this to
  follow in a separate patch for Mama.

This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of IntercontinentalExchange Group, Inc. (ICE), NYSE Euronext or any of their subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.



--
Lee Skillen

Vulcan Financial Technologies
51 Malone Road, Belfast, BT9 6RY

Office:  +44 (0)28 95 817888
Mobile:  +44 (0)78 41 425152
Web:     www.vulcanft.com 

This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of IntercontinentalExchange Group, Inc. (ICE), NYSE Euronext or any of their subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.