Re: [PATCH 06/12] Added wildcard subscriptions in java


Ian Bell <IBell@...>
 

Attached modified patch...

 

Forgot to include the new file in the patch.

 

diff --git a/mama/jni/src/com/wombat/mama/MamaBasicWildCardSubscriptionCallback.java b/mama/jni/src/com/wombat/mama/MamaBasicWildCardSubscriptionCallback.java

new file mode 100644

index 0000000..c57a5e3

--- /dev/null

+++ b/mama/jni/src/com/wombat/mama/MamaBasicWildCardSubscriptionCallback.java

@@ -0,0 +1,53 @@

+package com.wombat.mama;

+

+/* $Id$ */

+

+/**

+ * @see MamaBasicSubscription

+ * @author ldelaney

+ */

+public interface MamaBasicWildCardSubscriptionCallback

+{

+    /**

+     * Method invoked when subscription creation is complete.

+     * Since subscriptions are created asynchronously by the throttle, this callback

+     * provides the subscription instance after the throttle processes the

+     * creation request.

+     * In the case where a subscription is created on a queue other than the default

+     * it is possible for <code>onMsg</code> calls to be processed before

+     * the <code>onCreate</code> callback is processed.

+     *

+     * @param subscription The subscription.

+     */

+    void onCreate (MamaBasicSubscription subscription);

+

+    /**

+     * Invoked if an error occurs during or prior to subscription creation or if the

+     * subscription receives a message for an unentitled subject.

+     * <p>

+     * If the status

+     * is <code>MamaMsgStatus.NOT_ENTITLED</code> the subject parameter is the

+     * specific unentitled subject. If the subscription subject contains

+     * wildcards, the subscription may still receive messages for other

+     * entitled subjects.

+     *

+     * @param subscription The subscription.

+     * @param wombatStatus The wombat error code.

+     * @param platformError Third party, platform specific messaging error.

+     * @param subject The subject for NOT_ENTITLED

+     */

+    void onError(MamaBasicSubscription subscription,

+                 short wombatStatus,

+                 int platformError,

+                 String subject);

+

+    /**

+     * Invoked when a message arrives.

+     *

+     * @param subscription the <code>MamaBasicSubscription</code>.

+     * @param msg The MamaMsg.

+     */

+    void onMsg (MamaBasicSubscription subscription, MamaMsg msg, String topic);

+

+    void onDestroy(MamaBasicSubscription subscription);

+}

--

1.7.9.5

 

From: openmama-dev-bounces@... [mailto:openmama-dev-bounces@...] On Behalf Of Ian Bell
Sent: 23 October 2012 19:54
To: openmama-dev@...
Subject: [Openmama-dev] [PATCH 06/12] Added wildcard subscriptions in java

 

From 668aa21bb4b9f1ae29679da83a464eb67e21b3e4 Mon Sep 17 00:00:00 2001

Message-Id: <668aa21bb4b9f1ae29679da83a464eb67e21b3e4.1351009358.git.ibell@...>

In-Reply-To: <180145dfd9a175bb1b824b63df34a41b7e97d425.1351009358.git.ibell@...>

References: <180145dfd9a175bb1b824b63df34a41b7e97d425.1351009358.git.ibell@...>

From: Ian Bell <ibell@...>

Date: Tue, 23 Oct 2012 16:16:36 +0100

Subject: [PATCH 06/12] Added wildcard subscriptions in java

 

Added support for wildcard subscriptions to the JNI layer.

 

Signed-off-by: Ian Bell <ibell@...>

---

mama/jni/src/c/mamabasicsubscriptionjni.c          |  246 ++++++++++++++++++++

.../src/com/wombat/mama/MamaBasicSubscription.java |   50 ++++

2 files changed, 296 insertions(+)

 

diff --git a/mama/jni/src/c/mamabasicsubscriptionjni.c b/mama/jni/src/c/mamabasicsubscriptionjni.c

index 7138fcb..87f0060 100644

--- a/mama/jni/src/c/mamabasicsubscriptionjni.c

+++ b/mama/jni/src/c/mamabasicsubscriptionjni.c

@@ -49,6 +49,35 @@ static  jmethodID   subCallbackonMsgId_g            =   NULL;

static  jmethodID   subCallbackonErrorId_g          =   NULL;

static  jmethodID   subCallbackonDestroyId_g        =   NULL;

+static  jmethodID   subWildCardCallbackonCreateId_g     =   NULL;

+static  jmethodID   subWildCardCallbackonMsgId_g        =   NULL;

+static  jmethodID   subWildCardCallbackonErrorId_g      =   NULL;

+static  jmethodID   subWildCardCallbackonDestroyId_g    =   NULL;

+

+/*Invoked for onMsg for wildcards */

+static void MAMACALLTYPE subscriptionWildCardMsgCB (

+        mamaSubscription    subscription,

+        mamaMsg             msg,

+        const char*         topic,

+        void*               closure,

+        void*               itemClosure);

+

+/*Invoked in response to errors for wildcards*/

+static void MAMACALLTYPE subscriptionWildCardErrorCB  (

+        mamaSubscription    subscription,

+        mama_status         status,

+        void*               platformError,

+        const char*         subject,

+        void*               closure );

+

+/*Invoked when a wildcard subscription is first created*/

+static void MAMACALLTYPE subscriptionWildCardCreateCB (

+        mamaSubscription    subscription,

+        void*               closure );

+

+static void MAMACALLTYPE subscriptionWildCardDestroyCB (

+        mamaSubscription subscription,

+        void *closure );

/*Invoked for normal updates*/

static void MAMACALLTYPE subscriptionMsgCB (

         mamaSubscription    subscription,

@@ -73,6 +102,122 @@ static void MAMACALLTYPE subscriptionDestroyCB (

     mamaSubscription subscription,

     void *closure );

+JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_createNativeWildCardSubscription

+  (JNIEnv* env, jobject this, jobject callback, jobject transport, jobject queue,

+   jstring source, jstring topic, jobject closure)

+{

+    mama_status                 status              =   MAMA_STATUS_OK;

+    const char*                 c_topic             =   NULL;

+    const char*                 c_source            =   NULL;

+    jlong                       transportPointer    =   0;

+    jlong                       subscriptionPointer =   0;

+    mamaQueue                   queue_c             =   NULL;

+    jlong                       queuePointer        =   0;

+    jobject                     messageImpl         =   NULL;

+    mamaWildCardMsgCallbacks    c_callback;

+    char                        errorString[UTILS_MAX_ERROR_STRING_LENGTH];

+    callbackClosure*            closureImpl         = (callbackClosure*)calloc(1,

+                                              sizeof(callbackClosure));

+    if (!closureImpl)

+    {

+        utils_throwMamaException(env,"createNativeWildCardSubscription():"

+                " Could not allocate.");

+        return;

+    }

+

+    closureImpl->mUserData      =   NULL;

+    closureImpl->mClientCB      =   NULL;

+    closureImpl->mSubscription  =   NULL;

+    closureImpl->mMessage       =   NULL;

+

+    /*Setup the callback structure*/

+    memset(&c_callback, 0, sizeof(c_callback));

+    c_callback.onCreate   = (wombat_subscriptionCreateCB)subscriptionWildCardCreateCB;

+    c_callback.onError    = (wombat_subscriptionErrorCB)subscriptionWildCardErrorCB;

+    c_callback.onMsg      = (wombat_subscriptionWildCardOnMsgCB)subscriptionWildCardMsgCB;

+    c_callback.onDestroy  = (wombat_subscriptionDestroyCB)subscriptionWildCardDestroyCB;

+

+    if (topic)

+    {

+        c_topic = (*env)->GetStringUTFChars(env,topic,0);

+        if (!c_topic) return;/*Exception auto thrown*/

+    }

+  

+    if (source)

+    {

+        c_source = (*env)->GetStringUTFChars(env, source, 0);

+        if (!c_source) return; /*Exception auto thrown*/

+    }

+

+    transportPointer = (*env)->GetLongField(env, transport,

+            transportPointerFieldId_g);

+

+    closureImpl->mClientCB = (*env)->NewGlobalRef(env, callback);

+

+    /*If the client supplied a Java closure add it to ours*/

+    if(closure)closureImpl->mUserData = (*env)->NewGlobalRef(env,closure);

+

+    /*Check if a queue was specified.*/

+    if(queue)

+    {

+        /* Get the queue pointer value from the MamaQueue java object */

+        queuePointer = (*env)->GetLongField(env, queue, queuePointerFieldId_g);

+        queue_c = CAST_JLONG_TO_POINTER(mamaQueue, queuePointer);

+    }

+

+    /*Create a reusable message object to hang off the subscription*/

+    messageImpl = utils_createJavaMamaMsg(env);

+    if(NULL==messageImpl)

+    {

+        if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic);

+        if(c_source)(*env)->ReleaseStringUTFChars(env, source, c_source);

+        if(closure)(*env)->DeleteGlobalRef(env,closureImpl->mUserData);

+        (*env)->DeleteGlobalRef(env,closureImpl->mClientCB);

+        free(closureImpl);

+        return;

+    }/*Exception will have been thrown*/

+    /*This global will be deleted when the subscription is destroyed*/

+    closureImpl->mMessage  = (*env)->NewGlobalRef(env,messageImpl);

+

+    /*Add the Java Subscription to the closure - we need it in the

+     async callbacks so it can be passed to the Java callback*/

+    closureImpl->mSubscription = (*env)->NewGlobalRef(env,this);

+

+    subscriptionPointer = (*env)->GetLongField(env, this,

+            subscriptionPointerFieldId_g);

+

+    /*Actually create the C basic Subscription*/

+    if(MAMA_STATUS_OK!=(status=mamaSubscription_createBasicWildCard(

+                     CAST_JLONG_TO_POINTER(mamaSubscription,subscriptionPointer),

+                     CAST_JLONG_TO_POINTER(mamaTransport,transportPointer),

+                     queue_c,

+                     &c_callback,

+                     c_source,

+                     c_topic,

+                     closureImpl)))

+

+    {

+        if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic);

+        if(c_source)(*env)->ReleaseStringUTFChars(env, source, c_source);

+        if(closure)(*env)->DeleteGlobalRef(env,closureImpl->mUserData);

+        (*env)->DeleteGlobalRef(env,closureImpl->mClientCB);

+        (*env)->DeleteGlobalRef(env,closureImpl->mSubscription);

+        free(closureImpl);

+        utils_buildErrorStringForStatus(

+                errorString,

+                UTILS_MAX_ERROR_STRING_LENGTH,

+                "Failed to create basic subscription.",

+                status);

+        utils_throwMamaException(env,errorString);

+        return;

+    }

+  

+    /*Tidy up*/

+    if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic);

+    if(c_source)(*env)->ReleaseStringUTFChars(env,source, c_source);

+    return;

+}

+

/*

  * Class:     com_wombat_mama_MamaBasicSubscription

  * Method:    createNativeSubscription

@@ -273,6 +418,7 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_initIDs

   (JNIEnv* env, jclass class)

{

     jclass   basicSubscriptionCallbackClass  =   NULL;

+    jclass  basicWildCardSubscriptionCallbackClass  =   NULL;

     /*A reference to the */

     subscriptionPointerFieldId_g = (*env)->GetFieldID(env,

@@ -285,6 +431,32 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_initIDs

             "com/wombat/mama/MamaBasicSubscriptionCallback");

     if (!basicSubscriptionCallbackClass) return;/*Exception auto thrown*/

+    /*Get a reference to the wildcard subscription callback class */              

+    basicWildCardSubscriptionCallbackClass = (*env)->FindClass(env,               

+            "com/wombat/mama/MamaBasicWildCardSubscriptionCallback");             

+    if (!basicWildCardSubscriptionCallbackClass) return; /*Exception auto thrown*/

+                                                                                   

+    /*MamaBasicWildCardSubscriptionCallback.onMsg */                              

+    subWildCardCallbackonMsgId_g = (*env)->GetMethodID(env,                       

+            basicWildCardSubscriptionCallbackClass,                               

+            "onMsg", "(Lcom/wombat/mama/MamaBasicSubscription;"                   

+            "Lcom/wombat/mama/MamaMsg;Ljava/lang/String;)V" );                    

+    if (!basicWildCardSubscriptionCallbackClass) return; /*Exception auto thrown*/

+                                                                                  

+    /*MamaBasicWildCardSubscriptionCallback.onDestroy*/                           

+    subWildCardCallbackonDestroyId_g = (*env)->GetMethodID(env,                   

+            basicWildCardSubscriptionCallbackClass,                               

+            "onDestroy", "(Lcom/wombat/mama/MamaBasicSubscription;)V");           

+                                                                                   

+    subWildCardCallbackonCreateId_g = (*env)->GetMethodID(env,                    

+            basicWildCardSubscriptionCallbackClass,                               

+            "onCreate", "(Lcom/wombat/mama/MamaBasicSubscription;)V" );           

+                                                                                   

+    /*MamaBasicWildCardSubscriptionCallback.onError */                            

+    subWildCardCallbackonErrorId_g = (*env)->GetMethodID(env,                     

+            basicWildCardSubscriptionCallbackClass,                               

+            "onError", "(Lcom/wombat/mama/MamaBasicSubscription;"                 

+            "SILjava/lang/String;)V" );            

     /*MamaSubscriptionCallback.onMsg()*/

     subCallbackonMsgId_g = (*env)->GetMethodID(env,

             basicSubscriptionCallbackClass,

@@ -382,6 +554,80 @@ void MAMACALLTYPE subscriptionDestroyCB (mamaSubscription subscription, void *cl

     return;

}

+void MAMACALLTYPE subscriptionWildCardMsgCB (mamaSubscription subscription,

+                                             mamaMsg msg,

+                                             const char* topic,

+                                             void*       closure,

+                                             void*       itemClosure)

+{

+    JNIEnv*             env             =   NULL;

+    callbackClosure*    closureImpl     =   (callbackClosure*)closure;

+

+    assert(closureImpl!=NULL);

+    assert(closureImpl->mClientCB!=NULL);

+    assert(closureImpl->mMessage!=NULL);

+    assert(closureImpl->mSubscription!=NULL);

+

+    /*Get the env for the current thread*/

+    env = utils_getENV(javaVM_g);

+    if (!env) return;/*Can't throw exception without JNIEnv!!*/

+

+    (*env)->SetLongField(env, closureImpl->mMessage,

+                         messagePointerFieldId_g,

+                         CAST_POINTER_TO_JLONG(msg));

+

+    /*invoke the callback*/

+    (*env)->CallVoidMethod(env, closureImpl->mClientCB,

+                           subWildCardCallbackonMsgId_g,

+                           closureImpl->mSubscription,

+                           closureImpl->mMessage,

+                           (*env)->NewStringUTF(env, topic));

+

+    /*

+       Need to check if any exceptions were propagated here.

+       If we don't the exceptions could actually fill the stack!!

+    */

+    utils_printAndClearExceptionFromStack (env,"onMsg");

+

+    return;

+}

+

+void MAMACALLTYPE subscriptionWildCardErrorCB  (

+        mamaSubscription    subscription,

+        mama_status         status,

+        void*               platformError,

+        const char*         subject,

+        void*               closure )

+{

+     subCommon_onErrorCb (subscription,

+                          status,

+                          platformError,

+                          subject,

+                          closure,

+                          subWildCardCallbackonErrorId_g);

+    return;

+}

+

+void MAMACALLTYPE subscriptionWildCardCreateCB (

+        mamaSubscription    subscription,

+        void*               closure )

+{

+    subCommon_createCb (subscription,

+                        closure,

+                        subscriptionPointerFieldId_g,

+                        subWildCardCallbackonCreateId_g);

+    return;

+}

+

+void MAMACALLTYPE subscriptionWildCardDestroyCB (mamaSubscription subscription, void *closure )

+{

+    subCommon_destroyCb (subscription,

+                     closure,

+                     subscriptionPointerFieldId_g,

+                     subWildCardCallbackonDestroyId_g);

+    return;

+}

+

/*

  * Class:     com_wombat_mama_MamaBasicSubscription

  * Method:    getSubscriptionState

diff --git a/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java b/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java

index 0c834fe..bb76ae8 100644

--- a/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java

+++ b/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java

@@ -115,6 +115,48 @@ public class MamaBasicSubscription

     }

     /**

+    * Create a basic wildcard subscription.

+    *

+    * The topic must be a valid wildcard topic for the underlying middleware.

+    *

+    * For WMW a source with a NULL symbol parameter creates a "transport"

+    * subscription that receives all messages on the transport and bypasses the

+    * naming service. A publishing transport can be assigned a name with the

+    * publish_name property.

+    *

+    * @param subscription The subscription.

+    * @param transport The transport to use.

+    * @param queue The mama queue.

+    * @param callback The MamaBasicWildCardSubscriptionCallback whose methods are

+    * invoked for this subscription.

+    * @param source The source name of the feed handler to provide the

+    * subscription.

+    * @param topic The wildcard topic to subscribe to.

+    * @param closure The closure will be passed to subsequent callback

+    * invocations for this subscription.

+    */

+    public void createBasicWildCardSubscription(

+        final MamaBasicWildCardSubscriptionCallback callback,

+        final MamaTransport                         transport,

+        final MamaQueue                             queue,

+        final String                                source,

+        final String                                topic,

+        final Object                                closure)

+    {

+        // Save the closure as well

+        myClosure = closure;

+

+        // Create the native subscription

+        createNativeWildCardSubscription(

+            callback,

+            transport,

+            queue,

+            source,

+            topic,

+            null);

+    }

+

+    /**

      * This function returns the closure supplied to the createSubscription

      * function.

      *

@@ -213,4 +255,12 @@ public class MamaBasicSubscription

     private native int getSubscriptionState();

     private static native void initIDs();

+    private native void createNativeWildCardSubscription(

+        final MamaBasicWildCardSubscriptionCallback callback,

+        final MamaTransport                         transport,

+        final MamaQueue                             queue,

+        final String                                source,

+        final String                                topic,

+        final Object                                closure);

+

}/*End class*/

--

1.7.9.5

 

 



Please consider the environment before printing this e-mail.

This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy.

Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden.




Please consider the environment before printing this e-mail.

This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy.

Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden.

Join Openmama-dev@lists.openmama.org to automatically receive all group messages.