summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java77
1 files changed, 62 insertions, 15 deletions
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java
index 4ee0719707..3471e68d6f 100644
--- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java
+++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java
@@ -20,37 +20,45 @@
package org.apache.tuscany.sca.binding.jms.provider;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.naming.NamingException;
import org.apache.tuscany.sca.binding.jms.JMSBinding;
import org.apache.tuscany.sca.binding.jms.JMSBindingException;
import org.apache.tuscany.sca.binding.jms.headers.HeaderReferenceInterceptor;
+import org.apache.tuscany.sca.binding.jms.host.AsyncResponseJMSServiceListener;
+import org.apache.tuscany.sca.binding.jms.host.JMSAsyncResponseInvoker;
import org.apache.tuscany.sca.binding.jms.transport.TransportReferenceInterceptor;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.invocation.Phase;
-import org.apache.tuscany.sca.provider.EndpointReferenceProvider;
+import org.apache.tuscany.sca.provider.EndpointReferenceAsyncProvider;
import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint;
import org.apache.tuscany.sca.provider.WireFormatProvider;
import org.apache.tuscany.sca.provider.WireFormatProviderFactory;
-import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* Implementation of the JMS reference binding provider.
*
+ * This version supports native async service invocations
+ *
* @version $Rev$ $Date$
*/
-public class JMSBindingReferenceBindingProvider implements EndpointReferenceProvider {
+public class JMSBindingReferenceBindingProvider implements EndpointReferenceAsyncProvider {
private RuntimeEndpointReference endpointReference;
private RuntimeComponentReference reference;
private JMSBinding jmsBinding;
private JMSResourceFactory jmsResourceFactory;
- private RuntimeComponent component;
private InterfaceContract interfaceContract;
private ExtensionPointRegistry extensions;
@@ -61,13 +69,14 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv
private WireFormatProviderFactory responseWireFormatProviderFactory;
private WireFormatProvider responseWireFormatProvider;
+
+ private AsyncResponseJMSServiceListener responseQueue = null;
public JMSBindingReferenceBindingProvider(RuntimeEndpointReference endpointReference, ExtensionPointRegistry extensions, JMSResourceFactory jmsResourceFactory) {
this.endpointReference = endpointReference;
this.reference = (RuntimeComponentReference) endpointReference.getReference();
this.jmsBinding = (JMSBinding) endpointReference.getBinding();
this.extensions = extensions;
- this.component = (RuntimeComponent) endpointReference.getComponent();
this.jmsResourceFactory = jmsResourceFactory;
// Get the factories/providers for operation selection
@@ -95,16 +104,21 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv
responseWireFormatProvider.configureWireFormatInterfaceContract(interfaceContract);
} catch (CloneNotSupportedException ex){
interfaceContract = reference.getInterfaceContract();
- }
- }
+ } // end try
+
+ // If the service is asyncInvocation, then create a fixed response location
+ if( endpointReference.isAsyncInvocation() ) {
+ String asyncCallbackName = endpointReference.getReference().getName() + "_asyncResponse";
+ jmsBinding.setResponseDestinationName(asyncCallbackName);
+ } // end if
+
+ } // end constructor
public Invoker createInvoker(Operation operation) {
if (jmsBinding.getDestinationName() == null) {
-// if (!reference.isCallback()) { // TODO: 2.x migration, is this check needed?
throw new JMSBindingException("No destination specified for reference " + reference.getName());
-// }
- }
+ } // end if
if ( jmsBinding.getActivationSpecName() != null ) {
throw new JMSBindingException("Activation spec can not be specified on an SCA reference binding.");
@@ -113,7 +127,7 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv
invoker = new RRBJMSBindingInvoker(operation, jmsResourceFactory, endpointReference);
return invoker;
- }
+ } // end method createInvoker
public boolean supportsOneWayInvocation() {
return true;
@@ -124,17 +138,43 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv
}
public void start() {
+ // If the reference is async invocation, then a response queue handler and associated JMS listener must be created
+ // and started
+ if (endpointReference.isAsyncInvocation()) {
+ // Create the JMS listener
+ FactoryExtensionPoint modelFactories = extensions.getExtensionPoint(FactoryExtensionPoint.class);
+ MessageFactory messageFactory = modelFactories.getFactory(MessageFactory.class);
+ MessageListener listener;
+ try {
+ listener = new JMSAsyncResponseInvoker(endpointReference, messageFactory, jmsResourceFactory);
+ } catch (NamingException e) {
+ throw new JMSBindingException("Unable to create JMSResponseInvoker", e);
+ } // end try
+
+ // Create the response queue handler
+ UtilityExtensionPoint utilities = extensions.getExtensionPoint(UtilityExtensionPoint.class);
+ WorkScheduler workScheduler = utilities.getUtility(WorkScheduler.class);
+
+ responseQueue = new AsyncResponseJMSServiceListener(listener,
+ jmsBinding.getResponseDestinationName(),
+ jmsBinding, workScheduler, jmsResourceFactory);
+ responseQueue.start();
+ } // end if
- }
+ } // end method start
public void stop() {
try {
+ if( responseQueue != null ) {
+ responseQueue.stop();
+ } // end if
+
jmsResourceFactory.closeConnection();
jmsResourceFactory.closeResponseConnection();
} catch (JMSException e) {
throw new JMSBindingException(e);
}
- }
+ } // end method stop
/*
* set up the reference binding wire with the right set of jms reference
@@ -167,6 +207,13 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv
jmsBinding,
jmsResourceFactory,
endpointReference) );
- }
+ }
+
+ /**
+ * Indicates that this binding supports async invocations natively
+ */
+ public boolean supportsNativeAsync() {
+ return true;
+ } // end method supportsNativeAsync
-}
+} // end class