[PATCH 06/12] Added wildcard subscriptions in java
From 668aa21bb4b9f1ae29679da83a464eb67e21b3e4 Mon Sep 17 00:00:00 2001
Message-Id: <668aa21bb4b9f1ae29679da83a464eb67e21b3e4.1351009358.git.ibell@...>
In-Reply-To: <180145dfd9a175bb1b824b63df34a41b7e97d425.1351009358.git.ibell@...>
References: <180145dfd9a175bb1b824b63df34a41b7e97d425.1351009358.git.ibell@...>
From: Ian Bell <ibell@...>
Date: Tue, 23 Oct 2012 16:16:36 +0100
Subject: [PATCH 06/12] Added wildcard subscriptions in java
Added support for wildcard subscirptions to the jni layer.
Signed-off-by: Ian Bell <ibell@...>
---
mama/jni/src/c/mamabasicsubscriptionjni.c | 246 ++++++++++++++++++++
.../src/com/wombat/mama/MamaBasicSubscription.java | 50 ++++
2 files changed, 296 insertions(+)
diff --git a/mama/jni/src/c/mamabasicsubscriptionjni.c b/mama/jni/src/c/mamabasicsubscriptionjni.c
index 7138fcb..87f0060 100644
--- a/mama/jni/src/c/mamabasicsubscriptionjni.c
+++ b/mama/jni/src/c/mamabasicsubscriptionjni.c
@@ -49,6 +49,35 @@ static jmethodID subCallbackonMsgId_g = NULL;
static jmethodID subCallbackonErrorId_g = NULL;
static jmethodID subCallbackonDestroyId_g = NULL;
+static jmethodID subWildCardCallbackonCreateId_g = NULL;
+static jmethodID subWildCardCallbackonMsgId_g = NULL;
+static jmethodID subWildCardCallbackonErrorId_g = NULL;
+static jmethodID subWildCardCallbackonDestroyId_g = NULL;
+
+/*Invoked for onMsg for wildcards */
+static void MAMACALLTYPE subscriptionWildCardMsgCB (
+ mamaSubscription subscription,
+ mamaMsg msg,
+ const char* topic,
+ void* closure,
+ void* itemClosure);
+
+/*Invoked in response to errors for wildcards*/
+static void MAMACALLTYPE subscriptionWildCardErrorCB (
+ mamaSubscription subscription,
+ mama_status status,
+ void* platformError,
+ const char* subject,
+ void* closure );
+
+/*Invoked when a wildcard subscription is first created*/
+static void MAMACALLTYPE subscriptionWildCardCreateCB (
+ mamaSubscription subscription,
+ void* closure );
+
+static void MAMACALLTYPE subscriptionWildCardDestroyCB (
+ mamaSubscription subscription,
+ void *closure );
/*Invoked for normal updates*/
static void MAMACALLTYPE subscriptionMsgCB (
mamaSubscription subscription,
@@ -73,6 +102,122 @@ static void MAMACALLTYPE subscriptionDestroyCB (
mamaSubscription subscription,
void *closure );
+JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_createNativeWildCardSubscription
+ (JNIEnv* env, jobject this, jobject callback, jobject transport, jobject queue,
+ jstring source, jstring topic, jobject closure)
+{
+ mama_status status = MAMA_STATUS_OK;
+ const char* c_topic = NULL;
+ const char* c_source = NULL;
+ jlong transportPointer = 0;
+ jlong subscriptionPointer = 0;
+ mamaQueue queue_c = NULL;
+ jlong queuePointer = 0;
+ jobject messageImpl = NULL;
+ mamaWildCardMsgCallbacks c_callback;
+ char errorString[UTILS_MAX_ERROR_STRING_LENGTH];
+ callbackClosure* closureImpl = (callbackClosure*)calloc(1,
+ sizeof(callbackClosure));
+ if (!closureImpl)
+ {
+ utils_throwMamaException(env,"createNativeWildCardSubscription():"
+ " Could not allocate.");
+ return;
+ }
+
+ closureImpl->mUserData = NULL;
+ closureImpl->mClientCB = NULL;
+ closureImpl->mSubscription = NULL;
+ closureImpl->mMessage = NULL;
+
+ /*Setup the callback structure*/
+ memset(&c_callback, 0, sizeof(c_callback));
+ c_callback.onCreate = (wombat_subscriptionCreateCB)subscriptionWildCardCreateCB;
+ c_callback.onError = (wombat_subscriptionErrorCB)subscriptionWildCardErrorCB;
+ c_callback.onMsg = (wombat_subscriptionWildCardOnMsgCB)subscriptionWildCardMsgCB;
+ c_callback.onDestroy = (wombat_subscriptionDestroyCB)subscriptionWildCardDestroyCB;
+
+ if (topic)
+ {
+ c_topic = (*env)->GetStringUTFChars(env,topic,0);
+ if (!c_topic) return;/*Exception auto thrown*/
+ }
+
+ if (source)
+ {
+ c_source = (*env)->GetStringUTFChars(env, source, 0);
+ if (!c_source) return; /*Exception auto thrown*/
+ }
+
+ transportPointer = (*env)->GetLongField(env, transport,
+ transportPointerFieldId_g);
+
+ closureImpl->mClientCB = (*env)->NewGlobalRef(env, callback);
+
+ /*If the client supplied a Java closure add it to ours*/
+ if(closure)closureImpl->mUserData = (*env)->NewGlobalRef(env,closure);
+
+ /*Check if a queue was specified.*/
+ if(queue)
+ {
+ /* Get the queue pointer value from the MamaQueue java object */
+ queuePointer = (*env)->GetLongField(env, queue, queuePointerFieldId_g);
+ queue_c = CAST_JLONG_TO_POINTER(mamaQueue, queuePointer);
+ }
+
+ /*Create a reuseable message object to hang off the subscription*/
+ messageImpl = utils_createJavaMamaMsg(env);
+ if(NULL==messageImpl)
+ {
+ if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic);
+ if(c_source)(*env)->ReleaseStringUTFChars(env, source, c_source);
+ if(closure)(*env)->DeleteGlobalRef(env,closureImpl->mUserData);
+ (*env)->DeleteGlobalRef(env,closureImpl->mClientCB);
+ free(closureImpl);
+ return;
+ }/*Exception will have been thrown*/
+ /*This global will be deleted when the subscription is destroyed*/
+ closureImpl->mMessage = (*env)->NewGlobalRef(env,messageImpl);
+
+ /*Add the Java Subscription to the closure - we need it in the
+ async callbacks so it can be passed to the Java callback*/
+ closureImpl->mSubscription = (*env)->NewGlobalRef(env,this);
+
+ subscriptionPointer = (*env)->GetLongField(env, this,
+ subscriptionPointerFieldId_g);
+
+ /*Actually create the C basic Subscription*/
+ if(MAMA_STATUS_OK!=(status=mamaSubscription_createBasicWildCard(
+ CAST_JLONG_TO_POINTER(mamaSubscription,subscriptionPointer),
+ CAST_JLONG_TO_POINTER(mamaTransport,transportPointer),
+ queue_c,
+ &c_callback,
+ c_source,
+ c_topic,
+ closureImpl)))
+
+ {
+ if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic);
+ if(c_source)(*env)->ReleaseStringUTFChars(env, source, c_source);
+ if(closure)(*env)->DeleteGlobalRef(env,closureImpl->mUserData);
+ (*env)->DeleteGlobalRef(env,closureImpl->mClientCB);
+ (*env)->DeleteGlobalRef(env,closureImpl->mSubscription);
+ free(closureImpl);
+ utils_buildErrorStringForStatus(
+ errorString,
+ UTILS_MAX_ERROR_STRING_LENGTH,
+ "Failed to create basic subscription.",
+ status);
+ utils_throwMamaException(env,errorString);
+ return;
+ }
+
+ /*Tidy up*/
+ if(c_topic)(*env)->ReleaseStringUTFChars(env,topic, c_topic);
+ if(c_source)(*env)->ReleaseStringUTFChars(env,source, c_source);
+ return;
+}
+
/*
* Class: com_wombat_mama_MamaBasicSubscription
* Method: createNativeSubscription
@@ -273,6 +418,7 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_initIDs
(JNIEnv* env, jclass class)
{
jclass basicSubscriptionCallbackClass = NULL;
+ jclass basicWildCardSubscriptionCallbackClass = NULL;
/*A reference to the */
subscriptionPointerFieldId_g = (*env)->GetFieldID(env,
@@ -285,6 +431,32 @@ JNIEXPORT void JNICALL Java_com_wombat_mama_MamaBasicSubscription_initIDs
"com/wombat/mama/MamaBasicSubscriptionCallback");
if (!basicSubscriptionCallbackClass) return;/*Exception auto thrown*/
+ /*Get a reference to the wildcard subscription callback class */
+ basicWildCardSubscriptionCallbackClass = (*env)->FindClass(env,
+ "com/wombat/mama/MamaBasicWildCardSubscriptionCallback");
+ if (!basicWildCardSubscriptionCallbackClass) return; /*Exception auto thrown*/
+
+ /*MamaBasicWildCardSubscriptionCallback.onMsg */
+ subWildCardCallbackonMsgId_g = (*env)->GetMethodID(env,
+ basicWildCardSubscriptionCallbackClass,
+ "onMsg", "(Lcom/wombat/mama/MamaBasicSubscription;"
+ "Lcom/wombat/mama/MamaMsg;Ljava/lang/String;)V" );
+ if (!basicWildCardSubscriptionCallbackClass) return; /*Exception auto thrown*/
+
+ /*MamaBasicWildCardSubscriptionCallback.onDestroy*/
+ subWildCardCallbackonDestroyId_g = (*env)->GetMethodID(env,
+ basicWildCardSubscriptionCallbackClass,
+ "onDestroy", "(Lcom/wombat/mama/MamaBasicSubscription;)V");
+
+ subWildCardCallbackonCreateId_g = (*env)->GetMethodID(env,
+ basicWildCardSubscriptionCallbackClass,
+ "onCreate", "(Lcom/wombat/mama/MamaBasicSubscription;)V" );
+
+ /*MamaBasicWildCardSubscriptionCallback.onError */
+ subWildCardCallbackonErrorId_g = (*env)->GetMethodID(env,
+ basicWildCardSubscriptionCallbackClass,
+ "onError", "(Lcom/wombat/mama/MamaBasicSubscription;"
+ "SILjava/lang/String;)V" );
/*MamaSubscriptionCallback.onMsg()*/
subCallbackonMsgId_g = (*env)->GetMethodID(env,
basicSubscriptionCallbackClass,
@@ -382,6 +554,80 @@ void MAMACALLTYPE subscriptionDestroyCB (mamaSubscription subscription, void *cl
return;
}
+void MAMACALLTYPE subscriptionWildCardMsgCB (mamaSubscription subscription,
+ mamaMsg msg,
+ const char* topic,
+ void* closure,
+ void* itemClosure)
+{
+ JNIEnv* env = NULL;
+ callbackClosure* closureImpl = (callbackClosure*)closure;
+
+ assert(closureImpl!=NULL);
+ assert(closureImpl->mClientCB!=NULL);
+ assert(closureImpl->mMessage!=NULL);
+ assert(closureImpl->mSubscription!=NULL);
+
+ /*Get the env for the current thread*/
+ env = utils_getENV(javaVM_g);
+ if (!env) return;/*Can't throw exception without JNIEnv!!*/
+
+ (*env)->SetLongField(env, closureImpl->mMessage,
+ messagePointerFieldId_g,
+ CAST_POINTER_TO_JLONG(msg));
+
+ /*invoke the callback*/
+ (*env)->CallVoidMethod(env, closureImpl->mClientCB,
+ subWildCardCallbackonMsgId_g,
+ closureImpl->mSubscription,
+ closureImpl->mMessage,
+ (*env)->NewStringUTF(env, topic));
+
+ /*
+ Need to check if any exceptions were propagated here.
+ If we don't the exceptions could actually fill the stack!!
+ */
+ utils_printAndClearExceptionFromStack (env,"onMsg");
+
+ return;
+}
+
+void MAMACALLTYPE subscriptionWildCardErrorCB (
+ mamaSubscription subscription,
+ mama_status status,
+ void* platformError,
+ const char* subject,
+ void* closure )
+{
+ subCommon_onErrorCb (subscription,
+ status,
+ platformError,
+ subject,
+ closure,
+ subWildCardCallbackonErrorId_g);
+ return;
+}
+
+void MAMACALLTYPE subscriptionWildCardCreateCB (
+ mamaSubscription subscription,
+ void* closure )
+{
+ subCommon_createCb (subscription,
+ closure,
+ subscriptionPointerFieldId_g,
+ subWildCardCallbackonCreateId_g);
+ return;
+}
+
+void MAMACALLTYPE subscriptionWildCardDestroyCB (mamaSubscription subscription, void *closure )
+{
+ subCommon_destroyCb (subscription,
+ closure,
+ subscriptionPointerFieldId_g,
+ subWildCardCallbackonDestroyId_g);
+ return;
+}
+
/*
* Class: com_wombat_mama_MamaBasicSubscription
* Method: getSubscriptionState
diff --git a/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java b/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java
index 0c834fe..bb76ae8 100644
--- a/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java
+++ b/mama/jni/src/com/wombat/mama/MamaBasicSubscription.java
@@ -115,6 +115,48 @@ public class MamaBasicSubscription
}
/**
+ * Create a basic wildcard subscription.
+ *
+ * The topic must be a valid wildcard topic for the underlying middleware.
+ *
+ * For WMW a source with a NULL symbol parameter creates a "transport"
+ * subscription that receives all messages on the transport and bypasses the
+ * naming service. A publishing transport can be assigned a name with the
+ * publish_name property.
+ *
+ * @param subscription The subscription.
+ * @param transport The transport to use.
+ * @param queue The mama queue.
+ * @param callbacks The mamaMsgCallbacks structure containing the callback
+ * functions.
+ * @param source The source name of the feed handler to provide the
+ * subscription.
+ * @param symbol The symbol name.
+ * @param closure The closure will be passed to subsequent callback
+ * invocations for this subscription.
+ */
+ public void createBasicWildCardSubscription(
+ final MamaBasicWildCardSubscriptionCallback callback,
+ final MamaTransport transport,
+ final MamaQueue queue,
+ final String source,
+ final String topic,
+ final Object closure)
+ {
+ // Save the closure as well
+ myClosure = closure;
+
+ // Create the native subscription
+ createNativeWildCardSubscription(
+ callback,
+ transport,
+ queue,
+ source,
+ topic,
+ null);
+ }
+
+ /**
* This function returns the closure supplied to the createSubscription
* function.
*
@@ -213,4 +255,12 @@ public class MamaBasicSubscription
private native int getSubscriptionState();
private static native void initIDs();
+ private native void createNativeWildCardSubscription(
+ final MamaBasicWildCardSubscriptionCallback callback,
+ final MamaTransport transport,
+ final MamaQueue queue,
+ final String source,
+ final String topic,
+ final Object closure);
+
}/*End class*/
--
1.7.9.5
Please consider the environment before printing this e-mail.
This e-mail may contain confidential and/or privileged information. If you are not the intended recipient or have received this e-mail in error, please advise the sender immediately by reply e-mail and delete this message and any attachments without retaining a copy.
Any unauthorised copying, disclosure or distribution of the material in this e-mail is strictly forbidden.