[PATCH 20/50] [mama] transport write queue high and low watermarks


Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

Allow applications to set high and low watermarks for internal write queues if
the underlying middleware queues writes. Socket based middlewares that use
scatter/gather IO and a dedicated IO thread often coalesce writes in a separate
queue for each connected client. A queue that grows large enough to trigger the
high watermark often indicates a slow consumer.

Signed-off-by: Mike Schonberg <mschonberg@...>
---
mama/c_cpp/src/c/mama/transport.h | 16 +++++++++++++-
mama/c_cpp/src/c/transport.c | 40 +++++++++++++++++++++++++++++++-----
mama/c_cpp/src/c/transportimpl.h | 6 +++++
3 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/transport.h b/mama/c_cpp/src/c/mama/transport.h
index d66905c..e1a29ac 100644
--- a/mama/c_cpp/src/c/mama/transport.h
+++ b/mama/c_cpp/src/c/mama/transport.h
@@ -76,7 +76,9 @@ typedef enum
MAMA_TRANSPORT_PUBLISHER_DISCONNECT,
MAMA_TRANSPORT_QUALITY,
MAMA_TRANSPORT_NAMING_SERVICE_CONNECT,
- MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT
+ MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT,
+ MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK,
+ MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK
} mamaTransportEvent;

/**
@@ -186,6 +188,18 @@ extern mama_status
mamaTransport_setTransportCallback (mamaTransport transport,
mamaTransportCB callback,
void* closure);
+/**
+ * Set the transport write queue high and low water mark values. The
+ * MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK and
+ * MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK events will be delivered through
+ * the transport callback when the respective number of items are outstanding on
+ * a client's write queue.
+ */
+MAMAExpDLL
+extern mama_status
+mamaTransport_setWriteQueueWatermarks (mamaTransport transport,
+ uint32_t highWater,
+ uint32_t lowWater);

/**
* Set the transport topic callback. It receives advisories when a client
diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 7e583d5..f059280 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -89,6 +89,8 @@ typedef struct transportImpl_
mamaSymbolMapFunc mMapFunc;
void* mMapFuncClosure;

+ uint32_t mWriteQueueHighWatermark;
+ uint32_t mWriteQueueLowWatermark;
/* These members are only needed for the market data transport */
wList mListeners;

@@ -160,12 +162,12 @@ typedef struct transportImpl_

int mGroupSizeHint;

- uint8_t mDisableDisconnectCb;
- uint8_t mDisableRefresh;
+ uint8_t mDisableRefresh;
dqStartegyScheme mDQStratScheme;
dqftStrategyScheme mFTStratScheme;

uint8_t mInternal;
+ uint8_t mDisableDisconnectCb;
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -445,15 +447,15 @@ static void setFtStrategy (mamaTransport transport)
}
}

+void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable) /* Stores disable in the transport's mDisableRefresh field. */
+{
+ self->mDisableRefresh=disable; /* NOTE(review): self is defined outside this hunk, presumably transport viewed as transportImpl* -- confirm; it is not NULL-checked here */
+}
/**
* Check property to disable refresh messages. Undocumented.
*
* Return non-zero to disable refresh messages.
*/
-void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)
-{
- self->mDisableRefresh=disable;
-}

static int mamaTransportInternal_disableRefreshes(const char* transportName)
{
@@ -1130,6 +1132,7 @@ mamaTransport_destroy (mamaTransport transport)
if (self->mRefreshTransport)
{
refreshTransport_destroy (self->mRefreshTransport);
+ self->mRefreshTransport = NULL;
}

if (self->mThrottle)
@@ -1610,6 +1613,8 @@ mamaTransportEvent_toString (mamaTransportEvent event)
case MAMA_TRANSPORT_QUALITY: return "QUALITY";
case MAMA_TRANSPORT_NAMING_SERVICE_CONNECT: return "NAMING_SERVICE_CONNECT";
case MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT: return "NAMING_SERVICE_DISCONNECT";
+ case MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK: return "MAMA_TRANSPORT_WRITE_QUEUE_HIGH_WATER_MARK";
+ case MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK: return "MAMA_TRANSPORT_WRITE_QUEUE_LOW_WATER_MARK";
default: return "UNKNOWN";
}
}
@@ -1625,6 +1630,29 @@ mamaTransport_setTransportCallback (mamaTransport transport,
return MAMA_STATUS_OK;
}

+mama_status /* Validates the write queue high/low watermarks and stores them on the transport. */
+mamaTransport_setWriteQueueWatermarks (mamaTransport transport,
+ uint32_t high,
+ uint32_t low)
+{
+ if (!self) /* NOTE(review): self is defined outside this hunk, presumably transport viewed as transportImpl* -- confirm */
+ return MAMA_STATUS_NULL_ARG;
+ if (high < low || low == 0) /* rejected unless low is non-zero and does not exceed high; high == low is accepted */
+ return MAMA_STATUS_INVALID_ARG;
+
+ self->mWriteQueueHighWatermark = high; /* stored only after both checks above have passed */
+ self->mWriteQueueLowWatermark = low;
+ return MAMA_STATUS_OK;
+}
+
+void /* Copies the transport's stored write queue watermarks into *high and *low. */
+mamaTransportImpl_getWriteQueueWatermarks (mamaTransport transport,
+ uint32_t* high,
+ uint32_t* low)
+{
+ *high = self->mWriteQueueHighWatermark; /* NOTE(review): no NULL checks here -- transport (via self), high and low are all dereferenced unconditionally */
+ *low = self->mWriteQueueLowWatermark; /* NOTE(review): value when the setter was never called depends on struct initialisation outside this hunk -- confirm */
+}

mama_status
mamaTransport_setTransportTopicCallback (mamaTransport transport,
diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h
index a253e3f..18c3a28 100644
--- a/mama/c_cpp/src/c/transportimpl.h
+++ b/mama/c_cpp/src/c/transportimpl.h
@@ -139,6 +139,12 @@ mamaTransportImpl_getTransportTopicCallback (mamaTransport transport,

MAMAExpDLL
extern void
+mamaTransportImpl_getWriteQueueWatermarks (mamaTransport transport,
+ uint32_t* high,
+ uint32_t* low);
+
+MAMAExpDLL
+extern void
mamaTransportImpl_resetRefreshForListener (mamaTransport tport, void *handle);

MAMAExpDLL
--
1.7.7.6