Date
1 - 1 of 1
[PATCH 02/14] AVIS: Pulled IO implementation from qpid over to avis
Frank Quinn <fquinn.ni@...>
In future, we may want to make this code central. In the meantime,
at least making the code identical with qpid should make it easier to maintain. Signed-off-by: Frank Quinn <fquinn.ni@...> --- mama/c_cpp/src/c/bridge/avis/SConscript | 4 +- mama/c_cpp/src/c/bridge/avis/bridge.c | 16 +- mama/c_cpp/src/c/bridge/avis/io.c | 274 +++++++++++++++++++++++++++++--- mama/c_cpp/src/c/bridge/avis/io.h | 51 ++++++ 4 files changed, 320 insertions(+), 25 deletions(-) create mode 100644 mama/c_cpp/src/c/bridge/avis/io.h diff --git a/mama/c_cpp/src/c/bridge/avis/SConscript b/mama/c_cpp/src/c/bridge/avis/SConscript index 526612f..475c924 100644 --- a/mama/c_cpp/src/c/bridge/avis/SConscript +++ b/mama/c_cpp/src/c/bridge/avis/SConscript @@ -17,10 +17,10 @@ incPath.append('#mama/c_cpp/src/c') env['CCFLAGS'] = [x for x in env['CCFLAGS'] if x != '-pedantic-errors'] if env['host']['os'] == 'Darwin': - env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'wombatcommon'], + env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'event', 'wombatcommon'], LIBPATH=libPath, CPPPATH=incPath) else: - env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'wombatcommon', 'uuid'], + env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'event', 'wombatcommon', 'uuid'], LIBPATH=libPath, CPPPATH=incPath) conf = Configure(env, config_h='./config.h', log_file='./config.log') diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c index 33dfc17..e975474 100644 --- a/mama/c_cpp/src/c/bridge/avis/bridge.c +++ b/mama/c_cpp/src/c/bridge/avis/bridge.c @@ -124,6 +124,9 @@ avisBridge_open (mamaBridge bridgeImpl) return MAMA_STATUS_PLATFORM; } + /* Start the io thread */ + avisBridgeMamaIoImpl_start (); + return MAMA_STATUS_OK; } @@ -150,16 +153,19 @@ avisBridge_close (mamaBridge bridgeImpl) mamaBridgeImpl_getClosure(impl, &avisBridge); - if (avisBridge) - { - free (avisBridge); - } - mamaQueue_destroyWait(impl->mDefaultEventQueue); free (impl); wsocketcleanup(); + + /* Stop and destroy the io thread */ + avisBridgeMamaIoImpl_stop (); + + if (avisBridge) + { + free (avisBridge); + } return status; } diff --git a/mama/c_cpp/src/c/bridge/avis/io.c b/mama/c_cpp/src/c/bridge/avis/io.c index 91e7d22..8630c98 100644 --- a/mama/c_cpp/src/c/bridge/avis/io.c +++ b/mama/c_cpp/src/c/bridge/avis/io.c @@ -1,7 +1,7 @@ /* $Id$ * * OpenMAMA: The open middleware agnostic messaging API - * Copyright (C) 2011 NYSE Technologies, Inc. + * Copyright (C) 2011 NYSE Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -19,36 +19,274 @@ * 02110-1301 USA */ + +/*========================================================================= + = Includes = + =========================================================================*/ + #include <mama/mama.h> #include <mama/io.h> -#include <bridge.h> +#include <wombat/port.h> #include "avisbridgefunctions.h" +#include "io.h" +#include <event.h> + +/*========================================================================= + = Typedefs, structs, enums and globals = + =========================================================================*/ + +typedef struct avisIoImpl +{ + struct event_base* mEventBase; + wthread_t mDispatchThread; + uint8_t mActive; + uint8_t mEventsRegistered; + wsem_t mResumeDispatching; +} avisIoImpl; + +typedef struct avisIoEventImpl +{ + uint32_t mDescriptor; + mamaIoCb mAction; + mamaIoType mIoType; + mamaIo mParent; + void* mClosure; + struct event mEvent; +} avisIoEventImpl; + +/* + * Global static container to hold instance-wide information not otherwise + * available in this interface. + */ +static avisIoImpl gAvisIoContainer; + + +/*========================================================================= + = Private implementation prototypes = + =========================================================================*/ + +void* +avisBridgeMamaIoImpl_dispatchThread (void* closure); +void +avisBridgeMamaIoImpl_libeventIoCallback (int fd, short type, void* closure); + + +/*========================================================================= + = Public implementation functions = + =========================================================================*/ + +/* Not implemented in the avis bridge */ mama_status -avisBridgeMamaIo_create(ioBridge* result, - void* nativeQueueHandle, - uint32_t descriptor, - mamaIoCb action, - mamaIoType ioType, - mamaIo parent, - void* closure) -{ - if (!result) return MAMA_STATUS_NULL_ARG; +avisBridgeMamaIo_create (ioBridge* result, + void* nativeQueueHandle, + uint32_t descriptor, + mamaIoCb action, + mamaIoType ioType, + mamaIo parent, + void* closure) +{ + avisIoEventImpl* impl = NULL; + short evtType = 0; + + if (NULL == result) + { + return MAMA_STATUS_NULL_ARG; + } + *result = 0; - return MAMA_STATUS_NOT_IMPLEMENTED; + + /* Check for supported types so we don't prematurely allocate */ + switch (ioType) + { + case MAMA_IO_READ: + evtType = EV_READ; + break; + case MAMA_IO_WRITE: + evtType = EV_WRITE; + break; + case MAMA_IO_ERROR: + evtType = EV_READ | EV_WRITE; + break; + case MAMA_IO_CONNECT: + case MAMA_IO_ACCEPT: + case MAMA_IO_CLOSE: + case MAMA_IO_EXCEPT: + default: + return MAMA_STATUS_UNSUPPORTED_IO_TYPE; + break; + } + + impl = (avisIoEventImpl*) calloc (1, sizeof (avisIoEventImpl)); + if (NULL == impl) + { + return MAMA_STATUS_NOMEM; + } + + impl->mDescriptor = descriptor; + impl->mAction = action; + impl->mIoType = ioType; + impl->mParent = parent; + impl->mClosure = closure; + + event_set (&impl->mEvent, + impl->mDescriptor, + evtType, + avisBridgeMamaIoImpl_libeventIoCallback, + impl); + + event_add (&impl->mEvent, NULL); + + event_base_set (gAvisIoContainer.mEventBase, &impl->mEvent); + + /* If this is the first event since base was emptied or created */ + if (0 == gAvisIoContainer.mEventsRegistered) + { + wsem_post (&gAvisIoContainer.mResumeDispatching); + } + gAvisIoContainer.mEventsRegistered++; + + *result = (ioBridge)impl; + + return MAMA_STATUS_OK; } +/* Not implemented in the avis bridge */ mama_status -avisBridgeMamaIo_destroy (ioBridge io) +avisBridgeMamaIo_destroy (ioBridge io) { - return MAMA_STATUS_NOT_IMPLEMENTED; + avisIoEventImpl* impl = (avisIoEventImpl*) io; + if (NULL == io) + { + return MAMA_STATUS_NULL_ARG; + } + event_del (&impl->mEvent); + + free (impl); + gAvisIoContainer.mEventsRegistered--; + + return MAMA_STATUS_OK; } +/* Not implemented in the avis bridge */ mama_status -avisBridgeMamaIo_getDescriptor (ioBridge io, uint32_t *result) +avisBridgeMamaIo_getDescriptor (ioBridge io, + uint32_t* result) { - if (!result) return MAMA_STATUS_NULL_ARG; - *result = 0; - return MAMA_STATUS_NOT_IMPLEMENTED; + avisIoEventImpl* impl = (avisIoEventImpl*) io; + if (NULL == io || NULL == result) + { + return MAMA_STATUS_NULL_ARG; + } + + *result = impl->mDescriptor; + + return MAMA_STATUS_OK; +} + +/*========================================================================= + = Public implementation prototypes = + =========================================================================*/ + +mama_status +avisBridgeMamaIoImpl_start () +{ + int threadResult = 0; + gAvisIoContainer.mEventsRegistered = 0; + gAvisIoContainer.mActive = 1; + gAvisIoContainer.mEventBase = event_init (); + + wsem_init (&gAvisIoContainer.mResumeDispatching, 0, 0); + threadResult = wthread_create (&gAvisIoContainer.mDispatchThread, + NULL, + avisBridgeMamaIoImpl_dispatchThread, + gAvisIoContainer.mEventBase); + if (0 != threadResult) + { + mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaIoImpl_initialize(): " + "wthread_create returned %d", threadResult); + return MAMA_STATUS_PLATFORM; + } + return MAMA_STATUS_OK; +} + +mama_status +avisBridgeMamaIoImpl_stop () +{ + gAvisIoContainer.mActive = 0; + + /* Alert the semaphore so the dispatch loop can exit */ + wsem_post (&gAvisIoContainer.mResumeDispatching); + + /* Tell the event loop to exit */ + event_base_loopexit (gAvisIoContainer.mEventBase, NULL); + + /* Join with the dispatch thread - it should exit shortly */ + wthread_join (gAvisIoContainer.mDispatchThread, NULL); + wsem_destroy (&gAvisIoContainer.mResumeDispatching); + + /* Free the main event base */ + event_base_free (gAvisIoContainer.mEventBase); + + return MAMA_STATUS_OK; } + + +/*========================================================================= + = Private implementation prototypes = + =========================================================================*/ + +void* +avisBridgeMamaIoImpl_dispatchThread (void* closure) +{ + int dispatchResult = 0; + + /* Wait on the first event to register before starting dispatching */ + wsem_wait (&gAvisIoContainer.mResumeDispatching); + + while (0 != gAvisIoContainer.mActive) + { + dispatchResult = event_base_loop (gAvisIoContainer.mEventBase, + EVLOOP_NONBLOCK | EVLOOP_ONCE); + + /* If no events are currently registered */ + if (1 == dispatchResult) + { + /* Wait until they are */ + gAvisIoContainer.mEventsRegistered = 0; + wsem_wait (&gAvisIoContainer.mResumeDispatching); + } + } + return NULL; +} + +void +avisBridgeMamaIoImpl_libeventIoCallback (int fd, short type, void* closure) +{ + avisIoEventImpl* impl = (avisIoEventImpl*) closure; + + /* Timeout is the only error detectable with libevent */ + if (EV_TIMEOUT == type) + { + /* If this is an error IO type, fire the callback */ + if (impl->mIoType == MAMA_IO_ERROR && NULL != impl->mAction) + { + (impl->mAction)(impl->mParent, impl->mIoType, impl->mClosure); + } + /* If this is not an error IO type, do nothing */ + else + { + return; + } + } + + /* Call the action callback if defined */ + if (NULL != impl->mAction) + { + (impl->mAction)(impl->mParent, impl->mIoType, impl->mClosure); + } + + /* Enqueue for the next time */ + event_add (&impl->mEvent, NULL); +} diff --git a/mama/c_cpp/src/c/bridge/avis/io.h b/mama/c_cpp/src/c/bridge/avis/io.h new file mode 100644 index 0000000..a14e794 --- /dev/null +++ b/mama/c_cpp/src/c/bridge/avis/io.h @@ -0,0 +1,51 @@ +/* $Id$ + * + * OpenMAMA: The open middleware agnostic messaging API + * Copyright (C) 2011 NYSE Technologies, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#ifndef MAMA_BRIDGE_AVIS_IO_H__ +#define MAMA_BRIDGE_AVIS_IO_H__ + + +/*========================================================================= + = Includes = + =========================================================================*/ + + +#if defined(__cplusplus) +extern "C" { +#endif + +#include <mama/mama.h> + +/*========================================================================= + = Public implementation functions = + =========================================================================*/ + +mama_status +avisBridgeMamaIoImpl_start (void); + +mama_status +avisBridgeMamaIoImpl_stop (void); + +#if defined(__cplusplus) +} +#endif + +#endif /* MAMA_BRIDGE_AVIS_IO_H__ */ -- 2.4.3 |
|