[PATCH 09/50] [bridge] Added mamaTransport_forceClientDisconnect()


Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

For TCP socket based middleware, allow transports to disconnect individual
clients. This is useful for eliminating slow consumers or other applications
that may impact the entire system.

Also fixed compiler warnings in Avis bridge files.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/bridge.h | 11 +++-
mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h | 7 ++-
mama/c_cpp/src/c/bridge/avis/bridge.c | 36 +++++++-----
mama/c_cpp/src/c/bridge/avis/transportbridge.c | 59 ++++++++++++-------
4 files changed, 73 insertions(+), 40 deletions(-)

diff --git a/mama/c_cpp/src/c/bridge.h b/mama/c_cpp/src/c/bridge.h
index 5ab7707..9ead2bd 100644
--- a/mama/c_cpp/src/c/bridge.h
+++ b/mama/c_cpp/src/c/bridge.h
@@ -116,6 +116,8 @@ do \
implIdentifier ## BridgeMamaTransport_destroy; \
bridgeImpl->bridgeMamaTransportCreate = \
implIdentifier ## BridgeMamaTransport_create; \
+ bridgeImpl->bridgeMamaTransportForceClientDisconnect = \
+ implIdentifier ## BridgeMamaTransport_forceClientDisconnect;\
bridgeImpl->bridgeMamaTransportFindConnection = \
implIdentifier ## BridgeMamaTransport_findConnection; \
bridgeImpl->bridgeMamaTransportGetAllConnections = \
@@ -269,7 +271,7 @@ typedef mama_status (*bridge_stop)(mamaQueue defaultEventQueue);
/*Called by mama_getVersion()*/
typedef const char* (*bridge_getVersion)(void);
typedef const char* (*bridge_getName)(void);
-typedef mama_status (*bridge_getDefaultPayloadId)(char**name, char* id);
+typedef mama_status (*bridge_getDefaultPayloadId)(char***name, char** id);

/*===================================================================
= mamaQueue bridge function pointers =
@@ -330,6 +332,11 @@ typedef mama_status (*bridgeMamaTransport_destroy)(transportBridge transport);
typedef mama_status (*bridgeMamaTransport_create)(transportBridge *result,
const char* name,
mamaTransport parent);
+typedef mama_status (*bridgeMamaTransport_forceClientDisconnect)
+ (transportBridge* transports,
+ int numTransports,
+ const char* ipAddress,
+ uint16_t port);
/* Find a connection with specified IP Address and Port. If the port is 0, the
* call returns the first connection with the specified IP Address. If a
* connection is not found the method returns MAMA_STATUS_NOT_FOUND and
@@ -735,6 +742,8 @@ typedef struct mamaBridgeImpl
bridgeMamaTransport_isValid bridgeMamaTransportIsValid;
bridgeMamaTransport_destroy bridgeMamaTransportDestroy;
bridgeMamaTransport_create bridgeMamaTransportCreate;
+ bridgeMamaTransport_forceClientDisconnect
+ bridgeMamaTransportForceClientDisconnect;
bridgeMamaTransport_findConnection bridgeMamaTransportFindConnection;
bridgeMamaTransport_getAllConnections
bridgeMamaTransportGetAllConnections;
diff --git a/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h b/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h
index 94c4a42..0dd6f8e 100644
--- a/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h
+++ b/mama/c_cpp/src/c/bridge/avis/avisbridgefunctions.h
@@ -43,7 +43,7 @@ extern const char*
avisBridge_getVersion (void);

mama_status
-avisBridge_getDefaultPayloadId (char**name, char* id);
+avisBridge_getDefaultPayloadId (char***name, char** id);

extern mama_status
avisBridge_open (mamaBridge bridgeImpl);
@@ -125,6 +125,11 @@ avisBridgeMamaTransport_create (transportBridge* result,
const char* name,
mamaTransport parent);
extern mama_status
+avisBridgeMamaTransport_forceClientDisconnect (transportBridge* transports,
+ int numTransports,
+ const char* ipAddress,
+ uint16_t port);
+extern mama_status
avisBridgeMamaTransport_findConnection (transportBridge* transports,
int numTransports,
mamaConnection* result,
diff --git a/mama/c_cpp/src/c/bridge/avis/bridge.c b/mama/c_cpp/src/c/bridge/avis/bridge.c
index 24c369a..4b5343a 100755
--- a/mama/c_cpp/src/c/bridge/avis/bridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/bridge.c
@@ -35,10 +35,12 @@ timerHeap gTimerHeap;
/*Responsible for creating the bridge impl structure*/
void avisBridge_createImpl (mamaBridge* result)
{
+ avisBridgeImpl* avisBridge = NULL;
+ mamaBridgeImpl* impl = NULL;
if (!result) return;
*result = NULL;

- mamaBridgeImpl* impl = (mamaBridgeImpl*)calloc (1, sizeof (mamaBridgeImpl));
+ impl = (mamaBridgeImpl*)calloc (1, sizeof (mamaBridgeImpl));
if (!impl)
{
mama_log (MAMA_LOG_LEVEL_SEVERE, "avisBridge_createImpl(): "
@@ -46,7 +48,7 @@ void avisBridge_createImpl (mamaBridge* result)
return;
}

- avisBridgeImpl* avisBridge = (avisBridgeImpl*) calloc(1, sizeof(avisBridgeImpl));
+ avisBridge = (avisBridgeImpl*) calloc(1, sizeof(avisBridgeImpl));

/*Populate the bridge impl structure with the function pointers*/
INITIALIZE_BRIDGE (impl, avis);
@@ -59,7 +61,7 @@ void avisBridge_createImpl (mamaBridge* result)
const char*
avisBridge_getVersion (void)
{
- return (const char*) VERSION;
+ return (const char*) "Unable to get version number";
}

const char*
@@ -68,14 +70,14 @@ avisBridge_getName (void)
return "avis";
}

-#define DEFAULT_PAYLOAD_NAME "avismsg"
-#define DEFAULT_PAYLOAD_ID MAMA_PAYLOAD_AVIS
+static const char* PAYLOAD_NAMES[] = {"avismsg",NULL};
+static const char PAYLOAD_IDS[] = {MAMA_PAYLOAD_AVIS,NULL};

mama_status
-avisBridge_getDefaultPayloadId (char**name, char* id)
+avisBridge_getDefaultPayloadId (char***name, char** id)
{
- *name=DEFAULT_PAYLOAD_NAME;
- *id=DEFAULT_PAYLOAD_ID;
+ *name = PAYLOAD_NAMES;
+ *id = PAYLOAD_IDS;

return MAMA_STATUS_OK;
}
@@ -87,6 +89,7 @@ avisBridge_open (mamaBridge bridgeImpl)
mama_status status = MAMA_STATUS_OK;
mamaBridgeImpl* impl = (mamaBridgeImpl*)bridgeImpl;

+ wsocketstartup();
mama_log (MAMA_LOG_LEVEL_FINEST, "avisBridge_open(): Entering.");

if (MAMA_STATUS_OK !=
@@ -123,10 +126,11 @@ avisBridge_open (mamaBridge bridgeImpl)
mama_status
avisBridge_close (mamaBridge bridgeImpl)
{
+ mama_status status = MAMA_STATUS_OK;
+ mamaBridgeImpl* impl = NULL;
mama_log (MAMA_LOG_LEVEL_FINEST, "avisBridge_close(): Entering.");

- mama_status status = MAMA_STATUS_OK;
- mamaBridgeImpl* impl = (mamaBridgeImpl*)bridgeImpl;
+ impl = (mamaBridgeImpl*)bridgeImpl;


if (0 != destroyHeap (gTimerHeap))
@@ -139,6 +143,7 @@ avisBridge_close (mamaBridge bridgeImpl)
mamaQueue_destroyWait(impl->mDefaultEventQueue);

free (impl);
+ wsocketcleanup();
return status;
}

@@ -146,12 +151,12 @@ avisBridge_close (mamaBridge bridgeImpl)
mama_status
avisBridge_start(mamaQueue defaultEventQueue)
{
- mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_start(): Start dispatching on default event queue.");
-
mama_status status = MAMA_STATUS_OK;
+ avisBridgeImpl* avisBridge = NULL;
+
+ mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_start(): Start dispatching on default event queue.");

// start Avis event loop(s)
- avisBridgeImpl* avisBridge;
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) mamaQueueImpl_getBridgeImpl(defaultEventQueue), (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_start(): Could not get Elvin object");
return status;
@@ -168,11 +173,10 @@ avisBridge_start(mamaQueue defaultEventQueue)
mama_status
avisBridge_stop(mamaQueue defaultEventQueue)
{
- mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_stop(): Stopping bridge.");
-
mama_status status = MAMA_STATUS_OK;
+ avisBridgeImpl* avisBridge = NULL;

- avisBridgeImpl* avisBridge;
+ mama_log (MAMA_LOG_LEVEL_FINER, "avisBridge_stop(): Stopping bridge.");
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) mamaQueueImpl_getBridgeImpl(defaultEventQueue), (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridge_stop(): Could not get Elvin object");
return status;
diff --git a/mama/c_cpp/src/c/bridge/avis/transportbridge.c b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
index c994879..de10b61 100755
--- a/mama/c_cpp/src/c/bridge/avis/transportbridge.c
+++ b/mama/c_cpp/src/c/bridge/avis/transportbridge.c
@@ -29,6 +29,7 @@
#include <mama/types.h>
#include <transportimpl.h>
#include <timers.h>
+#include <errno.h>
#include "transportbridge.h"
#include "avisbridgefunctions.h"
#include "avisdefs.h"
@@ -59,14 +60,12 @@ void log_avis_error(MamaLogLevel logLevel, Elvin* avis)

void closeListener(Elvin* avis, CloseReason reason, const char* message, void* closure)
{
+ const char* errMsg;
if (avisBridge(closure) == NULL) {
mama_log (MAMA_LOG_LEVEL_FINE, "Avis closeListener: could not get Avis bridge");
return;
}

- // TODO -- serialize access across multiple threads
-
- const char* errMsg;
switch( reason )
{
case REASON_CLIENT_SHUTDOWN: errMsg = "Avis client shutdown"; break;
@@ -82,16 +81,19 @@ void closeListener(Elvin* avis, CloseReason reason, const char* message, void* c
static const char*
getURL( const char *name )
{
+ int len = 0;
+ char* buff = NULL;
+ const char* property = NULL;
if (name == NULL)
return NULL;

mama_log (MAMA_LOG_LEVEL_FINE, "initializing Avis transport: %s", name);
- int len = strlen(name) + strlen( TPORT_PREFIX ) + strlen(TPORT_PARAM) + 4;
- char* buff = (char *)alloca( len );
+ len = strlen(name) + strlen( TPORT_PREFIX ) + strlen(TPORT_PARAM) + 4;
+ buff = (char *)alloca( len );
memset(buff, '\0', len);
snprintf( buff, len, "%s.%s.%s", TPORT_PREFIX, name, TPORT_PARAM );

- const char* property = properties_Get( mamaInternal_getProperties(), buff );
+ property = properties_Get( mamaInternal_getProperties(), buff );
if ( property == NULL )
return DEFAULT_URL;

@@ -123,13 +125,13 @@ void* avisDispatchThread(void* closure)

mama_status avisTransportBridge_start(avisTransportBridge* transportBridge)
{
- CHECK_TRANSPORT(transportBridge);
-
// stop Avis event loop
- pthread_t tid;
+ wthread_t tid;
int rc;
- if (0 != (rc = pthread_create(&tid, NULL, avisDispatchThread, transportBridge))) {
- mama_log (MAMA_LOG_LEVEL_ERROR, "pthread_create returned %d", rc);
+ CHECK_TRANSPORT(transportBridge);
+
+ if (0 != (rc = wthread_create(&tid, NULL, avisDispatchThread, transportBridge))) {
+ mama_log (MAMA_LOG_LEVEL_ERROR, "wthread_create returned %d", rc);
return MAMA_STATUS_SYSTEM_ERROR;
}

@@ -193,22 +195,24 @@ avisBridgeMamaTransport_create (transportBridge* result,
const char* name,
mamaTransport mamaTport )
{
- avisTransportBridge* transport =
- (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
+ mama_status status;
+ avisBridgeImpl* avisBridge = NULL;
+ avisTransportBridge* transport = NULL;
+ mamaBridgeImpl* bridgeImpl = NULL;
+ const char* url = NULL;
+
+ transport = (avisTransportBridge*)calloc( 1, sizeof( avisTransportBridge ) );
if (transport == NULL)
return MAMA_STATUS_NOMEM;

transport->mTransport = (mamaTransport) mamaTport;

- // TODO -- serialize access across multiple threads
- mamaBridgeImpl* bridgeImpl = mamaTransportImpl_getBridgeImpl(mamaTport);
+ bridgeImpl = mamaTransportImpl_getBridgeImpl(mamaTport);
if (!bridgeImpl) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get bridge");
free(transport);
return MAMA_STATUS_PLATFORM;
}
- mama_status status;
- avisBridgeImpl* avisBridge;
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) bridgeImpl, (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get Avis bridge object");
free(transport);
@@ -229,7 +233,7 @@ avisBridgeMamaTransport_create (transportBridge* result,
}

// open the server connection
- const char* url = getURL(name);
+ url = getURL(name);
if (url == NULL) {
mama_log (MAMA_LOG_LEVEL_NORMAL, "No %s property defined for transport : %s", TPORT_PARAM, name);
return MAMA_STATUS_INVALID_ARG;
@@ -253,15 +257,17 @@ avisBridgeMamaTransport_create (transportBridge* result,
mama_status
avisBridgeMamaTransport_destroy (transportBridge transport)
{
- // TODO -- serialize access across multiple threads
- mamaBridgeImpl* bridgeImpl = mamaTransportImpl_getBridgeImpl(avisTransport(transport)->mTransport);
+ mama_status status;
+ avisBridgeImpl* avisBridge = NULL;
+ mamaBridgeImpl* bridgeImpl = NULL;
+
+
+ bridgeImpl = mamaTransportImpl_getBridgeImpl(avisTransport(transport)->mTransport);
if (!bridgeImpl) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get bridge");
free(transport);
return MAMA_STATUS_PLATFORM;
}
- mama_status status;
- avisBridgeImpl* avisBridge;
if (MAMA_STATUS_OK != (status = mamaBridgeImpl_getClosure((mamaBridge) bridgeImpl, (void**) &avisBridge))) {
mama_log (MAMA_LOG_LEVEL_ERROR, "avisBridgeMamaTransport_create(): Could not get Avis bridge object");
free(transport);
@@ -286,6 +292,15 @@ avisBridgeMamaTransport_isValid (transportBridge transport)
}

mama_status
+avisBridgeMamaTransport_forceClientDisconnect (transportBridge* transports,
+ int numTransports,
+ const char* ipAddress,
+ uint16_t port)
+{
+ return MAMA_STATUS_NOT_IMPLEMENTED;
+}
+
+mama_status
avisBridgeMamaTransport_findConnection (transportBridge* transports,
int numTransports,
mamaConnection* result,
--
1.7.7.6

Join Openmama-dev@lists.openmama.org to automatically receive all group messages.