[PATCH] [mama] Add pre-recap cache


Ian Bell <IBell@...>
 

From 257b9d3c406f7e419263ff229611f9009063d66a Mon Sep 17 00:00:00 2001

Message-Id: <257b9d3c406f7e419263ff229611f9009063d66a.1348658266.git.ibell@...>

From: Ian Bell <ibell@...>

Date: Wed, 26 Sep 2012 12:17:40 +0100

Subject: [PATCH] [mama] Add pre-recap cache

 

Modified the pre-initial cache to also work for recaps.  This is

configurable per transport via the following property (%s is the transport name):

mama.transport.%s.prerecapcache.enable=true

 

Signed-off-by: Ian Bell <ibell@...>

---

mama/c_cpp/src/c/listenermsgcallback.c |   14 +++++++++-----

mama/c_cpp/src/c/transport.c           |   27 +++++++++++++++++++++++++++

mama/c_cpp/src/c/transportimpl.h       |    4 ++++

3 files changed, 40 insertions(+), 5 deletions(-)

 

diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c

index ca9db2c..fedcc49 100644

--- a/mama/c_cpp/src/c/listenermsgcallback.c

+++ b/mama/c_cpp/src/c/listenermsgcallback.c

@@ -189,7 +189,8 @@ static void processPointToPointMessage (msgCallback*    callback,

     if (PRE_INITIAL_SCHEME_ON_INITIAL==

             mamaTransportImpl_getPreInitialScheme (tport))

     {

-        if (msgType==MAMA_MSG_TYPE_INITIAL || msgType == MAMA_MSG_TYPE_BOOK_INITIAL)

+        if (msgType==MAMA_MSG_TYPE_INITIAL || msgType == MAMA_MSG_TYPE_BOOK_INITIAL ||

+           (mamaTransportImpl_preRecapCacheEnabled (tport) &&  (msgType == MAMA_MSG_TYPE_RECAP || msgType == MAMA_MSG_TYPE_BOOK_RECAP )))

         {

             dqContext_applyPreInitialCache (&ctx->mDqContext, self->mSubscription);

@@ -255,6 +256,8 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,

     mamaStatsCollector* queueStatsCollector = NULL;

     mamaStatsCollector* tportStatsCollector = NULL;

     const char* userSymbol = NULL;

+             dqState state = DQ_STATE_NOT_ESTABLISHED;

+    mamaSubscription_getTransport (subscription, &transport);

     if (!ctx)

     {

@@ -275,7 +278,6 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,

     if (gGenerateTransportStats)

     {

-        mamaSubscription_getTransport (subscription, &transport);

         tportStatsCollector = mamaTransport_getStatsCollector (transport);

     }

@@ -446,15 +448,17 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,

      */

     mamaSubscription_getExpectingInitial (subscription, &expectingInitial);

+    dqStrategy_getDqState (ctx->mDqContext, &state);

     /*While we are waiting for initial values we also check whether we have an

      * initial for an individual context.

       If we are no longer waiting for initials we assume that it is ok to pass

      on the update - (probably a new symbol for a group)*/

-    if (expectingInitial && !ctx->mInitialArrived

+    if (((expectingInitial && !ctx->mInitialArrived) ||

+        (state == DQ_STATE_WAITING_FOR_RECAP && mamaTransportImpl_preRecapCacheEnabled (transport)))

         && msgType != MAMA_MSG_TYPE_DELETE

         && msgType != MAMA_MSG_TYPE_EXPIRE

         && msgType != MAMA_MSG_TYPE_UNKNOWN)

     {

         /*Add this message to the cache. If the message after the initial

          * results in a gap we will attempt to fill the gap from this cache

@@ -471,7 +475,7 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,

             mama_log (MAMA_LOG_LEVEL_FINE,

                            "%s%s %s%s"

                            " Subscription ignoring message received prior"

-                           " to initial update. Type: %d %s %p",

+                           " to initial or recap. Type: %d %s %p",

                            userSymbolFormatted, ctxSymbolFormatted,

                            msgType, mamaMsg_toString(msg), ctx);

         }

diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c

index c54a94c..b9c1a9b 100644

--- a/mama/c_cpp/src/c/transport.c

+++ b/mama/c_cpp/src/c/transport.c

@@ -42,6 +42,7 @@

#include "mama/statfields.h"

#include "statsgeneratorinternal.h"

#include "mama/statscollector.h"

+#include "wombat/strutils.h"

 extern int gGenerateTransportStats;

extern int gGenerateLbmStats;

@@ -169,6 +170,7 @@ typedef struct transportImpl_

     uint8_t                 mInternal;

     uint8_t                 mDisableDisconnectCb;

     preInitialScheme         mPreInitialScheme;

+    mama_bool_t             mPreRecapCacheEnabled;

     void*                   mClosure;

} transportImpl;

@@ -449,6 +451,19 @@ static void setFtStrategy (mamaTransport transport)

     }

}

+static void enablePreRecapCache (mamaTransport transport)

+{

+    char propNameBuf[256];

+

+    if (!self) return;

+

+    snprintf (propNameBuf, 256, "mama.transport.%s.prerecapcache.enable", self->mName);

+

+    self->mPreRecapCacheEnabled = strtobool (mama_getProperty (propNameBuf));

+

+    mama_log (MAMA_LOG_LEVEL_NORMAL,

+              "%s: Pre-Recap cache %s", self->mName, self->mPreRecapCacheEnabled ? "enabled" : "disabled");

+}   

 void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)

{

     self->mDisableRefresh=disable;

@@ -846,6 +861,7 @@ mamaTransport_create (mamaTransport transport,

     setPreInitialStrategy ((mamaTransport)self);

     setDQStrategy ((mamaTransport)self);

     setFtStrategy ((mamaTransport)self);

+    enablePreRecapCache ((mamaTransport)self);

     if (mamaTransportImpl_disableDisconnectCb (name))

     {

@@ -1735,6 +1751,17 @@ mamaTransportImpl_getFtStrategyScheme (mamaTransport transport)

     }

     return DQ_FT_DO_NOT_WAIT_FOR_RECAP;

}

+

+mama_bool_t

+mamaTransportImpl_preRecapCacheEnabled (mamaTransport transport)

+{

+    if (self)

+    {

+        return self->mPreRecapCacheEnabled;

+    }

+    return 0;

+}

+

/* Process an advisory message and invokes callbacks

  *                    on all listeners.

  * @param transport The transport.

diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h

index ac4c3c2..764d0a0 100644

--- a/mama/c_cpp/src/c/transportimpl.h

+++ b/mama/c_cpp/src/c/transportimpl.h

@@ -298,6 +298,10 @@ extern mama_status

mamaTransportImpl_forceClientDisconnect (mamaTransport   transport,

                                          const char*     ipAddress,

                                          uint16_t        port);

+

+extern mama_bool_t

+mamaTransportImpl_preRecapCacheEnabled (mamaTransport transport);

+

#if defined(__cplusplus)

}

#endif

--

1.7.9.5

 




Please consider the environment before printing this e-mail.

This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy.

Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden.


Ian Bell <IBell@...>
 

Small addition to the patch; a new version is attached.  The additional change (to dqstrategy.c) is shown below.

 

 

diff --git a/mama/c_cpp/src/c/dqstrategy.c b/mama/c_cpp/src/c/dqstrategy.c

index 7978c63..011436d 100644

--- a/mama/c_cpp/src/c/dqstrategy.c

+++ b/mama/c_cpp/src/c/dqstrategy.c

@@ -384,6 +384,10 @@ dqStrategy_checkSeqNum (dqStrategy      strategy,

     case MAMA_MSG_TYPE_RECAP        :

     case MAMA_MSG_TYPE_BOOK_RECAP   :

+        if (mamaTransportImpl_preRecapCacheEnabled (tport))

+        {

+            self->mTryToFillGap = 1;

+        }

         mamaSubscription_unsetAllPossiblyStale (subscription);

         resetDqState (strategy, ctx);

         dqStrategyImpl_resetDqContext (ctx, seqNum, senderId);

 

 

From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Ian Bell
Sent: 26 September 2012 12:19
To: openmama-dev@...
Subject: [Openmama-dev] [PATCH] [mama] Add pre-recap cache

 

From 257b9d3c406f7e419263ff229611f9009063d66a Mon Sep 17 00:00:00 2001

Message-Id: <257b9d3c406f7e419263ff229611f9009063d66a.1348658266.git.ibell@...>

From: Ian Bell <ibell@...>

Date: Wed, 26 Sep 2012 12:17:40 +0100

Subject: [PATCH] [mama] Add pre-recap cache

 

Modified the pre-initial cache to also work for recaps.  This is

configurable per transport via the following property (%s is the transport name):

mama.transport.%s.prerecapcache.enable=true

 

Signed-off-by: Ian Bell <ibell@...>

---

mama/c_cpp/src/c/listenermsgcallback.c |   14 +++++++++-----

mama/c_cpp/src/c/transport.c           |   27 +++++++++++++++++++++++++++

mama/c_cpp/src/c/transportimpl.h       |    4 ++++

3 files changed, 40 insertions(+), 5 deletions(-)

 

diff --git a/mama/c_cpp/src/c/listenermsgcallback.c b/mama/c_cpp/src/c/listenermsgcallback.c

index ca9db2c..fedcc49 100644

--- a/mama/c_cpp/src/c/listenermsgcallback.c

+++ b/mama/c_cpp/src/c/listenermsgcallback.c

@@ -189,7 +189,8 @@ static void processPointToPointMessage (msgCallback*    callback,

     if (PRE_INITIAL_SCHEME_ON_INITIAL==

             mamaTransportImpl_getPreInitialScheme (tport))

     {

-        if (msgType==MAMA_MSG_TYPE_INITIAL || msgType == MAMA_MSG_TYPE_BOOK_INITIAL)

+        if (msgType==MAMA_MSG_TYPE_INITIAL || msgType == MAMA_MSG_TYPE_BOOK_INITIAL ||

+           (mamaTransportImpl_preRecapCacheEnabled (tport) &&  (msgType == MAMA_MSG_TYPE_RECAP || msgType == MAMA_MSG_TYPE_BOOK_RECAP )))

         {

             dqContext_applyPreInitialCache (&ctx->mDqContext, self->mSubscription);

@@ -255,6 +256,8 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,

     mamaStatsCollector* queueStatsCollector = NULL;

     mamaStatsCollector* tportStatsCollector = NULL;

     const char* userSymbol = NULL;

+             dqState state = DQ_STATE_NOT_ESTABLISHED;

+    mamaSubscription_getTransport (subscription, &transport);

     if (!ctx)

     {

@@ -275,7 +278,6 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,

     if (gGenerateTransportStats)

     {

-        mamaSubscription_getTransport (subscription, &transport);

         tportStatsCollector = mamaTransport_getStatsCollector (transport);

     }

@@ -446,15 +448,17 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,

      */

     mamaSubscription_getExpectingInitial (subscription, &expectingInitial);

+    dqStrategy_getDqState (ctx->mDqContext, &state);

     /*While we are waiting for initial values we also check whether we have an

      * initial for an individual context.

       If we are no longer waiting for initials we assume that it is ok to pass

      on the update - (probably a new symbol for a group)*/

-    if (expectingInitial && !ctx->mInitialArrived

+    if (((expectingInitial && !ctx->mInitialArrived) ||

+        (state == DQ_STATE_WAITING_FOR_RECAP && mamaTransportImpl_preRecapCacheEnabled (transport)))

         && msgType != MAMA_MSG_TYPE_DELETE

         && msgType != MAMA_MSG_TYPE_EXPIRE

         && msgType != MAMA_MSG_TYPE_UNKNOWN)

     {

         /*Add this message to the cache. If the message after the initial

          * results in a gap we will attempt to fill the gap from this cache

@@ -471,7 +475,7 @@ listenerMsgCallback_processMsg( listenerMsgCallback callback, mamaMsg msg,

             mama_log (MAMA_LOG_LEVEL_FINE,

                            "%s%s %s%s"

                            " Subscription ignoring message received prior"

-                           " to initial update. Type: %d %s %p",

+                           " to initial or recap. Type: %d %s %p",

                            userSymbolFormatted, ctxSymbolFormatted,

                            msgType, mamaMsg_toString(msg), ctx);

         }

diff --git a/mama/c_cpp/src/c/transport.c b/mama/c_cpp/src/c/transport.c

index c54a94c..b9c1a9b 100644

--- a/mama/c_cpp/src/c/transport.c

+++ b/mama/c_cpp/src/c/transport.c

@@ -42,6 +42,7 @@

#include "mama/statfields.h"

#include "statsgeneratorinternal.h"

#include "mama/statscollector.h"

+#include "wombat/strutils.h"

 extern int gGenerateTransportStats;

extern int gGenerateLbmStats;

@@ -169,6 +170,7 @@ typedef struct transportImpl_

     uint8_t                 mInternal;

     uint8_t                 mDisableDisconnectCb;

     preInitialScheme         mPreInitialScheme;

+    mama_bool_t             mPreRecapCacheEnabled;

     void*                   mClosure;

} transportImpl;

@@ -449,6 +451,19 @@ static void setFtStrategy (mamaTransport transport)

     }

}

+static void enablePreRecapCache (mamaTransport transport)

+{

+    char propNameBuf[256];

+

+    if (!self) return;

+

+    snprintf (propNameBuf, 256, "mama.transport.%s.prerecapcache.enable", self->mName);

+

+    self->mPreRecapCacheEnabled = strtobool (mama_getProperty (propNameBuf));

+

+    mama_log (MAMA_LOG_LEVEL_NORMAL,

+              "%s: Pre-Recap cache %s", self->mName, self->mPreRecapCacheEnabled ? "enabled" : "disabled");

+}   

 void mamaTransport_disableRefresh(mamaTransport transport, uint8_t disable)

{

     self->mDisableRefresh=disable;

@@ -846,6 +861,7 @@ mamaTransport_create (mamaTransport transport,

     setPreInitialStrategy ((mamaTransport)self);

     setDQStrategy ((mamaTransport)self);

     setFtStrategy ((mamaTransport)self);

+    enablePreRecapCache ((mamaTransport)self);

     if (mamaTransportImpl_disableDisconnectCb (name))

     {

@@ -1735,6 +1751,17 @@ mamaTransportImpl_getFtStrategyScheme (mamaTransport transport)

     }

     return DQ_FT_DO_NOT_WAIT_FOR_RECAP;

}

+

+mama_bool_t

+mamaTransportImpl_preRecapCacheEnabled (mamaTransport transport)

+{

+    if (self)

+    {

+        return self->mPreRecapCacheEnabled;

+    }

+    return 0;

+}

+

/* Process an advisory message and invokes callbacks

  *                    on all listeners.

  * @param transport The transport.

diff --git a/mama/c_cpp/src/c/transportimpl.h b/mama/c_cpp/src/c/transportimpl.h

index ac4c3c2..764d0a0 100644

--- a/mama/c_cpp/src/c/transportimpl.h

+++ b/mama/c_cpp/src/c/transportimpl.h

@@ -298,6 +298,10 @@ extern mama_status

mamaTransportImpl_forceClientDisconnect (mamaTransport   transport,

                                          const char*     ipAddress,

                                          uint16_t        port);

+

+extern mama_bool_t

+mamaTransportImpl_preRecapCacheEnabled (mamaTransport transport);

+

#if defined(__cplusplus)

}

#endif

--

1.7.9.5

 

 



Please consider the environment before printing this e-mail.

This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy.

Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden.




Please consider the environment before printing this e-mail.

This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy.

Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden.