[PATCH 17/50] [mama] Configurable Schemes for data quality


Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

The dqStratgyScheme deterines whether or not OpenMAMA ignores duplicate
messages. Valid values are:
DQ_SCHEME_DELIVER_ALL deliver all messages including duplicats
DQ_SCHEME_IGNORE_DUPS do not deliver duplicate messages

The dqftStrategyScheme determines how a subscription behaves when OpenMAMA
detects a fault tolerant take over (the sender id changes for a subscription):
DQ_FT_DO_NOT_WAIT_FOR_RECAP continue to process STALE messages
DQ_FT_WAIT_FOR_RECAP discard stale messages

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/dqstrategy.c | 46 ++++++++++++++--
mama/c_cpp/src/c/dqstrategy.h | 5 ++-
mama/c_cpp/src/c/listenermsgcallback.c | 22 +++++++-
mama/c_cpp/src/c/mama/subscription.h | 12 ++++
mama/c_cpp/src/c/transport.c | 92 ++++++++++++++++++++++++++++++++
5 files changed, 170 insertions(+), 7 deletions(-)

diff --git a/mama/c_cpp/src/c/dqstrategy.c b/mama/c_cpp/src/c/dqstrategy.c
index 8f7ccda..1e495b6 100644
--- a/mama/c_cpp/src/c/dqstrategy.c
+++ b/mama/c_cpp/src/c/dqstrategy.c
@@ -141,7 +141,8 @@ handleFTTakeover (dqStrategy strategy,
int msgType,
mamaDqContext* ctx,
mama_seqnum_t seqNum,
- mama_u64_t senderId)
+ mama_u64_t senderId,
+ int recoverOnRecap)
{
const char* symbol = NULL;
mamaSubscription_getSymbol (self->mSubscription, &symbol);
@@ -151,11 +152,19 @@ handleFTTakeover (dqStrategy strategy,
"Previous SeqNum: %u. New SeqNum: %u. [%s]",
ctx->mSenderId, senderId, ctx->mSeqNum, seqNum, symbol);

+ if (recoverOnRecap)
+ {
+ ctx->mSeqNum = senderId;
+ ctx->mDQState = DQ_STATE_WAITING_FOR_RECAP_AFTER_FT;
+ }
+ else
+ {
resetDqState (strategy, ctx);

/*In all cases we reset the data quality context*/
dqStrategyImpl_resetDqContext (ctx, seqNum, senderId);

+ }
return MAMA_STATUS_OK;
}

@@ -183,6 +192,7 @@ dqStrategy_checkSeqNum (dqStrategy strategy,

mamaMsg_getSeqNum (msg, &seqNum);

+ ctx->mDoNotForward = 0;
if (mamaMsg_getU64 (msg, MamaFieldSenderId.mName, MamaFieldSenderId.mFid,
&senderId) != MAMA_STATUS_OK)
{
@@ -207,7 +217,14 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
mamaStatsCollector_incrementStat (*(mamaInternal_getGlobalStatsCollector()),
MamaStatFtTakeovers.mFid);
}
- return handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId);
+ if (DQ_FT_WAIT_FOR_RECAP==mamaTransportImpl_getFtStrategyScheme(tport))
+ {
+ handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId, 1);
+ }
+ else
+ {
+ return handleFTTakeover (strategy, msg, msgType, ctx, seqNum, senderId, 0);
+ }
}

if (gMamaLogLevel >= MAMA_LOG_LEVEL_FINER)
@@ -243,7 +260,8 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
if (((ctxDqState == DQ_STATE_NOT_ESTABLISHED) ||
(seqNum == 0) ||
(seqNum == (ctxSeqNum + conflateCnt))) &&
- (ctxDqState != DQ_STATE_WAITING_FOR_RECAP))
+ ((ctxDqState != DQ_STATE_WAITING_FOR_RECAP) ||
+ (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)))
{
/* No gap */
if (self->mTryToFillGap)
@@ -268,7 +286,19 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
return MAMA_STATUS_OK;
}

- if (seqNum == ctxSeqNum)
+ /* For late joins or middlewares that support a publish cache, it is possible that you will get old updates
+ in this case take no action */
+ if (DQ_SCHEME_INGORE_DUPS==mamaTransportImpl_getDqStrategyScheme(tport))
+ {
+ if ((seqNum <= ctxSeqNum) && ((ctxDqState != DQ_STATE_WAITING_FOR_RECAP) ||
+ (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)))
+ {
+ ctx->mDoNotForward = 1;
+ return MAMA_STATUS_OK;
+ }
+ }
+
+ if ((seqNum == ctxSeqNum) && (ctxDqState != DQ_STATE_WAITING_FOR_RECAP_AFTER_FT))
{
/* Duplicate data - set DQQuality to DUPLICATE, invoke quality callback */
ctx->mDQState = DQ_STATE_DUPLICATE;
@@ -290,6 +320,13 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
return MAMA_STATUS_OK;
}

+ if (ctxDqState == DQ_STATE_WAITING_FOR_RECAP_AFTER_FT)
+ {
+ ctx->mDoNotForward = 1;
+ return MAMA_STATUS_OK;
+ }
+ else
+ {
/* If we get here, we missed a sequence number. */
if ((PRE_INITIAL_SCHEME_ON_GAP==mamaTransportImpl_getPreInitialScheme(tport))
&&(self->mTryToFillGap))
@@ -323,6 +360,7 @@ dqStrategy_checkSeqNum (dqStrategy strategy,
}

handleStaleData (self, msg, ctx);
+ }
break;
case MAMA_MSG_TYPE_INITIAL :
case MAMA_MSG_TYPE_BOOK_INITIAL :
diff --git a/mama/c_cpp/src/c/dqstrategy.h b/mama/c_cpp/src/c/dqstrategy.h
index a7ff6a7..d550e95 100644
--- a/mama/c_cpp/src/c/dqstrategy.h
+++ b/mama/c_cpp/src/c/dqstrategy.h
@@ -50,7 +50,9 @@ typedef enum dqState_
* In the case of a stale initial, we do not want
* a recap because it may also be stale data.
*/
- DQ_STATE_STALE_NO_RECAP = 5
+ DQ_STATE_STALE_NO_RECAP = 5,
+
+ DQ_STATE_WAITING_FOR_RECAP_AFTER_FT = 6
} dqState;

typedef struct
@@ -64,6 +66,7 @@ typedef struct
imageRequest mRecapRequest;
mama_u64_t mSenderId;

+ uint8_t mDoNotForward;
} mamaDqContext;

typedef struct dqStrategy_* dqStrategy;
diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c
index dca5474..cbe1c90 100644
--- a/mama/c_cpp/src/c/listenermsgcallback.c
+++ b/mama/c_cpp/src/c/listenermsgcallback.c
@@ -449,7 +449,16 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
case MAMA_MSG_TYPE_QUOTE:
case MAMA_MSG_TYPE_TRADE:
mamaSubscription_checkSeqNum(subscription, msg, msgType, ctx);
- mamaSubscription_forwardMsg(subscription, msg);
+ if (!ctx->mDqContext.mDoNotForward)
+ {
+ mamaSubscription_forwardMsg(subscription, msg);
+ }
+ else
+ {
+ mamaSubscription_getSymbol (subscription, &userSymbol);
+ mama_log (MAMA_LOG_LEVEL_FINER, "Subscription for %s not forwarded"
+ " as message seqnum is before seqnum expecting", userSymbol);
+ }
break;
case MAMA_MSG_TYPE_REFRESH:
mamaSubscription_respondToRefreshMessage(subscription);
@@ -468,7 +477,16 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,
break;
default:
mamaSubscription_checkSeqNum(subscription, msg, msgType, ctx);
- mamaSubscription_forwardMsg(subscription, msg);
+ if (!ctx->mDqContext.mDoNotForward)
+ {
+ mamaSubscription_forwardMsg(subscription, msg);
+ }
+ else
+ {
+ mamaSubscription_getSymbol (subscription, &userSymbol);
+ mama_log (MAMA_LOG_LEVEL_FINER, "Subscription for %s not forwarded"
+ " as message seqnum is before seqnum expecting", userSymbol);
+ }
}
}

diff --git a/mama/c_cpp/src/c/mama/subscription.h b/mama/c_cpp/src/c/mama/subscription.h
index 058a41c..9eb1f0c 100644
--- a/mama/c_cpp/src/c/mama/subscription.h
+++ b/mama/c_cpp/src/c/mama/subscription.h
@@ -115,6 +115,18 @@ typedef enum

} mamaSubscriptionState;

+typedef enum
+{
+ DQ_SCHEME_DELIVER_ALL,
+ DQ_SCHEME_INGORE_DUPS
+} dqStartegyScheme;
+
+
+typedef enum
+{
+ DQ_FT_DO_NOT_WAIT_FOR_RECAP,
+ DQ_FT_WAIT_FOR_RECAP
+}dqftStrategyScheme;
/* *************************************************** */
/* Type Defines. */
/* *************************************************** */
diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c
index 20ee0cc..0a4adcd 100644
--- a/mama/c_cpp/src/c/transport.c
+++ b/mama/c_cpp/src/c/transport.c
@@ -162,6 +162,9 @@ typedef struct transportImpl_

uint8_t mDisableDisconnectCb;
uint8_t mDisableRefresh;
+ dqStartegyScheme mDQStratScheme;
+ dqftStrategyScheme mFTStratScheme;
+
preInitialScheme mPreInitialScheme;
} transportImpl;

@@ -177,6 +180,8 @@ init (transportImpl* transport, int createResponder)
self->mCause = 0;
self->mPlatformInfo = NULL;
self->mPreInitialScheme = PRE_INITIAL_SCHEME_ON_GAP;
+ self->mDQStratScheme = DQ_SCHEME_DELIVER_ALL;
+ self->mFTStratScheme = DQ_FT_DO_NOT_WAIT_FOR_RECAP;


mama_log (MAMA_LOG_LEVEL_FINEST,
@@ -365,6 +370,72 @@ static void setPreInitialStrategy (mamaTransport transport)
"%s: Using default preinitial strategy: ON_GAP", self->mName);
}
}
+static void setDQStrategy (mamaTransport transport)
+{
+ const char* propValue = NULL;
+ char propNameBuf[256];
+
+ if (!self) return;
+
+ snprintf (propNameBuf, 256, "mama.transport.%s.dqstrategy", self->mName);
+
+ propValue = properties_Get(mamaInternal_getProperties(),
+ propNameBuf);
+
+ if (NULL!=propValue)
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL, "Setting %s=%s",
+ propNameBuf, propValue);
+
+ if (0==strcmp (propValue, "ignoredups"))
+ {
+ self->mDQStratScheme = DQ_SCHEME_INGORE_DUPS;
+ }
+ else
+ {
+ self->mDQStratScheme = DQ_SCHEME_DELIVER_ALL;
+ }
+ }
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL,
+ "%s: Using default dq strategy: DQ_SCHEME_DELIVER_ALL", self->mName);
+ }
+}
+
+static void setFtStrategy (mamaTransport transport)
+{
+ const char* propValue = NULL;
+ char propNameBuf[256];
+
+ if (!self) return;
+
+ snprintf (propNameBuf, 256, "mama.transport.%s.ftstrategy", self->mName);
+
+ propValue = properties_Get(mamaInternal_getProperties(),
+ propNameBuf);
+
+ if (NULL!=propValue)
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL, "Setting %s=%s",
+ propNameBuf, propValue);
+
+ if (0==strcmp (propValue, "waitforrecap"))
+ {
+ self->mFTStratScheme = DQ_FT_WAIT_FOR_RECAP;
+ }
+ else
+ {
+ self->mFTStratScheme = DQ_FT_DO_NOT_WAIT_FOR_RECAP;
+ }
+ }
+ else
+ {
+ mama_log (MAMA_LOG_LEVEL_NORMAL,
+ "%s: Using default ft strategy: DQ_FT_DO_NOT_WAIT_FOR_RECAP", self->mName);
+ }
+}
+
/**
* Check property to disable refresh messages. Undocumented.
*
@@ -756,6 +827,8 @@ mamaTransport_create (mamaTransport transport,


setPreInitialStrategy ((mamaTransport)self);
+ setDQStrategy (self);
+ setFtStrategy (self);

if (mamaTransportImpl_disableDisconnectCb (name))
{
@@ -1584,6 +1657,25 @@ mamaTransportImpl_getPreInitialScheme (mamaTransport transport)
return PRE_INITIAL_SCHEME_ON_GAP;
}

+dqStartegyScheme
+mamaTransportImpl_getDqStrategyScheme (mamaTransport transport)
+{
+ if (self)
+ {
+ return self->mDQStratScheme;
+ }
+ return DQ_SCHEME_DELIVER_ALL;
+}
+
+dqftStrategyScheme
+mamaTransportImpl_getFtStrategyScheme (mamaTransport transport)
+{
+ if (self)
+ {
+ return self->mFTStratScheme;
+ }
+ return DQ_FT_DO_NOT_WAIT_FOR_RECAP;
+}
/* Process an advisory message and invokes callbacks
* on all listeners.
* @param transport The transport.
--
1.7.7.6

Join Openmama-dev@lists.openmama.org to automatically receive all group messages.