[PATCH 01/50] Ownership of message buffers


Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@...>

The msg payload bridges now have a msgPayloadSetParent() call, which is
used to set the parent mamaMsg object for a payload. This allows the
"ownership" of the mamaMsg to be interrogated from a payload bridge,
i.e. whether a message can be modified or not. The use of the message
owner term has also been normalised.

Signed-off-by: Glenn McClements <gmcclements@...>
Signed-off-by: Mike Schonberg <mschonberg@...>
---
common/c_cpp/src/c/wombat/wConfig.h | 2 +-
mama/c_cpp/src/c/msg.c | 106 +++++++++++++++---------
mama/c_cpp/src/c/msgfield.c | 6 +-
mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h | 1 +
mama/c_cpp/src/c/payload/avismsg/avispayload.c | 12 +++
mama/c_cpp/src/c/payload/avismsg/avispayload.h | 3 +
mama/c_cpp/src/c/payloadbridge.h | 12 ++-
7 files changed, 94 insertions(+), 48 deletions(-)

diff --git a/common/c_cpp/src/c/wombat/wConfig.h b/common/c_cpp/src/c/wombat/wConfig.h
index 3414bff..e7aeede 120000
--- a/common/c_cpp/src/c/wombat/wConfig.h
+++ b/common/c_cpp/src/c/wombat/wConfig.h
@@ -1 +1 @@
-../../.././src/c/linux/wConfig.h
\ No newline at end of file
+../../../src/c/linux/wConfig.h
\ No newline at end of file
diff --git a/mama/c_cpp/src/c/msg.c b/mama/c_cpp/src/c/msg.c
index ca17bf0..af51d25 100644
--- a/mama/c_cpp/src/c/msg.c
+++ b/mama/c_cpp/src/c/msg.c
@@ -92,7 +92,7 @@ typedef struct mamaMsgImpl_
msgBridge mBridgeMessage;
/*If we have detached the middleware message we will own it
and are responsible for destroying it*/
- int mMiddlewareMessageOwner;
+ int mMessageOwner;

/*The context if this is a msg from the dqStrategy cache*/
mamaDqContext* mDqStrategyContext;
@@ -121,7 +121,7 @@ mamaMsg_destroy (mamaMsg msg)
mamaMsgImpl_destroyLastVectorMsg (impl);
}

- if (impl->mPayloadBridge && impl->mMiddlewareMessageOwner)
+ if (impl->mPayloadBridge && impl->mMessageOwner)
{
if (impl->mFieldPayload)
{
@@ -138,9 +138,9 @@ mamaMsg_destroy (mamaMsg msg)

impl->mPayloadBridge = NULL;

- /*set mMiddlewareMessageOwner to zero now the payload has been destroyed to prevent
+ /*set mMessageOwner to zero now the payload has been destroyed to prevent
us destroying the underlying message again in the bridge specific function*/
- impl->mMiddlewareMessageOwner = 0;
+ impl->mMessageOwner = 0;
}

if (impl->mNestedMessages != NULL)
@@ -159,7 +159,7 @@ mamaMsg_destroy (mamaMsg msg)
{
/*Invoke the bridge specific destroy function*/
impl->mBridgeImpl->bridgeMamaMsgDestroy (
- impl->mBridgeMessage, impl->mMiddlewareMessageOwner);
+ impl->mBridgeMessage, impl->mMessageOwner);
}
else
{
@@ -228,8 +228,10 @@ mamaMsg_detach (mamaMsg msg)
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
mama_status status = MAMA_STATUS_OK;
- if (!impl) return MAMA_STATUS_NULL_ARG;
- if (!impl->mQueue) return MAMA_STATUS_INVALID_QUEUE;
+ msgPayload payload = NULL;
+
+ if (!impl) return MAMA_STATUS_NULL_ARG;
+ if (!impl->mQueue) return MAMA_STATUS_INVALID_QUEUE;
if (!impl->mBridgeImpl) return MAMA_STATUS_NO_BRIDGE_IMPL;

if (MAMA_STATUS_OK!=(status=mamaQueueImpl_detachMsg (impl->mQueue, msg)))
@@ -237,8 +239,8 @@ mamaMsg_detach (mamaMsg msg)
/*Message already logged in mamaQueueImpl_detachMsg*/
return status;
}
-
- /*We also need to detach the bridge specific message*/
+
+ /*We also need to detach the middleware bridge specific message*/
if (MAMA_STATUS_OK!=(status=impl->mBridgeImpl->bridgeMamaMsgDetach
(impl->mBridgeMessage)))
{
@@ -246,10 +248,24 @@ mamaMsg_detach (mamaMsg msg)
"Could not detach bridge message.");
return status;
}
+ /* Copy the payload */
+ if (MAMA_STATUS_OK != (status =
+ (msg->mPayloadBridge->msgPayloadCopy (impl->mPayload,
+ &payload))))
+ {
+ mama_log(MAMA_LOG_LEVEL_ERROR,
+ "mamaMsg_detach() Failed. "
+ "Could not copy native payload [%d]", status);
+ return status;
+ }

+ msg->mPayload = payload;
+ msg->mPayloadBridge->msgPayloadSetParent (impl->mPayload, msg);
+
/*If this is a dqStrategy cache message*/
if (impl->mDqStrategyContext)
- {
+ {
+
if (MAMA_STATUS_OK!=(status=dqStrategyImpl_detachMsg (impl->mDqStrategyContext, msg)))
{
mama_log (MAMA_LOG_LEVEL_ERROR, "mamaMsg_detach(): "
@@ -259,7 +275,7 @@ mamaMsg_detach (mamaMsg msg)
}

/*We are now responsible for destroying the middleware message*/
- impl->mMiddlewareMessageOwner = 1;
+ impl->mMessageOwner = 1;

return MAMA_STATUS_OK;
}
@@ -383,7 +399,7 @@ mamaMsgImpl_setPayloadBridge (mamaMsg msg, mamaPayloadBridgeImpl* payloadBridge)
}

mama_status
-mamaMsgImpl_setPayload (mamaMsg msg, msgPayload payload, short ownsMsg)
+mamaMsgImpl_setPayload (mamaMsg msg, msgPayload payload, short owner)
{
mamaMsgImpl* impl = (mamaMsgImpl*)msg;
if (!impl) return MAMA_STATUS_NULL_ARG;
@@ -400,8 +416,10 @@ mamaMsgImpl_setPayload (mamaMsg msg, msgPayload payload, short ownsMsg)
/* Do not destroy the list. We can reuse the memory! */
}

- impl->mPayload = payload;
- impl->mMiddlewareMessageOwner = ownsMsg;
+ impl->mPayload = payload;
+ impl->mMessageOwner = owner;
+ impl->mPayloadBridge->msgPayloadSetParent (impl->mPayload, msg);
+
return MAMA_STATUS_OK;
}

@@ -434,7 +452,7 @@ mamaMsg_createForPayloadBridge (mamaMsg* msg, mamaPayloadBridge payloadBridge)
return mamaMsgImpl_createForPayload (msg,
payload,
payloadBridge,
- 0);
+ 1);
}


@@ -469,7 +487,7 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
/*If there is an existing message buffer which we own destroy it*/
if (impl->mBridgeImpl &&
impl->mBridgeMessage &&
- impl->mMiddlewareMessageOwner)
+ impl->mMessageOwner)
{
if (MAMA_STATUS_OK!=
impl->mBridgeImpl->bridgeMamaMsgDestroyMiddlewareMsg (
@@ -481,8 +499,7 @@ mamaMsgImpl_setMsgBuffer(mamaMsg msg,
}
impl->mBridgeMessage = NULL;
}
-
- impl->mMiddlewareMessageOwner = 0;
+ impl->mMessageOwner = 0;

if (id == '\0')
id = (char) ((const char*)data) [0];
@@ -550,6 +567,7 @@ mamaMsgImpl_createNestedForPayload (mamaMsg* result,
impl->mPayloadBridge = parent->mPayloadBridge;
impl->mPayload = payload;
impl->mParent = parent;
+ impl->mPayloadBridge->msgPayloadSetParent (impl->mPayload, impl);

return MAMA_STATUS_OK;
}
@@ -562,20 +580,20 @@ mamaMsgImpl_setMessageOwner (mamaMsg msg,
mama_status status = MAMA_STATUS_OK;

mamaMsgImpl* impl = (mamaMsgImpl*)msg;
- impl->mMiddlewareMessageOwner = owner;
+ impl->mMessageOwner = owner;

return status;
}

mama_status
mamaMsgImpl_getMessageOwner (mamaMsg msg,
- short* owner)
+ short* owner)
{
mama_status status = MAMA_STATUS_OK;

mamaMsgImpl* impl = (mamaMsgImpl*)msg;
- *owner = impl->mMiddlewareMessageOwner;
-
+ *owner = impl->mMessageOwner;
+
return status;
}

@@ -583,7 +601,7 @@ mama_status
mamaMsgImpl_createForPayload (mamaMsg* msg,
msgPayload payload,
mamaPayloadBridgeImpl* payloadBridge,
- short noDeletePayload)
+ short owner)
{
mama_status status = MAMA_STATUS_OK;
mamaMsgImpl* impl = NULL;
@@ -609,14 +627,20 @@ mamaMsgImpl_createForPayload (mamaMsg* msg,
}
*msg = (mamaMsg)impl;

- impl->mPayload = payload;
- impl->mPayloadBridge = payloadBridge;
- impl->mFieldPayload = NULL;
+ impl->mPayload = payload;
+ impl->mPayloadBridge = payloadBridge;
+ impl->mFieldPayload = NULL;
/*These will be set later if necessary*/
- impl->mBridgeImpl = NULL;
- impl->mBridgeMessage = NULL;
- impl->mMiddlewareMessageOwner = noDeletePayload == 1 ? 0 : 1;
- impl->mDqStrategyContext = NULL;
+ impl->mBridgeImpl = NULL;
+ impl->mBridgeMessage = NULL;
+ impl->mMessageOwner = owner;
+ impl->mDqStrategyContext = NULL;
+
+ /* The payloadBridge and payload are optional for this function */
+ if (payloadBridge)
+ {
+ impl->mPayloadBridge->msgPayloadSetParent (impl->mPayload, impl);
+ }

return status;
}
@@ -650,7 +674,7 @@ mamaMsg_copy (mamaMsg src, mamaMsg* copy)
if (*copy == NULL)
{
if (MAMA_STATUS_OK != (status =
- mamaMsgImpl_createForPayload (copy, payload, source->mPayloadBridge, 0)))
+ mamaMsgImpl_createForPayload (copy, payload, source->mPayloadBridge, 1)))
{
mama_log(MAMA_LOG_LEVEL_ERROR,
"mamaMsg_copy Failed. "
@@ -695,7 +719,7 @@ mamaMsg_create (mamaMsg* msg)
return mamaMsgImpl_createForPayload (msg,
payload,
bridge,
- 0);
+ 1);
}

mama_status
@@ -2958,7 +2982,7 @@ mamaMsg_getVectorMsg (
impl->mPayloadBridge);
/*
* We do not detach the middle ware message so we do
- * not own it. NOTE Inverse sense to the create call below
+ * not own it.
*/
mamaMsgImpl_setPayload (impl->mLastVectorPayloadMsg[i],
payloadVector[i],
@@ -2969,12 +2993,12 @@ mamaMsg_getVectorMsg (
{
/*
* We create from payload so payload owns
- * We set noDelete to true to not delete the payload
+ * We set owner to false to not delete the payload
*/
mamaMsgImpl_createForPayload (&(impl->mLastVectorPayloadMsg[i]),
payloadVector[i],
impl->mPayloadBridge,
- 1);
+ 0);
}

/*Store the maxumim vector we have encountered*/
@@ -3052,9 +3076,9 @@ mamaMsgImpl_getPayloadBuffer (
if (impl->mPayloadBridge)
{
return
- impl->mPayloadBridge->msgPayloadGetByteBuffer (impl->mPayload,
- buffer,
- bufferLength);
+ impl->mPayloadBridge->msgPayloadGetByteBuffer (impl->mPayload,
+ buffer,
+ bufferLength);
}
return MAMA_STATUS_NULL_ARG;
}
@@ -3072,9 +3096,9 @@ mamaMsg_setNewBuffer (mamaMsg msg, void* buffer,
if (impl->mPayloadBridge)
{
return
- impl->mPayloadBridge->msgPayloadUnSerialize (impl->mPayload,
- buffer,
- size);
+ impl->mPayloadBridge->msgPayloadUnSerialize (impl->mPayload,
+ buffer,
+ size);
}

return MAMA_STATUS_NULL_ARG;
diff --git a/mama/c_cpp/src/c/msgfield.c b/mama/c_cpp/src/c/msgfield.c
index 747f983..ac5fee3 100644
--- a/mama/c_cpp/src/c/msgfield.c
+++ b/mama/c_cpp/src/c/msgfield.c
@@ -755,7 +755,7 @@ mamaMsgField_getVectorMsg (
impl->myPayloadBridge);
/*
* We do not detach the middleware message so we do
- * not own it. NOTE Inverse sense to the create call below
+ * not own it.
*/
mamaMsgImpl_setPayload (impl->myLastVectorPayloadMsg[i],
payloadVector[i],
@@ -766,12 +766,12 @@ mamaMsgField_getVectorMsg (
{
/*
* We create from payload so payload owns
- * We set noDelete to true to not delete the payload
+ * We set owner to false to not delete the payload
*/
mamaMsgImpl_createForPayload (&(impl->myLastVectorPayloadMsg[i]),
payloadVector[i],
impl->myPayloadBridge,
- 1);
+ 0);
}

/*Store the maxumim vector we have encountered*/
diff --git a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
index c68c28e..ade9b0b 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
+++ b/mama/c_cpp/src/c/payload/avismsg/avismsgimpl.h
@@ -33,6 +33,7 @@ typedef struct avisPayload
{
Attributes* mAvisMsg;
struct avisFieldPayload* mAvisField;
+ mamaMsg mParent;

void * mBuffer;
uint16_t mBufferLen;
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.c b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
index e0f9df6..28cca7a 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.c
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.c
@@ -205,6 +205,18 @@ avismsgPayload_destroy (msgPayload msg)
}

mama_status
+avismsgPayload_setParent (msgPayload msg,
+ const mamaMsg parent)
+{
+ avisPayloadImpl* impl = (avisPayloadImpl*) msg;
+ if (!impl) return MAMA_STATUS_NULL_ARG;
+ /* Record the mamaMsg that this payload belongs to. */
+ impl->mParent = parent;
+
+ return MAMA_STATUS_OK;
+}
+
+mama_status
avismsgPayload_getByteSize (const msgPayload msg,
mama_size_t* size)
{
diff --git a/mama/c_cpp/src/c/payload/avismsg/avispayload.h b/mama/c_cpp/src/c/payload/avismsg/avispayload.h
index 4bb7f37..a2c1388 100755
--- a/mama/c_cpp/src/c/payload/avismsg/avispayload.h
+++ b/mama/c_cpp/src/c/payload/avismsg/avispayload.h
@@ -54,6 +54,9 @@ avismsgPayload_clear (msgPayload msg);
extern mama_status
avismsgPayload_destroy (msgPayload msg);
extern mama_status
+avismsgPayload_setParent (msgPayload msg,
+ const mamaMsg parent);
+extern mama_status
avismsgPayload_getByteSize (const msgPayload msg,
mama_size_t* size);

diff --git a/mama/c_cpp/src/c/payloadbridge.h b/mama/c_cpp/src/c/payloadbridge.h
index 97396f9..6d851ae 100644
--- a/mama/c_cpp/src/c/payloadbridge.h
+++ b/mama/c_cpp/src/c/payloadbridge.h
@@ -55,6 +55,8 @@ do \
= identifier ## Payload_clear; \
msgPayloadImpl->msgPayloadDestroy \
= identifier ## Payload_destroy; \
+ msgPayloadImpl->msgPayloadSetParent \
+ = identifier ## Payload_setParent; \
msgPayloadImpl->msgPayloadGetByteSize \
= identifier ## Payload_getByteSize; \
msgPayloadImpl->msgPayloadGetNumFields \
@@ -437,6 +439,9 @@ typedef mama_status
typedef mama_status
(*msgPayload_destroy) (msgPayload msg);
typedef mama_status
+(*msgPayload_setParent) (msgPayload msg,
+ const mamaMsg parent);
+typedef mama_status
(*msgPayload_getByteSize) (msgPayload msg,
mama_size_t* size);
typedef mama_status
@@ -456,14 +461,14 @@ typedef mama_status
void* closure);

typedef mama_status
-(*msgPayload_serialize) (const msgPayload msg,
+(*msgPayload_serialize) (const msgPayload msg,
const void** buffer,
mama_size_t* bufferLength);

typedef mama_status
-(*msgPayload_unSerialize) (const msgPayload msg,
+(*msgPayload_unSerialize) (const msgPayload msg,
const void** buffer,
- mama_size_t bufferLength);
+ mama_size_t bufferLength);

typedef mama_status
(*msgPayload_getByteBuffer) (const msgPayload msg,
@@ -1303,6 +1308,7 @@ typedef struct mamaPayloadBridgeImpl_
msgPayload_copy msgPayloadCopy;
msgPayload_clear msgPayloadClear;
msgPayload_destroy msgPayloadDestroy;
+ msgPayload_setParent msgPayloadSetParent;
msgPayload_getByteSize msgPayloadGetByteSize;
msgPayload_getNumFields msgPayloadGetNumFields;
msgPayload_getSendSubject msgPayloadGetSendSubject;
--
1.7.7.6