Date
1 - 1 of 1
[PATCH 03/14] AVIS: Pulled Queue and Timer implementation from qpid over to avis
Frank Quinn <fquinn.ni@...>
In future, we may want to make this code central. In the meantime,
at least making the code identical with qpid should make it easier to maintain. Signed-off-by: Frank Quinn <fquinn.ni@...> --- mama/c_cpp/src/c/bridge/avis/bridge.c | 8 +- mama/c_cpp/src/c/bridge/avis/queue.c | 473 ++++++++++++++++++++++++---------- mama/c_cpp/src/c/bridge/avis/timer.c | 346 +++++++++++++++---------- 3 files changed, 557 insertions(+), 270 deletions(-) diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c index e975474..9728046 100644 --- a/mama/c_cpp/src/c/bridge/avis/bridge.c +++ b/mama/c_cpp/src/c/bridge/avis/bridge.c @@ -30,7 +30,7 @@ #include "avisdefs.h" #include "transportbridge.h" -timerHeap gTimerHeap; +timerHeap gAvisTimerHeap; /*Responsible for creating the bridge impl structure*/ void avisBridge_createImpl (mamaBridge* result) @@ -110,14 +110,14 @@ avisBridge_open (mamaBridge bridgeImpl) mama_log (MAMA_LOG_LEVEL_NORMAL, "avisBridge_open(): Successfully created Avis queue"); - if (0 != createTimerHeap (&gTimerHeap)) + if (0 != createTimerHeap (&gAvisTimerHeap)) { mama_log (MAMA_LOG_LEVEL_NORMAL, "avisBridge_open(): Failed to initialize timers."); return MAMA_STATUS_PLATFORM; } - if (0 != startDispatchTimerHeap (gTimerHeap)) + if (0 != startDispatchTimerHeap (gAvisTimerHeap)) { mama_log (MAMA_LOG_LEVEL_NORMAL, "avisBridge_open(): Failed to start timer thread."); @@ -142,7 +142,7 @@ avisBridge_close (mamaBridge bridgeImpl) impl = (mamaBridgeImpl*)bridgeImpl; - if (0 != destroyHeap (gTimerHeap)) + if (0 != destroyHeap (gAvisTimerHeap)) { mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_close():" "Failed to destroy Avis timer heap."); diff --git a/mama/c_cpp/src/c/bridge/avis/queue.c b/mama/c_cpp/src/c/bridge/avis/queue.c index e32845f..c0aab67 100644 --- a/mama/c_cpp/src/c/bridge/avis/queue.c +++ b/mama/c_cpp/src/c/bridge/avis/queue.c @@ -1,7 +1,7 @@ /* $Id$ * * OpenMAMA: The open middleware agnostic messaging API - * Copyright (C) 2011 NYSE Technologies, Inc. + * Copyright (C) 2011 NYSE Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -19,58 +19,131 @@ * 02110-1301 USA */ + +/*========================================================================= + = Includes = + =========================================================================*/ + #include <mama/mama.h> +#include <wombat/queue.h> #include <bridge.h> #include "queueimpl.h" #include "avisbridgefunctions.h" -#include <wombat/queue.h> -#include <avis/elvin.h> + +/*========================================================================= + = Typedefs, structs, enums and globals = + =========================================================================*/ typedef struct avisQueueBridge { mamaQueue mParent; wombatQueue mQueue; - mamaQueueEnqueueCB mEnqueueCb; + uint8_t mHighWaterFired; + size_t mHighWatermark; + size_t mLowWatermark; + uint8_t mIsDispatching; + mamaQueueEnqueueCB mEnqueueCallback; void* mEnqueueClosure; - uint8_t mIsNative; + wthread_mutex_t mDispatchLock; } avisQueueBridge; -typedef struct avisQueueClosure { - avisQueueBridge* mImpl; - mamaQueueEventCB mCb; - void* mUserClosure; -} avisQueueClosure; -#define avisQueue(queue) ((avisQueueBridge*) queue) -#define CHECK_QUEUE(queue) \ - do { \ - if (avisQueue(queue) == 0) return MAMA_STATUS_NULL_ARG; \ - if (avisQueue(queue)->mQueue == NULL) return MAMA_STATUS_NULL_ARG; \ - } while(0) +/*========================================================================= + = Macros = + =========================================================================*/ + +#define CHECK_QUEUE(IMPL) \ + do { \ + if (IMPL == NULL) return MAMA_STATUS_NULL_ARG; \ + if (IMPL->mQueue == NULL) return MAMA_STATUS_NULL_ARG; \ + } while(0) + +/* Timeout is in milliseconds */ +#define AVIS_QUEUE_DISPATCH_TIMEOUT 500 +#define AVIS_QUEUE_MAX_SIZE WOMBAT_QUEUE_MAX_SIZE +#define AVIS_QUEUE_CHUNK_SIZE WOMBAT_QUEUE_CHUNK_SIZE +#define AVIS_QUEUE_INITIAL_SIZE WOMBAT_QUEUE_CHUNK_SIZE + +/*========================================================================= + = Private implementation prototypes = + =========================================================================*/ + +/** + * This funcion is called to check the current queue size against configured + * watermarks to determine whether or not it should call the watermark callback + * functions. If it determines that it should, it invokes the relevant callback + * itself. + * + * @param impl The avis queue bridge implementation to check. + */ +static void +avisBridgeMamaQueueImpl_checkWatermarks (avisQueueBridge* impl); + + +/*========================================================================= + = Public interface implementation functions = + =========================================================================*/ mama_status avisBridgeMamaQueue_create (queueBridge* queue, mamaQueue parent) { - avisQueueBridge* avisQueue = NULL; + /* Null initialize the queue to be created */ + avisQueueBridge* impl = NULL; + wombatQueueStatus underlyingStatus = WOMBAT_QUEUE_OK; - if (queue == NULL) + if (queue == NULL || parent == NULL) + { return MAMA_STATUS_NULL_ARG; + } + + /* Null initialize the queueBridge */ *queue = NULL; - avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge)); - if (avisQueue == NULL) + /* Allocate memory for the avis queue implementation */ + impl = (avisQueueBridge*) calloc (1, sizeof (avisQueueBridge)); + if (NULL == impl) + { + mama_log (MAMA_LOG_LEVEL_ERROR, + "avisBridgeMamaQueue_create (): " + "Failed to allocate memory for queue."); return MAMA_STATUS_NOMEM; + } - avisQueue->mParent = parent; - avisQueue->mEnqueueCb = NULL; - avisQueue->mEnqueueClosure = NULL; + /* Initialize the dispatch lock */ + wthread_mutex_init (&impl->mDispatchLock, NULL); - wombatQueue_allocate (&avisQueue->mQueue); - wombatQueue_create (avisQueue->mQueue, 0, 0, 0); + /* Back-reference the parent for future use in the implementation struct */ + impl->mParent = parent; - *queue = (queueBridge) avisQueue; + /* Allocate and create the wombat queue */ + underlyingStatus = wombatQueue_allocate (&impl->mQueue); + if (WOMBAT_QUEUE_OK != underlyingStatus) + { + mama_log (MAMA_LOG_LEVEL_ERROR, + "avisBridgeMamaQueue_create (): " + "Failed to allocate memory for underlying queue."); + free (impl); + return MAMA_STATUS_NOMEM; + } + + underlyingStatus = wombatQueue_create (impl->mQueue, + AVIS_QUEUE_MAX_SIZE, + AVIS_QUEUE_INITIAL_SIZE, + AVIS_QUEUE_CHUNK_SIZE); + if (WOMBAT_QUEUE_OK != underlyingStatus) + { + mama_log (MAMA_LOG_LEVEL_ERROR, + "avisBridgeMamaQueue_create (): " + "Failed to create underlying queue."); + wombatQueue_deallocate (impl->mQueue); + free (impl); + return MAMA_STATUS_PLATFORM; + } + + /* Populate the queueBridge pointer with the implementation for return */ + *queue = (queueBridge) impl; return MAMA_STATUS_OK; } @@ -80,23 +153,33 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue, mamaQueue parent, void* nativeQueue) { - avisQueueBridge* avisQueue = NULL; - - if (queue == NULL) + avisQueueBridge* impl = NULL; + if (NULL == queue || NULL == parent || NULL == nativeQueue) + { return MAMA_STATUS_NULL_ARG; + } + + /* Null initialize the queueBridge to be returned */ *queue = NULL; - avisQueue = (avisQueueBridge*)calloc (1, sizeof (avisQueueBridge)); - if (avisQueue == NULL) + /* Allocate memory for the avis bridge implementation */ + impl = (avisQueueBridge*) calloc (1, sizeof (avisQueueBridge)); + if (NULL == impl) + { + mama_log (MAMA_LOG_LEVEL_ERROR, + "avisBridgeMamaQueue_create_usingNative (): " + "Failed to allocate memory for queue."); return MAMA_STATUS_NOMEM; + } + + /* Back-reference the parent for future use in the implementation struct */ + impl->mParent = parent; - avisQueue->mParent = parent; - avisQueue->mEnqueueCb = NULL; - avisQueue->mEnqueueClosure = NULL; - avisQueue->mQueue = (wombatQueue)nativeQueue; - avisQueue->mIsNative = 1; + /* Wombat queue has already been created, so simply reference it here */ + impl->mQueue = (wombatQueue) nativeQueue; - *queue = (queueBridge) avisQueue; + /* Populate the queueBridge pointer with the implementation for return */ + *queue = (queueBridge) impl; return MAMA_STATUS_OK; } @@ -104,35 +187,98 @@ avisBridgeMamaQueue_create_usingNative (queueBridge* queue, mama_status avisBridgeMamaQueue_destroy (queueBridge queue) { - CHECK_QUEUE(queue); - if (!avisQueue(queue)->mIsNative) - wombatQueue_destroy (avisQueue(queue)->mQueue); - free(avisQueue(queue)); + wombatQueueStatus status = WOMBAT_QUEUE_OK; + avisQueueBridge* impl = (avisQueueBridge*) queue; + + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); + + /* Destroy the underlying wombatQueue - can be called from any thread*/ + wthread_mutex_lock (&impl->mDispatchLock); + status = wombatQueue_destroy (impl->mQueue); + wthread_mutex_unlock (&impl->mDispatchLock); + + /* Free the avisQueueImpl container struct */ + free (impl); + + if (WOMBAT_QUEUE_OK != status) + { + mama_log (MAMA_LOG_LEVEL_WARN, + "avisBridgeMamaQueue_destroy (): " + "Failed to destroy wombat queue (%d).", + status); + return MAMA_STATUS_PLATFORM; + } + + return MAMA_STATUS_OK; +} + +mama_status +avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count) +{ + avisQueueBridge* impl = (avisQueueBridge*) queue; + int countInt = 0; + + if (NULL == count) + return MAMA_STATUS_NULL_ARG; + + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); + + /* Initialize count to zero */ + *count = 0; + + /* Get the wombatQueue size */ + wombatQueue_getSize (impl->mQueue, &countInt); + *count = (size_t)countInt; + return MAMA_STATUS_OK; } mama_status avisBridgeMamaQueue_dispatch (queueBridge queue) { - wombatQueueStatus status; + wombatQueueStatus status; + avisQueueBridge* impl = (avisQueueBridge*) queue; + + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); + + /* Lock for dispatching */ + wthread_mutex_lock (&impl->mDispatchLock); - CHECK_QUEUE(queue); + impl->mIsDispatching = 1; + /* + * Continually dispatch as long as the calling application wants dispatching + * to be done and no errors are encountered + */ do { - /* 500 is .5 seconds */ - status = wombatQueue_timedDispatch (avisQueue(queue)->mQueue, - NULL, NULL, 500); + /* Check the watermarks to see if thresholds have been breached */ + avisBridgeMamaQueueImpl_checkWatermarks (impl); + + /* + * Perform a dispatch with a timeout to allow the dispatching process + * to be interrupted by the calling application between iterations + */ + status = wombatQueue_timedDispatch (impl->mQueue, + NULL, + NULL, + AVIS_QUEUE_DISPATCH_TIMEOUT); } - while ((status == WOMBAT_QUEUE_OK || - status == WOMBAT_QUEUE_TIMEOUT) && - mamaQueueImpl_isDispatching (avisQueue(queue)->mParent)); + while ( (WOMBAT_QUEUE_OK == status || WOMBAT_QUEUE_TIMEOUT == status) + && impl->mIsDispatching); - if (status != WOMBAT_QUEUE_OK && status != WOMBAT_QUEUE_TIMEOUT) + /* Unlock the dispatch lock */ + wthread_mutex_unlock (&impl->mDispatchLock); + + /* Timeout is encountered after each dispatch and so is expected here */ + if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status) { mama_log (MAMA_LOG_LEVEL_ERROR, - "Failed to dispatch Avis Middleware queue. %d", - "mamaQueue_dispatch ():", + "avisBridgeMamaQueue_dispatch (): " + "Failed to dispatch Avis Middleware queue (%d). ", status); return MAMA_STATUS_PLATFORM; } @@ -143,41 +289,56 @@ avisBridgeMamaQueue_dispatch (queueBridge queue) mama_status avisBridgeMamaQueue_timedDispatch (queueBridge queue, uint64_t timeout) { - wombatQueueStatus status; - - CHECK_QUEUE(queue); + wombatQueueStatus status; + avisQueueBridge* impl = (avisQueueBridge*) queue; + + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); - status = wombatQueue_timedDispatch (avisQueue(queue)->mQueue, - NULL, NULL, timeout); - if (status == WOMBAT_QUEUE_TIMEOUT) return MAMA_STATUS_TIMEOUT; + /* Check the watermarks to see if thresholds have been breached */ + avisBridgeMamaQueueImpl_checkWatermarks (impl); - if (status != WOMBAT_QUEUE_OK) + /* Attempt to dispatch the queue with a timeout once */ + status = wombatQueue_timedDispatch (impl->mQueue, + NULL, + NULL, + timeout); + + /* If dispatch failed, report here */ + if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status) { mama_log (MAMA_LOG_LEVEL_ERROR, - "Failed to dispatch Avis Middleware queue. %d", - "mamaQueue_dispatch ():", + "avisBridgeMamaQueue_timedDispatch (): " + "Failed to dispatch Avis Middleware queue (%d).", status); return MAMA_STATUS_PLATFORM; } return MAMA_STATUS_OK; + } mama_status avisBridgeMamaQueue_dispatchEvent (queueBridge queue) { - wombatQueueStatus status; + wombatQueueStatus status; + avisQueueBridge* impl = (avisQueueBridge*) queue; - CHECK_QUEUE(queue); + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); - status = wombatQueue_dispatch (avisQueue(queue)->mQueue, - NULL, NULL); + /* Check the watermarks to see if thresholds have been breached */ + avisBridgeMamaQueueImpl_checkWatermarks (impl); - if (status != WOMBAT_QUEUE_OK) + /* Attempt to dispatch the queue with a timeout once */ + status = wombatQueue_dispatch (impl->mQueue, NULL, NULL); + + /* If dispatch failed, report here */ + if (WOMBAT_QUEUE_OK != status && WOMBAT_QUEUE_TIMEOUT != status) { mama_log (MAMA_LOG_LEVEL_ERROR, - "Failed to dispatch Avis Middleware queue. %d", - "mamaQueue_dispatch ():", + "avisBridgeMamaQueue_dispatchEvent (): " + "Failed to dispatch Avis Middleware queue (%d).", status); return MAMA_STATUS_PLATFORM; } @@ -185,42 +346,40 @@ avisBridgeMamaQueue_dispatchEvent (queueBridge queue) return MAMA_STATUS_OK; } -static void MAMACALLTYPE queueCb (void *ignored, void* closure) -{ - avisQueueClosure* cl = (avisQueueClosure*)closure; - if (NULL ==cl) return; - cl->mCb (cl->mImpl->mParent, cl->mUserClosure); - free (cl); -} - mama_status avisBridgeMamaQueue_enqueueEvent (queueBridge queue, mamaQueueEventCB callback, void* closure) { - wombatQueueStatus status; - avisQueueClosure* cl = NULL; + wombatQueueStatus status; + avisQueueBridge* impl = (avisQueueBridge*) queue; - if (!callback) return MAMA_STATUS_NULL_ARG; - CHECK_QUEUE(queue); - - cl = (avisQueueClosure*)calloc(1, sizeof(avisQueueClosure)); - if (NULL == cl) return MAMA_STATUS_NOMEM; + if (NULL == callback) + return MAMA_STATUS_NULL_ARG; - cl->mImpl = avisQueue(queue); - cl->mCb = callback; - cl->mUserClosure = closure; + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); - status = wombatQueue_enqueue (avisQueue(queue)->mQueue, - queueCb, NULL, cl); + /* Call the underlying wombatQueue_enqueue method */ + status = wombatQueue_enqueue (impl->mQueue, + (wombatQueueCb) callback, + impl->mParent, + closure); - if (status != WOMBAT_QUEUE_OK) - return MAMA_STATUS_PLATFORM; + /* Call the enqueue callback if provided */ + if (NULL != impl->mEnqueueCallback) + { + impl->mEnqueueCallback (impl->mParent, impl->mEnqueueClosure); + } - if (avisQueue(queue)->mEnqueueCb) + /* If dispatch failed, report here */ + if (WOMBAT_QUEUE_OK != status) { - avisQueue(queue)->mEnqueueCb (avisQueue(queue)->mParent, - avisQueue(queue)->mEnqueueClosure); + mama_log (MAMA_LOG_LEVEL_ERROR, + "avisBridgeMamaQueue_enqueueEvent (): " + "Failed to enqueueEvent (%d). Callback: %p; Closure: %p", + status, callback, closure); + return MAMA_STATUS_PLATFORM; } return MAMA_STATUS_OK; @@ -229,20 +388,13 @@ avisBridgeMamaQueue_enqueueEvent (queueBridge queue, mama_status avisBridgeMamaQueue_stopDispatch (queueBridge queue) { - wombatQueueStatus status; - CHECK_QUEUE(queue); + avisQueueBridge* impl = (avisQueueBridge*) queue; - if (queue == NULL) - return MAMA_STATUS_NULL_ARG; + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); - status = wombatQueue_unblock (avisQueue(queue)->mQueue); - if (status != WOMBAT_QUEUE_OK) - { - mama_log (MAMA_LOG_LEVEL_ERROR, - " Failed to stop dispatching Avis Middleware queue.", - "wmwMamaQueue_stopDispatch ():"); - return MAMA_STATUS_PLATFORM; - } + /* Tell this implementation to stop dispatching */ + impl->mIsDispatching = 0; return MAMA_STATUS_OK; } @@ -252,11 +404,18 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge queue, mamaQueueEnqueueCB callback, void* closure) { - if (!callback) return MAMA_STATUS_NULL_ARG; - CHECK_QUEUE(queue); + avisQueueBridge* impl = (avisQueueBridge*) queue; - avisQueue(queue)->mEnqueueCb = callback; - avisQueue(queue)->mEnqueueClosure = closure; + if (NULL == callback) + { + return MAMA_STATUS_NULL_ARG; + } + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); + + /* Set the enqueue callback and closure */ + impl->mEnqueueCallback = callback; + impl->mEnqueueClosure = closure; return MAMA_STATUS_OK; } @@ -264,21 +423,35 @@ avisBridgeMamaQueue_setEnqueueCallback (queueBridge queue, mama_status avisBridgeMamaQueue_removeEnqueueCallback (queueBridge queue) { - CHECK_QUEUE(queue); + avisQueueBridge* impl = (avisQueueBridge*) queue; + + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); - avisQueue(queue)->mEnqueueCb = NULL; - avisQueue(queue)->mEnqueueClosure = NULL; + /* Set the enqueue callback to NULL */ + impl->mEnqueueCallback = NULL; + impl->mEnqueueClosure = NULL; return MAMA_STATUS_OK; } mama_status avisBridgeMamaQueue_getNativeHandle (queueBridge queue, - void** result) + void** nativeHandle) { - if (!result) return MAMA_STATUS_NULL_ARG; - CHECK_QUEUE(queue); - *result = avisQueue(queue)->mQueue; + avisQueueBridge* impl = (avisQueueBridge*) queue; + + if (NULL == nativeHandle) + { + return MAMA_STATUS_NULL_ARG; + } + + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); + + /* Return the handle to the native queue */ + *nativeHandle = queue; + return MAMA_STATUS_OK; } @@ -286,26 +459,64 @@ mama_status avisBridgeMamaQueue_setHighWatermark (queueBridge queue, size_t highWatermark) { - if (!highWatermark) return MAMA_STATUS_NULL_ARG; - CHECK_QUEUE(queue); - return MAMA_STATUS_NOT_IMPLEMENTED; + avisQueueBridge* impl = (avisQueueBridge*) queue; + + if (0 == highWatermark) + { + return MAMA_STATUS_NULL_ARG; + } + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); + + /* Set the high water mark */ + impl->mHighWatermark = highWatermark; + + return MAMA_STATUS_OK; } mama_status -avisBridgeMamaQueue_setLowWatermark (queueBridge queue, - size_t lowWatermark) +avisBridgeMamaQueue_setLowWatermark (queueBridge queue, + size_t lowWatermark) { - if (!lowWatermark) return MAMA_STATUS_NULL_ARG; - CHECK_QUEUE(queue); - return MAMA_STATUS_NOT_IMPLEMENTED; + avisQueueBridge* impl = (avisQueueBridge*) queue; + + if (0 == lowWatermark) + { + return MAMA_STATUS_NULL_ARG; + } + /* Perform null checks and return if null arguments provided */ + CHECK_QUEUE(impl); + + /* Set the low water mark */ + impl->mLowWatermark = lowWatermark; + + return MAMA_STATUS_OK; } -mama_status -avisBridgeMamaQueue_getEventCount (queueBridge queue, size_t* count) + +/*========================================================================= + = Private implementation functions = + =========================================================================*/ + +void +avisBridgeMamaQueueImpl_checkWatermarks (avisQueueBridge* impl) { - if (!count) return MAMA_STATUS_NULL_ARG; - CHECK_QUEUE(queue); - *count = 0; - wombatQueue_getSize (avisQueue(queue)->mQueue, (int*)count); - return MAMA_STATUS_OK; + size_t eventCount = 0; + + /* Get the current size of the wombat impl */ + avisBridgeMamaQueue_getEventCount ((queueBridge) impl, &eventCount); + + /* If the high watermark had been fired but the event count is now down */ + if (0 != impl->mHighWaterFired && eventCount == impl->mLowWatermark) + { + impl->mHighWaterFired = 0; + mamaQueueImpl_lowWatermarkExceeded (impl->mParent, eventCount); + } + /* If the high watermark is not currently fired and now above threshold */ + else if (0 == impl->mHighWaterFired && eventCount >= impl->mHighWatermark) + { + impl->mHighWaterFired = 1; + mamaQueueImpl_highWatermarkExceeded (impl->mParent, eventCount); + } } + diff --git a/mama/c_cpp/src/c/bridge/avis/timer.c b/mama/c_cpp/src/c/bridge/avis/timer.c index 61cb01f..e5ec0e1 100644 --- a/mama/c_cpp/src/c/bridge/avis/timer.c +++ b/mama/c_cpp/src/c/bridge/avis/timer.c @@ -1,7 +1,7 @@ /* $Id$ * * OpenMAMA: The open middleware agnostic messaging API - * Copyright (C) 2011 NYSE Technologies, Inc. + * Copyright (C) 2011 NYSE Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -19,79 +19,88 @@ * 02110-1301 USA */ + +/*========================================================================= + = Includes = + =========================================================================*/ + #include <mama/mama.h> #include <mama/timer.h> #include <timers.h> #include "avisbridgefunctions.h" #include <wombat/queue.h> -extern timerHeap gTimerHeap; + +/*========================================================================= + = Typedefs, structs, enums and globals = + =========================================================================*/ + +extern timerHeap gAvisTimerHeap; typedef struct avisTimerImpl_ { - timerElement mTimerElement; - double mInterval; - mamaTimerCb mAction; - void* mClosure; - mamaTimer mParent; - mamaQueue mQueue; - - /* This callback will be invoked whenever the timer has been completely destroyed. */ + timerElement mTimerElement; + double mInterval; + void* mClosure; + mamaTimer mParent; + void* mQueue; + uint8_t mDestroying; + /* This callback will be invoked whenever the timer has been destroyed. */ mamaTimerCb mOnTimerDestroyed; - + /* This callback will be invoked on each timer firing */ + mamaTimerCb mAction; } avisTimerImpl; -static void MAMACALLTYPE -destroy_callback (mamaQueue queue, void* closure) -{ - avisTimerImpl* impl = (avisTimerImpl*) closure; - - (*impl->mOnTimerDestroyed)(impl->mParent, impl->mClosure); - free (impl); -} +/*========================================================================= + = Private implementation prototypes = + =========================================================================*/ +/** + * Due to the fact that timed events may still be on the event queue, the + * timer's destroy function does not destroy the implementation immediately. + * Instead, it sets an implementation specific flag to stop further callbacks + * from being enqueued from this timer, and then enqueues this function as a + * callback on the queue to perform the actual destruction. This function also + * calls the application developer's destroy callback function. + * + * @param queue MAMA queue from which this callback was fired. + * @param closure In this instance, the closure will contain the avis timer + * implementation. + */ static void MAMACALLTYPE -timerQueueCb (mamaQueue queue, void* closure) -{ - avisTimerImpl* impl = (avisTimerImpl*)closure; - - if (impl->mAction) - impl->mAction (impl->mParent, impl->mClosure); +avisBridgeMamaTimerImpl_destroyCallback (mamaQueue queue, void* closure); -} +/** + * When a timer fires, it enqueues this callback for execution. This is where + * the action callback provided in the timer's create function gets fired. + * + * @param queue MAMA queue from which this callback was fired. + * @param closure In this instance, the closure will contain the avis timer + * implementation. + */ +static void MAMACALLTYPE +avisBridgeMamaTimerImpl_queueCallback (mamaQueue queue, void* closure); +/** + * Every time the timer fires, it calls this timer callback which adds + * avisBridgeMamaTimerImpl_queueCallback to the queue as long as the timer's + * mDestroying flag is not currently set. + * + * @param timer The underlying timer element which has just fired (not used). + * @param closure In this instance, the closure will contain the avis timer + * implementation. + */ static void -timerCb (timerElement timer, - void* closure) -{ - avisTimerImpl* impl = (avisTimerImpl*)closure; - struct timeval timeout; - - if (impl == NULL) return; - - /* Mama timers are repeating */ - timeout.tv_sec = (time_t)impl->mInterval; - timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0); +avisBridgeMamaTimerImpl_timerCallback (timerElement timer, void* closure); - if (0 != createTimer (&impl->mTimerElement, - gTimerHeap, - timerCb, - &timeout, - impl)) - { - mama_log (MAMA_LOG_LEVEL_ERROR, - "%s Failed to create Avis timer.", - "mamaTimer_create ():"); - } - mamaQueue_enqueueEvent (impl->mQueue, - timerQueueCb, - (void*)impl); -} +/*========================================================================= + = Public interface implementation functions = + =========================================================================*/ mama_status -avisBridgeMamaTimer_create (timerBridge* result, +avisBridgeMamaTimer_create (timerBridge* result, void* nativeQueueHandle, mamaTimerCb action, mamaTimerCb onTimerDestroyed, @@ -99,77 +108,92 @@ avisBridgeMamaTimer_create (timerBridge* result, mamaTimer parent, void* closure) { - avisTimerImpl* impl = NULL; - struct timeval timeout; - if (result == NULL) return MAMA_STATUS_NULL_ARG; + avisTimerImpl* impl = NULL; + int timerResult = 0; + struct timeval timeout; - mama_log (MAMA_LOG_LEVEL_FINEST, - "%s Entering with interval [%f].", - "avisMamaTimer_create ():", - interval); + if (NULL == result || NULL == nativeQueueHandle + || NULL == action + || NULL == parent ) + { + return MAMA_STATUS_NULL_ARG; + } + /* Null initialize the timer bridge supplied */ *result = NULL; - impl = (avisTimerImpl*)calloc (1, sizeof (avisTimerImpl)); - if (impl == NULL) return MAMA_STATUS_NOMEM; - - impl->mQueue = NULL; - impl->mParent = parent; - impl->mAction = action; - impl->mClosure = closure; - impl->mInterval = interval; - - mamaTimer_getQueue (parent, &impl->mQueue); - - impl->mOnTimerDestroyed = onTimerDestroyed; - - *result = (timerBridge)impl; + /* Allocate the timer implementation and set up */ + impl = (avisTimerImpl*) calloc (1, sizeof (avisTimerImpl)); + if (NULL == impl) + { + return MAMA_STATUS_NOMEM; + } - timeout.tv_sec = (time_t)interval; + *result = (timerBridge) impl; + impl->mQueue = nativeQueueHandle; + impl->mParent = parent; + impl->mAction = action; + impl->mClosure = closure; + impl->mInterval = interval; + impl->mOnTimerDestroyed = onTimerDestroyed; + impl->mDestroying = 0; + + /* Determine when the next timer should fire */ + timeout.tv_sec = (time_t) interval; timeout.tv_usec = ((interval-timeout.tv_sec) * 1000000.0); - if (0 != createTimer (&impl->mTimerElement, - gTimerHeap, - timerCb, - &timeout, - impl)) + /* Create the first single fire timer */ + timerResult = createTimer (&impl->mTimerElement, + gAvisTimerHeap, + avisBridgeMamaTimerImpl_timerCallback, + &timeout, + impl); + if (0 != timerResult) { mama_log (MAMA_LOG_LEVEL_ERROR, - "%s Failed to create Avis timer.", - "mamaTimer_create ():"); - return MAMA_STATUS_TIMER_FAILURE; + "Failed to create Avis underlying timer [%d].", timerResult); + return MAMA_STATUS_PLATFORM; } return MAMA_STATUS_OK; } +/* This call should always come from MAMA queue thread */ mama_status avisBridgeMamaTimer_destroy (timerBridge timer) { - mama_status returnStatus = MAMA_STATUS_OK; - avisTimerImpl* impl = NULL; + avisTimerImpl* impl = NULL; + mama_status returnStatus = MAMA_STATUS_OK; + int timerResult = 0; - if (timer == NULL) + if (NULL == timer) + { return MAMA_STATUS_NULL_ARG; - impl = (avisTimerImpl*)timer; + } - impl->mAction = NULL; - mama_log (MAMA_LOG_LEVEL_FINEST, - "%s Entering for 0x%x", - "avisMamaTimer_destroy ():", impl); + /* Nullify the callback and set destroy flag */ + impl = (avisTimerImpl*) timer; + impl->mDestroying = 1; + impl->mAction = NULL; - if (0 != destroyTimer (gTimerHeap, impl->mTimerElement)) + /* Destroy the timer element */ + timerResult = destroyTimer (gAvisTimerHeap, impl->mTimerElement); + if (0 != timerResult) { mama_log (MAMA_LOG_LEVEL_ERROR, - "%s Failed to destroy Avis timer.", - "avisMamaTimer_destroy ():"); + "Failed to destroy Avis underlying timer [%d].", + timerResult); returnStatus = MAMA_STATUS_PLATFORM; } - mamaQueue_enqueueEvent (impl->mQueue, - destroy_callback, - (void*)impl); + /* + * Put the impl free at the back of the queue to be executed when all + * pending timer events have been completed + */ + avisBridgeMamaQueue_enqueueEvent ((queueBridge) impl->mQueue, + avisBridgeMamaTimerImpl_destroyCallback, + (void*) impl); return returnStatus; } @@ -177,60 +201,52 @@ avisBridgeMamaTimer_destroy (timerBridge timer) mama_status avisBridgeMamaTimer_reset (timerBridge timer) { - mama_status status = MAMA_STATUS_OK; - avisTimerImpl* impl = NULL; - struct timeval timeout; + avisTimerImpl* impl = (avisTimerImpl*) timer; + int timerResult = 0; + struct timeval timeout; - if (timer == NULL) + if (NULL == impl) + { return MAMA_STATUS_NULL_ARG; - impl = (avisTimerImpl*)timer; + } - timeout.tv_sec = (time_t)impl->mInterval; - timeout.tv_usec = ((impl->mInterval-timeout.tv_sec) * 1000000.0); + /* Destroy the existing timer element */ + destroyTimer (gAvisTimerHeap, impl->mTimerElement); - if (timer == NULL) - return MAMA_STATUS_NULL_ARG; - impl = (avisTimerImpl*)timer; + /* Calculate next time interval */ + timeout.tv_sec = (time_t) impl->mInterval; + timeout.tv_usec = ((impl->mInterval- timeout.tv_sec) * 1000000.0); - if (0 != destroyTimer (gTimerHeap, impl->mTimerElement)) + /* Create the timer for the next firing */ + timerResult = createTimer (&impl->mTimerElement, + gAvisTimerHeap, + avisBridgeMamaTimerImpl_timerCallback, + &timeout, + impl); + if (0 != timerResult) { mama_log (MAMA_LOG_LEVEL_ERROR, - "%s Failed to destroy Avis timer.", - "avisMamaTimer_destroy ():"); - avisBridgeMamaTimer_destroy (timer); - status = MAMA_STATUS_PLATFORM; - } - else - { - if (0 != createTimer (&impl->mTimerElement, - gTimerHeap, - timerCb, - &timeout, - impl)) - { - mama_log (MAMA_LOG_LEVEL_ERROR, - "%s Failed to create Avis timer.", - "mamaTimer_create ():"); - status = MAMA_STATUS_PLATFORM; - } + "Failed to reset Avis underlying timer [%d].", timerResult); + return MAMA_STATUS_PLATFORM; } - return status; + return MAMA_STATUS_OK; + } mama_status avisBridgeMamaTimer_setInterval (timerBridge timer, - mama_f64_t interval) + mama_f64_t interval) { - avisTimerImpl* impl = NULL; - - if (timer == NULL) + avisTimerImpl* impl = (avisTimerImpl*) timer; + if (NULL == timer) + { return MAMA_STATUS_NULL_ARG; - impl = (avisTimerImpl*)timer; + } impl->mInterval = interval; - return avisBridgeMamaTimer_reset (timer); + return avisBridgeMamaTimer_reset (timer); } mama_status @@ -238,11 +254,71 @@ avisBridgeMamaTimer_getInterval (timerBridge timer, mama_f64_t* interval) { avisTimerImpl* impl = NULL; - - if (timer == NULL) + if (NULL == timer || NULL == interval) + { return MAMA_STATUS_NULL_ARG; - impl = (avisTimerImpl*)timer; + } + impl = (avisTimerImpl*) timer; *interval = impl->mInterval; + return MAMA_STATUS_OK; } + + +/*========================================================================= + = Private implementation functions = + =========================================================================*/ + +/* This callback is invoked by the avis bridge's destroy event */ +static void MAMACALLTYPE +avisBridgeMamaTimerImpl_destroyCallback (mamaQueue queue, void* closure) +{ + avisTimerImpl* impl = (avisTimerImpl*) closure; + (*impl->mOnTimerDestroyed)(impl->mParent, impl->mClosure); + + /* Free the implementation memory here */ + free (impl); +} + +/* This callback is invoked by the avis bridge's timer event */ +static void MAMACALLTYPE +avisBridgeMamaTimerImpl_queueCallback (mamaQueue queue, void* closure) +{ + avisTimerImpl* impl = (avisTimerImpl*) closure; + if (impl->mAction) + { + impl->mAction (impl->mParent, impl->mClosure); + } +} + +/* This callback is invoked by the common timer's dispatch thread */ +static void +avisBridgeMamaTimerImpl_timerCallback (timerElement timer, + void* closure) +{ + + avisTimerImpl* impl = (avisTimerImpl*) closure; + + if (NULL == impl) + { + return; + } + + /* + * Only enqueue further timer callbacks the timer is not currently getting + * destroyed + */ + if (0 == impl->mDestroying) + { + /* Set the timer for the next firing */ + avisBridgeMamaTimer_reset ((timerBridge) closure); + + /* Enqueue the callback for handling */ + avisBridgeMamaQueue_enqueueEvent ((queueBridge) impl->mQueue, + avisBridgeMamaTimerImpl_queueCallback, + closure); + } +} + + -- 2.4.3 |
|