[PATCH 18/50] [mama] Subscription: don't put invalid actions on the throttle queue


Michael Schonberg <mschonberg@...>
 

From: Mike Schonberg <mschonberg@nyx.com>

Added code to avoid race conditions when enqueuing shutdown logic on the
throttle.

Signed-off-by: Mike Schonberg <mschonberg@nyx.com>
---
mama/c_cpp/src/c/mama/subscription.h | 7 +------
mama/c_cpp/src/c/subscription.c | 19 +++++++++++--------
2 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/mama/c_cpp/src/c/mama/subscription.h b/mama/c_cpp/src/c/mama/subscription.h
index 9eb1f0c..27978eb 100644
--- a/mama/c_cpp/src/c/mama/subscription.h
+++ b/mama/c_cpp/src/c/mama/subscription.h
@@ -106,12 +106,7 @@ typedef enum
/* The subscription has been de-allocated, this state is only supported so that the log entry will whenever the subscription
* has finally been freed.
*/
- MAMA_SUBSCRIPTION_DEALLOCATED = 10,
-
- /* The subscription is being re-activated, this state can only occur if the mamaSubscription_activate has been called while
- * the subscription is being deactivated, (i.e. its state is MAMA_SUBSCRIPTION_DEACTIVATING.
- */
- MAMA_SUBSCRIPTION_REACTIVATING = 11
+ MAMA_SUBSCRIPTION_DEALLOCATED = 10

} mamaSubscriptionState;

diff --git a/mama/c_cpp/src/c/subscription.c b/mama/c_cpp/src/c/subscription.c
index 6c73978..8c2497b 100644
--- a/mama/c_cpp/src/c/subscription.c
+++ b/mama/c_cpp/src/c/subscription.c
@@ -1772,7 +1772,7 @@ mama_status mamaSubscriptionImpl_deactivate(mamaSubscriptionImpl *impl)
mamaTransport_removeListener(impl->mTransport, impl->mSubscHandle);

/* If there is a create action on the throttle it must be removed. */
- if(NULL != impl->mAction)
+ if((NULL != throttle) && (NULL != impl->mAction))
{
wombatThrottle_removeAction(throttle, impl->mAction);
}
@@ -2195,7 +2195,7 @@ void MAMACALLTYPE mamaSubscriptionImpl_onSubscriptionDestroyed(mamaSubscription
break;

/* The subscription must be de-activated then re-activated. */
- case MAMA_SUBSCRIPTION_REACTIVATING:
+ case MAMA_SUBSCRIPTION_ACTIVATING:

/* Change the state. */
mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_DEACTIVATED);
@@ -2233,7 +2233,7 @@ void MAMACALLTYPE mamaSubscriptionImpl_onSubscriptionDestroyed(mamaSubscription
mama_status mamaSubscriptionImpl_removeFromThrottle(mamaSubscriptionImpl *impl)
{
/* Returns. */
- mama_status ret = MAMA_STATUS_OK;
+ mama_status ret = MAMA_STATUS_SUBSCRIPTION_INVALID_STATE;

/* Acquire the lock before anything else is done. */
wlock_lock(impl->mCreateDestroyLock);
@@ -2262,7 +2262,8 @@ mama_status mamaSubscriptionImpl_removeFromThrottle(mamaSubscriptionImpl *impl)
* if the subscription is still in the activating state.
*/
ret = MAMA_STATUS_SUBSCRIPTION_INVALID_STATE;
- if(MAMA_SUBSCRIPTION_ACTIVATING == wInterlocked_read(&impl->mState))
+ if(MAMA_SUBSCRIPTION_ACTIVATING == wInterlocked_read(&impl->mState) &&
+ NULL != impl->mAction)
{
/* Remove the subscription from the throttle. */
wombatThrottle_removeAction(throttle, impl->mAction);
@@ -2514,6 +2515,7 @@ mama_status mamaSubscription_allocate(mamaSubscription *subscription)
impl->mPreInitialCacheSize = MAMA_SUBSCRIPTION_DEFAULT_PREINITIALCACHESIZE;

/* Set the initial state of the subscription now that the memory has been allocated. */
+ wInterlocked_initialize(&impl->mState);
mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_ALLOCATED);

/* The function has succeeded. */
@@ -2570,7 +2572,7 @@ mama_status mamaSubscription_activate(mamaSubscription subscription)
case MAMA_SUBSCRIPTION_DEACTIVATING:

/* Set the state to indicate that the subscription will be reactivated. */
- mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_REACTIVATING);
+ mamaSubscriptionImpl_setState(impl, MAMA_SUBSCRIPTION_ACTIVATING);

ret = MAMA_STATUS_OK;
break;
@@ -2914,7 +2916,8 @@ mama_status mamaSubscription_deactivate(mamaSubscription subscription)
/* If invalid state is returned by this function then the subscription has become active
* while waiting for it to be removed from the throttle.
*/
- if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret)
+ if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret &&
+ MAMA_SUBSCRIPTION_ACTIVATED == wInterlocked_read(&impl->mState))
{
/* Invoke this function recursively to process the correct state. */
ret = mamaSubscription_deactivate(subscription);
@@ -3043,7 +3046,8 @@ mama_status mamaSubscription_destroy(mamaSubscription subscription)
/* If invalid state is returned by this function then the subscription has become active
* while waiting for it to be removed from the throttle.
*/
- if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret)
+ if(MAMA_STATUS_SUBSCRIPTION_INVALID_STATE == ret &&
+ MAMA_SUBSCRIPTION_ACTIVATED == wInterlocked_read(&impl->mState))
{
/* Invoke this function recursively to process the correct state. */
ret = mamaSubscription_destroy(subscription);
@@ -4251,7 +4255,6 @@ const char* mamaSubscription_stringForState(mamaSubscriptionState state)
case MAMA_SUBSCRIPTION_DESTROYED: return "MAMA_SUBSCRIPTION_DESTROYED";
case MAMA_SUBSCRIPTION_DEALLOCATING: return "MAMA_SUBSCRIPTION_DEALLOCATING";
case MAMA_SUBSCRIPTION_DEALLOCATED: return "MAMA_SUBSCRIPTION_DEALLOCATED";
- case MAMA_SUBSCRIPTION_REACTIVATING: return "MAMA_SUBSCRIPTION_REACTIVATING";
}

return "State not recognised";
--
1.7.7.6