From 112ffb484cc1663c320e601a5c7516d895bb2ff8 Mon Sep 17 00:00:00 2001
Message-Id: <112ffb484cc1663c320e601a5c7516d895bb2ff8.1348824474.git.ibell@...>
In-Reply-To: <766019e29e6ff6072c662caa2de6adbd3e9df875.1348824474.git.ibell@...>
References: <766019e29e6ff6072c662caa2de6adbd3e9df875.1348824474.git.ibell@...>
From: Ian Bell <ibell@...>
Date: Fri, 28 Sep 2012 10:10:24 +0100
Subject: [PATCH 3/6] [testtools] Added file playback to mamaproducerc_v2
Added the ability to play back messages from a capture file
(new -playback option) in mamaproducerc_v2.
Signed-off-by: Ian Bell <ibell@...>
---
.../src/testtools/performance/c/mamaproducerc_v2.c | 245 +++++++++++++++++++-
1 file changed, 236 insertions(+), 9 deletions(-)
diff --git a/mama/c_cpp/src/testtools/performance/c/mamaproducerc_v2.c b/mama/c_cpp/src/testtools/performance/c/mamaproducerc_v2.c
index 830b750..28b3bd4 100644
--- a/mama/c_cpp/src/testtools/performance/c/mamaproducerc_v2.c
+++ b/mama/c_cpp/src/testtools/performance/c/mamaproducerc_v2.c
@@ -31,6 +31,8 @@
#include <sys/time.h> /* needed for getrusage */
#include <sys/resource.h> /* needed for getrusage */
+#include "wombat/wtable.h"
+#include "playback/playbackFileParser.h"
#define TV2NANO(x) (1000 *((x)->tv_sec * 1000000 + (x)->tv_usec))
#define TV2USEC(x) ((x)->tv_sec * 1000000 + (x)->tv_usec)
#define TS2NANO(x) ((x)->tv_sec * 1000000000 + (x)->tv_nsec)
@@ -100,9 +102,12 @@ typedef struct {
static publisher* gPublisherList = NULL;
static mamaPublisher gStepPublisher = NULL;
static mamaSubscription gStepSubscriber = NULL;
+static mamaPublisher gInitialPublisher = NULL;
+static mamaSubscription gInitialSubscriber = NULL;
static mamaTransport* gTransportArray = NULL;
static uint64_t gThrottle = 0;
static mamaMsg gStepMsg = NULL;
+static mamaMsg gInitialMsg = NULL;
static mamaQueue gDefaultQueue = NULL;
static mamaTimer gRandomTimer = NULL;
static mamaTimer gBurstTimer = NULL;
@@ -156,6 +161,13 @@ static mamaTimeZone gTimeZone = NULL;
static uint32_t gRun = 1;
+static int gInitials = 0;
+static int gInitialDelay = 0;
+static int gUsePlayback = 0;
+static const char* gPlaybackFilename = NULL;
+static mamaPlaybackFileParser gFileParser = NULL;
+static wtable_t gPublisherTable = NULL;
+static mamaMsg gCachedMsg = NULL;
/* DJD condition variable for shutdown */
pthread_cond_t pendingShutdown = PTHREAD_COND_INITIALIZER;
pthread_mutex_t pendingShutdownLock = PTHREAD_MUTEX_INITIALIZER;
@@ -390,6 +402,8 @@ static inline uint64_t calculatePubTicks
uint64_t diffNsec;
uint64_t diffNumMsgs;
uint64_t pubNsec;
+ if (numMsgs == lastNumMsgs)
+ return 0;
if (gRdtsc)
diffNsec = (uint64_t)((double)(1000 * (end-start))/(double)gCpuFreq);
@@ -602,6 +616,7 @@ int main (int argc, const char **argv)
startCompleteCallback);
signal (SIGINT, signalCatcher);
+ sleep (gInitialDelay);
MAMA_CHECK (mamaDateTime_create (&gNowTime));
mamaDateTime_setToNow(gNowTime);
@@ -715,9 +730,16 @@ int main (int argc, const char **argv)
targetRate = gAsyncRate = gBaseRate = gTargetRate;
/*Must initializze nsec to something sensible and the model*/
- if(targetRate) gRefreshRate=targetRate/8+1;
- else gRefreshRate=DEFAULT_REFRESH_RATE;
+ if(targetRate)
+ {
+ gRefreshRate=targetRate/8+1;
nsec = calculateNsecSleep (0, targetRate);
+ }
+ else
+ {
+ pause ();
+ gRun = 0;
+ }
while (gRun)
{
@@ -1098,6 +1120,33 @@ static void onErrorStep
)
{}
+static void onCreateInitial /* onCreate callback installed on gInitialSubscriber in initializePublishers */
+(
+ mamaSubscription subscription, /* not used */
+ void* closure /* not used */
+)
+{} /* empty body: no action is taken when the subscription is created */
+
+static void onErrorInitial /* onError callback installed on gInitialSubscriber in initializePublishers */
+(
+ mamaSubscription subscription, /* not used */
+ mama_status status, /* not used; NOTE(review): the reported error status is silently dropped */
+ void* platformError, /* not used */
+ const char* subject, /* not used */
+ void* closure /* not used */
+)
+{} /* empty body: subscription errors are neither logged nor handled */
+
+static void onMsgInitial /* onMsg callback installed on gInitialSubscriber (created on "_MD.REPLAY") */
+(
+ mamaSubscription subscription, /* not used */
+ mamaMsg msg, /* received message; handed to sendReplyToInbox as the message being replied to */
+ void* closure, /* not used */
+ void* itemClosure /* not used */
+)
+{
+ mamaPublisher_sendReplyToInbox (gInitialPublisher, msg, gInitialMsg); /* answers with the prebuilt gInitialMsg; NOTE(review): returned status is ignored */
+}
static void onMsgStep
(
mamaSubscription subscription,
@@ -1165,9 +1214,17 @@ static void initializePublishers
if (gDupTopics)
gNumPubs *= gNumTrans;
+ if (gUsePlayback)
+ {
+ gPublisherTable = wtable_create ("PublisherTable", gNumPubs/10);
+ mamaMsg_create(&gCachedMsg);
+ }
+ else
+ {
gPublisherList = (publisher*)calloc (gNumPubs, sizeof (publisher));
pub = gPublisherList;
+ }
while (*symbol)
{
@@ -1177,6 +1234,8 @@ static void initializePublishers
/* For each transport if duplicating */
for (;;)
{
+ if (gUsePlayback)
+ pub=(publisher*)calloc (1, sizeof (publisher));
if (gNumTopics > 1)
{
snprintf (pub->mTopic,
@@ -1201,8 +1260,13 @@ static void initializePublishers
NULL,
NULL));
+ if (gUsePlayback)
+ wtable_insert (gPublisherTable,pub->mTopic, (void*) pub);
+ else
+ {
initializeMessages (pub, msgSize, msgVar);
pub++;
+ }
j++;
count++;
@@ -1255,9 +1319,39 @@ static void initializePublishers
}
}
+ if (gInitials!= 0)
+ {
+ MAMA_CHECK(mamaPublisher_create (&gInitialPublisher,
+ gTransportArray[0],
+ "REPLAY",
+ NULL,
+ NULL));
+ mamaMsgCallbacks callbacks;
+ memset (&callbacks, 0, sizeof callbacks);
+ callbacks.onMsg = onMsgInitial;
+ callbacks.onCreate = onCreateInitial;
+ callbacks.onError = onErrorInitial;
+
+ MAMA_CHECK (mamaSubscription_allocate (&gInitialSubscriber));
+ MAMA_CHECK (mamaSubscription_createBasic (gInitialSubscriber,
+ gTransportArray[0],
+ gDefaultQueue,
+ &callbacks,
+ "_MD.REPLAY",
+ NULL));
+
+ MAMA_CHECK(mamaMsg_create(&gInitialMsg));
+ MAMA_CHECK(mamaMsg_addU8(gInitialMsg, NULL, MamaFieldMsgType.mFid, MAMA_MSG_TYPE_RECAP));
+ MAMA_CHECK(mamaMsg_addI32(gInitialMsg, NULL, MamaFieldMsgStatus.mFid, MAMA_MSG_STATUS_OK));
+ }
MAMA_CHECK(mamaMsg_create(&gStepMsg));
MAMA_CHECK(mamaMsg_addU64(gStepMsg, NULL, STEP_TARGET, 0));
MAMA_CHECK(mamaMsg_addU32(gStepMsg, NULL, STEP_PERCENT, 0));
+ if (gUsePlayback)
+ {
+ mamaPlaybackFileParser_allocate (&gFileParser);
+ mamaPlaybackFileParser_openFile(gFileParser, gPlaybackFilename);
+ }
}
static void initializeMessages
@@ -1332,24 +1426,97 @@ static void publishMessage
mamaDateTime* mTime
)
{
- publisher* pub = &gPublisherList[pubIndex];
+ mamaMsg msg =NULL;
+ publisher* pub = NULL;
+ if (gUsePlayback)
+ {
+ char symbolname[64];
+ char topicname[MAMA_MAX_SYMBOL_LEN];
+ char* headerString, *tempPointer;
+ int i = 0, found = 0;
+ if (!mamaPlaybackFileParser_getNextHeader(gFileParser, &headerString))
+ {
+
+ exit(0);
+ }
+ tempPointer = headerString;
+ while (found < 3)
+ {
+ if (*tempPointer == ':')
+ found++;
+ else
+ {
+ if (found==2)
+ symbolname[i++]=*tempPointer;
+ }
+ tempPointer++;
+ }
+ symbolname[i]='\0';
+
+ if (gNumTopics > 1)
+ {
+ snprintf (topicname,
+ MAMA_MAX_SYMBOL_LEN,
+ "%s%s%.2d",
+ gSymbolNamespace ? gSymbolNamespace : "",
+ symbolname,
+ pubIndex);
+ }
+ else
+ {
+ snprintf (topicname,
+ MAMA_MAX_SYMBOL_LEN,
+ "%s%s",
+ gSymbolNamespace ? gSymbolNamespace : "",
+ symbolname);
+ }
+
+ pub = wtable_lookup (gPublisherTable,topicname);
+
+ if (!mamaPlaybackFileParser_getNextMsg (gFileParser,
+ &msg))
+ {
+
+ exit(0);
+ }
+
+ mamaMsg_applyMsg(gCachedMsg, msg);
+ }
+ else
+ {
+ pub = &gPublisherList[pubIndex];
+ msg = pub->mMamaMsgs[msgSample];
+ }
+
+
+ if (!pub)
+ return;
/*Update Seq Number*/
- MAMA_CHECK(mamaMsg_updateU32 (pub->mMamaMsgs[msgSample],
- "NULL",
+ if (MAMA_STATUS_OK != mamaMsg_updateU32 (msg,
+ NULL,
SEQ_NUM_FID,
- pub->mMsgNum++));
+ pub->mMsgNum++))
+ {
+ printf ("INFO - update seqnum failed - %s \n", mamaMsg_toString(msg));
+ return;
+ }
pthread_mutex_lock(&gMutex);
/*Get and update the timestamp*/
mamaDateTime_setToNow(*mTime);
- MAMA_CHECK (mamaMsg_updateDateTime (pub->mMamaMsgs[msgSample],
+ if (MAMA_STATUS_OK != mamaMsg_updateDateTime (msg,
NULL,
SEND_TIME_FID,
- *mTime));
+ *mTime))
+ {
+ printf ("INFO - update sendtime failed - %s\n", mamaMsg_toString(msg));
+ pthread_mutex_unlock(&gMutex);
+ return;
+ }
/*Publish the message*/
- MAMA_CHECK (mamaPublisher_send (pub->mMamaPublisher, pub->mMamaMsgs[msgSample]));
+ MAMA_CHECK (mamaPublisher_send (pub->mMamaPublisher, msg));
pthread_mutex_unlock(&gMutex);
}
@@ -1541,6 +1708,50 @@ static void parseCommandLine
gNumSymbols++;
i += 2;
}
+ else if (strcmp ("-f", argv[i]) == 0)
+ {
+ FILE*fp = NULL;
+ char charbuf[1024];
+
+ char * filename = argv[i+1];
+ if ((fp = fopen(filename, "r")) == (FILE*)NULL)
+ exit(1);
+
+ while (fgets (charbuf, 1023, fp))
+ {
+ char *c= charbuf;
+
+ if (gNumSymbols == gMaxNumSymbols)
+ {
+ void* vp = NULL;
+ vp = realloc (gSymbolList,
+ 2 * gMaxNumSymbols * sizeof (*gSymbolList));
+ if (vp)
+ {
+ gSymbolList = vp;
+ memset (&gSymbolList[gMaxNumSymbols],
+ 0,
+ sizeof (*gSymbolList) * gMaxNumSymbols);
+ gMaxNumSymbols *= 2;
+ }
+ else
+ {
+ PRINT_ERROR ("Failed to realloc symbol list");
+ exit (1);
+ }
+ }
+
+ while ((*c != '\0') && (*c != '\n'))
+ c++;
+
+ *c = '\0';
+
+
+ gSymbolList[gNumSymbols] = strdup (charbuf);
+ gNumSymbols++;
+ }
+ i += 2;
+ }
else if (strcmp ("-S", argv[i]) == 0)
{
gSymbolNamespace = calloc (MAMA_MAX_SYMBOL_LEN,
@@ -1641,6 +1852,22 @@ static void parseCommandLine
*middleware = argv[i+1];
i += 2;
}
+ else if (strcmp ("-delay", argv[i]) == 0)
+ {
+ gInitialDelay = (uint32_t) atol(argv[i+1]);
+ i += 2;
+ }
+ else if (strcmp ("-initials", argv[i]) == 0)
+ {
+ gInitials=1;
+ i++;
+ }
+ else if (strcmp ("-playback", argv[i]) == 0)
+ {
+ gPlaybackFilename = argv[i+1];
+ gUsePlayback = 1;
+ i += 2;
+ }
else
{
fprintf(stderr,"ERROR: Commandline option %s not recognised. Skipping\n",argv[i]);
--
1.7.9.5