[PATCH 03/14] AVIS: Pulled Queue and Timer implementation from qpid over to avis


Frank Quinn <fquinn.ni@...>
 

In future, we may want to make this code central. In the meantime,
at least making the code identical with qpid should make it easier
to maintain.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/bridge.c |   8 +-
 mama/c_cpp/src/c/bridge/avis/queue.c  | 473 ++++++++++++++++++++++++----------
 mama/c_cpp/src/c/bridge/avis/timer.c  | 346 +++++++++++++++----------
 3 files changed, 557 insertions(+), 270 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index e975474..9728046 100644
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -30,7 +30,7 @@
 #include "avisdefs.h"
 #include "transportbridge.h"
 
-timerHeap gTimerHeap;
+timerHeap gAvisTimerHeap;
 
 /*Responsible for creating the bridge impl structure*/
 void avisBridge_createImpl (mamaBridge* result)
@@ -110,14 +110,14 @@ avisBridge_open (mamaBridge bridgeImpl)
     mama_log (MAMA_LOG_LEVEL_NORMAL,
               "avisBridge_open(): Successfully created Avis queue");
 
-    if (0 != createTimerHeap (&gTimerHeap))
+    if (0 != createTimerHeap (&gAvisTimerHeap))
     {
         mama_log (MAMA_LOG_LEVEL_NORMAL,
                 "avisBridge_open(): Failed to initialize timers.");
         return MAMA_STATUS_PLATFORM;
     }
 
-    if (0 != startDispatchTimerHeap (gTimerHeap))
+    if (0 != startDispatchTimerHeap (gAvisTimerHeap))
     {
         mama_log (MAMA_LOG_LEVEL_NORMAL,
                 "avisBridge_open(): Failed to start timer thread.");
@@ -142,7 +142,7 @@ avisBridge_close (mamaBridge bridgeImpl)
     impl =  (mamaBridgeImpl*)bridgeImpl;
 
 
-    if (0 != destroyHeap (gTimerHeap))
+    if (0 != destroyHeap (gAvisTimerHeap))
     {
         mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_close():"
                 "Failed to destroy Avis timer heap.");
diff --git a/mama/c_cpp/src/c/bridge/avis/queue.c b/mama/c_cpp/src/c/bridge/avis/queue.c
index e32845f..c0aab67 100644
--- a/mama/c_cpp/src/c/bridge/avis/queue.c
+++ b/mama/c_cpp/src/c/bridge/avis/queue.c
@@ -1,7 +1,7 @@
 /* $Id$
  *
  * OpenMAMA: The open middleware agnostic messaging API
- * Copyright (C) 2011 NYSE Technologies, Inc.
+ * Copyright (C) 2011 NYSE Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -19,58 +19,131 @@
  * 02110-1301 USA
  */
 
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
 #include <mama/mama.h>
+#include <wombat/queue.h>
 #include <bridge.h>
 #include "queueimpl.h"
 #include "avisbridgefunctions.h"
-#include <wombat/queue.h>
 
-#include <avis/elvin.h>
+
+/*=========================================================================
+  =                Typedefs, structs, enums and globals                   =
+  =========================================================================*/
 
 typedef struct avisQueueBridge {
     mamaQueue          mParent;
     wombatQueue        mQueue;
-    mamaQueueEnqueueCB mEnqueueCb;
+    uint8_t            mHighWaterFired;
+    size_t             mHighWatermark;
+    size_t             mLowWatermark;
+    uint8_t            mIsDispatching;
+    mamaQueueEnqueueCB mEnqueueCallback;
     void*              mEnqueueClosure;
-    uint8_t            mIsNative;
+    wthread_mutex_t    mDispatchLock;
 } avisQueueBridge;
 
-typedef struct avisQueueClosure {
-    avisQueueBridge* mImpl;
-    mamaQueueEventCB mCb;
-    void*            mUserClosure;
-} avisQueueClosure;
 
-#define avisQueue(queue) ((avisQueueBridge*) queue)
-#define CHECK_QUEUE(queue) \
-        do {  \
-           if (avisQueue(queue) == 0) return MAMA_STATUS_NULL_ARG; \
-           if (avisQueue(queue)->mQueue == NULL) return MAMA_STATUS_NULL_ARG; \
-         } while(0)
+/*=========================================================================
+  =                              Macros                                   =
+  =========================================================================*/
+
+#define     CHECK_QUEUE(IMPL)                                                  \
+    do {                                                                       \
+        if (IMPL == NULL)              return MAMA_STATUS_NULL_ARG;            \
+        if (IMPL->mQueue == NULL)      return MAMA_STATUS_NULL_ARG;            \
+    } while(0)
+
+/* Timeout is in milliseconds */
+#define     AVIS_QUEUE_DISPATCH_TIMEOUT     500
+#define     AVIS_QUEUE_MAX_SIZE             WOMBAT_QUEUE_MAX_SIZE
+#define     AVIS_QUEUE_CHUNK_SIZE           WOMBAT_QUEUE_CHUNK_SIZE
+#define     AVIS_QUEUE_INITIAL_SIZE         WOMBAT_QUEUE_CHUNK_SIZE
+
 
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
+
+/**
+ * This function is called to check the current queue size against configured
+ * watermarks to determine whether or not it should call the watermark callback
+ * functions. If it determines that it should, it invokes the relevant callback
+ * itself.
+ *
+ * @param impl The avis queue bridge implementation to check.
+ */
+static void
+avisBridgeMamaQueueImpl_checkWatermarks (avisQueueBridge* impl);
+
+
+/*=========================================================================
+  =               Public interface implementation functions               =
+  =========================================================================*/
 
 mama_status
 avisBridgeMamaQueue_create (queueBridge* queue,
                             mamaQueue    parent)
 {
-    avisQueueBridge* avisQueue = NULL;
+    /* Null initialize the queue to be created */
+    avisQueueBridge*    impl                = NULL;
+    wombatQueueStatus   underlyingStatus    = WOMBAT_QUEUE_OK;
 
-    if (queue == NULL)
+    if (queue == NULL || parent == NULL)
+    {
         return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Null initialize the queueBridge */
     *queue = NULL;
 
-    avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
-    if (avisQueue == NULL)
+    /* Allocate memory for the avis queue implementation */
+    impl = (avisQueueBridge*) calloc (1, sizeof (avisQueueBridge));
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create (): "
+                  "Failed to allocate memory for queue.");
         return MAMA_STATUS_NOMEM;
+    }
 
-    avisQueue->mParent         = parent;
-    avisQueue->mEnqueueCb      = NULL;
-    avisQueue->mEnqueueClosure = NULL;
+    /* Initialize the dispatch lock */
+    wthread_mutex_init (&impl->mDispatchLock, NULL);
 
-    wombatQueue_allocate (&avisQueue->mQueue);
-    wombatQueue_create (avisQueue->mQueue, 0, 0, 0);
+    /* Back-reference the parent for future use in the implementation struct */
+    impl->mParent = parent;
 
-    *queue = (queueBridge) avisQueue;
+    /* Allocate and create the wombat queue */
+    underlyingStatus = wombatQueue_allocate (&impl->mQueue);
+    if (WOMBAT_QUEUE_OK != underlyingStatus)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create (): "
+                  "Failed to allocate memory for underlying queue.");
+        free (impl);
+        return MAMA_STATUS_NOMEM;
+    }
+
+    underlyingStatus = wombatQueue_create (impl->mQueue,
+                                           AVIS_QUEUE_MAX_SIZE,
+                                           AVIS_QUEUE_INITIAL_SIZE,
+                                           AVIS_QUEUE_CHUNK_SIZE);
+    if (WOMBAT_QUEUE_OK != underlyingStatus)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create (): "
+                  "Failed to create underlying queue.");
+        wombatQueue_deallocate (impl->mQueue);
+        free (impl);
+        return MAMA_STATUS_PLATFORM;
+    }
+
+    /* Populate the queueBridge pointer with the implementation for return */
+    *queue = (queueBridge) impl;
 
     return MAMA_STATUS_OK;
 }
@@ -80,23 +153,33 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue,
                                         mamaQueue    parent,
                                         void*        nativeQueue)
 {
-    avisQueueBridge* avisQueue = NULL;
-   
-    if (queue == NULL)
+    avisQueueBridge* impl = NULL;
+    if (NULL == queue || NULL == parent || NULL == nativeQueue)
+    {
         return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Null initialize the queueBridge to be returned */
     *queue = NULL;
 
-    avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge));
-    if (avisQueue == NULL)
+    /* Allocate memory for the avis bridge implementation */
+    impl = (avisQueueBridge*) calloc (1, sizeof (avisQueueBridge));
+    if (NULL == impl)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_create_usingNative (): "
+                  "Failed to allocate memory for queue.");
         return MAMA_STATUS_NOMEM;
+    }
+
+    /* Back-reference the parent for future use in the implementation struct */
+    impl->mParent = parent;
 
-    avisQueue->mParent         = parent;
-    avisQueue->mEnqueueCb      = NULL;
-    avisQueue->mEnqueueClosure = NULL;
-    avisQueue->mQueue          = (wombatQueue)nativeQueue;
-    avisQueue->mIsNative       = 1;
+    /* Wombat queue has already been created, so simply reference it here */
+    impl->mQueue = (wombatQueue) nativeQueue;
 
-    *queue = (queueBridge) avisQueue;
+    /* Populate the queueBridge pointer with the implementation for return */
+    *queue = (queueBridge) impl;
 
     return MAMA_STATUS_OK;
 }
@@ -104,35 +187,98 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue,
 mama_status
 avisBridgeMamaQueue_destroy (queueBridge queue)
 {
-    CHECK_QUEUE(queue);
-    if (!avisQueue(queue)->mIsNative)
-        wombatQueue_destroy (avisQueue(queue)->mQueue);
-    free(avisQueue(queue));
+    wombatQueueStatus   status  = WOMBAT_QUEUE_OK;
+    avisQueueBridge*    impl    = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Destroy the underlying wombatQueue - can be called from any thread*/
+    wthread_mutex_lock              (&impl->mDispatchLock);
+    status = wombatQueue_destroy    (impl->mQueue);
+    wthread_mutex_unlock            (&impl->mDispatchLock);
+
+    /* Free the avisQueueBridge container struct */
+    free (impl);
+
+    if (WOMBAT_QUEUE_OK != status)
+    {
+        mama_log (MAMA_LOG_LEVEL_WARN,
+                  "avisBridgeMamaQueue_destroy (): "
+                  "Failed to destroy wombat queue (%d).",
+                  status);
+        return MAMA_STATUS_PLATFORM;
+    }
+
+    return MAMA_STATUS_OK;
+}
+
+mama_status
+avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count)
+{
+    avisQueueBridge* impl       = (avisQueueBridge*) queue;
+    int              countInt   = 0;
+
+    if (NULL == count)
+        return MAMA_STATUS_NULL_ARG;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Initialize count to zero */
+    *count = 0;
+
+    /* Get the wombatQueue size */
+    wombatQueue_getSize (impl->mQueue, &countInt);
+    *count = (size_t)countInt;
+
     return MAMA_STATUS_OK;
 }
 
 mama_status
 avisBridgeMamaQueue_dispatch (queueBridge queue)
 {
-    wombatQueueStatus status;
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Lock for dispatching */
+    wthread_mutex_lock (&impl->mDispatchLock);
 
-    CHECK_QUEUE(queue);
+    impl->mIsDispatching = 1;
 
+    /*
+     * Continually dispatch as long as the calling application wants dispatching
+     * to be done and no errors are encountered
+     */
     do
     {
-        /* 500 is .5 seconds */
-        status = wombatQueue_timedDispatch (avisQueue(queue)->mQueue,
-                     NULL, NULL, 500);
+        /* Check the watermarks to see if thresholds have been breached */
+        avisBridgeMamaQueueImpl_checkWatermarks (impl);
+
+        /*
+         * Perform a dispatch with a timeout to allow the dispatching process
+         * to be interrupted by the calling application between iterations
+         */
+        status = wombatQueue_timedDispatch (impl->mQueue,
+                                            NULL,
+                                            NULL,
+                                            AVIS_QUEUE_DISPATCH_TIMEOUT);
     }
-    while ((status == WOMBAT_QUEUE_OK ||
-            status == WOMBAT_QUEUE_TIMEOUT) &&
-            mamaQueueImpl_isDispatching (avisQueue(queue)->mParent));
+    while ( (WOMBAT_QUEUE_OK == status || WOMBAT_QUEUE_TIMEOUT == status)
+            && impl->mIsDispatching);
 
-    if (status != WOMBAT_QUEUE_OK && status != WOMBAT_QUEUE_TIMEOUT)
+    /* Unlock the dispatch lock */
+    wthread_mutex_unlock (&impl->mDispatchLock);
+
+    /* Timeout is encountered after each dispatch and so is expected here */
+    if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "Failed to dispatch Avis Middleware queue. %d",
-                  "mamaQueue_dispatch ():",
+                  "avisBridgeMamaQueue_dispatch (): "
+                  "Failed to dispatch Avis Middleware queue (%d). ",
                   status);
         return MAMA_STATUS_PLATFORM;
     }
@@ -143,41 +289,56 @@ avisBridgeMamaQueue_dispatch (queueBridge queue)
 mama_status
 avisBridgeMamaQueue_timedDispatch (queueBridge queue, uint64_t timeout)
 {
-    wombatQueueStatus status;
-   
-    CHECK_QUEUE(queue);
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl        = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_timedDispatch (avisQueue(queue)->mQueue,
-                     NULL, NULL, timeout);
-    if (status == WOMBAT_QUEUE_TIMEOUT) return MAMA_STATUS_TIMEOUT;
+    /* Check the watermarks to see if thresholds have been breached */
+    avisBridgeMamaQueueImpl_checkWatermarks (impl);
 
-    if (status != WOMBAT_QUEUE_OK)
+    /* Attempt to dispatch the queue with a timeout once */
+    status = wombatQueue_timedDispatch (impl->mQueue,
+                                        NULL,
+                                        NULL,
+                                        timeout);
+
+    /* If dispatch failed, report here */
+    if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "Failed to dispatch Avis Middleware queue. %d",
-                  "mamaQueue_dispatch ():",
+                  "avisBridgeMamaQueue_timedDispatch (): "
+                  "Failed to dispatch Avis Middleware queue (%d).",
                   status);
         return MAMA_STATUS_PLATFORM;
     }
 
     return MAMA_STATUS_OK;
+
 }
 
 mama_status
 avisBridgeMamaQueue_dispatchEvent (queueBridge queue)
 {
-    wombatQueueStatus status;
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl = (avisQueueBridge*) queue;
 
-    CHECK_QUEUE(queue);
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_dispatch (avisQueue(queue)->mQueue,
-                     NULL, NULL);
+    /* Check the watermarks to see if thresholds have been breached */
+    avisBridgeMamaQueueImpl_checkWatermarks (impl);
 
-    if (status != WOMBAT_QUEUE_OK)
+    /* Attempt to dispatch the queue once (this call takes no timeout) */
+    status = wombatQueue_dispatch (impl->mQueue, NULL, NULL);
+
+    /* If dispatch failed, report here */
+    if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "Failed to dispatch Avis Middleware queue. %d",
-                  "mamaQueue_dispatch ():",
+                  "avisBridgeMamaQueue_dispatchEvent (): "
+                  "Failed to dispatch Avis Middleware queue (%d).",
                   status);
         return MAMA_STATUS_PLATFORM;
     }
@@ -185,42 +346,40 @@ avisBridgeMamaQueue_dispatchEvent (queueBridge queue)
     return MAMA_STATUS_OK;
 }
 
-static void MAMACALLTYPE queueCb (void *ignored, void* closure)
-{
-    avisQueueClosure* cl = (avisQueueClosure*)closure;
-    if (NULL ==cl) return;
-    cl->mCb (cl->mImpl->mParent, cl->mUserClosure);
-    free (cl);
-}
-
 mama_status
 avisBridgeMamaQueue_enqueueEvent (queueBridge        queue,
                                   mamaQueueEventCB   callback,
                                   void*              closure)
 {
-    wombatQueueStatus status;
-    avisQueueClosure* cl = NULL;
+    wombatQueueStatus   status;
+    avisQueueBridge*    impl = (avisQueueBridge*) queue;
 
-    if (!callback) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-
-    cl = (avisQueueClosure*)calloc(1, sizeof(avisQueueClosure));
-    if (NULL == cl) return MAMA_STATUS_NOMEM;
+    if (NULL == callback)
+        return MAMA_STATUS_NULL_ARG;
 
-    cl->mImpl = avisQueue(queue);
-    cl->mCb    = callback;
-    cl->mUserClosure = closure;
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_enqueue (avisQueue(queue)->mQueue,
-                queueCb, NULL, cl);
+    /* Call the underlying wombatQueue_enqueue method */
+    status = wombatQueue_enqueue (impl->mQueue,
+                                  (wombatQueueCb) callback,
+                                  impl->mParent,
+                                  closure);
 
-    if (status != WOMBAT_QUEUE_OK)
-        return MAMA_STATUS_PLATFORM;
+    /* Call the enqueue callback if provided */
+    if (NULL != impl->mEnqueueCallback)
+    {
+        impl->mEnqueueCallback (impl->mParent, impl->mEnqueueClosure);
+    }
 
-    if (avisQueue(queue)->mEnqueueCb)
+    /* If enqueue failed, report here */
+    if (WOMBAT_QUEUE_OK != status)
     {
-        avisQueue(queue)->mEnqueueCb (avisQueue(queue)->mParent,
-                                      avisQueue(queue)->mEnqueueClosure);
+        mama_log (MAMA_LOG_LEVEL_ERROR,
+                  "avisBridgeMamaQueue_enqueueEvent (): "
+                  "Failed to enqueueEvent (%d). Callback: %p; Closure: %p",
+                  status, callback, closure);
+        return MAMA_STATUS_PLATFORM;
     }
 
     return MAMA_STATUS_OK;
@@ -229,20 +388,13 @@ avisBridgeMamaQueue_enqueueEvent (queueBridge        queue,
 mama_status
 avisBridgeMamaQueue_stopDispatch (queueBridge queue)
 {
-    wombatQueueStatus status;
-    CHECK_QUEUE(queue);
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
 
-    if (queue == NULL)
-        return MAMA_STATUS_NULL_ARG;
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    status = wombatQueue_unblock (avisQueue(queue)->mQueue);
-    if (status != WOMBAT_QUEUE_OK)
-    {
-        mama_log (MAMA_LOG_LEVEL_ERROR,
-                  " Failed to stop dispatching Avis Middleware queue.",
-                  "wmwMamaQueue_stopDispatch ():");
-        return MAMA_STATUS_PLATFORM;
-    }
+    /* Tell this implementation to stop dispatching */
+    impl->mIsDispatching = 0;
 
     return MAMA_STATUS_OK;
 }
@@ -252,11 +404,18 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge        queue,
                                         mamaQueueEnqueueCB callback,
                                         void*              closure)
 {
-    if (!callback) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
+    avisQueueBridge* impl   = (avisQueueBridge*) queue;
 
-    avisQueue(queue)->mEnqueueCb      = callback;
-    avisQueue(queue)->mEnqueueClosure = closure;
+    if (NULL == callback)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Set the enqueue callback and closure */
+    impl->mEnqueueCallback  = callback;
+    impl->mEnqueueClosure   = closure;
 
     return MAMA_STATUS_OK;
 }
@@ -264,21 +423,35 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge        queue,
 mama_status
 avisBridgeMamaQueue_removeEnqueueCallback (queueBridge queue)
 {
-    CHECK_QUEUE(queue);
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
 
-    avisQueue(queue)->mEnqueueCb      = NULL;
-    avisQueue(queue)->mEnqueueClosure = NULL;
+    /* Set the enqueue callback to NULL */
+    impl->mEnqueueCallback  = NULL;
+    impl->mEnqueueClosure   = NULL;
 
     return MAMA_STATUS_OK;
 }
 
 mama_status
 avisBridgeMamaQueue_getNativeHandle (queueBridge queue,
-                                     void**      result)
+                                     void**      nativeHandle)
 {
-    if (!result) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    *result = avisQueue(queue)->mQueue;
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    if (NULL == nativeHandle)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Return the handle to the native queue */
+    *nativeHandle = queue;
+
     return MAMA_STATUS_OK;
 }
 
@@ -286,26 +459,64 @@ mama_status
 avisBridgeMamaQueue_setHighWatermark (queueBridge queue,
                                       size_t      highWatermark)
 {
-    if (!highWatermark) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    if (0 == highWatermark)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Set the high water mark */
+    impl->mHighWatermark = highWatermark;
+
+    return MAMA_STATUS_OK;
 }
 
 mama_status
-avisBridgeMamaQueue_setLowWatermark (queueBridge queue,
-                                     size_t lowWatermark)
+avisBridgeMamaQueue_setLowWatermark (queueBridge    queue,
+                                     size_t         lowWatermark)
 {
-    if (!lowWatermark) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisQueueBridge* impl = (avisQueueBridge*) queue;
+
+    if (0 == lowWatermark)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    /* Perform null checks and return if null arguments provided */
+    CHECK_QUEUE(impl);
+
+    /* Set the low water mark */
+    impl->mLowWatermark = lowWatermark;
+
+    return MAMA_STATUS_OK;
 }
 
-mama_status
-avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count)
+
+/*=========================================================================
+  =                  Private implementation functions                     =
+  =========================================================================*/
+
+void
+avisBridgeMamaQueueImpl_checkWatermarks (avisQueueBridge* impl)
 {
-    if (!count) return MAMA_STATUS_NULL_ARG;
-    CHECK_QUEUE(queue);
-    *count = 0;
-    wombatQueue_getSize (avisQueue(queue)->mQueue, (int*)count);
-    return MAMA_STATUS_OK;
+    size_t              eventCount      =  0;
+
+    /* Get the current size of the wombat impl */
+    avisBridgeMamaQueue_getEventCount      ((queueBridge) impl, &eventCount);
+
+    /* If the high watermark had fired and the count equals the low mark */
+    if (0 != impl->mHighWaterFired && eventCount == impl->mLowWatermark)
+    {
+        impl->mHighWaterFired = 0;
+        mamaQueueImpl_lowWatermarkExceeded (impl->mParent, eventCount);
+    }
+    /* If the high watermark is not currently fired and now above threshold */
+    else if (0 == impl->mHighWaterFired && eventCount >= impl->mHighWatermark)
+    {
+        impl->mHighWaterFired = 1;
+        mamaQueueImpl_highWatermarkExceeded (impl->mParent, eventCount);
+    }
 }
+
diff --git a/mama/c_cpp/src/c/bridge/avis/timer.c b/mama/c_cpp/src/c/bridge/avis/timer.c
index 61cb01f..e5ec0e1 100644
--- a/mama/c_cpp/src/c/bridge/avis/timer.c
+++ b/mama/c_cpp/src/c/bridge/avis/timer.c
@@ -1,7 +1,7 @@
 /* $Id$
  *
  * OpenMAMA: The open middleware agnostic messaging API
- * Copyright (C) 2011 NYSE Technologies, Inc.
+ * Copyright (C) 2011 NYSE Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -19,79 +19,88 @@
  * 02110-1301 USA
  */
 
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
 #include <mama/mama.h>
 #include <mama/timer.h>
 #include <timers.h>
 #include "avisbridgefunctions.h"
 #include <wombat/queue.h>
 
-extern timerHeap gTimerHeap;
+
+/*=========================================================================
+  =                Typedefs, structs, enums and globals                   =
+  =========================================================================*/
+
+extern timerHeap gAvisTimerHeap;
 
 typedef struct avisTimerImpl_
 {
-    timerElement  mTimerElement;
-    double        mInterval;
-    mamaTimerCb   mAction;
-    void*         mClosure;
-    mamaTimer     mParent;
-    mamaQueue     mQueue;
-
-    /* This callback will be invoked whenever the timer has been completely destroyed. */
+    timerElement    mTimerElement;
+    double          mInterval;
+    void*           mClosure;
+    mamaTimer       mParent;
+    void*           mQueue;
+    uint8_t         mDestroying;
+    /* This callback will be invoked whenever the timer has been destroyed. */
     mamaTimerCb     mOnTimerDestroyed;
-
+    /* This callback will be invoked on each timer firing */
+    mamaTimerCb     mAction;
 } avisTimerImpl;
 
-static void MAMACALLTYPE
-destroy_callback (mamaQueue queue, void* closure)
-{
-    avisTimerImpl* impl  = (avisTimerImpl*) closure;
-
-    (*impl->mOnTimerDestroyed)(impl->mParent, impl->mClosure);
 
-    free (impl);
-}
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
 
+/**
+ * Due to the fact that timed events may still be on the event queue, the
+ * timer's destroy function does not destroy the implementation immediately.
+ * Instead, it sets an implementation specific flag to stop further callbacks
+ * from being enqueued from this timer, and then enqueues this function as a
+ * callback on the queue to perform the actual destruction. This function also
+ * calls the application developer's destroy callback function.
+ *
+ * @param queue   MAMA queue from which this callback was fired.
+ * @param closure In this instance, the closure will contain the avis timer
+ *                implementation.
+ */
 static void MAMACALLTYPE
-timerQueueCb (mamaQueue queue, void* closure)
-{
-    avisTimerImpl* impl = (avisTimerImpl*)closure;
-
-    if (impl->mAction)
-        impl->mAction (impl->mParent, impl->mClosure);
+avisBridgeMamaTimerImpl_destroyCallback (mamaQueue queue, void* closure);
 
-}
+/**
+ * When a timer fires, it enqueues this callback for execution. This is where
+ * the action callback provided in the timer's create function gets fired.
+ *
+ * @param queue   MAMA queue from which this callback was fired.
+ * @param closure In this instance, the closure will contain the avis timer
+ *                implementation.
+ */
+static void MAMACALLTYPE
+avisBridgeMamaTimerImpl_queueCallback (mamaQueue queue, void* closure);
 
+/**
+ * Every time the timer fires, it calls this timer callback which adds
+ * avisBridgeMamaTimerImpl_queueCallback to the queue as long as the timer's
+ * mDestroying flag is not currently set.
+ *
+ * @param timer   The underlying timer element which has just fired (not used).
+ * @param closure In this instance, the closure will contain the avis timer
+ *                implementation.
+ */
 static void
-timerCb (timerElement  timer,
-         void*         closure)
-{
-    avisTimerImpl* impl = (avisTimerImpl*)closure;
-    struct timeval timeout;
-
-    if (impl == NULL) return;
-
-    /* Mama timers are repeating */
-    timeout.tv_sec = (time_t)impl->mInterval;
-    timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0);
+avisBridgeMamaTimerImpl_timerCallback (timerElement timer, void* closure);
 
-    if (0 != createTimer (&impl->mTimerElement,
-                          gTimerHeap,
-                          timerCb,
-                          &timeout,
-                          impl))
-    {
-        mama_log (MAMA_LOG_LEVEL_ERROR,
-              "%s Failed to create Avis timer.",
-              "mamaTimer_create ():");
-    }
 
-    mamaQueue_enqueueEvent (impl->mQueue,
-                            timerQueueCb,
-                            (void*)impl);
-}
+/*=========================================================================
+  =               Public interface implementation functions               =
+  =========================================================================*/
 
 mama_status
-avisBridgeMamaTimer_create (timerBridge* result,
+avisBridgeMamaTimer_create (timerBridge*  result,
                            void*         nativeQueueHandle,
                            mamaTimerCb   action,
                            mamaTimerCb   onTimerDestroyed,
@@ -99,77 +108,92 @@ avisBridgeMamaTimer_create (timerBridge* result,
                            mamaTimer     parent,
                            void*         closure)
 {
-    avisTimerImpl* impl      =   NULL;
-    struct timeval timeout;
 
-    if (result == NULL) return MAMA_STATUS_NULL_ARG;
+    avisTimerImpl*              impl            = NULL;
+    int                         timerResult     = 0;
+    struct timeval              timeout;
 
-    mama_log (MAMA_LOG_LEVEL_FINEST,
-              "%s Entering with interval [%f].",
-              "avisMamaTimer_create ():",
-              interval);
+    if (NULL == result || NULL == nativeQueueHandle
+            || NULL == action
+            || NULL == parent )
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
 
+    /* Null initialize the timer bridge supplied */
     *result = NULL;
 
-    impl = (avisTimerImpl*)calloc (1, sizeof (avisTimerImpl));
-    if (impl == NULL) return MAMA_STATUS_NOMEM;
-
-    impl->mQueue    = NULL;
-    impl->mParent   = parent;
-    impl->mAction   = action;
-    impl->mClosure  = closure;
-    impl->mInterval = interval;
-
-    mamaTimer_getQueue (parent, &impl->mQueue);
-
-    impl->mOnTimerDestroyed = onTimerDestroyed;
-
-    *result = (timerBridge)impl;
+    /* Allocate the timer implementation and set up */
+    impl = (avisTimerImpl*) calloc (1, sizeof (avisTimerImpl));
+    if (NULL == impl)
+    {
+        return MAMA_STATUS_NOMEM;
+    }
 
-    timeout.tv_sec = (time_t)interval;
+    *result                     = (timerBridge) impl;
+    impl->mQueue                = nativeQueueHandle;
+    impl->mParent               = parent;
+    impl->mAction               = action;
+    impl->mClosure              = closure;
+    impl->mInterval             = interval;
+    impl->mOnTimerDestroyed     = onTimerDestroyed;
+    impl->mDestroying           = 0;
+
+    /* Determine when the next timer should fire */
+    timeout.tv_sec  = (time_t) interval;
     timeout.tv_usec = ((interval-timeout.tv_sec) * 1000000.0);
 
-    if (0 != createTimer (&impl->mTimerElement,
-                          gTimerHeap,
-                          timerCb,
-                          &timeout,
-                          impl))
+    /* Create the first single fire timer */
+    timerResult = createTimer (&impl->mTimerElement,
+                               gAvisTimerHeap,
+                               avisBridgeMamaTimerImpl_timerCallback,
+                               &timeout,
+                               impl);
+    if (0 != timerResult)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to create Avis timer.",
-                  "mamaTimer_create ():");
-        return MAMA_STATUS_TIMER_FAILURE;
+                  "Failed to create Avis underlying timer [%d].", timerResult);
+        return MAMA_STATUS_PLATFORM;
     }
 
     return MAMA_STATUS_OK;
 }
 
+/* This call should always come from the MAMA queue thread */
 mama_status
 avisBridgeMamaTimer_destroy (timerBridge timer)
 {
-    mama_status    returnStatus = MAMA_STATUS_OK;
-    avisTimerImpl* impl = NULL;
+    avisTimerImpl*  impl            = NULL;
+    mama_status     returnStatus    = MAMA_STATUS_OK;
+    int             timerResult     = 0;
 
-    if (timer == NULL)
+    if (NULL == timer)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
-    impl->mAction = NULL;
-    mama_log (MAMA_LOG_LEVEL_FINEST,
-              "%s Entering for 0x%x",
-              "avisMamaTimer_destroy ():", impl);
+    /* Nullify the callback and set destroy flag */
+    impl                            = (avisTimerImpl*) timer;
+    impl->mDestroying               = 1;
+    impl->mAction                   = NULL;
 
-    if (0 != destroyTimer (gTimerHeap, impl->mTimerElement))
+    /* Destroy the timer element */
+    timerResult = destroyTimer (gAvisTimerHeap, impl->mTimerElement);
+    if (0 != timerResult)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to destroy Avis timer.",
-                  "avisMamaTimer_destroy ():");
+                  "Failed to destroy Avis underlying timer [%d].",
+                  timerResult);
         returnStatus = MAMA_STATUS_PLATFORM;
     }
 
-    mamaQueue_enqueueEvent (impl->mQueue,
-                            destroy_callback,
-                            (void*)impl);
+    /*
+     * Enqueue the impl cleanup at the back of the queue so that it is only
+     * executed once all pending timer events have been completed
+     */
+    avisBridgeMamaQueue_enqueueEvent ((queueBridge) impl->mQueue,
+                                      avisBridgeMamaTimerImpl_destroyCallback,
+                                      (void*) impl);
 
     return returnStatus;
 }
@@ -177,60 +201,52 @@ avisBridgeMamaTimer_destroy (timerBridge timer)
 mama_status
 avisBridgeMamaTimer_reset (timerBridge timer)
 {
-    mama_status status      = MAMA_STATUS_OK;
-    avisTimerImpl* impl  = NULL;
-    struct timeval timeout;
+    avisTimerImpl*      impl            = (avisTimerImpl*) timer;
+    int                 timerResult     = 0;
+    struct timeval      timeout;
 
-    if (timer == NULL)
+    if (NULL == impl)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
-    timeout.tv_sec = (time_t)impl->mInterval;
-    timeout.tv_usec = ((impl->mInterval-timeout.tv_sec) * 1000000.0);
+    /* Destroy the existing timer element */
+    destroyTimer (gAvisTimerHeap, impl->mTimerElement);
 
-    if (timer == NULL)
-        return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    /* Calculate next time interval */
+    timeout.tv_sec  = (time_t) impl->mInterval;
+    timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0);
 
-    if (0 != destroyTimer (gTimerHeap, impl->mTimerElement))
+    /* Create the timer for the next firing */
+    timerResult = createTimer (&impl->mTimerElement,
+                               gAvisTimerHeap,
+                               avisBridgeMamaTimerImpl_timerCallback,
+                               &timeout,
+                               impl);
+    if (0 != timerResult)
     {
         mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to destroy Avis timer.",
-                  "avisMamaTimer_destroy ():");
-        avisBridgeMamaTimer_destroy (timer);
-        status = MAMA_STATUS_PLATFORM;
-    }
-    else
-    {
-        if (0 != createTimer (&impl->mTimerElement,
-                              gTimerHeap,
-                              timerCb,
-                              &timeout,
-                              impl))
-        {
-            mama_log (MAMA_LOG_LEVEL_ERROR,
-                  "%s Failed to create Avis timer.",
-                  "mamaTimer_create ():");
-            status = MAMA_STATUS_PLATFORM;
-        }
+                  "Failed to reset Avis underlying timer [%d].", timerResult);
+        return MAMA_STATUS_PLATFORM;
     }
 
-    return status;
+    return MAMA_STATUS_OK;
+
 }
 
 mama_status
 avisBridgeMamaTimer_setInterval (timerBridge  timer,
-                                mama_f64_t   interval)
+                                 mama_f64_t   interval)
 {
-    avisTimerImpl* impl  = NULL;
-
-    if (timer == NULL)
+    avisTimerImpl* impl  = (avisTimerImpl*) timer;
+    if (NULL == timer)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
     impl->mInterval = interval;
 
-   return  avisBridgeMamaTimer_reset (timer);
+    return  avisBridgeMamaTimer_reset (timer);
 }
 
 mama_status
@@ -238,11 +254,71 @@ avisBridgeMamaTimer_getInterval (timerBridge    timer,
                                 mama_f64_t*    interval)
 {
     avisTimerImpl* impl  = NULL;
-
-    if (timer == NULL)
+    if (NULL == timer || NULL == interval)
+    {
         return MAMA_STATUS_NULL_ARG;
-    impl = (avisTimerImpl*)timer;
+    }
 
+    impl = (avisTimerImpl*) timer;
     *interval = impl->mInterval;
+
     return MAMA_STATUS_OK;
 }
+
+
+/*=========================================================================
+  =                  Private implementation functions                     =
+  =========================================================================*/
+
+/* This callback is invoked by the avis bridge's destroy event */
+static void MAMACALLTYPE
+avisBridgeMamaTimerImpl_destroyCallback (mamaQueue queue, void* closure)
+{
+    avisTimerImpl* impl = (avisTimerImpl*) closure;
+    (*impl->mOnTimerDestroyed)(impl->mParent, impl->mClosure);
+
+    /* Free the implementation memory here */
+    free (impl);
+}
+
+/* This callback is invoked by the avis bridge's timer event */
+static void MAMACALLTYPE
+avisBridgeMamaTimerImpl_queueCallback (mamaQueue queue, void* closure)
+{
+    avisTimerImpl* impl = (avisTimerImpl*) closure;
+    if (impl->mAction)
+    {
+        impl->mAction (impl->mParent, impl->mClosure);
+    }
+}
+
+/* This callback is invoked by the common timer's dispatch thread */
+static void
+avisBridgeMamaTimerImpl_timerCallback (timerElement  timer,
+                                       void*         closure)
+{
+
+    avisTimerImpl* impl = (avisTimerImpl*) closure;
+
+    if (NULL == impl)
+    {
+        return;
+    }
+
+    /*
+     * Only enqueue further timer callbacks if the timer is not currently being
+     * destroyed
+     */
+    if (0 == impl->mDestroying)
+    {
+        /* Set the timer for the next firing */
+        avisBridgeMamaTimer_reset ((timerBridge) closure);
+
+        /* Enqueue the callback for handling */
+        avisBridgeMamaQueue_enqueueEvent ((queueBridge) impl->mQueue,
+                                          avisBridgeMamaTimerImpl_queueCallback,
+                                          closure);
+    }
+}
+
+
--
2.4.3