From 1f91615143cc34ad7eeffc7d91b6ffec9d7d0271 Mon Sep 17 00:00:00 2001 From: edwardsmj Date: Mon, 20 Dec 2010 19:36:27 +0000 Subject: Extending binding-jms-runtime to provide native async service invocation with separate forward request messages and back response messages - as described in TUSCANY-3809 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1051255 13f79535-47bb-0310-9956-ffa450edef68 --- .../provider/JMSBindingAsyncResponseInvoker.java | 43 +++++++++ .../JMSBindingReferenceBindingProvider.java | 77 ++++++++++++--- .../provider/JMSBindingServiceBindingProvider.java | 23 ++++- .../binding/jms/provider/RRBJMSBindingInvoker.java | 106 +++++++++++++++++++-- 4 files changed, 223 insertions(+), 26 deletions(-) create mode 100644 sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java (limited to 'sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider') diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java new file mode 100644 index 0000000000..f07e9de29f --- /dev/null +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.jms.provider; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; + +/** + * @version $Rev$ $Date$ + */ +public class JMSBindingAsyncResponseInvoker implements InvokerAsyncResponse { + + RuntimeEndpoint endpoint; + + public JMSBindingAsyncResponseInvoker(ExtensionPointRegistry extensionPoints, + RuntimeEndpoint endpoint) { + this.endpoint = endpoint; + } // end constructor + + public void invokeAsyncResponse(Message msg) { + // TODO + } // end method invokeAsyncResponse +} // end class JMSBindingAsyncResponseInvoker \ No newline at end of file diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java index 4ee0719707..3471e68d6f 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java @@ -20,37 +20,45 @@ package org.apache.tuscany.sca.binding.jms.provider; import javax.jms.JMSException; +import javax.jms.MessageListener; +import javax.naming.NamingException; import org.apache.tuscany.sca.binding.jms.JMSBinding; import org.apache.tuscany.sca.binding.jms.JMSBindingException; import org.apache.tuscany.sca.binding.jms.headers.HeaderReferenceInterceptor; +import org.apache.tuscany.sca.binding.jms.host.AsyncResponseJMSServiceListener; +import org.apache.tuscany.sca.binding.jms.host.JMSAsyncResponseInvoker; import org.apache.tuscany.sca.binding.jms.transport.TransportReferenceInterceptor; import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.interfacedef.InterfaceContract; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.invocation.Phase; -import org.apache.tuscany.sca.provider.EndpointReferenceProvider; +import org.apache.tuscany.sca.provider.EndpointReferenceAsyncProvider; import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint; import org.apache.tuscany.sca.provider.WireFormatProvider; import org.apache.tuscany.sca.provider.WireFormatProviderFactory; -import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentReference; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.apache.tuscany.sca.work.WorkScheduler; /** * Implementation of the JMS reference binding provider. * + * This version supports native async service invocations + * * @version $Rev$ $Date$ */ -public class JMSBindingReferenceBindingProvider implements EndpointReferenceProvider { +public class JMSBindingReferenceBindingProvider implements EndpointReferenceAsyncProvider { private RuntimeEndpointReference endpointReference; private RuntimeComponentReference reference; private JMSBinding jmsBinding; private JMSResourceFactory jmsResourceFactory; - private RuntimeComponent component; private InterfaceContract interfaceContract; private ExtensionPointRegistry extensions; @@ -61,13 +69,14 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv private WireFormatProviderFactory responseWireFormatProviderFactory; private WireFormatProvider responseWireFormatProvider; + + private AsyncResponseJMSServiceListener responseQueue = null; public JMSBindingReferenceBindingProvider(RuntimeEndpointReference endpointReference, ExtensionPointRegistry extensions, JMSResourceFactory jmsResourceFactory) { this.endpointReference = endpointReference; this.reference = (RuntimeComponentReference) endpointReference.getReference(); this.jmsBinding = (JMSBinding) endpointReference.getBinding(); this.extensions = extensions; - this.component = (RuntimeComponent) endpointReference.getComponent(); this.jmsResourceFactory = jmsResourceFactory; // Get the factories/providers for operation selection @@ -95,16 +104,21 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv responseWireFormatProvider.configureWireFormatInterfaceContract(interfaceContract); } catch (CloneNotSupportedException ex){ interfaceContract = reference.getInterfaceContract(); - } - } + } // end try + + // If the service is asyncInvocation, then create a fixed response location + if( endpointReference.isAsyncInvocation() ) { + String asyncCallbackName = endpointReference.getReference().getName() + "_asyncResponse"; + jmsBinding.setResponseDestinationName(asyncCallbackName); + } // end if + + } // end constructor public Invoker createInvoker(Operation operation) { if (jmsBinding.getDestinationName() == null) { -// if (!reference.isCallback()) { // TODO: 2.x migration, is this check needed? throw new JMSBindingException("No destination specified for reference " + reference.getName()); -// } - } + } // end if if ( jmsBinding.getActivationSpecName() != null ) { throw new JMSBindingException("Activation spec can not be specified on an SCA reference binding."); @@ -113,7 +127,7 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv invoker = new RRBJMSBindingInvoker(operation, jmsResourceFactory, endpointReference); return invoker; - } + } // end method createInvoker public boolean supportsOneWayInvocation() { return true; @@ -124,17 +138,43 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv } public void start() { + // If the reference is async invocation, then a response queue handler and associated JMS listener must be created + // and started + if (endpointReference.isAsyncInvocation()) { + // Create the JMS listener + FactoryExtensionPoint modelFactories = extensions.getExtensionPoint(FactoryExtensionPoint.class); + MessageFactory messageFactory = modelFactories.getFactory(MessageFactory.class); + MessageListener listener; + try { + listener = new JMSAsyncResponseInvoker(endpointReference, messageFactory, jmsResourceFactory); + } catch (NamingException e) { + throw new JMSBindingException("Unable to create JMSResponseInvoker", e); + } // end try + + // Create the response queue handler + UtilityExtensionPoint utilities = extensions.getExtensionPoint(UtilityExtensionPoint.class); + WorkScheduler workScheduler = utilities.getUtility(WorkScheduler.class); + + responseQueue = new AsyncResponseJMSServiceListener(listener, + jmsBinding.getResponseDestinationName(), + jmsBinding, workScheduler, jmsResourceFactory); + responseQueue.start(); + } // end if - } + } // end method start public void stop() { try { + if( responseQueue != null ) { + responseQueue.stop(); + } // end if + jmsResourceFactory.closeConnection(); jmsResourceFactory.closeResponseConnection(); } catch (JMSException e) { throw new JMSBindingException(e); } - } + } // end method stop /* * set up the reference binding wire with the right set of jms reference @@ -167,6 +207,13 @@ public class JMSBindingReferenceBindingProvider implements EndpointReferenceProv jmsBinding, jmsResourceFactory, endpointReference) ); - } + } + + /** + * Indicates that this binding supports async invocations natively + */ + public boolean supportsNativeAsync() { + return true; + } // end method supportsNativeAsync -} +} // end class diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java index 8428a45c6e..b96376eb15 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java @@ -29,14 +29,18 @@ import org.apache.tuscany.sca.binding.jms.host.JMSServiceListener; import org.apache.tuscany.sca.binding.jms.host.JMSServiceListenerDetails; import org.apache.tuscany.sca.binding.jms.host.JMSServiceListenerFactory; import org.apache.tuscany.sca.binding.jms.transport.TransportServiceInterceptor; +import org.apache.tuscany.sca.binding.jms.wire.AsyncResponseDestinationInterceptor; import org.apache.tuscany.sca.binding.jms.wire.CallbackDestinationInterceptor; import org.apache.tuscany.sca.binding.jms.wire.OperationPropertiesInterceptor; +import org.apache.tuscany.sca.binding.sca.provider.SCABindingAsyncResponseInvoker; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.interfacedef.InterfaceContract; import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.invocation.Phase; +import org.apache.tuscany.sca.provider.EndpointAsyncProvider; import org.apache.tuscany.sca.provider.EndpointProvider; import org.apache.tuscany.sca.provider.OperationSelectorProvider; import org.apache.tuscany.sca.provider.OperationSelectorProviderFactory; @@ -52,7 +56,7 @@ import org.apache.tuscany.sca.runtime.RuntimeEndpoint; * * @version $Rev$ $Date$ */ -public class JMSBindingServiceBindingProvider implements EndpointProvider, JMSServiceListenerDetails { +public class JMSBindingServiceBindingProvider implements EndpointAsyncProvider, JMSServiceListenerDetails { private static final Logger logger = Logger.getLogger(JMSBindingServiceBindingProvider.class.getName()); private ExtensionPointRegistry registry; @@ -202,7 +206,11 @@ public class JMSBindingServiceBindingProvider implements EndpointProvider, JMSSe new CallbackDestinationInterceptor(endpoint)); bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, new HeaderServiceInterceptor(jmsBinding)); - + + // add async response interceptor after header interceptor + bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, + new AsyncResponseDestinationInterceptor(endpoint)); + // add request wire format bindingChain.addInterceptor(requestWireFormatProvider.getPhase(), requestWireFormatProvider.createInterceptor()); @@ -243,4 +251,15 @@ public class JMSBindingServiceBindingProvider implements EndpointProvider, JMSSe return endpoint; } + /** + * Indicates that this service binding does support native async service invocations + */ + public boolean supportsNativeAsync() { + return true; + } // end method supportsNativeAsync + + public InvokerAsyncResponse createAsyncResponseInvoker() { + return new JMSBindingAsyncResponseInvoker(null, endpoint); + } // end method createAsyncResponseInvoker + } diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java index 098bdde3ae..0e88b283dc 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java @@ -29,9 +29,11 @@ import org.apache.tuscany.sca.binding.jms.JMSBinding; import org.apache.tuscany.sca.binding.jms.JMSBindingConstants; import org.apache.tuscany.sca.binding.jms.JMSBindingException; import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; +import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.interfacedef.util.FaultException; import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; import org.oasisopen.sca.ServiceRuntimeException; @@ -40,7 +42,7 @@ import org.oasisopen.sca.ServiceRuntimeException; * * @version $Rev$ $Date$ */ -public class RRBJMSBindingInvoker implements Invoker { +public class RRBJMSBindingInvoker extends InterceptorAsyncImpl { protected Operation operation; protected String operationName; @@ -65,14 +67,12 @@ public class RRBJMSBindingInvoker implements Invoker { // properties of the inbound service request. We should not look for or require a // statically-configured destination unless a message is received that does not have // the necessary properties. -// if (!reference.isCallback()) { // TODO: 2.x migration, is this check needed? - bindingRequestDest = lookupDestination(); -// } + bindingRequestDest = lookupDestination(); bindingReplyDest = lookupResponseDestination(); } catch (NamingException e) { throw new JMSBindingException(e); - } - } + } // end try + } // end constructor /** * Looks up the Destination Queue for the JMS Binding @@ -136,7 +136,7 @@ public class RRBJMSBindingInvoker implements Invoker { qCreateMode = jmsBinding.getDestinationCreate(); } - // FIXME: [rfeng] A hack to remove jms:jndi: prefix + // Remove jms:jndi: prefix if present if (queueName.startsWith("jms:jndi:")) { queueName = queueName.substring("jms:jndi:".length()); } @@ -186,7 +186,15 @@ public class RRBJMSBindingInvoker implements Invoker { } return dest; - } + } // end method lookupDestinationQueue + + /** + * Get the next in the chain from the binding invocation chain + */ + public Invoker getNext() { + return (Invoker)endpointReference.getBindingInvocationChain().getHeadInvoker(); + } // end method getNext + public org.apache.tuscany.sca.invocation.Message invoke(org.apache.tuscany.sca.invocation.Message tuscanyMsg) { try { @@ -264,6 +272,86 @@ public class RRBJMSBindingInvoker implements Invoker { } } return replyToDest; - } + } + + /** + * Process forward request message + * @param tuscanyMsg - the request message + * @return the processed version of the request message + */ + public Message processRequest(Message tuscanyMsg) { + try { + // populate the message context with JMS binding information + JMSBindingContext context = new JMSBindingContext(); + context.setJmsResourceFactory(jmsResourceFactory); + tuscanyMsg.setBindingContext(context); + + // get a JMS session to cover the creation and sending of the message + Session session = context.getJmsSession(); + + context.setRequestDestination(getRequestDestination(tuscanyMsg, session)); + context.setReplyToDestination(getReplyToDestination(session)); + + return tuscanyMsg; + } catch (Exception e) { + throw new JMSBindingException(e); + } // end try + } // end method processRequest + + /** + * Post processing for a request message where an error occurred + * @param tuscanyMsg + * @return the post processed message + */ + public Message postProcessRequest(Message tuscanyMsg, Throwable e) { + // Exception handling + if ( e instanceof ServiceRuntimeException ) { + if (e.getCause() instanceof InvocationTargetException) { + if ((e.getCause().getCause() instanceof RuntimeException)) { + tuscanyMsg.setFaultBody(e.getCause()); + } else { + tuscanyMsg.setFaultBody(((InvocationTargetException)e.getCause()).getTargetException()); + } // end if + } else if (e.getCause() instanceof FaultException) { + tuscanyMsg.setFaultBody(e.getCause()); + } else { + tuscanyMsg.setFaultBody(e); + } // end if + } else { + tuscanyMsg.setFaultBody(e); + } // end if + + return postProcessRequest( tuscanyMsg ); + } // end method postProcessRequest + + /** + * General post processing for a request message + * - close out the JMS session & connection + * @param tuscanyMsg + * @return the post processed message + */ + public Message postProcessRequest(Message tuscanyMsg) { + // Close of JMS session + try { + JMSBindingContext context = tuscanyMsg.getBindingContext(); + context.closeJmsSession(); + if (jmsResourceFactory.isConnectionClosedAfterUse()) { + jmsResourceFactory.closeConnection(); + } // end if + } catch (JMSException ex) { + throw new JMSBindingException(ex); + } // end try + return tuscanyMsg; + } // end method postProcessRequest + + /** + * Process response message + * @param tuscanyMsg - the response message + * @return the processed version of the response message + */ + public Message processResponse(Message tuscanyMsg) { + // For async handling, there is nothing to do here + return tuscanyMsg; + } } -- cgit v1.2.3