Re: [PATCH] [mama] Add pre-recap cache
Ian Bell <IBell@...>
Small addition to the patch; a new version is attached.
diff --git a/mama/c_cpp/src/c/dqstrategy.c b/mama/c_cpp/src/c/dqstrategy.c index 7978c63..011436d 100644 --- a/mama/c_cpp/src/c/dqstrategy.c +++ b/mama/c_cpp/src/c/dqstrategy.c @@ -384,6 +384,10 @@ dqStrategy_checkSeqNum (dqStrategy strategy, case MAMA_MSG_TYPE_RECAP : case MAMA_MSG_TYPE_BOOK_RECAP :
+ if (mamaTransportImpl_preRecapCacheEnabled (tport)) + { + self->mTryToFillGap = 1; + } mamaSubscription_unsetAllPossiblyStale (subscription); resetDqState (strategy, ctx); dqStrategyImpl_resetDqContext (ctx, seqNum, senderId);
From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...]
On Behalf Of Ian Bell
From 257b9d3c406f7e419263ff229611f9009063d66a Mon Sep 17 00:00:00 2001 Message-Id: <257b9d3c406f7e419263ff229611f9009063d66a.1348658266.git.ibell@...> From: Ian Bell <ibell@...> Date: Wed, 26 Sep 2012 12:17:40 +0100 Subject: [PATCH] [mama] Add pre-recap cache
Modified the pre-initial cache to also work for recaps. This is configurable per transport via the property mama.transport.%s.prerecapcache.enable=true (where %s is the transport name).
Signed-off-by: Ian Bell <ibell@...> --- mama/c_cpp/src/c/listenermsgcallback.c | 14 +++++++++----- mama/c_cpp/src/c/transport.c | 27 +++++++++++++++++++++++++++ mama/c_cpp/src/c/transportimpl.h | 4 ++++ 3 files changed, 40 insertions(+), 5 deletions(-)
diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c index ca9db2c..fedcc49 100644 --- a/mama/c_cpp/src/c/listenermsgcallback.c +++ b/mama/c_cpp/src/c/listenermsgcallback.c @@ -189,7 +189,8 @@ static void processPointToPointMessage (msgCallback* callback, if (PRE_INITIAL_SCHEME_ON_INITIAL== mamaTransportImpl_getPreInitialScheme (tport)) { - if (msgType==MAMA_MSG_TYPE_INITIAL || msgType == MAMA_MSG_TYPE_BOOK_INITIAL) + if (msgType==MAMA_MSG_TYPE_INITIAL || msgType == MAMA_MSG_TYPE_BOOK_INITIAL || + (mamaTransportImpl_preRecapCacheEnabled (tport) && (msgType == MAMA_MSG_TYPE_RECAP || msgType == MAMA_MSG_TYPE_BOOK_RECAP ))) { dqContext_applyPreInitialCache (&ctx->mDqContext, self->mSubscription); @@ -255,6 +256,8 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg, mamaStatsCollector* queueStatsCollector = NULL; mamaStatsCollector* tportStatsCollector = NULL; const char* userSymbol = NULL; + dqState state = DQ_STATE_NOT_ESTABLISHED; + mamaSubscription_getTransport (subscription, &transport); if (!ctx) { @@ -275,7 +278,6 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg, if (gGenerateTransportStats) { - mamaSubscription_getTransport (subscription, &transport); tportStatsCollector = mamaTransport_getStatsCollector (transport); } @@ -446,15 +448,17 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg, */ mamaSubscription_getExpectingInitial (subscription, &expectingInitial); + dqStrategy_getDqState (ctx->mDqContext, &state); /*While we are waiting for initial values we also check whether we have an * initial for an individual context. 
If we are no longer waiting for initials we assume that it is ok to pass on the update - (probably a new symbol for a group)*/ - if (expectingInitial && !ctx->mInitialArrived + if ((expectingInitial && !ctx->mInitialArrived) || + (state == DQ_STATE_WAITING_FOR_RECAP && mamaTransportImpl_preRecapCacheEnabled (transport) && msgType != MAMA_MSG_TYPE_DELETE && msgType != MAMA_MSG_TYPE_EXPIRE - && msgType != MAMA_MSG_TYPE_UNKNOWN) + && msgType != MAMA_MSG_TYPE_UNKNOWN)) { /*Add this message to the cache. If the message after the initial * results in a gap we will attempt to fill the gap from this cache @@ -471,7 +475,7 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg, mama_log (MAMA_LOG_LEVEL_FINE, "%s%s %s%s" " Subscription ignoring message received prior" - " to initial update. Type: %d %s %p", + " to initial or recap. Type: %d %d %p", userSymbolFormatted, ctxSymbolFormatted, msgType, mamaMsg_toString(msg), ctx); } diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c index c54a94c..b9c1a9b 100644 --- a/mama/c_cpp/src/c/transport.c +++ b/mama/c_cpp/src/c/transport.c @@ -42,6 +42,7 @@ #include "mama/statfields.h" #include "statsgeneratorinternal.h" #include "mama/statscollector.h" +#include "wombat/strutils.h" extern int gGenerateTransportStats; extern int gGenerateLbmStats; @@ -169,6 +170,7 @@ typedef struct transportImpl_ uint8_t mInternal; uint8_t mDisableDisconnectCb; preInitialScheme mPreInitialScheme; + mama_bool_t mPreRecapCacheEnabled; void* mClosure; } transportImpl; @@ -449,6 +451,19 @@ static void setFtStrategy (mamaTransport transport) } } +static void enablePreRecapCache (mamaTransport transport) +{ + char propNameBuf[256]; + + if (!self) return; + + snprintf (propNameBuf, 256, "mama.transport.%s.prerecapcache.enable", self->mName); + + self->mPreRecapCacheEnabled = strtobool (mama_getProperty (propNameBuf)); + + mama_log (MAMA_LOG_LEVEL_NORMAL, + "%s: Pre-Recap cache %s", self->mName, 
self->mPreRecapCacheEnabled ? "enabled" : "disabled"); +} void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable) { self->mDisableRefresh=disable; @@ -846,6 +861,7 @@ mamaTransport_create (mamaTransport transport, setPreInitialStrategy ((mamaTransport)self); setDQStrategy ((mamaTransport)self); setFtStrategy ((mamaTransport)self); + enablePreRecapCache ((mamaTransport)self); if (mamaTransportImpl_disableDisconnectCb (name)) { @@ -1735,6 +1751,17 @@ mamaTransportImpl_getFtStrategyScheme (mamaTransport transport) } return DQ_FT_DO_NOT_WAIT_FOR_RECAP; } + +mama_bool_t +mamaTransportImpl_preRecapCacheEnabled (mamaTransport transport) +{ + if (self) + { + return self->mPreRecapCacheEnabled; + } + return 0; +} + /* Process an advisory message and invokes callbacks * on all listeners. * @param transport The transport. diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h index ac4c3c2..764d0a0 100644 --- a/mama/c_cpp/src/c/transportimpl.h +++ b/mama/c_cpp/src/c/transportimpl.h @@ -298,6 +298,10 @@ extern mama_status mamaTransportImpl_forceClientDisconnect (mamaTransport transport, const char* ipAddress, uint16_t port); + +extern mama_bool_t +mamaTransportImpl_preRecapCacheEnabled (mamaTransport transport); + #if defined(__cplusplus) } #endif -- 1.7.9.5
Please consider the environment before printing this e-mail. This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy. Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden. |
|