Re: [PATCH 06/12] Added wildcard subscriptions in java
Ian Bell <IBell@...>
Attached modified pacth...
Forgot to include new file in patch
diff --git a/mama/jni/src/com/wombat/mama/MamaBasicWildCardSubscriptionCallback.java b/mama/jni/src/com/wombat/mama/MamaBasicWildCardSubscriptionCallback.java new file mode 100644 index 0000000..c57a5e3 --- /dev/null +++ b/mama/jni/src/com/wombat/mama/MamaBasicWildCardSubscriptionCallback.java @@ -0,0 +1,53 @@ +package com.wombat.mama; + +/* $Id$ */ + +/** + * @see MamaBasicSubscription + * @author ldelaney + */ +public interface MamaBasicWildCardSubscriptionCallback +{ + /** + * Method invoked when subscription creation is complete. + * Since subscriptions are created asynchronous by throttle, this callback + * provides the subscription instance after the throttle processes the + * creation request. + * In the case where a subscription is created on a queue other than the default + * it is possible for <code>onMsg</code> calls to be processed to be called before + * the <code>onCreate</code> callback is processed. + * + * @param subscription The subscription. + */ + void onCreate (MamaBasicSubscription subscription); + + /** + * Invoked if an error occurs during prior to subscription creation or if the + * subscription receives a message for an unentitled subject. + * <p> + * If the status + * is <code>MamaMsgStatus.NOT_ENTITTLED</code> the subject parameter is the + * specific unentitled subject. If the subscription subject contains + * wildcards, the subscription may still receive messages for other + * entitled subjects. + * + * @param subscription The subscription. + * @param wombatStatus The wombat error code. + * @param platformError Third party, platform specific messaging error. + * @param subject The subject for NOT_ENTITLED + */ + void onError(MamaBasicSubscription subscription, + short wombatStatus, + int platformError, + String subject); + + /** + * Invoked when a message arrives. + * + * @param subscription the <code>MamaBasicSubscription</code>. + * @param msg The MamaMsg. + */ + void onMsg (MamaBasicSubscription subscription, MamaMsg msg, String topic); + + void onDestroy(MamaBasicSubscription subscription); +} -- 1.7.9.5
From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...]
On Behalf Of Ian Bell
Sent: 23 October 2012 19:54 To: openmama-dev@... Subject: [Openmama-dev] [PATCH 06/12] Added wildcard subscriptions in java
From 668aa21bb4b9f1ae29679da83a464eb67e21b3e4 Mon Sep 17 00:00:00 2001 Message-Id: <668aa21bb4b9f1ae29679da83a464eb67e21b3e4.1351009358.git.ibell@...> In-Reply-To: <180145dfd9a175bb1b824b63df34a41b7e97d425.1351009358.git.ibell@...> References: <180145dfd9a175bb1b824b63df34a41b7e97d425.1351009358.git.ibell@...> From: Ian Bell <ibell@...> Date: Tue, 23 Oct 2012 16:16:36 +0100 Subject: [PATCH 06/12] Added wildcard subscriptions in java
Added support for wildcard subscirptions to the jni layer.
Signed-off-by: Ian Bell <ibell@...> --- mama/jni/src/c/mamabasicsubscriptionjni.c | 246 ++++++++++++++++++++ .../src/com/wombat/mama/MamaBasicSubscription.java | 50 ++++ 2 files changed, 296 insertions(+)
diff --git a/mama/jni/src/c/mamabasicsubscriptionjni.c b/mama/jni/src/c/mamabasicsubscriptionjni.c index 7138fcb..87f0060 100644 --- a/mama/jni/src/c/mamabasicsubscriptionjni.c +++ b/mama/jni/src/c/mamabasicsubscriptionjni.c @@ -49,6 +49,35 @@ static jmethodID subCallbackonMsgId_g = NULL; static jmethodID subCallbackonErrorId_g = NULL; static jmethodID subCallbackonDestroyId_g = NULL; +static jmethodID subWildCardCallbackonCreateId_g = NULL; +static jmethodID subWildCardCallbackonMsgId_g = NULL; +static jmethodID subWildCardCallbackonErrorId_g = NULL; +static jmethodID subWildCardCallbackonDestroyId_g = NULL; + +/*Invoked for onMsg for wildcards */ +static void MAMACALLTYPE subscriptionWildCardMsgCB ( + mamaSubscription subscription, + mamaMsg msg, + const char* topic, + void* closure, + void* itemClosure); + +/*Invoked in response to errors for wildcards*/ +static void MAMACALLTYPE subscriptionWildCardErrorCB ( + mamaSubscription subscription, + mama_status status, + void* platformError, + const char* subject, + void* closure ); + +/*Invoked when a wildcard subscription is first created*/ +static void MAMACALLTYPE subscriptionWildCardCreateCB ( + mamaSubscription subscription, + void* closure ); + +static void MAMACALLTYPE subscriptionWildCardDestroyCB ( + mamaSubscription subscription, + void *closure ); /*Invoked for normal updates*/ static void MAMACALLTYPE subscriptionMsgCB ( mamaSubscription subscription, @@ -73,6 +102,122 @@ static void MAMACALLTYPE subscriptionDestroyCB ( mamaSubscription subscription, void *closure ); +JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_createNativeWildCardSubscription + (JNIEnv* env, jobject this, jobject callback, jobject transport, jobject queue, + jstring source, jstring topic, jobject closure) +{ + mama_status status = MAMA_STATUS_OK; + const char* c_topic = NULL; + const char* c_source = NULL; + jlong transportPointer = 0; + jlong subscriptionPointer = 0; + mamaQueue queue_c = NULL; + jlong queuePointer = 0; + jobject messageImpl = NULL; + mamaWildCardMsgCallbacks c_callback; + char errorString[UTILS_MAX_ERROR_STRING_LENGTH]; + callbackClosure* closureImpl = (callbackClosure*)calloc(1, + sizeof(callbackClosure)); + if (!closureImpl) + { + utils_throwMamaException(env,"createNativeWildCardSubscription():" + " Could not allocate."); + return; + } + + closureImpl->mUserData = NULL; + closureImpl->mClientCB = NULL; + closureImpl->mSubscription = NULL; + closureImpl->mMessage = NULL; + + /*Setup the callback structure*/ + memset(&c_callback, 0, sizeof(c_callback)); + c_callback.onCreate = (wombat_subscriptionCreateCB)subscriptionWildCardCreateCB; + c_callback.onError = (wombat_subscriptionErrorCB)subscriptionWildCardErrorCB; + c_callback.onMsg = (wombat_subscriptionWildCardOnMsgCB)subscriptionWildCardMsgCB; + c_callback.onDestroy = (wombat_subscriptionDestroyCB)subscriptionWildCardDestroyCB; + + if (topic) + { + c_topic = (*env)->GetStringUTFChars(env,topic,0); + if (!c_topic) return;/*Exception auto thrown*/ + } + + if (source) + { + c_source = (*env)->GetStringUTFChars(env, source, 0); + if (!c_source) return; /*Exception auto thrown*/ + } + + transportPointer = (*env)->GetLongField(env, transport, + transportPointerFieldId_g); + + closureImpl->mClientCB = (*env)->NewGlobalRef(env, callback); + + /*If the client supplied a Java closure add it to ours*/ + if(closure)closureImpl->mUserData = (*env)->NewGlobalRef(env,closure); + + /*Check if a queue was specified.*/ + if(queue) + { + /* Get the queue pointer value from the MamaQueue java object */ + queuePointer = (*env)->GetLongField(env, queue, queuePointerFieldId_g); + queue_c = CAST_JLONG_TO_POINTER(mamaQueue, queuePointer); + } + + /*Create a reuseable message object to hang off the subscription*/ + messageImpl = utils_createJavaMamaMsg(env); + if(NULL==messageImpl) + { + if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic); + if(c_source)(*env)->ReleaseStringUTFChars(env, source, c_source); + if(closure)(*env)->DeleteGlobalRef(env,closureImpl->mUserData); + (*env)->DeleteGlobalRef(env,closureImpl->mClientCB); + free(closureImpl); + return; + }/*Exception will have been thrown*/ + /*This global will be deleted when the subscription is destroyed*/ + closureImpl->mMessage = (*env)->NewGlobalRef(env,messageImpl); + + /*Add the Java Subscription to the closure - we need it in the + async callbacks so it can be passed to the Java callback*/ + closureImpl->mSubscription = (*env)->NewGlobalRef(env,this); + + subscriptionPointer = (*env)->GetLongField(env, this, + subscriptionPointerFieldId_g); + + /*Actually create the C basic Subscription*/ + if(MAMA_STATUS_OK!=(status=mamaSubscription_createBasicWildCard( + CAST_JLONG_TO_POINTER(mamaSubscription,subscriptionPointer), + CAST_JLONG_TO_POINTER(mamaTransport,transportPointer), + queue_c, + &c_callback, + c_source, + c_topic, + closureImpl))) + + { + if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic); + if(c_source)(*env)->ReleaseStringUTFChars(env, source, c_source); + if(closure)(*env)->DeleteGlobalRef(env,closureImpl->mUserData); + (*env)->DeleteGlobalRef(env,closureImpl->mClientCB); + (*env)->DeleteGlobalRef(env,closureImpl->mSubscription); + free(closureImpl); + utils_buildErrorStringForStatus( + errorString, + UTILS_MAX_ERROR_STRING_LENGTH, + "Failed to create basic subscription.", + status); + utils_throwMamaException(env,errorString); + return; + } + + /*Tidy up*/ + if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic); + if(c_source)(*env)->ReleaseStringUTFChars(env,source, c_source); + return; +} + /* * Class: com_wombat_mama_MamaBasicSubscription * Method: createNativeSubscription @@ -273,6 +418,7 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_initIDs (JNIEnv* env, jclass class) { jclass basicSubscriptionCallbackClass = NULL; + jclass basicWildCardSubscriptionCallbackClass = NULL; /*A reference to the */ subscriptionPointerFieldId_g = (*env)->GetFieldID(env, @@ -285,6 +431,32 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_initIDs "com/wombat/mama/MamaBasicSubscriptionCallback"); if (!basicSubscriptionCallbackClass) return;/*Exception auto thrown*/ + /*Get a reference to the wildcard subscription callback class */ + basicWildCardSubscriptionCallbackClass = (*env)->FindClass(env, + "com/wombat/mama/MamaBasicWildCardSubscriptionCallback"); + if (!basicWildCardSubscriptionCallbackClass) return; /*Exception auto thrown*/ + + /*MamaBasicWildCardSubscriptionCallback.onMsg */ + subWildCardCallbackonMsgId_g = (*env)->GetMethodID(env, + basicWildCardSubscriptionCallbackClass, + "onMsg", "(Lcom/wombat/mama/MamaBasicSubscription;" + "Lcom/wombat/mama/MamaMsg;Ljava/lang/String;)V" ); + if (!basicWildCardSubscriptionCallbackClass) return; /*Exception auto thrown*/ + + /*MamaBasicWildCardSubscriptionCallback.onDestroy*/ + subWildCardCallbackonDestroyId_g = (*env)->GetMethodID(env, + basicWildCardSubscriptionCallbackClass, + "onDestroy", "(Lcom/wombat/mama/MamaBasicSubscription;)V"); + + subWildCardCallbackonCreateId_g = (*env)->GetMethodID(env, + basicWildCardSubscriptionCallbackClass, + "onCreate", "(Lcom/wombat/mama/MamaBasicSubscription;)V" ); + + /*MamaBasicWildCardSubscriptionCallback.onError */ + subWildCardCallbackonErrorId_g = (*env)->GetMethodID(env, + basicWildCardSubscriptionCallbackClass, + "onError", "(Lcom/wombat/mama/MamaBasicSubscription;" + "SILjava/lang/String;)V" ); /*MamaSubscriptionCallback.onMsg()*/ subCallbackonMsgId_g = (*env)->GetMethodID(env, basicSubscriptionCallbackClass, @@ -382,6 +554,80 @@ void MAMACALLTYPE subscriptionDestroyCB (mamaSubscription subscription, void *cl return; } +void MAMACALLTYPE subscriptionWildCardMsgCB (mamaSubscription subscription, + mamaMsg msg, + const char* topic, + void* closure, + void* itemClosure) +{ + JNIEnv* env = NULL; + callbackClosure* closureImpl = (callbackClosure*)closure; + + assert(closureImpl!=NULL); + assert(closureImpl->mClientCB!=NULL); + assert(closureImpl->mMessage!=NULL); + assert(closureImpl->mSubscription!=NULL); + + /*Get the env for the current thread*/ + env = utils_getENV(javaVM_g); + if (!env) return;/*Can't throw exception without JNIEnv!!*/ + + (*env)->SetLongField(env, closureImpl->mMessage, + messagePointerFieldId_g, + CAST_POINTER_TO_JLONG(msg)); + + /*invoke the callback*/ + (*env)->CallVoidMethod(env, closureImpl->mClientCB, + subWildCardCallbackonMsgId_g, + closureImpl->mSubscription, + closureImpl->mMessage, + (*env)->NewStringUTF(env, topic)); + + /* + Need to check if any exceptions were propagated here. + If we don't the exceptions could actually fill the stack!! + */ + utils_printAndClearExceptionFromStack (env,"onMsg"); + + return; +} + +void MAMACALLTYPE subscriptionWildCardErrorCB ( + mamaSubscription subscription, + mama_status status, + void* platformError, + const char* subject, + void* closure ) +{ + subCommon_onErrorCb (subscription, + status, + platformError, + subject, + closure, + subWildCardCallbackonErrorId_g); + return; +} + +void MAMACALLTYPE subscriptionWildCardCreateCB ( + mamaSubscription subscription, + void* closure ) +{ + subCommon_createCb (subscription, + closure, + subscriptionPointerFieldId_g, + subWildCardCallbackonCreateId_g); + return; +} + +void MAMACALLTYPE subscriptionWildCardDestroyCB (mamaSubscription subscription, void *closure ) +{ + subCommon_destroyCb (subscription, + closure, + subscriptionPointerFieldId_g, + subWildCardCallbackonDestroyId_g); + return; +} + /* * Class: com_wombat_mama_MamaBasicSubscription * Method: getSubscriptionState diff --git a/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java b/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java index 0c834fe..bb76ae8 100644 --- a/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java +++ b/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java @@ -115,6 +115,48 @@ public class MamaBasicSubscription } /** + * Create a basic wildcard subscription. + * + * The topic must be a valid wildcard topic for the underlying middleware. + * + * For WMW a source with a NULL symbol parameter creates a "transport" + * subscription that receives all messages on the transport and bypasses the + * naming service. A publishing transport can be assigned a name with the + * publish_name property. + * + * @param subscription The subscription. + * @param transport The transport to use. + * @param queue The mama queue. + * @param callbacks The mamaMsgCallbacks structure containing the callback + * functions. + * @param source The source name of the feed handler to provide the + * subscription. + * @param symbol The symbol name. + * @param closure The closure will be passed to subsequent callback + * invocations for this subscription. + */ + public void createBasicWildCardSubscription( + final MamaBasicWildCardSubscriptionCallback callback, + final MamaTransport transport, + final MamaQueue queue, + final String source, + final String topic, + final Object closure) + { + // Save the closure as well + myClosure = closure; + + // Create the native subscription + createNativeWildCardSubscription( + callback, + transport, + queue, + source, + topic, + null); + } + + /** * This function returns the closure supplied to the createSubscription * function. * @@ -213,4 +255,12 @@ public class MamaBasicSubscription private native int getSubscriptionState(); private static native void initIDs(); + private native void createNativeWildCardSubscription( + final MamaBasicWildCardSubscriptionCallback callback, + final MamaTransport transport, + final MamaQueue queue, + final String source, + final String topic, + final Object closure); + }/*End class*/ -- 1.7.9.5
Please consider the environment before printing this e-mail. This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy. Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden. |
|