Re: problem with mamaDictionary_getDictionaryMessage when multiple bridges are loaded
Damian Maguire
Cheers for this Tom, looks good. Standard practice to proceed with this one: firstly, can you raise a Bugzilla ticket for tracking, and secondly, can you provide some tests demonstrating the use of the new API? Ideally, that would be a set of unit tests showing the usage in various combinations. Let me know if you have any other questions. Cheers, Damian
On Fri, Sep 5, 2014 at 10:26 AM, Tom Doust <tom.doust@...> wrote:
Re: [PATCH 2.3.1] MamaPublisher: Overloaded MamaPublisher create method
Damian Maguire
Cheers for the new patch Chad, looks good. At this stage we really need two things to progress getting this into OpenMAMA - firstly, can you raise a Bugzilla ticket, just to make it a bit easier to track the progress of the patch. Secondly, can you provide some evidence of testing? Generally we'd ask for unit tests, but the Java framework needs a bit of work, so in this case can you show a simple example application which demonstrates the usage of the new API? Thanks again for the contribution. Damian
On Mon, Sep 15, 2014 at 3:31 PM, Meyer, Chad J <chad.j.meyer@...> wrote:
Re: [PATCH 1/4] QPID: Added QPID Broker implementation
Damian Maguire
Cheers for these Frank, looks good. As discussed off list, we should probably stick these into a feature branch for a bit while they undergo a bit of further testing - that way we can review and make additional fixes, while allowing anyone else who's interested a chance to pull down the changes and work away themselves. I know we'd mentioned resurrecting the feature-qpid branch, but I think a new one is probably more appropriate - maybe feature-qpid-broker or similar. Any chance you can raise a Bugzilla so we can track the changes there, and we'll see about getting them added. Thanks, Damian
On Tue, Sep 16, 2014 at 9:51 PM, Frank Quinn <fquinn.ni@...> wrote:
Re: "C" Unit-Test Support on Visual Studio
Damian Maguire
Cheers for that Guy, this stuff looks pretty nice. We'll take a look at the changes and follow up on the Bugzilla ticket if we need any further information. Cheers, D
On Tue, Sep 16, 2014 at 5:40 PM, Guy <guy.tal@...> wrote:
Re: [PATCH 2.3.1 1/1] Common: variable expansion in property value on the last line of properties file fails
Damian Maguire
Cheers for that Guy, I'll take a look on the Bugzilla and get back to you there if I have any questions. Thanks, Damian
On Thu, Sep 11, 2014 at 12:06 PM, Guy <guy.tal@...> wrote:
Re: Enforcing field type on publish
Alpert, Reed <reed.alpert@...>
Hi,
I agree, the callback to a module that can do publish checking/etc is good.
I prototyped a checking module, and it works, although loading the dictionary on 1st publish has a small delay, and also required that the publish not be from a mama timer callback, since that deadlocked either on sending the subscription on the throttle queue, or getting the dict back.
We actually have more requirements for publish that are very specific to our env (adding system audit fields to all publishes to track when/where the data comes from). Having a callback we can engage would allow us to do this w/o any other users needing to have our stuff in the way. So loading our module at runtime and calling it would work quite well.
Thanks,
Reed.
Reed Alpert | Corporate & Investment Bank | Market Data Services | J.P. Morgan | 4 Metrotech Center, 23rd Floor, Brooklyn, NY 11245 | T: 718.242.5198 | M: 917.414.4613 | reed.alpert@...
Alternate Contact: CIB PIM Trading Technology Solutions NA | CIB_PIM_Trading_Technology_Solutions_NA@...
From: Glenn McClements [mailto:gmcclements@...]
Sent: Thursday, September 11, 2014 5:20 AM
To: Alpert, Reed; Benjamin Taieb; Frank Quinn; fquinn.ni@...
Cc: Openmama-dev@...
Subject: Re: [Openmama-dev] Enforcing field type on publish
Hi Alpert, I like #2 more, but I still want to keep the data model/market data/custom logic away from the core messaging part of OpenMAMA.
What I’m thinking of is a hook/callback to a programmer-defined function that would take the publisher, transport, message, a user-defined closure, etc. and return a status indicating whether the message should be published or not. The dictionary etc. could be obtained at initialisation time through a separate init function.
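A minimal sketch of what such a hook might look like in C. Every name here (`demoPublishCheckCb`, `demoPublishHook_set`, `demoPublishHook_check`, and the `demo*` types standing in for `mamaPublisher`, `mamaTransport` and `mamaMsg`) is hypothetical; none of it is existing OpenMAMA API.

```c
#include <stddef.h>

/* Hypothetical stand-ins for mamaPublisher, mamaTransport and mamaMsg. */
typedef struct { const char* topic; } demoPublisher;
typedef struct { const char* name; }  demoTransport;
typedef struct { int fieldCount; }    demoMsg;

typedef enum
{
    DEMO_PUBLISH_ALLOW = 0, /* message may be published */
    DEMO_PUBLISH_DENY  = 1  /* message must be dropped  */
} demoPublishStatus;

/* User-defined check, called once per message before it is sent. */
typedef demoPublishStatus (*demoPublishCheckCb) (const demoPublisher* publisher,
                                                 const demoTransport* transport,
                                                 const demoMsg*       msg,
                                                 void*                closure);

typedef struct
{
    demoPublishCheckCb mCallback;
    void*              mClosure;
} demoPublishHook;

/* Register (or clear, with cb == NULL) the hook and its closure. */
void
demoPublishHook_set (demoPublishHook* hook, demoPublishCheckCb cb, void* closure)
{
    hook->mCallback = cb;
    hook->mClosure  = closure;
}

/* What a send path would call: allow by default, else ask the callback. */
demoPublishStatus
demoPublishHook_check (const demoPublishHook* hook,
                       const demoPublisher*   publisher,
                       const demoTransport*   transport,
                       const demoMsg*         msg)
{
    if (NULL == hook || NULL == hook->mCallback)
    {
        return DEMO_PUBLISH_ALLOW;
    }
    return hook->mCallback (publisher, transport, msg, hook->mClosure);
}

/* Example callback: deny empty messages and count denials via the closure. */
demoPublishStatus
demoRejectEmpty (const demoPublisher* publisher,
                 const demoTransport* transport,
                 const demoMsg*       msg,
                 void*                closure)
{
    (void) publisher;
    (void) transport;

    if (0 == msg->fieldCount)
    {
        if (NULL != closure)
        {
            ++(*(int*) closure);
        }
        return DEMO_PUBLISH_DENY;
    }
    return DEMO_PUBLISH_ALLOW;
}
```

With no callback installed the check is a single pointer comparison, so users who never register a hook would pay almost nothing for it.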
This dovetails with some of the other pieces of work going on:
I still need to work through this a bit more but let me know what you think.
Glenn
From: Alpert, Reed <reed.alpert@...>
Hi,
Yes I agree with Ben’s comments. Here are some caveats:
1. This is a feature that is requested by our market data infrastructure team to prevent app dev teams from accidentally publishing incorrect field types. We don’t believe that a voluntary program will work, which is why the FieldCache as a location is not preferred. Apps won’t use the FieldCache if they don’t need it, or are using an Excel plugin for publishing (quite common).
2. This is a configured feature, and only for publication.
3. This is not for subscription, that is, a subscriber can use any of the getXX() methods that work.
There seems to be general agreement that the OM layer is a much better place for this feature than the bridges themselves.
The two best locations seem to be:
1. In mamaMsg, in the addXX and updateXX methods, probably with a macro like CHECK_MODIFY() that calls out to another module to do the work.
   a. This is good since it keeps the message construction in the mamaMsg module.
   b. This is good since it gives the programmer the quickest feedback on an error condition.
   c. This is bad since it puts some performance hit on construction of every message, even on subscribe.
   d. This is bad since it makes the code active for creating subscribe messages too.
   e. This is bad since it puts more code in more places for this feature.
2. In publisher.send(), which checks a config var and then calls out to another module to do the work.
   a. This is good since it has minimal code changes, and minimal performance effect.
   b. This is good since it only affects publish messages.
   c. This is bad since it gives delayed failure, and there may be multiple fields that fail.
      i. Probably logging each failed field and returning an error code will work well to help developers fix problems.
I think the publisher.send() method is the best place, given the above.
But figuring out which dictionary to use for the checking seems to be the most difficult part. Adding this feature as a config option may require also adding some optional config info about how to get a dictionary for a specific bridge, e.g., the source name.
Once the dictionary loading/mapping is solved then the rest seems fairly straightforward, except for the determination of allowed/denied conversions as configurable policy, but starting with 3 simple ones seems best:
1. No checking (default).
2. Type must match dictionary.
3. Type must match dictionary, with a small number of allowed conversions that do not incur data loss (e.g., F32 -> F64).
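The three starting policies could be sketched roughly as follows, including the "log each failed field and return a count" behaviour suggested for publisher.send(). The enum names, the `demoFieldType` values and the widening table are assumptions made for illustration (only F32 -> F64 comes from the list above); a real version would use `mamaFieldType` and look the expected type up in the dictionary rather than take it as an array.

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical field types; a real version would use mamaFieldType. */
typedef enum
{
    DEMO_TYPE_I32,
    DEMO_TYPE_I64,
    DEMO_TYPE_F32,
    DEMO_TYPE_F64,
    DEMO_TYPE_STRING
} demoFieldType;

typedef enum
{
    DEMO_CHECK_NONE     = 0, /* 1. No checking (default)               */
    DEMO_CHECK_EXACT    = 1, /* 2. Type must match dictionary          */
    DEMO_CHECK_LOSSLESS = 2  /* 3. Match, or a lossless widening       */
} demoCheckPolicy;

typedef struct
{
    int           fid;
    demoFieldType type;
} demoField;

/* Assumed table of conversions that cannot lose data. */
static int
demoIsLosslessWidening (demoFieldType from, demoFieldType to)
{
    return (DEMO_TYPE_F32 == from && DEMO_TYPE_F64 == to)
        || (DEMO_TYPE_I32 == from && DEMO_TYPE_I64 == to);
}

/* Returns non-zero if a field of type 'published' may be sent when the
 * dictionary expects type 'expected'. */
int
demoFieldAllowed (demoCheckPolicy policy,
                  demoFieldType   published,
                  demoFieldType   expected)
{
    switch (policy)
    {
    case DEMO_CHECK_NONE:
        return 1;
    case DEMO_CHECK_EXACT:
        return published == expected;
    case DEMO_CHECK_LOSSLESS:
        return published == expected
            || demoIsLosslessWidening (published, expected);
    }
    return 0;
}

/* Checks every field rather than stopping at the first failure, logs each
 * offending field and returns the number of failures. expected[i] is the
 * dictionary type for fields[i]. */
size_t
demoCheckFields (demoCheckPolicy      policy,
                 const demoField*     fields,
                 const demoFieldType* expected,
                 size_t               count)
{
    size_t failures = 0;
    size_t i        = 0;

    for (i = 0; i < count; i++)
    {
        if (!demoFieldAllowed (policy, fields[i].type, expected[i]))
        {
            fprintf (stderr,
                     "field %d: type %d does not match dictionary type %d\n",
                     fields[i].fid,
                     (int) fields[i].type,
                     (int) expected[i]);
            failures++;
        }
    }
    return failures;
}
```

A non-zero return would map to an error status from the send call, with the log lines telling the developer which fields to fix.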
Thanks,
Reed.
[PATCH 4/4] QPIDMSG: Fixed issue with reallocating vector fields
Frank Quinn <fquinn.ni@...>
This patch fixes an issue where a realloc occurs for member types which the
qpid bridge will typically attempt to re-use rather than create / destroy
every time. This is really just MAMA Message at this time but the same would
apply to date time and price if they were implemented. Without this change, if
the newly appended memory happens to be non-NULL, the qpid bridge will likely
crash.

Signed-off-by: Frank Quinn <fquinn@...>
---
 mama/c_cpp/src/c/payload/qpidmsg/payload.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/mama/c_cpp/src/c/payload/qpidmsg/payload.c b/mama/c_cpp/src/c/payload/qpidmsg/payload.c
index 4bf8d9f..5699edb 100644
--- a/mama/c_cpp/src/c/payload/qpidmsg/payload.c
+++ b/mama/c_cpp/src/c/payload/qpidmsg/payload.c
@@ -3262,6 +3262,9 @@ qpidmsgPayloadImpl_allocateBufferMemory (void** buffer,
     }
     else
     {
+        /* set newly added bytes to 0 */
+        memset ((uint8_t*) newbuf + *size, 0, newSize - *size);
+
         *buffer = newbuf;
         *size = newSize;
         return MAMA_STATUS_OK;
--
1.9.3
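The fix boils down to zeroing only the tail that realloc appended, since realloc leaves those bytes indeterminate. A self-contained sketch of the same idea is below; `demoGrowBuffer` is a hypothetical helper written for illustration, not the actual qpidmsgPayloadImpl_allocateBufferMemory.

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Grows *buffer to newSize bytes and zeroes only the newly added tail.
 * Returns 0 on success and -1 on failure; on failure *buffer and *size are
 * left untouched so the caller can still use or free the old block. */
int
demoGrowBuffer (void** buffer, size_t* size, size_t newSize)
{
    void* newbuf = NULL;

    if (NULL == buffer || NULL == size)
    {
        return -1;
    }

    /* Nothing to do if the buffer is already large enough */
    if (newSize <= *size)
    {
        return 0;
    }

    newbuf = realloc (*buffer, newSize);
    if (NULL == newbuf)
    {
        return -1;
    }

    /* Existing bytes are preserved by realloc; set newly added bytes to 0
     * so stale non-NULL values are never mistaken for reusable members */
    memset ((uint8_t*) newbuf + *size, 0, newSize - *size);

    *buffer = newbuf;
    *size   = newSize;
    return 0;
}
```

Zeroing only the tail keeps the cost proportional to the growth rather than to the whole buffer, which matters when the buffer is resized repeatedly.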
[PATCH 3/4] QPID: Fixed some memory leaks and removed old backwards compatibility
Frank Quinn <fquinn.ni@...>
This change means that the bridge is only expected to work with version 0.6
and above of proton (ideally 0.7). It also fixes several memory leaks and
simplifies many areas of code which were required to maintain backwards
compatibility - particularly with respect to shutdown code.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/qpid/SConscript     |  6 ++---
 mama/c_cpp/src/c/bridge/qpid/publisher.c    |  3 ++-
 mama/c_cpp/src/c/bridge/qpid/qpidcommon.c   |  4 +--
 mama/c_cpp/src/c/bridge/qpid/qpidcommon.h   |  4 +--
 mama/c_cpp/src/c/bridge/qpid/qpiddefs.h     | 35 ++----------------------
 mama/c_cpp/src/c/bridge/qpid/subscription.c |  7 ++---
 mama/c_cpp/src/c/bridge/qpid/transport.c    | 41 +++++------------------------
 7 files changed, 21 insertions(+), 79 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/qpid/SConscript b/mama/c_cpp/src/c/bridge/qpid/SConscript
index 566867c..6ec5396 100644
--- a/mama/c_cpp/src/c/bridge/qpid/SConscript
+++ b/mama/c_cpp/src/c/bridge/qpid/SConscript
@@ -36,9 +36,9 @@ if not env.GetOption('clean'):
     if not conf.CheckCHeader('proton/parser.h'):
         print '+- could not find parser.h in ${QPID_HOME}/include/proton'
         Exit(1)
-    if conf.CheckCHeader('proton/version.h'):
-        print '+- proton release appears to include ${QPID_HOME}/include/proton/version.h'
-        env.Append(CCFLAGS=['-DHAVE_QPID_PROTON_VERSION_H'])
+    if not conf.CheckCHeader('proton/version.h'):
+        print '+- proton release does not have ${QPID_HOME}/include/proton/version.h - too old!'
+        Exit(1)

     env = conf.Finish()

diff --git a/mama/c_cpp/src/c/bridge/qpid/publisher.c b/mama/c_cpp/src/c/bridge/qpid/publisher.c
index 98a9a1a..5f1c206 100644
--- a/mama/c_cpp/src/c/bridge/qpid/publisher.c
+++ b/mama/c_cpp/src/c/bridge/qpid/publisher.c
@@ -340,7 +340,8 @@ qpidBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg)
     }

     /* Note the messages don't actually get published until here */
-    if (PN_MESSENGER_SEND(impl->mTransport->mOutgoing))
+    if (pn_messenger_send(impl->mTransport->mOutgoing,
+                          QPID_MESSENGER_SEND_TIMEOUT))
     {
         qpidError = PN_MESSENGER_ERROR (impl->mTransport->mOutgoing);
         mama_log (MAMA_LOG_LEVEL_SEVERE, "qpidBridgeMamaPublisher_send(): "
diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c
index f55f63a..9709bad 100644
--- a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c
+++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c
@@ -36,7 +36,7 @@
  =========================================================================*/

 mama_status
-qpidBridgeCommon_parseSubjectKey (char* key,
+qpidBridgeCommon_parseSubjectKey (const char* key,
                                   const char** root,
                                   const char** source,
                                   const char** topic,
@@ -137,7 +137,7 @@ mama_status
 qpidBridgeCommon_generateSubjectKey (const char* root,
                                      const char* source,
                                      const char* topic,
-                                     char** keyTarget)
+                                     const char** keyTarget)
 {
     char subject[MAX_SUBJECT_LENGTH];
     char* subjectPos = subject;
diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h
index 6cb70a7..a18c6f2 100644
--- a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h
+++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h
@@ -77,7 +77,7 @@ mama_status
 qpidBridgeCommon_generateSubjectKey (const char* root,
                                      const char* source,
                                      const char* topic,
-                                     char** keyTarget);
+                                     const char** keyTarget);

 /**
  * This function will take the provided format string and use the root, source,
@@ -138,7 +138,7 @@ qpidBridgeCommon_generateSubjectUri (const char* format,
  * @return mama_status indicating whether the method succeeded or failed.
  */
 mama_status
-qpidBridgeCommon_parseSubjectKey (char* key,
+qpidBridgeCommon_parseSubjectKey (const char* key,
                                   const char** root,
                                   const char** source,
                                   const char** topic,
diff --git a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h
index 3e3b2e5..2219908 100644
--- a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h
+++ b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h
@@ -32,6 +32,7 @@
 #include <list.h>

 /* Qpid include files */
+#include <proton/version.h>
 #include <proton/driver.h>
 #include <proton/message.h>
 #include <proton/util.h>
@@ -39,12 +40,6 @@

 #include "endpointpool.h"

-/* If this version of proton has provided a version header file (build system
- * to provide this macro) */
-#ifdef HAVE_QPID_PROTON_VERSION_H
-#include <proton/version.h>
-#endif
-
 #if defined(__cplusplus)
 extern "C" {
 #endif
@@ -79,34 +74,8 @@ typedef enum qpidTransportType_
     QPID_TRANSPORT_TYPE_BROKER
 } qpidTransportType;

-/* If a proton version header has been parsed at this point (version >= 0.5) */
-#ifdef _PROTON_VERSION_H
-
-#if (PN_VERSION_MAJOR > 0 || PN_VERSION_MINOR > 4)
-#define PN_MESSENGER_SEND(messenger) \
-    pn_messenger_send(messenger, QPID_MESSENGER_SEND_TIMEOUT)
 #define PN_MESSENGER_ERROR(messenger) \
     pn_error_text(pn_messenger_error(messenger))
-#define PN_MESSENGER_STOP(messenger) \
-    qpidBridgeMamaTransportImpl_stopProtonMessenger (messenger)
-#define PN_MESSENGER_FREE(messenger) \
-    qpidBridgeMamaTransportImpl_freeProtonMessenger (messenger)
-#endif
-
-/* Place other version specific macros here */
-
-#else
-
-/* Earliest supported version is 0.4 - header was added in 0.5 */
-#define PN_VERSION_MAJOR 0
-#define PN_VERSION_MINOR 4
-
-#define PN_MESSENGER_SEND(messenger) pn_messenger_send(messenger)
-#define PN_MESSENGER_ERROR(messenger) pn_messenger_error(messenger)
-#define PN_MESSENGER_STOP(messenger) /* disabled as it deadlocks in this ver */
-#define PN_MESSENGER_FREE(messenger) /* disabled as it deadlocks in this ver */
-
-#endif /* _PROTON_VERSION_H */

 /* Keys for application property map */
 #define QPID_KEY_MSGTYPE "MAMAT"
@@ -131,7 +100,7 @@ typedef struct qpidSubscription_
     const char* mSource;
     const char* mTopic;
     const char* mRoot;
-    char* mSubject;
+    const char* mSubject;
     const char* mUri;
     void* mClosure;
     int mIsNotMuted;
diff --git a/mama/c_cpp/src/c/bridge/qpid/subscription.c b/mama/c_cpp/src/c/bridge/qpid/subscription.c
index 2582a5e..d6d2dae 100644
--- a/mama/c_cpp/src/c/bridge/qpid/subscription.c
+++ b/mama/c_cpp/src/c/bridge/qpid/subscription.c
@@ -150,14 +150,15 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber,

     /* Add in the subject key as the only string inside */
     pn_data_put_string (data, pn_bytes (strlen (impl->mSubject),
-                                        impl->mSubject));
+                                        (char*) impl->mSubject));

     /* Send out the subscription registration of interest message */
     if (NULL != transport->mOutgoingAddress)
     {
         pn_messenger_put (transport->mOutgoing, transport->mMsg);

-        if (0 != PN_MESSENGER_SEND (transport->mOutgoing))
+        if (0 != pn_messenger_send (transport->mOutgoing,
+                                    QPID_MESSENGER_SEND_TIMEOUT))
         {
             const char* qpid_error = PN_MESSENGER_ERROR (transport->mOutgoing);
             mama_log (MAMA_LOG_LEVEL_SEVERE,
@@ -257,7 +258,7 @@ qpidBridgeMamaSubscription_destroy (subscriptionBridge subscriber)

     if (NULL != impl->mSubject)
     {
-        free (impl->mSubject);
+        free ((void*)impl->mSubject);
     }

     if (NULL != impl->mRoot)
diff --git a/mama/c_cpp/src/c/bridge/qpid/transport.c b/mama/c_cpp/src/c/bridge/qpid/transport.c
index 2b2053b..ab9755c 100644
--- a/mama/c_cpp/src/c/bridge/qpid/transport.c
+++ b/mama/c_cpp/src/c/bridge/qpid/transport.c
@@ -260,9 +260,6 @@ qpidBridgeMamaTransportImpl_releasePoolMsg (qpidMsgPool* pool,
 static void*
 qpidBridgeMamaTransportImpl_dispatchThread (void* closure);

-/* These functions depend on methods only introduced in qpid proton 0.5 */
-#if (PN_VERSION_MAJOR > 0 || PN_VERSION_MINOR > 4)
-
 /**
  * This function is a wrapper for pn_messenger_stop as it caused deadlock
  * before qpid proton 0.5 and pn_messenger_interrupt as it was not introduced
@@ -282,8 +279,6 @@ qpidBridgeMamaTransportImpl_stopProtonMessenger (pn_messenger_t* messenger);
 static void
 qpidBridgeMamaTransportImpl_freeProtonMessenger (pn_messenger_t* messenger);

-#endif
-
 /*=========================================================================
  =              Public interface implementation functions                =
  =========================================================================*/
@@ -348,8 +343,8 @@ qpidBridgeMamaTransport_destroy (transportBridge transport)
     pn_message_free(impl->mMsg);

     /* Macro wrapped as these caused deadlock prior to v0.5 of qpid proton */
-    PN_MESSENGER_FREE(impl->mIncoming);
-    PN_MESSENGER_FREE(impl->mOutgoing);
+    qpidBridgeMamaTransportImpl_freeProtonMessenger (impl->mIncoming);
+    qpidBridgeMamaTransportImpl_freeProtonMessenger (impl->mOutgoing);

     endpointPool_destroy (impl->mSubEndpoints);
     endpointPool_destroy (impl->mPubEndpoints);
@@ -943,7 +938,7 @@ qpidBridgeMamaTransportImpl_start (qpidTransportBridge* impl)
                       "Error Subscribing to %s : %s",
                       impl->mIncomingAddress,
                       PN_MESSENGER_ERROR(impl->mIncoming));
-            PN_MESSENGER_STOP(impl->mIncoming);
+            qpidBridgeMamaTransportImpl_stopProtonMessenger (impl->mIncoming);
             return MAMA_STATUS_PLATFORM;
         }
     }
@@ -961,7 +956,7 @@ qpidBridgeMamaTransportImpl_start (qpidTransportBridge* impl)
                       "Error Subscribing to %s : %s",
                       impl->mReplyAddress,
                       PN_MESSENGER_ERROR(impl->mIncoming));
-            PN_MESSENGER_STOP(impl->mIncoming);
+            qpidBridgeMamaTransportImpl_stopProtonMessenger (impl->mIncoming);
             return MAMA_STATUS_PLATFORM;
         }
     }
@@ -994,21 +989,6 @@ mama_status qpidBridgeMamaTransportImpl_stop (qpidTransportBridge* impl)
      */
     mama_status status = MAMA_STATUS_OK;

-    /*
-     * Ask the messenger nicely to stop by sending a special subject (best we
-     * can do prior to qpid 0.5 when recv was not interruptable). Known to
-     * deadlock pn_messenger_recv if the recv block = 1.
-     */
-    pn_message_t* msg = pn_message ();
-
-    /* Set the byte to indicate this is a termination message */
-    qpidBridgePublisherImpl_setMessageType (msg, QPID_MSG_TERMINATE);
-
-    /* Create the messenger for publishing out this desist message */
-    pn_message_set_address (msg, impl->mReplyAddress);
-    pn_messenger_put (impl->mOutgoing, msg);
-    PN_MESSENGER_SEND (impl->mOutgoing);
-
     /* Set the transportBridge mIsDispatching to false */
     impl->mIsDispatching = 0;

@@ -1020,17 +1000,11 @@ mama_status qpidBridgeMamaTransportImpl_stop (qpidTransportBridge* impl)

     mama_log (MAMA_LOG_LEVEL_FINE, "qpidBridgeMamaTransportImpl_stop(): "
               "Stopping the outgoing messenger.");
-    PN_MESSENGER_STOP(impl->mOutgoing);
+    qpidBridgeMamaTransportImpl_stopProtonMessenger (impl->mOutgoing);

     mama_log (MAMA_LOG_LEVEL_FINE, "qpidBridgeMamaTransportImpl_stop(): "
               "Stopping the incoming messenger.");
-    PN_MESSENGER_STOP(impl->mIncoming);
-
-    /* Destroy the temporarily created proton message */
-    if (NULL != msg)
-    {
-        pn_message_free (msg);
-    }
+    qpidBridgeMamaTransportImpl_stopProtonMessenger (impl->mIncoming);

     mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaTransportImpl_stop(): "
               "Rejoined with status: %s.",
@@ -1806,7 +1780,6 @@ void* qpidBridgeMamaTransportImpl_dispatchThread (void* closure)
 }

 /* These functions depend on methods only introduced in qpid proton 0.5 */
-#if (PN_VERSION_MAJOR > 0 || PN_VERSION_MINOR > 4)
 void qpidBridgeMamaTransportImpl_stopProtonMessenger (pn_messenger_t* messenger)
 {
     mama_log (MAMA_LOG_LEVEL_FINE,
@@ -1815,7 +1788,6 @@ void qpidBridgeMamaTransportImpl_stopProtonMessenger (pn_messenger_t* messenger)
               messenger,
               pn_messenger_name(messenger));

-    pn_messenger_interrupt (messenger);
     pn_messenger_stop (messenger);
 }

@@ -1823,4 +1795,3 @@ void qpidBridgeMamaTransportImpl_freeProtonMessenger (pn_messenger_t* messenger)
 {
     pn_messenger_free (messenger);
 }
-#endif
--
1.9.3
[PATCH 2/4] QPID: Added back -Werror to scons scripts
Frank Quinn <fquinn.ni@...>
We are no longer going to support proton version 0.5 which contained compiler
warnings within the header (see
https://issues.apache.org/jira/browse/PROTON-420) so we can re-enable -Werror
in the compiler to treat all warnings as errors.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/qpid/SConscript     | 5 +----
 mama/c_cpp/src/c/payload/qpidmsg/SConscript | 5 +----
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/qpid/SConscript b/mama/c_cpp/src/c/bridge/qpid/SConscript
index 006b3f4..566867c 100644
--- a/mama/c_cpp/src/c/bridge/qpid/SConscript
+++ b/mama/c_cpp/src/c/bridge/qpid/SConscript
@@ -24,10 +24,7 @@ else:
 env.Append(LIBS=['qpid-proton', 'mama', 'm', 'wombatcommon', 'uuid', 'event'],
            LIBPATH=libPath, CPPPATH=incPath)

-''' Generally we would like to make use of -Werror, but unfortunately it causes
-    proton to choke during the build phase. Leaving the option here should we
-    find the proton issues resolved.'''
-#env.Append(CFLAGS=['-Werror'])
+env.Append(CFLAGS=['-Werror'])

 conf = Configure(env, config_h='./config.h', log_file='./config.log')

diff --git a/mama/c_cpp/src/c/payload/qpidmsg/SConscript b/mama/c_cpp/src/c/payload/qpidmsg/SConscript
index 77275a3..76e06e5 100644
--- a/mama/c_cpp/src/c/payload/qpidmsg/SConscript
+++ b/mama/c_cpp/src/c/payload/qpidmsg/SConscript
@@ -20,10 +20,7 @@ env['CCFLAGS'] = [x for x in env['CCFLAGS'] if x != '-pedantic-errors']
 env.Append(LIBS=['mama', 'qpid-proton', 'm'],
            LIBPATH=libPath, CPPPATH=[includePath])

-''' Generally we would like to make use of -Werror, but unfortunately it causes
-    proton to choke during the build phase. Leaving the option here should we
-    find the proton issues resolved.'''
-#env.Append(CFLAGS=['-Werror'])
+env.Append(CFLAGS=['-Werror'])

 sources = Glob('*.c')

--
1.9.3
[PATCH 1/4] QPID: Added QPID Broker implementation
Frank Quinn <fquinn.ni@...>
Broker support has now been added to the qpid proton bridge for OpenMAMA and
example configuration to use it has been added to mama.properties.

Note that to make use of this functionality (using qpidd as an example), you
will need to allow topic patterns matching your provided URI. For example, for
the following example configuration, you could run qpid with the following
command to ensure all these URI topics can be created:

    qpidd --topic-patterns MAMA.*

Example configuration is then as follows:

    mama.qpid.transport.qpidbroker.type=broker
    mama.qpid.transport.qpidbroker.outgoing_url=topic://127.0.0.1/MAMA/%r/%S/%s
    mama.qpid.transport.qpidbroker.incoming_url=topic://127.0.0.1/MAMA/%r/%S/%s
    mama.qpid.transport.qpidbroker.reply_url=topic://127.0.0.1/MAMA/%u

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/qpid/Makefile.am    |   1 +
 mama/c_cpp/src/c/bridge/qpid/publisher.c    | 177 ++++++++--------
 mama/c_cpp/src/c/bridge/qpid/qpidcommon.c   | 304 ++++++++++++++++++++++++++++
 mama/c_cpp/src/c/bridge/qpid/qpidcommon.h   | 151 ++++++++++++++
 mama/c_cpp/src/c/bridge/qpid/qpiddefs.h     |  21 +-
 mama/c_cpp/src/c/bridge/qpid/subscription.c | 211 +++++++++----------
 mama/c_cpp/src/c/bridge/qpid/subscription.h |  23 ---
 mama/c_cpp/src/c/bridge/qpid/transport.c    | 261 ++++++++++++++++++++++--
 mama/c_cpp/src/c/bridge/qpid/transport.h    |  93 ++++++++-
 mama/c_cpp/src/examples/mama.properties     |  14 ++
 10 files changed, 1006 insertions(+), 250 deletions(-)
 create mode 100644 mama/c_cpp/src/c/bridge/qpid/qpidcommon.c
 create mode 100644 mama/c_cpp/src/c/bridge/qpid/qpidcommon.h

diff --git a/mama/c_cpp/src/c/bridge/qpid/Makefile.am b/mama/c_cpp/src/c/bridge/qpid/Makefile.am
index 53fe5cd..3995816 100644
--- a/mama/c_cpp/src/c/bridge/qpid/Makefile.am
+++ b/mama/c_cpp/src/c/bridge/qpid/Makefile.am
@@ -54,6 +54,7 @@ libmamaqpidimpl_la_SOURCES = \
     timer.c \
     inbox.c \
     codec.c \
+    qpidcommon.c \
     endpointpool.c


diff --git a/mama/c_cpp/src/c/bridge/qpid/publisher.c b/mama/c_cpp/src/c/bridge/qpid/publisher.c
index 4d32475..98a9a1a 100644
--- a/mama/c_cpp/src/c/bridge/qpid/publisher.c
+++ b/mama/c_cpp/src/c/bridge/qpid/publisher.c
@@ -37,9 +37,9 @@
 #include "msg.h"
 #include "codec.h"
 #include "inbox.h"
-#include "subscription.h"
 #include "publisher.h"
 #include "endpointpool.h"
+#include "qpidcommon.h"

 /*=========================================================================
  =                Typedefs, structs, enums and globals                   =
@@ -52,6 +52,7 @@ typedef struct qpidPublisherBridge
     const char* mSource;
     const char* mRoot;
     const char* mSubject;
+    const char* mUri;
     pn_message_t* mQpidRawMsg;
     msgBridge mMamaBridgeMsg;
 } qpidPublisherBridge;
@@ -76,20 +77,6 @@ qpidBridgePublisherImpl_enqueueMessageForAddress (mamaMsg msg,
                                                   const char* url,
                                                   qpidPublisherBridge* impl);

-/**
- * When a qpid publisher is created, it calls this function to generate a
- * standard subject key using qpidBridgeMamaSubscriptionImpl_generateSubjectKey
- * with different parameters depending on if it's a market data publisher,
- * basic publisher or a data dictionary publisher.
- *
- * @param msg The MAMA message to enqueue for sending.
- * @param url The URL to eneueue the message for sending to.
- * @param impl The related qpid publisher bridge.
- *
- * @return mama_status indicating whether the method succeeded or failed.
- */
-static mama_status
-qpidBridgeMamaPublisherImpl_buildSendSubject (qpidPublisherBridge* impl);

 /*=========================================================================
  =              Public interface implementation functions                =
@@ -105,9 +92,11 @@ qpidBridgeMamaPublisher_createByIndex (publisherBridge* result,
                                        void* nativeQueueHandle,
                                        mamaPublisher parent)
 {
-    qpidPublisherBridge* impl = NULL;
-    qpidTransportBridge* transport = NULL;
-    mama_status status = MAMA_STATUS_OK;
+    qpidPublisherBridge*    impl            = NULL;
+    qpidTransportBridge*    transport       = NULL;
+    mama_status             status          = MAMA_STATUS_OK;
+    const char*             outgoingAddress = NULL;
+    const char*             uuid            = NULL;

     if (NULL == result
         || NULL == tport
@@ -149,23 +138,50 @@ qpidBridgeMamaPublisher_createByIndex (publisherBridge* result,
         return MAMA_STATUS_NOMEM;
     }

-    if (NULL != topic)
-    {
-        impl->mTopic = topic;
-    }
+    /* Get the outgoing address format string */
+    outgoingAddress = qpidBridgeMamaTransportImpl_getOutgoingAddress (
+            (transportBridge) impl->mTransport);

-    if (NULL != source)
+    /* Get the transport UUID which is unique to the transport in case we need
+     * it during format string expansion */
+    uuid = qpidBridgeMamaTransportImpl_getUuid (
+            (transportBridge) impl->mTransport);
+
+    /* Collapse subject key to single string based on supplied values
+     *
+     * _MD requests do not use the topic on the wire as the responder may not
+     * necessarily be listening for requests on that topic until the first
+     * request comes in. */
+    if (NULL != root && 0 == strcmp (root, MAMA_ROOT_MARKET_DATA))
     {
-        impl->mSource = source;
+        qpidBridgeCommon_generateSubjectKey (root,
+                                             source,
+                                             NULL,
+                                             &impl->mSubject);
     }
-
-    if (NULL != root)
+    else
     {
-        impl->mRoot = root;
+        qpidBridgeCommon_generateSubjectKey (root,
+                                             source,
+                                             topic,
+                                             &impl->mSubject);
     }

-    /* Generate a topic name based on the publisher details */
-    status = qpidBridgeMamaPublisherImpl_buildSendSubject (impl);
+    /* Parse the collapsed string to extract the standardized values */
+    qpidBridgeCommon_parseSubjectKey (impl->mSubject,
+                                      &impl->mRoot,
+                                      &impl->mSource,
+                                      &impl->mTopic,
+                                      (transportBridge) impl->mTransport);
+
+
+    /* Generate subject URI based on standardized values */
+    qpidBridgeCommon_generateSubjectUri (outgoingAddress,
+                                         impl->mRoot,
+                                         impl->mSource,
+                                         impl->mTopic,
+                                         uuid,
+                                         &impl->mUri);

     /* Create a reusable proton message */
     impl->mQpidRawMsg = pn_message ();
@@ -212,6 +228,22 @@ qpidBridgeMamaPublisher_destroy (publisherBridge publisher)
     {
         free ((void*) impl->mSubject);
     }
+    if (NULL != impl->mUri)
+    {
+        free ((void*) impl->mUri);
+    }
+    if (NULL != impl->mRoot)
+    {
+        free ((void*) impl->mRoot);
+    }
+    if (NULL != impl->mSource)
+    {
+        free ((void*) impl->mSource);
+    }
+    if (NULL != impl->mTopic)
+    {
+        free ((void*) impl->mTopic);
+    }
     if (NULL != impl->mMamaBridgeMsg)
     {
         qpidBridgeMamaMsg_destroy (impl->mMamaBridgeMsg, 0);
@@ -258,7 +290,7 @@ qpidBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg)
         /* Use the publisher's default send destination for request */
         qpidBridgePublisherImpl_enqueueMessageForAddress (
                 msg,
-                (char*) impl->mTransport->mOutgoingAddress,
+                (char*) impl->mUri,
                 impl);
         break;
     case QPID_MSG_INBOX_RESPONSE:
@@ -274,25 +306,35 @@ qpidBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg)
                   impl->mSubject,
                   impl->mSource);

-        /* For each known downstream destination */
-        status = endpointPool_getRegistered (impl->mTransport->mPubEndpoints,
-                                             impl->mSubject,
-                                             &targets,
-                                             &targetCount);
-
-        if (targetCount == 0)
+        if (QPID_TRANSPORT_TYPE_P2P ==
+                qpidBridgeMamaTransportImpl_getType (
+                        (transportBridge) impl->mTransport))
         {
-            mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaPublisher_send(): "
-                      "No one subscribed to subject '%s', not publishing.",
-                      impl->mSubject);
-            return MAMA_STATUS_OK;
+            /* For each known downstream destination */
+            status = endpointPool_getRegistered (impl->mTransport->mPubEndpoints,
+                                                 impl->mSubject,
+                                                 &targets,
+                                                 &targetCount);
+
+            if (targetCount == 0)
+            {
+                mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaPublisher_send(): "
+                          "No one subscribed to subject '%s', not publishing.",
+                          impl->mSubject);
+                return MAMA_STATUS_OK;
+            }
+
+            /* Push the message out to the send queue for each interested party */
+            for (targetInc = 0; targetInc < targetCount; targetInc++)
+            {
+                url = (char*) targets[targetInc];
+                qpidBridgePublisherImpl_enqueueMessageForAddress (msg, url, impl);
+            }
         }
-
-        /* Push the message out to the send queue for each interested party */
-        for (targetInc = 0; targetInc < targetCount; targetInc++)
+        else
         {
-            url = (char*) targets[targetInc];
-            qpidBridgePublisherImpl_enqueueMessageForAddress (msg, url, impl);
+            qpidBridgePublisherImpl_enqueueMessageForAddress (msg, impl->mUri,
+                                                              impl);
         }
         break;
     }
@@ -643,46 +685,3 @@ qpidBridgePublisherImpl_enqueueMessageForAddress (mamaMsg msg,
     }
 }

-
-mama_status
-qpidBridgeMamaPublisherImpl_buildSendSubject (qpidPublisherBridge* impl)
-{
-    char* keyTarget = NULL;
-
-    /* If this is a special _MD publisher, lose the topic unless dictionary */
-    if (impl->mRoot != NULL)
-    {
-        /*
-         * May use strlen here to increase speed but would need to test to
-         * verify this is the only circumstance in which we want to consider the
-         * topic when a root is specified.
-         */
-        if (strcmp (impl->mRoot, "_MDDD") == 0)
-        {
-            qpidBridgeMamaSubscriptionImpl_generateSubjectKey (impl->mRoot,
-                                                               impl->mSource,
-                                                               impl->mTopic,
-                                                               &keyTarget);
-        }
-        else
-        {
-            qpidBridgeMamaSubscriptionImpl_generateSubjectKey (impl->mRoot,
-                                                               impl->mSource,
-                                                               NULL,
-                                                               &keyTarget);
-        }
-    }
-    /* If this isn't a special _MD publisher */
-    else
-    {
-        qpidBridgeMamaSubscriptionImpl_generateSubjectKey (NULL,
-                                                           impl->mSource,
-                                                           impl->mTopic,
-                                                           &keyTarget);
-    }
-
-    /* Set the subject for publishing here */
-    impl->mSubject = keyTarget;
-
-    return MAMA_STATUS_OK;
-}
diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c
new file mode 100644
index 0000000..f55f63a
--- /dev/null
+++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c
@@ -0,0 +1,304 @@
+/* $Id$
+ *
+ * OpenMAMA: The open middleware agnostic messaging API
+ * Copyright (C) 2011 NYSE Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+/*=========================================================================
+ =                             Includes                                  =
+ =========================================================================*/
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "qpiddefs.h"
+#include "qpidcommon.h"
+#include "transport.h"
+
+/*=========================================================================
+ =                  Public implementation functions                      =
+ =========================================================================*/
+
+mama_status
+qpidBridgeCommon_parseSubjectKey (char* key,
+                                  const char** root,
+                                  const char** source,
+                                  const char** topic,
+                                  transportBridge transport)
+{
+    /* We can assume subject key is of this length */
+    char subject[MAX_SUBJECT_LENGTH];
+
+    char* subjectPosition = subject;
+
+    /* Reset return values */
+    *root = NULL;
+    *source = NULL;
+    *topic = NULL;
+
+    if (NULL == key || 0 == strlen(key))
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Copy the key across to writable memory */
+    strcpy (subject, key);
+
+    subjectPosition = strtok (subjectPosition, ".");
+
+    if (0 == strcmp (MAMA_ROOT_MARKET_DATA, subjectPosition) ||
+        0 == strcmp (MAMA_ROOT_MARKET_DATA_DICT, subjectPosition))
+    {
+        *root = strdup (subjectPosition);
+
+        /* This should NULL terminate the end of the source string */
+        subjectPosition = strtok (NULL, ".");
+        if (NULL != subjectPosition)
+        {
+            uint32_t parsedBytes = 0;
+
+            *source = strdup (subjectPosition);
+
+            qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace (
+                    transport,
+                    *source);
+
+            /* String lengths plus NULL characters */
+            parsedBytes = strlen (*root) + 1 + strlen (*source) + 1;
+
+            /* Only parse topic if it exists...
*/ + if (parsedBytes < strlen (key)) + { + /* The topic is then whatever is left after source NULL character */ + *topic = strdup (subject + parsedBytes); + } + } + } + /* Might have a symbol namespace... might not... */ + else + { + char* firstSubjectPart = subjectPosition; + + /* If a MAMA symbol namespace is involved, there should always be + * another part */ + char* secondSubjectPart = strtok (NULL, "."); + + /* If there is no second part, this looks like a basic topic. Also if + * this is a symbol namespace we have never seen before, assume it's + * basic. */ + if (NULL == secondSubjectPart) + { + *topic = strdup (firstSubjectPart); + } + /* There is a second part, but we are unconvinced unless the transport + * has seen this source somewhere before */ + else if (qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace ( + transport, + firstSubjectPart)) + { + *source = strdup (firstSubjectPart); + /* Use offsets from provided strings rather than the one strtok + * may have just modified with NULL characters for the topic */ + *topic = strdup (key + strlen (firstSubjectPart) + 1); + } + /* If this is a subject without a symbol namespace, just use the + * original string rather than try and revert strtok's trail of + * destruction */ + else + { + *topic = strdup (key); + } + } + return MAMA_STATUS_OK; +} + + +/* + * Internal function to ensure that the topic names are always calculated + * in a particular way + */ +mama_status +qpidBridgeCommon_generateSubjectKey (const char* root, + const char* source, + const char* topic, + char** keyTarget) +{ + char subject[MAX_SUBJECT_LENGTH] = ""; + char* subjectPos = subject; + size_t bytesRemaining = MAX_SUBJECT_LENGTH; + size_t written = 0; + + if (NULL != root) + { + mama_log (MAMA_LOG_LEVEL_FINEST, + "qpidBridgeCommon_generateSubjectKey(): R."); + written = snprintf (subjectPos, bytesRemaining, "%s", root); + subjectPos += written; + bytesRemaining -= written; + } + + if (NULL != source) + { + mama_log 
(MAMA_LOG_LEVEL_FINEST, + "qpidBridgeCommon_generateSubjectKey(): S."); + /* If these are not the first bytes, prepend with a period */ + if (subjectPos != subject) + { + written = snprintf (subjectPos, bytesRemaining, ".%s", source); + } + else + { + written = snprintf (subjectPos, bytesRemaining, "%s", source); + } + subjectPos += written; + bytesRemaining -= written; + } + + if (NULL != topic) + { + mama_log (MAMA_LOG_LEVEL_FINEST, + "qpidBridgeCommon_generateSubjectKey(): T."); + /* If these are not the first bytes, prepend with a period */ + if (subjectPos != subject) + { + snprintf (subjectPos, bytesRemaining, ".%s", topic); + } + else + { + snprintf (subjectPos, bytesRemaining, "%s", topic); + } + } + + /* + * Allocate the memory for copying the string. Caller is responsible for + * destroying. + */ + *keyTarget = strdup (subject); + if (NULL == *keyTarget) + { + return MAMA_STATUS_NOMEM; + } + else + { + return MAMA_STATUS_OK; + } +} + +/* + * Internal function to ensure that the subject URIs are always expanded + * in a particular way + */ +mama_status +qpidBridgeCommon_generateSubjectUri (const char* format, + const char* root, + const char* source, + const char* topic, + const char* uuid, + const char** uriTarget) +{ + char original[MAX_URI_LENGTH]; + char uri[MAX_URI_LENGTH] = ""; + char* uriPos = uri; + char* nextSeg = NULL; + size_t bytesRemaining = MAX_URI_LENGTH; + size_t written = 0; + + strncpy (original, format, MAX_URI_LENGTH); + original[MAX_URI_LENGTH - 1] = '\0'; + + nextSeg = strtok ((char*) original, "%"); + + while (nextSeg != NULL) + { + written = snprintf (uriPos, bytesRemaining, "%s", nextSeg); + bytesRemaining -= written; + uriPos += written; + + nextSeg = strtok (NULL, "%"); + if (nextSeg != NULL) + { + switch (*nextSeg) + { + case 'r': + if (NULL != root) + { + written = snprintf (uriPos, bytesRemaining, "%s", root); + bytesRemaining -= written; + uriPos += written; + } + nextSeg++; + break; + case 'S': + if (NULL != source) + { + 
written = snprintf (uriPos, bytesRemaining, "%s", source); + bytesRemaining -= written; + uriPos += written; + } + nextSeg++; + break; + case 's': + if (NULL != topic) + { + written = snprintf (uriPos, bytesRemaining, "%s", topic); + bytesRemaining -= written; + uriPos += written; + } + nextSeg++; + break; + case 'u': + if (NULL != uuid) + { + written = snprintf (uriPos, bytesRemaining, "%s", uuid); + bytesRemaining -= written; + uriPos += written; + } + nextSeg++; + break; + default: /* do nothing -just go on copying in next iteration */ + break; + } + /* This will ensure there are no double forward slashes */ + if (NULL != nextSeg && '/' == *nextSeg && '/' == uri[strlen(uri) - 1]) + { + nextSeg++; + } + } + } + + /* Trim trailing forward slashes */ + if ('/' == uri[strlen(uri) - 1]) + { + uri[strlen(uri) - 1] = '\0'; + } + + /* + * Allocate the memory for copying the string. Caller is responsible for + * destroying. + */ + *uriTarget = strdup (uri); + if (NULL == *uriTarget) + { + return MAMA_STATUS_NOMEM; + } + else + { + return MAMA_STATUS_OK; + } +} diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h new file mode 100644 index 0000000..6cb70a7 --- /dev/null +++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h @@ -0,0 +1,151 @@ +/* $Id$ + * + * OpenMAMA: The open middleware agnostic messaging API + * Copyright (C) 2011 NYSE Technologies, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#ifndef MAMA_BRIDGE_QPID_COMMON_H__ +#define MAMA_BRIDGE_QPID_COMMON_H__ + +/*========================================================================= + = Includes = + =========================================================================*/ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <mama/mama.h> +#include <proton/message.h> +#include <proton/error.h> + +#include "payloadbridge.h" +#include "msgfieldimpl.h" + +#if defined(__cplusplus) +extern "C" { +#endif + +/*========================================================================= + = Macros = + =========================================================================*/ + +#define MAMA_ROOT_MARKET_DATA "_MD" +#define MAMA_ROOT_MARKET_DATA_DICT "_MDDD" + + +/*========================================================================= + = Public implementation functions = + =========================================================================*/ + + +/** + * This function will generate a string which is unique to the root, source + * and topic provided. Centralization of this function means that it can be used + * in both the publisher and the subscriber in order to generate a consistent + * topic for use throughout the platform. It will ensure that if any provided + * parameter is NULL, it will be ignored in the expansion (e.g. providing + * root=NULL, source=EXCHANGENAME.ISIN.CURRENCY and topic=NULL will result in + * a new keyTarget="EXCHANGENAME.ISIN.CURRENCY" as opposed to + * ".EXCHANGENAME.ISIN.CURRENCY."). + * + * @param root Prefix to associate with the subject (e.g. _MDDD). + * @param source Source to base the subject key on (e.g. EXCHANGENAME). + * @param topic Topic to base the subject key on (e.g. ISIN.CURRENCY). 
+ * @param keyTarget Pointer to populate with the generated subject key. The + * caller is then responsible for freeing this new strdup'd + * string. + * + * @return mama_status indicating whether the method succeeded or failed. + */ +mama_status +qpidBridgeCommon_generateSubjectKey (const char* root, + const char* source, + const char* topic, + char** keyTarget); + +/** + * This function will take the provided format string and use the root, source, + * topic and uuid provided to expand the format string into a newly allocated + * string which the caller is responsible for freeing. The format string uses + * the following tokens: + * + * %r : Root (only for market data requests, otherwise blank). e.g. _MD + * %S : MAMA Source / Symbol Namespace. e.g. OPENMAMA + * %s : Symbol / Topic. e.g. MSFT + * %u : uuid. e.g. 4542dc20-f1ae-11e3-ac10-0800200c9a66 + * + * @param format The format string to expand. + * @param root Root to substitute for the %r token (e.g. _MDDD). + * @param source Source to substitute for the %S token (e.g. EXCHANGENAME). + * @param topic Topic to substitute for the %s token (e.g. ISIN.CURRENCY). + * @param uuid UUID to substitute for the %u token. + * @param uriTarget Pointer to populate with the expanded URI. The caller is + * then responsible for freeing this new strdup'd string. + * + * @return mama_status indicating whether the method succeeded or failed. + */ +mama_status +qpidBridgeCommon_generateSubjectUri (const char* format, + const char* root, + const char* source, + const char* topic, + const char* uuid, + const char** uriTarget); + +/** + * This function will parse a standard period delimited string as generated via + * qpidBridgeCommon_generateSubjectKey and split it up into its individual + * components. This allows us to accurately split up these components, even + * when the arguments supplied to qpidBridgeCommon_generateSubjectKey aren't + * quite in the format expected (e.g. 
if qpidBridgeCommon_generateSubjectKey + * is provided with root=NULL, source=NULL and + * topic=_MD.EXCHANGENAME.ISIN.CURRENCY, this will then be collapsed in + * qpidBridgeCommon_generateSubjectKey to create + * subjectKey=_MD.EXCHANGENAME.ISIN.CURRENCY. If that string is then provided + * to this function, it will parse the subjectKey and extract the *real* values + * for root=_MD, source=EXCHANGENAME and topic=ISIN.CURRENCY). + * + * @param key Subject key to parse (e.g. _MD.EXCHANGENAME.ISIN.CURRENCY). + * @param root Pointer to populate with the parsed root (e.g. _MD). Caller + * is then responsible for the memory associated with this + * string. + * @param source Pointer to populate with the parsed source (e.g. + * EXCHANGENAME). Caller is then responsible for the memory + * associated with this string. + * @param topic Pointer to populate with the parsed topic (e.g. + * ISIN.CURRENCY). Caller is then responsible for the memory + * associated with this string. + * @param transport Transport bridge on which this topic is being created. This + * is required to help determine whether or not the key + * contains a known symbol namespace. + * + * @return mama_status indicating whether the method succeeded or failed. 
+ */ +mama_status +qpidBridgeCommon_parseSubjectKey (char* key, + const char** root, + const char** source, + const char** topic, + transportBridge transport); + +#if defined(__cplusplus) +} +#endif + +#endif /* MAMA_BRIDGE_QPID_COMMON_H__ */ diff --git a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h index 6c1adf4..3e3b2e5 100644 --- a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h +++ b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h @@ -56,9 +56,11 @@ extern "C" { /* Maximum topic length */ #define MAX_SUBJECT_LENGTH 256 +#define MAX_URI_LENGTH 1024 /* Default timeout for send working threads */ #define QPID_MESSENGER_SEND_TIMEOUT -1 +#define QPID_MESSENGER_TIMEOUT 1 /* milliseconds */ /* Message types */ typedef enum qpidMsgType_ @@ -70,6 +72,13 @@ typedef enum qpidMsgType_ QPID_MSG_TERMINATE = 0xff } qpidMsgType; +/* Transport types */ +typedef enum qpidTransportType_ +{ + QPID_TRANSPORT_TYPE_P2P, + QPID_TRANSPORT_TYPE_BROKER +} qpidTransportType; + /* If a proton version header has been parsed at this point (version >= 0.5) */ #ifdef _PROTON_VERSION_H @@ -118,9 +127,12 @@ typedef struct qpidSubscription_ mamaSubscription mMamaSubscription; mamaQueue mMamaQueue; void* mQpidQueue; - mamaTransport mTransport; - const char* mSymbol; - char* mSubjectKey; + transportBridge mTransport; + const char* mSource; + const char* mTopic; + const char* mRoot; + char* mSubject; + const char* mUri; void* mClosure; int mIsNotMuted; int mIsValid; @@ -143,11 +155,14 @@ typedef struct qpidTransportBridge_ const char* mOutgoingAddress; const char* mReplyAddress; const char* mName; + const char* mUuid; wthread_t mQpidDispatchThread; int mIsDispatching; mama_status mQpidDispatchStatus; endpointPool_t mSubEndpoints; endpointPool_t mPubEndpoints; + qpidTransportType mQpidTransportType; + wtable_t mKnownSources; } qpidTransportBridge; struct qpidMsgNode_ diff --git a/mama/c_cpp/src/c/bridge/qpid/subscription.c b/mama/c_cpp/src/c/bridge/qpid/subscription.c index 
bdef796..2582a5e 100644 --- a/mama/c_cpp/src/c/bridge/qpid/subscription.c +++ b/mama/c_cpp/src/c/bridge/qpid/subscription.c @@ -33,8 +33,8 @@ #include <wombat/queue.h> #include "qpidbridgefunctions.h" #include "transport.h" +#include "qpidcommon.h" #include "qpiddefs.h" -#include "subscription.h" #include "publisher.h" #include "endpointpool.h" @@ -57,6 +57,8 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber, qpidTransportBridge* transport = NULL; mama_status status = MAMA_STATUS_OK; pn_data_t* data = NULL; + const char* uuid = NULL; + const char* outgoingAddress = NULL; if ( NULL == subscriber || NULL == subscription || NULL == tport ) { @@ -75,6 +77,11 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber, return MAMA_STATUS_NULL_ARG; } + outgoingAddress = qpidBridgeMamaTransportImpl_getOutgoingAddress ( + (transportBridge) transport); + uuid = qpidBridgeMamaTransportImpl_getUuid ( + (transportBridge) transport); + /* Allocate memory for qpid subscription implementation */ impl = (qpidSubscription*) calloc (1, sizeof (qpidSubscription)); if (NULL == impl) @@ -86,58 +93,89 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber, impl->mMamaCallback = callback; impl->mMamaSubscription = subscription; impl->mMamaQueue = queue; - impl->mTransport = (mamaTransport)transport; - impl->mSymbol = symbol; + impl->mTransport = (transportBridge) transport; impl->mClosure = closure; impl->mIsNotMuted = 1; impl->mIsTportDisconnected = 1; - impl->mSubjectKey = NULL; + impl->mSubject = NULL; /* Use a standard centralized method to determine a topic key */ - qpidBridgeMamaSubscriptionImpl_generateSubjectKey (NULL, - source, - symbol, - &impl->mSubjectKey); + qpidBridgeCommon_generateSubjectKey (NULL, + source, + symbol, + &impl->mSubject); + + /* Parse the collapsed string to extract the standardized values */ + qpidBridgeCommon_parseSubjectKey (impl->mSubject, + &impl->mRoot, + &impl->mSource, + &impl->mTopic, + impl->mTransport); + 
+ /* Generate subject URI based on standardized values */ + qpidBridgeCommon_generateSubjectUri (outgoingAddress, + impl->mRoot, + impl->mSource, + impl->mTopic, + uuid, + &impl->mUri); /* Register the endpoint */ endpointPool_registerWithoutIdentifier (transport->mSubEndpoints, - impl->mSubjectKey, + impl->mSubject, &impl->mEndpointIdentifier, impl); - /* Notify the publisher that you have an interest in this topic */ - pn_message_clear (transport->mMsg); + if (QPID_TRANSPORT_TYPE_P2P == + qpidBridgeMamaTransportImpl_getType ( + impl->mTransport)) + { + /* Notify the publisher that you have an interest in this topic */ + pn_message_clear (transport->mMsg); - /* Set the message meta data to reflect a subscription request */ - qpidBridgePublisherImpl_setMessageType (transport->mMsg, - QPID_MSG_SUB_REQUEST); + /* Set the message meta data to reflect a subscription request */ + qpidBridgePublisherImpl_setMessageType (transport->mMsg, + QPID_MSG_SUB_REQUEST); - /* Set the outgoing address as provided by the transport configuration */ - pn_message_set_address (transport->mMsg, - transport->mOutgoingAddress); + /* Set the outgoing address as provided by the transport configuration */ + pn_message_set_address (transport->mMsg, + transport->mOutgoingAddress); - /* Set the reply address */ - pn_message_set_reply_to (transport->mMsg, - transport->mReplyAddress); + /* Set the reply address */ + pn_message_set_reply_to (transport->mMsg, + transport->mReplyAddress); - /* Get the proton message's body data for writing */ - data = pn_message_body (transport->mMsg); + /* Get the proton message's body data for writing */ + data = pn_message_body (transport->mMsg); - /* Add in the subject key as the only string inside */ - pn_data_put_string (data, pn_bytes (strlen (impl->mSubjectKey), - impl->mSubjectKey)); + /* Add in the subject key as the only string inside */ + pn_data_put_string (data, pn_bytes (strlen (impl->mSubject), + impl->mSubject)); - /* Send out the subscription 
registration of interest message */ - if (NULL != transport->mOutgoingAddress) + /* Send out the subscription registration of interest message */ + if (NULL != transport->mOutgoingAddress) + { + pn_messenger_put (transport->mOutgoing, transport->mMsg); + + if (0 != PN_MESSENGER_SEND (transport->mOutgoing)) + { + const char* qpid_error = PN_MESSENGER_ERROR (transport->mOutgoing); + mama_log (MAMA_LOG_LEVEL_SEVERE, + "qpidBridgeMamaSubscription_create(): " + "pn_messenger_send Error:[%s]", qpid_error); + return MAMA_STATUS_PLATFORM; + } + } + } + else { - pn_messenger_put (transport->mOutgoing, transport->mMsg); - - if (0 != PN_MESSENGER_SEND (transport->mOutgoing)) + if (pn_messenger_subscribe (transport->mIncoming, + impl->mUri) <= 0) { - const char* qpid_error = PN_MESSENGER_ERROR (transport->mOutgoing); - mama_log (MAMA_LOG_LEVEL_SEVERE, - "qpidBridgeMamaSubscription_create(): " - "pn_messenger_send Error:[%s]", qpid_error); + mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaSubscription_create(): " + "Error Subscribing to %s : %s", + impl->mUri, + PN_MESSENGER_ERROR(transport->mIncoming)); return MAMA_STATUS_PLATFORM; } } @@ -145,7 +183,7 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber, mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaSubscription_create(): " "created interest for %s.", - impl->mSubjectKey); + impl->mSubject); /* Mark this subscription as valid */ impl->mIsValid = 1; @@ -205,10 +243,10 @@ qpidBridgeMamaSubscription_destroy (subscriptionBridge subscriber) /* Remove the subscription from the transport's subscription pool. 
*/ if (NULL != transportBridge && NULL != transportBridge->mSubEndpoints - && NULL != impl->mSubjectKey) + && NULL != impl->mSubject) { endpointPool_unregister (transportBridge->mSubEndpoints, - impl->mSubjectKey, + impl->mSubject, impl->mEndpointIdentifier); } @@ -217,9 +255,29 @@ qpidBridgeMamaSubscription_destroy (subscriptionBridge subscriber) pn_message_free (impl->mMsg); } - if (NULL != impl->mSubjectKey) + if (NULL != impl->mSubject) { - free (impl->mSubjectKey); + free (impl->mSubject); + } + + if (NULL != impl->mRoot) + { + free ((void*)impl->mRoot); + } + + if (NULL != impl->mSource) + { + free ((void*)impl->mSource); + } + + if (NULL != impl->mTopic) + { + free ((void*)impl->mTopic); + } + + if (NULL != impl->mUri) + { + free ((void*)impl->mUri); } if (NULL != impl->mEndpointIdentifier) @@ -289,78 +347,3 @@ qpidBridgeMamaSubscription_muteCurrentTopic (subscriptionBridge subscriber) return qpidBridgeMamaSubscription_mute (subscriber); } - -/*========================================================================= - = Public implementation functions = - =========================================================================*/ - -/* - * Internal function to ensure that the topic names are always calculated - * in a particular way - */ -mama_status -qpidBridgeMamaSubscriptionImpl_generateSubjectKey (const char* root, - const char* source, - const char* topic, - char** keyTarget) -{ - char subject[MAX_SUBJECT_LENGTH]; - char* subjectPos = subject; - size_t bytesRemaining = MAX_SUBJECT_LENGTH; - size_t written = 0; - - if (NULL != root) - { - mama_log (MAMA_LOG_LEVEL_FINEST, - "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): R."); - written = snprintf (subjectPos, bytesRemaining, "%s", root); - subjectPos += written; - bytesRemaining -= written; - } - - if (NULL != source) - { - mama_log (MAMA_LOG_LEVEL_FINEST, - "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): S."); - /* If these are not the first bytes, prepend with a period */ - if(subjectPos 
!= subject) - { - written = snprintf (subjectPos, bytesRemaining, ".%s", source); - } - else - { - written = snprintf (subjectPos, bytesRemaining, "%s", source); - } - subjectPos += written; - bytesRemaining -= written; - } - - if (NULL != topic) - { - mama_log (MAMA_LOG_LEVEL_FINEST, - "qpidBridgeMamaSubscriptionImpl_generateSubjectKey(): T."); - /* If these are not the first bytes, prepend with a period */ - if (subjectPos != subject) - { - snprintf (subjectPos, bytesRemaining, ".%s", topic); - } - else - { - snprintf (subjectPos, bytesRemaining, "%s", topic); - } - } - - /* - * Allocate the memory for copying the string. Caller is responsible for - * destroying. - */ - *keyTarget = strdup (subject); - if (NULL == *keyTarget) - { - return MAMA_STATUS_NOMEM; - } - else - { - return MAMA_STATUS_OK; - } -} diff --git a/mama/c_cpp/src/c/bridge/qpid/subscription.h b/mama/c_cpp/src/c/bridge/qpid/subscription.h index ea1aceb..992a495 100644 --- a/mama/c_cpp/src/c/bridge/qpid/subscription.h +++ b/mama/c_cpp/src/c/bridge/qpid/subscription.h @@ -26,29 +26,6 @@ extern "C" { #endif -/*========================================================================= - = Public implementation functions = - =========================================================================*/ - -/** - * This function will generate a string which is unique to the root, source - * and topic provided. Centralization of this function means that it can be used - * in both the publisher and the subscriber in order to generate a consistent - * topic for use throughout the platform. - * - * @param root Prefix to associate with the subject (e.g. _MDDD) - * @param inbox Source to base the subject key on (e.g. EXCHANGENAME). - * @param topic Topic to base the subject key on (e.g. ISIN.CURRENCY). - * @param keyTarget Pointer to populate with the generated subject key. - * - * @return mama_status indicating whether the method succeeded or failed. 
- */ -mama_status -qpidBridgeMamaSubscriptionImpl_generateSubjectKey (const char* root, - const char* source, - const char* topic, - char** keyTarget); - #if defined(__cplusplus) } #endif diff --git a/mama/c_cpp/src/c/bridge/qpid/transport.c b/mama/c_cpp/src/c/bridge/qpid/transport.c index ba21810..2b2053b 100644 --- a/mama/c_cpp/src/c/bridge/qpid/transport.c +++ b/mama/c_cpp/src/c/bridge/qpid/transport.c @@ -37,11 +37,13 @@ #include <timers.h> #include <stdio.h> #include <wombat/queue.h> +#include <wombat/wUuid.h> #include "transport.h" #include "publisher.h" #include "qpidbridgefunctions.h" #include "qpiddefs.h" +#include "qpidcommon.h" #include "msg.h" #include "codec.h" #include "endpointpool.h" @@ -58,11 +60,14 @@ #define TPORT_PARAM_SUB_POOL_SIZE "msg_pool_size" #define TPORT_PARAM_SUB_POOL_INC_SIZE "msg_pool_inc_size" #define TPORT_PARAM_RECV_BLOCK_SIZE "recv_block_size" +#define TPORT_PARAM_TPORT_TYPE "type" /* Default values for corresponding configuration parameters */ #define DEFAULT_OUTGOING_URL "amqp://127.0.0.1:7777" #define DEFAULT_INCOMING_URL "amqp://~127.0.0.1:6666" #define DEFAULT_REPLY_URL "amqp://127.0.0.1:6666" +#define DEFAULT_TPORT_TYPE "p2p" +#define DEFAULT_TPORT_TYPE_VALUE QPID_TRANSPORT_TYPE_P2P #define DEFAULT_SUB_POOL_SIZE 128 #define DEFAULT_SUB_POOL_INC_SIZE 128 #define DEFAULT_RECV_BLOCK_SIZE 10 @@ -76,6 +81,10 @@ #define MAX_SUB_POOL_INC_SIZE 2000L #define MIN_RECV_BLOCK_SIZE -1L #define MAX_RECV_BLOCK_SIZE 100L +#define CONFIG_VALUE_TPORT_TYPE_BROKER "broker" +#define CONFIG_VALUE_TPORT_TYPE_P2P "p2p" +#define UUID_STRING_BUF_SIZE 37 +#define KNOWN_SOURCES_WTABLE_SIZE 10 /*========================================================================= @@ -345,6 +354,21 @@ qpidBridgeMamaTransport_destroy (transportBridge transport) endpointPool_destroy (impl->mSubEndpoints); endpointPool_destroy (impl->mPubEndpoints); + /* Free the strdup-ed keys still held by the wtable */ + wtable_free_all_xdata (impl->mKnownSources); + /* Finally, 
destroy the wtable */ + wtable_destroy (impl->mKnownSources); + + if (NULL != impl->mReplyAddress) + { + free ((void*) impl->mReplyAddress); + } + + if (NULL != impl->mUuid) + { + free ((void*) impl->mUuid); + } + free (impl); return status; @@ -358,6 +382,8 @@ qpidBridgeMamaTransport_create (transportBridge* result, qpidTransportBridge* impl = NULL; int poolSize = 0; mama_status status = MAMA_STATUS_OK; + const char* tportType = NULL; + const char* tmpReply = NULL; if (NULL == result || NULL == name || NULL == parent) { @@ -374,6 +400,8 @@ qpidBridgeMamaTransport_create (transportBridge* result, impl->mMsg = pn_message (); impl->mQpidDispatchStatus = MAMA_STATUS_OK; impl->mName = name; + impl->mKnownSources = wtable_create ("mKnownSources", + KNOWN_SOURCES_WTABLE_SIZE); mama_log (MAMA_LOG_LEVEL_FINE, "qpidBridgeMamaTransport_create(): Initializing Transport %s", @@ -390,14 +418,14 @@ qpidBridgeMamaTransport_create (transportBridge* result, /* Set the outgoing address */ impl->mOutgoingAddress = qpidBridgeMamaTransportImpl_getParameter ( - NULL, + DEFAULT_OUTGOING_URL, "%s.%s.%s", TPORT_PARAM_PREFIX, name, TPORT_PARAM_OUTGOING_URL); /* Set the reply address */ - impl->mReplyAddress = + tmpReply = qpidBridgeMamaTransportImpl_getParameter ( DEFAULT_REPLY_URL, "%s.%s.%s", @@ -405,6 +433,44 @@ qpidBridgeMamaTransport_create (transportBridge* result, name, TPORT_PARAM_REPLY_URL); + /* Expand the wildcards in the reply then populates mReplyAddress with a + * newly allocated result */ + qpidBridgeCommon_generateSubjectUri ( + tmpReply, + NULL, + NULL, + NULL, + qpidBridgeMamaTransportImpl_getUuid ((transportBridge) impl), + &impl->mReplyAddress); + + /* Set the transport type */ + tportType = + qpidBridgeMamaTransportImpl_getParameter ( + DEFAULT_TPORT_TYPE, + "%s.%s.%s", + TPORT_PARAM_PREFIX, + name, + TPORT_PARAM_TPORT_TYPE); + + if (0 == strcmp (tportType, CONFIG_VALUE_TPORT_TYPE_BROKER)) + { + impl->mQpidTransportType = QPID_TRANSPORT_TYPE_BROKER; + } + else if (0 == 
strcmp (tportType, CONFIG_VALUE_TPORT_TYPE_P2P))
+    {
+        impl->mQpidTransportType = QPID_TRANSPORT_TYPE_P2P;
+    }
+    else
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "Could not parse %s.%s.%s=%s. Using [%s].",
+                  TPORT_PARAM_PREFIX,
+                  name,
+                  TPORT_PARAM_TPORT_TYPE,
+                  tportType,
+                  DEFAULT_TPORT_TYPE);
+        impl->mQpidTransportType = DEFAULT_TPORT_TYPE_VALUE;
+    }
     /* Set the message pool size for pool initialization later */
     poolSize =
@@ -475,14 +541,18 @@ qpidBridgeMamaTransport_create (transportBridge* result,
         return MAMA_STATUS_PLATFORM;
     }

-    status = endpointPool_create (&impl->mPubEndpoints, "mPubEndpoints");
-    if (MAMA_STATUS_OK != status)
+    /* Endpoints are only required if a broker is not being used */
+    if (QPID_TRANSPORT_TYPE_P2P == impl->mQpidTransportType)
     {
-        mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "qpidBridgeMamaTransport_create(): "
-                  "Failed to create publishing endpoints");
-        free (impl);
-        return MAMA_STATUS_PLATFORM;
+        status = endpointPool_create (&impl->mPubEndpoints, "mPubEndpoints");
+        if (MAMA_STATUS_OK != status)
+        {
+            mama_log (MAMA_LOG_LEVEL_ERROR,
+                      "qpidBridgeMamaTransport_create(): "
+                      "Failed to create publishing endpoints");
+            free (impl);
+            return MAMA_STATUS_PLATFORM;
+        }
     }

     impl->mIsValid = 1;
@@ -693,6 +763,51 @@ qpidBridgeMamaTransport_getNativeTransportNamingCtx (transportBridge transport,
 =                  Public implementation functions                        =
 =========================================================================*/

+/* Call this every time a known MAMA symbol namespace is created */
+void
+qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace (transportBridge transport,
+        const char* symbolNamespace)
+{
+    qpidTransportBridge* impl = (qpidTransportBridge*) transport;
+
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace(): "
+                  "transport NULL");
+        return;
+    }
+
+    wtable_insert (impl->mKnownSources, symbolNamespace, (void*) symbolNamespace);
+
+    return;
+}
+
+/* Check if this is a publisher for a namespace which we know about */
+mama_bool_t
+qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace (transportBridge transport,
+        const char* symbolNamespace)
+{
+    qpidTransportBridge* impl = (qpidTransportBridge*) transport;
+
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace(): "
+                  "transport NULL");
+        return 0;
+    }
+
+    if (NULL == wtable_lookup (impl->mKnownSources, symbolNamespace))
+    {
+        return 0;
+    }
+    else
+    {
+        return 1;
+    }
+}
+
 qpidTransportBridge*
 qpidBridgeMamaTransportImpl_getTransportBridge (mamaTransport transport)
 {
@@ -710,6 +825,74 @@ qpidBridgeMamaTransportImpl_getTransportBridge (mamaTransport transport)
     return impl;
 }

+qpidTransportType
+qpidBridgeMamaTransportImpl_getType (transportBridge transport)
+{
+    qpidTransportBridge* impl = (qpidTransportBridge*) transport;
+
+    if (impl == NULL)
+    {
+        return DEFAULT_TPORT_TYPE_VALUE;
+    }
+
+    return impl->mQpidTransportType;
+}
+
+const char*
+qpidBridgeMamaTransportImpl_getUuid (transportBridge transport)
+{
+    qpidTransportBridge* impl = (qpidTransportBridge*) transport;
+    if (NULL == impl->mUuid)
+    {
+        wUuid tempUuid;
+        char uuidStringBuffer[UUID_STRING_BUF_SIZE];
+
+        wUuid_generate_time (tempUuid);
+        wUuid_unparse (tempUuid, uuidStringBuffer);
+        impl->mUuid = strdup (uuidStringBuffer);
+    }
+    return impl->mUuid;
+}
+
+
+const char*
+qpidBridgeMamaTransportImpl_getOutgoingAddress (transportBridge transport)
+{
+    qpidTransportBridge* impl = (qpidTransportBridge*) transport;
+
+    if (impl == NULL)
+    {
+        return NULL;
+    }
+
+    return impl->mOutgoingAddress;
+}
+
+const char*
+qpidBridgeMamaTransportImpl_getIncomingAddress (transportBridge transport)
+{
+    qpidTransportBridge* impl = (qpidTransportBridge*) transport;
+
+    if (impl == NULL)
+    {
+        return NULL;
+    }
+
+    return impl->mIncomingAddress;
+}
+
+const char*
+qpidBridgeMamaTransportImpl_getReplyAddress (transportBridge transport)
+{
+    qpidTransportBridge* impl = (qpidTransportBridge*) transport;
+
+    if (impl == NULL)
+    {
+        return NULL;
+    }
+
+    return impl->mReplyAddress;
+}

 /*=========================================================================
 =                  Private implementation functions                       =
@@ -750,15 +933,37 @@ qpidBridgeMamaTransportImpl_start (qpidTransportBridge* impl)
             return MAMA_STATUS_PLATFORM;
         }

-        if (pn_messenger_subscribe (impl->mIncoming,
-                                    impl->mIncomingAddress) <= 0)
+        /* Blanket subscriptions are only required in p2p */
+        if (QPID_TRANSPORT_TYPE_P2P == impl->mQpidTransportType)
+        {
+            if (pn_messenger_subscribe (impl->mIncoming,
+                                        impl->mIncomingAddress) <= 0)
+            {
+                mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_start(): "
+                          "Error Subscribing to %s : %s",
+                          impl->mIncomingAddress,
+                          PN_MESSENGER_ERROR(impl->mIncoming));
+                PN_MESSENGER_STOP(impl->mIncoming);
+                return MAMA_STATUS_PLATFORM;
+            }
+        }
+        else if (QPID_TRANSPORT_TYPE_BROKER == impl->mQpidTransportType)
         {
             mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_start(): "
-                      "Error Subscribing to %s : %s",
-                      impl->mIncomingAddress,
+                      "Subscribing to %s : %s",
+                      impl->mReplyAddress,
                       PN_MESSENGER_ERROR(impl->mIncoming));
-            PN_MESSENGER_STOP(impl->mIncoming);
-            return MAMA_STATUS_PLATFORM;
+            /* Subscribe to the URL advertised in reply_to */
+            if (pn_messenger_subscribe (impl->mIncoming,
+                                        impl->mReplyAddress) <= 0)
+            {
+                mama_log (MAMA_LOG_LEVEL_ERROR, "qpidBridgeMamaTransportImpl_start(): "
+                          "Error Subscribing to %s : %s",
+                          impl->mReplyAddress,
+                          PN_MESSENGER_ERROR(impl->mIncoming));
+                PN_MESSENGER_STOP(impl->mIncoming);
+                return MAMA_STATUS_PLATFORM;
+            }
         }
     }

@@ -892,9 +1097,11 @@ qpidBridgeMamaTransportImpl_queueCallback (mamaQueue queue, void* closure)
         return;
     }

+    /* If this is a broker connection, we don't need to be selective at our
+     * layer */
     if (0 == endpointPool_isRegistedByContent (impl->mSubEndpoints,
-                                               subject,
-                                               subscription))
+            subject,
+            subscription))
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
                   "qpidBridgeMamaTransportImpl_queueCallback(): "
@@ -1369,11 +1576,25 @@ void* qpidBridgeMamaTransportImpl_dispatchThread (void* closure)
      */
     while (1 == impl->mIsDispatching)
     {
-        if (pn_messenger_recv (impl->mIncoming, impl->mQpidRecvBlockSize))
+        int pnErr = 0;
+
+        pnErr = pn_messenger_recv (impl->mIncoming,
+                                   impl->mQpidRecvBlockSize);
+
+
+        if (PN_TIMEOUT == pnErr)
+        {
+            continue;
+        }
+
+        if (0 != pnErr)
         {
             mama_log (MAMA_LOG_LEVEL_ERROR,
-                      "qpidBridgeMamaTransportImpl_dispatchThread():",
-                      PN_MESSENGER_ERROR (impl->mIncoming));
+                      "qpidBridgeMamaTransportImpl_dispatchThread(): "
+                      "Recv Error [%d] (%s) '%s'",
+                      pnErr,
+                      pn_code(pnErr),
+                      PN_MESSENGER_ERROR(impl->mIncoming));
             impl->mQpidDispatchStatus = MAMA_STATUS_PLATFORM;
             return NULL;
         }
diff --git a/mama/c_cpp/src/c/bridge/qpid/transport.h b/mama/c_cpp/src/c/bridge/qpid/transport.h
index ebb6877..64504f6 100644
--- a/mama/c_cpp/src/c/bridge/qpid/transport.h
+++ b/mama/c_cpp/src/c/bridge/qpid/transport.h
@@ -42,7 +42,7 @@ extern "C" {
  * This is a simple convenience function to return a qpidTransportBridge
  * pointer based on the provided mamaTransport.
  *
- * @param transport The mamaTransport to extract the bridge transport from.
+ * @param transport The mamaTransport to extract the bridge transport from.
  *
  * @return qpidTransportBridge* associated with the mamaTransport.
  */
@@ -50,6 +50,97 @@ qpidTransportBridge*
 qpidBridgeMamaTransportImpl_getTransportBridge (mamaTransport transport);

 /**
+ * This is a simple convenience function to return the transport's distinct
+ * uuid.
+ *
+ * @param transport The mamaTransport to extract the uuid from.
+ *
+ * @return const char* String representation of the UUID (memory still owned
+ *         by the transport bridge.
+ */
+const char*
+qpidBridgeMamaTransportImpl_getUuid (transportBridge transport);
+
+/**
+ * This is a simple convenience function to return the transport's qpid bridge
+ * type,
+ *
+ * @param transport The transportBridge to extract the uuid from.
+ *
+ * @return qpidTransportType releating to the type of the qpid transport bridge.
+ */
+qpidTransportType
+qpidBridgeMamaTransportImpl_getType (transportBridge transport);
+
+/**
+ * This is a simple convenience function to return the transport's outgoing
+ * address (not format string expanded).
+ *
+ * @param transport The transportBridge to extract the outgoing address from.
+ *
+ * @return const char* String representation of the address (memory still owned
+ *         by the transport bridge.
+ */
+const char*
+qpidBridgeMamaTransportImpl_getOutgoingAddress (transportBridge transport);
+
+/**
+ * This is a simple convenience function to return the transport's incoming
+ * address (not format string expanded).
+ *
+ * @param transport The transportBridge to extract the incoming address from.
+ *
+ * @return const char* String representation of the address (memory still owned
+ *         by the transport bridge.
+ */
+const char*
+qpidBridgeMamaTransportImpl_getIncomingAddress (transportBridge transport);
+
+/**
+ * This is a simple convenience function to return the transport's reply
+ * address (not format string expanded).
+ *
+ * @param transport The transportBridge to extract the reply address from.
+ *
+ * @return const char* String representation of the address (memory still owned
+ *         by the transport bridge.
+ */
+const char*
+qpidBridgeMamaTransportImpl_getReplyAddress (transportBridge transport);
+
+/**
+ * This advises the transport bridge about a known MAMA symbol namespace. This
+ * allows flattened fully qualified topics (e.g. SOURCENAME.TOPIC) to be
+ * detected and re-expanded where necessary (i.e. so we can tell the difference
+ * between a MAMA basic subscription topic and a market data subscription for %S
+ * and %s expansion in the URI).
+ *
+ * @param transport       The transportBridge to add a known MAMA symbol
+ *                        namespace to.
+ * @param symbolNamespace The known MAMA symbol namespace to add.
+ */
+void
+qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace (transportBridge transport,
+        const char* symbolNamespace);
+
+/**
+ * This is a simple convenience function to return a qpidTransportBridge
+ * pointer based on the provided mamaTransport.
+ *
+ * @param transport       The transportBridge to check for the provided MAMA
+ *                        symbol namespace.
+ * @param symbolNamespace The MAMA symbol namespace to check.
+ *
+ * @return mama_bool_t returning true if the provided symbolNamespace is a
+ *         confirmed MAMA symbol namespace as added to the
+ *         transport bridge using
+ *         qpidBridgeMamaTransportImpl_addKnownMamaSymbolNamespace.
+ */
+mama_bool_t
+qpidBridgeMamaTransportImpl_isKnownMamaSymbolNamespace (transportBridge transport,
+        const char* symbolNamespace);
+
+/**
  * This is purely a debug function to dump to screen a snapshot of the status
  * of the transport's message pool.
  *
diff --git a/mama/c_cpp/src/examples/mama.properties b/mama/c_cpp/src/examples/mama.properties
index d43824a..86c7b1c 100644
--- a/mama/c_cpp/src/examples/mama.properties
+++ b/mama/c_cpp/src/examples/mama.properties
@@ -44,6 +44,20 @@ mama.entitlement.porthigh=9550
 # Example MAMA properties for QPID messaging middleware
 ################################################################################

+# For qpid transports, the following format strings are observed
+# %r : Root (only for market data requests, otherwise blank). e.g. _MD
+# %S : MAMA Source / Symbol Namespace. e.g. OPENMAMA
+# %s : Symbol / Topic. e.g. MSFT
+# %u : Transport uuid. e.g. 4542dc20-f1ae-11e3-ac10-0800200c9a66
+# Type of qpid proton transport
+mama.qpid.transport.broker.type=broker
+# This is where we send subscription requests and inbox requests
+mama.qpid.transport.broker.outgoing_url=topic://127.0.0.1/MAMA/%r/%S/%s
+# This is where we listen for incoming messages
+mama.qpid.transport.broker.incoming_url=topic://127.0.0.1/MAMA/%r/%S/%s
+# This is where we listen for replies during request / reply
+mama.qpid.transport.broker.reply_url=topic://127.0.0.1/MAMA/%u
+
 # Source which you are going to consume from
 mama.qpid.transport.pub.outgoing_url=amqp://127.0.0.1:6666
 # Where qpid is going to listen to for data to be pushed to
--
1.9.3
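For anyone following the mama.properties comments in the patch, here is a minimal sketch of how the %r, %S, %s and %u tokens might be expanded into a URL. It is illustrative only: exampleExpandQpidUrl is a hypothetical helper invented for this note, not a function from the patch, and the bridge's real expansion logic may well differ (for instance in how a blank %r is treated).

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (NOT part of the patch): expands the %r, %S, %s and %u
 * tokens described in mama.properties into `out`. NULL values expand to an
 * empty string and unrecognised characters are copied through unchanged.
 * Returns 0 on success, or -1 on bad arguments or if the result (including
 * the terminating NUL) does not fit in `outLen` bytes.
 */
int
exampleExpandQpidUrl (const char* format,
                      const char* root,
                      const char* source,
                      const char* topic,
                      const char* uuid,
                      char*       out,
                      size_t      outLen)
{
    size_t      used = 0;
    const char* p    = NULL;

    if (NULL == format || NULL == out || 0 == outLen)
    {
        return -1;
    }

    for (p = format; '\0' != *p; p++)
    {
        const char* value      = NULL;
        char        literal[2] = { '\0', '\0' };
        int         recognized = 0;
        size_t      len        = 0;

        if ('%' == *p)
        {
            recognized = 1;
            switch (p[1])
            {
            case 'r': value = root;   break;
            case 'S': value = source; break;
            case 's': value = topic;  break;
            case 'u': value = uuid;   break;
            default:  recognized = 0; break;
            }
        }

        if (recognized)
        {
            if (NULL == value)
            {
                value = "";
            }
            p++; /* Skip the token letter that followed '%' */
        }
        else
        {
            literal[0] = *p;
            value      = literal;
        }

        len = strlen (value);
        if (used + len >= outLen)
        {
            return -1; /* No room left for the value plus the NUL */
        }
        memcpy (out + used, value, len);
        used += len;
    }

    out[used] = '\0';
    return 0;
}
```

With the example values from the properties file, "topic://127.0.0.1/MAMA/%r/%S/%s" would expand to "topic://127.0.0.1/MAMA/_MD/OPENMAMA/MSFT" for a market data request.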
|
||||
|
||||
Re: MAMA Qpid Proton... with broker support
Frank Quinn <fquinn.ni@...>
Hi Folks, As Phil reminded me today, I actually forgot to submit this, so I'll be submitting it to the list shortly for transparency, then resurrecting the feature-qpid-branch branch to contain these changes and any other bug fixes that will be required to get a qpid 1.1 bridge out. Cheers, Frank
On Sat, Jun 21, 2014 at 10:34 PM, Frank Quinn <fquinn.ni@...> wrote:
|
||||
|
||||
Re: "C" Unit-Test Support on Visual Studio
Guy <guy.tal@...>
Hi,
My last patch had an error in the strndup/strdup changes. Attached is the fixed patch against the 'master' branch. Guy Tick42
On 16/09/2014 16:55, Guy wrote:
Hi Damian,
|
||||
|
||||
Re: "C" Unit-Test Support on Visual Studio
Guy <guy.tal@...>
Hi Damian, I've prepared a revised version of the last experimental patch to support the MAMA C unit tests in Visual Studio. In this version I removed both the implementation and the use of strndup; other than that it should be almost the same as the last version. It should work against the master branch. I'm now working on a patch that should apply correctly to the 'next' branch (including git format etc.). This mail is a follow-up to bug #125 in Bugzilla, mainly so that other people can experiment with it too. Regards, Guy Tick42
On 11/09/2014 16:06, Guy wrote:
Hello,
|
||||
|
||||
Re: [PATCH 2.3.1] MamaPublisher: Overloaded MamaPublisher create method
Chad Meyer
Hi Glenn,
I have applied the changes you recommended and created a new patch. Please see attached.
diff --git a/mama/jni/src/c/mamapublisherjni.c b/mama/jni/src/c/mamapublisherjni.c
index 518bacf..aa1c4c6 100644
--- a/mama/jni/src/c/mamapublisherjni.c
+++ b/mama/jni/src/c/mamapublisherjni.c
@@ -76,40 +76,47 @@ static void MAMACALLTYPE sendCompleteCB (mamaPublisher publisher,
 /*
  * Class:     com_wombat_mama_MamaPublisher
  * Method:    _create
- * Signature: (Lcom/wombat/mama/Transport;Ljava/lang/String;)V
+ * Signature: (Lcom/wombat/mama/MamaTransport;Ljava/lang/String;Ljava/lang/String;)V
  */
 JNIEXPORT void JNICALL Java_com_wombat_mama_MamaPublisher__1create
-  (JNIEnv* env, jobject this, jobject transport, jstring topic)
+  (JNIEnv* env, jobject this, jobject transport, jstring topic, jstring source)
 {
-    mamaPublisher cPublisher = NULL;
+    mamaPublisher cPublisher = NULL;
     mamaTransport cTransport = NULL;
-    const char* cTopic = NULL;
+    const char* cSource = NULL;
+    const char* cTopic = NULL;
     jlong transportPointer = 0;
     mama_status status = MAMA_STATUS_OK;
     char errorString[UTILS_MAX_ERROR_STRING_LENGTH];
-
     /*Get the transport pointer*/
     assert(transport!=NULL);
     transportPointer = (*env)->GetLongField(env, transport,
                                             transportPointerFieldId_g);
+
     cTransport = CAST_JLONG_TO_POINTER(mamaTransport, transportPointer);
-    assert(transportPointer!=0);

+    assert(transportPointer!=0);
     /*Get the char* from the jstring*/
     if(NULL!=topic)
     {
         cTopic = (*env)->GetStringUTFChars(env,topic,0);
         if(!cTopic)return;
     }
-
+    if(NULL!=source)
+    {
+        cSource = (*env)->GetStringUTFChars(env,source,0);
+        if(!cSource)return;
+    }
+
     if(MAMA_STATUS_OK!=(mamaPublisher_create(
                     &cPublisher,
                     cTransport,
                     cTopic,
-                    NULL,
+                    cSource,
                     NULL)))
     {
-        if(cTopic)(*env)->ReleaseStringUTFChars(env,topic, cTopic);
+        if(cTopic)(*env)->ReleaseStringUTFChars(env,topic, cTopic);
+        if(cSource)(*env)->ReleaseStringUTFChars(env,source, cSource);
         utils_buildErrorStringForStatus(
                 errorString, UTILS_MAX_ERROR_STRING_LENGTH,
                 "Failed to create publisher.", status);
@@ -121,6 +128,7 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaPublisher__1create
                          CAST_POINTER_TO_JLONG(cPublisher));

     if(cTopic)(*env)->ReleaseStringUTFChars(env,topic, cTopic);
+    if(cSource)(*env)->ReleaseStringUTFChars(env,source, cSource);

     return;
 }
diff --git a/mama/jni/src/com/wombat/mama/MamaPublisher.java b/mama/jni/src/com/wombat/mama/MamaPublisher.java
index 0146945..31bb1f5 100644
--- a/mama/jni/src/com/wombat/mama/MamaPublisher.java
+++ b/mama/jni/src/com/wombat/mama/MamaPublisher.java
@@ -34,10 +34,15 @@ public class MamaPublisher

     /*A long value containing a pointer to the underlying C publisher structure*/
     private long publisherPointer_i = 0;
+
+    public void create (MamaTransport transport, String topic)
+    {
+        _create(transport,topic,null);
+    }

-    public void create (MamaTransport transport, String topic)
+    public void create (MamaTransport transport, String topic, String source)
     {
-        _create(transport,topic);
+        _create(transport,topic,source);
     }

     public void send (MamaMsg msg)
@@ -76,7 +81,7 @@ public class MamaPublisher
         _sendFromInbox(inbox,msg);
     }

-    private native void _create (MamaTransport transport, String topic);
+    private native void _create (MamaTransport transport, String topic, String source);

     private native void _send (MamaMsg msg);

--
1.8.4.msysgit.0
Regards,
Chad Meyer | Corporate & Investment Bank | PIM Market Data Services | J.P. Morgan | 4 Metrotech Center 23rd Floor Brooklyn, NY 11201 | Tel: (718) 242-5165 | M: (215) 801-2606 | chad.j.meyer@...
From: Glenn McClements [mailto:gmcclements@...]
Sent: Monday, September 08, 2014 7:14 AM
To: Meyer, Chad J; openmama-dev@...
Subject: Re: [Openmama-dev] [PATCH 2.3.1] MamaPublisher: Overloaded MamaPublisher create method
Thanks Chad.
For consistency though with the C++ interface, it should be (publisher, topic, source), not (publisher, source, topic).
Also consider replacing/extending the existing JNI _create() function rather than having two. This reduces code duplication and you already check for a NULL source being passed down so it should handle both Java methods.
Glenn
From: Meyer, Chad J <chad.j.meyer@...>
Hi,
Attached is a patch for OpenMAMA JNI that overloads the MamaPublisher create method. For consistency purposes, the new method requires two String parameters, source and symbol, as opposed to the single string currently found in the MamaPublisher class.
diff --git a/mama/jni/src/c/mamapublisherjni.c b/mama/jni/src/c/mamapublisherjni.c
index 518bacf..629a51b 100644
--- a/mama/jni/src/c/mamapublisherjni.c
+++ b/mama/jni/src/c/mamapublisherjni.c
@@ -78,7 +78,7 @@ static void MAMACALLTYPE sendCompleteCB (mamaPublisher publisher,
  * Method:    _create
  * Signature: (Lcom/wombat/mama/Transport;Ljava/lang/String;)V
  */
-JNIEXPORT void JNICALL Java_com_wombat_mama_MamaPublisher__1create
+JNIEXPORT void JNICALL Java_com_wombat_mama_MamaPublisher__1create__Lcom_wombat_mama_MamaTransport_2Ljava_lang_String_2
   (JNIEnv* env, jobject this, jobject transport, jstring topic)
 {
     mamaPublisher cPublisher = NULL;
@@ -125,6 +125,66 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaPublisher__1create
     return;
 }

+ /*
+ * Class:     com_wombat_mama_MamaPublisher
+ * Method:    _create
+ * Signature: (Lcom/wombat/mama/MamaTransport;Ljava/lang/String;Ljava/lang/String;)V
+ */
+JNIEXPORT void JNICALL Java_com_wombat_mama_MamaPublisher__1create__Lcom_wombat_mama_MamaTransport_2Ljava_lang_String_2Ljava_lang_String_2
+  (JNIEnv* env, jobject this, jobject transport, jstring source, jstring symbol)
+{
+    mamaPublisher cPublisher = NULL;
+    mamaTransport cTransport = NULL;
+    const char* cSource = NULL;
+    const char* cSymbol = NULL;
+    jlong transportPointer = 0;
+    mama_status status = MAMA_STATUS_OK;
+    char errorString[UTILS_MAX_ERROR_STRING_LENGTH];
+
+    /*Get the transport pointer*/
+    assert(transport!=NULL);
+    transportPointer = (*env)->GetLongField(env, transport,
+                                            transportPointerFieldId_g);
+    cTransport = CAST_JLONG_TO_POINTER(mamaTransport, transportPointer);
+    assert(transportPointer!=0);
+
+    /*Get the char* from the jstring*/
+    if(NULL!=source)
+    {
+        cSource = (*env)->GetStringUTFChars(env,source,0);
+        if(!cSource)return;
+    }
+    if(NULL!=symbol)
+    {
+        cSymbol = (*env)->GetStringUTFChars(env,symbol,0);
+        if(!cSymbol)return;
+    }
+
+    if(MAMA_STATUS_OK!=(mamaPublisher_create(
+                    &cPublisher,
+                    cTransport,
+                    cSymbol,
+                    cSource,
+                    NULL)))
+    {
+        if(cSource)(*env)->ReleaseStringUTFChars(env,source, cSource);
+        if(cSymbol)(*env)->ReleaseStringUTFChars(env,symbol, cSymbol);
+        utils_buildErrorStringForStatus(
+                errorString, UTILS_MAX_ERROR_STRING_LENGTH,
+                "Failed to create publisher.", status);
+        utils_throwMamaException(env,errorString);
+        return;
+    }
+
+    (*env)->SetLongField(env,this,publisherPointerFieldId_g,
+                         CAST_POINTER_TO_JLONG(cPublisher));
+
+    if(cSource)(*env)->ReleaseStringUTFChars(env,source, cSource);
+    if(cSymbol)(*env)->ReleaseStringUTFChars(env,symbol, cSymbol);
+
+    return;
+}
+
 /*
  * Class:     com_wombat_mama_MamaPublisher
  * Method:    _send
diff --git a/mama/jni/src/com/wombat/mama/MamaPublisher.java b/mama/jni/src/com/wombat/mama/MamaPublisher.java
index 0146945..c1e8dc8 100644
--- a/mama/jni/src/com/wombat/mama/MamaPublisher.java
+++ b/mama/jni/src/com/wombat/mama/MamaPublisher.java
@@ -39,6 +39,11 @@ public class MamaPublisher
     {
         _create(transport,topic);
     }
+
+    public void create (MamaTransport transport, String source, String symbol)
+    {
+        _create(transport,source,symbol);
+    }

     public void send (MamaMsg msg)
     {
@@ -77,6 +82,8 @@ public class MamaPublisher
     }

     private native void _create (MamaTransport transport, String topic);
+
+    private native void _create (MamaTransport transport, String source, String symbol);

     private native void _send (MamaMsg msg);

--
1.8.4.msysgit.0
Chad Meyer| Corporate & Investment Bank | PIM Market Data Services | J.P. Morgan | 4 Metrotech Center 23rd Floor Brooklyn, NY 11201 | Tel: (718) 242-5165 | M: (215) 801-2606 | chad.j.meyer@...
|
||||
|
||||
"C" Unit-Test Support on Visual Studio
Guy <guy.tal@...>
Hello,
The payload unit tests do not run on VS 2012. I have fixed this and produced a large patch covering all "C" unit tests. I prepared the patch against 'master' (rev. 2.3.1, not the 'next' branch), with the unit tests ported to Visual Studio 2005 format. I tested that the unit tests build on VS2005 against Google Test 1.6.0, and then that they build in VS2012. If anyone has other versions of Visual Studio and wants to check the patch with VS 2010, 2008 and 2005, that would be great. The patch consists of changes to the source and a few additional projects, as described below:
- ret = aBridge->msgPayloadAddU8 (testPayload, NULL, ++fid, std::numeric_limits<mama_u8_t>::max ());
Guy Tick42
|
||||
|
||||
Re: [PATCH 2.3.1 1/1] Common: variable expansion in property value on the last line of properties file fails
Guy <guy.tal@...>
Hi Damian, I have a follow up on the bug as you asked. Regards, Guy Tick42
On 10/09/2014 11:55, Damian Maguire wrote:
|
||||
|
||||
Re: Enforcing field type on publish
Glenn McClements <gmcclements@...>
Hi Alpert,
I like #2 more, but I still want to keep data-model, market-data and custom logic away from the core messaging part of OpenMAMA.
What I’m thinking of is a hook/callback to a programmer-defined function that would take the publisher, transport, message, a user-defined closure, etc., and return a status indicating whether the message should be published. The dictionary etc. could be obtained at initialisation time through a separate init function.
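A minimal sketch of the hook idea described above, in C. Every name here (demo_publisher, demo_msg, pre_publish_hook, hook_status) is a hypothetical stand-in invented for illustration and is not part of the OpenMAMA API; the transport argument is left out to keep the sketch short.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in types: hypothetical placeholders, not real OpenMAMA handles. */
typedef struct { int fieldCount; } demo_msg;
typedef struct demo_publisher demo_publisher;

typedef enum { HOOK_ALLOW = 0, HOOK_REJECT = 1 } hook_status;

/* Hypothetical signature of the programmer-defined hook. */
typedef hook_status (*pre_publish_hook)(demo_publisher *pub,
                                        const demo_msg *msg,
                                        void           *closure);

struct demo_publisher {
    pre_publish_hook hook;     /* NULL means "no checking" */
    void            *closure;  /* handed back to the hook untouched */
    int              sent;     /* messages actually passed on for sending */
};

/* Returns 0 if the message was sent, -1 if the hook vetoed it. */
int demo_publisher_send(demo_publisher *pub, const demo_msg *msg)
{
    assert(pub != NULL && msg != NULL);
    if (pub->hook != NULL &&
        pub->hook(pub, msg, pub->closure) != HOOK_ALLOW) {
        return -1;
    }
    pub->sent++;  /* stand-in for the real transport send */
    return 0;
}

/* Example hook: reject messages with more fields than the closure allows. */
hook_status max_fields_hook(demo_publisher *pub, const demo_msg *msg,
                            void *closure)
{
    const int *limit = closure;
    (void)pub;
    return msg->fieldCount <= *limit ? HOOK_ALLOW : HOOK_REJECT;
}
```

In this sketch, a dictionary-based type check would simply be another hook, with the dictionary passed in through the closure.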
This dovetails with some of the other pieces of work going on:
I still need to work through this a bit more, but let me know what you think.
Glenn
From: Alpert, Reed <reed.alpert@...>
Date: Wednesday, 10 September 2014 19:13
To: Benjamin Taieb <benjamin.taieb@...>, Frank Quinn <fquinn@...>, "fquinn.ni@..." <fquinn.ni@...>, Glenn McClements <gmcclements@...>
Cc: "Openmama-dev@..." <Openmama-dev@...>
Subject: RE: [Openmama-dev] Enforcing field type on publish
Hi,
Yes I agree with Ben’s comments. Here are some caveats:
1. This is a feature requested by our market data infrastructure team to prevent app dev teams from accidentally publishing incorrect field types. We don’t believe that a voluntary program will work, which is why the FieldCache is not the preferred location. Apps won’t use the FieldCache if they don’t need it, or if they are using an Excel plugin for publishing (quite common).
2. This is a configured feature, and only for publication.
3. This is not for subscription; that is, a subscriber can use any of the getXX() methods that work.
There seems to be general agreement that the OM layer is a much better place for this feature than the bridges themselves.
The two best locations seem to be:
1. In mamaMsg, in the addXX and updateXX methods, probably with a macro like CHECK_MODIFY() that calls out to another module to do the work.
   a. This is good since it keeps the message construction in the mamaMsg module.
   b. This is good since it gives the programmer the quickest feedback on an error condition.
   c. This is bad since it puts some performance hit on construction of every message, even on subscribe.
   d. This is bad since it makes the code active for creating subscribe messages too.
   e. This is bad since it puts more code in more places for this feature.
2. In publisher.send(), which checks a config var and then calls out to another module to do the work.
   a. This is good since it has minimal code changes, and minimal performance effect.
   b. This is good since it only affects publish messages.
   c. This is bad since it gives delayed failure, and there may be multiple fields that fail.
      i. Probably logging each failed field and returning an error code will work well to help developers fix problems.
I think the publisher.send() method is the best place, given the above.
But figuring out which dictionary to use for the checking seems to be the most difficult part. Adding this feature as a config option may require also adding some optional config info about how to get a dictionary for a specific bridge, e.g., the source name.
Once the dictionary loading/mapping is solved, the rest seems fairly straightforward, except for determining the allowed/denied conversions as a configurable policy. Starting with 3 simple policies seems best:
1. No checking (default).
2. Type must match dictionary.
3. Type must match dictionary, with a small number of allowed conversions that do not incur data loss (e.g., F32 -> F64).
Thanks,
Reed.
|
||||
|
||||
Re: Enforcing field type on publish
Alpert, Reed <reed.alpert@...>
Hi,
Yes I agree with Ben’s comments. Here are some caveats:
1. This is a feature requested by our market data infrastructure team to prevent app dev teams from accidentally publishing incorrect field types. We don’t believe that a voluntary program will work, which is why the FieldCache is not the preferred location. Apps won’t use the FieldCache if they don’t need it, or if they are using an Excel plugin for publishing (quite common).
2. This is a configured feature, and only for publication.
3. This is not for subscription; that is, a subscriber can use any of the getXX() methods that work.
There seems to be general agreement that the OM layer is a much better place for this feature than the bridges themselves.
The two best locations seem to be:
1. In mamaMsg, in the addXX and updateXX methods, probably with a macro like CHECK_MODIFY() that calls out to another module to do the work.
   a. This is good since it keeps the message construction in the mamaMsg module.
   b. This is good since it gives the programmer the quickest feedback on an error condition.
   c. This is bad since it puts some performance hit on construction of every message, even on subscribe.
   d. This is bad since it makes the code active for creating subscribe messages too.
   e. This is bad since it puts more code in more places for this feature.
2. In publisher.send(), which checks a config var and then calls out to another module to do the work.
   a. This is good since it has minimal code changes, and minimal performance effect.
   b. This is good since it only affects publish messages.
   c. This is bad since it gives delayed failure, and there may be multiple fields that fail.
      i. Probably logging each failed field and returning an error code will work well to help developers fix problems.
I think the publisher.send() method is the best place, given the above.
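A rough sketch of the send-time check favoured above: every field is checked, each failure is logged, and a failure count comes back to the caller. The field and field_type definitions, and the choice to let through fields that are missing from the dictionary, are assumptions made for illustration and are not OpenMAMA types or behaviour.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical subset of field types, for illustration only. */
typedef enum { T_STRING, T_F64, T_PRICE } field_type;

/* Used both for message fields and for dictionary entries. */
typedef struct { int fid; field_type type; } field;

/* Linear lookup of a fid in the dictionary; NULL if absent. */
static const field *dict_find(const field *dict, size_t dictLen, int fid)
{
    for (size_t i = 0; i < dictLen; i++) {
        if (dict[i].fid == fid) {
            return &dict[i];
        }
    }
    return NULL;
}

/* Checks every field, logs each mismatch (if log is non-NULL) and
 * returns the number of failed fields; 0 means the message may be sent. */
int validate_before_send(const field *msg, size_t msgLen,
                         const field *dict, size_t dictLen, FILE *log)
{
    int failures = 0;
    assert(msg != NULL && dict != NULL);
    for (size_t i = 0; i < msgLen; i++) {
        const field *entry = dict_find(dict, dictLen, msg[i].fid);
        if (entry == NULL) {
            continue;  /* assumption: unknown fids are not checked */
        }
        if (entry->type != msg[i].type) {
            if (log != NULL) {
                fprintf(log, "fid %d: type %d does not match dictionary type %d\n",
                        msg[i].fid, (int)msg[i].type, (int)entry->type);
            }
            failures++;
        }
    }
    return failures;
}
```

Continuing past the first failure is what lets a developer see all bad fields in one run, at the cost of scanning the whole message.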
But figuring out which dictionary to use for the checking seems to be the most difficult part. Adding this feature as a config option may require also adding some optional config info about how to get a dictionary for a specific bridge, e.g., the source name.
Once the dictionary loading/mapping is solved, the rest seems fairly straightforward, except for determining the allowed/denied conversions as a configurable policy. Starting with 3 simple policies seems best:
1. No checking (default).
2. Type must match dictionary.
3. Type must match dictionary, with a small number of allowed conversions that do not incur data loss (e.g., F32 -> F64).
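The three policies could be expressed roughly as follows. The enum names and the publish_type_allowed function are hypothetical; F32 -> F64 comes from the list above, while I32 -> I64 is added here as a second widening example and is an assumption about what a real policy would permit.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical subset of field types, for illustration only. */
typedef enum { TYPE_I32, TYPE_I64, TYPE_F32, TYPE_F64, TYPE_STRING,
               TYPE_COUNT } ftype;

typedef enum {
    POLICY_NONE,      /* 1. no checking (default) */
    POLICY_EXACT,     /* 2. type must match dictionary */
    POLICY_LOSSLESS   /* 3. match, or a widening that loses no data */
} check_policy;

/* Widening conversions assumed to be safe in this sketch. */
static bool is_lossless(ftype from, ftype to)
{
    return (from == TYPE_F32 && to == TYPE_F64) ||
           (from == TYPE_I32 && to == TYPE_I64);
}

/* True if a field published as `published` may be sent when the
 * dictionary declares the field as `dictionary`. */
bool publish_type_allowed(check_policy policy, ftype published,
                          ftype dictionary)
{
    assert(published < TYPE_COUNT && dictionary < TYPE_COUNT);
    switch (policy) {
    case POLICY_NONE:
        return true;
    case POLICY_EXACT:
        return published == dictionary;
    case POLICY_LOSSLESS:
        return published == dictionary ||
               is_lossless(published, dictionary);
    }
    return false;  /* unknown policy: refuse */
}
```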
Thanks,
Reed.
Reed Alpert | Corporate & Investment Bank | Market Data Services | J.P. Morgan | 4 Metrotech Center, 23rd Floor, Brooklyn, NY 11245 | T: 718.242.5198 | M: 917.414.4613 | reed.alpert@...
Alternate Contact: CIB PIM Trading Technology Solutions NA | CIB_PIM_Trading_Technology_Solutions_NA@...
From: Benjamin Taieb [mailto:benjamin.taieb@...]
Sent: Wednesday, September 10, 2014 6:06 AM
To: Frank Quinn; Alpert, Reed; fquinn.ni@...; Glenn McClements
Cc: Openmama-dev@...
Subject: RE: [Openmama-dev] Enforcing field type on publish
Hi Frank, You're right, I should specify further what we have in mind:
- Of course the dictionary is not mandatory, and I don't think anyone wants the "dictionary check" to be the default behaviour; it has to be optional to maintain compatibility with the many other use cases. After all, MAMA is a middleware API, not a market data API, and should be kept that way. So property-based behaviour here. In some respects, your point also highlights the fact that there is no way to prevent application developers from doing the "bad thing", as a property can always be changed. What we are after is to prevent application developers from doing the "bad thing" by accident.
- The unit tests have been run and pass on our payload (at least the last time I checked), but at that time there were no composite tests for scalar types, for example. The Qpid payload uses a macro for all scalars, and in particular I believe: pn_data_put##TYPE (impl->mBody, value); will result in a "brutal" cast without any further check, i.e. truncation, interpreting a double as I8 and all sorts of nice things. I don't think that is desirable from a design point of view, but I would like to hear whether people think there is a performance argument for it. Performance apart, I also think that defining behaviour through a reference implementation is less desirable than having a specification. Just having a matrix somewhere (Google spreadsheet or OpenMAMA wiki?) would be very useful.
- That is why the proposed change is on the publisher, not on the subscriber: in this case there is a strong asymmetry between publisher and subscriber, i.e. publishers are pretty low volume, although there are a number of them, and subscribers will have higher traffic to process. I understand this is not the case in, for example, a low-latency environment. That's why it is optional and not the default.
I agree that flexibility in the cast belongs to the payload; however, I don't think the unit tests should verify only the mandatory minimum, but rather enforce the required interoperability, as otherwise payloads could become incompatible. In that case the whole benefit of using OpenMAMA collapses, in my opinion.
Cheers, Ben.
From: Frank Quinn [mailto:fquinn@...]
Hi Ben,
There are a few things to consider here:
1. The MAMA Dictionary is optional too. If you look at basic mamapublisherc / mamasubscriberc, you’ll see it doesn’t use the dictionary, but it does use mamaMsg.
2. Unit tests actually do exist for the various mandatory castings required. If you run the payload unit tests today, for example, you’ll find that getting a date time from a string as well as a date time field should be supported, as well as other payload level casting expectations… In fact if you haven’t run these tests already, you really should, especially if you want to use MAMDA. If you look at the Qpid bridge, you’ll see a bunch of code put in to make this casting more forgiving.
3. While iterating over a message, in the interest of performance, you probably don’t want to hit the data dictionary on every field being processed, particularly if you’re trying to do high performance field caching.
Basically, I’m of the opinion that if you want to make a payload flexible with respect to the field types it supports, that should be done at a payload layer and we have unit tests to verify the mandatory minimum.
If you want to enforce MAMA Dictionary compliance, this should be done in the MAMA application, where you can use field descriptors to verify content type at send and receive sides. E.g.
    switch (mamaFieldDescriptor_getType (fieldDescriptor))
    {
    case MAMA_FIELD_TYPE_I32:
    case MAMA_FIELD_TYPE_I64:
        /* allow the add / access */
        break;
    default:
        /* handle condition */
        break;
    }
As well as everything else, whether the I64 to I32 cast (yes - the other way around) is legal depends on the data content being cast and sent across. Depending on the content, even a U8 / U16 could be fine. Without knowledge and consideration of this within the application, data could be truncated undesirably, or transfers that would have been safe could fail needlessly because of strictness checking.
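To illustrate the point about content-dependent casts, here is a small sketch of a checked I64 to I32 narrowing in plain C. The function name is hypothetical and nothing here is OpenMAMA API; it only shows that the decision depends on the value, not on the declared types.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Narrows a 64-bit value to 32 bits only when nothing is lost.
 * Returns false and leaves *out untouched when the value is out of range. */
bool narrow_i64_to_i32(int64_t value, int32_t *out)
{
    assert(out != NULL);
    if (value < INT32_MIN || value > INT32_MAX) {
        return false;
    }
    *out = (int32_t)value;
    return true;
}
```

A value such as 42 held in an I64 narrows safely, whereas INT32_MAX + 1 is refused instead of being silently truncated.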
Cheers, Frank
From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Benjamin Taieb
Just to put emphasis on what Reed explained: it should probably be implemented in the C layer, making sure that it "catches" all languages "for free". The goal is to enforce it in the OpenMAMA layer, so that OpenMAMA applications are forced to adhere to it.
I believe use of MamaFieldCache is optional, which would rule it out. And referring to Glenn's comments about mamaMsg, the goal is that the payload does not trust the calling application.
There are multiple challenges, I believe, in implementing that:
- How does the MAMA layer get the dictionary? This could easily be solved by using a dictionary requestor wherever we decide to implement it.
- How is the mapping between a particular msg and namespace going to work? At the moment, a dictionary is attached to a namespace, and a namespace could be shared across multiple sources, but a message is source agnostic; only the publisher, when it sends the msg, will send on a particular source/symbol. On the other hand, we would want to fail as soon as possible in the process, probably in the mamaMsg_addXX and mamaMsg_setXX calls, so that the calling application knows for which fields it is doing something wrong.
- We need to clarify the "authorised cast": doing addI32 on an I64 field should probably be authorised. At the moment, the various payloads implement various levels of flexibility; it would be good to define unit tests specifying precisely what it should be, probably modelled on what is defined in wombatmsg, for backward-compatibility purposes.
I'll take any input from the community, particularly on point 2, which is the hardest in my opinion. Cheers, Ben.
From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Alpert, Reed
Yes, this is C++, but we also have Java and DotNet clients (some C# and some Excel). We do publish best practices, but our app dev groups sometimes do not follow them, and we want to prevent the SolCache from getting out of sync with the dictionary. We also provide them with sample apps that have this type of control, but sometimes … :)
Thanks,
Reed.
From: Frank Quinn [mailto:fquinn.ni@...]
Judging by your capitalization, I'm guessing this is C++? If so, have you considered mandating that all function calls use MamaFieldDescriptors pulled from a standardized dictionary rather than using string / fid pairs?
On Thu, Sep 4, 2014 at 9:33 PM, Glenn McClements <gmcclements@...> wrote: There are a number of places where this *could* be done, but not all make sense to me:
- In the payload library - this doesn't feel right as I feel the payload should be 'dumb' and just trust the caller. Also it would need to be done for each supported payload.
- In the payload bridge - same points as above
- In MamaMsg - this would work across all supported payloads, but again I think the payload should trust the caller as to what fields should be what type and act as a simple wrapper
- In a field cache (e.g. the C & C++ MamaFieldCache) - this makes the most sense to me as the logic for maintaining state about fields and if they have changed is already there, so extra rules and checks would belong here
With the current MamaFieldCache you could do this already if you primed the fields by creating them with the correct type on startup. This would depend on having a well-known set of fields, though, because you don't want to be adding everything in the dictionary. You could also try this technique with the MamaMsg object, but it will be payload specific, because some payloads will throw an error if you try to update a field with a new type, and others may replace it.
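The priming technique can be pictured with a toy cache like the one below. This is not the MamaFieldCache API: primed_cache, cache_prime and cache_check_update are hypothetical names, and the fixed capacity is an arbitrary choice made for the sketch.

```c
#include <assert.h>
#include <stddef.h>

#define CACHE_CAPACITY 16  /* arbitrary size for the sketch */

/* Hypothetical subset of field types, for illustration only. */
typedef enum { CT_STRING, CT_PRICE, CT_I64 } cache_type;

typedef struct { int fid; cache_type type; } cache_slot;
typedef struct { cache_slot slots[CACHE_CAPACITY]; size_t count; } primed_cache;

/* Startup: register a well-known field with its correct type.
 * Returns 0 on success, -1 if the cache is full. */
int cache_prime(primed_cache *cache, int fid, cache_type type)
{
    assert(cache != NULL);
    if (cache->count == CACHE_CAPACITY) {
        return -1;
    }
    cache->slots[cache->count].fid = fid;
    cache->slots[cache->count].type = type;
    cache->count++;
    return 0;
}

/* Update time: 0 if the type matches the primed type, -1 on a type
 * mismatch, -2 if the field was never primed. */
int cache_check_update(const primed_cache *cache, int fid, cache_type type)
{
    assert(cache != NULL);
    for (size_t i = 0; i < cache->count; i++) {
        if (cache->slots[i].fid == fid) {
            return cache->slots[i].type == type ? 0 : -1;
        }
    }
    return -2;
}
```

With fid 300 primed as a price, a later string update for fid 300 is reported as a mismatch, which is the situation described in the MARK_PRICE example in this thread.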
Glenn
From: Alpert, Reed [reed.alpert@...]
Hi,
Yes, the addString() is from MamaMsg.
The cache is our own market data infrastructure, not the MamaFieldCache. In this case it is either Solace/SolCache or TREP RTIC.
We want business app dev groups to be able to look at the dictionary, see a field that is MamaPrice, and be guaranteed that MamaMsg.getPrice() will always work (although MamaPrice.getIsValidPrice() may return false). A certain number of the type mismatches come from Excel, where a failed formula may populate a cell as #NA, and that gets published into a price/float field.
Thanks,
Reed.
From: Glenn McClements [mailto:gmcclements@...]
Hi Guy, When you refer to 'addString' below you mention a cache, but this is not a method of the C/C++ MamaFieldCache. Are you referring to your own cache object, or actually to a MamaMsg object?
Regards, Glenn
From: openmama-dev-bounces@... [openmama-dev-bounces@...] on behalf of Alpert, Reed [reed.alpert@...]
Hi,
This is regarding a way to check that the field data type being published matches (closely) the dictionary data type.
The motivation for this is to protect our internally published data caches from invalid data. For example, if a field (name=MARK_PRICE fid=300) is a MamaPrice in the dictionary, and an app uses this: addString("MARK_PRICE", 300, "invalid price"); then we have a field that is now a string in the cache, and that causes problems for subscribing apps that use getPrice().
Currently we are looking to the bridges that we use (Tick42/TREP & Solace) to enforce these restrictions.
Adding this to the OpenMAMA layer is tricky, but allows a single policy to be enforced across multiple bridges. The main question, from my point of view, is how to set the policy. Policies can range from none (default), to most strict (the published field's type must match the dictionary), to common conversions supported (e.g., F32 -> F64), to a custom grid setting exactly which conversions are allowed.
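The "custom grid" option could be as simple as a lookup table indexed by published type and dictionary type. The sketch below is illustrative only: the type list is a hypothetical subset, and the particular cells set to true (the diagonal plus I32 -> I64, I32 -> F64 and F32 -> F64) are an assumed example policy, not a proposal for the real defaults.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical subset of field types, for illustration only. */
typedef enum { G_I32, G_I64, G_F32, G_F64, G_STRING, G_COUNT } gtype;

/* Row = type the application published, column = dictionary type. */
static const bool conversion_grid[G_COUNT][G_COUNT] = {
    /* published      I32    I64    F32    F64    STRING */
    /* I32    */   { true,  true,  false, true,  false },
    /* I64    */   { false, true,  false, false, false },
    /* F32    */   { false, false, true,  true,  false },
    /* F64    */   { false, false, false, true,  false },
    /* STRING */   { false, false, false, false, true  },
};

/* True if the grid permits publishing `published` into a field the
 * dictionary declares as `dictionary`. */
bool grid_allows(gtype published, gtype dictionary)
{
    assert(published < G_COUNT && dictionary < G_COUNT);
    return conversion_grid[published][dictionary];
}
```

A table like this can be loaded from configuration, so the "none" and "most strict" settings become the all-true grid and the diagonal-only grid respectively.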
I welcome any input from others who have run into this same issue, or found other ways to solve it.
Thanks,
Reed.
|
||||
|
||||
Re: Enforcing field type on publish
Benjamin Taieb
Hi Frank, You're right, I should specify further what we have in mind:
- Of course the dictionary is not mandatory, and I don't think anyone wants the "dictionary check" to be the default behaviour; it has to be optional to maintain compatibility with the many other use cases. After all, MAMA is a middleware API, not a market data API, and should be kept that way. So property-based behaviour here. In some respects, your point also highlights the fact that there is no way to prevent application developers from doing the "bad thing", as a property can always be changed. What we are after is to prevent application developers from doing the "bad thing" by accident.
- The unit tests have been run and pass on our payload (at least the last time I checked), but at that time there were no composite tests for scalar types, for example. The Qpid payload uses a macro for all scalars, and in particular I believe: pn_data_put##TYPE (impl->mBody, value); will result in a "brutal" cast without any further check, i.e. truncation, interpreting a double as I8 and all sorts of nice things. I don't think that is desirable from a design point of view, but I would like to hear whether people think there is a performance argument for it. Performance apart, I also think that defining behaviour through a reference implementation is less desirable than having a specification. Just having a matrix somewhere (Google spreadsheet or OpenMAMA wiki?) would be very useful.
- That is why the proposed change is on the publisher, not on the subscriber: in this case there is a strong asymmetry between publisher and subscriber, i.e. publishers are pretty low volume, although there are a number of them, and subscribers will have higher traffic to process. I understand this is not the case in, for example, a low-latency environment. That's why it is optional and not the default.
I agree that flexibility in the cast belongs to the payload; however, I don't think the unit tests should verify only the mandatory minimum, but rather enforce the required interoperability, as otherwise payloads could become incompatible. In that case the whole benefit of using OpenMAMA collapses, in my opinion.
Cheers, Ben.
From: Frank Quinn [mailto:fquinn@...]
Sent: 10 September 2014 09:31
To: Benjamin Taieb; Alpert, Reed; fquinn.ni@...; Glenn McClements
Cc: Openmama-dev@...
Subject: RE: [Openmama-dev] Enforcing field type on publish
Hi Ben,
There are a few things to consider here:
1. The MAMA Dictionary is optional too. If you look at basic mamapublisherc / mamasubscriberc, you’ll see it doesn’t use the dictionary, but it does use mamaMsg.
2. Unit tests actually do exist for the various mandatory castings required. If you run the payload unit tests today, for example, you’ll find that getting a date time from a string as well as a date time field should be supported, as well as other payload level casting expectations… In fact if you haven’t run these tests already, you really should, especially if you want to use MAMDA. If you look at the Qpid bridge, you’ll see a bunch of code put in to make this casting more forgiving.
3. While iterating over a message, in the interest of performance, you probably don’t want to hit the data dictionary on every field being processed, particularly if you’re trying to do high performance field caching.
Basically, I’m of the opinion that if you want to make a payload flexible with respect to the field types it supports, that should be done at a payload layer and we have unit tests to verify the mandatory minimum.
If you want to enforce MAMA Dictionary compliance, this should be done in the MAMA application, where you can use field descriptors to verify content type at send and receive sides. E.g.
    switch (mamaFieldDescriptor_getType (fieldDescriptor))
    {
    case MAMA_FIELD_TYPE_I32:
    case MAMA_FIELD_TYPE_I64:
        /* allow the add / access */
        break;
    default:
        /* handle condition */
        break;
    }
As well as everything else, whether the I64 to I32 cast (yes - the other way around) is legal depends on the data content being cast and sent across. Depending on the content, even a U8 / U16 could be fine. Without knowledge and consideration of this within the application, data could be truncated undesirably, or transfers that would have been safe could fail needlessly because of strictness checking.
Cheers, Frank
From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Benjamin Taieb
Just to put emphasis on what Reed explained: it should probably be implemented in the C layer, making sure that it "catches" all languages "for free". The goal is to enforce it in the OpenMAMA layer, so that OpenMAMA applications are forced to adhere to it.
I believe use of MamaFieldCache is optional, which would rule it out. And referring to Glenn's comments about mamaMsg, the goal is that the payload does not trust the calling application.
There are multiple challenges, I believe, in implementing that:
- How does the MAMA layer get the dictionary? This could easily be solved by using a dictionary requestor wherever we decide to implement it.
- How is the mapping between a particular msg and namespace going to work? At the moment, a dictionary is attached to a namespace, and a namespace could be shared across multiple sources, but a message is source agnostic; only the publisher, when it sends the msg, will send on a particular source/symbol. On the other hand, we would want to fail as soon as possible in the process, probably in the mamaMsg_addXX and mamaMsg_setXX calls, so that the calling application knows for which fields it is doing something wrong.
- We need to clarify the "authorised cast": doing addI32 on an I64 field should probably be authorised. At the moment, the various payloads implement various levels of flexibility; it would be good to define unit tests specifying precisely what it should be, probably modelled on what is defined in wombatmsg, for backward-compatibility purposes.
I'll take any input from the community, particularly on point 2, which is the hardest in my opinion. Cheers, Ben.
From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Alpert, Reed
Yes, this is C++, but we also have Java and DotNet clients (some C# and some Excel). We do publish best practices, but our app dev groups sometimes do not follow them, and we want to prevent the SolCache from getting out of sync with the dictionary. We also provide them with sample apps that have this type of control, but sometimes … :)
Thanks,
Reed.
Reed Alpert | Corporate & Investment Bank | Market Data Services | J.P. Morgan | 4 Metrotech Center, 23rd Floor, Brooklyn, NY 11245 | T: 718.242.5198 | M: 917.414.4613 | reed.alpert@...
Alternate Contact: CIB PIM Trading Technology Solutions NA | CIB_PIM_Trading_Technology_Solutions_NA@...
From: Frank Quinn [mailto:fquinn.ni@...]
Judging by your capitalization, I'm guessing this is C++? If so, have you considered mandating that all function calls use MamaFieldDescriptors pulled from a standardized dictionary rather than using string / fid pairs?
On Thu, Sep 4, 2014 at 9:33 PM, Glenn McClements <gmcclements@...> wrote:
There are a number of places where this *could* be done, but not all make sense to me:
- In the payload library - this doesn't feel right as I feel the payload should be 'dumb' and just trust the caller. Also it would need to be done for each supported payload.
- In the payload bridge - same points as above
- In MamaMsg - this would work across all supported payloads, but again I think the payload should trust the caller as to what fields should be what type and act as a simple wrapper
- In a field cache (e.g. the C & C++ MamaFieldCache) - this makes the most sense to me as the logic for maintaining state about fields and if they have changed is already there, so extra rules and checks would belong here
With the current MamaFieldCache you could do this already if you primed the fields by creating them with the correct type on startup. This would depend on having a well-known set of fields, though, because you don't want to be adding everything in the dictionary. You could also try this technique with the MamaMsg object, but it will be payload specific, because some payloads will throw an error if you try to update a field with a new type, and others may replace it.
Glenn
From: Alpert, Reed [reed.alpert@...]
Hi,
Yes, the addString() is from MamaMsg.
The cache is our own market data infrastructure, not the MamaFieldCache. In this case it is either Solace/SolCache or TREP RTIC.
We want business app dev groups to be able to look at the dictionary, see a field that is MamaPrice, and be guaranteed that MamaMsg.getPrice() will always work (although MamaPrice.getIsValidPrice() may return false). A certain number of the type mismatches come from Excel, where a failed formula may populate a cell as #NA, and that gets published into a price/float field.
Thanks,
Reed.
From: Glenn McClements [mailto:gmcclements@...]
Hi Guy, When you refer to 'addString' below you mention a cache, but this is not a method of the C/C++ MamaFieldCache. Are you referring to your own cache object or actually a MamaMsg object?
Regards, Glenn
From: openmama-dev-bounces@... [openmama-dev-bounces@...] on behalf of Alpert, Reed [reed.alpert@...]
Hi,
This is regarding a way to check that the field data type being published matches (closely) the dictionary data type.
The motivation for this is to protect our internally published data caches from invalid data. For example, if a field (name=MARK_PRICE fid=300) is a MamaPrice in the dictionary, and an app uses this: addString("MARK_PRICE", 300, "invalid price"); then we have a field that is now a string in the cache, and that causes problems with subscribing apps that use getPrice().
Currently we are looking to the bridges that we use (Tick42/TREP & Solace) to enforce these restrictions.
Adding this to the OpenMAMA layer is tricky, but allows a single policy to be enforced across multiple bridges. The main question from my point of view is how to set the policy. Policies can range from none (default), to most strict (published field's type must match dictionary), to common conversions supported (e.g., F32 -> F64), to a custom grid setting exactly what conversions are allowed.
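The range of policies described above could be modelled along these lines. This is only a sketch in plain C under stated assumptions: the ft_type and type_policy enums, the conversion_grid struct and publish_allowed() are hypothetical names, not part of OpenMAMA, and the set of "common" conversions is limited to two widening cases for illustration.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical field types; FT_COUNT is the number of real entries. */
typedef enum { FT_F32, FT_F64, FT_I32, FT_I64, FT_STRING, FT_PRICE, FT_COUNT } ft_type;

/* The four policy levels: none, strict, common conversions, custom grid. */
typedef enum { POLICY_NONE, POLICY_STRICT, POLICY_COMMON, POLICY_CUSTOM } type_policy;

/* Custom grid, indexed as allowed[published type][dictionary type]. */
typedef struct {
    bool allowed[FT_COUNT][FT_COUNT];
} conversion_grid;

/* Widening conversions assumed to be "common" for this sketch. */
static bool common_conversion(ft_type published, ft_type dictionary)
{
    return (published == FT_F32 && dictionary == FT_F64)
        || (published == FT_I32 && dictionary == FT_I64);
}

/* Decide whether a field of type `published` may be written to a
 * field whose dictionary type is `dictionary`. `custom` is only
 * consulted for POLICY_CUSTOM and may be NULL otherwise. */
static bool publish_allowed(type_policy policy, const conversion_grid *custom,
                            ft_type published, ft_type dictionary)
{
    switch (policy) {
    case POLICY_NONE:
        return true;                       /* default: no checking */
    case POLICY_STRICT:
        return published == dictionary;    /* exact match only */
    case POLICY_COMMON:
        return published == dictionary
            || common_conversion(published, dictionary);
    case POLICY_CUSTOM:
        return custom != NULL && custom->allowed[published][dictionary];
    }
    return false;
}
```

With a custom grid the caller states every permitted pair explicitly, including identical types, so an empty grid rejects everything. Under POLICY_STRICT the addString() example above (string published into a price field) would be rejected, while POLICY_NONE keeps today's behaviour.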
I welcome any input from others who have run into this same issue, or found other ways to solve it.
Thanks,
Reed.
This communication is for informational purposes only. It is not intended as an offer or solicitation for the purchase or sale of any financial instrument or as an official confirmation of any transaction. All market prices, data and other information are not warranted as to completeness or accuracy and are subject to change without notice. Any comments or statements made herein do not necessarily reflect those of JPMorgan Chase & Co., its subsidiaries and affiliates. This transmission may contain information that is proprietary, privileged, confidential and/or exempt from disclosure under applicable law. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution, or use of the information contained herein (including any reliance thereon) is STRICTLY PROHIBITED. Although this transmission and any attachments are believed to be free of any virus or other defect that might affect any computer system into which it is received and opened, it is the responsibility of the recipient to ensure that it is virus free and no responsibility is accepted by JPMorgan Chase & Co., its subsidiaries and affiliates, as applicable, for any loss or damage arising in any way from its use. If you received this transmission in error, please immediately contact the sender and destroy the material in its entirety, whether in electronic or hard copy format. Thank you. Please refer to http://www.jpmorgan.com/pages/disclosures for disclosures relating to European legal entities.
This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of Intercontinental Exchange, Inc. (ICE), Euronext or any of their subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.
|
||||
|