[PATCH 02/14] AVIS: Pulled IO implementation from qpid over to avis


Frank Quinn <fquinn.ni@...>
 

In future, we may want to make this code central. In the meantime,
at least making the code identical with qpid should make it easier
to maintain.

Signed-off-by: Frank Quinn <fquinn.ni@...>
---
 mama/c_cpp/src/c/bridge/avis/SConscript |   4 +-
 mama/c_cpp/src/c/bridge/avis/bridge.c   |  16 +-
 mama/c_cpp/src/c/bridge/avis/io.c       | 274 +++++++++++++++++++++++++++++---
 mama/c_cpp/src/c/bridge/avis/io.h       |  51 ++++++
 4 files changed, 320 insertions(+), 25 deletions(-)
 create mode 100644 mama/c_cpp/src/c/bridge/avis/io.h

diff --git a/mama/c_cpp/src/c/bridge/avis/SConscript b/mama/c_cpp/src/c/bridge/avis/SConscript
index 526612f..475c924 100644
--- a/mama/c_cpp/src/c/bridge/avis/SConscript
+++ b/mama/c_cpp/src/c/bridge/avis/SConscript
@@ -17,10 +17,10 @@ incPath.append('#mama/c_cpp/src/c')
 env['CCFLAGS'] = [x for x in env['CCFLAGS'] if x != '-pedantic-errors']
 
 if env['host']['os'] == 'Darwin':
-    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'wombatcommon'],
+    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'event', 'wombatcommon'],
                LIBPATH=libPath, CPPPATH=incPath)
 else:
-    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'wombatcommon', 'uuid'],
+    env.Append(LIBS=['avis', 'mamaavismsgimpl', 'mama', 'm', 'event', 'wombatcommon', 'uuid'],
                LIBPATH=libPath, CPPPATH=incPath)
 
 conf = Configure(env, config_h='./config.h', log_file='./config.log')
diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index 33dfc17..e975474 100644
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -124,6 +124,9 @@ avisBridge_open (mamaBridge bridgeImpl)
         return MAMA_STATUS_PLATFORM;
     }
 
+    /* Start the io thread */
+    avisBridgeMamaIoImpl_start ();
+
     return MAMA_STATUS_OK;
 }
 
@@ -150,16 +153,19 @@ avisBridge_close (mamaBridge bridgeImpl)
 
     mamaBridgeImpl_getClosure(impl, &avisBridge);
 
-    if (avisBridge)
-    {
-        free (avisBridge);
-    }
-
     mamaQueue_destroyWait(impl->mDefaultEventQueue);
 
     free (impl);
    
     wsocketcleanup();
+
+    /* Stop and destroy the io thread */
+    avisBridgeMamaIoImpl_stop ();
+
+    if (avisBridge)
+    {
+        free (avisBridge);
+    }
     return status;
 }
 
diff --git a/mama/c_cpp/src/c/bridge/avis/io.c b/mama/c_cpp/src/c/bridge/avis/io.c
index 91e7d22..8630c98 100644
--- a/mama/c_cpp/src/c/bridge/avis/io.c
+++ b/mama/c_cpp/src/c/bridge/avis/io.c
@@ -1,7 +1,7 @@
 /* $Id$
  *
  * OpenMAMA: The open middleware agnostic messaging API
- * Copyright (C) 2011 NYSE Technologies, Inc.
+ * Copyright (C) 2011 NYSE Inc.
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -19,36 +19,274 @@
  * 02110-1301 USA
  */
 
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
 #include <mama/mama.h>
 #include <mama/io.h>
-#include <bridge.h>
+#include <wombat/port.h>
 #include "avisbridgefunctions.h"
+#include "io.h"
+#include <event.h>
+
+/*=========================================================================
+  =                Typedefs, structs, enums and globals                   =
+  =========================================================================*/
+
+typedef struct avisIoImpl
+{
+    struct event_base*  mEventBase;
+    wthread_t           mDispatchThread;
+    uint8_t             mActive;
+    uint8_t             mEventsRegistered;
+    wsem_t              mResumeDispatching;
+} avisIoImpl;
+
+typedef struct avisIoEventImpl
+{
+    uint32_t            mDescriptor;
+    mamaIoCb            mAction;
+    mamaIoType          mIoType;
+    mamaIo              mParent;
+    void*               mClosure;
+    struct event        mEvent;
+} avisIoEventImpl;
+
+/*
+ * Global static container to hold instance-wide information not otherwise
+ * available in this interface.
+ */
+static avisIoImpl       gAvisIoContainer;
+
+
+/*=========================================================================
+  =                  Private implementation prototypes                    =
+  =========================================================================*/
+
+void*
+avisBridgeMamaIoImpl_dispatchThread (void* closure);
 
+void
+avisBridgeMamaIoImpl_libeventIoCallback (int fd, short type, void* closure);
+
+
+/*=========================================================================
+  =                   Public implementation functions                     =
+  =========================================================================*/
+
+/* Registers a libevent IO event for the supplied descriptor */
 mama_status
-avisBridgeMamaIo_create(ioBridge*  result,
-                         void*      nativeQueueHandle,
-                         uint32_t   descriptor,
-                         mamaIoCb   action,
-                         mamaIoType ioType,
-                         mamaIo     parent,
-                         void*      closure)
-{
-    if (!result) return MAMA_STATUS_NULL_ARG;
+avisBridgeMamaIo_create         (ioBridge*   result,
+                                 void*       nativeQueueHandle,
+                                 uint32_t    descriptor,
+                                 mamaIoCb    action,
+                                 mamaIoType  ioType,
+                                 mamaIo      parent,
+                                 void*       closure)
+{
+    avisIoEventImpl*    impl    = NULL;
+    short               evtType = 0;
+
+    if (NULL == result)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
     *result = 0;
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+
+    /* Check for supported types so we don't prematurely allocate */
+    switch (ioType)
+    {
+    case MAMA_IO_READ:
+        evtType = EV_READ;
+        break;
+    case MAMA_IO_WRITE:
+        evtType = EV_WRITE;
+        break;
+    case MAMA_IO_ERROR:
+        evtType = EV_READ | EV_WRITE;
+        break;
+    case MAMA_IO_CONNECT:
+    case MAMA_IO_ACCEPT:
+    case MAMA_IO_CLOSE:
+    case MAMA_IO_EXCEPT:
+    default:
+        return MAMA_STATUS_UNSUPPORTED_IO_TYPE;
+        break;
+    }
+
+    impl = (avisIoEventImpl*) calloc (1, sizeof (avisIoEventImpl));
+    if (NULL == impl)
+    {
+        return MAMA_STATUS_NOMEM;
+    }
+
+    impl->mDescriptor           = descriptor;
+    impl->mAction               = action;
+    impl->mIoType               = ioType;
+    impl->mParent               = parent;
+    impl->mClosure              = closure;
+
+    event_set (&impl->mEvent,
+               impl->mDescriptor,
+               evtType,
+               avisBridgeMamaIoImpl_libeventIoCallback,
+               impl);
+
+    event_add (&impl->mEvent, NULL);
+
+    event_base_set (gAvisIoContainer.mEventBase, &impl->mEvent);
+
+    /* If this is the first event since base was emptied or created */
+    if (0 == gAvisIoContainer.mEventsRegistered)
+    {
+        wsem_post (&gAvisIoContainer.mResumeDispatching);
+    }
+    gAvisIoContainer.mEventsRegistered++;
+
+    *result = (ioBridge)impl;
+
+    return MAMA_STATUS_OK;
 }
 
+/* Removes the IO event from libevent and frees its implementation */
 mama_status
-avisBridgeMamaIo_destroy (ioBridge io)
+avisBridgeMamaIo_destroy        (ioBridge io)
 {
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisIoEventImpl* impl = (avisIoEventImpl*) io;
+    if (NULL == io)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+    event_del (&impl->mEvent);
+
+    free (impl);
+    gAvisIoContainer.mEventsRegistered--;
+
+    return MAMA_STATUS_OK;
 }
 
+/* Returns the descriptor which the IO event was created with */
 mama_status
-avisBridgeMamaIo_getDescriptor (ioBridge io, uint32_t *result)
+avisBridgeMamaIo_getDescriptor  (ioBridge    io,
+                                 uint32_t*   result)
 {
-    if (!result) return MAMA_STATUS_NULL_ARG;
-    *result = 0;
-    return MAMA_STATUS_NOT_IMPLEMENTED;
+    avisIoEventImpl* impl = (avisIoEventImpl*) io;
+    if (NULL == io || NULL == result)
+    {
+        return MAMA_STATUS_NULL_ARG;
+    }
+
+    *result = impl->mDescriptor;
+
+    return MAMA_STATUS_OK;
+}
+
+/*=========================================================================
+  =                  Public implementation functions                      =
+  =========================================================================*/
+
+mama_status
+avisBridgeMamaIoImpl_start ()
+{
+    int threadResult                        = 0;
+    gAvisIoContainer.mEventsRegistered      = 0;
+    gAvisIoContainer.mActive                = 1;
+    gAvisIoContainer.mEventBase             = event_init ();
+
+    wsem_init (&gAvisIoContainer.mResumeDispatching, 0, 0);
+    threadResult = wthread_create (&gAvisIoContainer.mDispatchThread,
+                                   NULL,
+                                   avisBridgeMamaIoImpl_dispatchThread,
+                                   gAvisIoContainer.mEventBase);
+    if (0 != threadResult)
+    {
+        mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaIoImpl_initialize(): "
+                  "wthread_create returned %d", threadResult);
+        return MAMA_STATUS_PLATFORM;
+    }
+    return MAMA_STATUS_OK;
+}
+
+mama_status
+avisBridgeMamaIoImpl_stop ()
+{
+    gAvisIoContainer.mActive = 0;
+
+    /* Alert the semaphore so the dispatch loop can exit */
+    wsem_post (&gAvisIoContainer.mResumeDispatching);
+
+    /* Tell the event loop to exit */
+    event_base_loopexit (gAvisIoContainer.mEventBase, NULL);
+
+    /* Join with the dispatch thread - it should exit shortly */
+    wthread_join (gAvisIoContainer.mDispatchThread, NULL);
+    wsem_destroy (&gAvisIoContainer.mResumeDispatching);
+
+    /* Free the main event base */
+    event_base_free (gAvisIoContainer.mEventBase);
+
+    return MAMA_STATUS_OK;
 }
 
+
+
+/*=========================================================================
+  =                  Private implementation functions                     =
+  =========================================================================*/
+
+void*
+avisBridgeMamaIoImpl_dispatchThread (void* closure)
+{
+    int             dispatchResult = 0;
+
+    /* Wait on the first event to register before starting dispatching */
+    wsem_wait (&gAvisIoContainer.mResumeDispatching);
+
+    while (0 != gAvisIoContainer.mActive)
+    {
+        dispatchResult = event_base_loop (gAvisIoContainer.mEventBase,
+                                          EVLOOP_NONBLOCK | EVLOOP_ONCE);
+
+        /* If no events are currently registered */
+        if (1 == dispatchResult)
+        {
+            /* Wait until they are */
+            gAvisIoContainer.mEventsRegistered = 0;
+            wsem_wait (&gAvisIoContainer.mResumeDispatching);
+        }
+    }
+    return NULL;
+}
+
+void
+avisBridgeMamaIoImpl_libeventIoCallback (int fd, short type, void* closure)
+{
+    avisIoEventImpl* impl = (avisIoEventImpl*) closure;
+
+    /* Timeout is the only error detectable with libevent */
+    if (EV_TIMEOUT == type)
+    {
+        /*
+         * Only error IO types care about timeouts; every other type ignores
+         * them. Error types fall through to the common dispatch below so
+         * that the action callback fires exactly once per event rather
+         * than once here and again after this block.
+         */
+        if (MAMA_IO_ERROR != impl->mIoType)
+        {
+            return;
+        }
+    }
+
+    /* Call the action callback if defined */
+    if (NULL != impl->mAction)
+    {
+        (impl->mAction)(impl->mParent, impl->mIoType, impl->mClosure);
+    }
+
+    /* Enqueue for the next time */
+    event_add (&impl->mEvent, NULL);
+}
diff --git a/mama/c_cpp/src/c/bridge/avis/io.h b/mama/c_cpp/src/c/bridge/avis/io.h
new file mode 100644
index 0000000..a14e794
--- /dev/null
+++ b/mama/c_cpp/src/c/bridge/avis/io.h
@@ -0,0 +1,51 @@
+/* $Id$
+ *
+ * OpenMAMA: The open middleware agnostic messaging API
+ * Copyright (C) 2011 NYSE Technologies, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#ifndef MAMA_BRIDGE_AVIS_IO_H__
+#define MAMA_BRIDGE_AVIS_IO_H__
+
+
+/*=========================================================================
+  =                             Includes                                  =
+  =========================================================================*/
+
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+#include <mama/mama.h>
+
+/*=========================================================================
+  =                  Public implementation functions                      =
+  =========================================================================*/
+
+mama_status
+avisBridgeMamaIoImpl_start (void);
+
+mama_status
+avisBridgeMamaIoImpl_stop  (void);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* MAMA_BRIDGE_AVIS_IO_H__ */
--
2.4.3