[PATCH 1/4] QPID: Added QPID Broker implementation


Frank Quinn <fquinn.ni@...>
 

Broker support has now been added to the qpid proton bridge for OpenMAMA
and example configuration to use it has been added to mama.properties.
Note that to make use of this functionality (using qpidd as an example),
you will need to allow topic patterns matching your provided URI. For
example, for the following example configuration, you could run qpid
with the following command to ensure all these URI topics can be created:

qpidd --topic-patterns MAMA.*

Example configuration is then as follows:

mama.qpid.transport.qpidbroker.type=broker
mama.qpid.transport.qpidbroker.outgoing_url=topic://127.0.0.1/MAMA/%r/%S/%s
mama.qpid.transport.qpidbroker.incoming_url=topic://127.0.0.1/MAMA/%r/%S/%s
mama.qpid.transport.qpidbroker.reply_url=topic://127.0.0.1/MAMA/%u

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/qpid/Makefile.am    |   1 +
 mama/c_cpp/src/c/bridge/qpid/publisher.c    | 177 ++++++++--------
 mama/c_cpp/src/c/bridge/qpid/qpidcommon.c   | 304 ++++++++++++++++++++++++++++
 mama/c_cpp/src/c/bridge/qpid/qpidcommon.h   | 151 ++++++++++++++
 mama/c_cpp/src/c/bridge/qpid/qpiddefs.h     |  21 +-
 mama/c_cpp/src/c/bridge/qpid/subscription.c | 211 +++++++++----------
 mama/c_cpp/src/c/bridge/qpid/subscription.h |  23 ---
 mama/c_cpp/src/c/bridge/qpid/transport.c    | 261 ++++++++++++++++++++++--
 mama/c_cpp/src/c/bridge/qpid/transport.h    |  93 ++++++++-
 mama/c_cpp/src/examples/mama.properties     |  14 ++
 10 files changed, 1006 insertions(+), 250 deletions(-)
 create mode 100644 mama/c_cpp/src/c/bridge/qpid/qpidcommon.c
 create mode 100644 mama/c_cpp/src/c/bridge/qpid/qpidcommon.h

diff --git a/mama/c_cpp/src/c/bridge/qpid/Makefile.am b/mama/c_cpp/src/c/bridge/qpid/Makefile.am
index 53fe5cd..3995816 100644
--- a/mama/c_cpp/src/c/bridge/qpid/Makefile.am
+++ b/mama/c_cpp/src/c/bridge/qpid/Makefile.am
@@ -54,6 +54,7 @@ libmamaqpidimpl_la_SOURCES = \
     timer.c \
     inbox.c \
     codec.c \
+    qpidcommon.c \
     endpointpool.c
 
 
diff --git a/mama/c_cpp/src/c/bridge/qpid/publisher.c b/mama/c_cpp/src/c/bridge/qpid/publisher.c
index 4d32475..98a9a1a 100644
--- a/mama/c_cpp/src/c/bridge/qpid/publisher.c
+++ b/mama/c_cpp/src/c/bridge/qpid/publisher.c
@@ -37,9 +37,9 @@
 #include "msg.h"
 #include "codec.h"
 #include "inbox.h"
-#include "subscription.h"
 #include "publisher.h"
 #include "endpointpool.h"
+#include "qpidcommon.h"
 
 /*=========================================================================
  =                Typedefs, structs, enums and globals                   =
@@ -52,6 +52,7 @@ typedef struct qpidPublisherBridge
     const char*             mSource;
     const char*             mRoot;
     const char*             mSubject;
+    const char*             mUri;
     pn_message_t*           mQpidRawMsg;
     msgBridge               mMamaBridgeMsg;
 } qpidPublisherBridge;
@@ -76,20 +77,6 @@ qpidBridgePublisherImpl_enqueueMessageForAddress (mamaMsg               msg,
                                                   const char*           url,
                                                   qpidPublisherBridge*  impl);
 
-/**
- * When a qpid publisher is created, it calls this function to generate a
- * standard subject key using qpidBridgeMamaSubscriptionImpl_generateSubjectKey
- * with different parameters depending on if it's a market data publisher,
- * basic publisher or a data dictionary publisher.
- *
- * @param msg   The MAMA message to enqueue for sending.
- * @param url   The URL to eneueue the message for sending to.
- * @param impl  The related qpid publisher bridge.
- *
- * @return mama_status indicating whether the method succeeded or failed.
- */
-static mama_status
-qpidBridgeMamaPublisherImpl_buildSendSubject (qpidPublisherBridge* impl);
 
 /*=========================================================================
  =               Public interface implementation functions               =
@@ -105,9 +92,11 @@ qpidBridgeMamaPublisher_createByIndex (publisherBridge*     result,
                                        void*                nativeQueueHandle,
                                        mamaPublisher        parent)
 {
-    qpidPublisherBridge*    impl        = NULL;
-    qpidTransportBridge*    transport   = NULL;
-    mama_status             status      = MAMA_STATUS_OK;
+    qpidPublisherBridge*    impl            = NULL;
+    qpidTransportBridge*    transport       = NULL;
+    mama_status             status          = MAMA_STATUS_OK;
+    const char*             outgoingAddress = NULL;
+    const char*             uuid            = NULL;
 
     if (NULL == result
             || NULL == tport
@@ -149,23 +138,50 @@ qpidBridgeMamaPublisher_createByIndex (publisherBridge*     result,
         return MAMA_STATUS_NOMEM;
     }
 
-    if (NULL != topic)
-    {
-        impl->mTopic = topic;
-    }
+    /* Get the outgoing address format string */
+    outgoingAddress = qpidBridgeMamaTransportImpl_getOutgoingAddress (
+                            (transportBridge) impl->mTransport);
 
-    if (NULL != source)
+    /* Get the transport UUID which is unique to the transport in case we need
+     * it during format string expansion */
+    uuid = qpidBridgeMamaTransportImpl_getUuid (
+                 (transportBridge) impl->mTransport);
+
+    /* Collapse subject key to single string based on supplied values
+     *
+     * _MD requests do not use the topic on the wire as the responder may not
+     * necessarily be listening for requests on that topic until the first
+     * request comes in. */
+    if (NULL != root && 0 == strcmp (root, MAMA_ROOT_MARKET_DATA))
     {
-        impl->mSource = source;
+        qpidBridgeCommon_generateSubjectKey (root,
+                                             source,
+                                             NULL,
+                                             &impl->mSubject);
     }
-
-    if (NULL != root)
+    else
     {
-        impl->mRoot = root;
+        qpidBridgeCommon_generateSubjectKey (root,
+                                             source,
+                                             topic,
+                                             &impl->mSubject);
     }
 
-    /* Generate a topic name based on the publisher details */
-    status = qpidBridgeMamaPublisherImpl_buildSendSubject (impl);
+    /* Parse the collapsed string to extract the standardized values */
+    qpidBridgeCommon_parseSubjectKey (impl->mSubject,
+                                      &impl->mRoot,
+                                      &impl->mSource,
+                                      &impl->mTopic,
+                                      (transportBridge) impl->mTransport);
+
+
+    /* Generate subject URI based on standardized values */
+    qpidBridgeCommon_generateSubjectUri (outgoingAddress,
+                                         impl->mRoot,
+                                         impl->mSource,
+                                         impl->mTopic,
+                                         uuid,
+                                         &impl->mUri);
 
     /* Create a reusable proton message */
     impl->mQpidRawMsg = pn_message ();
@@ -212,6 +228,22 @@ qpidBridgeMamaPublisher_destroy (publisherBridge publisher)
     {
         free ((void*) impl->mSubject);
     }
+    if (NULL != impl->mUri)
+    {
+        free ((void*) impl->mUri);
+    }
+    if (NULL != impl->mRoot)
+    {
+        free ((void*) impl->mRoot);
+    }
+    if (NULL != impl->mSource)
+    {
+        free ((void*) impl->mSource);
+    }
+    if (NULL != impl->mTopic)
+    {
+        free ((void*) impl->mTopic);
+    }
     if (NULL != impl->mMamaBridgeMsg)
     {
         qpidBridgeMamaMsg_destroy (impl->mMamaBridgeMsg, 0);
@@ -258,7 +290,7 @@ qpidBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg)
         /* Use the publisher's default send destination for request */
         qpidBridgePublisherImpl_enqueueMessageForAddress (
                 msg,
-                (char*) impl->mTransport->mOutgoingAddress,
+                (char*) impl->mUri,
                 impl);
         break;
     case QPID_MSG_INBOX_RESPONSE:
@@ -274,25 +306,35 @@ qpidBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg)
                                           impl->mSubject,
                                           impl->mSource);
 
-        /* For each known downstream destination */
-        status = endpointPool_getRegistered (impl->mTransport->mPubEndpoints,
-                                             impl->mSubject,
-                                             &targets,
-                                             &targetCount);
-
-        if (targetCount == 0)
+        if (QPID_TRANSPORT_TYPE_P2P ==
+                qpidBridgeMamaTransportImpl_getType (
+                        (transportBridge) impl->mTransport))
         {
-            mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaPublisher_send(): "
-                      "No one subscribed to subject '%s', not publishing.",
-                      impl->mSubject);
-            return MAMA_STATUS_OK;
+            /* For each known downstream destination */
+            status = endpointPool_getRegistered (impl->mTransport->mPubEndpoints,
+                                                 impl->mSubject,
+                                                 &targets,
+                                                 &targetCount);
+
+            if (targetCount == 0)
+            {
+                mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaPublisher_send(): "
+                          "No one subscribed to subject '%s', not publishing.",
+                          impl->mSubject);
+                return MAMA_STATUS_OK;
+            }
+
+            /* Push the message out to the send queue for each interested party */
+            for (targetInc = 0; targetInc < targetCount; targetInc++)
+            {
+                url = (char*) targets[targetInc];
+                qpidBridgePublisherImpl_enqueueMessageForAddress (msg, url, impl);
+            }
         }
-
-        /* Push the message out to the send queue for each interested party */
-        for (targetInc = 0; targetInc < targetCount; targetInc++)
+        else
         {
-            url = (char*) targets[targetInc];
-            qpidBridgePublisherImpl_enqueueMessageForAddress (msg, url, impl);
+            qpidBridgePublisherImpl_enqueueMessageForAddress (msg, impl->mUri,
+                    impl);
         }
         break;
     }
@@ -643,46 +685,3 @@ qpidBridgePublisherImpl_enqueueMessageForAddress (mamaMsg               msg,
     }
 
 }
-
-mama_status
-qpidBridgeMamaPublisherImpl_buildSendSubject (qpidPublisherBridge* impl)
-{
-    char* keyTarget = NULL;
-
-    /* If this is a special _MD publisher, lose the topic unless dictionary */
-    if (impl->mRoot != NULL)
-    {
-        /*
-         * May use strlen here to increase speed but would need to test to
-         * verify this is the only circumstance in which we want to consider the
-         * topic when a root is specified.
-         */
-        if (strcmp (impl->mRoot, "_MDDD") == 0)
-        {
-            qpidBridgeMamaSubscriptionImpl_generateSubjectKey (impl->mRoot,
-                                                               impl->mSource,
-                                                               impl->mTopic,
-                                                               &keyTarget);
-        }
-        else
-        {
-            qpidBridgeMamaSubscriptionImpl_generateSubjectKey (impl->mRoot,
-                                                               impl->mSource,
-                                                               NULL,
-                                                               &keyTarget);
-        }
-    }
-    /* If this isn't a special _MD publisher */
-    else
-    {
-        qpidBridgeMamaSubscriptionImpl_generateSubjectKey (NULL,
-                                                           impl->mSource,
-                                                           impl->mTopic,
-                                                           &keyTarget);
-    }
-
-    /* Set the subject for publishing here */
-    impl->mSubject = keyTarget;
-
-    return MAMA_STATUS_OK;
-}
diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c
new file mode 100644
index 0000000..f55f63a
--- /dev/null
+++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c
@@ -0,0 +1,304 @@
+/* $Id$
+ *
+ * OpenMAMA: The open middleware agnostic messaging API
+ * Copyright (C) 2011 NYSE Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "qpiddefs.h"
+#include "qpidcommon.h"
+#include "transport.h"
+
+/*=========================================================================
+  =                  Public implementation functions                      =
+  =========================================================================*/
+
+mama_status
+qpidBridgeCommon_parseSubjectKey (char*             key,
+                                  const char**      root,
+                                  const char**      source,
+                                  const char**      topic,
+                                  transportBridge   transport)
+{
+    /* We can assume subject key is of this length */
+    char subject[MAX_SUBJECT_LENGTH];
+
+    char* subjectPosition = subject;
+
+    /* Reset return values */
+    *root   = NULL;
+    *source = NULL;
+    *topic  = NULL;
+
+    if (NULL == key || 0 == strlen(key))
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Copy the key across to writable memory */
+    strcpy (subject, key);
+
+    subjectPosition = strtok (subjectPosition, ".");
+
+    if (0 == strcmp (MAMA_ROOT_MARKET_DATA, subjectPosition) ||
+            0 == strcmp (MAMA_ROOT_MARKET_DATA_DICT, subjectPosition))
+    {
+        *root = strdup (subjectPosition);
+
+        /* This should NULL terminate the end of the source string */
+        subjectPosition = strtok (NULL, ".");
+        if (NULL != subjectPosition)
+        {
+            uint32_t parsedBytes = 0;
+
+            *source = strdup (subjectPosition);
+
+            qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace (
+                    transport,
+                    *source);
+
+            /* String lengths plus NULL characters */
+            parsedBytes = strlen (*root) + 1 + strlen (*source) + 1;
+
+            /* Only parse topic if it exists... */
+            if (parsedBytes < strlen (key))
+            {
+                /* The topic is then whatever is left after source NULL character */
+                *topic = strdup (subject + parsedBytes);
+            }
+        }
+    }
+    /* Might have a symbol namespace... might not... */
+    else
+    {
+        char* firstSubjectPart = subjectPosition;
+
+        /* If a MAMA symbol namespace is involved, there should always be
+         * another part */
+        char* secondSubjectPart = strtok (NULL, ".");
+
+        /* If there is no second part, this looks like a basic topic. Also if
+         * this is a symbol namespace we have never seen before, assume it's
+         * basic. */
+        if (NULL == secondSubjectPart)
+        {
+            *topic = strdup (firstSubjectPart);
+        }
+        /* There is a second part, but we are unconvinced unless the transport
+         * has seen this source somewhere before */
+        else if (qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace (
+                                transport,
+                                firstSubjectPart))
+        {
+            *source = strdup (firstSubjectPart);
+            /* Use offsets from provided strings rather than the one strtok
+             * may have just modified with NULL characters for the topic */
+            *topic  = strdup (key + strlen (firstSubjectPart) + 1);
+        }
+        /* If this is a subject without a symbol namespace, just use the
+         * original string rather than try and revert strtok's trail of
+         * destruction */
+        else
+        {
+            *topic = strdup (key);
+        }
+    }
+    return MAMA_STATUS_OK;
+}
+
+
+/*
+ * Internal function to ensure that the topic names are always calculated
+ * in a particular way
+ */
+mama_status
+qpidBridgeCommon_generateSubjectKey (const char*  root,
+                                     const char*  source,
+                                     const char*  topic,
+                                     char**       keyTarget)
+{
+    char        subject[MAX_SUBJECT_LENGTH];
+    char*       subjectPos     = subject;
+    size_t      bytesRemaining = MAX_SUBJECT_LENGTH;
+    size_t      written        = 0;
+
+    if (NULL != root)
+    {
+        mama_log (MAMA_LOG_LEVEL_FINEST,
+                  "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): R.");
+        written         = snprintf (subjectPos, bytesRemaining, "%s", root);
+        subjectPos     += written;
+        bytesRemaining -= written;
+    }
+
+    if (NULL != source)
+    {
+        mama_log (MAMA_LOG_LEVEL_FINEST,
+                  "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): S.");
+        /* If these are not the first bytes, prepend with a period */
+        if(subjectPos != subject)
+        {
+            written     = snprintf (subjectPos, bytesRemaining, ".%s", source);
+        }
+        else
+        {
+            written     = snprintf (subjectPos, bytesRemaining, "%s", source);
+        }
+        subjectPos     += written;
+        bytesRemaining -= written;
+    }
+
+    if (NULL != topic)
+    {
+        mama_log (MAMA_LOG_LEVEL_FINEST,
+                  "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): T.");
+        /* If these are not the first bytes, prepend with a period */
+        if (subjectPos != subject)
+        {
+            snprintf (subjectPos, bytesRemaining, ".%s", topic);
+        }
+        else
+        {
+            snprintf (subjectPos, bytesRemaining, "%s", topic);
+        }
+    }
+
+    /*
+     * Allocate the memory for copying the string. Caller is responsible for
+     * destroying.
+     */
+    *keyTarget = strdup (subject);
+    if (NULL == *keyTarget)
+    {
+        return MAMA_STATUS_NOMEM;
+    }
+    else
+    {
+        return MAMA_STATUS_OK;
+    }
+}
+
+/*
+ * Internal function to ensure that the topic names are always calculated
+ * in a particular way
+ */
+mama_status
+qpidBridgeCommon_generateSubjectUri (const char*  format,
+                                     const char*  root,
+                                     const char*  source,
+                                     const char*  topic,
+                                     const char*  uuid,
+                                     const char** uriTarget)
+{
+    char    original[MAX_URI_LENGTH];
+    char    uri[MAX_URI_LENGTH];
+    char    lastByte;
+    char*   uriPos              = uri;
+    char*   nextSeg             = NULL;
+    size_t  bytesRemaining      = MAX_URI_LENGTH;
+    size_t  written             = 0;
+
+    strncpy (original, format, MAX_URI_LENGTH);
+
+    nextSeg = strtok ((char*) original, "%");
+
+    while (nextSeg != NULL)
+    {
+        written = snprintf (uriPos, bytesRemaining, "%s", nextSeg);
+        bytesRemaining -= written;
+        uriPos += written;
+
+        nextSeg = strtok (NULL, "%");
+        if (nextSeg != NULL)
+        {
+            switch (*nextSeg)
+            {
+                case 'r':
+                    if (NULL != root)
+                    {
+                        written = snprintf (uriPos, bytesRemaining, "%s", root);
+                        bytesRemaining -= written;
+                        uriPos += written;
+                    }
+                    nextSeg++;
+                    break;
+                case 'S':
+                    if (NULL != source)
+                    {
+                        written = snprintf (uriPos, bytesRemaining, "%s", source);
+                        bytesRemaining -= written;
+                        uriPos += written;
+                    }
+                    nextSeg++;
+                    break;
+                case 's':
+                    if (NULL != topic)
+                    {
+                        written = snprintf (uriPos, bytesRemaining, "%s", topic);
+                        bytesRemaining -= written;
+                        uriPos += written;
+                    }
+                    nextSeg++;
+                    break;
+                case 'u':
+                    if (NULL != uuid)
+                    {
+                        written = snprintf (uriPos, bytesRemaining, "%s", uuid);
+                        bytesRemaining -= written;
+                        uriPos += written;
+                    }
+                    nextSeg++;
+                    break;
+                default: /* do nothing -just go on copying in next iteration */
+                    break;
+            }
+            /* This will ensure there are no double forward slashes */
+            if (NULL != nextSeg && '/' == *nextSeg && '/' == uri[strlen(uri) - 1])
+            {
+                nextSeg++;
+            }
+        }
+    }
+
+    /* Trim trailing forward slashes */
+    if ('/' == uri[strlen(uri) - 1])
+    {
+        uri[strlen(uri) - 1] = '\0';
+    }
+
+    /*
+     * Allocate the memory for copying the string. Caller is responsible for
+     * destroying.
+     */
+    *uriTarget = strdup (uri);
+    if (NULL == *uriTarget)
+    {
+        return MAMA_STATUS_NOMEM;
+    }
+    else
+    {
+        return MAMA_STATUS_OK;
+    }
+}
diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h
new file mode 100644
index 0000000..6cb70a7
--- /dev/null
+++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h
@@ -0,0 +1,151 @@
+/* $Id$
+ *
+ * OpenMAMA: The open middleware agnostic messaging API
+ * Copyright (C) 2011 NYSE Technologies, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#ifndef MAMA_BRIDGE_QPID_COMMON_H__
+#define MAMA_BRIDGE_QPID_COMMON_H__
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <mama/mama.h>
+#include <proton/message.h>
+#include <proton/error.h>
+
+#include "payloadbridge.h"
+#include "msgfieldimpl.h"
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+/*=========================================================================
+  =                              Macros                                   =
+  =========================================================================*/
+
+#define MAMA_ROOT_MARKET_DATA      "_MD"
+#define MAMA_ROOT_MARKET_DATA_DICT "_MDDD"
+
+
+/*=========================================================================
+  =                  Public implementation functions                      =
+  =========================================================================*/
+
+
+/**
+ * This function will generate a string which is unique to the root, source
+ * and topic provided. Centralization of this function means that it can be used
+ * in both the publisher and the subscriber in order to generate a consistent
+ * topic for use throughout the platform. It will ensure that if any provided
+ * parameter is NULL, that it will be ignored in the expansion (e.g. providing
+ * root=NULL, source=EXCHANGENAME.ISIN.CURRENCY and topic=NULL will result in
+ * a new keyTarget="EXCHANGENAME.ISIN.CURRENCY" as opposed to
+ * ".EXCHANGENAME.ISIN.CURRENCY."
+ *
+ * @param root      Prefix to associate with the subject (e.g. _MDDD).
+ * @param source    Source to base the subject key on (e.g. EXCHANGENAME).
+ * @param topic     Topic to base the subject key on (e.g. ISIN.CURRENCY).
+ * @param keyTarget Pointer to populate with the generated subject key. The
+ *                  caller is then responsible for freeing this new strdup'd
+ *                  string.
+ *
+ * @return mama_status indicating whether the method succeeded or failed.
+ */
+mama_status
+qpidBridgeCommon_generateSubjectKey (const char*  root,
+                                     const char*  source,
+                                     const char*  topic,
+                                     char**       keyTarget);
+
+/**
+ * This function will take the provided format string and use the root, source,
+ * topic and uuid provided to expand the format string into a newly allocated
+ * string which the caller is responsible for freeing. The format string uses
+ * the following tokens:
+ *
+ * %r : Root (only for market data requests, otherwise blank). e.g. _MD
+ * %S : MAMA Source / Symbol Namespace. e.g. OPENMAMA
+ * %s : Symbol / Topic. e.g. MSFT
+ * %u : uuid. e.g. 4542dc20-f1ae-11e3-ac10-0800200c9a66
+ *
+ * @param format    The format string to expand.
+ * @param root      Prefix to associate with the subject (e.g. _MDDD).
+ * @param source    Source to base the subject key on (e.g. EXCHANGENAME).
+ * @param topic     Topic to base the subject key on (e.g. ISIN.CURRENCY).
+ * @param keyTarget Pointer to populate with the generated subject key. The
+ *                  caller is then responsible for freeing this new strdup'd
+ *                  string.
+ *
+ * @return mama_status indicating whether the method succeeded or failed.
+ */
+mama_status
+qpidBridgeCommon_generateSubjectUri (const char*  format,
+                                     const char*  root,
+                                     const char*  source,
+                                     const char*  topic,
+                                     const char*  uuid,
+                                     const char** uriTarget);
+
+/**
+ * This function will parse a standard period delimited string as generated via
+ * qpidBridgeCommon_generateSubjectKey and split it up into its individual
+ * components. This allows us to accurately split up these components, even
+ * when the arguments supplied to qpidBridgeCommon_generateSubjectKey aren't
+ * quite in the format expected (e.g. if qpidBridgeCommon_generateSubjectKey
+ * is provided with root=NULL, source=NULL and
+ * topic=_MD.EXCHANGENAME.ISIN.CURRENCY, this will then be collapsed in
+ * qpidBridgeCommon_generateSubjectKey to create
+ * subjectKey=_MD.EXCHANGENAME.ISIN.CURRENCY. If that string is then provided
+ * to this function, it will parse the subjectKey and extract the *real* values
+ * for root=_MD, source=EXCHANGENAME and topic=ISIN.CURRENCY).
+ *
+ * @param key       Subject key to parse (e.g. _MD.EXCHANGENAME.ISIN.CURRENCY).
+ * @param root      Pointer to populate with the parsed root (e.g. _MD). Caller
+ *                  is then responsible for the memory associated with this
+ *                  string.
+ * @param source    Pointer to populate with the parsed source (e.g. _MD).
+ *                  Caller is then responsible for the memory associated with
+ *                  this string.
+ * @param topic     Pointer to populate with the parsed topic (e.g. _MD). Caller
+ *                  is then responsible for the memory associated with this
+ *                  string.
+ * @param transport Transport bridge on which this topic is being created. This
+ *                  is required to help determine whether or not the key
+ *                  contains a known symbol namespace.
+ *
+ * @return mama_status indicating whether the method succeeded or failed.
+ */
+mama_status
+qpidBridgeCommon_parseSubjectKey (char*             key,
+                                  const char**      root,
+                                  const char**      source,
+                                  const char**      topic,
+                                  transportBridge   transport);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* MAMA_BRIDGE_QPID_COMMON_H__ */
diff --git a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h
index 6c1adf4..3e3b2e5 100644
--- a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h
+++ b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h
@@ -56,9 +56,11 @@ extern "C" {
 
 /* Maximum topic length */
 #define     MAX_SUBJECT_LENGTH              256
+#define     MAX_URI_LENGTH                  1024
 
 /* Default timeout for send working threads */
 #define     QPID_MESSENGER_SEND_TIMEOUT     -1
+#define     QPID_MESSENGER_TIMEOUT          1 /* milliseconds */
 
 /* Message types */
 typedef enum qpidMsgType_
@@ -70,6 +72,13 @@ typedef enum qpidMsgType_
     QPID_MSG_TERMINATE      =               0xff
 } qpidMsgType;
 
+/* Message types */
+typedef enum qpidTransportType_
+{
+    QPID_TRANSPORT_TYPE_P2P,
+    QPID_TRANSPORT_TYPE_BROKER
+} qpidTransportType;
+
 /* If a proton version header has been parsed at this point (version >= 0.5) */
 #ifdef _PROTON_VERSION_H
 
@@ -118,9 +127,12 @@ typedef struct qpidSubscription_
     mamaSubscription    mMamaSubscription;
     mamaQueue           mMamaQueue;
     void*               mQpidQueue;
-    mamaTransport       mTransport;
-    const char*         mSymbol;
-    char*               mSubjectKey;
+    transportBridge     mTransport;
+    const char*         mSource;
+    const char*         mTopic;
+    const char*         mRoot;
+    char*               mSubject;
+    const char*         mUri;
     void*               mClosure;
     int                 mIsNotMuted;
     int                 mIsValid;
@@ -143,11 +155,14 @@ typedef struct qpidTransportBridge_
     const char*         mOutgoingAddress;
     const char*         mReplyAddress;
     const char*         mName;
+    const char*         mUuid;
     wthread_t           mQpidDispatchThread;
     int                 mIsDispatching;
     mama_status         mQpidDispatchStatus;
     endpointPool_t      mSubEndpoints;
     endpointPool_t      mPubEndpoints;
+    qpidTransportType   mQpidTransportType;
+    wtable_t            mKnownSources;
 } qpidTransportBridge;
 
 struct qpidMsgNode_
diff --git a/mama/c_cpp/src/c/bridge/qpid/subscription.c b/mama/c_cpp/src/c/bridge/qpid/subscription.c
index bdef796..2582a5e 100644
--- a/mama/c_cpp/src/c/bridge/qpid/subscription.c
+++ b/mama/c_cpp/src/c/bridge/qpid/subscription.c
@@ -33,8 +33,8 @@
 #include <wombat/queue.h>
 #include "qpidbridgefunctions.h"
 #include "transport.h"
+#include "qpidcommon.h"
 #include "qpiddefs.h"
-#include "subscription.h"
 #include "publisher.h"
 #include "endpointpool.h"
 
@@ -57,6 +57,8 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber,
     qpidTransportBridge*    transport   = NULL;
     mama_status             status      = MAMA_STATUS_OK;
     pn_data_t*              data        = NULL;
+    const char*             uuid        = NULL;
+    const char*             outgoingAddress = NULL;
 
     if ( NULL == subscriber || NULL == subscription || NULL == tport )
     {
@@ -75,6 +77,11 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber,
         return MAMA_STATUS_NULL_ARG;
     }
 
+    outgoingAddress = qpidBridgeMamaTransportImpl_getOutgoingAddress (
+                        (transportBridge) transport);
+    uuid = qpidBridgeMamaTransportImpl_getUuid (
+                        (transportBridge) transport);
+
     /* Allocate memory for qpid subscription implementation */
     impl = (qpidSubscription*) calloc (1, sizeof (qpidSubscription));
     if (NULL == impl)
@@ -86,58 +93,89 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber,
     impl->mMamaCallback        = callback;
     impl->mMamaSubscription    = subscription;
     impl->mMamaQueue           = queue;
-    impl->mTransport           = (mamaTransport)transport;
-    impl->mSymbol              = symbol;
+    impl->mTransport           = (transportBridge) transport;
     impl->mClosure             = closure;
     impl->mIsNotMuted          = 1;
     impl->mIsTportDisconnected = 1;
-    impl->mSubjectKey          = NULL;
+    impl->mSubject             = NULL;
 
     /* Use a standard centralized method to determine a topic key */
-    qpidBridgeMamaSubscriptionImpl_generateSubjectKey (NULL,
-                                                       source,
-                                                       symbol,
-                                                       &impl->mSubjectKey);
+    qpidBridgeCommon_generateSubjectKey (NULL,
+                                         source,
+                                         symbol,
+                                         &impl->mSubject);
+
+    /* Parse the collapsed string to extract the standardized values */
+    qpidBridgeCommon_parseSubjectKey (impl->mSubject,
+                                      &impl->mRoot,
+                                      &impl->mSource,
+                                      &impl->mTopic,
+                                      impl->mTransport);
+
+    /* Generate subject URI based on standardized values */
+    qpidBridgeCommon_generateSubjectUri (outgoingAddress,
+                                         impl->mRoot,
+                                         impl->mSource,
+                                         impl->mTopic,
+                                         uuid,
+                                         &impl->mUri);
 
     /* Register the endpoint */
     endpointPool_registerWithoutIdentifier (transport->mSubEndpoints,
-                                            impl->mSubjectKey,
+                                            impl->mSubject,
                                             &impl->mEndpointIdentifier,
                                             impl);
 
-    /* Notify the publisher that you have an interest in this topic */
-    pn_message_clear        (transport->mMsg);
+    if (QPID_TRANSPORT_TYPE_P2P ==
+                qpidBridgeMamaTransportImpl_getType (
+                        impl->mTransport))
+    {
+        /* Notify the publisher that you have an interest in this topic */
+        pn_message_clear        (transport->mMsg);
 
-    /* Set the message meta data to reflect a subscription request */
-    qpidBridgePublisherImpl_setMessageType (transport->mMsg,
-                                            QPID_MSG_SUB_REQUEST);
+        /* Set the message meta data to reflect a subscription request */
+        qpidBridgePublisherImpl_setMessageType (transport->mMsg,
+                                                QPID_MSG_SUB_REQUEST);
 
-    /* Set the outgoing address as provided by the transport configuration */
-    pn_message_set_address  (transport->mMsg,
-                             transport->mOutgoingAddress);
+        /* Set the outgoing address as provided by the transport configuration */
+        pn_message_set_address  (transport->mMsg,
+                                 transport->mOutgoingAddress);
 
-    /* Set the reply address */
-    pn_message_set_reply_to (transport->mMsg,
-                             transport->mReplyAddress);
+        /* Set the reply address */
+        pn_message_set_reply_to (transport->mMsg,
+                                 transport->mReplyAddress);
 
-    /* Get the proton message's body data for writing */
-    data = pn_message_body  (transport->mMsg);
+        /* Get the proton message's body data for writing */
+        data = pn_message_body  (transport->mMsg);
 
-    /* Add in the subject key as the only string inside */
-    pn_data_put_string      (data, pn_bytes (strlen (impl->mSubjectKey),
-                                             impl->mSubjectKey));
+        /* Add in the subject key as the only string inside */
+        pn_data_put_string      (data, pn_bytes (strlen (impl->mSubject),
+                                                 impl->mSubject));
 
-    /* Send out the subscription registration of interest message */
-    if (NULL != transport->mOutgoingAddress)
+        /* Send out the subscription registration of interest message */
+        if (NULL != transport->mOutgoingAddress)
+        {
+            pn_messenger_put    (transport->mOutgoing, transport->mMsg);
+
+            if (0 != PN_MESSENGER_SEND (transport->mOutgoing))
+            {
+                const char* qpid_error = PN_MESSENGER_ERROR (transport->mOutgoing);
+                mama_log (MAMA_LOG_LEVEL_SEVERE,
+                          "qpidBridgeMamaSubscription_create(): "
+                          "pn_messenger_send Error:[%s]", qpid_error);
+                return MAMA_STATUS_PLATFORM;
+            }
+        }
+    }
+    else
     {
-        pn_messenger_put    (transport->mOutgoing, transport->mMsg);
-
-        if (0 != PN_MESSENGER_SEND (transport->mOutgoing))
+        if (pn_messenger_subscribe (transport->mIncoming,
+                                    impl->mUri) <= 0)
         {
-            const char* qpid_error = PN_MESSENGER_ERROR (transport->mOutgoing);
-            mama_log (MAMA_LOG_LEVEL_SEVERE,
-                      "qpidBridgeMamaSubscription_create(): "
-                      "pn_messenger_send Error:[%s]", qpid_error);
+            mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaSubscription_create(): "
+                      "Error Subscribing to %s : %s",
+                      impl->mUri,
+                      PN_MESSENGER_ERROR(transport->mIncoming));
             return MAMA_STATUS_PLATFORM;
         }
     }
@@ -145,7 +183,7 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber,
     mama_log (MAMA_LOG_LEVEL_FINEST,
               "qpidBridgeMamaSubscription_create(): "
               "created interest for %s.",
-              impl->mSubjectKey);
+              impl->mSubject);
 
     /* Mark this subscription as valid */
     impl->mIsValid = 1;
@@ -205,10 +243,10 @@ qpidBridgeMamaSubscription_destroy (subscriptionBridge subscriber)
 
     /* Remove the subscription from the transport's subscription pool. */
     if (NULL != transportBridge && NULL != transportBridge->mSubEndpoints
-        && NULL != impl->mSubjectKey)
+        && NULL != impl->mSubject)
     {
         endpointPool_unregister (transportBridge->mSubEndpoints,
-                                 impl->mSubjectKey,
+                                 impl->mSubject,
                                  impl->mEndpointIdentifier);
     }
 
@@ -217,9 +255,29 @@ qpidBridgeMamaSubscription_destroy (subscriptionBridge subscriber)
         pn_message_free (impl->mMsg);
     }
 
-    if (NULL != impl->mSubjectKey)
+    if (NULL != impl->mSubject)
     {
-        free (impl->mSubjectKey);
+        free (impl->mSubject);
+    }
+
+    if (NULL != impl->mRoot)
+    {
+        free ((void*)impl->mRoot);
+    }
+
+    if (NULL != impl->mSource)
+    {
+        free ((void*)impl->mSource);
+    }
+
+    if (NULL != impl->mTopic)
+    {
+        free ((void*)impl->mTopic);
+    }
+
+    if (NULL != impl->mUri)
+    {
+        free ((void*)impl->mUri);
     }
 
     if (NULL != impl->mEndpointIdentifier)
@@ -289,78 +347,3 @@ qpidBridgeMamaSubscription_muteCurrentTopic (subscriptionBridge subscriber)
     return qpidBridgeMamaSubscription_mute (subscriber);
 }
 
-
-/*=========================================================================
-  =                  Public implementation functions                      =
-  =========================================================================*/
-
-/*
- * Internal function to ensure that the topic names are always calculated
- * in a particular way
- */
-mama_status
-qpidBridgeMamaSubscriptionImpl_generateSubjectKey (const char*  root,
-                                                   const char*  source,
-                                                   const char*  topic,
-                                                   char**       keyTarget)
-{
-    char        subject[MAX_SUBJECT_LENGTH];
-    char*       subjectPos     = subject;
-    size_t      bytesRemaining = MAX_SUBJECT_LENGTH;
-    size_t      written        = 0;
-
-    if (NULL != root)
-    {
-        mama_log (MAMA_LOG_LEVEL_FINEST,
-                  "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): R.");
-        written         = snprintf (subjectPos, bytesRemaining, "%s", root);
-        subjectPos     += written;
-        bytesRemaining -= written;
-    }
-
-    if (NULL != source)
-    {
-        mama_log (MAMA_LOG_LEVEL_FINEST,
-                  "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): S.");
-        /* If these are not the first bytes, prepend with a period */
-        if(subjectPos != subject)
-        {
-            written     = snprintf (subjectPos, bytesRemaining, ".%s", source);
-        }
-        else
-        {
-            written     = snprintf (subjectPos, bytesRemaining, "%s", source);
-        }
-        subjectPos     += written;
-        bytesRemaining -= written;
-    }
-
-    if (NULL != topic)
-    {
-        mama_log (MAMA_LOG_LEVEL_FINEST,
-                  "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): T.");
-        /* If these are not the first bytes, prepend with a period */
-        if (subjectPos != subject)
-        {
-            snprintf (subjectPos, bytesRemaining, ".%s", topic);
-        }
-        else
-        {
-            snprintf (subjectPos, bytesRemaining, "%s", topic);
-        }
-    }
-
-    /*
-     * Allocate the memory for copying the string. Caller is responsible for
-     * destroying.
-     */
-    *keyTarget = strdup (subject);
-    if (NULL == *keyTarget)
-    {
-        return MAMA_STATUS_NOMEM;
-    }
-    else
-    {
-        return MAMA_STATUS_OK;
-    }
-}
diff --git a/mama/c_cpp/src/c/bridge/qpid/subscription.h b/mama/c_cpp/src/c/bridge/qpid/subscription.h
index ea1aceb..992a495 100644
--- a/mama/c_cpp/src/c/bridge/qpid/subscription.h
+++ b/mama/c_cpp/src/c/bridge/qpid/subscription.h
@@ -26,29 +26,6 @@
 extern "C" {
 #endif
 
-/*=========================================================================
-  =                  Public implementation functions                      =
-  =========================================================================*/
-
-/**
- * This function will generate a string which is unique to the root, source
- * and topic provided. Centralization of this function means that it can be used
- * in both the publisher and the subscriber in order to generate a consistent
- * topic for use throughout the platform.
- *
- * @param root      Prefix to associate with the subject (e.g. _MDDD)
- * @param inbox     Source to base the subject key on (e.g. EXCHANGENAME).
- * @param topic     Topic to base the subject key on (e.g. ISIN.CURRENCY).
- * @param keyTarget Pointer to populate with the generated subject key.
- *
- * @return mama_status indicating whether the method succeeded or failed.
- */
-mama_status
-qpidBridgeMamaSubscriptionImpl_generateSubjectKey (const char*  root,
-                                                   const char*  source,
-                                                   const char*  topic,
-                                                   char**       keyTarget);
-
 #if defined(__cplusplus)
 }
 #endif
diff --git a/mama/c_cpp/src/c/bridge/qpid/transport.c b/mama/c_cpp/src/c/bridge/qpid/transport.c
index ba21810..2b2053b 100644
--- a/mama/c_cpp/src/c/bridge/qpid/transport.c
+++ b/mama/c_cpp/src/c/bridge/qpid/transport.c
@@ -37,11 +37,13 @@
 #include <timers.h>
 #include <stdio.h>
 #include <wombat/queue.h>
+#include <wombat/wUuid.h>
 
 #include "transport.h"
 #include "publisher.h"
 #include "qpidbridgefunctions.h"
 #include "qpiddefs.h"
+#include "qpidcommon.h"
 #include "msg.h"
 #include "codec.h"
 #include "endpointpool.h"
@@ -58,11 +60,14 @@
 #define     TPORT_PARAM_SUB_POOL_SIZE       "msg_pool_size"
 #define     TPORT_PARAM_SUB_POOL_INC_SIZE   "msg_pool_inc_size"
 #define     TPORT_PARAM_RECV_BLOCK_SIZE     "recv_block_size"
+#define     TPORT_PARAM_TPORT_TYPE          "type"
 
 /* Default values for corresponding configuration parameters */
 #define     DEFAULT_OUTGOING_URL            "amqp://127.0.0.1:7777"
 #define     DEFAULT_INCOMING_URL            "amqp://~127.0.0.1:6666"
 #define     DEFAULT_REPLY_URL               "amqp://127.0.0.1:6666"
+#define     DEFAULT_TPORT_TYPE              "p2p"
+#define     DEFAULT_TPORT_TYPE_VALUE        QPID_TRANSPORT_TYPE_P2P
 #define     DEFAULT_SUB_POOL_SIZE           128
 #define     DEFAULT_SUB_POOL_INC_SIZE       128
 #define     DEFAULT_RECV_BLOCK_SIZE         10
@@ -76,6 +81,10 @@
 #define     MAX_SUB_POOL_INC_SIZE           2000L
 #define     MIN_RECV_BLOCK_SIZE             -1L
 #define     MAX_RECV_BLOCK_SIZE             100L
+#define     CONFIG_VALUE_TPORT_TYPE_BROKER  "broker"
+#define     CONFIG_VALUE_TPORT_TYPE_P2P     "p2p"
+#define     UUID_STRING_BUF_SIZE            37
+#define     KNOWN_SOURCES_WTABLE_SIZE       10
 
 
 /*=========================================================================
@@ -345,6 +354,21 @@ qpidBridgeMamaTransport_destroy (transportBridge transport)
     endpointPool_destroy (impl->mSubEndpoints);
     endpointPool_destroy (impl->mPubEndpoints);
 
+    /* Free the strdup-ed keys still held by the wtable */
+    wtable_free_all_xdata (impl->mKnownSources);
+    /* Finally, destroy the wtable */
+    wtable_destroy (impl->mKnownSources);
+
+    if (NULL != impl->mReplyAddress)
+    {
+        free ((void*) impl->mReplyAddress);
+    }
+
+    if (NULL != impl->mUuid)
+    {
+        free ((void*) impl->mUuid);
+    }
+
     free (impl);
 
     return status;
@@ -358,6 +382,8 @@ qpidBridgeMamaTransport_create (transportBridge*    result,
     qpidTransportBridge*    impl       = NULL;
     int                     poolSize   = 0;
     mama_status             status     = MAMA_STATUS_OK;
+    const char*             tportType  = NULL;
+    const char*             tmpReply   = NULL;
 
     if (NULL == result || NULL == name || NULL == parent)
     {
@@ -374,6 +400,8 @@ qpidBridgeMamaTransport_create (transportBridge*    result,
     impl->mMsg                 = pn_message ();
     impl->mQpidDispatchStatus  = MAMA_STATUS_OK;
     impl->mName                = name;
+    impl->mKnownSources        = wtable_create ("mKnownSources",
+                                                KNOWN_SOURCES_WTABLE_SIZE);
 
     mama_log (MAMA_LOG_LEVEL_FINE,
               "qpidBridgeMamaTransport_create(): Initializing Transport %s",
@@ -390,14 +418,14 @@ qpidBridgeMamaTransport_create (transportBridge*    result,
     /* Set the outgoing address */
     impl->mOutgoingAddress =
         qpidBridgeMamaTransportImpl_getParameter (
-            NULL,
+            DEFAULT_OUTGOING_URL,
             "%s.%s.%s",
             TPORT_PARAM_PREFIX,
             name,
             TPORT_PARAM_OUTGOING_URL);
 
     /* Set the reply address */
-    impl->mReplyAddress =
+    tmpReply =
         qpidBridgeMamaTransportImpl_getParameter (
             DEFAULT_REPLY_URL,
             "%s.%s.%s",
@@ -405,6 +433,44 @@ qpidBridgeMamaTransport_create (transportBridge*    result,
             name,
             TPORT_PARAM_REPLY_URL);
 
+    /* Expand the wildcards in the reply then populates mReplyAddress with a
+     * newly allocated result */
+    qpidBridgeCommon_generateSubjectUri (
+            tmpReply,
+            NULL,
+            NULL,
+            NULL,
+            qpidBridgeMamaTransportImpl_getUuid ((transportBridge) impl),
+            &impl->mReplyAddress);
+
+    /* Set the transport type */
+    tportType =
+        qpidBridgeMamaTransportImpl_getParameter (
+            DEFAULT_TPORT_TYPE,
+            "%s.%s.%s",
+            TPORT_PARAM_PREFIX,
+            name,
+            TPORT_PARAM_TPORT_TYPE);
+
+    if (0 == strcmp (tportType, CONFIG_VALUE_TPORT_TYPE_BROKER))
+    {
+        impl->mQpidTransportType = QPID_TRANSPORT_TYPE_BROKER;
+    }
+    else if (0 == strcmp (tportType, CONFIG_VALUE_TPORT_TYPE_P2P))
+    {
+        impl->mQpidTransportType = QPID_TRANSPORT_TYPE_P2P;
+    }
+    else
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                "Could not parse %s.%s.%s=%s. Using [%s].",
+                TPORT_PARAM_PREFIX,
+                name,
+                TPORT_PARAM_TPORT_TYPE,
+                tportType,
+                DEFAULT_TPORT_TYPE);
+        impl->mQpidTransportType = DEFAULT_TPORT_TYPE_VALUE;
+    }
 
     /* Set the message pool size for pool initialization later */
     poolSize =
@@ -475,14 +541,18 @@ qpidBridgeMamaTransport_create (transportBridge*    result,
         return MAMA_STATUS_PLATFORM;
     }
 
-    status = endpointPool_create (&impl->mPubEndpoints, "mPubEndpoints");
-    if (MAMA_STATUS_OK != status)
+    /* Endpoints are only required if a broker is not being used */
+    if (QPID_TRANSPORT_TYPE_P2P == impl->mQpidTransportType)
     {
-        mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "qpidBridgeMamaTransport_create(): "
-                  "Failed to create publishing endpoints");
-        free (impl);
-        return MAMA_STATUS_PLATFORM;
+        status = endpointPool_create (&impl->mPubEndpoints, "mPubEndpoints");
+        if (MAMA_STATUS_OK != status)
+        {
+            mama_log (MAMA_LOG_LEVEL_ERROR,
+                      "qpidBridgeMamaTransport_create(): "
+                      "Failed to create publishing endpoints");
+            free (impl);
+            return MAMA_STATUS_PLATFORM;
+        }
     }
 
     impl->mIsValid = 1;
@@ -693,6 +763,51 @@ qpidBridgeMamaTransport_getNativeTransportNamingCtx (transportBridge transport,
   =                  Public implementation functions                      =
   =========================================================================*/
 
+/* Call this every time a known MAMA symbol namespace is created */
+void
+qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace (transportBridge transport,
+        const char* symbolNamespace)
+{
+    qpidTransportBridge*    impl   = (qpidTransportBridge*) transport;
+
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                "qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace(): "
+                "transport NULL");
+        return;
+    }
+
+    wtable_insert (impl->mKnownSources, symbolNamespace, (void*) symbolNamespace);
+
+    return;
+}
+
+/* Check if this is a publisher for a namespace which we know about */
+mama_bool_t
+qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace (transportBridge transport,
+        const char* symbolNamespace)
+{
+    qpidTransportBridge*    impl   = (qpidTransportBridge*) transport;
+
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                "qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace(): "
+                "transport NULL");
+        return 0;
+    }
+
+    if (NULL == wtable_lookup (impl->mKnownSources, symbolNamespace))
+    {
+        return 0;
+    }
+    else
+    {
+        return 1;
+    }
+}
+
 qpidTransportBridge*
 qpidBridgeMamaTransportImpl_getTransportBridge (mamaTransport transport)
 {
@@ -710,6 +825,74 @@ qpidBridgeMamaTransportImpl_getTransportBridge (mamaTransport transport)
     return impl;
 }
 
+qpidTransportType
+qpidBridgeMamaTransportImpl_getType (transportBridge transport)
+{
+    qpidTransportBridge*    impl = (qpidTransportBridge*) transport;
+
+    if (impl == NULL)
+    {
+        return DEFAULT_TPORT_TYPE_VALUE;
+    }
+
+    return impl->mQpidTransportType;
+}
+
+const char*
+qpidBridgeMamaTransportImpl_getUuid (transportBridge transport)
+{
+    qpidTransportBridge*    impl = (qpidTransportBridge*) transport;
+    if (NULL == impl->mUuid)
+    {
+        wUuid                tempUuid;
+        char                 uuidStringBuffer[UUID_STRING_BUF_SIZE];
+
+        wUuid_generate_time  (tempUuid);
+        wUuid_unparse        (tempUuid, uuidStringBuffer);
+        impl->mUuid = strdup (uuidStringBuffer);
+    }
+    return impl->mUuid;
+}
+
+
+const char*
+qpidBridgeMamaTransportImpl_getOutgoingAddress (transportBridge transport)
+{
+    qpidTransportBridge*    impl = (qpidTransportBridge*) transport;
+
+    if (impl == NULL)
+    {
+        return NULL;
+    }
+
+    return impl->mOutgoingAddress;
+}
+
+const char*
+qpidBridgeMamaTransportImpl_getIncomingAddress (transportBridge transport)
+{
+    qpidTransportBridge*    impl = (qpidTransportBridge*) transport;
+
+    if (impl == NULL)
+    {
+        return NULL;
+    }
+
+    return impl->mIncomingAddress;
+}
+
+const char*
+qpidBridgeMamaTransportImpl_getReplyAddress (transportBridge transport)
+{
+    qpidTransportBridge*    impl = (qpidTransportBridge*) transport;
+
+    if (impl == NULL)
+    {
+        return NULL;
+    }
+
+    return impl->mReplyAddress;
+}
 
 /*=========================================================================
   =                  Private implementation functions                     =
@@ -750,15 +933,37 @@ qpidBridgeMamaTransportImpl_start (qpidTransportBridge* impl)
             return MAMA_STATUS_PLATFORM;
         }
 
-        if (pn_messenger_subscribe (impl->mIncoming,
-                                    impl->mIncomingAddress) <= 0)
+        /* Blanket subscriptions are only required in p2p */
+        if (QPID_TRANSPORT_TYPE_P2P == impl->mQpidTransportType)
+        {
+            if (pn_messenger_subscribe (impl->mIncoming,
+                                        impl->mIncomingAddress) <= 0)
+            {
+                mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_start(): "
+                          "Error Subscribing to %s : %s",
+                          impl->mIncomingAddress,
+                          PN_MESSENGER_ERROR(impl->mIncoming));
+                PN_MESSENGER_STOP(impl->mIncoming);
+                return MAMA_STATUS_PLATFORM;
+            }
+        }
+        else if (QPID_TRANSPORT_TYPE_BROKER == impl->mQpidTransportType)
         {
             mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_start(): "
-                      "Error Subscribing to %s : %s",
-                      impl->mIncomingAddress,
+                      "Subscribing to %s : %s",
+                      impl->mReplyAddress,
                       PN_MESSENGER_ERROR(impl->mIncoming));
-            PN_MESSENGER_STOP(impl->mIncoming);
-            return MAMA_STATUS_PLATFORM;
+            /* Subscribe to the URL advertised in reply_to */
+            if (pn_messenger_subscribe (impl->mIncoming,
+                                        impl->mReplyAddress) <= 0)
+            {
+                mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_start(): "
+                          "Error Subscribing to %s : %s",
+                          impl->mReplyAddress,
+                          PN_MESSENGER_ERROR(impl->mIncoming));
+                PN_MESSENGER_STOP(impl->mIncoming);
+                return MAMA_STATUS_PLATFORM;
+            }
         }
     }
 
@@ -892,9 +1097,11 @@ qpidBridgeMamaTransportImpl_queueCallback (mamaQueue queue, void* closure)
         return;
     }
 
+    /* If this is a broker connection, we don't need to be selective at our
+     * layer */
     if (0 == endpointPool_isRegistedByContent (impl->mSubEndpoints,
-                                            subject,
-                                            subscription))
+                                               subject,
+                                               subscription))
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
                   "qpidBridgeMamaTransportImpl_queueCallback(): "
@@ -1369,11 +1576,25 @@ void* qpidBridgeMamaTransportImpl_dispatchThread (void* closure)
      */
     while (1 == impl->mIsDispatching)
     {
-        if (pn_messenger_recv (impl->mIncoming, impl->mQpidRecvBlockSize))
+        int pnErr = 0;
+
+        pnErr = pn_messenger_recv (impl->mIncoming,
+                impl->mQpidRecvBlockSize);
+
+
+        if (PN_TIMEOUT == pnErr)
+        {
+            continue;
+        }
+
+        if (0 != pnErr)
         {
             mama_log (MAMA_LOG_LEVEL_ERROR,
-                      "qpidBridgeMamaTransportImpl_dispatchThread():",
-                      PN_MESSENGER_ERROR (impl->mIncoming));
+                      "qpidBridgeMamaTransportImpl_dispatchThread(): "
+                      "Recv Error [%d] (%s) '%s'",
+                      pnErr,
+                      pn_code(pnErr),
+                      PN_MESSENGER_ERROR(impl->mIncoming));
             impl->mQpidDispatchStatus = MAMA_STATUS_PLATFORM;
             return NULL;
         }
diff --git a/mama/c_cpp/src/c/bridge/qpid/transport.h b/mama/c_cpp/src/c/bridge/qpid/transport.h
index ebb6877..64504f6 100644
--- a/mama/c_cpp/src/c/bridge/qpid/transport.h
+++ b/mama/c_cpp/src/c/bridge/qpid/transport.h
@@ -42,7 +42,7 @@ extern "C" {
  * This is a simple convenience function to return a qpidTransportBridge
  * pointer based on the provided mamaTransport.
  *
- * @param transport  The mamaTransport to extract the bridge transport from.
+ * @param transport     The mamaTransport to extract the bridge transport from.
  *
  * @return qpidTransportBridge* associated with the mamaTransport.
  */
@@ -50,6 +50,97 @@ qpidTransportBridge*
 qpidBridgeMamaTransportImpl_getTransportBridge (mamaTransport transport);
 
 /**
+ * This is a simple convenience function to return the transport's distinct
+ * uuid.
+ *
+ * @param transport     The mamaTransport to extract the uuid from.
+ *
+ * @return const char*  String representation of the UUID (memory still owned
+ *                      by the transport bridge.
+ */
+const char*
+qpidBridgeMamaTransportImpl_getUuid (transportBridge transport);
+
+/**
+ * This is a simple convenience function to return the transport's qpid bridge
+ * type,
+ *
+ * @param transport     The transportBridge to extract the uuid from.
+ *
+ * @return qpidTransportType releating to the type of the qpid transport bridge.
+ */
+qpidTransportType
+qpidBridgeMamaTransportImpl_getType (transportBridge transport);
+
+/**
+ * This is a simple convenience function to return the transport's outgoing
+ * address (not format string expanded).
+ *
+ * @param transport     The transportBridge to extract the outgoing address from.
+ *
+ * @return const char*  String representation of the address (memory still owned
+ *                      by the transport bridge.
+ */
+const char*
+qpidBridgeMamaTransportImpl_getOutgoingAddress (transportBridge transport);
+
+/**
+ * This is a simple convenience function to return the transport's incoming
+ * address (not format string expanded).
+ *
+ * @param transport     The transportBridge to extract the incoming address from.
+ *
+ * @return const char*  String representation of the address (memory still owned
+ *                      by the transport bridge.
+ */
+const char*
+qpidBridgeMamaTransportImpl_getIncomingAddress (transportBridge transport);
+
+/**
+ * This is a simple convenience function to return the transport's reply
+ * address (not format string expanded).
+ *
+ * @param transport     The transportBridge to extract the reply address from.
+ *
+ * @return const char*  String representation of the address (memory still owned
+ *                      by the transport bridge.
+ */
+const char*
+qpidBridgeMamaTransportImpl_getReplyAddress (transportBridge transport);
+
+/**
+ * This advises the transport bridge about a known MAMA symbol namespace. This
+ * allows flattened fully qualified topics (e.g. SOURCENAME.TOPIC) to be
+ * detected and re-expanded where necessary (i.e. so we can tell the difference
+ * between a MAMA basic subscription topic and a market data subscription for %S
+ * and %s expansion in the URI).
+ *
+ * @param transport         The transportBridge to add a known MAMA symbol
+ *                          namespace to.
+ * @param symbolNamespace   The known MAMA symbol namespace to add.
+ */
+void
+qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace (transportBridge transport,
+        const char* symbolNamespace);
+
+/**
+ * This is a simple convenience function to return a qpidTransportBridge
+ * pointer based on the provided mamaTransport.
+ *
+ * @param transport         The transportBridge to check for the provided MAMA
+ *                          symbol namespace.
+ * @param symbolNamespace   The MAMA symbol namespace to check.
+ *
+ * @return mama_bool_t returning true if the provided symbolNamespace is a
+ *                     confirmed MAMA symbol namespace as added to the
+ *                     transport bridge using
+ *                     qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace.
+ */
+mama_bool_t
+qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace (transportBridge transport,
+        const char* symbolNamespace);
+
+/**
  * This is purely a debug function to dump to screen a snapshot of the status
  * of the transport's message pool.
  *
diff --git a/mama/c_cpp/src/examples/mama.properties b/mama/c_cpp/src/examples/mama.properties
index d43824a..86c7b1c 100644
--- a/mama/c_cpp/src/examples/mama.properties
+++ b/mama/c_cpp/src/examples/mama.properties
@@ -44,6 +44,20 @@ mama.entitlement.porthigh=9550
 # Example MAMA properties for QPID messaging middleware
 ################################################################################
 
+# For qpid transports, the following format strings are observed
+# %r : Root (only for market data requests, otherwise blank). e.g. _MD
+# %S : MAMA Source / Symbol Namespace. e.g. OPENMAMA
+# %s : Symbol / Topic. e.g. MSFT
+# %u : Transport uuid. e.g. 4542dc20-f1ae-11e3-ac10-0800200c9a66
+# Type of qpid proton transport
+mama.qpid.transport.broker.type=broker
+# This is where we send subscription requests and inbox requests
+mama.qpid.transport.broker.outgoing_url=topic://127.0.0.1/MAMA/%r/%S/%s
+# This is where we listen for incoming messages
+mama.qpid.transport.broker.incoming_url=topic://127.0.0.1/MAMA/%r/%S/%s
+# This is where we listen for replies during request / reply
+mama.qpid.transport.broker.reply_url=topic://127.0.0.1/MAMA/%u
+
 # Source which you are going to consume from
 mama.qpid.transport.pub.outgoing_url=amqp://127.0.0.1:6666
 # Where qpid is going to listen to for data to be pushed to
--
1.9.3


Join Openmama-dev@lists.openmama.org to automatically receive all group messages.