[PATCH 3/6] [testtools] Added file playback to mamaproducerc_v2


Ian Bell <IBell@...>
 

From 112ffb484cc1663c320e601a5c7516d895bb2ff8 Mon Sep 17 00:00:00 2001

Message-Id: <112ffb484cc1663c320e601a5c7516d895bb2ff8.1348824474.git.ibell@...>

In-Reply-To: <766019e29e6ff6072c662caa2de6adbd3e9df875.1348824474.git.ibell@...>

References: <766019e29e6ff6072c662caa2de6adbd3e9df875.1348824474.git.ibell@...>

From: Ian Bell <ibell@...>

Date: Fri, 28 Sep 2012 10:10:24 +0100

Subject: [PATCH 3/6] [testtools] Added file playback to mamaproducerc_v2

 

Added the ability to play back messages from a capture file to

mamaproducerc_v2.

 

Signed-off-by: Ian Bell <ibell@...>

---

.../src/testtools/performance/c/mamaproducerc_v2.c |  245 +++++++++++++++++++-

1 file changed, 236 insertions(+), 9 deletions(-)

 

diff --git a/mama/c_cpp/src/testtools/performance/c/mamaproducerc_v2.c b/mama/c_cpp/src/testtools/performance/c/mamaproducerc_v2.c

index 830b750..28b3bd4 100644

--- a/mama/c_cpp/src/testtools/performance/c/mamaproducerc_v2.c

+++ b/mama/c_cpp/src/testtools/performance/c/mamaproducerc_v2.c

@@ -31,6 +31,8 @@

#include <sys/time.h>   /* needed for getrusage */

#include <sys/resource.h>    /* needed for getrusage */

+#include "wombat/wtable.h"

+#include "playback/playbackFileParser.h"

#define TV2NANO(x) (1000 *((x)->tv_sec * 1000000 + (x)->tv_usec))

#define TV2USEC(x) ((x)->tv_sec * 1000000 + (x)->tv_usec)

#define TS2NANO(x) ((x)->tv_sec * 1000000000 + (x)->tv_nsec)

@@ -100,9 +102,12 @@ typedef struct {

static publisher*           gPublisherList      = NULL;

static mamaPublisher        gStepPublisher      = NULL;

static mamaSubscription     gStepSubscriber    = NULL;

+static mamaPublisher        gInitialPublisher      = NULL;

+static mamaSubscription     gInitialSubscriber    = NULL;

static mamaTransport*       gTransportArray     = NULL;

static uint64_t             gThrottle           = 0;

static mamaMsg              gStepMsg            = NULL;

+static mamaMsg              gInitialMsg            = NULL;

static mamaQueue            gDefaultQueue       = NULL;

static mamaTimer            gRandomTimer        = NULL;

static mamaTimer            gBurstTimer         = NULL;

@@ -156,6 +161,13 @@ static mamaTimeZone         gTimeZone           = NULL;

 static uint32_t             gRun                = 1;

+static int                  gInitials       = 0;

+static int                  gInitialDelay       = 0;

+static int                  gUsePlayback                  = 0;

+static const char*          gPlaybackFilename   = NULL;

+static mamaPlaybackFileParser gFileParser                        = NULL;

+static wtable_t                                               gPublisherTable                                = NULL;

+static mamaMsg                                                            gCachedMsg                                      = NULL;

/* DJD condition variable for shutdown */

pthread_cond_t              pendingShutdown     = PTHREAD_COND_INITIALIZER;

pthread_mutex_t             pendingShutdownLock = PTHREAD_MUTEX_INITIALIZER;

@@ -390,6 +402,8 @@ static inline uint64_t calculatePubTicks

     uint64_t        diffNsec;

     uint64_t        diffNumMsgs;

     uint64_t        pubNsec;

+    if (numMsgs == lastNumMsgs)

+        return 0;

     if (gRdtsc)

         diffNsec = (uint64_t)((double)(1000 * (end-start))/(double)gCpuFreq);

@@ -602,6 +616,7 @@ int main (int argc, const char **argv)

                           startCompleteCallback);

     signal (SIGINT, signalCatcher);

+    sleep (gInitialDelay);

     MAMA_CHECK (mamaDateTime_create (&gNowTime));

     mamaDateTime_setToNow(gNowTime);

@@ -715,9 +730,16 @@ int main (int argc, const char **argv)

             targetRate = gAsyncRate = gBaseRate = gTargetRate;

         /*Must initializze nsec to something sensible and the model*/

-        if(targetRate) gRefreshRate=targetRate/8+1;

-        else           gRefreshRate=DEFAULT_REFRESH_RATE;

+        if(targetRate)

+        {

+            gRefreshRate=targetRate/8+1;

         nsec = calculateNsecSleep (0, targetRate);

+        }

+        else

+        {

+            pause ();

+            gRun = 0;

+        }

         while (gRun)

         {

@@ -1098,6 +1120,33 @@ static void onErrorStep

)

{}

+static void onCreateInitial /* onCreate callback installed on gInitialSubscriber in initializePublishers; intentionally a no-op */

+(

+     mamaSubscription   subscription, /* subscription being reported; unused */

+     void*              closure /* closure supplied at subscription creation (NULL in this file); unused */

+)

+{} /* empty body: nothing is done when the subscription is created */

+

+static void onErrorInitial /* onError callback installed on gInitialSubscriber in initializePublishers; intentionally a no-op */

+(

+    mamaSubscription    subscription, /* subscription the error refers to; unused */

+    mama_status         status, /* reported status code; unused */

+    void*               platformError, /* unused */

+    const char*         subject, /* unused */

+    void*               closure /* unused */

+)

+{} /* empty body: errors on this subscription are silently dropped (nothing is logged) */

+

+static void onMsgInitial /* onMsg callback installed on gInitialSubscriber (topic "_MD.REPLAY") when gInitials is set */

+(

+    mamaSubscription        subscription, /* unused */

+    mamaMsg                 msg, /* message received on the subscription; passed on as the request being replied to */

+    void*                   closure, /* unused */

+    void*                   itemClosure /* unused */

+)

+{

+             mamaPublisher_sendReplyToInbox (gInitialPublisher, msg, gInitialMsg); /* replies with the single prebuilt gInitialMsg; the returned mama_status is not checked here (no MAMA_CHECK) */

+}

static void onMsgStep

(

     mamaSubscription        subscription,

@@ -1165,9 +1214,17 @@ static void initializePublishers

     if (gDupTopics)

         gNumPubs *= gNumTrans;

+    if (gUsePlayback)

+    {

+             gPublisherTable = wtable_create ("PublisherTable", gNumPubs/10);

+                             mamaMsg_create(&gCachedMsg);

+    }

+    else

+    {

     gPublisherList = (publisher*)calloc (gNumPubs, sizeof (publisher));

     pub = gPublisherList;

+    }

     while (*symbol)

     {

@@ -1177,6 +1234,8 @@ static void initializePublishers

             /* For each transport if duplicating */

             for (;;)

             {

+                if (gUsePlayback)

+                             pub=(publisher*)calloc (1, sizeof (publisher));

                 if (gNumTopics > 1)

                 {

                     snprintf (pub->mTopic,

@@ -1201,8 +1260,13 @@ static void initializePublishers

                                                   NULL,

                                                   NULL));

+                if (gUsePlayback)

+                             wtable_insert  (gPublisherTable,pub->mTopic, (void*) pub);

+                else

+                {

                 initializeMessages (pub, msgSize, msgVar);

                 pub++;

+                }

                 j++;

                 count++;

@@ -1255,9 +1319,39 @@ static void initializePublishers

         }

     }

+    if (gInitials!= 0)

+    {

+        MAMA_CHECK(mamaPublisher_create (&gInitialPublisher,

+                                         gTransportArray[0],

+                                             "REPLAY",

+                                             NULL,

+                                             NULL));

+            mamaMsgCallbacks callbacks;

+            memset (&callbacks, 0, sizeof callbacks);

+            callbacks.onMsg = onMsgInitial;

+            callbacks.onCreate = onCreateInitial;

+            callbacks.onError = onErrorInitial;

+

+            MAMA_CHECK (mamaSubscription_allocate (&gInitialSubscriber));

+            MAMA_CHECK (mamaSubscription_createBasic (gInitialSubscriber,

+                                                      gTransportArray[0],

+                                                      gDefaultQueue,

+                                                      &callbacks,

+                                                      "_MD.REPLAY",

+                                                      NULL));

+       

+        MAMA_CHECK(mamaMsg_create(&gInitialMsg));

+        MAMA_CHECK(mamaMsg_addU8(gInitialMsg, NULL,  MamaFieldMsgType.mFid, MAMA_MSG_TYPE_RECAP));

+                             MAMA_CHECK(mamaMsg_addI32(gInitialMsg, NULL, MamaFieldMsgStatus.mFid, MAMA_MSG_STATUS_OK));

+    }

     MAMA_CHECK(mamaMsg_create(&gStepMsg));

     MAMA_CHECK(mamaMsg_addU64(gStepMsg, NULL, STEP_TARGET, 0));

     MAMA_CHECK(mamaMsg_addU32(gStepMsg, NULL, STEP_PERCENT, 0));

+    if (gUsePlayback)

+    {

+             mamaPlaybackFileParser_allocate (&gFileParser);

+             mamaPlaybackFileParser_openFile(gFileParser, gPlaybackFilename);

+    }

}

 static void initializeMessages

@@ -1332,24 +1426,97 @@ static void publishMessage

     mamaDateTime*   mTime

)

{

-    publisher* pub = &gPublisherList[pubIndex];

+             mamaMsg msg =NULL;

+             publisher* pub = NULL;

+             if (gUsePlayback)

+             {

+                             char symbolname[64];

+                             char topicname[MAMA_MAX_SYMBOL_LEN];

+                             char* headerString, *tempPointer;

+                             int i = 0, found = 0;

+                             if (!mamaPlaybackFileParser_getNextHeader(gFileParser, &headerString))

+                             {

+

+                                             exit(0);

+                             }

+                             tempPointer = headerString;

+                             while (found < 3)

+                             {

+                                             if (*tempPointer == ':')

+                                                             found++;

+                                             else

+                                             {

+                                                             if (found==2)

+                                                                             symbolname[i++]=*tempPointer;

+                                             }

+                                             tempPointer++;

+                             }

+                             symbolname[i]='\0';

+

+        if (gNumTopics > 1)

+        {

+            snprintf (topicname,

+                      MAMA_MAX_SYMBOL_LEN,

+                      "%s%s%.2d",

+                      gSymbolNamespace ? gSymbolNamespace : "",

+                       symbolname,

+                       pubIndex);

+        }

+        else

+        {

+            snprintf (topicname,

+                      MAMA_MAX_SYMBOL_LEN,

+                      "%s%s",

+                      gSymbolNamespace ? gSymbolNamespace : "",

+                      symbolname);

+        }

+

+                             pub = wtable_lookup  (gPublisherTable,topicname);

+

+                             if (!mamaPlaybackFileParser_getNextMsg (gFileParser,

+                                                                             &msg))

+                             {

+

+                                                                             exit(0);

+                             }

+

+                             mamaMsg_applyMsg(gCachedMsg, msg);

+             }

+             else

+             {

+                             pub = &gPublisherList[pubIndex];

+                             msg = pub->mMamaMsgs[msgSample];

+             }

+

+

+             if (!pub)

+                             return;

               /*Update Seq Number*/

-    MAMA_CHECK(mamaMsg_updateU32 (pub->mMamaMsgs[msgSample],

-                                  "NULL",

+   if (MAMA_STATUS_OK != mamaMsg_updateU32 (msg,

+                                  NULL,

                                   SEQ_NUM_FID,

-                                  pub->mMsgNum++));

+                                  pub->mMsgNum++))

+             {

+                             printf ("INFO - update seqnum failed - %s \n", mamaMsg_toString(msg));

+                             return;                 

+             }

     pthread_mutex_lock(&gMutex);

                /*Get and update the timestamp*/

     mamaDateTime_setToNow(*mTime);

-    MAMA_CHECK (mamaMsg_updateDateTime (pub->mMamaMsgs[msgSample],

+    if (MAMA_STATUS_OK != mamaMsg_updateDateTime (msg,

                                         NULL,

                                         SEND_TIME_FID,

-                                        *mTime));

+                                        *mTime))

+             {

+                             printf ("INFO - update sendtime failed - %s\n", mamaMsg_toString(msg));

+    pthread_mutex_unlock(&gMutex);

+                             return;                 

+             }

                /*Publish the message*/

-    MAMA_CHECK (mamaPublisher_send (pub->mMamaPublisher, pub->mMamaMsgs[msgSample]));

+    MAMA_CHECK (mamaPublisher_send (pub->mMamaPublisher, msg));

     pthread_mutex_unlock(&gMutex);

}

@@ -1541,6 +1708,50 @@ static void parseCommandLine

                 gNumSymbols++;

                 i += 2;

             }

+            else if (strcmp ("-f", argv[i]) == 0)

+            {

+                                                                             FILE*fp = NULL;

+                                                                             char charbuf[1024];

+

+                                                             char * filename = argv[i+1];

+                                                             if ((fp = fopen(filename, "r")) == (FILE*)NULL)

+                                                                             exit(1);

+                                                            

+                                                             while (fgets (charbuf, 1023, fp))

+                                                             {

+                                                                             char *c= charbuf;

+

+                                                                             if (gNumSymbols == gMaxNumSymbols)

+                             {

+                             void* vp = NULL;

+                             vp = realloc (gSymbolList,

+                                  2 * gMaxNumSymbols * sizeof (*gSymbolList));

+                             if (vp)

+                             {

+                             gSymbolList = vp;

+                             memset (&gSymbolList[gMaxNumSymbols],

+                                             0,

+                                             sizeof (*gSymbolList) * gMaxNumSymbols);

+                             gMaxNumSymbols *= 2;

+                             }

+                             else

+                             {

+                             PRINT_ERROR ("Failed to realloc symbol list");

+                             exit (1);

+                             }

+                             }

+

+                                                                             while ((*c != '\0') && (*c != '\n'))

+                                                                                             c++;

+

+                                                                             *c = '\0';

+

+                                                                            

+                             gSymbolList[gNumSymbols] = strdup (charbuf);

+                             gNumSymbols++;

+                                                             }

+                i += 2;

+            }

             else if (strcmp ("-S", argv[i]) == 0)

             {

                 gSymbolNamespace = calloc (MAMA_MAX_SYMBOL_LEN,

@@ -1641,6 +1852,22 @@ static void parseCommandLine

                 *middleware = argv[i+1];

                 i += 2;

             }

+            else if (strcmp ("-delay", argv[i]) == 0)

+                                             {

+                                                             gInitialDelay = (uint32_t) atol(argv[i+1]);

+                                                             i += 2;

+                                             }

+                                             else if (strcmp ("-initials", argv[i]) == 0)

+                                             {

+                                                             gInitials=1;

+                                                             i++;

+                                             }

+            else if (strcmp ("-playback", argv[i]) == 0)

+                                             {

+                                                             gPlaybackFilename = argv[i+1];

+                                                             gUsePlayback = 1;

+                i += 2;

+            }

             else

             {

                 fprintf(stderr,"ERROR: Commandline option %s not recognised. Skipping\n",argv[i]);

--

1.7.9.5

 




Please consider the environment before printing this e-mail.

This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy.

Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden.