Date   

[PATCH] Added MAMA_STATUS_SUBSCRIPTION_GAP

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Middleware bridges pass this status to the mamaSubscription onError callback
when the middleware detects a gap:

mamaSubscription_getUserCallbacks(sub)->onError(...)

Middlewares that do not detect dropped messages will not use this status.

Note that this callback is independent of the sequence number gap detection
implemented by dqpublisher and dqstrategy.

Signed-off-by: John Gray <jgray@...>
---
mama/c_cpp/src/c/mama/status.h | 4 +++-
mama/c_cpp/src/c/status.c | 3 ++-
2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/status.h b/mama/c_cpp/src/c/mama/status.h
index bccb7e0..fa0c5f5 100644
--- a/mama/c_cpp/src/c/mama/status.h
+++ b/mama/c_cpp/src/c/mama/status.h
@@ -99,7 +99,9 @@ typedef enum
/* Queue has open objects. */
MAMA_STATUS_QUEUE_OPEN_OBJECTS = 5002,
/* The function isn't supported for this type of subscription. */
- MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE = 5003
+ MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE = 5003,
+ /* The underlying transport saw a gap. */
+ MAMA_STATUS_SUBSCRIPTION_GAP = 5004

#ifdef WITH_ENTITLEMENTS
/* Out of memory */
diff --git a/mama/c_cpp/src/c/status.c b/mama/c_cpp/src/c/status.c
index 1c394e3..7e7ff91 100644
--- a/mama/c_cpp/src/c/status.c
+++ b/mama/c_cpp/src/c/status.c
@@ -61,7 +61,8 @@ mamaStatus_stringForStatus (mama_status status)
case MAMA_STATUS_SUBSCRIPTION_INVALID_STATE: return "MAMA_STATUS_SUBSCRIPTION_INVALID_STATE";
case MAMA_STATUS_QUEUE_OPEN_OBJECTS: return "MAMA_STATUS_QUEUE_OPEN_OBJECTS";
case MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE: return "MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE";
-
+ case MAMA_STATUS_SUBSCRIPTION_GAP: return "MAMA_STATUS_SUBSCRIPTION_GAP";
+
#ifdef WITH_ENTITLEMENTS
case MAMA_ENTITLE_STATUS_NOMEM : return "ENTITLE_STATUS_NOMEM";
case MAMA_ENTITLE_STATUS_BAD_PARAM : return "ENTITLE_STATUS_BAD_PARAM";
--
1.7.5.4


[PATCH] Correctly log when the queue gets dereferenced too many times

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

The existing logic contained a race condition which would not consistently log
the attempt to dereference a queue with a zero reference count.

The interlocked variable also requires initialization. The methods are added as
no-ops in common/wombat/wInterlocked.h for Linux, but they will have
implementations for Windows.

Signed-off-by: John Gray <jgray@...>
---
common/c_cpp/src/c/wombat/wInterlocked.h | 23 +++++++++++++++++++++++
mama/c_cpp/src/c/queue.c | 6 +++++-
mama/c_cpp/src/c/subscription.c | 3 +++
3 files changed, 31 insertions(+), 1 deletions(-)

diff --git a/common/c_cpp/src/c/wombat/wInterlocked.h b/common/c_cpp/src/c/wombat/wInterlocked.h
index b75924e..c128804 100644
--- a/common/c_cpp/src/c/wombat/wInterlocked.h
+++ b/common/c_cpp/src/c/wombat/wInterlocked.h
@@ -69,6 +69,29 @@ adec32 (uint32_t* ptr)
typedef uint32_t wInterlockedInt;

/**
+ * This function will initialise a wInterlockedInt.
+ *
+ * @param[in] value Pointer to the item to be initialized.
+ * @return 0 on success.
+ */
+
+WCOMMONINLINE int wInterlocked_initialize(wInterlockedInt *value)
+{
+ return 0;
+}
+
+/**
+ * This function will destroy a wInterlockedInt.
+ *
+ * @param[in] value Pointer to the item to be destroyed.
+ * @return 0 on success.
+ */
+WCOMMONINLINE int wInterlocked_destroy(wInterlockedInt *value)
+{
+ return 0;
+}
+
+/**
* This function will atomically decrement a 32-bit integer value.
*
* @param[in] value Pointer to the value to be decremented.
diff --git a/mama/c_cpp/src/c/queue.c b/mama/c_cpp/src/c/queue.c
index 72e9892..7190819 100644
--- a/mama/c_cpp/src/c/queue.c
+++ b/mama/c_cpp/src/c/queue.c
@@ -198,6 +198,7 @@ mamaQueue_create (mamaQueue* queue,
impl->mQueueMonitorClosure = NULL;

/* Create the counter lock. */
+ wInterlocked_initialize(&impl->mNumberOpenObjects);
wInterlocked_set(0, &impl->mNumberOpenObjects);


@@ -436,7 +437,7 @@ mamaQueue_decrementObjectCount(mamaQueueLockHandle *handle,
int newCount = wInterlocked_decrement(&impl->mNumberOpenObjects);

/* Write a log if something has gone wrong. */
- if(impl->mNumberOpenObjects < 0)
+ if(newCount < 0)
{
mama_log(MAMA_LOG_LEVEL_ERROR, "Queue 0x%p has been dereferenced too many times.", queue);
}
@@ -719,6 +720,9 @@ mamaQueue_destroy (mamaQueue queue)
impl->mMamaQueueBridgeImpl = NULL;
impl->mMsg = NULL;

+ /* Destroy the counter lock */
+ wInterlocked_destroy(&impl->mNumberOpenObjects);
+
free (impl);

mama_log (MAMA_LOG_LEVEL_FINEST, "Leaving mamaQueue_destroy for queue 0x%X.", queue);
diff --git a/mama/c_cpp/src/c/subscription.c b/mama/c_cpp/src/c/subscription.c
index d6d0c00..6c73978 100644
--- a/mama/c_cpp/src/c/subscription.c
+++ b/mama/c_cpp/src/c/subscription.c
@@ -1831,6 +1831,9 @@ void mamaSubscriptionImpl_deallocate(mamaSubscriptionImpl *impl)
/* Destroy the mutex. */
wlock_destroy(impl->mCreateDestroyLock);

+ /* Destroy the state. */
+ wInterlocked_destroy(&impl->mState);
+
/* Free the subscription impl. */
free(impl);
}
--
1.7.5.4


[PATCH] Fixed memory leak in properties

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Free memory and fail if realloc fails in propertiesImpl_AddKey(). When we
replace a value in the hash table in propertiesImpl_AddProperty(), free the old
value.

Signed-off-by: John Gray <jgray@...>
---
common/c_cpp/src/c/property.c | 48 ++++++++++++++++++++++++++++------------
1 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/common/c_cpp/src/c/property.c b/common/c_cpp/src/c/property.c
index 7bc1f38..456c08f 100644
--- a/common/c_cpp/src/c/property.c
+++ b/common/c_cpp/src/c/property.c
@@ -326,7 +326,7 @@ properties_Get( wproperty_t handle, const char* name )
propertiesImpl_ *this = (propertiesImpl_ *)handle;
const char* rval = NULL;

- if( name == NULL || strlen( name ) == 0 )
+ if( name == NULL || NULL == this || strlen( name ) == 0 )
{
return NULL;
}
@@ -554,9 +554,19 @@ propertiesImpl_AddKey( propertiesImpl_ *this, const char* name )
}
else
{
- this->mKeys = (const char* *)realloc(
- (void* )this->mKeys,
- allocSize * sizeof( const char* ) );
+ void* reallocBlock =
+ realloc((void* )this->mKeys, (allocSize * sizeof(const char*)));
+
+ if(NULL != reallocBlock)
+ {
+ this->mKeys = (const char**)reallocBlock;
+ }
+ else
+ {
+ free((void* )this->mKeys);
+ this->mKeys = NULL;
+ return 0;
+ }
}
}

@@ -571,28 +581,36 @@ propertiesImpl_AddProperty( propertiesImpl properties,
const char* value )
{
propertiesImpl_ *this = (propertiesImpl_*)properties;
+ void* data = NULL;
+ int ret = 0;

- if( gPropertyDebug )
- {
- fprintf (stderr,
- "\nAddProperty KEY: %s, VALUE: %s\n",
- name,
- value);
- }
+ if( NULL == this )
+ return 0;

- if ( NULL == wtable_lookup( this->mTable, (char* )name ))
+ if ( NULL == (data = wtable_lookup(this->mTable, (char*)name)))
{
if( !propertiesImpl_AddKey( this, name ))
return 0;
}

- if (-1==wtable_insert( this->mTable, (char* )name, (caddr_t)value ))
+ if(-1 == (ret = wtable_insert( this->mTable, (char* )name, (caddr_t)value)))
{
return 0;
}

+ if(0 == ret) /* If 0 is returned then data has been replaced. */
+ {
+ /* If existing data in the table has now been replaced then the old data must be freed. */
+ if(NULL != data)
+ {
+ free(data);
+ }
+
+ if(gPropertyDebug)
+ {
+ fprintf(stderr, "\nAddProperty KEY: %s, VALUE: %s\n", name, value);
+ }
+ }

return 1;
}
-
-
--
1.7.5.4


[PATCH] Include mama/types.h from mamainternal.h

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

The C++ wrappers require mama/types.h to be included from mamainternal.h in
order to build properly.

Signed-off-by: John Gray <jgray@...>
---
mama/c_cpp/src/c/mamainternal.h | 1 +
1 files changed, 1 insertions(+), 0 deletions(-)

diff --git a/mama/c_cpp/src/c/mamainternal.h b/mama/c_cpp/src/c/mamainternal.h
index 340d922..23f23c7 100644
--- a/mama/c_cpp/src/c/mamainternal.h
+++ b/mama/c_cpp/src/c/mamainternal.h
@@ -23,6 +23,7 @@
#define MamaInternalH__

#include <property.h>
+#include "mama/types.h"

#if defined(__cplusplus)
extern "C"
--
1.7.5.4


[PATCH] [common] Renamed hash functions to whash

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Avoids potential namespace conflicts as the hash functions are now used
outside of common.

Signed-off-by: John Gray <jgray@...>
---
common/c_cpp/src/c/lookup2.c | 28 ++++++++++++++--------------
common/c_cpp/src/c/lookup2.h | 2 +-
common/c_cpp/src/c/wtable.c | 6 +++---
3 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/common/c_cpp/src/c/lookup2.c b/common/c_cpp/src/c/lookup2.c
index 4a1b181..a932293 100644
--- a/common/c_cpp/src/c/lookup2.c
+++ b/common/c_cpp/src/c/lookup2.c
@@ -96,7 +96,7 @@ acceptable. Do NOT use for cryptographic purposes.
--------------------------------------------------------------------
*/

-ub4 hash( k, length, initval)
+ub4 whash( k, length, initval)
register ub1 *k; /* the key */
register ub4 length; /* the length of the key */
register ub4 initval; /* the previous hash, or an arbitrary value */
@@ -151,7 +151,7 @@ register ub4 initval; /* the previous hash, or an arbitrary value */
-- that the length be the number of ub4's in the key
--------------------------------------------------------------------
*/
-ub4 hash2( k, length, initval)
+ub4 whash2( k, length, initval)
register ub4 *k; /* the key */
register ub4 length; /* the length of the key */
register ub4 initval; /* the previous hash, or an arbitrary value */
@@ -196,7 +196,7 @@ register ub4 initval; /* the previous hash, or an arbitrary value */
--------------------------------------------------------------------
*/

-ub4 hash3( k, length, initval)
+ub4 whash3( k, length, initval)
register ub1 *k; /* the key */
register ub4 length; /* the length of the key */
register ub4 initval; /* the previous hash, or an arbitrary value */
@@ -268,7 +268,7 @@ void driver1()

for (i=0; i<256; ++i)
{
- h = hash(buf,i,h);
+ h = whash(buf,i,h);
}
}

@@ -306,10 +306,10 @@ void driver2()
/* have a and b be two keys differing in only one bit */
a[i] ^= (k<<j);
a[i] ^= (k>>(8-j));
- c[0] = hash(a, hlen, m);
+ c[0] = whash(a, hlen, m);
b[i] ^= ((k+1)<<j);
b[i] ^= ((k+1)>>(8-j));
- d[0] = hash(b, hlen, m);
+ d[0] = whash(b, hlen, m);
/* check every bit is 1, 0, set, and not set at least once */
for (l=0; l<HASHSTATE; ++l)
{
@@ -357,10 +357,10 @@ void driver3()
ub4 h,i,j,ref,x,y;

printf("Endianness. These should all be the same:\n");
- printf("%.8lx\n", hash(q, sizeof(q)-1, (ub4)0));
- printf("%.8lx\n", hash(qq+1, sizeof(q)-1, (ub4)0));
- printf("%.8lx\n", hash(qqq+2, sizeof(q)-1, (ub4)0));
- printf("%.8lx\n", hash(qqqq+3, sizeof(q)-1, (ub4)0));
+ printf("%.8lx\n", whash(q, sizeof(q)-1, (ub4)0));
+ printf("%.8lx\n", whash(qq+1, sizeof(q)-1, (ub4)0));
+ printf("%.8lx\n", whash(qqq+2, sizeof(q)-1, (ub4)0));
+ printf("%.8lx\n", whash(qqqq+3, sizeof(q)-1, (ub4)0));
printf("\n");
for (h=0, b=buf+1; h<8; ++h, ++b)
{
@@ -370,11 +370,11 @@ void driver3()
for (j=0; j<i; ++j) *(b+j)=0;

/* these should all be equal */
- ref = hash(b, len, (ub4)1);
+ ref = whash(b, len, (ub4)1);
*(b+i)=(ub1)~0;
*(b-1)=(ub1)~0;
- x = hash(b, len, (ub4)1);
- y = hash(b, len, (ub4)1);
+ x = whash(b, len, (ub4)1);
+ y = whash(b, len, (ub4)1);
if ((ref != x) || (ref != y))
{
printf("alignment error: %.8lx %.8lx %.8lx %ld %ld\n",ref,x,y,h,i);
@@ -395,7 +395,7 @@ void driver3()
printf("These should all be different\n");
for (i=0, h=0; i<8; ++i)
{
- h = hash(buf, (ub4)0, h);
+ h = whash(buf, (ub4)0, h);
printf("%2ld 0-byte strings, hash is %.8lx\n", i, h);
}
}
diff --git a/common/c_cpp/src/c/lookup2.h b/common/c_cpp/src/c/lookup2.h
index e03bf82..e77a36d 100644
--- a/common/c_cpp/src/c/lookup2.h
+++ b/common/c_cpp/src/c/lookup2.h
@@ -13,6 +13,6 @@ typedef unsigned char ub1;
#define hashsize(n) ((ub4)1<<(n))
#define hashmask(n) (hashsize(n)-1)

-ub4 hash (ub1 *k, ub4 length, ub4 initval);
+ub4 whash (ub1 *k, ub4 length, ub4 initval);

#endif /* _LOOKUP2_H_ */
diff --git a/common/c_cpp/src/c/wtable.c b/common/c_cpp/src/c/wtable.c
index 63a1530..d60c545 100644
--- a/common/c_cpp/src/c/wtable.c
+++ b/common/c_cpp/src/c/wtable.c
@@ -236,7 +236,7 @@ int wtable_insert (wtable_t table, const char* key, void* data)
}

len = strlen(key);
- h = hash((unsigned char *)key, len, HASH_INITVAL);
+ h = whash((unsigned char *)key, len, HASH_INITVAL);
h = (h & hashmask(wtable->size));
if (wtable_debug)
{
@@ -310,7 +310,7 @@ void* wtable_lookup (wtable_t table, const char* key)
}

len = strlen(key);
- h = hash((unsigned char *)key, len, HASH_INITVAL);
+ h = whash((unsigned char *)key, len, HASH_INITVAL);
h = (h & hashmask(wtable->size));
if (wtable_debug)
{
@@ -380,7 +380,7 @@ void* wtable_remove (wtable_t table, const char* key)
}

len = strlen(key);
- h = hash((unsigned char *)key, len, HASH_INITVAL);
+ h = whash((unsigned char *)key, len, HASH_INITVAL);
h = (h & hashmask(wtable->size));
if (wtable_debug)
{
--
1.7.5.4


[PATCH] [transport] Added method to disable refreshes

Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Added method to disable refreshes for a transport. This method must be called
after mamaTransport_allocate() but before mamaTransport_create(). If invoked
with a non-zero argument the transport will not send refresh messages.

Signed-off-by: John Gray <jgray@...>
---
mama/c_cpp/src/c/mama/transport.h | 11 +++++++++++
mama/c_cpp/src/c/transport.c | 14 +++++++++-----
2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/transport.h b/mama/c_cpp/src/c/mama/transport.h
index f97021b..d66905c 100644
--- a/mama/c_cpp/src/c/mama/transport.h
+++ b/mama/c_cpp/src/c/mama/transport.h
@@ -291,6 +291,17 @@ mamaTransport_getOutboundThrottle (mamaTransport transport,
double *result);

/**
+ * Disable Refreshing.
+ *
+ * @param transport the transport instance
+ * @param disable t/f.
+ */
+MAMAExpDLL
+extern void
+mamaTransport_disableRefresh (mamaTransport transport,
+ uint8_t disable);
+
+/**
* Set the throttle rate.
*
* @param transport the transport.
diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index de92a56..95e912c 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -161,6 +161,7 @@ typedef struct transportImpl_
int mGroupSizeHint;

uint8_t mDisableDisconnectCb;
+ uint8_t mDisableRefresh;
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -369,14 +370,18 @@ static void setPreInitialStrategy (mamaTransport transport)
*
* Return non-zero to disable refresh messages.
*/
-static int mamaTransportInternal_disableRefreshes (const char* transportName)
+void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)
+{
+ self->mDisableRefresh=disable;
+}
+
+static int mamaTransportInternal_disableRefreshes(const char* transportName)
{
const char* propValue;
char propString[MAX_PROP_STRING];
int retVal;

- retVal=snprintf (propString, MAX_PROP_STRING,
- "mama.transport.%s.%s",
+ retVal=snprintf(propString, MAX_PROP_STRING, "mama.transport.%s.%s",
transportName ? transportName : "", PROP_NAME_DISABLE_REFRESH);

if ((retVal<0) || (retVal>=MAX_PROP_STRING))
@@ -759,8 +764,7 @@ mamaTransport_create (mamaTransport transport,
name);
}

-
- if (!mamaTransportInternal_disableRefreshes (name))
+ if ((!self->mDisableRefresh) && (!mamaTransportInternal_disableRefreshes(name)))
{
return refreshTransport_create (&self->mRefreshTransport,
(mamaTransport)self,
--
1.7.5.4


Upcoming Patches

Mike Schonberg <mschonberg@...>
 

This list has been pretty quiet over the past couple of months. That does not mean that we have not been busy. In fact, the list has been quiet because we have been fully engaged preparing the remaining components of OpenMAMA for release at the end of April. These components include the C++ and Java (JNI) OpenMAMA APIs as well as the C++ and Java OpenMAMDA APIs. Win32 support for all flavors of OpenMAMA and OpenMAMDA will also be part of the April release. We plan on releasing .NET support after the remaining components.

Over the next several days, we (several developers from NYSE Technologies and I) will post a significant number of patches. These patches fall broadly into three categories:

1. Bug fixes and minor enhancements added as part of normal maintenance while we prepared the initial release of the OpenMAMA C API.

2. Changes required to support C++, Java and .NET

3. Win32 support.

Please do not hesitate to comment on any of the patches submitted.

After being submitted to this list for review, they will be committed to the git repository as OpenMAMA-1.1.2-rc1. If this release requires additional fixes, we will create additional release candidates as necessary until we deem OpenMAMA-1.1.2 to be stable. I don't expect the stabilization period to be very long since most of the changes were tested as part of previous commercial MAMA releases.

I am considering placing the entire collection of patches into an "OpenMAMA-next" branch in the interim if there is sufficient interest. This might make it easier to evaluate the entire set of changes.

At the conclusion of this process we will commit the C++, Java, and MAMDA APIs, creating OpenMAMA/OpenMAMDA 2.1.

Regards,
Mike
mschonberg@...
Please consider the environment before printing this email.

Visit our website at http://www.nyse.com

****************************************************

Note: The information contained in this message and any attachment to it is privileged, confidential and protected from disclosure. If the reader of this message is not the intended recipient, or an employee or agent responsible for delivering this message to the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. If you have received this communication in error, please notify the sender immediately by replying to the message, and please delete it from your system. Thank you. NYSE Euronext.


[PATCH 1.1] ft.c: Multicast fault tolerance

John Gray <jgray@...>
 

Fault tolerance for multicast.

 

? ft.patch

Index: ft.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/Attic/ft.c,v

retrieving revision 1.1.2.7

diff -w -u -r1.1.2.7 ft.c

--- ft.c    2 Oct 2011 19:02:17 -0000     1.1.2.7

+++ ft.c    30 Jan 2012 16:19:51 -0000

@@ -58,6 +58,7 @@

#define FT_FIELD_IPADDRESS          "MamaIPAddress"

#define FT_FIELD_IPADDRESS_FID      206

+#define FT_MAX_PROPERTY_NAME_LENGTH 1024

mama_status

multicastFt_setup (

     mamaFtMember                   member,

@@ -104,6 +105,7 @@

static void bridgeFt_sendHeartbeat (mamaFtMember member);

static int multicastFt_receiveHeartbeat(void* member);

static int bridgeFt_receiveHeartbeat(void* member);

+const char *multicastFt_getProperty(char *buffer, const char *propertyName, const char *transportName);

 static int foundheartbeat=0;

@@ -878,6 +880,28 @@

/****************************************************************************

*       Multicast FT specific

******************************************************************************/

+const char *multicastFt_getProperty(char *buffer, const char *propertyName, const char *transportName)

+{

+    /* Returns. */

+    const char *ret = NULL;

+

+    /* Format the full property name. */

+    int numberWritten = snprintf(buffer, (FT_MAX_PROPERTY_NAME_LENGTH - 1), propertyName, transportName);

+

+    /* If too many characters are written then log an error. */

+    if((FT_MAX_PROPERTY_NAME_LENGTH - 1) < numberWritten)

+    {

+        mama_log(MAMA_LOG_LEVEL_ERROR, "%s Property name too long. Length [%d], Max Length [%d].", transportName, strlen(transportName), (FT_MAX_PROPERTY_NAME_LENGTH - 1));

+    }

+

+    else

+    {

+        /* Otherwise get the property. */

+        ret = mama_getProperty(buffer);

+    }

+

+    return ret;

+}

mama_status

multicastFt_setup (

     mamaFtMember                   member,

@@ -904,46 +928,68 @@

     const char* ftNetwork   = NULL;

     const char* ftTtl       = NULL;

     const char* iorecvstr   = NULL;

+    const char* transportName   = NULL;

+    const char defTportName[3]  = "ft";

+    mama_status status          = MAMA_STATUS_OK;   

     struct in_addr iface;

     struct in_addr cFtIfAddr;

+    #ifdef WIN32

+    int nonBlock = 1;

+    #endif

+

+    /* This buffer is used for formatting property names. */

+    char propertyName[FT_MAX_PROPERTY_NAME_LENGTH] = "";

     mamaFtMemberImpl* impl = (mamaFtMemberImpl*) member;

     if (!impl || !groupName)

         return MAMA_STATUS_INVALID_ARG;

-    ftInterface = mama_getProperty ("mama.multicast.transport.ft.interface");

+    /* Get the transport name. */

+    if (transport)

+    {

+        status = mamaTransport_getName(transport, &transportName);

+        if((MAMA_STATUS_OK != status) || (NULL == transportName) || ('\0' == transportName[0]))

+        {

+            mama_log (MAMA_LOG_LEVEL_ERROR, "MAMA multicast FT: the transport name is invalid");

+            return MAMA_STATUS_INVALID_ARG;

+        }

+    }

+    else

+        transportName = defTportName;

+

+    ftInterface = multicastFt_getProperty(propertyName, "mama.multicast.transport.%s.interface", transportName);

     if (ftInterface == NULL)

     {

-    ftInterface = mama_getProperty ("mama.native.transport.ft.interface");

+    ftInterface = multicastFt_getProperty(propertyName, "mama.native.transport.%s.interface", transportName);       

     if (ftInterface == NULL)

             ftInterface = "";

     }

-    ftNetwork = mama_getProperty ("mama.multicast.transport.ft.network");

+    ftNetwork = multicastFt_getProperty (propertyName, "mama.multicast.transport.%s.network", transportName);

     if (ftNetwork == NULL)

     {

-        ftNetwork = mama_getProperty ("mama.native.transport.ft.network");

+        ftNetwork = multicastFt_getProperty (propertyName, "mama.native.transport.%s.network", transportName);

             if (ftNetwork == NULL)

                 ftNetwork = FT_NETWORK;

     }

-    ftService = mama_getProperty ("mama.multicast.transport.ft.service");

+    ftService = multicastFt_getProperty (propertyName, "mama.multicast.transport.%s.service", transportName);

     if (ftService == NULL)

     {

-        ftService = mama_getProperty ("mama.native.transport.ft.service");

+        ftService = multicastFt_getProperty (propertyName, "mama.native.transport.%s.service", transportName);

         if (ftService != NULL)

                 service = atol (ftService);

     }

-    ftTtl = mama_getProperty ("mama.multicast.transport.ft.ttl");

+    ftTtl = multicastFt_getProperty (propertyName, "mama.multicast.transport.%s.ttl", transportName);

     if (ftTtl == NULL)

     {

-        ftTtl = mama_getProperty ("mama.native.transport.ft.ttl");

+        ftTtl = multicastFt_getProperty (propertyName, "mama.native.transport.%s.ttl", transportName);

         if (ftTtl != NULL)

                 ttl = atol (ftTtl);

     }

-    iorecvstr = mama_getProperty ("mama.multicast.transport.ft.iowindow");

+    iorecvstr = multicastFt_getProperty (propertyName, "mama.multicast.transport.%s.iowindow", transportName);

     if (iorecvstr != NULL)

     {

         iorecv=atoi (iorecvstr);

 

Signed-off-by: John Gray <jgray@...>

 

 


[PATCH 1.1] mama.c: Make sure that we load and unload bridges properly

John Gray <jgray@...>
 

These changes ensure that we hold the lock while opening and closing payload and

messaging bridges. Additionally, we store the shared library handles so we can

close them when cleaning up.

 

Index: c_cpp/src/c/mama.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/mama.c,v

retrieving revision 1.128.4.7.2.2.4.19.4.8

diff -u -r1.128.4.7.2.2.4.19.4.8 mama.c

--- c_cpp/src/c/mama.c                12 Jan 2012 00:52:22 -0000            1.128.4.7.2.2.4.19.4.8

+++ c_cpp/src/c/mama.c             15 Jan 2012 04:31:33 -0000

@@ -187,6 +187,11 @@

/* Private Function Prototypes. */

/* ************************************************************************* */

+static mama_status

+mama_loadBridgeWithPathInternal (mamaBridge* impl,

+                                 const char* middlewareName,

+                                 const char* path,

+                                 uint8_t lock);

 /*  Description :   This function will free any memory associated with a

  *                  mamaApplicationContext object but will not free the

@@ -332,8 +337,12 @@

     }

     /* Will load the bridge if its not already loaded */

+    /* Lock is already acquired at this point */

     if (MAMA_STATUS_OK !=

-       (status = mama_loadBridge (&bridge, statsLogMiddlewareName)))

+       (status = mama_loadBridgeWithPathInternal (&bridge,

+                                                  statsLogMiddlewareName,

+                                                  NULL,

+                                                  0)))

     {

        mama_log (MAMA_LOG_LEVEL_ERROR,

                   "mamaInternal_loadStatsLogger(): ",

@@ -914,12 +923,12 @@

         mama_log (MAMA_LOG_LEVEL_SEVERE,

                   "mama_openWithProperties(): "

                   "Error connecting to Entitlements Server");

+        pthread_mutex_unlock (&gImpl.myLock);

         mama_close();

        

         if (count)

             *count = gImpl.myRefCount;

-        pthread_mutex_unlock (&gImpl.myLock);

         return result;

     }

#endif /* WITH_ENTITLEMENTS */

@@ -1151,6 +1160,11 @@

                /* mamaPayloadBridgeImpl* impl = (mamaPayloadBridgeImpl*)

              * gImpl.myPayloads [(uint8_t)payload];*/

             gImpl.myPayloads[(uint8_t)payload] = NULL;

+            if(gImpl.myPayloadLibraries[(uint8_t)payload])

+            {

+                closeSharedLib (gImpl.myPayloadLibraries[(uint8_t)payload]);

+                gImpl.myPayloadLibraries[(uint8_t)payload] = NULL;

+            }

         }

         gDefaultPayload = NULL;

@@ -1250,8 +1264,10 @@

                               mamaMiddleware_convertToString (middleware));

                 }

+                gImpl.myBridges[middleware] = NULL;

+                closeSharedLib (gImpl.myBridgeLibraries[middleware]);

+                gImpl.myBridgeLibraries[middleware] = NULL;

             }

-            gImpl.myBridges[middleware] = NULL;

         }

         /* The properties must not be closed down until after the bridges have been destroyed. */

@@ -1803,8 +1821,9 @@

}

 mama_status

-mama_loadPayloadBridge (mamaPayloadBridge* impl,

-                        const char*        payloadName)

+mama_loadPayloadBridgeInternal  (mamaPayloadBridge* impl,

+                                 const char*        payloadName,

+                                 uint8_t lock)

{

     char                    bridgeImplName  [256];

     char                    initFuncName    [256];

@@ -1819,6 +1838,9 @@

     snprintf (bridgeImplName, 256, "mama%simpl",

               payloadName);

+    if (lock)

+        pthread_mutex_lock (&gImpl.myLock);

+

     bridgeLib = openSharedLib (bridgeImplName, NULL);

     if (!bridgeLib)

@@ -1829,6 +1851,8 @@

                 "Could not open payload bridge library [%s] [%s]",

                  bridgeImplName ? bridgeImplName : "",

                  getLibError());

+       if (lock)

+           pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

@@ -1845,11 +1869,18 @@

                    initFuncName ? initFuncName : "",

                    bridgeImplName ? bridgeImplName : "");

         closeSharedLib (bridgeLib);

+      

+        if (lock)

+            pthread_mutex_unlock (&gImpl.myLock);

+      

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

     if (MAMA_STATUS_OK != (status = initFunc (impl, &payloadChar)))

     {

+       if (lock)

+           pthread_mutex_unlock (&gImpl.myLock);

+

         return status;

     }

@@ -1857,6 +1888,10 @@

     {

         mama_log (MAMA_LOG_LEVEL_ERROR,

                   "mama_loadPayloadBridge(): Error in [%s] ", initFuncName);

+      

+        if (lock)

+           pthread_mutex_unlock (&gImpl.myLock);

+  

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

@@ -1866,7 +1901,11 @@

              "mama_loadPayloadBridge(): "

              "Payload bridge %s already loaded",

              payloadName);

-            return MAMA_STATUS_OK;

+      

+        if (lock)

+           pthread_mutex_unlock (&gImpl.myLock);

+

+        return MAMA_STATUS_OK;

     }

     gImpl.myPayloads [(int)payloadChar] = *impl;

@@ -1881,10 +1920,20 @@

              "mama_loadPayloadBridge(): "

              "Sucessfully loaded %s payload bridge from library [%s]",

              payloadName, bridgeImplName);

+    

+    if (lock)

+        pthread_mutex_unlock (&gImpl.myLock);

     return MAMA_STATUS_OK;

}

+mama_status

+mama_loadPayloadBridge (mamaPayloadBridge* impl,

+                        const char*        payloadName)

+{

+    return mama_loadPayloadBridgeInternal (impl, payloadName, 1);

+}

+

int

mamaInternal_generateLbmStats ()

{

@@ -1900,9 +1949,10 @@

}

 mama_status

-mama_loadBridgeWithPath (mamaBridge* impl,

-                         const char* middlewareName,

-                         const char* path)

+mama_loadBridgeWithPathInternal (mamaBridge* impl,

+                                 const char* middlewareName,

+                                 const char* path,

+                                 uint8_t lock)

{

     char                bridgeImplName  [256];

     char                initFuncName    [256];

@@ -1924,15 +1974,19 @@

                   "mama_loadBridge(): Invalid middleware [%s]",

                   middlewareName);

     }

+  

+    if (lock)

+        pthread_mutex_lock (&gImpl.myLock);

     /* Check if a bridge has already been initialized for the middleware */

     if (gImpl.myBridges [middleware])

     {

         *impl = gImpl.myBridges [middleware];

+        if (lock)

+            pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_OK;

     }

-

     snprintf (bridgeImplName, 256, "mama%simpl",

               middlewareName);

@@ -1940,7 +1994,6 @@

     if (!bridgeLib)

     {

-

         if (path)

         {

                 mama_log (MAMA_LOG_LEVEL_ERROR,

@@ -1958,6 +2011,8 @@

                 bridgeImplName ? bridgeImplName : "",

                 getLibError());

         }

+        if (lock)

+            pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

@@ -1974,6 +2029,8 @@

                    initFuncName ? initFuncName : "",

                    bridgeImplName ? bridgeImplName : "");

         closeSharedLib (bridgeLib);

+        if (lock)

+            pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

@@ -1983,6 +2040,8 @@

     {

         mama_log (MAMA_LOG_LEVEL_ERROR,

                   "mama_loadBridge(): Error in [%s] ", initFuncName);

+        if (lock)

+            pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

@@ -1995,22 +2054,37 @@

     result = ((mamaBridgeImpl*)(*impl))->bridgeOpen (*impl);

     if (MAMA_STATUS_OK != result)

+    {

+        if (lock)

+            pthread_mutex_unlock (&gImpl.myLock);

         return result;

+    }

     if (((mamaBridgeImpl*)(*impl))->bridgeGetDefaultPayloadId(&payloadName, &payloadId) == MAMA_STATUS_OK)

     {

                               if (!gImpl.myPayloads [(uint8_t)payloadId])

                               {

                                               mamaPayloadBridge payloadImpl;

-                                              mama_loadPayloadBridge (&payloadImpl,payloadName);

+                                             mama_loadPayloadBridgeInternal (&payloadImpl,payloadName,0);

                               }

     }

     gImpl.myBridges [middleware] = *impl;

+    gImpl.myBridgeLibraries [middleware] = bridgeLib;

+    if (lock)

+        pthread_mutex_unlock (&gImpl.myLock);

     return MAMA_STATUS_OK;

}

+mama_status

+mama_loadBridgeWithPath (mamaBridge* impl,

+                         const char* middlewareName,

+                         const char* path)

+{

+    return mama_loadBridgeWithPathInternal(impl, middlewareName, path, 1);

+}

+

/*

  * Function pointer type for calling getVersion in the wrapper

  */

 

Signed-off-by: John Gray <jgray@...>

 


[PATCH 1.1] mama.c: Added Per-bridge Reference Count

John Gray <jgray@...>
 

Added a reference count to mamaBridgeImpl struct. mama_start() and mama_stop()

increment and decrement this reference count to determine when initialization is

required and when the bridges can be unloaded. The internal registerBridge()

method ensures that it is properly initialized to 0.

 

index 5a9588b..5ab7707 100644

--- a/mama/c_cpp/src/c/bridge.h

+++ b/mama/c_cpp/src/c/bridge.h

@@ -681,6 +681,9 @@ typedef mama_status

  */

typedef struct mamaBridgeImpl

{

+    /* Used by mama_start() and mama_stop(). */

+    unsigned int mRefCount;

+

     /*The default event queue is now middleware specific. (Originally global)*/

     mamaQueue mDefaultEventQueue;

 

index 7b6add9..2623f29 100644

--- a/mama/c_cpp/src/c/mama.c

+++ b/mama/c_cpp/src/c/mama.c

@@ -1295,6 +1295,8 @@ mama_status

mama_start (mamaBridge bridgeImpl)

{

     mamaBridgeImpl* impl =  (mamaBridgeImpl*)bridgeImpl;

+    mama_status rval = MAMA_STATUS_OK;

+    unsigned int prevRefCnt = 0;

     if (!impl)

     {

@@ -1310,8 +1312,25 @@ mama_start (mamaBridge bridgeImpl)

         return MAMA_STATUS_INVALID_QUEUE;

     }

-    /*Delegate to the bridge specific implementation*/

-    return impl->bridgeStart (impl->mDefaultEventQueue);

+    pthread_mutex_lock(&gImpl.myLock);

+    prevRefCnt = impl->mRefCount++;

+    pthread_mutex_unlock(&gImpl.myLock);

+

+    if (prevRefCnt > 0)

+        return MAMA_STATUS_OK;

+

+    /* Delegate to the bridge specific implementation */

+    /* Can't hold lock because bridgeStart blocks */

+    rval =  impl->bridgeStart (impl->mDefaultEventQueue);

+

+    if (rval != MAMA_STATUS_OK)

+    {

+        pthread_mutex_lock(&gImpl.myLock);

+        impl->mRefCount--;

+        pthread_mutex_unlock(&gImpl.myLock);

+    }

+

+    return rval;

}

 struct startBackgroundClosure

@@ -1385,6 +1404,7 @@ mama_status

mama_stop (mamaBridge bridgeImpl)

{

     mamaBridgeImpl* impl =  (mamaBridgeImpl*)bridgeImpl;

+    mama_status rval = MAMA_STATUS_OK;

     if (!impl)

     {

@@ -1402,7 +1422,19 @@ mama_stop (mamaBridge bridgeImpl)

     }

     /*Delegate to the bridge specific implementation*/

-    return impl->bridgeStop (impl->mDefaultEventQueue);

+    pthread_mutex_lock(&gImpl.myLock);

+    if (impl->mRefCount > 0)

+    {

+        impl->mRefCount--;

+        if(impl->mRefCount == 0)

+        {

+            rval = impl->bridgeStop (impl->mDefaultEventQueue);

+            if (MAMA_STATUS_OK != rval)

+                impl->mRefCount++;

+        }

+    }

+    pthread_mutex_unlock(&gImpl.myLock);

+    return rval;

}

 /**

@@ -1749,7 +1781,6 @@ enableEntitlements (const char **servers)

 #endif

-

void

mamaInternal_registerBridge (mamaBridge     bridge,

                              const char*    middlewareName)

@@ -1767,6 +1798,7 @@ mamaInternal_registerBridge (mamaBridge     bridge,

     }

     gImpl.myBridges[middleware] = bridge;

+    ((mamaBridgeImpl*)(bridge))->mRefCount = 0;

}

 mama_status

@@ -1779,6 +1811,7 @@ mama_setDefaultPayload (char id)

     return MAMA_STATUS_OK;

}

+

mama_status

mama_loadPayloadBridge (mamaPayloadBridge* impl,

                         const char*        payloadName)

--

1.7.5.4

 

Signed-off-by: John Gray <jgray@...>

 


[PATCH 1.1] mama.c: Add mama_openWithPropertiesCount() and mama_closeCount()

John Gray <jgray@...>
 

These methods take an additional unsigned int* parameter that upon return

contains the reference count for mama_openXXX() and mama_closeXXX(). This

allows applications to perform one-time global initialization when

the int is 1 and mama_openWithPropertiesCount() returns MAMA_STATUS_OK. Likewise

applications can clean up global data when the count is zero and

mama_closeCount() returns MAMA_STATUS_OK.

 

Index: c_cpp/src/c/mama.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/mama.c,v

retrieving revision 1.128.4.7.2.2.4.19.4.6

diff -u -r1.128.4.7.2.2.4.19.4.6 mama.c

--- c_cpp/src/c/mama.c                7 Jan 2012 03:42:40 -0000              1.128.4.7.2.2.4.19.4.6

+++ c_cpp/src/c/mama.c             8 Jan 2012 03:48:20 -0000

@@ -643,8 +643,9 @@

}

 mama_status

-mama_openWithProperties (const char* path,

-                         const char* filename)

+mama_openWithPropertiesCount (const char* path,

+                              const char* filename,

+                              unsigned int* count)

{

     mama_status     result                                                       = MAMA_STATUS_OK;

     mama_size_t     numBridges              = 0;

@@ -695,6 +696,10 @@

     {

         if (MAMA_STATUS_OK == result)

             gImpl.myRefCount++;

+       

+        if (count)

+            *count = gImpl.myRefCount;

+

         pthread_mutex_unlock (&gImpl.myLock);

         return result;

     }

@@ -757,6 +762,9 @@

             mama_log (MAMA_LOG_LEVEL_ERROR,

                       "mama_openWithProperties(): "

                       "Could not create stats generator.");

+            if (count)

+                *count = gImpl.myRefCount;

+           

             pthread_mutex_unlock (&gImpl.myLock);

             return result;

         }

@@ -874,6 +882,9 @@

         mama_log (MAMA_LOG_LEVEL_SEVERE,

                   "mama_openWithProperties(): "

                   "At least one bridge must be specified");

+        if (count)

+            *count = gImpl.myRefCount;

+       

         pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

@@ -883,6 +894,9 @@

         mama_log (MAMA_LOG_LEVEL_SEVERE,

                   "mama_openWithProperties(): "

                   "At least one payload must be specified");

+        if (count)

+            *count = gImpl.myRefCount;

+       

         pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

@@ -901,6 +915,10 @@

                   "mama_openWithProperties(): "

                   "Error connecting to Entitlements Server");

         mama_close();

+       

+        if (count)

+            *count = gImpl.myRefCount;

+

         pthread_mutex_unlock (&gImpl.myLock);

         return result;

     }

@@ -938,6 +956,8 @@

             if (MAMA_STATUS_OK != (result = mamaBridgeImpl_getInternalEventQueue (bridge,

                                                                &statsGenQueue)))

             {

+                if (count)

+                    *count = gImpl.myRefCount;

                 pthread_mutex_unlock (&gImpl.myLock);

                 return result;

             }

@@ -948,6 +968,8 @@

             mama_log (MAMA_LOG_LEVEL_ERROR,

                       "mama_openWithProperties(): "

                       "Could not set queue for stats generator.");

+            if (count)

+                *count = gImpl.myRefCount;

             pthread_mutex_unlock (&gImpl.myLock);

             return result;

         }

@@ -957,12 +979,16 @@

             mama_log (MAMA_LOG_LEVEL_ERROR,

                       "mama_openWithProperties(): "

                       "Failed to enable stats logging");

+            if (count)

+                *count = gImpl.myRefCount;

             pthread_mutex_unlock (&gImpl.myLock);

             return result;

         }

     }

     gImpl.myRefCount++;

+    if (count)

+        *count = gImpl.myRefCount;

     pthread_mutex_unlock (&gImpl.myLock);

     return result;

}

@@ -972,7 +998,14 @@

{

     /*Passing NULL as path and filename will result in the

      default behaviour - mama.properties on $WOMBAT_PATH*/

-    return mama_openWithProperties (NULL, NULL);

+    return mama_openWithPropertiesCount (NULL, NULL, NULL);

+}

+

+mama_status

+mama_openWithProperties (const char* path,

+                         const char* filename)

+{

+    return mama_openWithPropertiesCount (path, filename, NULL);

}

 mama_status

@@ -1079,7 +1112,7 @@

}

 mama_status

-mama_close ()

+mama_closeCount (unsigned int* count)

{

     mama_status    result     = MAMA_STATUS_OK;

     mamaMiddleware middleware = 0;

@@ -1088,6 +1121,8 @@

     pthread_mutex_lock (&gImpl.myLock);

     if (gImpl.myRefCount == 0)

     {

+        if (count)

+            *count = gImpl.myRefCount;

         pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_OK;

     }

@@ -1113,11 +1148,8 @@

         /* Look for a bridge for each of the payloads and close them */

         for (payload = 0; payload != MAMA_PAYLOAD_MAX; ++payload)

         {

-              mamaPayloadBridgeImpl* impl = (mamaPayloadBridgeImpl*) gImpl.myPayloads [(uint8_t)payload];

-            if (impl)

-            {

-

-            }

+             /* mamaPayloadBridgeImpl* impl = (mamaPayloadBridgeImpl*)

+             * gImpl.myPayloads [(uint8_t)payload];*/

             gImpl.myPayloads[(uint8_t)payload] = NULL;

         }

@@ -1244,10 +1276,18 @@

         mama_freeAppContext(&appContext);

     }

+    if (count)

+        *count = gImpl.myRefCount;

     pthread_mutex_unlock (&gImpl.myLock);

     return result;

}

+mama_status

+mama_close (void)

+{

+    return mama_closeCount (NULL);

+}

+

/**

  * Start processing messages.

  */

Index: c_cpp/src/c/mama/mama.h

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/mama/mama.h,v

retrieving revision 1.74.4.4.8.10

diff -u -r1.74.4.4.8.10 mama.h

--- c_cpp/src/c/mama/mama.h 2 Oct 2011 19:02:18 -0000              1.74.4.4.8.10

+++ c_cpp/src/c/mama/mama.h              8 Jan 2012 03:48:20 -0000

@@ -220,6 +220,40 @@

     extern mama_status

     mama_openWithProperties (const char*    path,

                              const char*    filename);

+    /**

+     * Initialize MAMA.

+     *

+     * Allows users of the API to override the default behavior of mama_open()

+     * where a file mama.properties is required to be located in the directory

+     * specified by \$WOMBAT_PATH.

+     *

+     * The properties file must have the same structure as a standard

+     * mama.properties file.

+     *

+     * If null is passed as the path the API will look for the properties file on

+     * the \$WOMBAT_PATH.

+     *

+     * If null is passed as the filename the API will look for the default

+     * filename of mama.properties.

+     *

+     * The count value on return will be the number of times that mama_openXxx()

+     * has been invoked successfully. Applications can use this to perform

+     * one-time initialization when the value is 1 and the return is

+     * MAMA_STATUS_OK

+     *

+     * @param path Fully qualified path to the directory containing the properties

+     * file

+     * @param filename The name of the file containing MAMA properties.

+     * @param count The number of times mama_OpenXXX() has been called

+     * successfully.

+     *

+     * @return mama_status Whether the call was successful or not.

+     */

+    MAMAExpDLL

+    extern mama_status

+    mama_openWithPropertiesCount (const char* path,

+                                  const char* filename,

+                                  unsigned int* count);

     /**

      * Set a specific property for the API.

@@ -291,6 +325,17 @@

     MAMAExpDLL

     extern mama_status

     mama_close (void);

+   

+    /**

+     * Close MAMA and free all associated resources.

+     *

+     * @param count Filled with the number of times mama has been opened

+     * successfully. Applications can perform global one-time cleanup when this

+     * value is 0 and the return value is MAMA_STATUS_OK.

+     */

+    MAMAExpDLL

+    extern mama_status

+    mama_closeCount (unsigned int* count);

     /**

     * Return the version information for the library.

 

Signed-off-by: John Gray <jgray@...>

 


[PATCH 1.1] mama.c : Added locking around mama_open() and mama_close()

John Gray <jgray@...>
 

Improved the reference counting logic and added locking around mama_open() and

mama_close(). The change ensures that all initialization (one-time) and cleanup

only occurs once. It also ensures that the reference count never goes below

zero.

 

Index: c_cpp/src/c/mama.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/mama.c,v

retrieving revision 1.128.4.7.2.2.4.19.4.5

diff -u -r1.128.4.7.2.2.4.19.4.5 mama.c

--- c_cpp/src/c/mama.c                7 Jan 2012 03:28:35 -0000              1.128.4.7.2.2.4.19.4.5

+++ c_cpp/src/c/mama.c             7 Jan 2012 03:39:39 -0000

@@ -654,6 +654,8 @@

     const char*     statsLogging            = "false";

               const char*                         catchCallbackExceptions = NULL;

+    pthread_mutex_lock (&gImpl.myLock);

+

     if (pthread_key_create(&last_err_key, NULL) != 0)

     {

         mama_log (MAMA_LOG_LEVEL_NORMAL, "WARNING!!! - CANNOT ALLOCATE KEY FOR ERRORS");

@@ -689,7 +691,14 @@

             "********************************************************");

#endif

-   if (gImpl.myRefCount++)  return result;

+    if (0 != gImpl.myRefCount)

+    {

+        if (MAMA_STATUS_OK == result)

+            gImpl.myRefCount++;

+        pthread_mutex_unlock (&gImpl.myLock);

+        return result;

+    }

+    /* Code after this point is one-time initialization */

 #ifdef WITH_INACTIVE_CHECK

     mama_log (MAMA_LOG_LEVEL_WARN,

@@ -748,10 +757,10 @@

             mama_log (MAMA_LOG_LEVEL_ERROR,

                       "mama_openWithProperties(): "

                       "Could not create stats generator.");

+            pthread_mutex_unlock (&gImpl.myLock);

             return result;

         }

-

         globalLogging           = properties_Get (gProperties, "mama.statslogging.global.logging");

         globalPublishing        = properties_Get (gProperties, "mama.statslogging.global.publishing");

         transportLogging        = properties_Get (gProperties, "mama.statslogging.transport.logging");

@@ -842,7 +851,6 @@

             }

         }

-

         if (mamaInternal_statsPublishingEnabled())

         {

             mamaInternal_loadStatsPublisher();

@@ -866,16 +874,16 @@

         mama_log (MAMA_LOG_LEVEL_SEVERE,

                   "mama_openWithProperties(): "

                   "At least one bridge must be specified");

+        pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

     if (!gDefaultPayload)

     {

-

-

         mama_log (MAMA_LOG_LEVEL_SEVERE,

                   "mama_openWithProperties(): "

                   "At least one payload must be specified");

+        pthread_mutex_unlock (&gImpl.myLock);

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

@@ -893,6 +901,7 @@

                   "mama_openWithProperties(): "

                   "Error connecting to Entitlements Server");

         mama_close();

+        pthread_mutex_unlock (&gImpl.myLock);

         return result;

     }

#endif /* WITH_ENTITLEMENTS */

@@ -929,6 +938,7 @@

             if (MAMA_STATUS_OK != (result = mamaBridgeImpl_getInternalEventQueue (bridge,

                                                                &statsGenQueue)))

             {

+                pthread_mutex_unlock (&gImpl.myLock);

                 return result;

             }

         }

@@ -938,6 +948,7 @@

             mama_log (MAMA_LOG_LEVEL_ERROR,

                       "mama_openWithProperties(): "

                       "Could not set queue for stats generator.");

+            pthread_mutex_unlock (&gImpl.myLock);

             return result;

         }

@@ -946,10 +957,13 @@

             mama_log (MAMA_LOG_LEVEL_ERROR,

                       "mama_openWithProperties(): "

                       "Failed to enable stats logging");

+            pthread_mutex_unlock (&gImpl.myLock);

             return result;

         }

     }

+    gImpl.myRefCount++;

+    pthread_mutex_unlock (&gImpl.myLock);

     return result;

}

@@ -1071,7 +1085,14 @@

     mamaMiddleware middleware = 0;

     int payload = 0;

-    if( !--gImpl.myRefCount )

+    pthread_mutex_lock (&gImpl.myLock);

+    if (gImpl.myRefCount == 0)

+    {

+        pthread_mutex_unlock (&gImpl.myLock);

+        return MAMA_STATUS_OK;

+    }

+

+    if (!--gImpl.myRefCount)

     {

#ifdef WITH_ENTITLEMENTS

         if( gEntitlementClient != 0 )

@@ -1120,7 +1141,6 @@

             gUnknownMsgStat = NULL;

         }

-

         if (gMessageStat)

         {

             mamaStat_destroy (gMessageStat);

@@ -1224,10 +1244,7 @@

         mama_freeAppContext(&appContext);

     }

-    if (gImpl.myRefCount < 0)

-    {

-        gImpl.myRefCount = 0;

-    }

+    pthread_mutex_unlock (&gImpl.myLock);

     return result;

}

 

Signed-off-by: John Gray <jgray@...>

 


[PATCH 1.1] mama.c: Reference counting prep

John Gray <jgray@...>
 

This is in preparation for adding improved reference counting and locking around

mama_open(), mama_start(), mama_stop(), and mama_close() to enable them to be

called safely from multi-threaded applications.

This is in preparation for adding improved reference counting and locking

around mama_open() and mama_close() to enable them to be called safely from

multi-threaded applications.

 

Index: c_cpp/src/c/mama.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/mama.c,v

retrieving revision 1.128.4.7.2.2.4.19.4.2

diff -u -r1.128.4.7.2.2.4.19.4.2 mama.c

--- c_cpp/src/c/mama.c                7 Jan 2012 01:17:50 -0000              1.128.4.7.2.2.4.19.4.2

+++ c_cpp/src/c/mama.c             7 Jan 2012 01:34:20 -0000

@@ -1,4 +1,4 @@

-/* $Id: mama.c,v 1.128.4.7.2.2.4.19.4.2 2012/01/07 01:17:50 mikeschonberg Exp $

+/* $Id$

  *

  * OpenMAMA: The open middleware agnostic messaging API

  * Copyright (C) 2011 NYSE Inc.

@@ -128,7 +128,6 @@

 static void lookupIPAddress (void);

-static int gRefCount = 0;

 wproperty_t             gProperties      = 0;

static mamaStatsLogger  gStatsPublisher  = NULL;

@@ -148,11 +147,9 @@

 #define MAMA_PAYLOAD_MAX             CHAR_MAX

-static mamaBridge         gMamaBridges    [MAMA_MIDDLEWARE_MAX];

-static mamaPayloadBridge    gMamaPayloads   [MAMA_PAYLOAD_MAX];

static mamaPayloadBridge    gDefaultPayload = NULL;

-pthread_key_t last_err_key;

+static pthread_key_t last_err_key;

 /**

  * struct mamaApplicationGroup

@@ -164,16 +161,39 @@

     const char* myApplicationClass;

} mamaApplicationContext;

+/**

+ * This structure contains data needed to control starting and stopping of

+ * mama.

+ *

+ * TODO: Access to this structure will ultimately be protected by a reference

+ * count and a lock.

+ */

+typedef struct mamaImpl_

+{

+    mamaBridge           myBridges[MAMA_MIDDLEWARE_MAX];

+    mamaPayloadBridge    myPayloads[MAMA_PAYLOAD_MAX];

+    LIB_HANDLE           myBridgeLibraries[MAMA_MIDDLEWARE_MAX];

+    LIB_HANDLE           myPayloadLibraries[MAMA_PAYLOAD_MAX];

+    unsigned int         myRefCount;

+    pthread_mutex_t      myLock;

+} mamaImpl;

 static mamaApplicationContext  appContext;

static char mama_ver_string[256];

+static mamaImpl gImpl = {{0}, {0}, {0}, {0}, 0, PTHREAD_MUTEX_INITIALIZER};

+

+/* ************************************************************************* */

+/* Private Function Prototypes. */

+/* ************************************************************************* */

+

+

/*  Description :   This function will free any memory associated with a

  *                  mamaApplicationContext object but will not free the

  *                  object itself.

  *  Arguments   :   context [I] The context object to free.

  */

-void

+static void

mama_freeAppContext(mamaApplicationContext *context)

{

     /* Only continue if the object is valid. */

@@ -348,7 +368,7 @@

         statsLogMiddlewareName = "wmw";

     }

-    bridge = gMamaBridges[mamaMiddleware_convertFromString (statsLogMiddlewareName)];

+    bridge = gImpl.myBridges[mamaMiddleware_convertFromString (statsLogMiddlewareName)];

     if (MAMA_STATUS_OK != (result = mamaBridgeImpl_getInternalEventQueue (bridge,

                                                                           &queue)))

@@ -596,7 +616,7 @@

     for (middleware = 0; middleware < MAMA_MIDDLEWARE_MAX; middleware++)

     {

-        bridge = gMamaBridges [middleware];

+        bridge = gImpl.myBridges [middleware];

         if (bridge != NULL)

         {

@@ -610,9 +630,10 @@

mamaPayloadBridge

mamaInternal_findPayload (char id)

{

-    if (('\0' == id) || (MAMA_PAYLOAD_MAX < id)) return NULL;

+    if ('\0' == id)

+        return NULL;

-    return gMamaPayloads[(uint8_t)id];

+    return gImpl.myPayloads[(uint8_t)id];

}

 mamaPayloadBridge

@@ -668,7 +689,7 @@

             "********************************************************");

#endif

-   if (gRefCount++)  return result;

+   if (gImpl.myRefCount++)  return result;

 #ifdef WITH_INACTIVE_CHECK

     mama_log (MAMA_LOG_LEVEL_WARN,

@@ -831,10 +852,10 @@

     /* Look for a bridge for each of the middlewares and open them */

     for (middleware = 0; middleware != MAMA_MIDDLEWARE_MAX; ++middleware)

     {

-        mamaBridgeImpl* impl = (mamaBridgeImpl*) gMamaBridges [middleware];

+        mamaBridgeImpl* impl = (mamaBridgeImpl*) gImpl.myBridges [middleware];

         if (impl)

         {

-            mama_log (MAMA_LOG_LEVEL_FINE, mama_getVersion (gMamaBridges[middleware]));

+            mama_log (MAMA_LOG_LEVEL_FINE, mama_getVersion (gImpl.myBridges[middleware]));

             mamaQueue_enableStats(impl->mDefaultEventQueue);

             ++numBridges;

         }

@@ -903,7 +924,7 @@

                 statsMiddleware = "wmw";

             }

-            bridge = gMamaBridges[mamaMiddleware_convertFromString (statsMiddleware)];

+            bridge = gImpl.myBridges[mamaMiddleware_convertFromString (statsMiddleware)];

             if (MAMA_STATUS_OK != (result = mamaBridgeImpl_getInternalEventQueue (bridge,

                                                                &statsGenQueue)))

@@ -1050,7 +1071,7 @@

     mamaMiddleware middleware = 0;

     int payload = 0;

-    if( !--gRefCount )

+    if( !--gImpl.myRefCount )

     {

#ifdef WITH_ENTITLEMENTS

         if( gEntitlementClient != 0 )

@@ -1064,19 +1085,19 @@

         for (middleware = 0; middleware != MAMA_MIDDLEWARE_MAX; ++middleware)

         {

-            mamaBridge bridge = gMamaBridges[middleware];

+            mamaBridge bridge = gImpl.myBridges[middleware];

             if (bridge)

                mamaBridgeImpl_stopInternalEventQueue (bridge);

         }

         /* Look for a bridge for each of the payloads and close them */

         for (payload = 0; payload != MAMA_PAYLOAD_MAX; ++payload)

         {

-              mamaPayloadBridgeImpl* impl = (mamaPayloadBridgeImpl*) gMamaPayloads [(uint8_t)payload];

+             mamaPayloadBridgeImpl* impl = (mamaPayloadBridgeImpl*) gImpl.myPayloads [(uint8_t)payload];

             if (impl)

             {

             }

-            gMamaPayloads[(uint8_t)payload] = NULL;

+            gImpl.myPayloads[(uint8_t)payload] = NULL;

         }

         gDefaultPayload = NULL;

@@ -1166,11 +1187,11 @@

         /* Look for a bridge for each of the middlewares and close them */

         for (middleware = 0; middleware != MAMA_MIDDLEWARE_MAX; ++middleware)

         {

-            mamaBridgeImpl* impl = (mamaBridgeImpl*) gMamaBridges [middleware];

+            mamaBridgeImpl* impl = (mamaBridgeImpl*) gImpl.myBridges [middleware];

             if (impl)

             {

                 if (MAMA_STATUS_OK != (

-                   (result = impl->bridgeClose (gMamaBridges[middleware]))))

+                   (result = impl->bridgeClose (gImpl.myBridges[middleware]))))

                 {

                     mama_log (MAMA_LOG_LEVEL_ERROR,

                               "mama_close(): Error closing %s bridge.",

@@ -1178,7 +1199,7 @@

                 }

             }

-            gMamaBridges[middleware] = NULL;

+            gImpl.myBridges[middleware] = NULL;

         }

         /* The properties must not be closed down until after the bridges have been destroyed. */

@@ -1203,9 +1224,9 @@

         mama_freeAppContext(&appContext);

     }

-    if (gRefCount < 0)

+    if (gImpl.myRefCount < 0)

     {

-        gRefCount = 0;

+        gImpl.myRefCount = 0;

     }

     return result;

}

@@ -1339,10 +1360,10 @@

     /* Look for a bridge for each of the middlewares and open them */

     for (middleware = 0; middleware != MAMA_MIDDLEWARE_MAX; ++middleware)

     {

-        mamaBridgeImpl* impl = (mamaBridgeImpl*) gMamaBridges [middleware];

+        mamaBridgeImpl* impl = (mamaBridgeImpl*) gImpl.myBridges [middleware];

         if (impl)

         {

-            status = mama_stop (gMamaBridges[middleware]);

+            status = mama_stop (gImpl.myBridges[middleware]);

             if (MAMA_STATUS_OK != status)

             {

                 mama_log (MAMA_LOG_LEVEL_ERROR,

@@ -1676,8 +1697,10 @@

mamaInternal_registerBridge (mamaBridge     bridge,

                              const char*    middlewareName)

{

-    mamaMiddleware middleware =

-                    mamaMiddleware_convertFromString (middlewareName);

+    mamaMiddleware middleware;

+   

+    middleware = mamaMiddleware_convertFromString (middlewareName);

+

     if (middleware >= MAMA_MIDDLEWARE_MAX)

     {

         mama_log (MAMA_LOG_LEVEL_SEVERE,

@@ -1686,15 +1709,16 @@

         return;

     }

-    gMamaBridges [middleware] = bridge;

+    gImpl.myBridges[middleware] = bridge;

}

 mama_status

mama_setDefaultPayload (char id)

{

-    if (('\0' == id) || (MAMA_PAYLOAD_MAX < id) || gMamaPayloads[(uint8_t)id] == NULL) return MAMA_STATUS_NULL_ARG;

+    if ('\0' == id || gImpl.myPayloads[(uint8_t)id] == NULL)

+        return MAMA_STATUS_NULL_ARG;

-    gDefaultPayload = gMamaPayloads[(uint8_t)id];

+    gDefaultPayload = gImpl.myPayloads[(uint8_t)id];

     return MAMA_STATUS_OK;

}

@@ -1756,7 +1780,7 @@

         return MAMA_STATUS_NO_BRIDGE_IMPL;

     }

-    if (gMamaPayloads [payloadChar])

+    if (gImpl.myPayloads [(int)payloadChar])

     {

         mama_log (MAMA_LOG_LEVEL_NORMAL,

              "mama_loadPayloadBridge(): "

@@ -1765,7 +1789,8 @@

             return MAMA_STATUS_OK;

     }

-    gMamaPayloads [payloadChar] = *impl;

+    gImpl.myPayloads [(int)payloadChar] = *impl;

+    gImpl.myPayloadLibraries [(int)payloadChar] = bridgeLib;

     if (!gDefaultPayload)

     {

@@ -1804,14 +1829,15 @@

     LIB_HANDLE          bridgeLib       = NULL;

     bridge_createImpl   initFunc        = NULL;

     char*                                                                payloadName                    = NULL;

-    char                                                  payloadId                            = NULL;

+    char                                                 payloadId                            = '\0';

     mama_status                               result                                    = MAMA_STATUS_OK;

-    mamaMiddleware      middleware      =

-                    mamaMiddleware_convertFromString (middlewareName);

+    mamaMiddleware      middleware      = 0;

     if (!impl)

         return MAMA_STATUS_NULL_ARG;

+    middleware = mamaMiddleware_convertFromString (middlewareName);

+

     if (middleware >= MAMA_MIDDLEWARE_MAX)

     {

         mama_log (MAMA_LOG_LEVEL_ERROR,

@@ -1820,9 +1846,9 @@

     }

     /* Check if a bridge has already been initialized for the middleware */

-    if (gMamaBridges [middleware])

+    if (gImpl.myBridges [middleware])

     {

-        *impl = gMamaBridges [middleware];

+        *impl = gImpl.myBridges [middleware];

         return MAMA_STATUS_OK;

     }

@@ -1844,7 +1870,6 @@

                 bridgeImplName ? bridgeImplName : "",

                 getLibError());

         }

-

         else

         {

                 mama_log (MAMA_LOG_LEVEL_ERROR,

@@ -1889,18 +1914,19 @@

     result = ((mamaBridgeImpl*)(*impl))->bridgeOpen (*impl);

-    if (MAMA_STATUS_OK != result) return result;

+    if (MAMA_STATUS_OK != result)

+        return result;

     if (((mamaBridgeImpl*)(*impl))->bridgeGetDefaultPayloadId(&payloadName, &payloadId) == MAMA_STATUS_OK)

     {

-                              if (!gMamaPayloads [(uint8_t)payloadId])

+                             if (!gImpl.myPayloads [(uint8_t)payloadId])

                               {

                                               mamaPayloadBridge payloadImpl;

                                               mama_loadPayloadBridge (&payloadImpl,payloadName);

                               }

     }

-    gMamaBridges [middleware] = *impl;

+    gImpl.myBridges [middleware] = *impl;

     return MAMA_STATUS_OK;

}

 

Signed-off-by: John Gray <jgray@...>

 

 


[PATCH 1.1] transport: mamaTransportImpl_getAdvisoryCauseAndPlatformInfo new function

John Gray <jgray@...>
 

This method allows bridges to pass middleware specific details regarding errors

and events.

 

Index: c_cpp/src/c/transport.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/transport.c,v

retrieving revision 1.79.4.6.2.2.2.8.4.1

diff -u -r1.79.4.6.2.2.2.8.4.1 transport.c

--- c_cpp/src/c/transport.c          27 Dec 2011 21:27:26 -0000           1.79.4.6.2.2.2.8.4.1

+++ c_cpp/src/c/transport.c       29 Dec 2011 19:38:47 -0000

@@ -1960,32 +1960,6 @@

     }

}

-/**

- * Return the cause and platform info for the last message processed on the

- * transport.

- *

- * @param transport       The transport.

- * @param cause           To return the cause.

- * @param platformInfo    To return the bridge specific info, under no circumstances

- *                                        should the returned object be deleted.

- */

-void

-mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport transport,

-                                                   short*        cause,

-                                                   const void**  platformInfo)

-{

-    if (!self)

-    {

-        mama_log (MAMA_LOG_LEVEL_ERROR,

-                "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (): "

-                "NULL transport.");

-        return;

-    }

-

-    *cause  = self->mCause;

-    *platformInfo = self->mPlatformInfo;

-}

-

void

mamaTransportImpl_getTransportIndex (mamaTransport transport,

                                      int*          transportIndex)

@@ -2363,3 +2337,44 @@

     return MAMA_STATUS_OK;

}

+/* *************************************************** */

+/* Internal Functions. */

+/* *************************************************** */

+

+void mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(mamaTransport transport,

+        short *cause, const void **platformInfo)

+{

+    /* Get the impl. */

+    transportImpl *impl = (transportImpl *)transport;

+    if(NULL != impl)

+    {

+        /* Return the cause. */

+        *cause        = impl->mCause;

+        *platformInfo = impl->mPlatformInfo;

+    }

+    else

+    {

+        mama_log (MAMA_LOG_LEVEL_ERROR,

+                "mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(): NULL "

+                "transport.");

+    }

+}

+

+void mamaTransportImpl_setAdvisoryCauseAndPlatformInfo (mamaTransport transport,

+        short cause, const void *platformInfo)

+{

+    /* Get the impl. */

+    transportImpl *impl = (transportImpl *)transport;

+    if(NULL != impl)

+    {

+        /* Set the cause. */

+        impl->mCause        = cause;

+        impl->mPlatformInfo = (void*)platformInfo;

+    }

+    else

+    {

+        mama_log (MAMA_LOG_LEVEL_ERROR,

+                "mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(): NULL "

+                "transport.");

+    }

+}

Index: c_cpp/src/c/transportimpl.h

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/transportimpl.h,v

retrieving revision 1.22.4.2.2.1.4.5

diff -u -r1.22.4.2.2.1.4.5 transportimpl.h

--- c_cpp/src/c/transportimpl.h 1 Sep 2011 16:34:38 -0000             1.22.4.2.2.1.4.5

+++ c_cpp/src/c/transportimpl.h              29 Dec 2011 19:38:47 -0000

@@ -158,12 +158,6 @@

extern void

mamaTransportImpl_unsetAllPossiblyStale (mamaTransport tport);

-MAMAExpDLL

-extern void

-mamaTransportImpl_getAdvisoryCauseAndPlatformInfo (mamaTransport tport,

-                                                   short*        cause,

-                                                   const void**  platformInfo);

-

/*

    Get the bridge impl associated with the specified transport.

    This will be how other objects gain access to the bridge.

@@ -251,4 +245,37 @@

}

#endif

+

+/**

+ * This function will return the cause and platform info for the last message

+ * processed on the transport.

+ *

+ * @param[in] transport            The transport.

+ * @param[out] cause To return the cause.

+ * @param[out] platformInfo To return the bridge specific info, under no

+ *             circumstances should the returned object be deleted.

+ *

+ */

+MAMAExpDLL

+extern void

+mamaTransportImpl_getAdvisoryCauseAndPlatformInfo(

+        mamaTransport tport,

+        short *cause,

+        const void **platformInfo);

+

+/**

+ * This function will set the cause and platform info for the transport.

+ *

+ * @param[in] transport            The transport.

+ * @param[in] cause The cause.

+ * @param[in] platformInfo Bridge specific info.

+ *

+ */

+MAMAExpDLL

+extern void

+mamaTransportImpl_setAdvisoryCauseAndPlatformInfo(

+        mamaTransport transport,

+        short cause,

+        const void *platformInfo);

+

#endif /* TransportImplH__ */

 

Signed-off-by: John Gray <jgray@...>

 

 

 


[PATCH 1.1] Interlocked changes queue.c: Correct reference counting and destroy interlock

John Gray <jgray@...>
 

Index: queue.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/queue.c,v

retrieving revision 1.35.4.5.2.1.4.11

diff -w -u -r1.35.4.5.2.1.4.11 queue.c

--- queue.c         27 Sep 2011 12:38:50 -0000           1.35.4.5.2.1.4.11

+++ queue.c      14 Jan 2012 09:41:43 -0000

@@ -198,6 +198,7 @@

     impl->mQueueMonitorClosure  =   NULL;

     /* Create the counter lock. */

+    wInterlocked_initialize(&impl->mNumberOpenObjects);

     wInterlocked_set(0, &impl->mNumberOpenObjects);

 

@@ -436,7 +437,7 @@

     int newCount = wInterlocked_decrement(&impl->mNumberOpenObjects);

     /* Write a log if something has gone wrong. */

-    if(impl->mNumberOpenObjects < 0)

+    if(newCount < 0)

     {

         mama_log(MAMA_LOG_LEVEL_ERROR, "Queue 0x%p has been dereferenced too many times.", queue);

     }

@@ -719,6 +720,9 @@

         impl->mMamaQueueBridgeImpl = NULL;

         impl->mMsg                 = NULL;

+        /* Destroy the counter lock */

+        wInterlocked_destroy(&impl->mNumberOpenObjects);

+

         free (impl);

         mama_log (MAMA_LOG_LEVEL_FINEST, "Leaving mamaQueue_destroy for queue 0x%X.", queue);

Index: subscription.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/subscription.c,v

retrieving revision 1.152.4.15.2.8.2.10

diff -w -u -r1.152.4.15.2.8.2.10 subscription.c

--- subscription.c              10 Oct 2011 16:03:18 -0000           1.152.4.15.2.8.2.10

+++ subscription.c           14 Jan 2012 09:41:43 -0000

@@ -1831,6 +1831,9 @@

     /* Destroy the mutex. */

     wlock_destroy(impl->mCreateDestroyLock);

+    /* Destroy the state. */

+       wInterlocked_destroy(&impl->mState);

+

     /* Free the subscription impl. */

     free(impl);

}

 

Signed-off-by: John Gray <jgray@...>


[PATCH 1.1] Fault Tolerance Configuration: Use Transport Properties for FT Configuration

John Gray <jgray@...>
 

Currently, the FT setup code uses the hard-coded transport name, "ft", to read

properties from mama.properties despite the fact that the caller supplies a

mamaTransport reference. This change uses the supplied transport's name to

look up FT configuration properties.

 

Index: c_cpp/src/c/ft.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/Attic/ft.c,v

retrieving revision 1.1.2.7

diff -u -r1.1.2.7 ft.c

--- c_cpp/src/c/ft.c          2 Oct 2011 19:02:17 -0000              1.1.2.7

+++ c_cpp/src/c/ft.c       29 Dec 2011 17:05:46 -0000

@@ -58,6 +58,8 @@

#define FT_FIELD_IPADDRESS          "MamaIPAddress"

#define FT_FIELD_IPADDRESS_FID      206

+#define FT_MAX_PROPERTY_NAME_LENGTH 1024

+

mama_status

multicastFt_setup (

     mamaFtMember                   member,

@@ -105,6 +107,10 @@

static int multicastFt_receiveHeartbeat(void* member);

static int bridgeFt_receiveHeartbeat(void* member);

+static const char*

+multicastFt_getProperty(char *buffer, const char *propertyName, const char

+        *transportName);

+

static int foundheartbeat=0;

 /*****************************************

@@ -878,6 +884,32 @@

/****************************************************************************

*       Multicast FT specific

******************************************************************************/

+

+const char *multicastFt_getProperty(char *buffer, const char *propertyName, const char *transportName)

+{

+    /* Returns. */

+    const char *ret = NULL;

+

+    /* Format the full property name. */

+    int numberWritten = snprintf(buffer, (FT_MAX_PROPERTY_NAME_LENGTH - 1), propertyName, transportName);

+

+    /* If too many characters are written the log an error. */

+    if((FT_MAX_PROPERTY_NAME_LENGTH - 1) < numberWritten)

+    {

+        mama_log(MAMA_LOG_LEVEL_ERROR, "%s Property name too long. Length [%d],"

+                " Max Length [%d].", transportName, strlen(transportName),

+                (FT_MAX_PROPERTY_NAME_LENGTH - 1));

+    }

+

+    else

+    {

+        /* Otherwise get the property. */

+        ret = mama_getProperty(buffer);

+    }

+

+    return ret;

+}

+

mama_status

multicastFt_setup (

     mamaFtMember                   member,

@@ -904,46 +936,70 @@

     const char* ftNetwork   = NULL;

     const char* ftTtl       = NULL;

     const char* iorecvstr   = NULL;

+    const char* transportName   = NULL;

+    mama_status status          = MAMA_STATUS_OK;   

     struct in_addr iface;

     struct in_addr cFtIfAddr;

+    /* This buffer is used for formatting property names. */

+    char propertyName[FT_MAX_PROPERTY_NAME_LENGTH] = "";

+

     mamaFtMemberImpl* impl = (mamaFtMemberImpl*) member;

-    if (!impl || !groupName)

+    if (!impl || !groupName || !transport)

         return MAMA_STATUS_INVALID_ARG;

-    ftInterface = mama_getProperty ("mama.multicast.transport.ft.interface");

-    if (ftInterface == NULL)

+    /* Get the transport name. */

+    status = mamaTransport_getName(transport, &transportName);

+    if((MAMA_STATUS_OK != status) || (NULL == transportName) || ('\0' ==

+                transportName[0]))

     {

-    ftInterface = mama_getProperty ("mama.native.transport.ft.interface");

+        mama_log (MAMA_LOG_LEVEL_ERROR, "MAMA multicast FT: the transport name "

+                "is invalid");

+        return MAMA_STATUS_INVALID_ARG;

+    }

+

+    ftInterface = multicastFt_getProperty(propertyName,

+            "mama.multicast.transport.%s.interface", transportName);

     if (ftInterface == NULL)

+    {

+        ftInterface = multicastFt_getProperty(propertyName,

+                "mama.native.transport.%s.interface", transportName);       

+        if (ftInterface == NULL)

             ftInterface = "";

     }

-    ftNetwork = mama_getProperty ("mama.multicast.transport.ft.network");

+    ftNetwork = multicastFt_getProperty (propertyName,

+            "mama.multicast.transport.%s.network", transportName);

     if (ftNetwork == NULL)

     {

-        ftNetwork = mama_getProperty ("mama.native.transport.ft.network");

-            if (ftNetwork == NULL)

-                ftNetwork = FT_NETWORK;

+        ftNetwork = multicastFt_getProperty (propertyName,

+                "mama.native.transport.%s.network", transportName);

+        if (ftNetwork == NULL)

+            ftNetwork = FT_NETWORK;

     }

-    ftService = mama_getProperty ("mama.multicast.transport.ft.service");

+    ftService = multicastFt_getProperty (propertyName,

+            "mama.multicast.transport.%s.service", transportName);

     if (ftService == NULL)

     {

-        ftService = mama_getProperty ("mama.native.transport.ft.service");

+        ftService = multicastFt_getProperty (propertyName,

+                "mama.native.transport.%s.service", transportName);

         if (ftService != NULL)

-                service = atol (ftService);

+            service = atol (ftService);

     }

-    ftTtl = mama_getProperty ("mama.multicast.transport.ft.ttl");

+    ftTtl = multicastFt_getProperty (propertyName,

+            "mama.multicast.transport.%s.ttl", transportName);

     if (ftTtl == NULL)

     {

-        ftTtl = mama_getProperty ("mama.native.transport.ft.ttl");

+        ftTtl = multicastFt_getProperty (propertyName,

+                "mama.native.transport.%s.ttl", transportName);

         if (ftTtl != NULL)

-                ttl = atol (ftTtl);

+            ttl = atol (ftTtl);

     }

-    iorecvstr = mama_getProperty ("mama.multicast.transport.ft.iowindow");

+    iorecvstr = multicastFt_getProperty (propertyName,

+            "mama.multicast.transport.%s.iowindow", transportName);

     if (iorecvstr != NULL)

     {

         iorecv=atoi (iorecvstr);

 

Signed-off-by: John Gray <jgray@...>

 


[PATCH 1.1] mamasymbollist subscriber: Initialize Callbacks structs to 0 for mamasymbollist subscriber

John Gray <jgray@...>
 

This avoids a potential segmentation fault in the event that an uninitialized

callback gets invoked.

 

 

Index: c_cpp/src/examples/c/mamasymbollistsubscriberc.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/examples/c/Attic/mamasymbollistsubscriberc.c,v

retrieving revision 1.1.2.2

diff -u -r1.1.2.2 mamasymbollistsubscriberc.c

--- c_cpp/src/examples/c/mamasymbollistsubscriberc.c               27 Sep 2011 16:54:39 -0000           1.1.2.2

+++ c_cpp/src/examples/c/mamasymbollistsubscriberc.c            29 Dec 2011 05:26:02 -0000

@@ -151,6 +151,7 @@

     mama_status         status;

     mamaMsgCallbacks    symbolListCallbacks;

     mamaSubscription    symbolListSubscription;

+    memset(&symbolListCallbacks, 0, sizeof(symbolListCallbacks));

     gSymbolList = (const char**) calloc (MAX_SUBSCRIPTIONS, sizeof (char*));

     gSubscriptionList = (mamaSubscription*)calloc (MAX_SUBSCRIPTIONS,

@@ -463,6 +464,7 @@

     mama_status      status  = MAMA_STATUS_OK;

     size_t           i;

     mamaMsgCallbacks callbacks;

+    memset(&callbacks, 0, sizeof(callbacks));

     callbacks.onCreate       = subscriptionOnCreate;

     callbacks.onError        = subscriptionOnError;

 

Signed-off-by: John Gray <jgray@...>

 


[PATCH 1.1] listenermsgcallback.c: Msg status possibly stale and unknown

John Gray <jgray@...>
 

Added handling for the possibly stale msg status and logging of unknown msg statuses

 

--- listenermsgcallback.c               2012-01-27 10:45:39.000000000 +0000

+++ listenermsgcallback.c            2012-01-27 11:10:30.000000000 +0000

@@ -314,6 +314,23 @@

                                                               listenerMsgCallbackImpl_logUnknownStatus(ctx, status, subscription);

                                                               break;

         }

+                                             /* The possibly stale messages are sent by the MAMACACHE with a miscellanious type and should be translated

+                                             * into a quality event.

+                                             */

+                                             case MAMA_MSG_STATUS_POSSIBLY_STALE:

+                                             {

+                                                             /* Verify that the type is misc. */

+                                                             if(msgType == MAMA_MSG_TYPE_MISC)

+                                                             {

+                                                                             /* Change the state to maybe stale and invoke the onquality callback. */

+                                                                             mamaSubscription_setPossiblyStale(subscription);

+                                                                             break;

+                                                             }

+

+                                                             /* Otherwise log the fact we have received an unknown message. */

+                                                             listenerMsgCallbackImpl_logUnknownStatus(ctx, status, subscription);

+                                                             break;

+                                             }

         case MAMA_MSG_STATUS_MISC:

         {

             if (msgType == MAMA_MSG_TYPE_REFRESH)

@@ -635,3 +652,17 @@

                   userSymbol,

                   closure);

}

+

+void listenerMsgCallbackImpl_logUnknownStatus(SubjectContext *ctx, int status, mamaSubscription subscription)

+{

+    /* Write the log at fine level. */

+    if ((gMamaLogLevel >= MAMA_LOG_LEVEL_FINE) || (mamaSubscription_checkDebugLevel (subscription, MAMA_LOG_LEVEL_FINE)))

+    {

+        const char* userSymbol = NULL;

+        mamaSubscription_getSymbol (subscription, &userSymbol);

+        mama_log (MAMA_LOG_LEVEL_FINE,

+                        "%s%s%s%s Unexpected status: %s",

+                        userSymbolFormatted, ctxSymbolFormatted,

+                        mamaMsgStatus_stringForStatus( status ) );

+    }

+}

Signed-off-by: John Gray <jgray@...>

 


[PATCH 1.1] listenermsgcallback.c: invokeErrorCallback function renamed

John Gray <jgray@...>
 

Renamed listenerMsgCallback_invokeErrorCallback to

listenerMsgCallbackImpl_invokeErrorCallback because it is an internal method.

The method is now also static and has been moved to the end of the file.

 

Index: c_cpp/src/c/listenermsgcallback.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/listenermsgcallback.c,v

retrieving revision 1.62.4.1.14.6

diff -u -r1.62.4.1.14.6 listenermsgcallback.c

--- c_cpp/src/c/listenermsgcallback.c      1 Sep 2011 09:41:02 -0000             1.62.4.1.14.6

+++ c_cpp/src/c/listenermsgcallback.c   29 Dec 2011 04:41:36 -0000

@@ -45,8 +45,23 @@

extern int gGenerateGlobalStats;

extern int gGenerateQueueStats;

-/* Function prototypes. */

-void listenerMsgCallback_invokeErrorCallback(listenerMsgCallback callback, SubjectContext *ctx, mama_status mamaStatus, mamaSubscription subscription, const char *userSymbol);

+/* *************************************************** */

+/* Private Function Prototypes. */

+/* *************************************************** */

+

+/**

+ * This function will invoke the subscription's onError callback passing in a particular error code.

+ *

+ * @param[in] callback The impl.

+ * @param[in] ctx The subscription context.

+ * @param[in] mamaStatus The status that will be passed to the error callback.

+ * @param[in] subscription The subscription.

+ * @param[in] userSymbol The symbol.

+ */

+static void

+listenerMsgCallbackImpl_invokeErrorCallback(listenerMsgCallback callback,

+        SubjectContext *ctx, mama_status mamaStatus, mamaSubscription

+        subscription, const char *userSymbol);

 /**

  * Main callback for MamaListener. This is the base strategy for

@@ -197,30 +212,6 @@

      * may have been destroyed in the callback! */

}

-/*          Description         :               This function will invoke the subscription's onError callback passing in a particular error code.

- */

-void listenerMsgCallback_invokeErrorCallback(listenerMsgCallback callback, SubjectContext *ctx, mama_status mamaStatus, mamaSubscription subscription, const char *userSymbol)

-{

-    /* Local variables. */

-              void *closure = NULL;

-

-              /* Get the callback object from the subscription. */

-    mamaMsgCallbacks *cbs = mamaSubscription_getUserCallbacks (subscription);

-

-              /* Wait for a response. */

-    mamaSubscription_stopWaitForResponse(subscription, ctx);

-

-              /* Get the closure from the subscription. */

-              mamaSubscription_getClosure (subscription, &closure);

-

-    mama_setLastError (MAMA_ERROR_DEFAULT);

-    cbs->onError (subscription,

-                  mamaStatus,

-                  NULL,

-                  userSymbol,

-                  closure);

-}

-

static int isInitialMessageOrRecap (msgCallback *callback, int msgType)

{

     return msgType == MAMA_MSG_TYPE_INITIAL        ||

@@ -287,17 +278,17 @@

     {

         switch (status)

         {

-                              case MAMA_MSG_STATUS_NOT_PERMISSIONED:

-                                              listenerMsgCallback_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_PERMISSIONED, subscription, userSymbol);

-                                              return;

+        case MAMA_MSG_STATUS_NOT_PERMISSIONED:

+            listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_PERMISSIONED, subscription, userSymbol);

+            return;

         case MAMA_MSG_STATUS_BAD_SYMBOL:

-                                              listenerMsgCallback_invokeErrorCallback(callback, ctx, MAMA_STATUS_BAD_SYMBOL, subscription, userSymbol);

-                                              return;

+            listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_BAD_SYMBOL, subscription, userSymbol);

+            return;

         case MAMA_MSG_STATUS_NOT_FOUND:

-                                              listenerMsgCallback_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_FOUND, subscription, userSymbol);

-                                              return;

+            listenerMsgCallbackImpl_invokeErrorCallback(callback, ctx, MAMA_STATUS_NOT_FOUND, subscription, userSymbol);

+            return;

         case MAMA_MSG_STATUS_NO_SUBSCRIBERS:

         {

@@ -553,3 +544,31 @@

    return 1;

#endif /* WITH_ENTITLEMENTS */

}

+

+/* *************************************************** */

+/* Private Functions. */

+/* *************************************************** */

+

+/* Description: This function will invoke the subscription's onError callback passing in a particular error code.

+ */

+void listenerMsgCallbackImpl_invokeErrorCallback(listenerMsgCallback callback, SubjectContext *ctx, mama_status mamaStatus, mamaSubscription subscription, const char *userSymbol)

+{

+    /* Local variables. */

+    void *closure = NULL;

+

+    /* Get the callback object from the subscription. */

+    mamaMsgCallbacks *cbs = mamaSubscription_getUserCallbacks (subscription);

+

+    /* Wait for a response. */

+    mamaSubscription_stopWaitForResponse(subscription, ctx);

+

+    /* Get the closure from the subscription. */

+    mamaSubscription_getClosure (subscription, &closure);

+

+    mama_setLastError (MAMA_ERROR_DEFAULT);

+    cbs->onError (subscription,

+                  mamaStatus,

+                  NULL,

+                  userSymbol,

+                  closure);

+}

 

Signed-off-by: John Gray <jgray@...>


[PATCH 1.1] status.c: Added MAMA_STATUS_SUBSCRIPTION_GAP

John Gray <jgray@...>
 

Added MAMA_STATUS_SUBSCRIPTION_GAP

 

Middleware bridges pass this status to the mamaSubscription onError callback

when the middleware detects a gap:

 

    mamaSubscription_getUserCallbacks(sub)->onError(...)

 

Middlewares that do not detect dropped messages will not use this status.

 

Note that this callback is independent of the sequence number gap detection

implemented by dqpublisher and dqstrategy.

 

Index: c_cpp/src/c/status.c

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/status.c,v

retrieving revision 1.18.4.3.2.1.4.5

diff -u -r1.18.4.3.2.1.4.5 status.c

--- c_cpp/src/c/status.c 29 Aug 2011 11:52:44 -0000          1.18.4.3.2.1.4.5

+++ c_cpp/src/c/status.c              28 Dec 2011 17:47:07 -0000

@@ -61,7 +61,8 @@

     case MAMA_STATUS_SUBSCRIPTION_INVALID_STATE: return "MAMA_STATUS_SUBSCRIPTION_INVALID_STATE";

     case MAMA_STATUS_QUEUE_OPEN_OBJECTS: return "MAMA_STATUS_QUEUE_OPEN_OBJECTS";

     case MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE: return "MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE"; 

-

+    case MAMA_STATUS_SUBSCRIPTION_GAP: return "MAMA_STATUS_SUBSCRIPTION_GAP";

+   

 #ifdef WITH_ENTITLEMENTS

     case MAMA_ENTITLE_STATUS_NOMEM : return "ENTITLE_STATUS_NOMEM";

     case MAMA_ENTITLE_STATUS_BAD_PARAM : return "ENTITLE_STATUS_BAD_PARAM";

Index: c_cpp/src/c/mama/status.h

===================================================================

RCS file: /cvsroot/products/mama/c_cpp/src/c/mama/status.h,v

retrieving revision 1.29.4.3.2.1.4.6

diff -u -r1.29.4.3.2.1.4.6 status.h

--- c_cpp/src/c/mama/status.h 29 Aug 2011 11:52:44 -0000          1.29.4.3.2.1.4.6

+++ c_cpp/src/c/mama/status.h              28 Dec 2011 17:47:07 -0000

@@ -1,4 +1,4 @@

-/* $Id$

+/* $Id: status.h,v 1.29.4.3.2.1.4.6 2011/08/29 11:52:44 ianbell Exp $

  *

  * OpenMAMA: The open middleware agnostic messaging API

  * Copyright (C) 2011 NYSE Inc.

@@ -99,7 +99,9 @@

     /* Queue has open objects. */

     MAMA_STATUS_QUEUE_OPEN_OBJECTS          = 5002,

     /* The function isn't supported for this type of subscription. */

-    MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE   = 5003

+    MAMA_STATUS_SUBSCRIPTION_INVALID_TYPE   = 5003,

+    /* The underlying transport saw a gap. */

+    MAMA_STATUS_SUBSCRIPTION_GAP            = 5004

 #ifdef WITH_ENTITLEMENTS

     /* Out of memory */

 

Signed-off-by: John Gray <jgray@...>