diff options
Diffstat (limited to '')
-rw-r--r-- | sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java | 77 |
1 files changed, 62 insertions, 15 deletions
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java index 4ee0719707..3471e68d6f 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java @@ -20,37 +20,45 @@ package org.apache.tuscany.sca.binding.jms.provider; import javax.jms.JMSException; +import javax.jms.MessageListener; +import javax.naming.NamingException; import org.apache.tuscany.sca.binding.jms.JMSBinding; import org.apache.tuscany.sca.binding.jms.JMSBindingException; import org.apache.tuscany.sca.binding.jms.headers.HeaderReferenceInterceptor; +import org.apache.tuscany.sca.binding.jms.host.AsyncResponseJMSServiceListener; +import org.apache.tuscany.sca.binding.jms.host.JMSAsyncResponseInvoker; import org.apache.tuscany.sca.binding.jms.transport.TransportReferenceInterceptor; import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.interfacedef.InterfaceContract; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.invocation.Phase; -import org.apache.tuscany.sca.provider.EndpointReferenceProvider; +import org.apache.tuscany.sca.provider.EndpointReferenceAsyncProvider; import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint; import org.apache.tuscany.sca.provider.WireFormatProvider; import org.apache.tuscany.sca.provider.WireFormatProviderFactory; -import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentReference; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.apache.tuscany.sca.work.WorkScheduler; /** * Implementation of the JMS reference binding provider. * + * This version supports native async service invocations + * * @version $Rev$ $Date$ */ -public class JMSBindingReferenceBindingProvider implements EndpointReferenceProvider { +public class JMSBindingReferenceBindingProvider implements EndpointReferenceAsyncProvider { private RuntimeEndpointReference endpointReference; private RuntimeComponentReference reference; private JMSBinding jmsBinding; private JMSResourceFactory jmsResourceFactory; - private RuntimeComponent component; private InterfaceContract interfaceContract; private ExtensionPointRegistry extensions; @@ -61,13 +69,14 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv private WireFormatProviderFactory responseWireFormatProviderFactory; private WireFormatProvider responseWireFormatProvider; + + private AsyncResponseJMSServiceListener responseQueue = null; public JMSBindingReferenceBindingProvider(RuntimeEndpointReference endpointReference, ExtensionPointRegistry extensions, JMSResourceFactory jmsResourceFactory) { this.endpointReference = endpointReference; this.reference = (RuntimeComponentReference) endpointReference.getReference(); this.jmsBinding = (JMSBinding) endpointReference.getBinding(); this.extensions = extensions; - this.component = (RuntimeComponent) endpointReference.getComponent(); this.jmsResourceFactory = jmsResourceFactory; // Get the factories/providers for operation selection @@ -95,16 +104,21 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv responseWireFormatProvider.configureWireFormatInterfaceContract(interfaceContract); } catch (CloneNotSupportedException ex){ interfaceContract = reference.getInterfaceContract(); - } - } + } // end try + + // If the service is asyncInvocation, then create a fixed response location + if( endpointReference.isAsyncInvocation() ) { + String asyncCallbackName = endpointReference.getReference().getName() + "_asyncResponse"; + jmsBinding.setResponseDestinationName(asyncCallbackName); + } // end if + + } // end constructor public Invoker createInvoker(Operation operation) { if (jmsBinding.getDestinationName() == null) { -// if (!reference.isCallback()) { // TODO: 2.x migration, is this check needed? throw new JMSBindingException("No destination specified for reference " + reference.getName()); -// } - } + } // end if if ( jmsBinding.getActivationSpecName() != null ) { throw new JMSBindingException("Activation spec can not be specified on an SCA reference binding."); @@ -113,7 +127,7 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv invoker = new RRBJMSBindingInvoker(operation, jmsResourceFactory, endpointReference); return invoker; - } + } // end method createInvoker public boolean supportsOneWayInvocation() { return true; @@ -124,17 +138,43 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv } public void start() { + // If the reference is async invocation, then a response queue handler and associated JMS listener must be created + // and started + if (endpointReference.isAsyncInvocation()) { + // Create the JMS listener + FactoryExtensionPoint modelFactories = extensions.getExtensionPoint(FactoryExtensionPoint.class); + MessageFactory messageFactory = modelFactories.getFactory(MessageFactory.class); + MessageListener listener; + try { + listener = new JMSAsyncResponseInvoker(endpointReference, messageFactory, jmsResourceFactory); + } catch (NamingException e) { + throw new JMSBindingException("Unable to create JMSResponseInvoker", e); + } // end try + + // Create the response queue handler + UtilityExtensionPoint utilities = extensions.getExtensionPoint(UtilityExtensionPoint.class); + WorkScheduler workScheduler = utilities.getUtility(WorkScheduler.class); + + responseQueue = new AsyncResponseJMSServiceListener(listener, + jmsBinding.getResponseDestinationName(), + jmsBinding, workScheduler, jmsResourceFactory); + responseQueue.start(); + } // end if - } + } // end method start public void stop() { try { + if( responseQueue != null ) { + responseQueue.stop(); + } // end if + jmsResourceFactory.closeConnection(); jmsResourceFactory.closeResponseConnection(); } catch (JMSException e) { throw new JMSBindingException(e); } - } + } // end method stop /* * set up the reference binding wire with the right set of jms reference @@ -167,6 +207,13 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv jmsBinding, jmsResourceFactory, endpointReference) ); - } + } + + /** + * Indicates that this binding supports async invocations natively + */ + public boolean supportsNativeAsync() { + return true; + } // end method supportsNativeAsync -} +} // end class |