[PATCH] MAMA: Fix race conditions in mama queue/dispatcher + unit test fixes.


Lee Skillen <lskillen@...>
 

Changed the mIsDispatching signalling variable into an atomic int since
it is visible, inspected and modified across thread boundaries. This is
the cause of several errors in helgrind and has been observed to cause
an issue with shutting down the queue within avis (may occur on other
middleware, but cannot reproduce with qpid).

Additional changes :-

- Fixed warning for uninitialised variables in msgutils.

- Rewrite of the MamaQueueTestC/MamaTimerTestC tests. Fixed memory
leaks, race conditions and off-by-one errors (causing some to execute
across the boundary of indivisible tests). Also added in allocation
of transport to keep avis middleware happy (it doesn't construct a
default transport so it didn't execute the tests properly).

All MamaQueueTestC/MamaTimerTestC tests now execute for both qpid and
avis (although the latter fails on some due to unimplemented features).

Signed-off-by: Lee Skillen <lskillen@...>
---
mama/c_cpp/src/c/msgutils.c | 4 +-
mama/c_cpp/src/c/queue.c | 79 +++++++-------
mama/c_cpp/src/gunittest/c/openclosetest.cpp | 61 ++++++-----
mama/c_cpp/src/gunittest/c/queuetest.cpp | 150 ++++++++++++++-------------
mama/c_cpp/src/gunittest/c/timertest.cpp | 136 ++++++++++--------------
5 files changed, 213 insertions(+), 217 deletions(-)

diff --git a/mama/c_cpp/src/c/msgutils.c b/mama/c_cpp/src/c/msgutils.c
index 6f9bd5d..0530c30 100644
--- a/mama/c_cpp/src/c/msgutils.c
+++ b/mama/c_cpp/src/c/msgutils.c
@@ -58,7 +58,7 @@ msgUtils_setStatus (mamaMsg msg, short status)
mama_status
msgUtils_msgTotal (mamaMsg msg, short *result)
{
- int32_t val;
+ int32_t val = 0;
mama_status status;

status = mamaMsg_getI32 (msg,
@@ -74,7 +74,7 @@ msgUtils_msgTotal (mamaMsg msg, short *result)
mama_status
msgUtils_msgNum (mamaMsg msg, short *result)
{
- int32_t val;
+ int32_t val = 0;
mama_status status;

status = mamaMsg_getI32 (msg,
diff --git a/mama/c_cpp/src/c/queue.c b/mama/c_cpp/src/c/queue.c
index 8fccaa6..6a104da 100644
--- a/mama/c_cpp/src/c/queue.c
+++ b/mama/c_cpp/src/c/queue.c
@@ -66,7 +66,7 @@ typedef struct mamaQueueImpl_
{
/*Reuseable message for all received on this queue*/
mamaMsg mMsg;
- int mIsDispatching;
+ wInterlockedInt mIsDispatching;
/*Hold onto the bridge impl for later use*/
mamaBridgeImpl* mBridgeImpl;
/*The bridge specific queue implementation*/
@@ -103,16 +103,14 @@ typedef struct mamaQueueImpl_
} mamaQueueImpl;

/*Main structure for the mamaDispatcher*/
-typedef struct mamaDisptacherImpl_
+typedef struct mamaDispatcherImpl_
{
/*The queue on which this dispatcher is dispatching*/
- mamaQueue mQueue;
+ mamaQueue mQueue;
/*The thread on which this dispatcher is dispathcing.*/
- wthread_t mThread;
+ wthread_t mThread;
/*Whether the dispatcher is dispatching*/
- int mIsDispatching;
- /*Destroy has been called*/
- int mDestroy;
+ wInterlockedInt mIsDispatching;
} mamaDispatcherImpl;


@@ -226,7 +224,8 @@ mamaQueue_create (mamaQueue* queue,
wInterlocked_initialize(&impl->mNumberOpenObjects);
wInterlocked_set(0, &impl->mNumberOpenObjects);

-
+ wInterlocked_initialize(&impl->mIsDispatching);
+ wInterlocked_set(0, &impl->mIsDispatching);

/* Call the bridge impl specific queue create function*/
if (MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaQueueCreate
@@ -549,6 +548,7 @@ mamaQueue_destroyWait(mamaQueue queue)
{
/* Dispatch messages for 10 ms. */
ret = mamaQueue_timedDispatch(queue, 10);
+
if(MAMA_STATUS_OK == ret)
{
/* Attempt to destroy the queue again. */
@@ -587,6 +587,8 @@ mamaQueue_destroyTimedWait(mamaQueue queue,
{
/* Dispatch messages for 10 ms. */
ret = mamaQueue_timedDispatch(queue, 10);
+
+ /* Should this not react to MAMA_STATUS_TIMEOUT too? */
if(MAMA_STATUS_OK == ret)
{
/* Add 10ms to the count. */
@@ -638,7 +640,6 @@ mamaQueue_destroy (mamaQueue queue)

mama_log (MAMA_LOG_LEVEL_FINEST, "Entering mamaQueue_destroy for queue 0x%X.", queue);

- impl->mIsDispatching = 0;
if (!queue)
{
mama_log (MAMA_LOG_LEVEL_ERROR, "mamaQueue_destroy(): NULL queue.");
@@ -649,8 +650,19 @@ mamaQueue_destroy (mamaQueue queue)
status = MAMA_STATUS_QUEUE_OPEN_OBJECTS;

/* Only continue if the object count is 0. */
- if(0 == wInterlocked_read(&impl->mNumberOpenObjects))
+ if (0 == wInterlocked_read(&impl->mNumberOpenObjects))
{
+ wInterlocked_set (0, &impl->mIsDispatching);
+
+ if (impl->mDispatcher)
+ {
+ /* Try to ensure that the dispatcher does not restart the queue
+ * dispatching after we've stopped and destroy it. */
+ mamaDispatcherImpl* dispatcherImpl = (mamaDispatcherImpl*)impl->mDispatcher;
+ wInterlocked_set (0, &dispatcherImpl->mIsDispatching);
+ dispatcherImpl->mQueue = NULL;
+ }
+
if (impl->mMamaQueueBridgeImpl)
{
if (MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaQueueDestroy (
@@ -662,13 +674,6 @@ mamaQueue_destroy (mamaQueue queue)
}
}

- if (impl->mDispatcher)
- {
- /* We don't want the dispatcher to access a destroyed queue */
- ((mamaDispatcherImpl*)(impl->mDispatcher))->mIsDispatching = 0;
- ((mamaDispatcherImpl*)(impl->mDispatcher))->mQueue = NULL;
- }
-
/*Destroy the cached mamaMsg - no longer needed*/
if (impl->mMsg) mamaMsg_destroy (impl->mMsg);

@@ -747,6 +752,7 @@ mamaQueue_destroy (mamaQueue queue)

/* Destroy the counter lock */
wInterlocked_destroy(&impl->mNumberOpenObjects);
+ wInterlocked_destroy(&impl->mIsDispatching);

free (impl);

@@ -785,15 +791,17 @@ mamaQueue_getEventCount (mamaQueue queue,
mama_status
mamaQueue_dispatch (mamaQueue queue)
{
- mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ mama_status status = MAMA_STATUS_OK;
+
if (!impl)
{
mama_log (MAMA_LOG_LEVEL_ERROR,
"mamaQueue_dispatch(): NULL queue.");
return MAMA_STATUS_NULL_ARG;
}
- impl->mIsDispatching = 1;

+ wInterlocked_set (1, &impl->mIsDispatching);
return impl->mBridgeImpl->bridgeMamaQueueDispatch
(impl->mMamaQueueBridgeImpl);
}
@@ -802,7 +810,7 @@ mama_status
mamaQueue_timedDispatch (mamaQueue queue,
uint64_t timeout)
{
- mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;

if (!impl)
{
@@ -811,6 +819,7 @@ mamaQueue_timedDispatch (mamaQueue queue,
return MAMA_STATUS_NULL_ARG;
}

+ wInterlocked_set (1, &impl->mIsDispatching);
return impl->mBridgeImpl->bridgeMamaQueueTimedDispatch
(impl->mMamaQueueBridgeImpl, timeout);
}
@@ -842,8 +851,8 @@ mamaQueue_stopDispatch (mamaQueue queue)
"mamaQueue_stopDispatch(): NULL queue.");
return MAMA_STATUS_NULL_ARG;
}
- impl->mIsDispatching = 0;

+ wInterlocked_set (0, &impl->mIsDispatching);
return impl->mBridgeImpl->bridgeMamaQueueStopDispatch
(impl->mMamaQueueBridgeImpl);
}
@@ -1261,16 +1270,10 @@ static void
{
mamaDispatcherImpl* impl = (mamaDispatcherImpl*)closure;

- impl->mIsDispatching = 1;
-
- while (impl->mIsDispatching && !impl->mDestroy &&
- MAMA_STATUS_OK == mamaQueue_dispatch (impl->mQueue))
- ;
-
- impl->mIsDispatching = 0;
-
- if (impl->mQueue)
- impl->mQueue->mDispatcher = NULL;
+ wInterlocked_set (1, &impl->mIsDispatching);
+ while (wInterlocked_read (&impl->mIsDispatching) &&
+ MAMA_STATUS_OK == mamaQueue_dispatch (impl->mQueue));
+ wInterlocked_set (0, &impl->mIsDispatching);

return NULL;
}
@@ -1309,8 +1312,10 @@ mamaDispatcher_create (mamaDispatcher *result,
return MAMA_STATUS_NOMEM;
}

+ wInterlocked_initialize(&impl->mIsDispatching);
+ wInterlocked_set(0, &impl->mIsDispatching);
+
impl->mQueue = queue;
- impl->mDestroy = 0;
if (wthread_create(&impl->mThread, NULL, dispatchThreadProc, impl))
{
free (impl);
@@ -1333,16 +1338,15 @@ mamaDispatcher_destroy (mamaDispatcher dispatcher)
if (!impl)
return MAMA_STATUS_NULL_ARG;

- if( impl->mQueue && impl->mIsDispatching )
+ if (impl->mQueue)
{
- impl->mIsDispatching = 0;
mamaQueue_stopDispatch (impl->mQueue);
}

- impl->mDestroy = 1;
-
/* Wait for the thread to return. */
+ wInterlocked_set(0, &impl->mIsDispatching);
wthread_join (impl->mThread, NULL);
+ wInterlocked_destroy (&impl->mIsDispatching);

impl->mQueue->mDispatcher = NULL;
free (impl);
@@ -1352,7 +1356,8 @@ mamaDispatcher_destroy (mamaDispatcher dispatcher)
int
mamaQueueImpl_isDispatching (mamaQueue queue)
{
- return ((mamaQueueImpl*)queue)->mIsDispatching;
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ return wInterlocked_read (&impl->mIsDispatching);
}

int MAMACALLTYPE
diff --git a/mama/c_cpp/src/gunittest/c/openclosetest.cpp b/mama/c_cpp/src/gunittest/c/openclosetest.cpp
index ed26e05..f288ba5 100644
--- a/mama/c_cpp/src/gunittest/c/openclosetest.cpp
+++ b/mama/c_cpp/src/gunittest/c/openclosetest.cpp
@@ -23,11 +23,11 @@
* opening and closing middleware bridges.
*/

+#include "MainUnitTestC.h"
#include <gtest/gtest.h>
#include "mama/mama.h"
#include "mama/status.h"
-#include "MainUnitTestC.h"
-
+#include "wombat/wSemaphore.h"

class MamaOpenCloseTestC : public ::testing::Test
{
@@ -38,7 +38,6 @@ protected:

virtual void SetUp(void);
virtual void TearDown(void);
-

};

@@ -58,10 +57,19 @@ void MamaOpenCloseTestC::TearDown(void)
{
}

-static void MAMACALLTYPE startCallback (mama_status status)
+static void MAMACALLTYPE startCallback (mama_status status,
+ mamaBridge bridge,
+ void* closure)
{
+ wsem_t* sem = (wsem_t*)closure;
+ ASSERT_EQ (0, wsem_post (sem));
}

+void onEventStop (mamaQueue queue, void* closure)
+{
+ mamaBridge bridge = (mamaBridge)closure;
+ mama_stop (bridge);
+}

/* ************************************************************************* */
/* Tests */
@@ -73,8 +81,8 @@ static void MAMACALLTYPE startCallback (mama_status status)
*/
TEST_F (MamaOpenCloseTestC, OpenClose)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ mama_loadBridge (&bridge, getMiddleware());

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

@@ -89,8 +97,8 @@ TEST_F (MamaOpenCloseTestC, OpenClose)
*/
TEST_F (MamaOpenCloseTestC, NestedOpenClose)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ mama_loadBridge (&bridge, getMiddleware());

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

@@ -109,15 +117,15 @@ TEST_F (MamaOpenCloseTestC, NestedOpenClose)
*/
TEST_F (MamaOpenCloseTestC, OpenCloseReopenSameBridge)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ mama_loadBridge (&bridge, getMiddleware());

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

ASSERT_EQ (MAMA_STATUS_OK, mama_close());

/* bridge must be loaded again after close */
- mama_loadBridge (&mBridge, getMiddleware());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, getMiddleware()));

ASSERT_EQ (MAMA_STATUS_OK, mama_open());
}
@@ -129,18 +137,18 @@ TEST_F (MamaOpenCloseTestC, OpenCloseReopenSameBridge)
*/
TEST_F (MamaOpenCloseTestC, DISABLED_OpenCloseReopenNewBridge)
{
- mamaBridge mBridge;
- ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&mBridge, getMiddleware()));
+ mamaBridge bridge;
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, getMiddleware()));

- ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());

- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());

- ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&mBridge, "avis"));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, "avis"));

- ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());

- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());
}

/* Description: Load the middleware bridge, initialize MAMA, begin
@@ -151,18 +159,19 @@ TEST_F (MamaOpenCloseTestC, DISABLED_OpenCloseReopenNewBridge)
*/
TEST_F (MamaOpenCloseTestC, StartStopDifferentThreads)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, getMiddleware()));

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

/* Start mama in the background so it uses a different thread */
- ASSERT_EQ (MAMA_STATUS_OK, mama_startBackground (mBridge, startCallback));
-
- /* Sleep to allow the other thread to complete startup */
- sleep(2);
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_stop (mBridge));
+ wsem_t sem;
+ mamaQueue defaultQueue;
+ ASSERT_EQ (0, wsem_init (&sem, 0, 0));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_getDefaultEventQueue (bridge, &defaultQueue));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaQueue_enqueueEvent (defaultQueue, onEventStop, bridge));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_startBackgroundEx (bridge, startCallback, &sem));
+ ASSERT_EQ (0, wsem_wait (&sem));

ASSERT_EQ (MAMA_STATUS_OK, mama_close());
}
diff --git a/mama/c_cpp/src/gunittest/c/queuetest.cpp b/mama/c_cpp/src/gunittest/c/queuetest.cpp
index c9dbe66..5315c26 100644
--- a/mama/c_cpp/src/gunittest/c/queuetest.cpp
+++ b/mama/c_cpp/src/gunittest/c/queuetest.cpp
@@ -17,13 +17,14 @@
* 02110-1301 USA
*/

-
+#include "MainUnitTestC.h"
#include <gtest/gtest.h>
#include <pthread.h>
#include "mama/mama.h"
#include "mama/status.h"
-#include "MainUnitTestC.h"
#include "mama/queue.h"
+#include "mama/transport.h"
+#include "wombat/wSemaphore.h"

class MamaQueueTestC : public ::testing::Test
{
@@ -33,25 +34,22 @@ class MamaQueueTestC : public ::testing::Test
virtual void SetUp();
virtual void TearDown();
public:
- mamaBridge mBridge;
- MamaQueueTestC* m_this;
+ mamaBridge m_bridge;
+ mamaTransport m_transport;
int m_numQueues;
- int m_queueCounter;
int m_numEvents;
int m_eventCounter;
- int m_numDispatches;
int m_highWaterMarkOccurance;
int m_lowWaterMarkOccurance;
+ int m_numDispatches[10];
uint64_t m_timeout;
- mamaDispatcher dispatcher[10];
- mamaQueue qArray[10];
+ mamaDispatcher m_dispatcher[10];
+ mamaQueue m_queues[10];
+ wsem_t m_sem;
};

MamaQueueTestC::MamaQueueTestC()
{
- mBridge;
- m_this = this;
-
m_highWaterMarkOccurance = 0;
m_lowWaterMarkOccurance = 0;
m_timeout = 5000;
@@ -59,23 +57,31 @@ MamaQueueTestC::MamaQueueTestC()

MamaQueueTestC::~MamaQueueTestC()
{
- m_this = NULL;
}

void MamaQueueTestC::SetUp()
{
- mama_loadBridge (&mBridge, getMiddleware());
- mama_open();
+ ASSERT_EQ(0, wsem_init (&m_sem, 0, 0));
+ m_eventCounter = 0;
+
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&m_bridge, getMiddleware()));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_allocate (&m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_create (m_transport, NULL, m_bridge));
}

void MamaQueueTestC::TearDown()
{
- mama_close();
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_destroy (m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (0, wsem_destroy (&m_sem));
}

+#if 0
static void MAMACALLTYPE startCallback (mama_status status)
{
}
+#endif

void highWaterMarkCallback (mamaQueue queue, size_t size, void* closure)
{
@@ -98,21 +104,22 @@ void onEvent (mamaQueue queue, void* closure)
mamaQueue_stopDispatch (queue);
}
}
-void onTimedEvent (mamaQueue queue, void* closure)
+
+void onEventNop (mamaQueue queue, void* closure)
{
}

void onBgEvent (mamaQueue queue, void* closure)
{
MamaQueueTestC* fixture = (MamaQueueTestC *)closure;
- fixture->m_numDispatches++;
+
+ void* pIndex = 0;
+ mamaQueue_getClosure (queue, &pIndex);
+ size_t index = (size_t)pIndex;

- if (fixture->m_numDispatches == 1000)
+ if (fixture->m_numEvents == ++fixture->m_numDispatches[index])
{
- for (int x=0; x!=10; x++)
- {
- mamaQueue_stopDispatch (fixture->qArray[x]);
- }
+ wsem_post (&fixture->m_sem);
}
}

@@ -141,7 +148,7 @@ TEST_F (MamaQueueTestC, GetDefaultQueue)
mamaQueue defaultQueue;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));
}

/* Description: Create a mamaQueue then destroy it.
@@ -153,7 +160,7 @@ TEST_F (MamaQueueTestC, CreateNonDefaultQueue)
mamaQueue queue;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_destroy (queue));
@@ -168,11 +175,14 @@ TEST_F (MamaQueueTestC, Enqueue)
mamaQueue defaultQueue = NULL;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

+ ASSERT_EQ (MAMA_STATUS_OK,
+ mamaQueue_dispatchEvent (defaultQueue));
+
}

/* Description: Set the enqueue callback associated with the default queue
@@ -185,17 +195,19 @@ TEST_F (MamaQueueTestC, setEnqueueCallback)
mamaQueue defaultQueue = NULL;

m_numEvents = 1;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, m_this));
+ mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

+ ASSERT_EQ (MAMA_STATUS_OK,
+ mamaQueue_dispatchEvent (defaultQueue));
+
}

/* Description: Set the enqueue callback assiciated with the
@@ -207,24 +219,22 @@ TEST_F (MamaQueueTestC, RemoveEnqueueCallback)
{
mamaQueue defaultQueue = NULL;

- m_numEvents = 1;
- m_eventCounter = 0;
+ m_numEvents = 1;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, m_this));
+ mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_removeEnqueueCallback (defaultQueue));
+ mamaQueue_dispatchEvent (defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
-
+ mamaQueue_removeEnqueueCallback (defaultQueue));
}

/* Description: Enqueue an event on the default queue then dispatch it.
@@ -236,13 +246,12 @@ TEST_F (MamaQueueTestC, EnqueueDispatch)
mamaQueue defaultQueue = NULL;

m_numEvents = 1;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_dispatchEvent (defaultQueue));
@@ -259,13 +268,12 @@ TEST_F (MamaQueueTestC, EnqueueDispatchNonDefault)
mamaQueue queue = NULL;

m_numEvents = 1;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEvent, this));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_dispatch (queue));
@@ -286,15 +294,14 @@ TEST_F (MamaQueueTestC, EnqueueDispatchMany)
mamaQueue queue = NULL;

m_numEvents = 10;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

- for (int x=0; x<=m_numEvents; x++)
+ for (int x=0; x<m_numEvents; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEvent, this));
}
ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_dispatch (queue));
@@ -314,15 +321,14 @@ TEST_F (MamaQueueTestC, TimedDispatch)
mamaQueue queue = NULL;

m_numEvents = 1000;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

- for (int x=0; x<=m_numEvents; x++)
+ for (int x=0; x<m_numEvents; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onTimedEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEventNop, this));
}

ASSERT_EQ (MAMA_STATUS_OK,
@@ -343,17 +349,16 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
{
mamaQueue queue = NULL;
m_numEvents = 20;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

mamaQueueMonitorCallbacks queueCallbacks;
queueCallbacks.onQueueHighWatermarkExceeded = onHighWatermark;
queueCallbacks.onQueueLowWatermark = onLowWatermark;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_setQueueMonitorCallbacks (queue, &queueCallbacks, m_this));
+ mamaQueue_setQueueMonitorCallbacks (queue, &queueCallbacks, this));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_setHighWatermark (queue, 10));
@@ -361,10 +366,10 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_setLowWatermark (queue, 5));

- for (int x=0; x<=m_numEvents; x++)
+ for (int x=0; x<m_numEvents; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEvent, this));
}

ASSERT_EQ (MAMA_STATUS_OK,
@@ -383,37 +388,40 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
*/
TEST_F (MamaQueueTestC, DispatchManyQueuesWithDispatchers)
{
- m_numQueues = 10;
+ m_numQueues = 10; // FIXME: Storage is hardcoded to 10!
m_numEvents = 100;
- m_eventCounter = 0;
- m_numDispatches = 0;

- for (m_queueCounter = 0; m_queueCounter!=m_numQueues; m_queueCounter++)
+ for (int x = 0; x!=m_numQueues; x++)
{
+ m_numDispatches[x] = 0;
+
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&qArray[m_queueCounter], mBridge));
+ mamaQueue_create (&m_queues[x], m_bridge));

- for (int x=0; x!=m_numEvents; x++)
+ mamaQueue_setClosure (m_queues[x], (void*)(size_t)x);
+
+ for (int y=0; y!=m_numEvents; y++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (qArray[m_queueCounter], onBgEvent, m_this));
+ mamaQueue_enqueueEvent (m_queues[x], onBgEvent, this));
}
}

- for (m_queueCounter = 0; m_queueCounter!=m_numQueues; m_queueCounter++)
+ for (int x = 0; x!=m_numQueues; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaDispatcher_create (&dispatcher[m_queueCounter], qArray[m_queueCounter]));
+ mamaDispatcher_create (&m_dispatcher[x], m_queues[x]));
}

- for (int x=0; x!=10; x++)
+ for (int x = 0; x!=m_numQueues; x++)
{
- mamaDispatcher_destroy (dispatcher[x]);
+ ASSERT_EQ(0, wsem_wait (&m_sem));
}
- for (m_queueCounter = 0; m_queueCounter!=m_numQueues; m_queueCounter++)
+
+ for (int x = 0; x!=m_numQueues; x++)
{
- mamaQueue_destroy (qArray[m_queueCounter]);
+ ASSERT_EQ(MAMA_STATUS_OK, mamaDispatcher_destroy (m_dispatcher[x]));
+ ASSERT_EQ(MAMA_STATUS_OK, mamaQueue_destroy (m_queues[x]));
}
-
}

diff --git a/mama/c_cpp/src/gunittest/c/timertest.cpp b/mama/c_cpp/src/gunittest/c/timertest.cpp
index f0102df..0980f13 100644
--- a/mama/c_cpp/src/gunittest/c/timertest.cpp
+++ b/mama/c_cpp/src/gunittest/c/timertest.cpp
@@ -26,6 +26,7 @@
#include "mama/types.h"
#include "mama/timer.h"
#include "mama/queue.h"
+#include "mama/transport.h"
#include <cstring>
#include <cstdio>
#include <cstdlib>
@@ -40,18 +41,17 @@ protected:
virtual void SetUp();
virtual void TearDown ();
public:
- MamaTimerTestC *m_this;
- mamaBridge mBridge;
-
- int tCounter;
- int numTimers;
- mamaTimer tarray[100];
- mamaTimer longTimer;
- mamaTimer shortTimer;
- mamaTimer stopTimer;
- mamaTimer timer;
- mamaQueue queue;
- mama_f64_t interval;
+ mamaBridge m_bridge;
+ mamaTransport m_transport;
+ int m_tCounter;
+ int m_numTimers;
+ mamaTimer m_timers[100];
+ mamaTimer m_longTimer;
+ mamaTimer m_shortTimer;
+ mamaTimer m_stopTimer;
+ mamaTimer m_timer;
+ mamaQueue m_queue;
+ mama_f64_t m_interval;
};

MamaTimerTestC::MamaTimerTestC()
@@ -64,53 +64,50 @@ MamaTimerTestC::~MamaTimerTestC()

void MamaTimerTestC::SetUp(void)
{
- interval = 0.01;
- m_this = this;
+ m_tCounter = 0;
+ m_interval = 0.01;

- mama_loadBridge (&mBridge, getMiddleware());
- mama_open ();
- ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &queue));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&m_bridge, getMiddleware()));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_getDefaultEventQueue (m_bridge, &m_queue));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_allocate (&m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_create (m_transport, NULL, m_bridge));
}

void MamaTimerTestC::TearDown(void)
{
- mama_close ();
- m_this = NULL;
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_destroy (m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());
}

static void MAMACALLTYPE onTimerTick (mamaTimer timer, void* closure)
{
- ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_destroy(timer));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy(timer));
}

static void MAMACALLTYPE onTimerDestroy (mamaTimer timer, void* closure)
{
MamaTimerTestC* fixture = (MamaTimerTestC *)closure;
- fixture->tCounter++;
-
- if (fixture->tCounter == fixture->numTimers)
+ if (++fixture->m_tCounter == fixture->m_numTimers)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mama_stop (fixture->mBridge));
+ mama_stop (fixture->m_bridge));
}
}

static void MAMACALLTYPE onRecursiveTimerDestroy (mamaTimer timer, void* closure)
{
MamaTimerTestC* fixture = (MamaTimerTestC *)closure;
- fixture->tCounter++;
-
- if (fixture->tCounter == fixture->numTimers)
+ if (++fixture->m_tCounter == fixture->m_numTimers)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mama_stop(fixture->mBridge));
+ mama_stop(fixture->m_bridge));
}
else
{
- mamaTimer_create2 (&timer, fixture->queue, onTimerTick,
- onRecursiveTimerDestroy, fixture->interval, fixture);
+ ASSERT_EQ (MAMA_STATUS_OK,
+ mamaTimer_create2 (&fixture->m_timer, fixture->m_queue, onTimerTick,
+ onRecursiveTimerDestroy, fixture->m_interval, fixture));
}

}
@@ -123,24 +120,21 @@ static void MAMACALLTYPE onLongTimerTick (mamaTimer timer, void* closure)
{
}

-static void MAMACALLTYPE onTwoTimerDestroy (mamaTimer timer, void* closure)
-{
-}
-
static void MAMACALLTYPE onStopTimerTick (mamaTimer timer, void* closure)
{
- mamaTimer_destroy(timer);
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy(timer));
}

static void MAMACALLTYPE onStopTimerDestroy (mamaTimer timer, void* closure)
{
MamaTimerTestC* fixture = (MamaTimerTestC *)closure;

- mamaTimer_destroy (fixture->shortTimer);
- mamaTimer_destroy (fixture->longTimer);
-
- mama_stop (fixture->mBridge);
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy (fixture->m_shortTimer));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy (fixture->m_longTimer));
+
+ ASSERT_EQ (MAMA_STATUS_OK, mama_stop (fixture->m_bridge));
}
+
/* ************************************************************************* */
/* Test Functions */
/* ************************************************************************* */
@@ -151,19 +145,13 @@ static void MAMACALLTYPE onStopTimerDestroy (mamaTimer timer, void* closure)
*/
TEST_F (MamaTimerTestC, CreateDestroy)
{
-
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
- fixture->tCounter = 0;
- fixture->numTimers = 1;
+ m_numTimers = 1;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2 (&timer, fixture->queue, onTimerTick,
- onTimerDestroy, fixture->interval, m_this));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
+ mamaTimer_create2 (&m_timer, m_queue, onTimerTick,
+ onTimerDestroy, m_interval, this));

- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
}

/* Description: Create many mamaTimers which destroy themselves when fired.
@@ -173,21 +161,16 @@ TEST_F (MamaTimerTestC, CreateDestroy)
*/
TEST_F (MamaTimerTestC, CreateDestroyMany)
{
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
- fixture->tCounter = 0;
- fixture->numTimers = 100;
+ m_numTimers = 100; // FIXME: Storage hardcoded to 100!

- for (int x=0; x!=fixture->numTimers; x++)
+ for (int x=0; x!=m_numTimers; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2 (&tarray[x], fixture->queue, onTimerTick,
- onTimerDestroy,fixture->interval, m_this));
+ mamaTimer_create2 (&m_timers[x], m_queue, onTimerTick,
+ onTimerDestroy, m_interval, this));
}

- ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
}

/* Description: Create a timer whiich creates another timer when fired.
@@ -197,18 +180,13 @@ TEST_F (MamaTimerTestC, CreateDestroyMany)
*/
TEST_F (MamaTimerTestC, RecursiveCreateDestroy)
{
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
- fixture->tCounter = 0;
- fixture->numTimers = 11;
+ m_numTimers = 11;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2(&timer, fixture->queue, onTimerTick,
- onRecursiveTimerDestroy, fixture->interval, m_this));
+ mamaTimer_create2(&m_timer, m_queue, onTimerTick,
+ onRecursiveTimerDestroy, m_interval, this));

- ASSERT_EQ (MAMA_STATUS_OK, mama_start(mBridge));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start(m_bridge));
}

/* Description: Two timers are created which tick indefinately at different rates,
@@ -218,22 +196,18 @@ TEST_F (MamaTimerTestC, RecursiveCreateDestroy)
*/
TEST_F (MamaTimerTestC, TwoTimer)
{
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
-
ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create(&shortTimer, fixture->queue, onShortTimerTick,
- fixture->interval, m_this));
+ mamaTimer_create(&m_shortTimer, m_queue, onShortTimerTick,
+ m_interval, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create (&longTimer, fixture->queue, onLongTimerTick,
- ((fixture->interval)*2), m_this));
+ mamaTimer_create (&m_longTimer, m_queue, onLongTimerTick,
+ m_interval*2, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2 (&stopTimer, fixture->queue, onStopTimerTick,
- onStopTimerDestroy, ((fixture->interval)*100), m_this));
+ mamaTimer_create2 (&m_stopTimer, m_queue, onStopTimerTick,
+ onStopTimerDestroy, m_interval*100, this));

- ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
}


Lee Skillen <lskillen@...>
 

Changed the mIsDispatching signalling variable into an atomic int since
it is visible, inspected and modified across thread boundaries. This is
the cause of several errors in helgrind and has been observed to cause
an issue with shutting down the queue within avis (may occur on other
middleware, but cannot reproduce with qpid).

Additional changes :-

- Fixed warning for uninitialised variables in msgutils.

- Rewrite of the MamaQueueTestC/MamaTimerTestC tests. Fixed memory
leaks, race conditions and off-by-one errors (causing some to execute
across the boundary of indivisible tests). Also added in allocation
of transport to keep avis middleware happy (it doesn't construct a
default transport so it didn't execute the tests properly).

- Add 'pthread' to libs for linking the unittest binary.

All MamaQueueTestC/MamaTimerTestC tests now execute for both qpid and
avis (although the latter fails on some due to unimplemented features).

Signed-off-by: Lee Skillen <lskillen@...>
---
mama/c_cpp/src/c/msgutils.c | 4 +-
mama/c_cpp/src/c/queue.c | 79 +++++++-------
mama/c_cpp/src/gunittest/c/SConscript | 2 +-
mama/c_cpp/src/gunittest/c/openclosetest.cpp | 61 ++++++-----
mama/c_cpp/src/gunittest/c/queuetest.cpp | 150 ++++++++++++++-------------
mama/c_cpp/src/gunittest/c/timertest.cpp | 136 ++++++++++--------------
6 files changed, 214 insertions(+), 218 deletions(-)

diff --git a/mama/c_cpp/src/c/msgutils.c b/mama/c_cpp/src/c/msgutils.c
index 6f9bd5d..0530c30 100644
--- a/mama/c_cpp/src/c/msgutils.c
+++ b/mama/c_cpp/src/c/msgutils.c
@@ -58,7 +58,7 @@ msgUtils_setStatus (mamaMsg msg, short status)
mama_status
msgUtils_msgTotal (mamaMsg msg, short *result)
{
- int32_t val;
+ int32_t val = 0;
mama_status status;

status = mamaMsg_getI32 (msg,
@@ -74,7 +74,7 @@ msgUtils_msgTotal (mamaMsg msg, short *result)
mama_status
msgUtils_msgNum (mamaMsg msg, short *result)
{
- int32_t val;
+ int32_t val = 0;
mama_status status;

status = mamaMsg_getI32 (msg,
diff --git a/mama/c_cpp/src/c/queue.c b/mama/c_cpp/src/c/queue.c
index 8fccaa6..6a104da 100644
--- a/mama/c_cpp/src/c/queue.c
+++ b/mama/c_cpp/src/c/queue.c
@@ -66,7 +66,7 @@ typedef struct mamaQueueImpl_
{
/*Reuseable message for all received on this queue*/
mamaMsg mMsg;
- int mIsDispatching;
+ wInterlockedInt mIsDispatching;
/*Hold onto the bridge impl for later use*/
mamaBridgeImpl* mBridgeImpl;
/*The bridge specific queue implementation*/
@@ -103,16 +103,14 @@ typedef struct mamaQueueImpl_
} mamaQueueImpl;

/*Main structure for the mamaDispatcher*/
-typedef struct mamaDisptacherImpl_
+typedef struct mamaDispatcherImpl_
{
/*The queue on which this dispatcher is dispatching*/
- mamaQueue mQueue;
+ mamaQueue mQueue;
/*The thread on which this dispatcher is dispathcing.*/
- wthread_t mThread;
+ wthread_t mThread;
/*Whether the dispatcher is dispatching*/
- int mIsDispatching;
- /*Destroy has been called*/
- int mDestroy;
+ wInterlockedInt mIsDispatching;
} mamaDispatcherImpl;


@@ -226,7 +224,8 @@ mamaQueue_create (mamaQueue* queue,
wInterlocked_initialize(&impl->mNumberOpenObjects);
wInterlocked_set(0, &impl->mNumberOpenObjects);

-
+ wInterlocked_initialize(&impl->mIsDispatching);
+ wInterlocked_set(0, &impl->mIsDispatching);

/* Call the bridge impl specific queue create function*/
if (MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaQueueCreate
@@ -549,6 +548,7 @@ mamaQueue_destroyWait(mamaQueue queue)
{
/* Dispatch messages for 10 ms. */
ret = mamaQueue_timedDispatch(queue, 10);
+
if(MAMA_STATUS_OK == ret)
{
/* Attempt to destroy the queue again. */
@@ -587,6 +587,8 @@ mamaQueue_destroyTimedWait(mamaQueue queue,
{
/* Dispatch messages for 10 ms. */
ret = mamaQueue_timedDispatch(queue, 10);
+
+ /* Should this not react to MAMA_STATUS_TIMEOUT too? */
if(MAMA_STATUS_OK == ret)
{
/* Add 10ms to the count. */
@@ -638,7 +640,6 @@ mamaQueue_destroy (mamaQueue queue)

mama_log (MAMA_LOG_LEVEL_FINEST, "Entering mamaQueue_destroy for queue 0x%X.", queue);

- impl->mIsDispatching = 0;
if (!queue)
{
mama_log (MAMA_LOG_LEVEL_ERROR, "mamaQueue_destroy(): NULL queue.");
@@ -649,8 +650,19 @@ mamaQueue_destroy (mamaQueue queue)
status = MAMA_STATUS_QUEUE_OPEN_OBJECTS;

/* Only continue if the object count is 0. */
- if(0 == wInterlocked_read(&impl->mNumberOpenObjects))
+ if (0 == wInterlocked_read(&impl->mNumberOpenObjects))
{
+ wInterlocked_set (0, &impl->mIsDispatching);
+
+ if (impl->mDispatcher)
+ {
+ /* Try to ensure that the dispatcher does not restart the queue
+ * dispatching after we've stopped and destroy it. */
+ mamaDispatcherImpl* dispatcherImpl = (mamaDispatcherImpl*)impl->mDispatcher;
+ wInterlocked_set (0, &dispatcherImpl->mIsDispatching);
+ dispatcherImpl->mQueue = NULL;
+ }
+
if (impl->mMamaQueueBridgeImpl)
{
if (MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaQueueDestroy (
@@ -662,13 +674,6 @@ mamaQueue_destroy (mamaQueue queue)
}
}

- if (impl->mDispatcher)
- {
- /* We don't want the dispatcher to access a destroyed queue */
- ((mamaDispatcherImpl*)(impl->mDispatcher))->mIsDispatching = 0;
- ((mamaDispatcherImpl*)(impl->mDispatcher))->mQueue = NULL;
- }
-
/*Destroy the cached mamaMsg - no longer needed*/
if (impl->mMsg) mamaMsg_destroy (impl->mMsg);

@@ -747,6 +752,7 @@ mamaQueue_destroy (mamaQueue queue)

/* Destroy the counter lock */
wInterlocked_destroy(&impl->mNumberOpenObjects);
+ wInterlocked_destroy(&impl->mIsDispatching);

free (impl);

@@ -785,15 +791,17 @@ mamaQueue_getEventCount (mamaQueue queue,
mama_status
mamaQueue_dispatch (mamaQueue queue)
{
- mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ mama_status status = MAMA_STATUS_OK;
+
if (!impl)
{
mama_log (MAMA_LOG_LEVEL_ERROR,
"mamaQueue_dispatch(): NULL queue.");
return MAMA_STATUS_NULL_ARG;
}
- impl->mIsDispatching = 1;

+ wInterlocked_set (1, &impl->mIsDispatching);
return impl->mBridgeImpl->bridgeMamaQueueDispatch
(impl->mMamaQueueBridgeImpl);
}
@@ -802,7 +810,7 @@ mama_status
mamaQueue_timedDispatch (mamaQueue queue,
uint64_t timeout)
{
- mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;

if (!impl)
{
@@ -811,6 +819,7 @@ mamaQueue_timedDispatch (mamaQueue queue,
return MAMA_STATUS_NULL_ARG;
}

+ wInterlocked_set (1, &impl->mIsDispatching);
return impl->mBridgeImpl->bridgeMamaQueueTimedDispatch
(impl->mMamaQueueBridgeImpl, timeout);
}
@@ -842,8 +851,8 @@ mamaQueue_stopDispatch (mamaQueue queue)
"mamaQueue_stopDispatch(): NULL queue.");
return MAMA_STATUS_NULL_ARG;
}
- impl->mIsDispatching = 0;

+ wInterlocked_set (0, &impl->mIsDispatching);
return impl->mBridgeImpl->bridgeMamaQueueStopDispatch
(impl->mMamaQueueBridgeImpl);
}
@@ -1261,16 +1270,10 @@ static void
{
mamaDispatcherImpl* impl = (mamaDispatcherImpl*)closure;

- impl->mIsDispatching = 1;
-
- while (impl->mIsDispatching && !impl->mDestroy &&
- MAMA_STATUS_OK == mamaQueue_dispatch (impl->mQueue))
- ;
-
- impl->mIsDispatching = 0;
-
- if (impl->mQueue)
- impl->mQueue->mDispatcher = NULL;
+ wInterlocked_set (1, &impl->mIsDispatching);
+ while (wInterlocked_read (&impl->mIsDispatching) &&
+ MAMA_STATUS_OK == mamaQueue_dispatch (impl->mQueue));
+ wInterlocked_set (0, &impl->mIsDispatching);

return NULL;
}
@@ -1309,8 +1312,10 @@ mamaDispatcher_create (mamaDispatcher *result,
return MAMA_STATUS_NOMEM;
}

+ wInterlocked_initialize(&impl->mIsDispatching);
+ wInterlocked_set(0, &impl->mIsDispatching);
+
impl->mQueue = queue;
- impl->mDestroy = 0;
if (wthread_create(&impl->mThread, NULL, dispatchThreadProc, impl))
{
free (impl);
@@ -1333,16 +1338,15 @@ mamaDispatcher_destroy (mamaDispatcher dispatcher)
if (!impl)
return MAMA_STATUS_NULL_ARG;

- if( impl->mQueue && impl->mIsDispatching )
+ if (impl->mQueue)
{
- impl->mIsDispatching = 0;
mamaQueue_stopDispatch (impl->mQueue);
}

- impl->mDestroy = 1;
-
/* Wait for the thread to return. */
+ wInterlocked_set(0, &impl->mIsDispatching);
wthread_join (impl->mThread, NULL);
+ wInterlocked_destroy (&impl->mIsDispatching);

impl->mQueue->mDispatcher = NULL;
free (impl);
@@ -1352,7 +1356,8 @@ mamaDispatcher_destroy (mamaDispatcher dispatcher)
int
mamaQueueImpl_isDispatching (mamaQueue queue)
{
- return ((mamaQueueImpl*)queue)->mIsDispatching;
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ return wInterlocked_read (&impl->mIsDispatching);
}

int MAMACALLTYPE
diff --git a/mama/c_cpp/src/gunittest/c/SConscript b/mama/c_cpp/src/gunittest/c/SConscript
index ade6975..f1eb158 100644
--- a/mama/c_cpp/src/gunittest/c/SConscript
+++ b/mama/c_cpp/src/gunittest/c/SConscript
@@ -16,7 +16,7 @@ incpath.append('#mama/c_cpp/src/c')
libpath = []
libpath.append('$libdir')

-env.Append(LIBPATH=libpath, LIBS=['dl', 'wombatcommon', 'mama'],
+env.Append(LIBPATH=libpath, LIBS=['dl', 'wombatcommon', 'mama', 'pthread'],
CPPPATH=incpath)

env['CCFLAGS'] = [x for x in env['CCFLAGS'] if x != '-pedantic-errors']
diff --git a/mama/c_cpp/src/gunittest/c/openclosetest.cpp b/mama/c_cpp/src/gunittest/c/openclosetest.cpp
index ed26e05..f288ba5 100644
--- a/mama/c_cpp/src/gunittest/c/openclosetest.cpp
+++ b/mama/c_cpp/src/gunittest/c/openclosetest.cpp
@@ -23,11 +23,11 @@
* opening and closing middleware bridges.
*/

+#include "MainUnitTestC.h"
#include <gtest/gtest.h>
#include "mama/mama.h"
#include "mama/status.h"
-#include "MainUnitTestC.h"
-
+#include "wombat/wSemaphore.h"

class MamaOpenCloseTestC : public ::testing::Test
{
@@ -38,7 +38,6 @@ protected:

virtual void SetUp(void);
virtual void TearDown(void);
-

};

@@ -58,10 +57,19 @@ void MamaOpenCloseTestC::TearDown(void)
{
}

-static void MAMACALLTYPE startCallback (mama_status status)
+static void MAMACALLTYPE startCallback (mama_status status,
+ mamaBridge bridge,
+ void* closure)
{
+ wsem_t* sem = (wsem_t*)closure;
+ ASSERT_EQ (0, wsem_post (sem));
}

+void onEventStop (mamaQueue queue, void* closure)
+{
+ mamaBridge bridge = (mamaBridge)closure;
+ mama_stop (bridge);
+}

/* ************************************************************************* */
/* Tests */
@@ -73,8 +81,8 @@ static void MAMACALLTYPE startCallback (mama_status status)
*/
TEST_F (MamaOpenCloseTestC, OpenClose)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ mama_loadBridge (&bridge, getMiddleware());

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

@@ -89,8 +97,8 @@ TEST_F (MamaOpenCloseTestC, OpenClose)
*/
TEST_F (MamaOpenCloseTestC, NestedOpenClose)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ mama_loadBridge (&bridge, getMiddleware());

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

@@ -109,15 +117,15 @@ TEST_F (MamaOpenCloseTestC, NestedOpenClose)
*/
TEST_F (MamaOpenCloseTestC, OpenCloseReopenSameBridge)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ mama_loadBridge (&bridge, getMiddleware());

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

ASSERT_EQ (MAMA_STATUS_OK, mama_close());

/* bridge must be loaded again after close */
- mama_loadBridge (&mBridge, getMiddleware());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, getMiddleware()));

ASSERT_EQ (MAMA_STATUS_OK, mama_open());
}
@@ -129,18 +137,18 @@ TEST_F (MamaOpenCloseTestC, OpenCloseReopenSameBridge)
*/
TEST_F (MamaOpenCloseTestC, DISABLED_OpenCloseReopenNewBridge)
{
- mamaBridge mBridge;
- ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&mBridge, getMiddleware()));
+ mamaBridge bridge;
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, getMiddleware()));

- ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());

- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());

- ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&mBridge, "avis"));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, "avis"));

- ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());

- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());
}

/* Description: Load the middleware bridge, initialize MAMA, begin
@@ -151,18 +159,19 @@ TEST_F (MamaOpenCloseTestC, DISABLED_OpenCloseReopenNewBridge)
*/
TEST_F (MamaOpenCloseTestC, StartStopDifferentThreads)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, getMiddleware()));

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

/* Start mama in the background so it uses a different thread */
- ASSERT_EQ (MAMA_STATUS_OK, mama_startBackground (mBridge, startCallback));
-
- /* Sleep to allow the other thread to complete startup */
- sleep(2);
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_stop (mBridge));
+ wsem_t sem;
+ mamaQueue defaultQueue;
+ ASSERT_EQ (0, wsem_init (&sem, 0, 0));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_getDefaultEventQueue (bridge, &defaultQueue));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaQueue_enqueueEvent (defaultQueue, onEventStop, bridge));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_startBackgroundEx (bridge, startCallback, &sem));
+ ASSERT_EQ (0, wsem_wait (&sem));

ASSERT_EQ (MAMA_STATUS_OK, mama_close());
}
diff --git a/mama/c_cpp/src/gunittest/c/queuetest.cpp b/mama/c_cpp/src/gunittest/c/queuetest.cpp
index c9dbe66..5315c26 100644
--- a/mama/c_cpp/src/gunittest/c/queuetest.cpp
+++ b/mama/c_cpp/src/gunittest/c/queuetest.cpp
@@ -17,13 +17,14 @@
* 02110-1301 USA
*/

-
+#include "MainUnitTestC.h"
#include <gtest/gtest.h>
#include <pthread.h>
#include "mama/mama.h"
#include "mama/status.h"
-#include "MainUnitTestC.h"
#include "mama/queue.h"
+#include "mama/transport.h"
+#include "wombat/wSemaphore.h"

class MamaQueueTestC : public ::testing::Test
{
@@ -33,25 +34,22 @@ class MamaQueueTestC : public ::testing::Test
virtual void SetUp();
virtual void TearDown();
public:
- mamaBridge mBridge;
- MamaQueueTestC* m_this;
+ mamaBridge m_bridge;
+ mamaTransport m_transport;
int m_numQueues;
- int m_queueCounter;
int m_numEvents;
int m_eventCounter;
- int m_numDispatches;
int m_highWaterMarkOccurance;
int m_lowWaterMarkOccurance;
+ int m_numDispatches[10];
uint64_t m_timeout;
- mamaDispatcher dispatcher[10];
- mamaQueue qArray[10];
+ mamaDispatcher m_dispatcher[10];
+ mamaQueue m_queues[10];
+ wsem_t m_sem;
};

MamaQueueTestC::MamaQueueTestC()
{
- mBridge;
- m_this = this;
-
m_highWaterMarkOccurance = 0;
m_lowWaterMarkOccurance = 0;
m_timeout = 5000;
@@ -59,23 +57,31 @@ MamaQueueTestC::MamaQueueTestC()

MamaQueueTestC::~MamaQueueTestC()
{
- m_this = NULL;
}

void MamaQueueTestC::SetUp()
{
- mama_loadBridge (&mBridge, getMiddleware());
- mama_open();
+ ASSERT_EQ(0, wsem_init (&m_sem, 0, 0));
+ m_eventCounter = 0;
+
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&m_bridge, getMiddleware()));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_allocate (&m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_create (m_transport, NULL, m_bridge));
}

void MamaQueueTestC::TearDown()
{
- mama_close();
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_destroy (m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (0, wsem_destroy (&m_sem));
}

+#if 0
static void MAMACALLTYPE startCallback (mama_status status)
{
}
+#endif

void highWaterMarkCallback (mamaQueue queue, size_t size, void* closure)
{
@@ -98,21 +104,22 @@ void onEvent (mamaQueue queue, void* closure)
mamaQueue_stopDispatch (queue);
}
}
-void onTimedEvent (mamaQueue queue, void* closure)
+
+void onEventNop (mamaQueue queue, void* closure)
{
}

void onBgEvent (mamaQueue queue, void* closure)
{
MamaQueueTestC* fixture = (MamaQueueTestC *)closure;
- fixture->m_numDispatches++;
+
+ void* pIndex = 0;
+ mamaQueue_getClosure (queue, &pIndex);
+ size_t index = (size_t)pIndex;

- if (fixture->m_numDispatches == 1000)
+ if (fixture->m_numEvents == ++fixture->m_numDispatches[index])
{
- for (int x=0; x!=10; x++)
- {
- mamaQueue_stopDispatch (fixture->qArray[x]);
- }
+ wsem_post (&fixture->m_sem);
}
}

@@ -141,7 +148,7 @@ TEST_F (MamaQueueTestC, GetDefaultQueue)
mamaQueue defaultQueue;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));
}

/* Description: Create a mamaQueue then destroy it.
@@ -153,7 +160,7 @@ TEST_F (MamaQueueTestC, CreateNonDefaultQueue)
mamaQueue queue;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_destroy (queue));
@@ -168,11 +175,14 @@ TEST_F (MamaQueueTestC, Enqueue)
mamaQueue defaultQueue = NULL;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

+ ASSERT_EQ (MAMA_STATUS_OK,
+ mamaQueue_dispatchEvent (defaultQueue));
+
}

/* Description: Set the enqueue callback associated with the default queue
@@ -185,17 +195,19 @@ TEST_F (MamaQueueTestC, setEnqueueCallback)
mamaQueue defaultQueue = NULL;

m_numEvents = 1;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, m_this));
+ mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

+ ASSERT_EQ (MAMA_STATUS_OK,
+ mamaQueue_dispatchEvent (defaultQueue));
+
}

/* Description: Set the enqueue callback assiciated with the
@@ -207,24 +219,22 @@ TEST_F (MamaQueueTestC, RemoveEnqueueCallback)
{
mamaQueue defaultQueue = NULL;

- m_numEvents = 1;
- m_eventCounter = 0;
+ m_numEvents = 1;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, m_this));
+ mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_removeEnqueueCallback (defaultQueue));
+ mamaQueue_dispatchEvent (defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
-
+ mamaQueue_removeEnqueueCallback (defaultQueue));
}

/* Description: Enqueue an event on the default queue then dispatch it.
@@ -236,13 +246,12 @@ TEST_F (MamaQueueTestC, EnqueueDispatch)
mamaQueue defaultQueue = NULL;

m_numEvents = 1;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_dispatchEvent (defaultQueue));
@@ -259,13 +268,12 @@ TEST_F (MamaQueueTestC, EnqueueDispatchNonDefault)
mamaQueue queue = NULL;

m_numEvents = 1;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEvent, this));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_dispatch (queue));
@@ -286,15 +294,14 @@ TEST_F (MamaQueueTestC, EnqueueDispatchMany)
mamaQueue queue = NULL;

m_numEvents = 10;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

- for (int x=0; x<=m_numEvents; x++)
+ for (int x=0; x<m_numEvents; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEvent, this));
}
ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_dispatch (queue));
@@ -314,15 +321,14 @@ TEST_F (MamaQueueTestC, TimedDispatch)
mamaQueue queue = NULL;

m_numEvents = 1000;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

- for (int x=0; x<=m_numEvents; x++)
+ for (int x=0; x<m_numEvents; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onTimedEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEventNop, this));
}

ASSERT_EQ (MAMA_STATUS_OK,
@@ -343,17 +349,16 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
{
mamaQueue queue = NULL;
m_numEvents = 20;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

mamaQueueMonitorCallbacks queueCallbacks;
queueCallbacks.onQueueHighWatermarkExceeded = onHighWatermark;
queueCallbacks.onQueueLowWatermark = onLowWatermark;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_setQueueMonitorCallbacks (queue, &queueCallbacks, m_this));
+ mamaQueue_setQueueMonitorCallbacks (queue, &queueCallbacks, this));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_setHighWatermark (queue, 10));
@@ -361,10 +366,10 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_setLowWatermark (queue, 5));

- for (int x=0; x<=m_numEvents; x++)
+ for (int x=0; x<m_numEvents; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEvent, this));
}

ASSERT_EQ (MAMA_STATUS_OK,
@@ -383,37 +388,40 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
*/
TEST_F (MamaQueueTestC, DispatchManyQueuesWithDispatchers)
{
- m_numQueues = 10;
+ m_numQueues = 10; // FIXME: Storage is hardcoded to 10!
m_numEvents = 100;
- m_eventCounter = 0;
- m_numDispatches = 0;

- for (m_queueCounter = 0; m_queueCounter!=m_numQueues; m_queueCounter++)
+ for (int x = 0; x!=m_numQueues; x++)
{
+ m_numDispatches[x] = 0;
+
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&qArray[m_queueCounter], mBridge));
+ mamaQueue_create (&m_queues[x], m_bridge));

- for (int x=0; x!=m_numEvents; x++)
+ mamaQueue_setClosure (m_queues[x], (void*)(size_t)x);
+
+ for (int y=0; y!=m_numEvents; y++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (qArray[m_queueCounter], onBgEvent, m_this));
+ mamaQueue_enqueueEvent (m_queues[x], onBgEvent, this));
}
}

- for (m_queueCounter = 0; m_queueCounter!=m_numQueues; m_queueCounter++)
+ for (int x = 0; x!=m_numQueues; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaDispatcher_create (&dispatcher[m_queueCounter], qArray[m_queueCounter]));
+ mamaDispatcher_create (&m_dispatcher[x], m_queues[x]));
}

- for (int x=0; x!=10; x++)
+ for (int x = 0; x!=m_numQueues; x++)
{
- mamaDispatcher_destroy (dispatcher[x]);
+ ASSERT_EQ(0, wsem_wait (&m_sem));
}
- for (m_queueCounter = 0; m_queueCounter!=m_numQueues; m_queueCounter++)
+
+ for (int x = 0; x!=m_numQueues; x++)
{
- mamaQueue_destroy (qArray[m_queueCounter]);
+ ASSERT_EQ(MAMA_STATUS_OK, mamaDispatcher_destroy (m_dispatcher[x]));
+ ASSERT_EQ(MAMA_STATUS_OK, mamaQueue_destroy (m_queues[x]));
}
-
}

diff --git a/mama/c_cpp/src/gunittest/c/timertest.cpp b/mama/c_cpp/src/gunittest/c/timertest.cpp
index f0102df..0980f13 100644
--- a/mama/c_cpp/src/gunittest/c/timertest.cpp
+++ b/mama/c_cpp/src/gunittest/c/timertest.cpp
@@ -26,6 +26,7 @@
#include "mama/types.h"
#include "mama/timer.h"
#include "mama/queue.h"
+#include "mama/transport.h"
#include <cstring>
#include <cstdio>
#include <cstdlib>
@@ -40,18 +41,17 @@ protected:
virtual void SetUp();
virtual void TearDown ();
public:
- MamaTimerTestC *m_this;
- mamaBridge mBridge;
-
- int tCounter;
- int numTimers;
- mamaTimer tarray[100];
- mamaTimer longTimer;
- mamaTimer shortTimer;
- mamaTimer stopTimer;
- mamaTimer timer;
- mamaQueue queue;
- mama_f64_t interval;
+ mamaBridge m_bridge;
+ mamaTransport m_transport;
+ int m_tCounter;
+ int m_numTimers;
+ mamaTimer m_timers[100];
+ mamaTimer m_longTimer;
+ mamaTimer m_shortTimer;
+ mamaTimer m_stopTimer;
+ mamaTimer m_timer;
+ mamaQueue m_queue;
+ mama_f64_t m_interval;
};

MamaTimerTestC::MamaTimerTestC()
@@ -64,53 +64,50 @@ MamaTimerTestC::~MamaTimerTestC()

void MamaTimerTestC::SetUp(void)
{
- interval = 0.01;
- m_this = this;
+ m_tCounter = 0;
+ m_interval = 0.01;

- mama_loadBridge (&mBridge, getMiddleware());
- mama_open ();
- ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &queue));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&m_bridge, getMiddleware()));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_getDefaultEventQueue (m_bridge, &m_queue));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_allocate (&m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_create (m_transport, NULL, m_bridge));
}

void MamaTimerTestC::TearDown(void)
{
- mama_close ();
- m_this = NULL;
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_destroy (m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());
}

static void MAMACALLTYPE onTimerTick (mamaTimer timer, void* closure)
{
- ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_destroy(timer));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy(timer));
}

static void MAMACALLTYPE onTimerDestroy (mamaTimer timer, void* closure)
{
MamaTimerTestC* fixture = (MamaTimerTestC *)closure;
- fixture->tCounter++;
-
- if (fixture->tCounter == fixture->numTimers)
+ if (++fixture->m_tCounter == fixture->m_numTimers)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mama_stop (fixture->mBridge));
+ mama_stop (fixture->m_bridge));
}
}

static void MAMACALLTYPE onRecursiveTimerDestroy (mamaTimer timer, void* closure)
{
MamaTimerTestC* fixture = (MamaTimerTestC *)closure;
- fixture->tCounter++;
-
- if (fixture->tCounter == fixture->numTimers)
+ if (++fixture->m_tCounter == fixture->m_numTimers)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mama_stop(fixture->mBridge));
+ mama_stop(fixture->m_bridge));
}
else
{
- mamaTimer_create2 (&timer, fixture->queue, onTimerTick,
- onRecursiveTimerDestroy, fixture->interval, fixture);
+ ASSERT_EQ (MAMA_STATUS_OK,
+ mamaTimer_create2 (&fixture->m_timer, fixture->m_queue, onTimerTick,
+ onRecursiveTimerDestroy, fixture->m_interval, fixture));
}

}
@@ -123,24 +120,21 @@ static void MAMACALLTYPE onLongTimerTick (mamaTimer timer, void* closure)
{
}

-static void MAMACALLTYPE onTwoTimerDestroy (mamaTimer timer, void* closure)
-{
-}
-
static void MAMACALLTYPE onStopTimerTick (mamaTimer timer, void* closure)
{
- mamaTimer_destroy(timer);
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy(timer));
}

static void MAMACALLTYPE onStopTimerDestroy (mamaTimer timer, void* closure)
{
MamaTimerTestC* fixture = (MamaTimerTestC *)closure;

- mamaTimer_destroy (fixture->shortTimer);
- mamaTimer_destroy (fixture->longTimer);
-
- mama_stop (fixture->mBridge);
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy (fixture->m_shortTimer));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy (fixture->m_longTimer));
+
+ ASSERT_EQ (MAMA_STATUS_OK, mama_stop (fixture->m_bridge));
}
+
/* ************************************************************************* */
/* Test Functions */
/* ************************************************************************* */
@@ -151,19 +145,13 @@ static void MAMACALLTYPE onStopTimerDestroy (mamaTimer timer, void* closure)
*/
TEST_F (MamaTimerTestC, CreateDestroy)
{
-
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
- fixture->tCounter = 0;
- fixture->numTimers = 1;
+ m_numTimers = 1;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2 (&timer, fixture->queue, onTimerTick,
- onTimerDestroy, fixture->interval, m_this));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
+ mamaTimer_create2 (&m_timer, m_queue, onTimerTick,
+ onTimerDestroy, m_interval, this));

- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
}

/* Description: Create many mamaTimers which destroy themselves when fired.
@@ -173,21 +161,16 @@ TEST_F (MamaTimerTestC, CreateDestroy)
*/
TEST_F (MamaTimerTestC, CreateDestroyMany)
{
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
- fixture->tCounter = 0;
- fixture->numTimers = 100;
+ m_numTimers = 100; // FIXME: Storage hardcoded to 100!

- for (int x=0; x!=fixture->numTimers; x++)
+ for (int x=0; x!=m_numTimers; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2 (&tarray[x], fixture->queue, onTimerTick,
- onTimerDestroy,fixture->interval, m_this));
+ mamaTimer_create2 (&m_timers[x], m_queue, onTimerTick,
+ onTimerDestroy, m_interval, this));
}

- ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
}

/* Description: Create a timer whiich creates another timer when fired.
@@ -197,18 +180,13 @@ TEST_F (MamaTimerTestC, CreateDestroyMany)
*/
TEST_F (MamaTimerTestC, RecursiveCreateDestroy)
{
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
- fixture->tCounter = 0;
- fixture->numTimers = 11;
+ m_numTimers = 11;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2(&timer, fixture->queue, onTimerTick,
- onRecursiveTimerDestroy, fixture->interval, m_this));
+ mamaTimer_create2(&m_timer, m_queue, onTimerTick,
+ onRecursiveTimerDestroy, m_interval, this));

- ASSERT_EQ (MAMA_STATUS_OK, mama_start(mBridge));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start(m_bridge));
}

/* Description: Two timers are created which tick indefinately at different rates,
@@ -218,22 +196,18 @@ TEST_F (MamaTimerTestC, RecursiveCreateDestroy)
*/
TEST_F (MamaTimerTestC, TwoTimer)
{
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
-
ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create(&shortTimer, fixture->queue, onShortTimerTick,
- fixture->interval, m_this));
+ mamaTimer_create(&m_shortTimer, m_queue, onShortTimerTick,
+ m_interval, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create (&longTimer, fixture->queue, onLongTimerTick,
- ((fixture->interval)*2), m_this));
+ mamaTimer_create (&m_longTimer, m_queue, onLongTimerTick,
+ m_interval*2, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2 (&stopTimer, fixture->queue, onStopTimerTick,
- onStopTimerDestroy, ((fixture->interval)*100), m_this));
+ mamaTimer_create2 (&m_stopTimer, m_queue, onStopTimerTick,
+ onStopTimerDestroy, m_interval*100, this));

- ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
}

--
1.9.0


Lee Skillen <lskillen@...>
 

The rerolled patch includes linking in pthread. I missed it the first time because ld didn't complain, but it did after a rebuild; odd. :-)


On 3 March 2014 15:15, Lee Skillen <lskillen@...> wrote:
Changed the mIsDispatching signalling variable into an atomic int since
it's visible, inspected and modified across thread boundaries.  This is
the cause of several errors in helgrind and has been observed to cause
an issue with shutting down the queue within avis (may occur on other
middleware, but cannot reproduce with qpid).

Additional changes :-

- Fixed warning for uninitialised variables in msgutils.

- Rewrite of the MamaQueueTestC/MamaTimerTestC tests.  Fixed memory
  leaks, race conditions and off-by-one errors (causing some to execute
  across the boundary of indivisible tests).  Also added in allocation
  of transport to keep avis middleware happy (it doesn't construct a
  default transport so it didn't execute the tests properly).

- Add 'pthread' to libs for linking the unittest binary.

All MamaQueueTestC/MamaTimerTestC tests now execute for both qpid and
avis (although the latter fails on some due to unimplemented features).

Signed-off-by: Lee Skillen <lskillen@...>
---
 mama/c_cpp/src/c/msgutils.c                  |   4 +-
 mama/c_cpp/src/c/queue.c                     |  79 +++++++-------
 mama/c_cpp/src/gunittest/c/SConscript        |   2 +-
 mama/c_cpp/src/gunittest/c/openclosetest.cpp |  61 ++++++-----
 mama/c_cpp/src/gunittest/c/queuetest.cpp     | 150 ++++++++++++++-------------
 mama/c_cpp/src/gunittest/c/timertest.cpp     | 136 ++++++++++--------------
 6 files changed, 214 insertions(+), 218 deletions(-)

diff --git a/mama/c_cpp/src/c/msgutils.c b/mama/c_cpp/src/c/msgutils.c
index 6f9bd5d..0530c30 100644
--- a/mama/c_cpp/src/c/msgutils.c
+++ b/mama/c_cpp/src/c/msgutils.c
@@ -58,7 +58,7 @@ msgUtils_setStatus (mamaMsg msg, short status)
 mama_status
 msgUtils_msgTotal (mamaMsg msg, short *result)
 {
-    int32_t val;
+    int32_t val = 0;
     mama_status status;

     status = mamaMsg_getI32 (msg,
@@ -74,7 +74,7 @@ msgUtils_msgTotal (mamaMsg msg, short *result)
 mama_status
 msgUtils_msgNum (mamaMsg msg, short *result)
 {
-    int32_t val;
+    int32_t val = 0;
     mama_status status;

     status = mamaMsg_getI32 (msg,
diff --git a/mama/c_cpp/src/c/queue.c b/mama/c_cpp/src/c/queue.c
index 8fccaa6..6a104da 100644
--- a/mama/c_cpp/src/c/queue.c
+++ b/mama/c_cpp/src/c/queue.c
@@ -66,7 +66,7 @@ typedef struct mamaQueueImpl_
 {
     /*Reuseable message for all received on this queue*/
     mamaMsg                     mMsg;
-    int                         mIsDispatching;
+    wInterlockedInt             mIsDispatching;
     /*Hold onto the bridge impl for later use*/
     mamaBridgeImpl*             mBridgeImpl;
     /*The bridge specific queue implementation*/
@@ -103,16 +103,14 @@ typedef struct mamaQueueImpl_
 } mamaQueueImpl;

 /*Main structure for the mamaDispatcher*/
-typedef struct mamaDisptacherImpl_
+typedef struct mamaDispatcherImpl_
 {
     /*The queue on which this dispatcher is dispatching*/
-    mamaQueue      mQueue;
+    mamaQueue       mQueue;
     /*The thread on which this dispatcher is dispathcing.*/
-    wthread_t      mThread;
+    wthread_t       mThread;
     /*Whether the dispatcher is dispatching*/
-    int            mIsDispatching;
-    /*Destroy has been called*/
-    int            mDestroy;
+    wInterlockedInt mIsDispatching;
 } mamaDispatcherImpl;


@@ -226,7 +224,8 @@ mamaQueue_create (mamaQueue* queue,
     wInterlocked_initialize(&impl->mNumberOpenObjects);
     wInterlocked_set(0, &impl->mNumberOpenObjects);

-
+    wInterlocked_initialize(&impl->mIsDispatching);
+    wInterlocked_set(0, &impl->mIsDispatching);

        /* Call the bridge impl specific queue create function*/
        if (MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaQueueCreate
@@ -549,6 +548,7 @@ mamaQueue_destroyWait(mamaQueue queue)
         {
             /* Dispatch messages for 10 ms. */
             ret = mamaQueue_timedDispatch(queue, 10);
+
             if(MAMA_STATUS_OK == ret)
             {
                 /* Attempt to destroy the queue again. */
@@ -587,6 +587,8 @@ mamaQueue_destroyTimedWait(mamaQueue queue,
             {
                 /* Dispatch messages for 10 ms. */
                 ret = mamaQueue_timedDispatch(queue, 10);
+
+                /* Should this not react to MAMA_STATUS_TIMEOUT too? */
                 if(MAMA_STATUS_OK == ret)
                 {
                     /* Add 10ms to the count. */
@@ -638,7 +640,6 @@ mamaQueue_destroy (mamaQueue queue)

     mama_log (MAMA_LOG_LEVEL_FINEST, "Entering mamaQueue_destroy for queue 0x%X.", queue);

-    impl->mIsDispatching = 0;
     if (!queue)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR, "mamaQueue_destroy(): NULL queue.");
@@ -649,8 +650,19 @@ mamaQueue_destroy (mamaQueue queue)
     status = MAMA_STATUS_QUEUE_OPEN_OBJECTS;

     /* Only continue if the object count is 0. */
-    if(0 == wInterlocked_read(&impl->mNumberOpenObjects))
+    if (0 == wInterlocked_read(&impl->mNumberOpenObjects))
     {
+        wInterlocked_set (0, &impl->mIsDispatching);
+
+        if (impl->mDispatcher)
+        {
+            /* Try to ensure that the dispatcher does not restart the queue
+             * dispatching after we've stopped and destroy it. */
+            mamaDispatcherImpl* dispatcherImpl = (mamaDispatcherImpl*)impl->mDispatcher;
+            wInterlocked_set (0, &dispatcherImpl->mIsDispatching);
+            dispatcherImpl->mQueue = NULL;
+        }
+
         if (impl->mMamaQueueBridgeImpl)
         {
             if (MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaQueueDestroy (
@@ -662,13 +674,6 @@ mamaQueue_destroy (mamaQueue queue)
             }
         }

-        if (impl->mDispatcher)
-        {
-            /* We don't want the dispatcher to access a destroyed queue */
-            ((mamaDispatcherImpl*)(impl->mDispatcher))->mIsDispatching = 0;
-            ((mamaDispatcherImpl*)(impl->mDispatcher))->mQueue = NULL;
-        }
-
         /*Destroy the cached mamaMsg - no longer needed*/
         if (impl->mMsg) mamaMsg_destroy (impl->mMsg);

@@ -747,6 +752,7 @@ mamaQueue_destroy (mamaQueue queue)

         /* Destroy the counter lock */
         wInterlocked_destroy(&impl->mNumberOpenObjects);
+        wInterlocked_destroy(&impl->mIsDispatching);

         free (impl);

@@ -785,15 +791,17 @@ mamaQueue_getEventCount (mamaQueue queue,
 mama_status
 mamaQueue_dispatch (mamaQueue queue)
 {
-    mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+    mamaQueueImpl* impl   = (mamaQueueImpl*)queue;
+    mama_status    status = MAMA_STATUS_OK;
+
     if (!impl)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
                   "mamaQueue_dispatch(): NULL queue.");
         return MAMA_STATUS_NULL_ARG;
     }
-    impl->mIsDispatching = 1;

+    wInterlocked_set (1, &impl->mIsDispatching);
     return impl->mBridgeImpl->bridgeMamaQueueDispatch
                                 (impl->mMamaQueueBridgeImpl);
 }
@@ -802,7 +810,7 @@ mama_status
 mamaQueue_timedDispatch (mamaQueue queue,
                          uint64_t  timeout)
 {
-    mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+    mamaQueueImpl* impl   = (mamaQueueImpl*)queue;

     if (!impl)
     {
@@ -811,6 +819,7 @@ mamaQueue_timedDispatch (mamaQueue queue,
         return MAMA_STATUS_NULL_ARG;
     }

+    wInterlocked_set (1, &impl->mIsDispatching);
     return impl->mBridgeImpl->bridgeMamaQueueTimedDispatch
                             (impl->mMamaQueueBridgeImpl, timeout);
 }
@@ -842,8 +851,8 @@ mamaQueue_stopDispatch (mamaQueue queue)
                   "mamaQueue_stopDispatch(): NULL queue.");
         return MAMA_STATUS_NULL_ARG;
     }
-    impl->mIsDispatching = 0;

+    wInterlocked_set (0, &impl->mIsDispatching);
     return impl->mBridgeImpl->bridgeMamaQueueStopDispatch
                             (impl->mMamaQueueBridgeImpl);
 }
@@ -1261,16 +1270,10 @@ static void
 {
     mamaDispatcherImpl* impl = (mamaDispatcherImpl*)closure;

-    impl->mIsDispatching = 1;
-
-    while (impl->mIsDispatching && !impl->mDestroy &&
-           MAMA_STATUS_OK == mamaQueue_dispatch (impl->mQueue))
-        ;
-
-    impl->mIsDispatching = 0;
-
-    if (impl->mQueue)
-        impl->mQueue->mDispatcher = NULL;
+    wInterlocked_set (1, &impl->mIsDispatching);
+    while (wInterlocked_read (&impl->mIsDispatching) &&
+           MAMA_STATUS_OK == mamaQueue_dispatch (impl->mQueue));
+    wInterlocked_set (0, &impl->mIsDispatching);

     return NULL;
 }
@@ -1309,8 +1312,10 @@ mamaDispatcher_create (mamaDispatcher *result,
         return MAMA_STATUS_NOMEM;
     }

+    wInterlocked_initialize(&impl->mIsDispatching);
+    wInterlocked_set(0, &impl->mIsDispatching);
+
     impl->mQueue = queue;
-    impl->mDestroy = 0;
     if (wthread_create(&impl->mThread, NULL, dispatchThreadProc, impl))
     {
         free (impl);
@@ -1333,16 +1338,15 @@ mamaDispatcher_destroy (mamaDispatcher dispatcher)
     if (!impl)
         return MAMA_STATUS_NULL_ARG;

-    if( impl->mQueue && impl->mIsDispatching )
+    if (impl->mQueue)
     {
-        impl->mIsDispatching = 0;
         mamaQueue_stopDispatch (impl->mQueue);
     }

-    impl->mDestroy = 1;
-
     /* Wait for the thread to return. */
+    wInterlocked_set(0, &impl->mIsDispatching);
     wthread_join (impl->mThread, NULL);
+    wInterlocked_destroy (&impl->mIsDispatching);

     impl->mQueue->mDispatcher = NULL;
     free (impl);
@@ -1352,7 +1356,8 @@ mamaDispatcher_destroy (mamaDispatcher dispatcher)
 int
 mamaQueueImpl_isDispatching (mamaQueue queue)
 {
-    return ((mamaQueueImpl*)queue)->mIsDispatching;
+    mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+    return wInterlocked_read (&impl->mIsDispatching);
 }

 int MAMACALLTYPE
diff --git a/mama/c_cpp/src/gunittest/c/SConscript b/mama/c_cpp/src/gunittest/c/SConscript
index ade6975..f1eb158 100644
--- a/mama/c_cpp/src/gunittest/c/SConscript
+++ b/mama/c_cpp/src/gunittest/c/SConscript
@@ -16,7 +16,7 @@ incpath.append('#mama/c_cpp/src/c')
 libpath = []
 libpath.append('$libdir')

-env.Append(LIBPATH=libpath, LIBS=['dl', 'wombatcommon', 'mama'],
+env.Append(LIBPATH=libpath, LIBS=['dl', 'wombatcommon', 'mama', 'pthread'],
            CPPPATH=incpath)

 env['CCFLAGS'] = [x for x in env['CCFLAGS'] if x != '-pedantic-errors']
diff --git a/mama/c_cpp/src/gunittest/c/openclosetest.cpp b/mama/c_cpp/src/gunittest/c/openclosetest.cpp
index ed26e05..f288ba5 100644
--- a/mama/c_cpp/src/gunittest/c/openclosetest.cpp
+++ b/mama/c_cpp/src/gunittest/c/openclosetest.cpp
@@ -23,11 +23,11 @@
  *               opening and closing middleware bridges.
  */

+#include "MainUnitTestC.h"
 #include <gtest/gtest.h>
 #include "mama/mama.h"
 #include "mama/status.h"
-#include "MainUnitTestC.h"
-
+#include "wombat/wSemaphore.h"

 class MamaOpenCloseTestC : public ::testing::Test
 {
@@ -38,7 +38,6 @@ protected:

     virtual void SetUp(void);
     virtual void TearDown(void);
-

 };

@@ -58,10 +57,19 @@ void MamaOpenCloseTestC::TearDown(void)
 {
 }

-static void MAMACALLTYPE startCallback (mama_status status)
+static void MAMACALLTYPE startCallback (mama_status status,
+                                        mamaBridge  bridge,
+                                        void*       closure)
 {
+    wsem_t* sem = (wsem_t*)closure;
+    ASSERT_EQ (0, wsem_post (sem));
 }

+void onEventStop (mamaQueue queue, void* closure)
+{
+    mamaBridge bridge = (mamaBridge)closure;
+    mama_stop (bridge);
+}

 /* ************************************************************************* */
 /* Tests */
@@ -73,8 +81,8 @@ static void MAMACALLTYPE startCallback (mama_status status)
  */
 TEST_F (MamaOpenCloseTestC, OpenClose)
 {
-    mamaBridge mBridge;
-    mama_loadBridge (&mBridge, getMiddleware());
+    mamaBridge bridge;
+    mama_loadBridge (&bridge, getMiddleware());

     ASSERT_EQ (MAMA_STATUS_OK, mama_open());

@@ -89,8 +97,8 @@ TEST_F (MamaOpenCloseTestC, OpenClose)
  */
 TEST_F (MamaOpenCloseTestC, NestedOpenClose)
 {
-    mamaBridge mBridge;
-    mama_loadBridge (&mBridge, getMiddleware());
+    mamaBridge bridge;
+    mama_loadBridge (&bridge, getMiddleware());

     ASSERT_EQ (MAMA_STATUS_OK, mama_open());

@@ -109,15 +117,15 @@ TEST_F (MamaOpenCloseTestC, NestedOpenClose)
  */
 TEST_F (MamaOpenCloseTestC, OpenCloseReopenSameBridge)
 {
-    mamaBridge mBridge;
-    mama_loadBridge (&mBridge, getMiddleware());
+    mamaBridge bridge;
+    mama_loadBridge (&bridge, getMiddleware());

     ASSERT_EQ (MAMA_STATUS_OK,  mama_open());

     ASSERT_EQ (MAMA_STATUS_OK, mama_close());

     /* bridge must be loaded again after close */
-    mama_loadBridge (&mBridge, getMiddleware());
+    ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, getMiddleware()));

     ASSERT_EQ (MAMA_STATUS_OK, mama_open());
 }
@@ -129,18 +137,18 @@ TEST_F (MamaOpenCloseTestC, OpenCloseReopenSameBridge)
  */
 TEST_F (MamaOpenCloseTestC, DISABLED_OpenCloseReopenNewBridge)
 {
-    mamaBridge mBridge;
-    ASSERT_EQ (MAMA_STATUS_OK,  mama_loadBridge (&mBridge, getMiddleware()));
+    mamaBridge bridge;
+    ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, getMiddleware()));

-    ASSERT_EQ (MAMA_STATUS_OK,  mama_open());
+    ASSERT_EQ (MAMA_STATUS_OK, mama_open());

-    ASSERT_EQ (MAMA_STATUS_OK,  mama_close());
+    ASSERT_EQ (MAMA_STATUS_OK, mama_close());

-    ASSERT_EQ (MAMA_STATUS_OK,  mama_loadBridge (&mBridge, "avis"));
+    ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, "avis"));

-    ASSERT_EQ (MAMA_STATUS_OK,  mama_open());
+    ASSERT_EQ (MAMA_STATUS_OK, mama_open());

-    ASSERT_EQ (MAMA_STATUS_OK,  mama_close());
+    ASSERT_EQ (MAMA_STATUS_OK, mama_close());
 }

 /*  Description:     Load the middleware bridge, initialize MAMA, begin
@@ -151,18 +159,19 @@ TEST_F (MamaOpenCloseTestC, DISABLED_OpenCloseReopenNewBridge)
  */
 TEST_F (MamaOpenCloseTestC, StartStopDifferentThreads)
 {
-    mamaBridge mBridge;
-    mama_loadBridge (&mBridge, getMiddleware());
+    mamaBridge bridge;
+    ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, getMiddleware()));

     ASSERT_EQ (MAMA_STATUS_OK, mama_open());

     /* Start mama in the background so it uses a different thread */
-    ASSERT_EQ (MAMA_STATUS_OK, mama_startBackground (mBridge, startCallback));
-
-    /* Sleep to allow the other thread to complete startup */
-    sleep(2);
-
-    ASSERT_EQ (MAMA_STATUS_OK, mama_stop (mBridge));
+    wsem_t sem;
+    mamaQueue defaultQueue;
+    ASSERT_EQ (0, wsem_init (&sem, 0, 0));
+    ASSERT_EQ (MAMA_STATUS_OK, mama_getDefaultEventQueue (bridge, &defaultQueue));
+    ASSERT_EQ (MAMA_STATUS_OK, mamaQueue_enqueueEvent (defaultQueue, onEventStop, bridge));
+    ASSERT_EQ (MAMA_STATUS_OK, mama_startBackgroundEx (bridge, startCallback, &sem));
+    ASSERT_EQ (0, wsem_wait (&sem));

     ASSERT_EQ (MAMA_STATUS_OK, mama_close());
 }
diff --git a/mama/c_cpp/src/gunittest/c/queuetest.cpp b/mama/c_cpp/src/gunittest/c/queuetest.cpp
index c9dbe66..5315c26 100644
--- a/mama/c_cpp/src/gunittest/c/queuetest.cpp
+++ b/mama/c_cpp/src/gunittest/c/queuetest.cpp
@@ -17,13 +17,14 @@
  * 02110-1301 USA
  */

-
+#include "MainUnitTestC.h"
 #include <gtest/gtest.h>
 #include <pthread.h>
 #include "mama/mama.h"
 #include "mama/status.h"
-#include "MainUnitTestC.h"
 #include "mama/queue.h"
+#include "mama/transport.h"
+#include "wombat/wSemaphore.h"

 class MamaQueueTestC : public ::testing::Test
 {
@@ -33,25 +34,22 @@ class MamaQueueTestC : public ::testing::Test
         virtual void SetUp();
         virtual void TearDown();
     public:
-        mamaBridge      mBridge;
-        MamaQueueTestC* m_this;
+        mamaBridge      m_bridge;
+        mamaTransport   m_transport;
         int             m_numQueues;
-        int             m_queueCounter;
         int             m_numEvents;
         int             m_eventCounter;
-        int             m_numDispatches;
         int             m_highWaterMarkOccurance;
         int             m_lowWaterMarkOccurance;
+        int             m_numDispatches[10];
         uint64_t        m_timeout;
-        mamaDispatcher  dispatcher[10];
-        mamaQueue       qArray[10];
+        mamaDispatcher  m_dispatcher[10];
+        mamaQueue       m_queues[10];
+        wsem_t          m_sem;
 };

 MamaQueueTestC::MamaQueueTestC()
 {
-    mBridge;
-    m_this = this;
-
     m_highWaterMarkOccurance = 0;
     m_lowWaterMarkOccurance  = 0;
     m_timeout                = 5000;
@@ -59,23 +57,31 @@ MamaQueueTestC::MamaQueueTestC()

 MamaQueueTestC::~MamaQueueTestC()
 {
-    m_this = NULL;
 }

 void MamaQueueTestC::SetUp()
 {
-    mama_loadBridge (&mBridge, getMiddleware());
-    mama_open();
+    ASSERT_EQ(0, wsem_init (&m_sem, 0, 0));
+    m_eventCounter = 0;
+
+    ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&m_bridge, getMiddleware()));
+    ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_allocate (&m_transport));
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_create (m_transport, NULL, m_bridge));
 }

 void MamaQueueTestC::TearDown()
 {
-    mama_close();
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_destroy (m_transport));
+    ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+    ASSERT_EQ (0, wsem_destroy (&m_sem));
 }

+#if 0
 static void MAMACALLTYPE startCallback (mama_status status)
 {
 }
+#endif

 void highWaterMarkCallback (mamaQueue queue, size_t size, void* closure)
 {
@@ -98,21 +104,22 @@ void onEvent (mamaQueue queue, void* closure)
         mamaQueue_stopDispatch (queue);
     }
 }
-void onTimedEvent (mamaQueue queue, void* closure)
+
+void onEventNop (mamaQueue queue, void* closure)
 {
 }

 void onBgEvent (mamaQueue queue, void* closure)
 {
     MamaQueueTestC* fixture = (MamaQueueTestC *)closure;
-    fixture->m_numDispatches++;
+
+    void* pIndex = 0;
+    mamaQueue_getClosure (queue, &pIndex);
+    size_t index = (size_t)pIndex;

-    if (fixture->m_numDispatches == 1000)
+    if (fixture->m_numEvents == ++fixture->m_numDispatches[index])
     {
-        for (int x=0; x!=10; x++)
-        {
-            mamaQueue_stopDispatch (fixture->qArray[x]);
-        }
+        wsem_post (&fixture->m_sem);
     }
 }

@@ -141,7 +148,7 @@ TEST_F (MamaQueueTestC, GetDefaultQueue)
     mamaQueue defaultQueue;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mama_getDefaultEventQueue (mBridge, &defaultQueue));
+               mama_getDefaultEventQueue (m_bridge, &defaultQueue));
 }

 /*  Description:   Create a mamaQueue then destroy it.
@@ -153,7 +160,7 @@ TEST_F (MamaQueueTestC, CreateNonDefaultQueue)
     mamaQueue queue;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_create (&queue, mBridge));
+               mamaQueue_create (&queue, m_bridge));

     ASSERT_EQ (MAMA_STATUS_OK,
                mamaQueue_destroy (queue));
@@ -168,11 +175,14 @@ TEST_F (MamaQueueTestC, Enqueue)
     mamaQueue defaultQueue = NULL;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mama_getDefaultEventQueue (mBridge, &defaultQueue));
+               mama_getDefaultEventQueue (m_bridge, &defaultQueue));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+               mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

+    ASSERT_EQ (MAMA_STATUS_OK,
+               mamaQueue_dispatchEvent (defaultQueue));
+
 }

 /*  Description:   Set the enqueue callback associated with the default queue
@@ -185,17 +195,19 @@ TEST_F (MamaQueueTestC, setEnqueueCallback)
     mamaQueue defaultQueue = NULL;

     m_numEvents    = 1;
-    m_eventCounter = 0;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mama_getDefaultEventQueue (mBridge, &defaultQueue));
+               mama_getDefaultEventQueue (m_bridge, &defaultQueue));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, m_this));
+               mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, this));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+               mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

+    ASSERT_EQ (MAMA_STATUS_OK,
+               mamaQueue_dispatchEvent (defaultQueue));
+
 }

 /*  Description:   Set the enqueue callback assiciated with the
@@ -207,24 +219,22 @@ TEST_F (MamaQueueTestC, RemoveEnqueueCallback)
 {
     mamaQueue defaultQueue = NULL;

-    m_numEvents    = 1;
-    m_eventCounter = 0;
+    m_numEvents = 1;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mama_getDefaultEventQueue (mBridge, &defaultQueue));
+               mama_getDefaultEventQueue (m_bridge, &defaultQueue));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, m_this));
+               mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue, this));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+               mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_removeEnqueueCallback (defaultQueue));
+               mamaQueue_dispatchEvent (defaultQueue));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
-
+               mamaQueue_removeEnqueueCallback (defaultQueue));
 }

 /*  Description:   Enqueue an event on the default queue then dispatch it.
@@ -236,13 +246,12 @@ TEST_F (MamaQueueTestC, EnqueueDispatch)
     mamaQueue defaultQueue = NULL;

     m_numEvents    = 1;
-    m_eventCounter = 0;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mama_getDefaultEventQueue (mBridge, &defaultQueue));
+               mama_getDefaultEventQueue (m_bridge, &defaultQueue));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+               mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

     ASSERT_EQ (MAMA_STATUS_OK,
                mamaQueue_dispatchEvent (defaultQueue));
@@ -259,13 +268,12 @@ TEST_F (MamaQueueTestC, EnqueueDispatchNonDefault)
     mamaQueue queue = NULL;

     m_numEvents    = 1;
-    m_eventCounter = 0;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_create (&queue, mBridge));
+               mamaQueue_create (&queue, m_bridge));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_enqueueEvent (queue, onEvent, m_this));
+               mamaQueue_enqueueEvent (queue, onEvent, this));

     ASSERT_EQ (MAMA_STATUS_OK,
                mamaQueue_dispatch (queue));
@@ -286,15 +294,14 @@ TEST_F (MamaQueueTestC, EnqueueDispatchMany)
     mamaQueue queue = NULL;

     m_numEvents    = 10;
-    m_eventCounter = 0;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_create (&queue, mBridge));
+               mamaQueue_create (&queue, m_bridge));

-    for (int x=0; x<=m_numEvents; x++)
+    for (int x=0; x<m_numEvents; x++)
     {
         ASSERT_EQ (MAMA_STATUS_OK,
-                   mamaQueue_enqueueEvent (queue, onEvent, m_this));
+                   mamaQueue_enqueueEvent (queue, onEvent, this));
     }
     ASSERT_EQ (MAMA_STATUS_OK,
                mamaQueue_dispatch (queue));
@@ -314,15 +321,14 @@ TEST_F (MamaQueueTestC, TimedDispatch)
     mamaQueue queue = NULL;

     m_numEvents    = 1000;
-    m_eventCounter = 0;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_create (&queue, mBridge));
+               mamaQueue_create (&queue, m_bridge));

-    for (int x=0; x<=m_numEvents; x++)
+    for (int x=0; x<m_numEvents; x++)
     {
         ASSERT_EQ (MAMA_STATUS_OK,
-                   mamaQueue_enqueueEvent (queue, onTimedEvent, m_this));
+                   mamaQueue_enqueueEvent (queue, onEventNop, this));
     }

     ASSERT_EQ (MAMA_STATUS_OK,
@@ -343,17 +349,16 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
 {
     mamaQueue queue   = NULL;
     m_numEvents       = 20;
-    m_eventCounter    = 0;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_create (&queue, mBridge));
+               mamaQueue_create (&queue, m_bridge));

     mamaQueueMonitorCallbacks queueCallbacks;
     queueCallbacks.onQueueHighWatermarkExceeded = onHighWatermark;
     queueCallbacks.onQueueLowWatermark          = onLowWatermark;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaQueue_setQueueMonitorCallbacks (queue, &queueCallbacks, m_this));
+               mamaQueue_setQueueMonitorCallbacks (queue, &queueCallbacks, this));

     ASSERT_EQ (MAMA_STATUS_OK,
                mamaQueue_setHighWatermark (queue, 10));
@@ -361,10 +366,10 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
     ASSERT_EQ (MAMA_STATUS_OK,
                mamaQueue_setLowWatermark (queue, 5));

-    for (int x=0; x<=m_numEvents; x++)
+    for (int x=0; x<m_numEvents; x++)
     {
         ASSERT_EQ (MAMA_STATUS_OK,
-                   mamaQueue_enqueueEvent (queue, onEvent, m_this));
+                   mamaQueue_enqueueEvent (queue, onEvent, this));
     }

     ASSERT_EQ (MAMA_STATUS_OK,
@@ -383,37 +388,40 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
  */
 TEST_F (MamaQueueTestC, DispatchManyQueuesWithDispatchers)
 {
-    m_numQueues     = 10;
+    m_numQueues     = 10;  // FIXME: Storage is hardcoded to 10!
     m_numEvents     = 100;
-    m_eventCounter  = 0;
-    m_numDispatches = 0;

-    for (m_queueCounter = 0; m_queueCounter!=m_numQueues; m_queueCounter++)
+    for (int x = 0; x!=m_numQueues; x++)
     {
+        m_numDispatches[x] = 0;
+
         ASSERT_EQ (MAMA_STATUS_OK,
-                   mamaQueue_create (&qArray[m_queueCounter], mBridge));
+                   mamaQueue_create (&m_queues[x], m_bridge));

-        for (int x=0; x!=m_numEvents; x++)
+        mamaQueue_setClosure (m_queues[x], (void*)(size_t)x);
+
+        for (int y=0; y!=m_numEvents; y++)
         {
             ASSERT_EQ (MAMA_STATUS_OK,
-                       mamaQueue_enqueueEvent (qArray[m_queueCounter], onBgEvent, m_this));
+                       mamaQueue_enqueueEvent (m_queues[x], onBgEvent, this));
         }
     }

-    for (m_queueCounter = 0; m_queueCounter!=m_numQueues; m_queueCounter++)
+    for (int x = 0; x!=m_numQueues; x++)
     {
         ASSERT_EQ (MAMA_STATUS_OK,
-                   mamaDispatcher_create (&dispatcher[m_queueCounter], qArray[m_queueCounter]));
+                   mamaDispatcher_create (&m_dispatcher[x], m_queues[x]));
     }

-    for (int x=0; x!=10; x++)
+    for (int x = 0; x!=m_numQueues; x++)
     {
-        mamaDispatcher_destroy (dispatcher[x]);
+        ASSERT_EQ(0, wsem_wait (&m_sem));
     }
-    for (m_queueCounter = 0; m_queueCounter!=m_numQueues; m_queueCounter++)
+
+    for (int x = 0; x!=m_numQueues; x++)
     {
-        mamaQueue_destroy (qArray[m_queueCounter]);
+        ASSERT_EQ(MAMA_STATUS_OK, mamaDispatcher_destroy (m_dispatcher[x]));
+        ASSERT_EQ(MAMA_STATUS_OK, mamaQueue_destroy (m_queues[x]));
     }
-
 }

diff --git a/mama/c_cpp/src/gunittest/c/timertest.cpp b/mama/c_cpp/src/gunittest/c/timertest.cpp
index f0102df..0980f13 100644
--- a/mama/c_cpp/src/gunittest/c/timertest.cpp
+++ b/mama/c_cpp/src/gunittest/c/timertest.cpp
@@ -26,6 +26,7 @@
 #include "mama/types.h"
 #include "mama/timer.h"
 #include "mama/queue.h"
+#include "mama/transport.h"
 #include <cstring>
 #include <cstdio>
 #include <cstdlib>
@@ -40,18 +41,17 @@ protected:
     virtual void SetUp();
     virtual void TearDown ();
 public:
-    MamaTimerTestC *m_this;
-    mamaBridge mBridge;
-
-    int         tCounter;
-    int         numTimers;
-    mamaTimer   tarray[100];
-    mamaTimer   longTimer;
-    mamaTimer   shortTimer;
-    mamaTimer   stopTimer;
-    mamaTimer   timer;
-    mamaQueue   queue;
-    mama_f64_t  interval;
+    mamaBridge    m_bridge;
+    mamaTransport m_transport;
+    int           m_tCounter;
+    int           m_numTimers;
+    mamaTimer     m_timers[100];
+    mamaTimer     m_longTimer;
+    mamaTimer     m_shortTimer;
+    mamaTimer     m_stopTimer;
+    mamaTimer     m_timer;
+    mamaQueue     m_queue;
+    mama_f64_t    m_interval;
 };

 MamaTimerTestC::MamaTimerTestC()
@@ -64,53 +64,50 @@ MamaTimerTestC::~MamaTimerTestC()

 void MamaTimerTestC::SetUp(void)
 {
-    interval = 0.01;
-    m_this   = this;
+    m_tCounter = 0;
+    m_interval = 0.01;

-    mama_loadBridge (&mBridge, getMiddleware());
-    mama_open ();
-    ASSERT_EQ (MAMA_STATUS_OK,
-               mama_getDefaultEventQueue (mBridge, &queue));
+    ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&m_bridge, getMiddleware()));
+    ASSERT_EQ (MAMA_STATUS_OK, mama_getDefaultEventQueue (m_bridge, &m_queue));
+    ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_allocate (&m_transport));
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_create (m_transport, NULL, m_bridge));
 }

 void MamaTimerTestC::TearDown(void)
 {
-    mama_close ();
-    m_this = NULL;
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_destroy (m_transport));
+    ASSERT_EQ (MAMA_STATUS_OK, mama_close());
 }

 static void MAMACALLTYPE onTimerTick (mamaTimer timer, void* closure)
 {
-    ASSERT_EQ (MAMA_STATUS_OK,
-               mamaTimer_destroy(timer));
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy(timer));
 }

 static void MAMACALLTYPE onTimerDestroy (mamaTimer timer, void* closure)
 {
     MamaTimerTestC* fixture = (MamaTimerTestC *)closure;
-    fixture->tCounter++;
-
-    if (fixture->tCounter == fixture->numTimers)
+    if (++fixture->m_tCounter == fixture->m_numTimers)
     {
         ASSERT_EQ (MAMA_STATUS_OK,
-                   mama_stop (fixture->mBridge));
+                   mama_stop (fixture->m_bridge));
     }
 }

 static void MAMACALLTYPE onRecursiveTimerDestroy (mamaTimer timer, void* closure)
 {
     MamaTimerTestC* fixture = (MamaTimerTestC *)closure;
-    fixture->tCounter++;
-
-    if (fixture->tCounter == fixture->numTimers)
+    if (++fixture->m_tCounter == fixture->m_numTimers)
     {
         ASSERT_EQ (MAMA_STATUS_OK,
-                   mama_stop(fixture->mBridge));
+                   mama_stop(fixture->m_bridge));
     }
     else
     {
-        mamaTimer_create2 (&timer, fixture->queue, onTimerTick,
-                           onRecursiveTimerDestroy, fixture->interval, fixture);
+        ASSERT_EQ (MAMA_STATUS_OK,
+                   mamaTimer_create2 (&fixture->m_timer, fixture->m_queue, onTimerTick,
+                                      onRecursiveTimerDestroy, fixture->m_interval, fixture));
     }

 }
@@ -123,24 +120,21 @@ static void MAMACALLTYPE onLongTimerTick (mamaTimer timer, void* closure)
 {
 }

-static void MAMACALLTYPE onTwoTimerDestroy (mamaTimer timer, void* closure)
-{
-}
-
 static void MAMACALLTYPE onStopTimerTick (mamaTimer timer, void* closure)
 {
-    mamaTimer_destroy(timer);
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy(timer));
 }

 static void MAMACALLTYPE onStopTimerDestroy (mamaTimer timer, void* closure)
 {
     MamaTimerTestC* fixture = (MamaTimerTestC *)closure;

-    mamaTimer_destroy (fixture->shortTimer);
-    mamaTimer_destroy (fixture->longTimer);
-
-    mama_stop (fixture->mBridge);
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy (fixture->m_shortTimer));
+    ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy (fixture->m_longTimer));
+
+    ASSERT_EQ (MAMA_STATUS_OK, mama_stop (fixture->m_bridge));
 }
+
 /* ************************************************************************* */
 /* Test Functions */
 /* ************************************************************************* */
@@ -151,19 +145,13 @@ static void MAMACALLTYPE onStopTimerDestroy (mamaTimer timer, void* closure)
  */
 TEST_F (MamaTimerTestC, CreateDestroy)
 {
-
-    MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
-    fixture->tCounter  = 0;
-    fixture->numTimers = 1;
+    m_numTimers = 1;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaTimer_create2 (&timer, fixture->queue, onTimerTick,
-                                  onTimerDestroy, fixture->interval, m_this));
-
-    ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
+               mamaTimer_create2 (&m_timer, m_queue, onTimerTick,
+                                  onTimerDestroy, m_interval, this));

-    ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+    ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
 }

 /*  Description: Create many mamaTimers which destroy themselves when fired.
@@ -173,21 +161,16 @@ TEST_F (MamaTimerTestC, CreateDestroy)
  */
 TEST_F (MamaTimerTestC, CreateDestroyMany)
 {
-    MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
-    fixture->tCounter  = 0;
-    fixture->numTimers = 100;
+    m_numTimers = 100;  // FIXME: Storage hardcoded to 100!

-    for (int x=0; x!=fixture->numTimers; x++)
+    for (int x=0; x!=m_numTimers; x++)
     {
         ASSERT_EQ (MAMA_STATUS_OK,
-                   mamaTimer_create2 (&tarray[x], fixture->queue, onTimerTick,
-                                      onTimerDestroy,fixture->interval, m_this));
+                   mamaTimer_create2 (&m_timers[x], m_queue, onTimerTick,
+                                      onTimerDestroy, m_interval, this));
     }

-    ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
-
-    ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+    ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
 }

 /*  Description: Create a timer whiich creates another timer when fired.
@@ -197,18 +180,13 @@ TEST_F (MamaTimerTestC, CreateDestroyMany)
  */
 TEST_F (MamaTimerTestC, RecursiveCreateDestroy)
 {
-    MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
-    fixture->tCounter  = 0;
-    fixture->numTimers = 11;
+    m_numTimers = 11;

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaTimer_create2(&timer, fixture->queue, onTimerTick,
-                                 onRecursiveTimerDestroy, fixture->interval, m_this));
+               mamaTimer_create2(&m_timer, m_queue, onTimerTick,
+                                 onRecursiveTimerDestroy, m_interval, this));

-    ASSERT_EQ (MAMA_STATUS_OK, mama_start(mBridge));
-
-    ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+    ASSERT_EQ (MAMA_STATUS_OK, mama_start(m_bridge));
 }

 /*  Description: Two timers are created which tick indefinately at different rates,
@@ -218,22 +196,18 @@ TEST_F (MamaTimerTestC, RecursiveCreateDestroy)
  */
 TEST_F (MamaTimerTestC, TwoTimer)
 {
-    MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
-
     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaTimer_create(&shortTimer, fixture->queue, onShortTimerTick,
-                                fixture->interval, m_this));
+               mamaTimer_create(&m_shortTimer, m_queue, onShortTimerTick,
+                                m_interval, this));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaTimer_create (&longTimer, fixture->queue, onLongTimerTick,
-                                 ((fixture->interval)*2), m_this));
+               mamaTimer_create (&m_longTimer, m_queue, onLongTimerTick,
+                                 m_interval*2, this));

     ASSERT_EQ (MAMA_STATUS_OK,
-               mamaTimer_create2 (&stopTimer, fixture->queue, onStopTimerTick,
-                                  onStopTimerDestroy, ((fixture->interval)*100), m_this));
+               mamaTimer_create2 (&m_stopTimer, m_queue, onStopTimerTick,
+                                  onStopTimerDestroy, m_interval*100, this));

-    ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
-
-    ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+    ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
 }

--
1.9.0




--
Lee Skillen

Vulcan Financial Technologies
51 Malone Road, Belfast, BT9 6RY

Office:  +44 (0)28 95 817888
Mobile:  +44 (0)78 41 425152
Web:     www.vulcanft.com 


Damian Maguire <DMaguire@...>
 

Hey Lee,

I've added this to Bugzilla for tracking as well. If you'd like to add
yourself as a CC on it, you can see when we update the ticket with any
comments.

Ticket Number: http://bugs.openmama.org/show_bug.cgi?id=68

Cheers,

D




On 2/21/14 11:48 AM, "Lee Skillen" <lskillen@...> wrote:


Changed the mIsDispatching signalling variable into an atomic int since
it's visible, inspected and modified across thread boundaries. This is
the cause of several errors in helgrind and has been observed to cause
an issue with shutting down the queue within avis (may occur on other
middleware, but cannot reproduce with qpid).

Additional changes :-

- Fixed warning for uninitialised variables in msgutils.

- Rewrite of the MamaQueueTestC/MamaTimerTestC tests. Fixed memory
leaks, race conditions and off-by-one errors (causing some to execute
across the boundary of indivisible tests). Also added in allocation
of transport to keep avis middleware happy (it doesn't construct a
default transport so it didn't execute the tests properly).

All MamaQueueTestC/MamaTimerTestC tests now execute for both qpid and
avis (although the latter fails on some due to unimplemented features).

Signed-off-by: Lee Skillen <lskillen@...>
---
mama/c_cpp/src/c/msgutils.c | 4 +-
mama/c_cpp/src/c/queue.c | 79 +++++++-------
mama/c_cpp/src/gunittest/c/openclosetest.cpp | 61 ++++++-----
mama/c_cpp/src/gunittest/c/queuetest.cpp | 150 ++++++++++++++-------------
mama/c_cpp/src/gunittest/c/timertest.cpp | 136 ++++++++++--------------
5 files changed, 213 insertions(+), 217 deletions(-)

diff --git a/mama/c_cpp/src/c/msgutils.c b/mama/c_cpp/src/c/msgutils.c
index 6f9bd5d..0530c30 100644
--- a/mama/c_cpp/src/c/msgutils.c
+++ b/mama/c_cpp/src/c/msgutils.c
@@ -58,7 +58,7 @@ msgUtils_setStatus (mamaMsg msg, short status)
mama_status
msgUtils_msgTotal (mamaMsg msg, short *result)
{
- int32_t val;
+ int32_t val = 0;
mama_status status;

status = mamaMsg_getI32 (msg,
@@ -74,7 +74,7 @@ msgUtils_msgTotal (mamaMsg msg, short *result)
mama_status
msgUtils_msgNum (mamaMsg msg, short *result)
{
- int32_t val;
+ int32_t val = 0;
mama_status status;

status = mamaMsg_getI32 (msg,
diff --git a/mama/c_cpp/src/c/queue.c b/mama/c_cpp/src/c/queue.c
index 8fccaa6..6a104da 100644
--- a/mama/c_cpp/src/c/queue.c
+++ b/mama/c_cpp/src/c/queue.c
@@ -66,7 +66,7 @@ typedef struct mamaQueueImpl_
{
/*Reuseable message for all received on this queue*/
mamaMsg mMsg;
- int mIsDispatching;
+ wInterlockedInt mIsDispatching;
/*Hold onto the bridge impl for later use*/
mamaBridgeImpl* mBridgeImpl;
/*The bridge specific queue implementation*/
@@ -103,16 +103,14 @@ typedef struct mamaQueueImpl_
} mamaQueueImpl;

/*Main structure for the mamaDispatcher*/
-typedef struct mamaDisptacherImpl_
+typedef struct mamaDispatcherImpl_
{
/*The queue on which this dispatcher is dispatching*/
- mamaQueue mQueue;
+ mamaQueue mQueue;
/*The thread on which this dispatcher is dispathcing.*/
- wthread_t mThread;
+ wthread_t mThread;
/*Whether the dispatcher is dispatching*/
- int mIsDispatching;
- /*Destroy has been called*/
- int mDestroy;
+ wInterlockedInt mIsDispatching;
} mamaDispatcherImpl;


@@ -226,7 +224,8 @@ mamaQueue_create (mamaQueue* queue,
wInterlocked_initialize(&impl->mNumberOpenObjects);
wInterlocked_set(0, &impl->mNumberOpenObjects);

-
+ wInterlocked_initialize(&impl->mIsDispatching);
+ wInterlocked_set(0, &impl->mIsDispatching);

/* Call the bridge impl specific queue create function*/
if (MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaQueueCreate
@@ -549,6 +548,7 @@ mamaQueue_destroyWait(mamaQueue queue)
{
/* Dispatch messages for 10 ms. */
ret = mamaQueue_timedDispatch(queue, 10);
+
if(MAMA_STATUS_OK == ret)
{
/* Attempt to destroy the queue again. */
@@ -587,6 +587,8 @@ mamaQueue_destroyTimedWait(mamaQueue queue,
{
/* Dispatch messages for 10 ms. */
ret = mamaQueue_timedDispatch(queue, 10);
+
+ /* Should this not react to MAMA_STATUS_TIMEOUT too? */
if(MAMA_STATUS_OK == ret)
{
/* Add 10ms to the count. */
@@ -638,7 +640,6 @@ mamaQueue_destroy (mamaQueue queue)

mama_log (MAMA_LOG_LEVEL_FINEST, "Entering mamaQueue_destroy for
queue 0x%X.", queue);

- impl->mIsDispatching = 0;
if (!queue)
{
mama_log (MAMA_LOG_LEVEL_ERROR, "mamaQueue_destroy(): NULL
queue.");
@@ -649,8 +650,19 @@ mamaQueue_destroy (mamaQueue queue)
status = MAMA_STATUS_QUEUE_OPEN_OBJECTS;

/* Only continue if the object count is 0. */
- if(0 == wInterlocked_read(&impl->mNumberOpenObjects))
+ if (0 == wInterlocked_read(&impl->mNumberOpenObjects))
{
+ wInterlocked_set (0, &impl->mIsDispatching);
+
+ if (impl->mDispatcher)
+ {
+ /* Try to ensure that the dispatcher does not restart the
queue
+ * dispatching after we've stopped and destroy it. */
+ mamaDispatcherImpl* dispatcherImpl =
(mamaDispatcherImpl*)impl->mDispatcher;
+ wInterlocked_set (0, &dispatcherImpl->mIsDispatching);
+ dispatcherImpl->mQueue = NULL;
+ }
+
if (impl->mMamaQueueBridgeImpl)
{
if
(MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaQueueDestroy (
@@ -662,13 +674,6 @@ mamaQueue_destroy (mamaQueue queue)
}
}

- if (impl->mDispatcher)
- {
- /* We don't want the dispatcher to access a destroyed queue
*/
- ((mamaDispatcherImpl*)(impl->mDispatcher))->mIsDispatching =
0;
- ((mamaDispatcherImpl*)(impl->mDispatcher))->mQueue = NULL;
- }
-
/*Destroy the cached mamaMsg - no longer needed*/
if (impl->mMsg) mamaMsg_destroy (impl->mMsg);

@@ -747,6 +752,7 @@ mamaQueue_destroy (mamaQueue queue)

/* Destroy the counter lock */
wInterlocked_destroy(&impl->mNumberOpenObjects);
+ wInterlocked_destroy(&impl->mIsDispatching);

free (impl);

@@ -785,15 +791,17 @@ mamaQueue_getEventCount (mamaQueue queue,
mama_status
mamaQueue_dispatch (mamaQueue queue)
{
- mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ mama_status status = MAMA_STATUS_OK;
+
if (!impl)
{
mama_log (MAMA_LOG_LEVEL_ERROR,
"mamaQueue_dispatch(): NULL queue.");
return MAMA_STATUS_NULL_ARG;
}
- impl->mIsDispatching = 1;

+ wInterlocked_set (1, &impl->mIsDispatching);
return impl->mBridgeImpl->bridgeMamaQueueDispatch
(impl->mMamaQueueBridgeImpl);
}
@@ -802,7 +810,7 @@ mama_status
mamaQueue_timedDispatch (mamaQueue queue,
uint64_t timeout)
{
- mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;

if (!impl)
{
@@ -811,6 +819,7 @@ mamaQueue_timedDispatch (mamaQueue queue,
return MAMA_STATUS_NULL_ARG;
}

+ wInterlocked_set (1, &impl->mIsDispatching);
return impl->mBridgeImpl->bridgeMamaQueueTimedDispatch
(impl->mMamaQueueBridgeImpl, timeout);
}
@@ -842,8 +851,8 @@ mamaQueue_stopDispatch (mamaQueue queue)
"mamaQueue_stopDispatch(): NULL queue.");
return MAMA_STATUS_NULL_ARG;
}
- impl->mIsDispatching = 0;

+ wInterlocked_set (0, &impl->mIsDispatching);
return impl->mBridgeImpl->bridgeMamaQueueStopDispatch
(impl->mMamaQueueBridgeImpl);
}
@@ -1261,16 +1270,10 @@ static void
{
mamaDispatcherImpl* impl = (mamaDispatcherImpl*)closure;

- impl->mIsDispatching = 1;
-
- while (impl->mIsDispatching && !impl->mDestroy &&
- MAMA_STATUS_OK == mamaQueue_dispatch (impl->mQueue))
- ;
-
- impl->mIsDispatching = 0;
-
- if (impl->mQueue)
- impl->mQueue->mDispatcher = NULL;
+ wInterlocked_set (1, &impl->mIsDispatching);
+ while (wInterlocked_read (&impl->mIsDispatching) &&
+ MAMA_STATUS_OK == mamaQueue_dispatch (impl->mQueue));
+ wInterlocked_set (0, &impl->mIsDispatching);

return NULL;
}
@@ -1309,8 +1312,10 @@ mamaDispatcher_create (mamaDispatcher *result,
return MAMA_STATUS_NOMEM;
}

+ wInterlocked_initialize(&impl->mIsDispatching);
+ wInterlocked_set(0, &impl->mIsDispatching);
+
impl->mQueue = queue;
- impl->mDestroy = 0;
if (wthread_create(&impl->mThread, NULL, dispatchThreadProc, impl))
{
free (impl);
@@ -1333,16 +1338,15 @@ mamaDispatcher_destroy (mamaDispatcher dispatcher)
if (!impl)
return MAMA_STATUS_NULL_ARG;

- if( impl->mQueue && impl->mIsDispatching )
+ if (impl->mQueue)
{
- impl->mIsDispatching = 0;
mamaQueue_stopDispatch (impl->mQueue);
}

- impl->mDestroy = 1;
-
/* Wait for the thread to return. */
+ wInterlocked_set(0, &impl->mIsDispatching);
wthread_join (impl->mThread, NULL);
+ wInterlocked_destroy (&impl->mIsDispatching);

impl->mQueue->mDispatcher = NULL;
free (impl);
@@ -1352,7 +1356,8 @@ mamaDispatcher_destroy (mamaDispatcher dispatcher)
int
mamaQueueImpl_isDispatching (mamaQueue queue)
{
- return ((mamaQueueImpl*)queue)->mIsDispatching;
+ mamaQueueImpl* impl = (mamaQueueImpl*)queue;
+ return wInterlocked_read (&impl->mIsDispatching);
}

int MAMACALLTYPE
diff --git a/mama/c_cpp/src/gunittest/c/openclosetest.cpp
b/mama/c_cpp/src/gunittest/c/openclosetest.cpp
index ed26e05..f288ba5 100644
--- a/mama/c_cpp/src/gunittest/c/openclosetest.cpp
+++ b/mama/c_cpp/src/gunittest/c/openclosetest.cpp
@@ -23,11 +23,11 @@
* opening and closing middleware bridges.
*/

+#include "MainUnitTestC.h"
#include <gtest/gtest.h>
#include "mama/mama.h"
#include "mama/status.h"
-#include "MainUnitTestC.h"
-
+#include "wombat/wSemaphore.h"

class MamaOpenCloseTestC : public ::testing::Test
{
@@ -38,7 +38,6 @@ protected:

virtual void SetUp(void);
virtual void TearDown(void);
-

};

@@ -58,10 +57,19 @@ void MamaOpenCloseTestC::TearDown(void)
{
}

-static void MAMACALLTYPE startCallback (mama_status status)
+static void MAMACALLTYPE startCallback (mama_status status,
+ mamaBridge bridge,
+ void* closure)
{
+ wsem_t* sem = (wsem_t*)closure;
+ ASSERT_EQ (0, wsem_post (sem));
}

+void onEventStop (mamaQueue queue, void* closure)
+{
+ mamaBridge bridge = (mamaBridge)closure;
+ mama_stop (bridge);
+}

/*
*************************************************************************
*/
/* Tests */
@@ -73,8 +81,8 @@ static void MAMACALLTYPE startCallback (mama_status
status)
*/
TEST_F (MamaOpenCloseTestC, OpenClose)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ mama_loadBridge (&bridge, getMiddleware());

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

@@ -89,8 +97,8 @@ TEST_F (MamaOpenCloseTestC, OpenClose)
*/
TEST_F (MamaOpenCloseTestC, NestedOpenClose)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ mama_loadBridge (&bridge, getMiddleware());

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

@@ -109,15 +117,15 @@ TEST_F (MamaOpenCloseTestC, NestedOpenClose)
*/
TEST_F (MamaOpenCloseTestC, OpenCloseReopenSameBridge)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ mama_loadBridge (&bridge, getMiddleware());

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

ASSERT_EQ (MAMA_STATUS_OK, mama_close());

/* bridge must be loaded again after close */
- mama_loadBridge (&mBridge, getMiddleware());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge,
getMiddleware()));

ASSERT_EQ (MAMA_STATUS_OK, mama_open());
}
@@ -129,18 +137,18 @@ TEST_F (MamaOpenCloseTestC,
OpenCloseReopenSameBridge)
*/
TEST_F (MamaOpenCloseTestC, DISABLED_OpenCloseReopenNewBridge)
{
- mamaBridge mBridge;
- ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&mBridge,
getMiddleware()));
+ mamaBridge bridge;
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge,
getMiddleware()));

- ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());

- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());

- ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&mBridge, "avis"));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge, "avis"));

- ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());

- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());
}

/* Description: Load the middleware bridge, initialize MAMA, begin
@@ -151,18 +159,19 @@ TEST_F (MamaOpenCloseTestC,
DISABLED_OpenCloseReopenNewBridge)
*/
TEST_F (MamaOpenCloseTestC, StartStopDifferentThreads)
{
- mamaBridge mBridge;
- mama_loadBridge (&mBridge, getMiddleware());
+ mamaBridge bridge;
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&bridge,
getMiddleware()));

ASSERT_EQ (MAMA_STATUS_OK, mama_open());

/* Start mama in the background so it uses a different thread */
- ASSERT_EQ (MAMA_STATUS_OK, mama_startBackground (mBridge,
startCallback));
-
- /* Sleep to allow the other thread to complete startup */
- sleep(2);
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_stop (mBridge));
+ wsem_t sem;
+ mamaQueue defaultQueue;
+ ASSERT_EQ (0, wsem_init (&sem, 0, 0));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_getDefaultEventQueue (bridge,
&defaultQueue));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaQueue_enqueueEvent (defaultQueue,
onEventStop, bridge));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_startBackgroundEx (bridge,
startCallback, &sem));
+ ASSERT_EQ (0, wsem_wait (&sem));

ASSERT_EQ (MAMA_STATUS_OK, mama_close());
}
diff --git a/mama/c_cpp/src/gunittest/c/queuetest.cpp
b/mama/c_cpp/src/gunittest/c/queuetest.cpp
index c9dbe66..5315c26 100644
--- a/mama/c_cpp/src/gunittest/c/queuetest.cpp
+++ b/mama/c_cpp/src/gunittest/c/queuetest.cpp
@@ -17,13 +17,14 @@
* 02110-1301 USA
*/

-
+#include "MainUnitTestC.h"
#include <gtest/gtest.h>
#include <pthread.h>
#include "mama/mama.h"
#include "mama/status.h"
-#include "MainUnitTestC.h"
#include "mama/queue.h"
+#include "mama/transport.h"
+#include "wombat/wSemaphore.h"

class MamaQueueTestC : public ::testing::Test
{
@@ -33,25 +34,22 @@ class MamaQueueTestC : public ::testing::Test
virtual void SetUp();
virtual void TearDown();
public:
- mamaBridge mBridge;
- MamaQueueTestC* m_this;
+ mamaBridge m_bridge;
+ mamaTransport m_transport;
int m_numQueues;
- int m_queueCounter;
int m_numEvents;
int m_eventCounter;
- int m_numDispatches;
int m_highWaterMarkOccurance;
int m_lowWaterMarkOccurance;
+ int m_numDispatches[10];
uint64_t m_timeout;
- mamaDispatcher dispatcher[10];
- mamaQueue qArray[10];
+ mamaDispatcher m_dispatcher[10];
+ mamaQueue m_queues[10];
+ wsem_t m_sem;
};

MamaQueueTestC::MamaQueueTestC()
{
- mBridge;
- m_this = this;
-
m_highWaterMarkOccurance = 0;
m_lowWaterMarkOccurance = 0;
m_timeout = 5000;
@@ -59,23 +57,31 @@ MamaQueueTestC::MamaQueueTestC()

MamaQueueTestC::~MamaQueueTestC()
{
- m_this = NULL;
}

void MamaQueueTestC::SetUp()
{
- mama_loadBridge (&mBridge, getMiddleware());
- mama_open();
+ ASSERT_EQ(0, wsem_init (&m_sem, 0, 0));
+ m_eventCounter = 0;
+
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&m_bridge,
getMiddleware()));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_allocate (&m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_create (m_transport, NULL,
m_bridge));
}

void MamaQueueTestC::TearDown()
{
- mama_close();
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_destroy (m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (0, wsem_destroy (&m_sem));
}

+#if 0
static void MAMACALLTYPE startCallback (mama_status status)
{
}
+#endif

void highWaterMarkCallback (mamaQueue queue, size_t size, void* closure)
{
@@ -98,21 +104,22 @@ void onEvent (mamaQueue queue, void* closure)
mamaQueue_stopDispatch (queue);
}
}
-void onTimedEvent (mamaQueue queue, void* closure)
+
+void onEventNop (mamaQueue queue, void* closure)
{
}

void onBgEvent (mamaQueue queue, void* closure)
{
MamaQueueTestC* fixture = (MamaQueueTestC *)closure;
- fixture->m_numDispatches++;
+
+ void* pIndex = 0;
+ mamaQueue_getClosure (queue, &pIndex);
+ size_t index = (size_t)pIndex;

- if (fixture->m_numDispatches == 1000)
+ if (fixture->m_numEvents == ++fixture->m_numDispatches[index])
{
- for (int x=0; x!=10; x++)
- {
- mamaQueue_stopDispatch (fixture->qArray[x]);
- }
+ wsem_post (&fixture->m_sem);
}
}

@@ -141,7 +148,7 @@ TEST_F (MamaQueueTestC, GetDefaultQueue)
mamaQueue defaultQueue;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));
}

/* Description: Create a mamaQueue then destroy it.
@@ -153,7 +160,7 @@ TEST_F (MamaQueueTestC, CreateNonDefaultQueue)
mamaQueue queue;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_destroy (queue));
@@ -168,11 +175,14 @@ TEST_F (MamaQueueTestC, Enqueue)
mamaQueue defaultQueue = NULL;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

+ ASSERT_EQ (MAMA_STATUS_OK,
+ mamaQueue_dispatchEvent (defaultQueue));
+
}

/* Description: Set the enqueue callback associated with the default
queue
@@ -185,17 +195,19 @@ TEST_F (MamaQueueTestC, setEnqueueCallback)
mamaQueue defaultQueue = NULL;

m_numEvents = 1;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue,
m_this));
+ mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue,
this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

+ ASSERT_EQ (MAMA_STATUS_OK,
+ mamaQueue_dispatchEvent (defaultQueue));
+
}

/* Description: Set the enqueue callback assiciated with the
@@ -207,24 +219,22 @@ TEST_F (MamaQueueTestC, RemoveEnqueueCallback)
{
mamaQueue defaultQueue = NULL;

- m_numEvents = 1;
- m_eventCounter = 0;
+ m_numEvents = 1;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue,
m_this));
+ mamaQueue_setEnqueueCallback (defaultQueue, onEnqueue,
this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_removeEnqueueCallback (defaultQueue));
+ mamaQueue_dispatchEvent (defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
-
+ mamaQueue_removeEnqueueCallback (defaultQueue));
}

/* Description: Enqueue an event on the default queue then dispatch
it.
@@ -236,13 +246,12 @@ TEST_F (MamaQueueTestC, EnqueueDispatch)
mamaQueue defaultQueue = NULL;

m_numEvents = 1;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &defaultQueue));
+ mama_getDefaultEventQueue (m_bridge, &defaultQueue));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (defaultQueue, onEvent, m_this));
+ mamaQueue_enqueueEvent (defaultQueue, onEventNop, this));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_dispatchEvent (defaultQueue));
@@ -259,13 +268,12 @@ TEST_F (MamaQueueTestC, EnqueueDispatchNonDefault)
mamaQueue queue = NULL;

m_numEvents = 1;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEvent, this));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_dispatch (queue));
@@ -286,15 +294,14 @@ TEST_F (MamaQueueTestC, EnqueueDispatchMany)
mamaQueue queue = NULL;

m_numEvents = 10;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

- for (int x=0; x<=m_numEvents; x++)
+ for (int x=0; x<m_numEvents; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEvent, this));
}
ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_dispatch (queue));
@@ -314,15 +321,14 @@ TEST_F (MamaQueueTestC, TimedDispatch)
mamaQueue queue = NULL;

m_numEvents = 1000;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

- for (int x=0; x<=m_numEvents; x++)
+ for (int x=0; x<m_numEvents; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onTimedEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEventNop, this));
}

ASSERT_EQ (MAMA_STATUS_OK,
@@ -343,17 +349,16 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
{
mamaQueue queue = NULL;
m_numEvents = 20;
- m_eventCounter = 0;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&queue, mBridge));
+ mamaQueue_create (&queue, m_bridge));

mamaQueueMonitorCallbacks queueCallbacks;
queueCallbacks.onQueueHighWatermarkExceeded = onHighWatermark;
queueCallbacks.onQueueLowWatermark = onLowWatermark;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_setQueueMonitorCallbacks (queue,
&queueCallbacks, m_this));
+ mamaQueue_setQueueMonitorCallbacks (queue,
&queueCallbacks, this));

ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_setHighWatermark (queue, 10));
@@ -361,10 +366,10 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
ASSERT_EQ (MAMA_STATUS_OK,
mamaQueue_setLowWatermark (queue, 5));

- for (int x=0; x<=m_numEvents; x++)
+ for (int x=0; x<m_numEvents; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (queue, onEvent, m_this));
+ mamaQueue_enqueueEvent (queue, onEvent, this));
}

ASSERT_EQ (MAMA_STATUS_OK,
@@ -383,37 +388,40 @@ TEST_F (MamaQueueTestC, MonitorWatermarks)
*/
TEST_F (MamaQueueTestC, DispatchManyQueuesWithDispatchers)
{
- m_numQueues = 10;
+ m_numQueues = 10; // FIXME: Storage is hardcoded to 10!
m_numEvents = 100;
- m_eventCounter = 0;
- m_numDispatches = 0;

- for (m_queueCounter = 0; m_queueCounter!=m_numQueues;
m_queueCounter++)
+ for (int x = 0; x!=m_numQueues; x++)
{
+ m_numDispatches[x] = 0;
+
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_create (&qArray[m_queueCounter], mBridge));
+ mamaQueue_create (&m_queues[x], m_bridge));

- for (int x=0; x!=m_numEvents; x++)
+ mamaQueue_setClosure (m_queues[x], (void*)(size_t)x);
+
+ for (int y=0; y!=m_numEvents; y++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaQueue_enqueueEvent (qArray[m_queueCounter],
onBgEvent, m_this));
+ mamaQueue_enqueueEvent (m_queues[x], onBgEvent,
this));
}
}

- for (m_queueCounter = 0; m_queueCounter!=m_numQueues;
m_queueCounter++)
+ for (int x = 0; x!=m_numQueues; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaDispatcher_create (&dispatcher[m_queueCounter],
qArray[m_queueCounter]));
+ mamaDispatcher_create (&m_dispatcher[x],
m_queues[x]));
}

- for (int x=0; x!=10; x++)
+ for (int x = 0; x!=m_numQueues; x++)
{
- mamaDispatcher_destroy (dispatcher[x]);
+ ASSERT_EQ(0, wsem_wait (&m_sem));
}
- for (m_queueCounter = 0; m_queueCounter!=m_numQueues;
m_queueCounter++)
+
+ for (int x = 0; x!=m_numQueues; x++)
{
- mamaQueue_destroy (qArray[m_queueCounter]);
+ ASSERT_EQ(MAMA_STATUS_OK, mamaDispatcher_destroy
(m_dispatcher[x]));
+ ASSERT_EQ(MAMA_STATUS_OK, mamaQueue_destroy (m_queues[x]));
}
-
}

diff --git a/mama/c_cpp/src/gunittest/c/timertest.cpp
b/mama/c_cpp/src/gunittest/c/timertest.cpp
index f0102df..0980f13 100644
--- a/mama/c_cpp/src/gunittest/c/timertest.cpp
+++ b/mama/c_cpp/src/gunittest/c/timertest.cpp
@@ -26,6 +26,7 @@
#include "mama/types.h"
#include "mama/timer.h"
#include "mama/queue.h"
+#include "mama/transport.h"
#include <cstring>
#include <cstdio>
#include <cstdlib>
@@ -40,18 +41,17 @@ protected:
virtual void SetUp();
virtual void TearDown ();
public:
- MamaTimerTestC *m_this;
- mamaBridge mBridge;
-
- int tCounter;
- int numTimers;
- mamaTimer tarray[100];
- mamaTimer longTimer;
- mamaTimer shortTimer;
- mamaTimer stopTimer;
- mamaTimer timer;
- mamaQueue queue;
- mama_f64_t interval;
+ mamaBridge m_bridge;
+ mamaTransport m_transport;
+ int m_tCounter;
+ int m_numTimers;
+ mamaTimer m_timers[100];
+ mamaTimer m_longTimer;
+ mamaTimer m_shortTimer;
+ mamaTimer m_stopTimer;
+ mamaTimer m_timer;
+ mamaQueue m_queue;
+ mama_f64_t m_interval;
};

MamaTimerTestC::MamaTimerTestC()
@@ -64,53 +64,50 @@ MamaTimerTestC::~MamaTimerTestC()

void MamaTimerTestC::SetUp(void)
{
- interval = 0.01;
- m_this = this;
+ m_tCounter = 0;
+ m_interval = 0.01;

- mama_loadBridge (&mBridge, getMiddleware());
- mama_open ();
- ASSERT_EQ (MAMA_STATUS_OK,
- mama_getDefaultEventQueue (mBridge, &queue));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_loadBridge (&m_bridge,
getMiddleware()));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_getDefaultEventQueue (m_bridge,
&m_queue));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_open());
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_allocate (&m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_create (m_transport, NULL,
m_bridge));
}

void MamaTimerTestC::TearDown(void)
{
- mama_close ();
- m_this = NULL;
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTransport_destroy (m_transport));
+ ASSERT_EQ (MAMA_STATUS_OK, mama_close());
}

static void MAMACALLTYPE onTimerTick (mamaTimer timer, void* closure)
{
- ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_destroy(timer));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy(timer));
}

static void MAMACALLTYPE onTimerDestroy (mamaTimer timer, void* closure)
{
MamaTimerTestC* fixture = (MamaTimerTestC *)closure;
- fixture->tCounter++;
-
- if (fixture->tCounter == fixture->numTimers)
+ if (++fixture->m_tCounter == fixture->m_numTimers)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mama_stop (fixture->mBridge));
+ mama_stop (fixture->m_bridge));
}
}

static void MAMACALLTYPE onRecursiveTimerDestroy (mamaTimer timer, void*
closure)
{
MamaTimerTestC* fixture = (MamaTimerTestC *)closure;
- fixture->tCounter++;
-
- if (fixture->tCounter == fixture->numTimers)
+ if (++fixture->m_tCounter == fixture->m_numTimers)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mama_stop(fixture->mBridge));
+ mama_stop(fixture->m_bridge));
}
else
{
- mamaTimer_create2 (&timer, fixture->queue, onTimerTick,
- onRecursiveTimerDestroy, fixture->interval,
fixture);
+ ASSERT_EQ (MAMA_STATUS_OK,
+ mamaTimer_create2 (&fixture->m_timer,
fixture->m_queue, onTimerTick,
+ onRecursiveTimerDestroy,
fixture->m_interval, fixture));
}

}
@@ -123,24 +120,21 @@ static void MAMACALLTYPE onLongTimerTick (mamaTimer
timer, void* closure)
{
}

-static void MAMACALLTYPE onTwoTimerDestroy (mamaTimer timer, void*
closure)
-{
-}
-
static void MAMACALLTYPE onStopTimerTick (mamaTimer timer, void* closure)
{
- mamaTimer_destroy(timer);
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy(timer));
}

static void MAMACALLTYPE onStopTimerDestroy (mamaTimer timer, void*
closure)
{
MamaTimerTestC* fixture = (MamaTimerTestC *)closure;

- mamaTimer_destroy (fixture->shortTimer);
- mamaTimer_destroy (fixture->longTimer);
-
- mama_stop (fixture->mBridge);
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy
(fixture->m_shortTimer));
+ ASSERT_EQ (MAMA_STATUS_OK, mamaTimer_destroy (fixture->m_longTimer));
+
+ ASSERT_EQ (MAMA_STATUS_OK, mama_stop (fixture->m_bridge));
}
+
/*
*************************************************************************
*/
/* Test Functions */
/*
*************************************************************************
*/
@@ -151,19 +145,13 @@ static void MAMACALLTYPE onStopTimerDestroy
(mamaTimer timer, void* closure)
*/
TEST_F (MamaTimerTestC, CreateDestroy)
{
-
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
- fixture->tCounter = 0;
- fixture->numTimers = 1;
+ m_numTimers = 1;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2 (&timer, fixture->queue, onTimerTick,
- onTimerDestroy, fixture->interval,
m_this));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
+ mamaTimer_create2 (&m_timer, m_queue, onTimerTick,
+ onTimerDestroy, m_interval, this));

- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
}

/* Description: Create many mamaTimers which destroy themselves when
fired.
@@ -173,21 +161,16 @@ TEST_F (MamaTimerTestC, CreateDestroy)
*/
TEST_F (MamaTimerTestC, CreateDestroyMany)
{
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
- fixture->tCounter = 0;
- fixture->numTimers = 100;
+ m_numTimers = 100; // FIXME: Storage hardcoded to 100!

- for (int x=0; x!=fixture->numTimers; x++)
+ for (int x=0; x!=m_numTimers; x++)
{
ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2 (&tarray[x], fixture->queue,
onTimerTick,
- onTimerDestroy,fixture->interval,
m_this));
+ mamaTimer_create2 (&m_timers[x], m_queue, onTimerTick,
+ onTimerDestroy, m_interval, this));
}

- ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
}

/* Description: Create a timer whiich creates another timer when fired.
@@ -197,18 +180,13 @@ TEST_F (MamaTimerTestC, CreateDestroyMany)
*/
TEST_F (MamaTimerTestC, RecursiveCreateDestroy)
{
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
- fixture->tCounter = 0;
- fixture->numTimers = 11;
+ m_numTimers = 11;

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2(&timer, fixture->queue, onTimerTick,
- onRecursiveTimerDestroy,
fixture->interval, m_this));
+ mamaTimer_create2(&m_timer, m_queue, onTimerTick,
+ onRecursiveTimerDestroy, m_interval,
this));

- ASSERT_EQ (MAMA_STATUS_OK, mama_start(mBridge));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
-
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start(m_bridge));
}

/* Description: Two timers are created which tick indefinately at
different rates,
@@ -218,22 +196,18 @@ TEST_F (MamaTimerTestC, RecursiveCreateDestroy)
*/
TEST_F (MamaTimerTestC, TwoTimer)
{
- MamaTimerTestC* fixture = (MamaTimerTestC *)m_this;
-
ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create(&shortTimer, fixture->queue,
onShortTimerTick,
- fixture->interval, m_this));
+ mamaTimer_create(&m_shortTimer, m_queue, onShortTimerTick,
+ m_interval, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create (&longTimer, fixture->queue,
onLongTimerTick,
- ((fixture->interval)*2), m_this));
+ mamaTimer_create (&m_longTimer, m_queue, onLongTimerTick,
+ m_interval*2, this));

ASSERT_EQ (MAMA_STATUS_OK,
- mamaTimer_create2 (&stopTimer, fixture->queue,
onStopTimerTick,
- onStopTimerDestroy,
((fixture->interval)*100), m_this));
+ mamaTimer_create2 (&m_stopTimer, m_queue, onStopTimerTick,
+ onStopTimerDestroy, m_interval*100,
this));

- ASSERT_EQ (MAMA_STATUS_OK, mama_start (mBridge));
-
- ASSERT_EQ (MAMA_STATUS_OK, mama_close());
+ ASSERT_EQ (MAMA_STATUS_OK, mama_start (m_bridge));
}

________________________________________________________

This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of IntercontinentalExchange Group, Inc. (ICE), NYSE Euronext or any of their subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.
________________________________________________________