[PATCH 2/2] MAMA: Moved file parsing to external module for playbacks


Frank Quinn <fquinn@...>
 

This change moves file parsing capabilities to an external module. This

makes the code more readable and modular as well as allowing us to remove

the file streams macro which was temporarily added to allow us to run the

playback tools on Windows.

 

The rightCopy function was also removed as a copy was not actually

required, and mamaPlaybackFileParser_setSize was also removed in favour of

the new fileParser functions.

 

There is also a new configuration parameter to decide whether to use MMAP

or file streams. Mmap is now the default for all systems, but some users

may prefer to use file streams instead due to their reduced memory

footprint. To enable file streams, the following configuration parameter

may be used:

 

mama.playbackfileparser.use_mmap=false (default = true)

 

This was tested on both Linux and Windows, in both mmap and file stream

modes, using captureconvertc and capturereplayc with a mamalistenc

client.

 

Signed-off-by: Frank Quinn <fquinn@...>

---

mama/c_cpp/src/c/playback/playbackFileParser.c | 233 ++++++++-----------------

mama/c_cpp/src/c/playback/playbackFileParser.h |  11 +-

2 files changed, 81 insertions(+), 163 deletions(-)

 

diff --git a/mama/c_cpp/src/c/playback/playbackFileParser.c b/mama/c_cpp/src/c/playback/playbackFileParser.c

index 64fb465..c8f1717 100644

--- a/mama/c_cpp/src/c/playback/playbackFileParser.c

+++ b/mama/c_cpp/src/c/playback/playbackFileParser.c

@@ -20,13 +20,14 @@

  */

 #include "playbackFileParser.h"

+#include "property.h"

-#ifdef WIN32

-#define MAMA_PLAYBACK_USE_FILE_STREAMS 1

-#endif

+#include <wombat/strutils.h>

+#include <mama/mama.h>

+#include <mamainternal.h>

+

+#define     FILE_PARSER_CONFIG_USE_MMAP     "mama.playbackfileparser.use_mmap"

-static void

-rightCopy (char* input, char* output, int start);

static

mama_bool_t isNumeric( const char* str );

static mama_status

@@ -40,32 +41,10 @@ mamaPlaybackFileParser_saveMsgLength (mamaPlaybackFileParser fileParser,

static

mama_status mamaPlaybackFileParser_init (mamaPlaybackFileParser  fileParser);

-/**

- * Internally sets the  size of playback file to be parsed.

- * @param playbackFileParser object

- * @param filename - this could be either a relative or absolute,

- * filename with path.

- * @return mama_status

- */

-static void

-mamaPlaybackFileParser_setSize (mamaPlaybackFileParser fileParser,

-                                char* fileName);

 /****************************************************************************

Utility functions

****************************************************************************/

-static void rightCopy (char* input, char* output, int start)

-{

-    int index  = 0;

-    int i      = 0;

-    int length = strlen (input);

-    for (i = start; i<length; i++)

-    {

-        output[index]=input[i];

-        index++;

-    }

-    output[index] =0;

-}

 mama_bool_t isNumeric( const char* str )

{

@@ -97,11 +76,8 @@ mama_status mamaPlaybackFileParser_init (mamaPlaybackFileParser

     {

         return MAMA_STATUS_NULL_ARG;

     }

-    impl->myLastPos     = 0;

-    impl->myFiledata    = NULL;

-    impl->myFilePointer = NULL;

     impl->myMamaMsgLen  = 0;

-    impl->myLastMsgLen  = 0;

+    impl->myMaxMsgLen   = 0;

     impl->myMsgBuffer   = NULL;

     return MAMA_STATUS_OK;

}

@@ -110,11 +86,18 @@ mama_status

mamaPlaybackFileParser_allocate (mamaPlaybackFileParser* fileParser)

{

     mama_status status = MAMA_STATUS_OK;

+    fileParserStatus fileStatus = FILE_PARSER_STATUS_OK;

     mamaPlaybackFileParserImpl* impl =

         (mamaPlaybackFileParserImpl*)

         calloc (1, sizeof(mamaPlaybackFileParserImpl));

     if (impl == NULL) return MAMA_STATUS_NOMEM;

+

+    fileStatus = fileParser_allocate (&impl->myFileReader);

+    if (FILE_PARSER_STATUS_OK != fileStatus)

+    {

+        return MAMA_STATUS_NOMEM;

+    }

               mamaMsg_create(&impl->myMamaMsg);

     mamaPlaybackFileParser_init (impl);

@@ -146,6 +129,10 @@ mamaPlaybackFileParser_deallocate (mamaPlaybackFileParser fileParser)

             exit (status);

         }

     }

+    if (impl->myFileReader != NULL)

+    {

+        fileParser_destroy (impl->myFileReader);

+    }

     free (impl);

     impl = NULL;

     return MAMA_STATUS_OK;

@@ -156,45 +143,47 @@ mamaPlaybackFileParser_openFile (mamaPlaybackFileParser fileParser,

                                  char* fileName)

{

     mama_status status = MAMA_STATUS_OK;

+    fileParserStatus fileStatus = FILE_PARSER_STATUS_OK;

    

     mamaPlaybackFileParserImpl* impl =

         (mamaPlaybackFileParserImpl*)fileParser;

     if (impl == NULL)  return MAMA_STATUS_NULL_ARG;

+    if (impl->myFileReader == NULL) return MAMA_STATUS_INVALID_ARG;

     if (fileName != NULL)

     {

+        const char* useMmap = properties_Get (mamaInternal_getProperties(),

+            FILE_PARSER_CONFIG_USE_MMAP);

+        fileParserType parserType = FILE_PARSER_TYPE_UNKNOWN;

+

         mama_log (MAMA_LOG_LEVEL_FINE,

                   "openFile: checking for file: %s", fileName);

-        mamaPlaybackFileParser_setSize (impl,fileName);

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-        if ((impl->myFileHandle = fopen (fileName, "rb")) == NULL)

+        /* If use_mmap is specified and set to false */

+        if (useMmap != NULL && 0 == strtobool (useMmap))

         {

-            status = MAMA_STATUS_PLATFORM;

+            mama_log (MAMA_LOG_LEVEL_FINE,

+                  "openFile: opening %s in file streaming mode", fileName);

+            parserType = FILE_PARSER_TYPE_FILE_STREAM;

         }

-#else

-        if ((impl->myFileDescriptor = (open (fileName,

-                                             /*O_RDWR*/O_RDONLY | O_NONBLOCK,

-                                             0))) >= 0)

+        else

         {

-            impl->myFiledata = mmap (NULL, impl->myFileSize,

-                                     PROT_READ /*| PROT_WRITE*/,

-                                     MAP_SHARED,

-                                     impl->myFileDescriptor,

-                                     0);

-            if (!impl->myFiledata)

-            {

-                mama_log(MAMA_LOG_LEVEL_NORMAL,

-                         "memory mapping failed?\n");

-                exit(1);

-            }

-            impl->myFilePointer= (char*)impl->myFiledata;

+            mama_log (MAMA_LOG_LEVEL_FINE,

+                  "openFile: opening %s in mmap mode", fileName);

+            parserType = FILE_PARSER_TYPE_MMAP;

         }

-        else

+

+        fileStatus = fileParser_create (impl->myFileReader,

+            parserType, fileName);

+        if (fileStatus != FILE_PARSER_STATUS_OK)

         {

-            status = MAMA_STATUS_PLATFORM;

+            mama_log (MAMA_LOG_LEVEL_ERROR,

+                  "openFile: Failed to create file reader for file: %s (%s)",

+                  fileName,

+                  fileParser_stringForStatus(fileStatus));

+            return MAMA_STATUS_PLATFORM;

         }

-#endif

+        impl->myFileSize = fileParser_getFileSize (impl->myFileReader);

     }

     else

     {

@@ -211,40 +200,10 @@ mamaPlaybackFileParser_closeFile (mamaPlaybackFileParser fileParser)

     {

         return MAMA_STATUS_NULL_ARG;

     }

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-    fclose (impl->myFileHandle);

-#else

-    munmap(impl->myFiledata, impl->myFileSize);

-#endif

-    return MAMA_STATUS_OK;

-}

-void

-mamaPlaybackFileParser_setSize (mamaPlaybackFileParser fileParser,

-                                char* fileName)

-{

-    /*

-      use stat system call to get the size of the given file.

-    */

-    struct stat fileInfo;

-    int result = 0;

-    mamaPlaybackFileParserImpl* impl =

-        (mamaPlaybackFileParserImpl*)fileParser;

-    if (impl == NULL)

-    {

-        mama_log (MAMA_LOG_LEVEL_FINE,

-                  "setSize : Null pointer exception !!!!!!\n!");

-        return;

-    }

-    result = stat(fileName, &fileInfo);

-    if (result)

-    {

-        mama_log (MAMA_LOG_LEVEL_NORMAL,

-                  "File - %s does not exist or access is denied.\n",

-                  fileName);

-        exit(1);

-    }

-    impl->myFileSize = fileInfo.st_size;

+    fileParser_closeFile (impl->myFileReader);

+

+    return MAMA_STATUS_OK;

}

 mama_bool_t

@@ -253,9 +212,10 @@ mamaPlaybackFileParser_getNextHeader (mamaPlaybackFileParser fileParser,

{

      mamaPlaybackFileParserImpl* impl =

          (mamaPlaybackFileParserImpl*)fileParser;

-     int   result = TRUE;

-     char  delim  = (char)DELIMETER;

-     int   i      = 0;

+     int   result           = TRUE;

+     char  delim            = (char)DELIMETER;

+     int   i                = 0;

+     uint64_t bytesCopied   = 0;

      if (impl == NULL)

      {

          mama_log (MAMA_LOG_LEVEL_FINE,

@@ -267,32 +227,20 @@ mamaPlaybackFileParser_getNextHeader (mamaPlaybackFileParser fileParser,

      /*clear contents*/

      memset (impl->myBlockHeader,0,HEADER_SIZE);

-     if (impl->myLastPos >= impl->myFileSize )

+     if (mamaPlaybackFileParser_isEndOfFile (impl))

      {

          mama_log (MAMA_LOG_LEVEL_FINEST,

                    "End-of-file");

          return FALSE;

      }

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-     /* With fseek etc, we're already at the last position so just read */

-     while ((impl->myBlockHeader[i] = fgetc (impl->myFileHandle)) != delim)

-     {

-         i++;

-     }

-#else

-     impl->myFilePointer = (char*)impl->myFiledata+

-          impl->myLastPos;

-     while (*impl->myFilePointer != delim)

-     {

-        impl->myBlockHeader[i]= *impl->myFilePointer;

-        ++impl->myFilePointer;

-        i++;

-     }

-#endif

+     fileParser_readFileToBufferUntilCharacter (impl->myFileReader,

+                            (void*) impl->myBlockHeader,

+                            HEADER_SIZE,

+                            delim,

+                            &bytesCopied);

-     impl->myLastPos += i;

-     impl->myBlockHeader[i++] = '\0';

+     impl->myBlockHeader[bytesCopied] = '\0';

      mamaPlaybackFileParser_saveMsgLength (impl,

                                            impl->myBlockHeader);

@@ -304,57 +252,34 @@ mama_bool_t

mamaPlaybackFileParser_getNextMsg (mamaPlaybackFileParser fileParser,

                                    mamaMsg* msg)

{

-              long i, counter, msgEnd ;

-

+    fileParserStatus fileStatus = FILE_PARSER_STATUS_OK;

+    uint64_t bytesCopied = 0;

     mamaPlaybackFileParserImpl* impl =

         (mamaPlaybackFileParserImpl*)fileParser;

-    i = counter = msgEnd = 0;

     if (impl->myMsgBuffer == NULL)

     {

-        impl->myLastMsgLen = impl->myMamaMsgLen;

+        impl->myMaxMsgLen = impl->myMamaMsgLen;

         impl->myMsgBuffer = (char*) calloc (impl->myMamaMsgLen

                                             , sizeof(char));

     }

     else

     {

-        if (impl->myLastMsgLen < impl->myMamaMsgLen)

+        if (impl->myMaxMsgLen < impl->myMamaMsgLen)

         {

             impl->myMsgBuffer = (char*)

                 realloc ((char*)impl->myMsgBuffer, impl->myMamaMsgLen);

+            impl->myMaxMsgLen = impl->myMamaMsgLen;

         }

-        impl->myLastMsgLen = impl->myMamaMsgLen;

     }

-    if (impl->myLastPos >= impl->myFileSize )

+    if (mamaPlaybackFileParser_isEndOfFile (impl))

         return -1; /*end of file*/

-    msgEnd = impl->myLastPos + impl->myMamaMsgLen;

-

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-    counter = (long) fread (impl->myMsgBuffer, 1, impl->myMamaMsgLen, impl->myFileHandle);

-    if (counter != impl->myMamaMsgLen)

-    {

-        mama_log (MAMA_LOG_LEVEL_ERROR,

-                  "mamaPlaybackFileParser_getNextMsg(): Bytes read (%lu) "

-                  "does not match bytes requested (%lu)",

-                  counter,

-                  impl->myMamaMsgLen);

-    }

-#else

-    impl->myFilePointer = (char*)impl->myFiledata+

-                           impl->myLastPos + 1 ;

-    /*moved filepointer past the GS delim*/

-

-    for (i = impl->myLastPos ; i < msgEnd ; i++)

-    {

-        impl->myMsgBuffer[counter] = *impl->myFilePointer;

-        ++impl->myFilePointer;

-        counter++;

-    }

-#endif

-    impl->myLastPos += impl->myMamaMsgLen + 1;

-     /* moved forward to  begining of next header*/

+    fileStatus = fileParser_readFileToBuffer (impl->myFileReader,

+                            (void*) impl->myMsgBuffer,

+                            impl->myMamaMsgLen,

+                            &bytesCopied);

     if (MAMA_STATUS_NOMEM ==

         mamaPlaybackFileParser_createOrSetMsg (impl))

@@ -377,7 +302,7 @@ mamaPlaybackFileParser_isEndOfFile (mamaPlaybackFileParser fileParser)

         return FALSE;

     impl = (mamaPlaybackFileParserImpl*)fileParser;

-    return impl->myLastPos >= impl->myFileSize;

+    return (mama_bool_t) fileParser_isEndOfFile (impl->myFileReader);

}

 mama_status

@@ -389,13 +314,7 @@ mamaPlaybackFileParser_rewindFile (mamaPlaybackFileParser fileParser)

         return MAMA_STATUS_NULL_ARG;

     impl = (mamaPlaybackFileParserImpl*)fileParser;

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-    /* Set the file position back to the first byte in the file */

-    fseek (impl->myFileHandle, 0, SEEK_SET);

-#else

-    impl->myFilePointer = (char*)impl->myFiledata;

-#endif

-    impl->myLastPos = 0;

+    fileParser_rewindFile (impl->myFileReader);

     return MAMA_STATUS_OK;

}

@@ -409,7 +328,6 @@ void mamaPlaybackFileParser_saveMsgLength (mamaPlaybackFileParser

     char  delim       = (char)':';

     char* temp        = NULL;

     int   nPos;

-    char  buffer      [HEADER_SIZE];

     int   msgLength   = 0;

     temp = strrchr (msgHeader, delim);

@@ -419,12 +337,15 @@ void mamaPlaybackFileParser_saveMsgLength (mamaPlaybackFileParser

                   "delimeter [:] not found in header\n");

         return;

     }

-    nPos = temp - msgHeader;

-    memset(buffer, 0,HEADER_SIZE);

-    rightCopy (msgHeader, buffer, nPos+1);

-    if (isNumeric (buffer))

+    else

+    {

+        /* Move the pointer past the delimiter */

+        temp++;

+    }

+

+    if (isNumeric (temp))

     {

-        msgLength  = atol (buffer);

+        msgLength  = atol (temp);

     }

     impl->myMamaMsgLen = msgLength;

}

diff --git a/mama/c_cpp/src/c/playback/playbackFileParser.h b/mama/c_cpp/src/c/playback/playbackFileParser.h

index 46fe2f1..d03e8e0 100644

--- a/mama/c_cpp/src/c/playback/playbackFileParser.h

+++ b/mama/c_cpp/src/c/playback/playbackFileParser.h

@@ -28,6 +28,7 @@ extern "C" {

#endif

 #include "wombat/port.h"

+#include "wombat/fileparser.h"

 #include <stdlib.h>

#include <fcntl.h>

@@ -47,15 +48,11 @@ typedef struct mamaPlaybackFileParser_

{

     char            myBlockHeader[HEADER_SIZE];

     mamaMsg         myMamaMsg;

-    int             myFileDescriptor;

-    FILE*           myFileHandle;

-    void*           myFiledata;

-    off_t           myFileSize;

-    char*           myFilePointer;

-    mama_u64_t      myLastPos;

+    mama_u64_t      myFileSize;

     mama_size_t     myMamaMsgLen;

-    mama_size_t     myLastMsgLen;

+    mama_size_t     myMaxMsgLen;

     char*           myMsgBuffer;

+    fileParser      myFileReader;

}mamaPlaybackFileParserImpl;

 /*************************************************************************

--

1.8.4.msysgit.0

 


This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of IntercontinentalExchange Group, Inc. (ICE), NYSE Euronext or any of their subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.


Damian Maguire <DMaguire@...>
 

Bugzilla ticket raised for tracking this one: http://bugs.openmama.org/show_bug.cgi?id=80

Cheers, 

D

From: Frank Quinn <fquinn@...>
Date: Wednesday, March 12, 2014 1:48 PM
To: "openmama-dev@..." <openmama-dev@...>
Subject: [Openmama-dev] [PATCH 2/2] MAMA: Moved file parsing to external module for playbacks

This change moves file parsing capabilities to an external module. This

makes the code more readable and modular as well as allowing us to remove

the file streams macro which was temporarily added to allow us to run the

playback tools on Windows.

 

The rightCopy function was also removed as a copy was not actually

required, and mamaPlaybackFileParser_setSize was also removed in favour of

the new fileParser functions.

 

There is also a new configuration parameter to decide whether to use MMAP

or file streams. Mmap is now the default for all systems, but some users

may prefer to use file streams instead due to their reduced memory

footprint. To enable file streams, the following configuration parameter

may be used:

 

mama.playbackfileparser.use_mmap=false (default = true)

 

This was tested on both Linux and Windows, in both mmap and file stream

modes, using captureconvertc and capturereplayc with a mamalistenc

client.

 

Signed-off-by: Frank Quinn <fquinn@...>

---

mama/c_cpp/src/c/playback/playbackFileParser.c | 233 ++++++++-----------------

mama/c_cpp/src/c/playback/playbackFileParser.h |  11 +-

2 files changed, 81 insertions(+), 163 deletions(-)

 

diff --git a/mama/c_cpp/src/c/playback/playbackFileParser.c b/mama/c_cpp/src/c/playback/playbackFileParser.c

index 64fb465..c8f1717 100644

--- a/mama/c_cpp/src/c/playback/playbackFileParser.c

+++ b/mama/c_cpp/src/c/playback/playbackFileParser.c

@@ -20,13 +20,14 @@

  */

 #include "playbackFileParser.h"

+#include "property.h"

-#ifdef WIN32

-#define MAMA_PLAYBACK_USE_FILE_STREAMS 1

-#endif

+#include <wombat/strutils.h>

+#include <mama/mama.h>

+#include <mamainternal.h>

+

+#define     FILE_PARSER_CONFIG_USE_MMAP     "mama.playbackfileparser.use_mmap"

-static void

-rightCopy (char* input, char* output, int start);

static

mama_bool_t isNumeric( const char* str );

static mama_status

@@ -40,32 +41,10 @@ mamaPlaybackFileParser_saveMsgLength (mamaPlaybackFileParser fileParser,

static

mama_status mamaPlaybackFileParser_init (mamaPlaybackFileParser  fileParser);

-/**

- * Internally sets the  size of playback file to be parsed.

- * @param playbackFileParser object

- * @param filename - this could be either a relative or absolute,

- * filename with path.

- * @return mama_status

- */

-static void

-mamaPlaybackFileParser_setSize (mamaPlaybackFileParser fileParser,

-                                char* fileName);

 /****************************************************************************

Utility functions

****************************************************************************/

-static void rightCopy (char* input, char* output, int start)

-{

-    int index  = 0;

-    int i      = 0;

-    int length = strlen (input);

-    for (i = start; i<length; i++)

-    {

-        output[index]=input[i];

-        index++;

-    }

-    output[index] =0;

-}

 mama_bool_t isNumeric( const char* str )

{

@@ -97,11 +76,8 @@ mama_status mamaPlaybackFileParser_init (mamaPlaybackFileParser

     {

         return MAMA_STATUS_NULL_ARG;

     }

-    impl->myLastPos     = 0;

-    impl->myFiledata    = NULL;

-    impl->myFilePointer = NULL;

     impl->myMamaMsgLen  = 0;

-    impl->myLastMsgLen  = 0;

+    impl->myMaxMsgLen   = 0;

     impl->myMsgBuffer   = NULL;

     return MAMA_STATUS_OK;

}

@@ -110,11 +86,18 @@ mama_status

mamaPlaybackFileParser_allocate (mamaPlaybackFileParser* fileParser)

{

     mama_status status = MAMA_STATUS_OK;

+    fileParserStatus fileStatus = FILE_PARSER_STATUS_OK;

     mamaPlaybackFileParserImpl* impl =

         (mamaPlaybackFileParserImpl*)

         calloc (1, sizeof(mamaPlaybackFileParserImpl));

     if (impl == NULL) return MAMA_STATUS_NOMEM;

+

+    fileStatus = fileParser_allocate (&impl->myFileReader);

+    if (FILE_PARSER_STATUS_OK != fileStatus)

+    {

+        return MAMA_STATUS_NOMEM;

+    }

               mamaMsg_create(&impl->myMamaMsg);

     mamaPlaybackFileParser_init (impl);

@@ -146,6 +129,10 @@ mamaPlaybackFileParser_deallocate (mamaPlaybackFileParser fileParser)

             exit (status);

         }

     }

+    if (impl->myFileReader != NULL)

+    {

+        fileParser_destroy (impl->myFileReader);

+    }

     free (impl);

     impl = NULL;

     return MAMA_STATUS_OK;

@@ -156,45 +143,47 @@ mamaPlaybackFileParser_openFile (mamaPlaybackFileParser fileParser,

                                  char* fileName)

{

     mama_status status = MAMA_STATUS_OK;

+    fileParserStatus fileStatus = FILE_PARSER_STATUS_OK;

    

     mamaPlaybackFileParserImpl* impl =

         (mamaPlaybackFileParserImpl*)fileParser;

     if (impl == NULL)  return MAMA_STATUS_NULL_ARG;

+    if (impl->myFileReader == NULL) return MAMA_STATUS_INVALID_ARG;

     if (fileName != NULL)

     {

+        const char* useMmap = properties_Get (mamaInternal_getProperties(),

+            FILE_PARSER_CONFIG_USE_MMAP);

+        fileParserType parserType = FILE_PARSER_TYPE_UNKNOWN;

+

         mama_log (MAMA_LOG_LEVEL_FINE,

                   "openFile: checking for file: %s", fileName);

-        mamaPlaybackFileParser_setSize (impl,fileName);

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-        if ((impl->myFileHandle = fopen (fileName, "rb")) == NULL)

+        /* If use_mmap is specified and set to false */

+        if (useMmap != NULL && 0 == strtobool (useMmap))

         {

-            status = MAMA_STATUS_PLATFORM;

+            mama_log (MAMA_LOG_LEVEL_FINE,

+                  "openFile: opening %s in file streaming mode", fileName);

+            parserType = FILE_PARSER_TYPE_FILE_STREAM;

         }

-#else

-        if ((impl->myFileDescriptor = (open (fileName,

-                                             /*O_RDWR*/O_RDONLY | O_NONBLOCK,

-                                             0))) >= 0)

+        else

         {

-            impl->myFiledata = mmap (NULL, impl->myFileSize,

-                                     PROT_READ /*| PROT_WRITE*/,

-                                     MAP_SHARED,

-                                     impl->myFileDescriptor,

-                                     0);

-            if (!impl->myFiledata)

-            {

-                mama_log(MAMA_LOG_LEVEL_NORMAL,

-                         "memory mapping failed?\n");

-                exit(1);

-            }

-            impl->myFilePointer= (char*)impl->myFiledata;

+            mama_log (MAMA_LOG_LEVEL_FINE,

+                  "openFile: opening %s in mmap mode", fileName);

+            parserType = FILE_PARSER_TYPE_MMAP;

         }

-        else

+

+        fileStatus = fileParser_create (impl->myFileReader,

+            parserType, fileName);

+        if (fileStatus != FILE_PARSER_STATUS_OK)

         {

-            status = MAMA_STATUS_PLATFORM;

+            mama_log (MAMA_LOG_LEVEL_ERROR,

+                  "openFile: Failed to create file reader for file: %s (%s)",

+                  fileName,

+                  fileParser_stringForStatus(fileStatus));

+            return MAMA_STATUS_PLATFORM;

         }

-#endif

+        impl->myFileSize = fileParser_getFileSize (impl->myFileReader);

     }

     else

     {

@@ -211,40 +200,10 @@ mamaPlaybackFileParser_closeFile (mamaPlaybackFileParser fileParser)

     {

         return MAMA_STATUS_NULL_ARG;

     }

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-    fclose (impl->myFileHandle);

-#else

-    munmap(impl->myFiledata, impl->myFileSize);

-#endif

-    return MAMA_STATUS_OK;

-}

-void

-mamaPlaybackFileParser_setSize (mamaPlaybackFileParser fileParser,

-                                char* fileName)

-{

-    /*

-      use stat system call to get the size of the given file.

-    */

-    struct stat fileInfo;

-    int result = 0;

-    mamaPlaybackFileParserImpl* impl =

-        (mamaPlaybackFileParserImpl*)fileParser;

-    if (impl == NULL)

-    {

-        mama_log (MAMA_LOG_LEVEL_FINE,

-                  "setSize : Null pointer exception !!!!!!\n!");

-        return;

-    }

-    result = stat(fileName, &fileInfo);

-    if (result)

-    {

-        mama_log (MAMA_LOG_LEVEL_NORMAL,

-                  "File - %s does not exist or access is denied.\n",

-                  fileName);

-        exit(1);

-    }

-    impl->myFileSize = fileInfo.st_size;

+    fileParser_closeFile (impl->myFileReader);

+

+    return MAMA_STATUS_OK;

}

 mama_bool_t

@@ -253,9 +212,10 @@ mamaPlaybackFileParser_getNextHeader (mamaPlaybackFileParser fileParser,

{

      mamaPlaybackFileParserImpl* impl =

          (mamaPlaybackFileParserImpl*)fileParser;

-     int   result = TRUE;

-     char  delim  = (char)DELIMETER;

-     int   i      = 0;

+     int   result           = TRUE;

+     char  delim            = (char)DELIMETER;

+     int   i                = 0;

+     uint64_t bytesCopied   = 0;

      if (impl == NULL)

      {

          mama_log (MAMA_LOG_LEVEL_FINE,

@@ -267,32 +227,20 @@ mamaPlaybackFileParser_getNextHeader (mamaPlaybackFileParser fileParser,

      /*clear contents*/

      memset (impl->myBlockHeader,0,HEADER_SIZE);

-     if (impl->myLastPos >= impl->myFileSize )

+     if (mamaPlaybackFileParser_isEndOfFile (impl))

      {

          mama_log (MAMA_LOG_LEVEL_FINEST,

                    "End-of-file");

          return FALSE;

      }

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-     /* With fseek etc, we're already at the last position so just read */

-     while ((impl->myBlockHeader[i] = fgetc (impl->myFileHandle)) != delim)

-     {

-         i++;

-     }

-#else

-     impl->myFilePointer = (char*)impl->myFiledata+

-          impl->myLastPos;

-     while (*impl->myFilePointer != delim)

-     {

-        impl->myBlockHeader[i]= *impl->myFilePointer;

-        ++impl->myFilePointer;

-        i++;

-     }

-#endif

+     fileParser_readFileToBufferUntilCharacter (impl->myFileReader,

+                            (void*) impl->myBlockHeader,

+                            HEADER_SIZE,

+                            delim,

+                            &bytesCopied);

-     impl->myLastPos += i;

-     impl->myBlockHeader[i++] = '\0';

+     impl->myBlockHeader[bytesCopied] = '\0';

      mamaPlaybackFileParser_saveMsgLength (impl,

                                            impl->myBlockHeader);

@@ -304,57 +252,34 @@ mama_bool_t

mamaPlaybackFileParser_getNextMsg (mamaPlaybackFileParser fileParser,

                                    mamaMsg* msg)

{

-              long i, counter, msgEnd ;

-

+    fileParserStatus fileStatus = FILE_PARSER_STATUS_OK;

+    uint64_t bytesCopied = 0;

     mamaPlaybackFileParserImpl* impl =

         (mamaPlaybackFileParserImpl*)fileParser;

-    i = counter = msgEnd = 0;

     if (impl->myMsgBuffer == NULL)

     {

-        impl->myLastMsgLen = impl->myMamaMsgLen;

+        impl->myMaxMsgLen = impl->myMamaMsgLen;

         impl->myMsgBuffer = (char*) calloc (impl->myMamaMsgLen

                                             , sizeof(char));

     }

     else

     {

-        if (impl->myLastMsgLen < impl->myMamaMsgLen)

+        if (impl->myMaxMsgLen < impl->myMamaMsgLen)

         {

             impl->myMsgBuffer = (char*)

                 realloc ((char*)impl->myMsgBuffer, impl->myMamaMsgLen);

+            impl->myMaxMsgLen = impl->myMamaMsgLen;

         }

-        impl->myLastMsgLen = impl->myMamaMsgLen;

     }

-    if (impl->myLastPos >= impl->myFileSize )

+    if (mamaPlaybackFileParser_isEndOfFile (impl))

         return -1; /*end of file*/

-    msgEnd = impl->myLastPos + impl->myMamaMsgLen;

-

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-    counter = (long) fread (impl->myMsgBuffer, 1, impl->myMamaMsgLen, impl->myFileHandle);

-    if (counter != impl->myMamaMsgLen)

-    {

-        mama_log (MAMA_LOG_LEVEL_ERROR,

-                  "mamaPlaybackFileParser_getNextMsg(): Bytes read (%lu) "

-                  "does not match bytes requested (%lu)",

-                  counter,

-                  impl->myMamaMsgLen);

-    }

-#else

-    impl->myFilePointer = (char*)impl->myFiledata+

-                           impl->myLastPos + 1 ;

-    /*moved filepointer past the GS delim*/

-

-    for (i = impl->myLastPos ; i < msgEnd ; i++)

-    {

-        impl->myMsgBuffer[counter] = *impl->myFilePointer;

-        ++impl->myFilePointer;

-        counter++;

-    }

-#endif

-    impl->myLastPos += impl->myMamaMsgLen + 1;

-     /* moved forward to  begining of next header*/

+    fileStatus = fileParser_readFileToBuffer (impl->myFileReader,

+                            (void*) impl->myMsgBuffer,

+                            impl->myMamaMsgLen,

+                            &bytesCopied);

     if (MAMA_STATUS_NOMEM ==

         mamaPlaybackFileParser_createOrSetMsg (impl))

@@ -377,7 +302,7 @@ mamaPlaybackFileParser_isEndOfFile (mamaPlaybackFileParser fileParser)

         return FALSE;

     impl = (mamaPlaybackFileParserImpl*)fileParser;

-    return impl->myLastPos >= impl->myFileSize;

+    return (mama_bool_t) fileParser_isEndOfFile (impl->myFileReader);

}

 mama_status

@@ -389,13 +314,7 @@ mamaPlaybackFileParser_rewindFile (mamaPlaybackFileParser fileParser)

         return MAMA_STATUS_NULL_ARG;

     impl = (mamaPlaybackFileParserImpl*)fileParser;

-#ifdef MAMA_PLAYBACK_USE_FILE_STREAMS

-    /* Set the file position back to the first byte in the file */

-    fseek (impl->myFileHandle, 0, SEEK_SET);

-#else

-    impl->myFilePointer = (char*)impl->myFiledata;

-#endif

-    impl->myLastPos = 0;

+    fileParser_rewindFile (impl->myFileReader);

     return MAMA_STATUS_OK;

}

@@ -409,7 +328,6 @@ void mamaPlaybackFileParser_saveMsgLength (mamaPlaybackFileParser

     char  delim       = (char)':';

     char* temp        = NULL;

     int   nPos;

-    char  buffer      [HEADER_SIZE];

     int   msgLength   = 0;

     temp = strrchr (msgHeader, delim);

@@ -419,12 +337,15 @@ void mamaPlaybackFileParser_saveMsgLength (mamaPlaybackFileParser

                   "delimeter [:] not found in header\n");

         return;

     }

-    nPos = temp - msgHeader;

-    memset(buffer, 0,HEADER_SIZE);

-    rightCopy (msgHeader, buffer, nPos+1);

-    if (isNumeric (buffer))

+    else

+    {

+        /* Move the pointer past the delimiter */

+        temp++;

+    }

+

+    if (isNumeric (temp))

     {

-        msgLength  = atol (buffer);

+        msgLength  = atol (temp);

     }

     impl->myMamaMsgLen = msgLength;

}

diff --git a/mama/c_cpp/src/c/playback/playbackFileParser.h b/mama/c_cpp/src/c/playback/playbackFileParser.h

index 46fe2f1..d03e8e0 100644

--- a/mama/c_cpp/src/c/playback/playbackFileParser.h

+++ b/mama/c_cpp/src/c/playback/playbackFileParser.h

@@ -28,6 +28,7 @@ extern "C" {

#endif

 #include "wombat/port.h"

+#include "wombat/fileparser.h"

 #include <stdlib.h>

#include <fcntl.h>

@@ -47,15 +48,11 @@ typedef struct mamaPlaybackFileParser_

{

     char            myBlockHeader[HEADER_SIZE];

     mamaMsg         myMamaMsg;

-    int             myFileDescriptor;

-    FILE*           myFileHandle;

-    void*           myFiledata;

-    off_t           myFileSize;

-    char*           myFilePointer;

-    mama_u64_t      myLastPos;

+    mama_u64_t      myFileSize;

     mama_size_t     myMamaMsgLen;

-    mama_size_t     myLastMsgLen;

+    mama_size_t     myMaxMsgLen;

     char*           myMsgBuffer;

+    fileParser      myFileReader;

}mamaPlaybackFileParserImpl;

 /*************************************************************************

--

1.8.4.msysgit.0

 


This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of IntercontinentalExchange Group, Inc. (ICE), NYSE Euronext or any of their subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.

This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of IntercontinentalExchange Group, Inc. (ICE), NYSE Euronext or any of their subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.