[PATCH 06/12] Added wildcard subscriptions in java


Ian Bell <IBell@...>
 

From 668aa21bb4b9f1ae29679da83a464eb67e21b3e4 Mon Sep 17 00:00:00 2001

Message-Id: <668aa21bb4b9f1ae29679da83a464eb67e21b3e4.1351009358.git.ibell@...>

In-Reply-To: <180145dfd9a175bb1b824b63df34a41b7e97d425.1351009358.git.ibell@...>

References: <180145dfd9a175bb1b824b63df34a41b7e97d425.1351009358.git.ibell@...>

From: Ian Bell <ibell@...>

Date: Tue, 23 Oct 2012 16:16:36 +0100

Subject: [PATCH 06/12] Added wildcard subscriptions in java

 

Added support for wildcard subscirptions to the jni layer.

 

Signed-off-by: Ian Bell <ibell@...>

---

mama/jni/src/c/mamabasicsubscriptionjni.c          |  246 ++++++++++++++++++++

.../src/com/wombat/mama/MamaBasicSubscription.java |   50 ++++

2 files changed, 296 insertions(+)

 

diff --git a/mama/jni/src/c/mamabasicsubscriptionjni.c b/mama/jni/src/c/mamabasicsubscriptionjni.c

index 7138fcb..87f0060 100644

--- a/mama/jni/src/c/mamabasicsubscriptionjni.c

+++ b/mama/jni/src/c/mamabasicsubscriptionjni.c

@@ -49,6 +49,35 @@ static  jmethodID   subCallbackonMsgId_g            =   NULL;

static  jmethodID   subCallbackonErrorId_g          =   NULL;

static  jmethodID   subCallbackonDestroyId_g        =   NULL;

+static  jmethodID   subWildCardCallbackonCreateId_g     =   NULL;

+static  jmethodID   subWildCardCallbackonMsgId_g        =   NULL;

+static  jmethodID   subWildCardCallbackonErrorId_g      =   NULL;

+static  jmethodID   subWildCardCallbackonDestroyId_g    =   NULL;

+

+/*Invoked for onMsg for wildcards */

+static void MAMACALLTYPE subscriptionWildCardMsgCB (

+        mamaSubscription    subscription,

+        mamaMsg             msg,

+        const char*         topic,

+        void*               closure,

+        void*               itemClosure);

+

+/*Invoked in response to errors for wildcards*/

+static void MAMACALLTYPE subscriptionWildCardErrorCB  (

+        mamaSubscription    subscription,

+        mama_status         status,

+        void*               platformError,

+        const char*         subject,

+        void*               closure );

+

+/*Invoked when a wildcard subscription is first created*/

+static void MAMACALLTYPE subscriptionWildCardCreateCB (

+        mamaSubscription    subscription,

+        void*               closure );

+

+static void MAMACALLTYPE subscriptionWildCardDestroyCB (

+        mamaSubscription subscription,

+        void *closure );

/*Invoked for normal updates*/

static void MAMACALLTYPE subscriptionMsgCB (

         mamaSubscription    subscription,

@@ -73,6 +102,122 @@ static void MAMACALLTYPE subscriptionDestroyCB (

     mamaSubscription subscription,

     void *closure );

+JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_createNativeWildCardSubscription

+  (JNIEnv* env, jobject this, jobject callback, jobject transport, jobject queue,

+   jstring source, jstring topic, jobject closure)

+{

+    mama_status                 status              =   MAMA_STATUS_OK;

+    const char*                 c_topic             =   NULL;

+    const char*                 c_source            =   NULL;

+    jlong                       transportPointer    =   0;

+    jlong                       subscriptionPointer =   0;

+    mamaQueue                   queue_c             =   NULL;

+    jlong                       queuePointer        =   0;

+    jobject                     messageImpl         =   NULL;

+    mamaWildCardMsgCallbacks    c_callback;

+    char                        errorString[UTILS_MAX_ERROR_STRING_LENGTH];

+    callbackClosure*            closureImpl         = (callbackClosure*)calloc(1,

+                                              sizeof(callbackClosure));

+    if (!closureImpl)

+    {

+        utils_throwMamaException(env,"createNativeWildCardSubscription():"

+                " Could not allocate.");

+        return;

+    }

+

+    closureImpl->mUserData      =   NULL;

+    closureImpl->mClientCB      =   NULL;

+    closureImpl->mSubscription  =   NULL;

+    closureImpl->mMessage       =   NULL;

+

+    /*Setup the callback structure*/

+    memset(&c_callback, 0, sizeof(c_callback));

+    c_callback.onCreate   = (wombat_subscriptionCreateCB)subscriptionWildCardCreateCB;

+    c_callback.onError    = (wombat_subscriptionErrorCB)subscriptionWildCardErrorCB;

+    c_callback.onMsg      = (wombat_subscriptionWildCardOnMsgCB)subscriptionWildCardMsgCB;

+    c_callback.onDestroy  = (wombat_subscriptionDestroyCB)subscriptionWildCardDestroyCB;

+

+    if (topic)

+    {

+        c_topic = (*env)->GetStringUTFChars(env,topic,0);

+        if (!c_topic) return;/*Exception auto thrown*/

+    }

+  

+    if (source)

+    {

+        c_source = (*env)->GetStringUTFChars(env, source, 0);

+        if (!c_source) return; /*Exception auto thrown*/

+    }

+

+    transportPointer = (*env)->GetLongField(env, transport,

+            transportPointerFieldId_g);

+

+    closureImpl->mClientCB = (*env)->NewGlobalRef(env, callback);

+

+    /*If the client supplied a Java closure add it to ours*/

+    if(closure)closureImpl->mUserData = (*env)->NewGlobalRef(env,closure);

+

+    /*Check if a queue was specified.*/

+    if(queue)

+    {

+        /* Get the queue pointer value from the MamaQueue java object */

+        queuePointer = (*env)->GetLongField(env, queue, queuePointerFieldId_g);

+        queue_c = CAST_JLONG_TO_POINTER(mamaQueue, queuePointer);

+    }

+

+    /*Create a reuseable message object to hang off the subscription*/

+    messageImpl = utils_createJavaMamaMsg(env);

+    if(NULL==messageImpl)

+    {

+        if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic);

+        if(c_source)(*env)->ReleaseStringUTFChars(env, source, c_source);

+        if(closure)(*env)->DeleteGlobalRef(env,closureImpl->mUserData);

+        (*env)->DeleteGlobalRef(env,closureImpl->mClientCB);

+        free(closureImpl);

+        return;

+    }/*Exception will have been thrown*/

+    /*This global will be deleted when the subscription is destroyed*/

+    closureImpl->mMessage  = (*env)->NewGlobalRef(env,messageImpl);

+

+    /*Add the Java Subscription to the closure - we need it in the

+     async callbacks so it can be passed to the Java callback*/

+    closureImpl->mSubscription = (*env)->NewGlobalRef(env,this);

+

+    subscriptionPointer = (*env)->GetLongField(env, this,

+            subscriptionPointerFieldId_g);

+

+    /*Actually create the C basic Subscription*/

+    if(MAMA_STATUS_OK!=(status=mamaSubscription_createBasicWildCard(

+                     CAST_JLONG_TO_POINTER(mamaSubscription,subscriptionPointer),

+                     CAST_JLONG_TO_POINTER(mamaTransport,transportPointer),

+                     queue_c,

+                     &c_callback,

+                     c_source,

+                     c_topic,

+                     closureImpl)))

+

+    {

+        if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic);

+        if(c_source)(*env)->ReleaseStringUTFChars(env, source, c_source);

+        if(closure)(*env)->DeleteGlobalRef(env,closureImpl->mUserData);

+        (*env)->DeleteGlobalRef(env,closureImpl->mClientCB);

+        (*env)->DeleteGlobalRef(env,closureImpl->mSubscription);

+        free(closureImpl);

+        utils_buildErrorStringForStatus(

+                errorString,

+                UTILS_MAX_ERROR_STRING_LENGTH,

+                "Failed to create basic subscription.",

+                status);

+        utils_throwMamaException(env,errorString);

+        return;

+    }

+  

+    /*Tidy up*/

+    if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic);

+    if(c_source)(*env)->ReleaseStringUTFChars(env,source, c_source);

+    return;

+}

+

/*

  * Class:     com_wombat_mama_MamaBasicSubscription

  * Method:    createNativeSubscription

@@ -273,6 +418,7 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_initIDs

   (JNIEnv* env, jclass class)

{

     jclass   basicSubscriptionCallbackClass  =   NULL;

+    jclass  basicWildCardSubscriptionCallbackClass  =   NULL;

     /*A reference to the */

     subscriptionPointerFieldId_g = (*env)->GetFieldID(env,

@@ -285,6 +431,32 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_initIDs

             "com/wombat/mama/MamaBasicSubscriptionCallback");

     if (!basicSubscriptionCallbackClass) return;/*Exception auto thrown*/

+    /*Get a reference to the wildcard subscription callback class */              

+    basicWildCardSubscriptionCallbackClass = (*env)->FindClass(env,               

+            "com/wombat/mama/MamaBasicWildCardSubscriptionCallback");             

+    if (!basicWildCardSubscriptionCallbackClass) return; /*Exception auto thrown*/

+                                                                                   

+    /*MamaBasicWildCardSubscriptionCallback.onMsg */                              

+    subWildCardCallbackonMsgId_g = (*env)->GetMethodID(env,                       

+            basicWildCardSubscriptionCallbackClass,                               

+            "onMsg", "(Lcom/wombat/mama/MamaBasicSubscription;"                   

+            "Lcom/wombat/mama/MamaMsg;Ljava/lang/String;)V" );                    

+    if (!basicWildCardSubscriptionCallbackClass) return; /*Exception auto thrown*/

+                                                                                  

+    /*MamaBasicWildCardSubscriptionCallback.onDestroy*/                           

+    subWildCardCallbackonDestroyId_g = (*env)->GetMethodID(env,                   

+            basicWildCardSubscriptionCallbackClass,                               

+            "onDestroy", "(Lcom/wombat/mama/MamaBasicSubscription;)V");           

+                                                                                   

+    subWildCardCallbackonCreateId_g = (*env)->GetMethodID(env,                    

+            basicWildCardSubscriptionCallbackClass,                               

+            "onCreate", "(Lcom/wombat/mama/MamaBasicSubscription;)V" );           

+                                                                                   

+    /*MamaBasicWildCardSubscriptionCallback.onError */                            

+    subWildCardCallbackonErrorId_g = (*env)->GetMethodID(env,                     

+            basicWildCardSubscriptionCallbackClass,                               

+            "onError", "(Lcom/wombat/mama/MamaBasicSubscription;"                 

+            "SILjava/lang/String;)V" );            

     /*MamaSubscriptionCallback.onMsg()*/

     subCallbackonMsgId_g = (*env)->GetMethodID(env,

             basicSubscriptionCallbackClass,

@@ -382,6 +554,80 @@ void MAMACALLTYPE subscriptionDestroyCB (mamaSubscription subscription, void *cl

     return;

}

+void MAMACALLTYPE subscriptionWildCardMsgCB (mamaSubscription subscription,

+                                             mamaMsg msg,

+                                             const char* topic,

+                                             void*       closure,

+                                             void*       itemClosure)

+{

+    JNIEnv*             env             =   NULL;

+    callbackClosure*    closureImpl     =   (callbackClosure*)closure;

+

+    assert(closureImpl!=NULL);

+    assert(closureImpl->mClientCB!=NULL);

+    assert(closureImpl->mMessage!=NULL);

+    assert(closureImpl->mSubscription!=NULL);

+

+    /*Get the env for the current thread*/

+    env = utils_getENV(javaVM_g);

+    if (!env) return;/*Can't throw exception without JNIEnv!!*/

+

+    (*env)->SetLongField(env, closureImpl->mMessage,

+                         messagePointerFieldId_g,

+                         CAST_POINTER_TO_JLONG(msg));

+

+    /*invoke the callback*/

+    (*env)->CallVoidMethod(env, closureImpl->mClientCB,

+                           subWildCardCallbackonMsgId_g,

+                           closureImpl->mSubscription,

+                           closureImpl->mMessage,

+                           (*env)->NewStringUTF(env, topic));

+

+    /*

+       Need to check if any exceptions were propagated here.

+       If we don't the exceptions could actually fill the stack!!

+    */

+    utils_printAndClearExceptionFromStack (env,"onMsg");

+

+    return;

+}

+

+void MAMACALLTYPE subscriptionWildCardErrorCB  (

+        mamaSubscription    subscription,

+        mama_status         status,

+        void*               platformError,

+        const char*         subject,

+        void*               closure )

+{

+     subCommon_onErrorCb (subscription,

+                          status,

+                          platformError,

+                          subject,

+                          closure,

+                          subWildCardCallbackonErrorId_g);

+    return;

+}

+

+void MAMACALLTYPE subscriptionWildCardCreateCB (

+        mamaSubscription    subscription,

+        void*               closure )

+{

+    subCommon_createCb (subscription,

+                        closure,

+                        subscriptionPointerFieldId_g,

+                        subWildCardCallbackonCreateId_g);

+    return;

+}

+

+void MAMACALLTYPE subscriptionWildCardDestroyCB (mamaSubscription subscription, void *closure )

+{

+    subCommon_destroyCb (subscription,

+                     closure,

+                     subscriptionPointerFieldId_g,

+                     subWildCardCallbackonDestroyId_g);

+    return;

+}

+

/*

  * Class:     com_wombat_mama_MamaBasicSubscription

  * Method:    getSubscriptionState

diff --git a/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java b/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java

index 0c834fe..bb76ae8 100644

--- a/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java

+++ b/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java

@@ -115,6 +115,48 @@ public class MamaBasicSubscription

     }

     /**

+    * Create a basic wildcard subscription.

+    *

+    * The topic must be a valid wildcard topic for the underlying middleware.

+    *

+    * For WMW a source with a NULL symbol parameter creates a "transport"

+    * subscription that receives all messages on the transport and bypasses the

+    * naming service. A publishing transport can be assigned a name with the

+    * publish_name property.

+    *

+    * @param subscription The subscription.

+    * @param transport The transport to use.

+    * @param queue The mama queue.

+    * @param callbacks The mamaMsgCallbacks structure containing the callback

+    * functions.

+    * @param source The source name of the feed handler to provide the

+    * subscription.

+    * @param symbol The symbol name.

+    * @param closure The closure will be passed to subsequent callback

+    * invocations for this subscription.

+    */

+    public void createBasicWildCardSubscription(

+        final MamaBasicWildCardSubscriptionCallback callback,

+        final MamaTransport                         transport,

+        final MamaQueue                             queue,

+        final String                                source,

+        final String                                topic,

+        final Object                                closure)

+    {

+        // Save the closure as well

+        myClosure = closure;

+

+        // Create the native subscription

+        createNativeWildCardSubscription(

+            callback,

+            transport,

+            queue,

+            source,

+            topic,

+            null);

+    }

+

+    /**

      * This function returns the closure supplied to the createSubscription

      * function.

      *

@@ -213,4 +255,12 @@ public class MamaBasicSubscription

     private native int getSubscriptionState();

     private static native void initIDs();

+    private native void createNativeWildCardSubscription(

+        final MamaBasicWildCardSubscriptionCallback callback,

+        final MamaTransport                         transport,

+        final MamaQueue                             queue,

+        final String                                source,

+        final String                                topic,

+        final Object                                closure);

+

}/*End class*/

--

1.7.9.5

 




Please consider the environment before printing this e-mail.

This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy.

Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden.

Join {Openmama-dev@lists.openmama.org to automatically receive all group messages.