diff options
author | edwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68> | 2011-01-11 14:16:43 +0000 |
---|---|---|
committer | edwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68> | 2011-01-11 14:16:43 +0000 |
commit | 2c3a68e1c46978fea1603cd106ccc630d13ad68f (patch) | |
tree | 3b0ee9cd54e658ceefda6e75e16119f04816d510 /sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org | |
parent | 94c1f358aa3a64b5ec05d37bafc3e873d45c98ba (diff) |
Complete enablement of the JMS Binding to deal with Async invocations - as under TUSCANY-3809
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1057651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org')
5 files changed, 40 insertions, 15 deletions
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java index 6dac182d9a..6f1c6ed10e 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java @@ -24,6 +24,9 @@ import javax.jms.JMSException; import org.apache.tuscany.sca.binding.jms.JMSBinding; import org.apache.tuscany.sca.binding.jms.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.Interceptor; @@ -34,10 +37,12 @@ public class HeaderServiceInterceptor extends InterceptorAsyncImpl { private Invoker next; private JMSBinding jmsBinding; + private JMSMessageProcessor responseMessageProcessor; - public HeaderServiceInterceptor(JMSBinding jmsBinding) { + public HeaderServiceInterceptor(ExtensionPointRegistry extensions, JMSBinding jmsBinding) { super(); this.jmsBinding = jmsBinding; + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(extensions, jmsBinding); } public Message invoke(Message msg) { @@ -72,6 +77,8 @@ public class HeaderServiceInterceptor extends InterceptorAsyncImpl { Operation operation = tuscanyMsg.getOperation(); String operationName = operation.getName(); + + responseMessageProcessor.setOperationName(operationName, jmsMsg); for (String propName : jmsBinding.getPropertyNames()) { Object value = jmsBinding.getProperty(propName); diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java index f07e9de29f..1df3403451 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java @@ -38,6 +38,7 @@ public class JMSBindingAsyncResponseInvoker implements InvokerAsyncResponse { } // end constructor public void invokeAsyncResponse(Message msg) { - // TODO + // Deliberately left null since in JMS the TransportServiceInterceptor does all the work } // end method invokeAsyncResponse + } // end class JMSBindingAsyncResponseInvoker
\ No newline at end of file diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java index b96376eb15..2fdbf2afbe 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java @@ -205,11 +205,11 @@ public class JMSBindingServiceBindingProvider implements EndpointAsyncProvider, bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, new CallbackDestinationInterceptor(endpoint)); - bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, new HeaderServiceInterceptor(jmsBinding)); + bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, new HeaderServiceInterceptor(registry, jmsBinding)); // add async response interceptor after header interceptor bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, - new AsyncResponseDestinationInterceptor(endpoint)); + new AsyncResponseDestinationInterceptor(endpoint, registry) ); // add request wire format bindingChain.addInterceptor(requestWireFormatProvider.getPhase(), diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java index df8f33bac3..266f02b9f1 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java @@ -210,10 +210,11 @@ public class TransportServiceInterceptor extends InterceptorAsyncImpl { JMSBindingContext context = msg.getBindingContext(); try { Session session = context.getJmsResponseSession(); - javax.jms.Message requestJMSMsg = context.getJmsMsg(); + //javax.jms.Message requestJMSMsg = context.getJmsMsg(); javax.jms.Message responseJMSMsg = msg.getBody(); - Destination replyDest = requestJMSMsg.getJMSReplyTo(); + //Destination replyDest = requestJMSMsg.getJMSReplyTo(); + Destination replyDest = context.getReplyToDestination(); if (replyDest == null) { if (jmsBinding.getResponseDestinationName() != null) { try { @@ -251,22 +252,24 @@ public class TransportServiceInterceptor extends InterceptorAsyncImpl { } } + /* if (correlationScheme == null || JMSBindingConstants.CORRELATE_MSG_ID.equalsIgnoreCase(correlationScheme)) { responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSMessageID()); } else if (JMSBindingConstants.CORRELATE_CORRELATION_ID.equalsIgnoreCase(correlationScheme)) { responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSCorrelationID()); - } + } + */ MessageProducer producer = session.createProducer(replyDest); // Set jms header attributes in producer, not message. - int deliveryMode = requestJMSMsg.getJMSDeliveryMode(); - producer.setDeliveryMode(deliveryMode); - int deliveryPriority = requestJMSMsg.getJMSPriority(); - producer.setPriority(deliveryPriority); - long timeToLive = requestJMSMsg.getJMSExpiration(); - producer.setTimeToLive(timeToLive); + //int deliveryMode = requestJMSMsg.getJMSDeliveryMode(); + //producer.setDeliveryMode(deliveryMode); + //int deliveryPriority = requestJMSMsg.getJMSPriority(); + //producer.setPriority(deliveryPriority); + //long timeToLive = requestJMSMsg.getJMSExpiration(); + //producer.setTimeToLive(timeToLive); producer.send((javax.jms.Message)msg.getBody()); diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java index 43bf090796..0ecee7777c 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java @@ -33,11 +33,14 @@ import org.apache.tuscany.sca.binding.jms.JMSBindingConstants; import org.apache.tuscany.sca.binding.jms.JMSBindingException; import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker; import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl; import org.apache.tuscany.sca.invocation.Interceptor; import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.policy.Intent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.apache.tuscany.sca.runtime.RuntimeEndpoint; @@ -50,11 +53,13 @@ public class AsyncResponseDestinationInterceptor extends InterceptorAsyncImpl { private Invoker next; private RuntimeComponentService service; private RuntimeEndpoint endpoint; + private ExtensionPointRegistry registry; - public AsyncResponseDestinationInterceptor(RuntimeEndpoint endpoint) { + public AsyncResponseDestinationInterceptor(RuntimeEndpoint endpoint, ExtensionPointRegistry registry) { super(); this.service = (RuntimeComponentService) endpoint.getService(); this.endpoint = endpoint; + this.registry = registry; } public Invoker getNext() { @@ -104,8 +109,11 @@ public class AsyncResponseDestinationInterceptor extends InterceptorAsyncImpl { // than this interceptor String msgID = (String)msg.getHeaders().get("MESSAGE_ID"); + String operationName = msg.getOperation().getName(); + // Create a response invoker and add it to the message headers - AsyncResponseInvoker<String> respInvoker = new AsyncResponseInvoker<String>(endpoint, null, asyncRespAddr, msgID); + AsyncResponseInvoker<String> respInvoker = + new AsyncResponseInvoker<String>(endpoint, null, asyncRespAddr, msgID, operationName, getMessageFactory()); msg.getHeaders().put("ASYNC_RESPONSE_INVOKER", respInvoker); } catch (JMSException e) { @@ -191,4 +199,10 @@ public class AsyncResponseDestinationInterceptor extends InterceptorAsyncImpl { } // end while return false; } // end method isAsync + + private MessageFactory getMessageFactory() { + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + return modelFactories.getFactory(MessageFactory.class); + } // end method getMessageFactory + } // end class
\ No newline at end of file |