[PATCH 3/4] QPID: Fixed some memory leaks and removed old backwards compatibility
Frank Quinn <fquinn.ni@...>
This change means that the bridge is only expected to work with version 0.6 and above of proton (ideally 0.7). It also fixed several memory leaks and simplifies many areas of code which were required to maintain backwards compatability - particularly with respect to shutdown code. Signed-off-by: Frank Quinn <fquinn.ni@...> --- mama/c_cpp/src/c/bridge/qpid/SConscript | 6 ++--- mama/c_cpp/src/c/bridge/qpid/publisher.c | 3 ++- mama/c_cpp/src/c/bridge/qpid/qpidcommon.c | 4 +-- mama/c_cpp/src/c/bridge/qpid/qpidcommon.h | 4 +-- mama/c_cpp/src/c/bridge/qpid/qpiddefs.h | 35 ++---------------------- mama/c_cpp/src/c/bridge/qpid/subscription.c | 7 ++--- mama/c_cpp/src/c/bridge/qpid/transport.c | 41 +++++------------------------ 7 files changed, 21 insertions(+), 79 deletions(-) diff --git a/mama/c_cpp/src/c/bridge/qpid/SConscript b/mama/c_cpp/src/c/bridge/qpid/SConscript index 566867c..6ec5396 100644 --- a/mama/c_cpp/src/c/bridge/qpid/SConscript +++ b/mama/c_cpp/src/c/bridge/qpid/SConscript @@ -36,9 +36,9 @@ if not env.GetOption('clean'): if not conf.CheckCHeader('proton/parser.h'): print '+- could not find parser.h in ${QPID_HOME}/include/proton' Exit(1) - if conf.CheckCHeader('proton/version.h'): - print '+- proton release appears to include ${QPID_HOME}/include/proton/version.h' - env.Append(CCFLAGS=['-DHAVE_QPID_PROTON_VERSION_H']) + if not conf.CheckCHeader('proton/version.h'): + print '+- proton release does not have ${QPID_HOME}/include/proton/version.h - too old!' + Exit(1) env = conf.Finish() diff --git a/mama/c_cpp/src/c/bridge/qpid/publisher.c b/mama/c_cpp/src/c/bridge/qpid/publisher.c index 98a9a1a..5f1c206 100644 --- a/mama/c_cpp/src/c/bridge/qpid/publisher.c +++ b/mama/c_cpp/src/c/bridge/qpid/publisher.c @@ -340,7 +340,8 @@ qpidBridgeMamaPublisher_send (publisherBridge publisher, mamaMsg msg) } /* Note the messages don't actually get published until here */ - if (PN_MESSENGER_SEND(impl->mTransport->mOutgoing)) + if (pn_messenger_send(impl->mTransport->mOutgoing, + QPID_MESSENGER_SEND_TIMEOUT)) { qpidError = PN_MESSENGER_ERROR (impl->mTransport->mOutgoing); mama_log (MAMA_LOG_LEVEL_SEVERE, "qpidBridgeMamaPublisher_send(): " diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c index f55f63a..9709bad 100644 --- a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c +++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.c @@ -36,7 +36,7 @@ =========================================================================*/ mama_status -qpidBridgeCommon_parseSubjectKey (char* key, +qpidBridgeCommon_parseSubjectKey (const char* key, const char** root, const char** source, const char** topic, @@ -137,7 +137,7 @@ mama_status qpidBridgeCommon_generateSubjectKey (const char* root, const char* source, const char* topic, - char** keyTarget) + const char** keyTarget) { char subject[MAX_SUBJECT_LENGTH]; char* subjectPos = subject; diff --git a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h index 6cb70a7..a18c6f2 100644 --- a/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h +++ b/mama/c_cpp/src/c/bridge/qpid/qpidcommon.h @@ -77,7 +77,7 @@ mama_status qpidBridgeCommon_generateSubjectKey (const char* root, const char* source, const char* topic, - char** keyTarget); + const char** keyTarget); /** * This function will take the provided format string and use the root, source, @@ -138,7 +138,7 @@ qpidBridgeCommon_generateSubjectUri (const char* format, * @return mama_status indicating whether the method succeeded or failed. */ mama_status -qpidBridgeCommon_parseSubjectKey (char* key, +qpidBridgeCommon_parseSubjectKey (const char* key, const char** root, const char** source, const char** topic, diff --git a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h index 3e3b2e5..2219908 100644 --- a/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h +++ b/mama/c_cpp/src/c/bridge/qpid/qpiddefs.h @@ -32,6 +32,7 @@ #include <list.h> /* Qpid include files */ +#include <proton/version.h> #include <proton/driver.h> #include <proton/message.h> #include <proton/util.h> @@ -39,12 +40,6 @@ #include "endpointpool.h" -/* If this version of proton has provided a version header file (build system - * to provide this macro) */ -#ifdef HAVE_QPID_PROTON_VERSION_H -#include <proton/version.h> -#endif - #if defined(__cplusplus) extern "C" { #endif @@ -79,34 +74,8 @@ typedef enum qpidTransportType_ QPID_TRANSPORT_TYPE_BROKER } qpidTransportType; -/* If a proton version header has been parsed at this point (version >= 0.5) */ -#ifdef _PROTON_VERSION_H - -#if (PN_VERSION_MAJOR > 0 || PN_VERSION_MINOR > 4) -#define PN_MESSENGER_SEND(messenger) \ - pn_messenger_send(messenger, QPID_MESSENGER_SEND_TIMEOUT) #define PN_MESSENGER_ERROR(messenger) \ pn_error_text(pn_messenger_error(messenger)) -#define PN_MESSENGER_STOP(messenger) \ - qpidBridgeMamaTransportImpl_stopProtonMessenger (messenger) -#define PN_MESSENGER_FREE(messenger) \ - qpidBridgeMamaTransportImpl_freeProtonMessenger (messenger) -#endif - -/* Place other version specific macros here */ - -#else - -/* Earliest supported version is 0.4 - header was added in 0.5 */ -#define PN_VERSION_MAJOR 0 -#define PN_VERSION_MINOR 4 - -#define PN_MESSENGER_SEND(messenger) pn_messenger_send(messenger) -#define PN_MESSENGER_ERROR(messenger) pn_messenger_error(messenger) -#define PN_MESSENGER_STOP(messenger) /* disabled as it deadlocks in this ver */ -#define PN_MESSENGER_FREE(messenger) /* disabled as it deadlocks in this ver */ - -#endif /* _PROTON_VERSION_H */ /* Keys for application property map */ #define QPID_KEY_MSGTYPE "MAMAT" @@ -131,7 +100,7 @@ typedef struct qpidSubscription_ const char* mSource; const char* mTopic; const char* mRoot; - char* mSubject; + const char* mSubject; const char* mUri; void* mClosure; int mIsNotMuted; diff --git a/mama/c_cpp/src/c/bridge/qpid/subscription.c b/mama/c_cpp/src/c/bridge/qpid/subscription.c index 2582a5e..d6d2dae 100644 --- a/mama/c_cpp/src/c/bridge/qpid/subscription.c +++ b/mama/c_cpp/src/c/bridge/qpid/subscription.c @@ -150,14 +150,15 @@ qpidBridgeMamaSubscription_create (subscriptionBridge* subscriber, /* Add in the subject key as the only string inside */ pn_data_put_string (data, pn_bytes (strlen (impl->mSubject), - impl->mSubject)); + (char*) impl->mSubject)); /* Send out the subscription registration of interest message */ if (NULL != transport->mOutgoingAddress) { pn_messenger_put (transport->mOutgoing, transport->mMsg); - if (0 != PN_MESSENGER_SEND (transport->mOutgoing)) + if (0 != pn_messenger_send (transport->mOutgoing, + QPID_MESSENGER_SEND_TIMEOUT)) { const char* qpid_error = PN_MESSENGER_ERROR (transport->mOutgoing); mama_log (MAMA_LOG_LEVEL_SEVERE, @@ -257,7 +258,7 @@ qpidBridgeMamaSubscription_destroy (subscriptionBridge subscriber) if (NULL != impl->mSubject) { - free (impl->mSubject); + free ((void*)impl->mSubject); } if (NULL != impl->mRoot) diff --git a/mama/c_cpp/src/c/bridge/qpid/transport.c b/mama/c_cpp/src/c/bridge/qpid/transport.c index 2b2053b..ab9755c 100644 --- a/mama/c_cpp/src/c/bridge/qpid/transport.c +++ b/mama/c_cpp/src/c/bridge/qpid/transport.c @@ -260,9 +260,6 @@ qpidBridgeMamaTransportImpl_releasePoolMsg (qpidMsgPool* pool, static void* qpidBridgeMamaTransportImpl_dispatchThread (void* closure); -/* These functions depend on methods only introduced in qpid proton 0.5 */ -#if (PN_VERSION_MAJOR > 0 || PN_VERSION_MINOR > 4) - /** * This function is a wrapper for pn_messenger_stop as it caused deadlock * before qpid proton 0.5 and pn_messenger_interrupt as it was not introduced @@ -282,8 +279,6 @@ qpidBridgeMamaTransportImpl_stopProtonMessenger (pn_messenger_t* messenger); static void qpidBridgeMamaTransportImpl_freeProtonMessenger (pn_messenger_t* messenger); -#endif - /*========================================================================= = Public interface implementation functions = =========================================================================*/ @@ -348,8 +343,8 @@ qpidBridgeMamaTransport_destroy (transportBridge transport) pn_message_free(impl->mMsg); /* Macro wrapped as these caused deadlock prior to v0.5 of qpid proton */ - PN_MESSENGER_FREE(impl->mIncoming); - PN_MESSENGER_FREE(impl->mOutgoing); + qpidBridgeMamaTransportImpl_freeProtonMessenger (impl->mIncoming); + qpidBridgeMamaTransportImpl_freeProtonMessenger (impl->mOutgoing); endpointPool_destroy (impl->mSubEndpoints); endpointPool_destroy (impl->mPubEndpoints); @@ -943,7 +938,7 @@ qpidBridgeMamaTransportImpl_start (qpidTransportBridge* impl) "Error Subscribing to %s : %s", impl->mIncomingAddress, PN_MESSENGER_ERROR(impl->mIncoming)); - PN_MESSENGER_STOP(impl->mIncoming); + (impl->mIncoming); return MAMA_STATUS_PLATFORM; } } @@ -961,7 +956,7 @@ qpidBridgeMamaTransportImpl_start (qpidTransportBridge* impl) "Error Subscribing to %s : %s", impl->mReplyAddress, PN_MESSENGER_ERROR(impl->mIncoming)); - PN_MESSENGER_STOP(impl->mIncoming); + qpidBridgeMamaTransportImpl_stopProtonMessenger (impl->mIncoming); return MAMA_STATUS_PLATFORM; } } @@ -994,21 +989,6 @@ mama_status qpidBridgeMamaTransportImpl_stop (qpidTransportBridge* impl) */ mama_status status = MAMA_STATUS_OK; - /* - * Ask the messenger nicely to stop by sending a special subject (best we - * can do prior to qpid 0.5 when recv was not interruptable). Known to - * deadlock pn_messenger_recv if the recv block = 1. - */ - pn_message_t* msg = pn_message (); - - /* Set the byte to indicate this is a termination message */ - qpidBridgePublisherImpl_setMessageType (msg, QPID_MSG_TERMINATE); - - /* Create the messenger for publishing out this desist message */ - pn_message_set_address (msg, impl->mReplyAddress); - pn_messenger_put (impl->mOutgoing, msg); - PN_MESSENGER_SEND (impl->mOutgoing); - /* Set the transportBridge mIsDispatching to false */ impl->mIsDispatching = 0; @@ -1020,17 +1000,11 @@ mama_status qpidBridgeMamaTransportImpl_stop (qpidTransportBridge* impl) mama_log (MAMA_LOG_LEVEL_FINE, "qpidBridgeMamaTransportImpl_stop(): " "Stopping the outgoing messenger."); - PN_MESSENGER_STOP(impl->mOutgoing); + qpidBridgeMamaTransportImpl_stopProtonMessenger (impl->mOutgoing); mama_log (MAMA_LOG_LEVEL_FINE, "qpidBridgeMamaTransportImpl_stop(): " "Stopping the incoming messenger."); - PN_MESSENGER_STOP(impl->mIncoming); - - /* Destroy the temporarily created proton message */ - if (NULL != msg) - { - pn_message_free (msg); - } + qpidBridgeMamaTransportImpl_stopProtonMessenger (impl->mIncoming); mama_log (MAMA_LOG_LEVEL_FINEST, "qpidBridgeMamaTransportImpl_stop(): " "Rejoined with status: %s.", @@ -1806,7 +1780,6 @@ void* qpidBridgeMamaTransportImpl_dispatchThread (void* closure) } /* These functions depend on methods only introduced in qpid proton 0.5 */ -#if (PN_VERSION_MAJOR > 0 || PN_VERSION_MINOR > 4) void qpidBridgeMamaTransportImpl_stopProtonMessenger (pn_messenger_t* messenger) { mama_log (MAMA_LOG_LEVEL_FINE, @@ -1815,7 +1788,6 @@ void qpidBridgeMamaTransportImpl_stopProtonMessenger (pn_messenger_t* messenger) messenger, pn_messenger_name(messenger)); - pn_messenger_interrupt (messenger); pn_messenger_stop (messenger); } @@ -1823,4 +1795,3 @@ void qpidBridgeMamaTransportImpl_freeProtonMessenger (pn_messenger_t* messenger) { pn_messenger_free (messenger); } -#endif -- 1.9.3 |
|