Date
1 - 2 of 2
[PATCH 1/4] QPID: Added QPID Broker implementation
Frank Quinn <fquinn.ni@...>
Broker support has now been added to the qpid proton bridge for OpenMAMA and example configuration to use it has been added to mama.properties. Note that to make use of this functionality (using qpidd as an example), you will need to allow topic patterns matching your provided URI. For example, for the following example configuration, you could run qpid with the following command to ensure all these URI topics can be created: qpidd --topic-patterns MAMA.* Example configuration is then as follows: mama.qpid.transport.qpidbroker.type=broker mama.qpid.transport.qpidbroker.outgoing_url=topic://127.0.0.1/MAMA/%r/%S/%s mama.qpid.transport.qpidbroker.incoming_url=topic://127.0.0.1/MAMA/%r/%S/%s mama.qpid.transport.qpidbroker.reply_url=topic://127.0.0.1/MAMA/%u Signed-off-by: Frank Quinn <fquinn.ni@...> --- mama/c_cpp/src/c/bridge/qpid/Makefile.am | 1 + mama/c_cpp/src/c/bridge/qpid/publisher.c | 177 ++++++++-------- mama/c_cpp/src/c/bridge/qpid/qpidcommon.c | 304 ++++++++++++++++++++++++++++ mama/c_cpp/src/c/bridge/qpid/qpidcommon.h | 151 ++++++++++++++ mama/c_cpp/src/c/bridge/qpid/qpiddefs.h | 21 +- mama/c_cpp/src/c/bridge/qpid/subscription.c | 211 +++++++++---------- mama/c_cpp/src/c/bridge/qpid/subscription.h | 23 --- mama/c_cpp/src/c/bridge/qpid/transport.c | 261 ++++++++++++++++++++++-- mama/c_cpp/src/c/bridge/qpid/transport.h | 93 ++++++++- mama/c_cpp/src/examples/mama.properties | 14 ++ 10 files changed, 1006 insertions(+), 250 deletions(-) create mode 100644 mama/c_cpp/src/c/bridge/qpid/qpidcommon.c create mode 100644 mama/c_cpp/src/c/bridge/qpid/qpidcommon.h diff --git a/mama/c_cpp/src/c/bridge/qpid/Makefile.am b/mama/c_cpp/src/c/bridge/qpid/Makefile.am index 53fe5cd..3995816 100644 --- a/mama/c_cpp/src/c/bridge/qpid/Makefile.am +++ b/mama/c_cpp/src/c/bridge/qpid/Makefile.am @@ -54,6 +54,7 @@ libmamaqpidimpl_la_SOURCES = \ timer.c \ inbox.c \ codec.c \ + qpidcommon.c \ endpointpool.c diff --git a/mama/c_cpp/src/c/bridge/qpid/publisher.c 
b/mama/c_cpp/src/c/bridge/qpid/publisher.c index 4d32475..98a9a1a 100644 --- a/mama/c_cpp/src/c/bridge/qpid/publisher.c +++ b/mama/c_cpp/src/c/bridge/qpid/publisher.c @@ -37,9 +37,9 @@ #include "msg.h" #include "codec.h" #include "inbox.h" -#include "subscription.h" #include "publisher.h" #include "endpointpool.h" +#include "qpidcommon.h" /*========================================================================= = Typedefs, structs, enums and globals = @@ -52,6 +52,7 @@ typedef struct qpidPublisherBridge const char* mSource; const char* mRoot; const char* mSubject; + const char* mUri; pn_message_t* mQpidRawMsg; msgBridge mMamaBridgeMsg; } qpidPublisherBridge; @@ -76,20 +77,6 @@ qpidBridgePublisherImpl_enqueueMessageForAddress (mamaMsg msg, const char* url, qpidPublisherBridge* impl); -/** - * When a qpid publisher is created, it calls this function to generate a - * standard subject key using qpidBridgeMamaSubscriptionImpl_generateSubjectKey - * with different parameters depending on if it's a market data publisher, - * basic publisher or a data dictionary publisher. - * - * @param msg The MAMA message to enqueue for sending. - * @param url The URL to eneueue the message for sending to. - * @param impl The related qpid publisher bridge. - * - * @return mama_status indicating whether the method succeeded or failed. 
- */ -static mama_status -qpidBridgeMamaPublisherImpl_buildSendSubject (qpidPublisherBridge* impl); /*========================================================================= = Public interface implementation functions = @@ -105,9 +92,11 @@ qpidBridgeMamaPublisher_createByIndex (publisherBridge* result, void* nativeQueueHandle, mamaPublisher parent) { - qpidPublisherBridge* impl = NULL; - qpidTransportBridge* transport = NULL; - mama_status status = MAMA_STATUS_OK; + qpidPublisherBridge* impl = NULL; + qpidTransportBridge* transport = NULL; + mama_status status = MAMA_STATUS_OK; + const char* outgoingAddress = NULL; + const char* uuid = NULL; if (NULL == result || NULL == tport @@ -149,23 +138,50 @@ qpidBridgeMamaPublisher_createByIndex (publisherBridge* result, return MAMA_STATUS_NOMEM; } - if (NULL != topic) - { - impl->mTopic = topic; - } + /* Get the outgoing address format string */ + outgoingAddress = qpidBridgeMamaTransportImpl_getOutgoingAddress ( + (transportBridge) impl->mTransport); - if (NULL != source) + /* Get the transport UUID which is unique to the transport in case we need + * it during format string expansion */ + uuid = qpidBridgeMamaTransportImpl_getUuid ( + (transportBridge) impl->mTransport); + + /* Collapse subject key to single string based on supplied values + * + * _MD requests do not use the topic on the wire as the responder may not + * necessarily be listening for requests on that topic until the first + * request comes in. 
*/ + if (NULL != root && 0 == strcmp (root, MAMA_ROOT_MARKET_DATA)) { - impl->mSource = source; + qpidBridgeCommon_generateSubjectKey (root, + source, + NULL, + &impl->mSubject); } - - if (NULL != root) + else { - impl->mRoot = root; + qpidBridgeCommon_generateSubjectKey (root, + source, + topic, + &impl->mSubject); } - /* Generate a topic name based on the publisher details */ - status = qpidBridgeMamaPublisherImpl_buildSendSubject (impl); + /* Parse the collapsed string to extract the standardized values */ + qpidBridgeCommon_parseSubjectKey (impl->mSubject, + &impl->mRoot, + &impl->mSource, + &impl->mTopic, + (transportBridge) impl->mTransport); + + + /* Generate subject URI based on standardized values */ + qpidBridgeCommon_generateSubjectUri (outgoingAddress, + impl->mRoot, + impl->mSource, + impl->mTopic, + uuid, + &impl->mUri); /* Create a reusable proton message */ impl->mQpidRawMsg = pn_message (); @@ -212,6 +228,22 @@ qpidBridgeMamaPublisher_destroy (publisherBridge publisher) { free ((void*) impl->mSubject); } + if (NULL != impl->mUri) + { + free ((void*) impl->mUri); + } + if (NULL != impl->mRoot) + { + free ((void*) impl->mRoot); + } + if (NULL != impl->mSource) + { + free ((void*) impl->mSource); + } + if (NULL != impl->mTopic) + { + free ((void*) impl->mTopic); + } if (NULL != impl->mMamaBridgeMsg) { qpidBridgeMamaMsg_destroy (impl->mMamaBridgeMsg, 0); @@ -258,7 +290,7 @@ qpidBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg) /* Use the publisher's default send destination for request */ qpidBridgePublisherImpl_enqueueMessageForAddress ( msg, - (char*) impl->mTransport->mOutgoingAddress, + (char*) impl->mUri, impl); break; case QPID_MSG_INBOX_RESPONSE: @@ -274,25 +306,35 @@ qpidBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg) impl->mSubject, impl->mSource); - /* For each known downstream destination */ - status = endpointPool_getRegistered (impl->mTransport->mPubEndpoints, - impl->mSubject, - &targets, - &targetCount); 
- - if (targetCount == 0) + if (QPID_TRANSPORT_TYPE_P2P == + qpidBridgeMamaTransportImpl_getType ( + (transportBridge) impl->mTransport)) { - mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaPublisher_send(): " - "No one subscribed to subject '%s', not publishing.", - impl->mSubject); - return MAMA_STATUS_OK; + /* For each known downstream destination */ + status = endpointPool_getRegistered (impl->mTransport->mPubEndpoints, + impl->mSubject, + &targets, + &targetCount); + + if (targetCount == 0) + { + mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaPublisher_send(): " + "No one subscribed to subject '%s', not publishing.", + impl->mSubject); + return MAMA_STATUS_OK; + } + + /* Push the message out to the send queue for each interested party */ + for (targetInc = 0; targetInc < targetCount; targetInc++) + { + url = (char*) targets[targetInc]; + qpidBridgePublisherImpl_enqueueMessageForAddress (msg, url, impl); + } } - - /* Push the message out to the send queue for each interested party */ - for (targetInc = 0; targetInc < targetCount; targetInc++) + else { - url = (char*) targets[targetInc]; - qpidBridgePublisherImpl_enqueueMessageForAddress (msg, url, impl); + qpidBridgePublisherImpl_enqueueMessageForAddress (msg, impl->mUri, + impl); } break; } @@ -643,46 +685,3 @@ qpidBridgePublisherImpl_enqueueMessageForAddress (mamaMsg msg, } } - -mama_status -qpidBridgeMamaPublisherImpl_buildSendSubject (qpidPublisherBridge* impl) -{ - char* keyTarget = NULL; - - /* If this is a special _MD publisher, lose the topic unless dictionary */ - if (impl->mRoot != NULL) - { - /* - * May use strlen here to increase speed but would need to test to - * verify this is the only circumstance in which we want to consider the - * topic when a root is specified. 
- */ - if (strcmp (impl->mRoot, "_MDDD") == 0) - { - qpidBridgeMamaSubscriptionImpl_generateSubjectKey (impl->mRoot, - impl->mSource, - impl->mTopic, - &keyTarget); - } - else - { - qpidBridgeMamaSubscriptionImpl_generateSubjectKey (impl->mRoot, - impl->mSource, - NULL, - &keyTarget); - } - } - /* If this isn't a special _MD publisher */ - else - { - qpidBridgeMamaSubscriptionImpl_generateSubjectKey (NULL, - impl->mSource, - impl->mTopic, - &keyTarget); - } - - /* Set the subject for publishing here */ - impl->mSubject = keyTarget; - - return MAMA_STATUS_OK; -} diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c new file mode 100644 index 0000000..f55f63a --- /dev/null +++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c @@ -0,0 +1,304 @@ +/* $Id$ + * + * OpenMAMA: The open middleware agnostic messaging API + * Copyright (C) 2011 NYSE Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +/*========================================================================= + = Includes = + =========================================================================*/ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include "qpiddefs.h" +#include "qpidcommon.h" +#include "transport.h" + +/*========================================================================= + = Public implementation functions = + =========================================================================*/ + +mama_status +qpidBridgeCommon_parseSubjectKey (char* key, + const char** root, + const char** source, + const char** topic, + transportBridge transport) +{ + /* We can assume subject key is of this length */ + char subject[MAX_SUBJECT_LENGTH]; + + char* subjectPosition = subject; + + /* Reset return values */ + *root = NULL; + *source = NULL; + *topic = NULL; + + if (NULL == key || 0 == strlen(key)) + { + return MAMA_STATUS_NULL_ARG; + } + + /* Copy the key across to writable memory */ + strcpy (subject, key); + + subjectPosition = strtok (subjectPosition, "."); + + if (0 == strcmp (MAMA_ROOT_MARKET_DATA, subjectPosition) || + 0 == strcmp (MAMA_ROOT_MARKET_DATA_DICT, subjectPosition)) + { + *root = strdup (subjectPosition); + + /* This should NULL terminate the end of the source string */ + subjectPosition = strtok (NULL, "."); + if (NULL != subjectPosition) + { + uint32_t parsedBytes = 0; + + *source = strdup (subjectPosition); + + qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace ( + transport, + *source); + + /* String lengths plus NULL characters */ + parsedBytes = strlen (*root) + 1 + strlen (*source) + 1; + + /* Only parse topic if it exists... 
*/ + if (parsedBytes < strlen (key)) + { + /* The topic is then whatever is left after source NULL character */ + *topic = strdup (subject + parsedBytes); + } + } + } + /* Might have a symbol namespace... might not... */ + else + { + char* firstSubjectPart = subjectPosition; + + /* If a MAMA symbol namespace is involved, there should always be + * another part */ + char* secondSubjectPart = strtok (NULL, "."); + + /* If there is no second part, this looks like a basic topic. Also if + * this is a symbol namespace we have never seen before, assume it's + * basic. */ + if (NULL == secondSubjectPart) + { + *topic = strdup (firstSubjectPart); + } + /* There is a second part, but we are unconvinced unless the transport + * has seen this source somewhere before */ + else if (qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace ( + transport, + firstSubjectPart)) + { + *source = strdup (firstSubjectPart); + /* Use offsets from provided strings rather than the one strtok + * may have just modified with NULL characters for the topic */ + *topic = strdup (key + strlen (firstSubjectPart) + 1); + } + /* If this is a subject without a symbol namespace, just use the + * original string rather than try and revert strtok's trail of + * destruction */ + else + { + *topic = strdup (key); + } + } + return MAMA_STATUS_OK; +} + + +/* + * Internal function to ensure that the topic names are always calculated + * in a particular way + */ +mama_status +qpidBridgeCommon_generateSubjectKey (const char* root, + const char* source, + const char* topic, + char** keyTarget) +{ + char subject[MAX_SUBJECT_LENGTH]; + char* subjectPos = subject; + size_t bytesRemaining = MAX_SUBJECT_LENGTH; + size_t written = 0; + + if (NULL != root) + { + mama_log (MAMA_LOG_LEVEL_FINEST, + "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): R."); + written = snprintf (subjectPos, bytesRemaining, "%s", root); + subjectPos += written; + bytesRemaining -= written; + } + + if (NULL != source) + { + mama_log 
(MAMA_LOG_LEVEL_FINEST, + "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): S."); + /* If these are not the first bytes, prepend with a period */ + if(subjectPos != subject) + { + written = snprintf (subjectPos, bytesRemaining, ".%s", source); + } + else + { + written = snprintf (subjectPos, bytesRemaining, "%s", source); + } + subjectPos += written; + bytesRemaining -= written; + } + + if (NULL != topic) + { + mama_log (MAMA_LOG_LEVEL_FINEST, + "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): T."); + /* If these are not the first bytes, prepend with a period */ + if (subjectPos != subject) + { + snprintf (subjectPos, bytesRemaining, ".%s", topic); + } + else + { + snprintf (subjectPos, bytesRemaining, "%s", topic); + } + } + + /* + * Allocate the memory for copying the string. Caller is responsible for + * destroying. + */ + *keyTarget = strdup (subject); + if (NULL == *keyTarget) + { + return MAMA_STATUS_NOMEM; + } + else + { + return MAMA_STATUS_OK; + } +} + +/* + * Internal function to ensure that the topic names are always calculated + * in a particular way + */ +mama_status +qpidBridgeCommon_generateSubjectUri (const char* format, + const char* root, + const char* source, + const char* topic, + const char* uuid, + const char** uriTarget) +{ + char original[MAX_URI_LENGTH]; + char uri[MAX_URI_LENGTH]; + char lastByte; + char* uriPos = uri; + char* nextSeg = NULL; + size_t bytesRemaining = MAX_URI_LENGTH; + size_t written = 0; + + strncpy (original, format, MAX_URI_LENGTH); + + nextSeg = strtok ((char*) original, "%"); + + while (nextSeg != NULL) + { + written = snprintf (uriPos, bytesRemaining, "%s", nextSeg); + bytesRemaining -= written; + uriPos += written; + + nextSeg = strtok (NULL, "%"); + if (nextSeg != NULL) + { + switch (*nextSeg) + { + case 'r': + if (NULL != root) + { + written = snprintf (uriPos, bytesRemaining, "%s", root); + bytesRemaining -= written; + uriPos += written; + } + nextSeg++; + break; + case 'S': + if (NULL != source) + { + 
written = snprintf (uriPos, bytesRemaining, "%s", source); + bytesRemaining -= written; + uriPos += written; + } + nextSeg++; + break; + case 's': + if (NULL != topic) + { + written = snprintf (uriPos, bytesRemaining, "%s", topic); + bytesRemaining -= written; + uriPos += written; + } + nextSeg++; + break; + case 'u': + if (NULL != uuid) + { + written = snprintf (uriPos, bytesRemaining, "%s", uuid); + bytesRemaining -= written; + uriPos += written; + } + nextSeg++; + break; + default: /* do nothing -just go on copying in next iteration */ + break; + } + /* This will ensure there are no double forward slashes */ + if (NULL != nextSeg && '/' == *nextSeg && '/' == uri[strlen(uri) - 1]) + { + nextSeg++; + } + } + } + + /* Trim trailing forward slashes */ + if ('/' == uri[strlen(uri) - 1]) + { + uri[strlen(uri) - 1] = '\0'; + } + + /* + * Allocate the memory for copying the string. Caller is responsible for + * destroying. + */ + *uriTarget = strdup (uri); + if (NULL == *uriTarget) + { + return MAMA_STATUS_NOMEM; + } + else + { + return MAMA_STATUS_OK; + } +} diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h new file mode 100644 index 0000000..6cb70a7 --- /dev/null +++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h @@ -0,0 +1,151 @@ +/* $Id$ + * + * OpenMAMA: The open middleware agnostic messaging API + * Copyright (C) 2011 NYSE Technologies, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#ifndef MAMA_BRIDGE_QPID_COMMON_H__ +#define MAMA_BRIDGE_QPID_COMMON_H__ + +/*========================================================================= + = Includes = + =========================================================================*/ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <mama/mama.h> +#include <proton/message.h> +#include <proton/error.h> + +#include "payloadbridge.h" +#include "msgfieldimpl.h" + +#if defined(__cplusplus) +extern "C" { +#endif + +/*========================================================================= + = Macros = + =========================================================================*/ + +#define MAMA_ROOT_MARKET_DATA "_MD" +#define MAMA_ROOT_MARKET_DATA_DICT "_MDDD" + + +/*========================================================================= + = Public implementation functions = + =========================================================================*/ + + +/** + * This function will generate a string which is unique to the root, source + * and topic provided. Centralization of this function means that it can be used + * in both the publisher and the subscriber in order to generate a consistent + * topic for use throughout the platform. It will ensure that if any provided + * parameter is NULL, that it will be ignored in the expansion (e.g. providing + * root=NULL, source=EXCHANGENAME.ISIN.CURRENCY and topic=NULL will result in + * a new keyTarget="EXCHANGENAME.ISIN.CURRENCY" as opposed to + * ".EXCHANGENAME.ISIN.CURRENCY." + * + * @param root Prefix to associate with the subject (e.g. _MDDD). + * @param source Source to base the subject key on (e.g. EXCHANGENAME). + * @param topic Topic to base the subject key on (e.g. ISIN.CURRENCY). 
+ * @param keyTarget Pointer to populate with the generated subject key. The + * caller is then responsible for freeing this new strdup'd + * string. + * + * @return mama_status indicating whether the method succeeded or failed. + */ +mama_status +qpidBridgeCommon_generateSubjectKey (const char* root, + const char* source, + const char* topic, + char** keyTarget); + +/** + * This function will take the provided format string and use the root, source, + * topic and uuid provided to expand the format string into a newly allocated + * string which the caller is responsible for freeing. The format string uses + * the following tokens: + * + * %r : Root (only for market data requests, otherwise blank). e.g. _MD + * %S : MAMA Source / Symbol Namespace. e.g. OPENMAMA + * %s : Symbol / Topic. e.g. MSFT + * %u : uuid. e.g. 4542dc20-f1ae-11e3-ac10-0800200c9a66 + * + * @param format The format string to expand. + * @param root Prefix to associate with the subject (e.g. _MDDD). + * @param source Source to base the subject key on (e.g. EXCHANGENAME). + * @param topic Topic to base the subject key on (e.g. ISIN.CURRENCY). + * @param uriTarget Pointer to populate with the generated subject URI. The + * caller is then responsible for freeing this new strdup'd + * string. + * + * @return mama_status indicating whether the method succeeded or failed. + */ +mama_status +qpidBridgeCommon_generateSubjectUri (const char* format, + const char* root, + const char* source, + const char* topic, + const char* uuid, + const char** uriTarget); + +/** + * This function will parse a standard period delimited string as generated via + * qpidBridgeCommon_generateSubjectKey and split it up into its individual + * components. This allows us to accurately split up these components, even + * when the arguments supplied to qpidBridgeCommon_generateSubjectKey aren't + * quite in the format expected (e.g. 
if qpidBridgeCommon_generateSubjectKey + * is provided with root=NULL, source=NULL and + * topic=_MD.EXCHANGENAME.ISIN.CURRENCY, this will then be collapsed in + * qpidBridgeCommon_generateSubjectKey to create + * subjectKey=_MD.EXCHANGENAME.ISIN.CURRENCY. If that string is then provided + * to this function, it will parse the subjectKey and extract the *real* values + * for root=_MD, source=EXCHANGENAME and topic=ISIN.CURRENCY). + * + * @param key Subject key to parse (e.g. _MD.EXCHANGENAME.ISIN.CURRENCY). + * @param root Pointer to populate with the parsed root (e.g. _MD). Caller + * is then responsible for the memory associated with this + * string. + * @param source Pointer to populate with the parsed source (e.g. EXCHANGENAME). + * Caller is then responsible for the memory associated with + * this string. + * @param topic Pointer to populate with the parsed topic (e.g. ISIN.CURRENCY). Caller + * is then responsible for the memory associated with this + * string. + * @param transport Transport bridge on which this topic is being created. This + * is required to help determine whether or not the key + * contains a known symbol namespace. + * + * @return mama_status indicating whether the method succeeded or failed. 
+ */ +mama_status +qpidBridgeCommon_parseSubjectKey (char* key, + const char** root, + const char** source, + const char** topic, + transportBridge transport); + +#if defined(__cplusplus) +} +#endif + +#endif /* MAMA_BRIDGE_QPID_COMMON_H__ */ diff --git a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h index 6c1adf4..3e3b2e5 100644 --- a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h +++ b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h @@ -56,9 +56,11 @@ extern "C" { /* Maximum topic length */ #define MAX_SUBJECT_LENGTH 256 +#define MAX_URI_LENGTH 1024 /* Default timeout for send working threads */ #define QPID_MESSENGER_SEND_TIMEOUT -1 +#define QPID_MESSENGER_TIMEOUT 1 /* milliseconds */ /* Message types */ typedef enum qpidMsgType_ @@ -70,6 +72,13 @@ typedef enum qpidMsgType_ QPID_MSG_TERMINATE = 0xff } qpidMsgType; +/* Message types */ +typedef enum qpidTransportType_ +{ + QPID_TRANSPORT_TYPE_P2P, + QPID_TRANSPORT_TYPE_BROKER +} qpidTransportType; + /* If a proton version header has been parsed at this point (version >= 0.5) */ #ifdef _PROTON_VERSION_H @@ -118,9 +127,12 @@ typedef struct qpidSubscription_ mamaSubscription mMamaSubscription; mamaQueue mMamaQueue; void* mQpidQueue; - mamaTransport mTransport; - const char* mSymbol; - char* mSubjectKey; + transportBridge mTransport; + const char* mSource; + const char* mTopic; + const char* mRoot; + char* mSubject; + const char* mUri; void* mClosure; int mIsNotMuted; int mIsValid; @@ -143,11 +155,14 @@ typedef struct qpidTransportBridge_ const char* mOutgoingAddress; const char* mReplyAddress; const char* mName; + const char* mUuid; wthread_t mQpidDispatchThread; int mIsDispatching; mama_status mQpidDispatchStatus; endpointPool_t mSubEndpoints; endpointPool_t mPubEndpoints; + qpidTransportType mQpidTransportType; + wtable_t mKnownSources; } qpidTransportBridge; struct qpidMsgNode_ diff --git a/mama/c_cpp/src/c/bridge/qpid/subscription.c b/mama/c_cpp/src/c/bridge/qpid/subscription.c index 
bdef796..2582a5e 100644 --- a/mama/c_cpp/src/c/bridge/qpid/subscription.c +++ b/mama/c_cpp/src/c/bridge/qpid/subscription.c @@ -33,8 +33,8 @@ #include <wombat/queue.h> #include "qpidbridgefunctions.h" #include "transport.h" +#include "qpidcommon.h" #include "qpiddefs.h" -#include "subscription.h" #include "publisher.h" #include "endpointpool.h" @@ -57,6 +57,8 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber, qpidTransportBridge* transport = NULL; mama_status status = MAMA_STATUS_OK; pn_data_t* data = NULL; + const char* uuid = NULL; + const char* outgoingAddress = NULL; if ( NULL == subscriber || NULL == subscription || NULL == tport ) { @@ -75,6 +77,11 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber, return MAMA_STATUS_NULL_ARG; } + outgoingAddress = qpidBridgeMamaTransportImpl_getOutgoingAddress ( + (transportBridge) transport); + uuid = qpidBridgeMamaTransportImpl_getUuid ( + (transportBridge) transport); + /* Allocate memory for qpid subscription implementation */ impl = (qpidSubscription*) calloc (1, sizeof (qpidSubscription)); if (NULL == impl) @@ -86,58 +93,89 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber, impl->mMamaCallback = callback; impl->mMamaSubscription = subscription; impl->mMamaQueue = queue; - impl->mTransport = (mamaTransport)transport; - impl->mSymbol = symbol; + impl->mTransport = (transportBridge) transport; impl->mClosure = closure; impl->mIsNotMuted = 1; impl->mIsTportDisconnected = 1; - impl->mSubjectKey = NULL; + impl->mSubject = NULL; /* Use a standard centralized method to determine a topic key */ - qpidBridgeMamaSubscriptionImpl_generateSubjectKey (NULL, - source, - symbol, - &impl->mSubjectKey); + qpidBridgeCommon_generateSubjectKey (NULL, + source, + symbol, + &impl->mSubject); + + /* Parse the collapsed string to extract the standardized values */ + qpidBridgeCommon_parseSubjectKey (impl->mSubject, + &impl->mRoot, + &impl->mSource, + &impl->mTopic, + impl->mTransport); + 
+ /* Generate subject URI based on standardized values */ + qpidBridgeCommon_generateSubjectUri (outgoingAddress, + impl->mRoot, + impl->mSource, + impl->mTopic, + uuid, + &impl->mUri); /* Register the endpoint */ endpointPool_registerWithoutIdentifier (transport->mSubEndpoints, - impl->mSubjectKey, + impl->mSubject, &impl->mEndpointIdentifier, impl); - /* Notify the publisher that you have an interest in this topic */ - pn_message_clear (transport->mMsg); + if (QPID_TRANSPORT_TYPE_P2P == + qpidBridgeMamaTransportImpl_getType ( + impl->mTransport)) + { + /* Notify the publisher that you have an interest in this topic */ + pn_message_clear (transport->mMsg); - /* Set the message meta data to reflect a subscription request */ - qpidBridgePublisherImpl_setMessageType (transport->mMsg, - QPID_MSG_SUB_REQUEST); + /* Set the message meta data to reflect a subscription request */ + qpidBridgePublisherImpl_setMessageType (transport->mMsg, + QPID_MSG_SUB_REQUEST); - /* Set the outgoing address as provided by the transport configuration */ - pn_message_set_address (transport->mMsg, - transport->mOutgoingAddress); + /* Set the outgoing address as provided by the transport configuration */ + pn_message_set_address (transport->mMsg, + transport->mOutgoingAddress); - /* Set the reply address */ - pn_message_set_reply_to (transport->mMsg, - transport->mReplyAddress); + /* Set the reply address */ + pn_message_set_reply_to (transport->mMsg, + transport->mReplyAddress); - /* Get the proton message's body data for writing */ - data = pn_message_body (transport->mMsg); + /* Get the proton message's body data for writing */ + data = pn_message_body (transport->mMsg); - /* Add in the subject key as the only string inside */ - pn_data_put_string (data, pn_bytes (strlen (impl->mSubjectKey), - impl->mSubjectKey)); + /* Add in the subject key as the only string inside */ + pn_data_put_string (data, pn_bytes (strlen (impl->mSubject), + impl->mSubject)); - /* Send out the subscription 
registration of interest message */ - if (NULL != transport->mOutgoingAddress) + /* Send out the subscription registration of interest message */ + if (NULL != transport->mOutgoingAddress) + { + pn_messenger_put (transport->mOutgoing, transport->mMsg); + + if (0 != PN_MESSENGER_SEND (transport->mOutgoing)) + { + const char* qpid_error = PN_MESSENGER_ERROR (transport->mOutgoing); + mama_log (MAMA_LOG_LEVEL_SEVERE, + "qpidBridgeMamaSubscription_create(): " + "pn_messenger_send Error:[%s]", qpid_error); + return MAMA_STATUS_PLATFORM; + } + } + } + else { - pn_messenger_put (transport->mOutgoing, transport->mMsg); - - if (0 != PN_MESSENGER_SEND (transport->mOutgoing)) + if (pn_messenger_subscribe (transport->mIncoming, + impl->mUri) <= 0) { - const char* qpid_error = PN_MESSENGER_ERROR (transport->mOutgoing); - mama_log (MAMA_LOG_LEVEL_SEVERE, - "qpidBridgeMamaSubscription_create(): " - "pn_messenger_send Error:[%s]", qpid_error); + mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaSubscription_create(): " + "Error Subscribing to %s : %s", + impl->mUri, + PN_MESSENGER_ERROR(transport->mIncoming)); return MAMA_STATUS_PLATFORM; } } @@ -145,7 +183,7 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber, mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaSubscription_create(): " "created interest for %s.", - impl->mSubjectKey); + impl->mSubject); /* Mark this subscription as valid */ impl->mIsValid = 1; @@ -205,10 +243,10 @@ qpidBridgeMamaSubscription_destroy (subscriptionBridge subscriber) /* Remove the subscription from the transport's subscription pool. 
*/ if (NULL != transportBridge && NULL != transportBridge->mSubEndpoints - && NULL != impl->mSubjectKey) + && NULL != impl->mSubject) { endpointPool_unregister (transportBridge->mSubEndpoints, - impl->mSubjectKey, + impl->mSubject, impl->mEndpointIdentifier); } @@ -217,9 +255,29 @@ qpidBridgeMamaSubscription_destroy (subscriptionBridge subscriber) pn_message_free (impl->mMsg); } - if (NULL != impl->mSubjectKey) + if (NULL != impl->mSubject) { - free (impl->mSubjectKey); + free (impl->mSubject); + } + + if (NULL != impl->mRoot) + { + free ((void*)impl->mRoot); + } + + if (NULL != impl->mSource) + { + free ((void*)impl->mSource); + } + + if (NULL != impl->mTopic) + { + free ((void*)impl->mTopic); + } + + if (NULL != impl->mUri) + { + free ((void*)impl->mUri); } if (NULL != impl->mEndpointIdentifier) @@ -289,78 +347,3 @@ qpidBridgeMamaSubscription_muteCurrentTopic (subscriptionBridge subscriber) return qpidBridgeMamaSubscription_mute (subscriber); } - -/*========================================================================= - = Public implementation functions = - =========================================================================*/ - -/* - * Internal function to ensure that the topic names are always calculated - * in a particular way - */ -mama_status -qpidBridgeMamaSubscriptionImpl_generateSubjectKey (const char* root, - const char* source, - const char* topic, - char** keyTarget) -{ - char subject[MAX_SUBJECT_LENGTH]; - char* subjectPos = subject; - size_t bytesRemaining = MAX_SUBJECT_LENGTH; - size_t written = 0; - - if (NULL != root) - { - mama_log (MAMA_LOG_LEVEL_FINEST, - "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): R."); - written = snprintf (subjectPos, bytesRemaining, "%s", root); - subjectPos += written; - bytesRemaining -= written; - } - - if (NULL != source) - { - mama_log (MAMA_LOG_LEVEL_FINEST, - "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): S."); - /* If these are not the first bytes, prepend with a period */ - if(subjectPos 
!= subject) - { - written = snprintf (subjectPos, bytesRemaining, ".%s", source); - } - else - { - written = snprintf (subjectPos, bytesRemaining, "%s", source); - } - subjectPos += written; - bytesRemaining -= written; - } - - if (NULL != topic) - { - mama_log (MAMA_LOG_LEVEL_FINEST, - "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): T."); - /* If these are not the first bytes, prepend with a period */ - if (subjectPos != subject) - { - snprintf (subjectPos, bytesRemaining, ".%s", topic); - } - else - { - snprintf (subjectPos, bytesRemaining, "%s", topic); - } - } - - /* - * Allocate the memory for copying the string. Caller is responsible for - * destroying. - */ - *keyTarget = strdup (subject); - if (NULL == *keyTarget) - { - return MAMA_STATUS_NOMEM; - } - else - { - return MAMA_STATUS_OK; - } -} diff --git a/mama/c_cpp/src/c/bridge/qpid/subscription.h b/mama/c_cpp/src/c/bridge/qpid/subscription.h index ea1aceb..992a495 100644 --- a/mama/c_cpp/src/c/bridge/qpid/subscription.h +++ b/mama/c_cpp/src/c/bridge/qpid/subscription.h @@ -26,29 +26,6 @@ extern "C" { #endif -/*========================================================================= - = Public implementation functions = - =========================================================================*/ - -/** - * This function will generate a string which is unique to the root, source - * and topic provided. Centralization of this function means that it can be used - * in both the publisher and the subscriber in order to generate a consistent - * topic for use throughout the platform. - * - * @param root Prefix to associate with the subject (e.g. _MDDD) - * @param inbox Source to base the subject key on (e.g. EXCHANGENAME). - * @param topic Topic to base the subject key on (e.g. ISIN.CURRENCY). - * @param keyTarget Pointer to populate with the generated subject key. - * - * @return mama_status indicating whether the method succeeded or failed. 
- */ -mama_status -qpidBridgeMamaSubscriptionImpl_generateSubjectKey (const char* root, - const char* source, - const char* topic, - char** keyTarget); - #if defined(__cplusplus) } #endif diff --git a/mama/c_cpp/src/c/bridge/qpid/transport.c b/mama/c_cpp/src/c/bridge/qpid/transport.c index ba21810..2b2053b 100644 --- a/mama/c_cpp/src/c/bridge/qpid/transport.c +++ b/mama/c_cpp/src/c/bridge/qpid/transport.c @@ -37,11 +37,13 @@ #include <timers.h> #include <stdio.h> #include <wombat/queue.h> +#include <wombat/wUuid.h> #include "transport.h" #include "publisher.h" #include "qpidbridgefunctions.h" #include "qpiddefs.h" +#include "qpidcommon.h" #include "msg.h" #include "codec.h" #include "endpointpool.h" @@ -58,11 +60,14 @@ #define TPORT_PARAM_SUB_POOL_SIZE "msg_pool_size" #define TPORT_PARAM_SUB_POOL_INC_SIZE "msg_pool_inc_size" #define TPORT_PARAM_RECV_BLOCK_SIZE "recv_block_size" +#define TPORT_PARAM_TPORT_TYPE "type" /* Default values for corresponding configuration parameters */ #define DEFAULT_OUTGOING_URL "amqp://127.0.0.1:7777" #define DEFAULT_INCOMING_URL "amqp://~127.0.0.1:6666" #define DEFAULT_REPLY_URL "amqp://127.0.0.1:6666" +#define DEFAULT_TPORT_TYPE "p2p" +#define DEFAULT_TPORT_TYPE_VALUE QPID_TRANSPORT_TYPE_P2P #define DEFAULT_SUB_POOL_SIZE 128 #define DEFAULT_SUB_POOL_INC_SIZE 128 #define DEFAULT_RECV_BLOCK_SIZE 10 @@ -76,6 +81,10 @@ #define MAX_SUB_POOL_INC_SIZE 2000L #define MIN_RECV_BLOCK_SIZE -1L #define MAX_RECV_BLOCK_SIZE 100L +#define CONFIG_VALUE_TPORT_TYPE_BROKER "broker" +#define CONFIG_VALUE_TPORT_TYPE_P2P "p2p" +#define UUID_STRING_BUF_SIZE 37 +#define KNOWN_SOURCES_WTABLE_SIZE 10 /*========================================================================= @@ -345,6 +354,21 @@ qpidBridgeMamaTransport_destroy (transportBridge transport) endpointPool_destroy (impl->mSubEndpoints); endpointPool_destroy (impl->mPubEndpoints); + /* Free the strdup-ed keys still held by the wtable */ + wtable_free_all_xdata (impl->mKnownSources); + /* Finally, 
destroy the wtable */ + wtable_destroy (impl->mKnownSources); + + if (NULL != impl->mReplyAddress) + { + free ((void*) impl->mReplyAddress); + } + + if (NULL != impl->mUuid) + { + free ((void*) impl->mUuid); + } + free (impl); return status; @@ -358,6 +382,8 @@ qpidBridgeMamaTransport_create (transportBridge* result, qpidTransportBridge* impl = NULL; int poolSize = 0; mama_status status = MAMA_STATUS_OK; + const char* tportType = NULL; + const char* tmpReply = NULL; if (NULL == result || NULL == name || NULL == parent) { @@ -374,6 +400,8 @@ qpidBridgeMamaTransport_create (transportBridge* result, impl->mMsg = pn_message (); impl->mQpidDispatchStatus = MAMA_STATUS_OK; impl->mName = name; + impl->mKnownSources = wtable_create ("mKnownSources", + KNOWN_SOURCES_WTABLE_SIZE); mama_log (MAMA_LOG_LEVEL_FINE, "qpidBridgeMamaTransport_create(): Initializing Transport %s", @@ -390,14 +418,14 @@ qpidBridgeMamaTransport_create (transportBridge* result, /* Set the outgoing address */ impl->mOutgoingAddress = qpidBridgeMamaTransportImpl_getParameter ( - NULL, + DEFAULT_OUTGOING_URL, "%s.%s.%s", TPORT_PARAM_PREFIX, name, TPORT_PARAM_OUTGOING_URL); /* Set the reply address */ - impl->mReplyAddress = + tmpReply = qpidBridgeMamaTransportImpl_getParameter ( DEFAULT_REPLY_URL, "%s.%s.%s", @@ -405,6 +433,44 @@ qpidBridgeMamaTransport_create (transportBridge* result, name, TPORT_PARAM_REPLY_URL); + /* Expand the wildcards in the reply then populates mReplyAddress with a + * newly allocated result */ + qpidBridgeCommon_generateSubjectUri ( + tmpReply, + NULL, + NULL, + NULL, + qpidBridgeMamaTransportImpl_getUuid ((transportBridge) impl), + &impl->mReplyAddress); + + /* Set the transport type */ + tportType = + qpidBridgeMamaTransportImpl_getParameter ( + DEFAULT_TPORT_TYPE, + "%s.%s.%s", + TPORT_PARAM_PREFIX, + name, + TPORT_PARAM_TPORT_TYPE); + + if (0 == strcmp (tportType, CONFIG_VALUE_TPORT_TYPE_BROKER)) + { + impl->mQpidTransportType = QPID_TRANSPORT_TYPE_BROKER; + } + else if (0 == 
strcmp (tportType, CONFIG_VALUE_TPORT_TYPE_P2P)) + { + impl->mQpidTransportType = QPID_TRANSPORT_TYPE_P2P; + } + else + { + mama_log (MAMA_LOG_LEVEL_ERROR, + "Could not parse %s.%s.%s=%s. Using [%s].", + TPORT_PARAM_PREFIX, + name, + TPORT_PARAM_TPORT_TYPE, + tportType, + DEFAULT_TPORT_TYPE); + impl->mQpidTransportType = DEFAULT_TPORT_TYPE_VALUE; + } /* Set the message pool size for pool initialization later */ poolSize = @@ -475,14 +541,18 @@ qpidBridgeMamaTransport_create (transportBridge* result, return MAMA_STATUS_PLATFORM; } - status = endpointPool_create (&impl->mPubEndpoints, "mPubEndpoints"); - if (MAMA_STATUS_OK != status) + /* Endpoints are only required if a broker is not being used */ + if (QPID_TRANSPORT_TYPE_P2P == impl->mQpidTransportType) { - mama_log (MAMA_LOG_LEVEL_ERROR, - "qpidBridgeMamaTransport_create(): " - "Failed to create publishing endpoints"); - free (impl); - return MAMA_STATUS_PLATFORM; + status = endpointPool_create (&impl->mPubEndpoints, "mPubEndpoints"); + if (MAMA_STATUS_OK != status) + { + mama_log (MAMA_LOG_LEVEL_ERROR, + "qpidBridgeMamaTransport_create(): " + "Failed to create publishing endpoints"); + free (impl); + return MAMA_STATUS_PLATFORM; + } } impl->mIsValid = 1; @@ -693,6 +763,51 @@ qpidBridgeMamaTransport_getNativeTransportNamingCtx (transportBridge transport, = Public implementation functions = =========================================================================*/ +/* Call this every time a known MAMA symbol namespace is created */ +void +qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace (transportBridge transport, + const char* symbolNamespace) +{ + qpidTransportBridge* impl = (qpidTransportBridge*) transport; + + if (NULL == impl) + { + mama_log (MAMA_LOG_LEVEL_ERROR, + "qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace(): " + "transport NULL"); + return; + } + + wtable_insert (impl->mKnownSources, symbolNamespace, (void*) symbolNamespace); + + return; +} + +/* Check if this is a publisher for a 
namespace which we know about */ +mama_bool_t +qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace (transportBridge transport, + const char* symbolNamespace) +{ + qpidTransportBridge* impl = (qpidTransportBridge*) transport; + + if (NULL == impl) + { + mama_log (MAMA_LOG_LEVEL_ERROR, + "qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace(): " + "transport NULL"); + return 0; + } + + if (NULL == wtable_lookup (impl->mKnownSources, symbolNamespace)) + { + return 0; + } + else + { + return 1; + } +} + qpidTransportBridge* qpidBridgeMamaTransportImpl_getTransportBridge (mamaTransport transport) { @@ -710,6 +825,74 @@ qpidBridgeMamaTransportImpl_getTransportBridge (mamaTransport transport) return impl; } +qpidTransportType +qpidBridgeMamaTransportImpl_getType (transportBridge transport) +{ + qpidTransportBridge* impl = (qpidTransportBridge*) transport; + + if (impl == NULL) + { + return DEFAULT_TPORT_TYPE_VALUE; + } + + return impl->mQpidTransportType; +} + +const char* +qpidBridgeMamaTransportImpl_getUuid (transportBridge transport) +{ + qpidTransportBridge* impl = (qpidTransportBridge*) transport; + if (NULL == impl->mUuid) + { + wUuid tempUuid; + char uuidStringBuffer[UUID_STRING_BUF_SIZE]; + + wUuid_generate_time (tempUuid); + wUuid_unparse (tempUuid, uuidStringBuffer); + impl->mUuid = strdup (uuidStringBuffer); + } + return impl->mUuid; +} + + +const char* +qpidBridgeMamaTransportImpl_getOutgoingAddress (transportBridge transport) +{ + qpidTransportBridge* impl = (qpidTransportBridge*) transport; + + if (impl == NULL) + { + return NULL; + } + + return impl->mOutgoingAddress; +} + +const char* +qpidBridgeMamaTransportImpl_getIncomingAddress (transportBridge transport) +{ + qpidTransportBridge* impl = (qpidTransportBridge*) transport; + + if (impl == NULL) + { + return NULL; + } + + return impl->mIncomingAddress; +} + +const char* +qpidBridgeMamaTransportImpl_getReplyAddress (transportBridge transport) +{ + qpidTransportBridge* impl = (qpidTransportBridge*) 
transport; + + if (impl == NULL) + { + return NULL; + } + + return impl->mReplyAddress; +} /*========================================================================= = Private implementation functions = @@ -750,15 +933,37 @@ qpidBridgeMamaTransportImpl_start (qpidTransportBridge* impl) return MAMA_STATUS_PLATFORM; } - if (pn_messenger_subscribe (impl->mIncoming, - impl->mIncomingAddress) <= 0) + /* Blanket subscriptions are only required in p2p */ + if (QPID_TRANSPORT_TYPE_P2P == impl->mQpidTransportType) + { + if (pn_messenger_subscribe (impl->mIncoming, + impl->mIncomingAddress) <= 0) + { + mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_start(): " + "Error Subscribing to %s : %s", + impl->mIncomingAddress, + PN_MESSENGER_ERROR(impl->mIncoming)); + PN_MESSENGER_STOP(impl->mIncoming); + return MAMA_STATUS_PLATFORM; + } + } + else if (QPID_TRANSPORT_TYPE_BROKER == impl->mQpidTransportType) { mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_start(): " - "Error Subscribing to %s : %s", - impl->mIncomingAddress, + "Subscribing to %s : %s", + impl->mReplyAddress, PN_MESSENGER_ERROR(impl->mIncoming)); - PN_MESSENGER_STOP(impl->mIncoming); - return MAMA_STATUS_PLATFORM; + /* Subscribe to the URL advertised in reply_to */ + if (pn_messenger_subscribe (impl->mIncoming, + impl->mReplyAddress) <= 0) + { + mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_start(): " + "Error Subscribing to %s : %s", + impl->mReplyAddress, + PN_MESSENGER_ERROR(impl->mIncoming)); + PN_MESSENGER_STOP(impl->mIncoming); + return MAMA_STATUS_PLATFORM; + } } } @@ -892,9 +1097,11 @@ qpidBridgeMamaTransportImpl_queueCallback (mamaQueue queue, void* closure) return; } + /* If this is a broker connection, we don't need to be selective at our + * layer */ if (0 == endpointPool_isRegistedByContent (impl->mSubEndpoints, - subject, - subscription)) + subject, + subscription)) { mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_queueCallback(): " @@ -1369,11 
+1576,25 @@ void* qpidBridgeMamaTransportImpl_dispatchThread (void* closure) */ while (1 == impl->mIsDispatching) { - if (pn_messenger_recv (impl->mIncoming, impl->mQpidRecvBlockSize)) + int pnErr = 0; + + pnErr = pn_messenger_recv (impl->mIncoming, + impl->mQpidRecvBlockSize); + + + if (PN_TIMEOUT == pnErr) + { + continue; + } + + if (0 != pnErr) { mama_log (MAMA_LOG_LEVEL_ERROR, - "qpidBridgeMamaTransportImpl_dispatchThread():", - PN_MESSENGER_ERROR (impl->mIncoming)); + "qpidBridgeMamaTransportImpl_dispatchThread(): " + "Recv Error [%d] (%s) '%s'", + pnErr, + pn_code(pnErr), + PN_MESSENGER_ERROR(impl->mIncoming)); impl->mQpidDispatchStatus = MAMA_STATUS_PLATFORM; return NULL; } diff --git a/mama/c_cpp/src/c/bridge/qpid/transport.h b/mama/c_cpp/src/c/bridge/qpid/transport.h index ebb6877..64504f6 100644 --- a/mama/c_cpp/src/c/bridge/qpid/transport.h +++ b/mama/c_cpp/src/c/bridge/qpid/transport.h @@ -42,7 +42,7 @@ extern "C" { * This is a simple convenience function to return a qpidTransportBridge * pointer based on the provided mamaTransport. * - * @param transport The mamaTransport to extract the bridge transport from. + * @param transport The mamaTransport to extract the bridge transport from. * * @return qpidTransportBridge* associated with the mamaTransport. */ @@ -50,6 +50,97 @@ qpidTransportBridge* qpidBridgeMamaTransportImpl_getTransportBridge (mamaTransport transport); /** + * This is a simple convenience function to return the transport's distinct + * uuid. + * + * @param transport The mamaTransport to extract the uuid from. + * + * @return const char* String representation of the UUID (memory still owned + * by the transport bridge. + */ +const char* +qpidBridgeMamaTransportImpl_getUuid (transportBridge transport); + +/** + * This is a simple convenience function to return the transport's qpid bridge + * type, + * + * @param transport The transportBridge to extract the uuid from. 
+ * + * @return qpidTransportType relating to the type of the qpid transport bridge. + */ +qpidTransportType +qpidBridgeMamaTransportImpl_getType (transportBridge transport); + +/** + * This is a simple convenience function to return the transport's outgoing + * address (not format string expanded). + * + * @param transport The transportBridge to extract the outgoing address from. + * + * @return const char* String representation of the address (memory still owned + * by the transport bridge). + */ +const char* +qpidBridgeMamaTransportImpl_getOutgoingAddress (transportBridge transport); + +/** + * This is a simple convenience function to return the transport's incoming + * address (not format string expanded). + * + * @param transport The transportBridge to extract the incoming address from. + * + * @return const char* String representation of the address (memory still owned + * by the transport bridge). + */ +const char* +qpidBridgeMamaTransportImpl_getIncomingAddress (transportBridge transport); + +/** + * This is a simple convenience function to return the transport's reply + * address (not format string expanded). + * + * @param transport The transportBridge to extract the reply address from. + * + * @return const char* String representation of the address (memory still owned + * by the transport bridge). + */ +const char* +qpidBridgeMamaTransportImpl_getReplyAddress (transportBridge transport); + +/** + * This advises the transport bridge about a known MAMA symbol namespace. This + * allows flattened fully qualified topics (e.g. SOURCENAME.TOPIC) to be + * detected and re-expanded where necessary (i.e. so we can tell the difference + * between a MAMA basic subscription topic and a market data subscription for %S + * and %s expansion in the URI). + * + * @param transport The transportBridge to add a known MAMA symbol + * namespace to. + * @param symbolNamespace The known MAMA symbol namespace to add.
+ */ +void +qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace (transportBridge transport, + const char* symbolNamespace); + +/** + * This is a simple convenience function to return a qpidTransportBridge + * pointer based on the provided mamaTransport. + * + * @param transport The transportBridge to check for the provided MAMA + * symbol namespace. + * @param symbolNamespace The MAMA symbol namespace to check. + * + * @return mama_bool_t returning true if the provided symbolNamespace is a + * confirmed MAMA symbol namespace as added to the + * transport bridge using + * qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace. + */ +mama_bool_t +qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace (transportBridge transport, + const char* symbolNamespace); + +/** * This is purely a debug function to dump to screen a snapshot of the status * of the transport's message pool. * diff --git a/mama/c_cpp/src/examples/mama.properties b/mama/c_cpp/src/examples/mama.properties index d43824a..86c7b1c 100644 --- a/mama/c_cpp/src/examples/mama.properties +++ b/mama/c_cpp/src/examples/mama.properties @@ -44,6 +44,20 @@ mama.entitlement.porthigh=9550 # Example MAMA properties for QPID messaging middleware ################################################################################ +# For qpid transports, the following format strings are observed +# %r : Root (only for market data requests, otherwise blank). e.g. _MD +# %S : MAMA Source / Symbol Namespace. e.g. OPENMAMA +# %s : Symbol / Topic. e.g. MSFT +# %u : Transport uuid. e.g. 
4542dc20-f1ae-11e3-ac10-0800200c9a66 +# Type of qpid proton transport +mama.qpid.transport.broker.type=broker +# This is where we send subscription requests and inbox requests +mama.qpid.transport.broker.outgoing_url=topic://127.0.0.1/MAMA/%r/%S/%s +# This is where we listen for incoming messages +mama.qpid.transport.broker.incoming_url=topic://127.0.0.1/MAMA/%r/%S/%s +# This is where we listen for replies during request / reply +mama.qpid.transport.broker.reply_url=topic://127.0.0.1/MAMA/%u + # Source which you are going to consume from mama.qpid.transport.pub.outgoing_url=amqp://127.0.0.1:6666 # Where qpid is going to listen to for data to be pushed to -- 1.9.3 |
|
Damian Maguire
Cheers for these, Frank - looks good. As discussed off-list, we should probably stick these into a feature branch for a bit while they undergo further testing - that way we can review and make additional fixes, while allowing anyone else who's interested a chance to pull down the changes and work away themselves. I know we'd mentioned resurrecting the feature-qpid branch, but I think a new one is probably more appropriate - maybe feature-qpid-broker or similar. Any chance you can raise a Bugzilla so we can track the changes there? We'll then see about getting them added. Thanks, Damian On Tue, Sep 16, 2014 at 9:51 PM, Frank Quinn <fquinn.ni@...> wrote:
|
|