diff options
author | slaws <slaws@13f79535-47bb-0310-9956-ffa450edef68> | 2008-11-03 15:11:49 +0000 |
---|---|---|
committer | slaws <slaws@13f79535-47bb-0310-9956-ffa450edef68> | 2008-11-03 15:11:49 +0000 |
commit | a46f9b3773b5c74da6e03dbac6037474ed75ab60 (patch) | |
tree | b78ce4450709e708e5cbae15c880d50a64e106d2 /java/sca/modules/binding-jms-runtime/src | |
parent | da3b5205524e71572a3f650b63c9e46ce4a05e1f (diff) |
Add a reference side example of the Request Response Binding in JMS
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@710080 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/sca/modules/binding-jms-runtime/src')
16 files changed, 1012 insertions, 308 deletions
diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/context/JMSBindingContext.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/context/JMSBindingContext.java index c4fd6f8357..bc2e783569 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/context/JMSBindingContext.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/context/JMSBindingContext.java @@ -18,9 +18,12 @@ */ package org.apache.tuscany.sca.binding.jms.context; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.Session; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; + /** * Context that the JMS binding puts on the Tuscany wire @@ -31,6 +34,9 @@ public class JMSBindingContext { private Message jmsMsg; private Session jmsSession; + private Destination requestDestination; + private Destination replyToDestination; + private JMSResourceFactory jmsResourceFactory; public Message getJmsMsg() { return jmsMsg; @@ -47,4 +53,30 @@ public class JMSBindingContext { public void setJmsSession(Session jmsSession) { this.jmsSession = jmsSession; } + + public Destination getRequestDestination() { + return requestDestination; + } + + public void setRequestDestination(Destination requestDestination) { + this.requestDestination = requestDestination; + } + + public Destination getReplyToDestination() { + return replyToDestination; + } + + public void setReplyToDestination(Destination replyToDestination) { + this.replyToDestination = replyToDestination; + } + + // TODO - difficult to get the resource factory into all the JMS providers + // so it's here for the moment + public JMSResourceFactory getJmsResourceFactory() { + return jmsResourceFactory; + } + + public void setJmsResourceFactory(JMSResourceFactory jmsResourceFactory) { + this.jmsResourceFactory = jmsResourceFactory; + } } diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderReferenceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderReferenceInterceptor.java new file mode 100644 index 0000000000..8a73627ba3 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderReferenceInterceptor.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.headers; + + + + +import java.util.Map; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.tuscany.sca.assembly.Reference; +import org.apache.tuscany.sca.assembly.WireFormat; +import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.provider.JMSBindingServiceBindingProvider; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.java.JavaInterface; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.ReferenceParameters; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * + * @version $Rev$ $Date$ + */ +public class HeaderReferenceInterceptor implements Interceptor { + + private Invoker next; + private RuntimeWire runtimeWire; + private JMSResourceFactory jmsResourceFactory; + private JMSBinding jmsBinding; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + private String correlationScheme; + private WireFormat requestWireFormat; + private WireFormat responseWireFormat; + + public HeaderReferenceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + super(); + this.jmsBinding = jmsBinding; + this.runtimeWire = runtimeWire; + this.jmsResourceFactory = jmsResourceFactory; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + this.correlationScheme = jmsBinding.getCorrelationScheme(); + + } + + public Message invoke(Message msg) { + + return next.invoke(invokeRequest(msg)); + + } + + public Message invokeRequest(Message tuscanyMsg) { + try { + // get the jms context + JMSBindingContext context = (JMSBindingContext)tuscanyMsg.getHeaders().get(JMSBindingConstants.MSG_CTXT_POSITION); + javax.jms.Message jmsMsg = (javax.jms.Message)tuscanyMsg.getBody(); + + Operation operation = tuscanyMsg.getOperation(); + String operationName = operation.getName(); + RuntimeComponentReference reference = (RuntimeComponentReference)runtimeWire.getSource().getContract(); + + requestMessageProcessor.setOperationName(jmsBinding.getNativeOperationName(operationName), jmsMsg); + + if (jmsBinding.getOperationJMSDeliveryMode(operationName) != null) { + if (jmsBinding.getOperationJMSDeliveryMode(operationName)) { + jmsMsg.setJMSDeliveryMode(DeliveryMode.PERSISTENT); + } else { + jmsMsg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + } + + if (jmsBinding.getOperationJMSCorrelationId(operationName) != null) { + jmsMsg.setJMSCorrelationID(jmsBinding.getOperationJMSCorrelationId(operationName)); + } + + if (jmsBinding.getOperationJMSPriority(operationName) != null) { + jmsMsg.setJMSPriority(jmsBinding.getOperationJMSPriority(operationName)); + } + + if (jmsBinding.getOperationJMSType(operationName) != null) { + jmsMsg.setJMSType(jmsBinding.getOperationJMSType(operationName)); + } + + ReferenceParameters parameters = tuscanyMsg.getFrom().getReferenceParameters(); + + Object conversationID = parameters.getConversationID(); + if (conversationID != null) { + jmsMsg.setStringProperty(JMSBindingConstants.CONVERSATION_ID_PROPERTY, conversationID.toString()); + } + + if (tuscanyMsg.getFrom().getCallbackEndpoint() != null) { + + if (parameters.getCallbackID() != null) { + jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY, parameters.getCallbackID().toString()); + } + + String callbackDestName = getCallbackDestinationName(reference); + if (callbackDestName != null) { + jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY, callbackDestName); + } + } + + for (String propName : jmsBinding.getPropertyNames()) { + Object value = jmsBinding.getProperty(propName); + jmsMsg.setObjectProperty(propName, value); + } + + Map<String, Object> operationProperties = jmsBinding.getOperationProperties(operationName); + if (operationProperties != null) { + for (String propName : operationProperties.keySet()) { + Object value = operationProperties.get(propName); + jmsMsg.setObjectProperty(propName, value); + } + } + + return tuscanyMsg; + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } + + + protected String getCallbackDestinationName(RuntimeComponentReference reference) { + RuntimeComponentService s = (RuntimeComponentService)reference.getCallbackService(); + JMSBinding b = s.getBinding(JMSBinding.class); + if (b != null) { + JMSBindingServiceBindingProvider bp = (JMSBindingServiceBindingProvider)s.getBindingProvider(b); + return bp.getDestinationName(); + } + return null; + } + + + public Invoker getNext() { + return next; + } + + public void setNext(Invoker next) { + this.next = next; + } + + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultProviderFactory.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultProviderFactory.java index cf9e55a54d..a4d0ccafb3 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultProviderFactory.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultProviderFactory.java @@ -20,6 +20,9 @@ package org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault; import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactoryExtensionPoint; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.provider.OperationSelectorProvider; import org.apache.tuscany.sca.provider.OperationSelectorProviderFactory; @@ -37,13 +40,13 @@ public class OperationSelectorJMSDefaultProviderFactory implements OperationSele super(); this.registry = registry; } - + /** */ public OperationSelectorProvider createReferenceOperationSelectorProvider(RuntimeComponent component, RuntimeComponentReference reference, Binding binding) { - return new OperationSelectorJMSDefaultReferenceProvider(component, reference, binding); + return null; } /** diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultReferenceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultReferenceInterceptor.java deleted file mode 100644 index 52433ed3ca..0000000000 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultReferenceInterceptor.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault; - -import java.util.List; - - -import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; -import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; -import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; -import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; -import org.apache.tuscany.sca.interfacedef.Operation; -import org.apache.tuscany.sca.invocation.Interceptor; -import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.Message; -import org.apache.tuscany.sca.runtime.RuntimeComponentService; -import org.apache.tuscany.sca.runtime.RuntimeWire; - -/** - * Policy handler to handle PolicySet related to Logging with the QName - * {http://tuscany.apache.org/xmlns/sca/1.0/impl/java}LoggingPolicy - * - * @version $Rev$ $Date$ - */ -public class OperationSelectorJMSDefaultReferenceInterceptor implements Interceptor { - - private static final String ON_MESSAGE_METHOD_NAME = "onMessage"; - - private Invoker next; - private OperationSelectorJMSDefault operationSelector; - private RuntimeWire runtimeWire; - private JMSResourceFactory jmsResourceFactory; - private JMSBinding jmsBinding; - private JMSMessageProcessor requestMessageProcessor; - private JMSMessageProcessor responseMessageProcessor; - private RuntimeComponentService service; - private List<Operation> serviceOperations; - - - public OperationSelectorJMSDefaultReferenceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { - super(); - this.jmsBinding = jmsBinding; - this.runtimeWire = runtimeWire; - this.jmsResourceFactory = jmsResourceFactory; - this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); - this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); - this.service = (RuntimeComponentService)runtimeWire.getTarget().getContract(); - this.serviceOperations = service.getInterfaceContract().getInterface().getOperations(); - } - - public Message invoke(Message msg) { - // TODO binding interceptor iface TBD - return null; - } - - public Message invokeRequest(Message msg) { - // TODO binding interceptor iface TBD - return null; - } - - public Message invokeResponse(Message msg) { - // TODO binding interceptor iface TBD - return null; - } - - public Invoker getNext() { - return next; - } - - public void setNext(Invoker next) { - this.next = next; - } -} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultReferenceProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultReferenceProvider.java deleted file mode 100644 index e7e1fff0ca..0000000000 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultReferenceProvider.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault; - -import org.apache.tuscany.sca.assembly.Binding; -import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; -import org.apache.tuscany.sca.invocation.Interceptor; -import org.apache.tuscany.sca.invocation.Phase; -import org.apache.tuscany.sca.provider.OperationSelectorProvider; -import org.apache.tuscany.sca.runtime.RuntimeComponent; -import org.apache.tuscany.sca.runtime.RuntimeComponentReference; - -/** - * @version $Rev$ $Date$ - */ -public class OperationSelectorJMSDefaultReferenceProvider implements OperationSelectorProvider { - private RuntimeComponent component; - private RuntimeComponentReference reference; - private Binding binding; - - public OperationSelectorJMSDefaultReferenceProvider(RuntimeComponent component, - RuntimeComponentReference reference, - Binding binding) { - super(); - this.component = component; - this.reference = reference; - this.binding = binding; - } - - /** - * @see org.apache.tuscany.sca.provider.PolicyProvider#createInterceptor(org.apache.tuscany.sca.interfacedef.Operation) - */ - public Interceptor createInterceptor() { - return new OperationSelectorJMSDefaultReferenceInterceptor((JMSBinding)binding, - null, - reference.getRuntimeWire(binding)); - } - - /** - * @see org.apache.tuscany.sca.provider.PolicyProvider#getPhase() - */ - public String getPhase() { - return Phase.REFERENCE_BINDING_TRANSPORT; - } - -} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceInterceptor.java index b644a4883f..d765e08d79 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceInterceptor.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceInterceptor.java @@ -23,7 +23,9 @@ import java.util.List; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; +import javax.jms.Topic; import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; @@ -32,10 +34,12 @@ import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.core.assembly.EndpointReferenceImpl; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.Interceptor; import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.ReferenceParameters; import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.apache.tuscany.sca.runtime.RuntimeWire; @@ -71,41 +75,50 @@ public class OperationSelectorJMSDefaultServiceInterceptor implements Intercepto } public Message invoke(Message msg) { - return invokeResponse(next.invoke(invokeRequest(msg))); + return next.invoke(invokeRequest(msg)); } public Message invokeRequest(Message msg) { - // get the jms context - JMSBindingContext context = (JMSBindingContext)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_POSITION); - javax.jms.Message jmsMsg = context.getJmsMsg(); - - String operationName = requestMessageProcessor.getOperationName(jmsMsg); - Operation operation = getTargetOperation(operationName); - msg.setOperation(operation); - - return msg; - } - - public Message invokeResponse(Message msg) { try { + // get the jms context JMSBindingContext context = (JMSBindingContext)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_POSITION); - javax.jms.Message requestJMSMsg = context.getJmsMsg(); - Session session = context.getJmsSession(); - - Destination destination = requestJMSMsg.getJMSReplyTo(); - MessageProducer producer = session.createProducer(destination); + javax.jms.Message jmsMsg = context.getJmsMsg(); + + String operationName = requestMessageProcessor.getOperationName(jmsMsg); + Operation operation = getTargetOperation(operationName); + msg.setOperation(operation); + + ReferenceParameters parameters = msg.getFrom().getReferenceParameters(); + + if (service.getInterfaceContract().getCallbackInterface() != null) { + + String callbackdestName = jmsMsg.getStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY); + if (callbackdestName == null && msg.getOperation().isNonBlocking()) { + // if the request has a replyTo but this service operation is oneway but the service uses callbacks + // then use the replyTo as the callback destination + Destination replyTo = jmsMsg.getJMSReplyTo(); + if (replyTo != null) { + callbackdestName = (replyTo instanceof Queue) ? ((Queue)replyTo).getQueueName() : ((Topic)replyTo).getTopicName(); + } + } - producer.send((javax.jms.Message)msg.getBody()); + if (callbackdestName != null) { + // append "jms:" to make it an absolute uri so the invoker can determine it came in on the request + // as otherwise the invoker should use the uri from the service callback binding + parameters.setCallbackReference(new EndpointReferenceImpl("jms:" + callbackdestName)); + } - producer.close(); - session.close(); + String callbackID = jmsMsg.getStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY); + if (callbackID != null) { + parameters.setCallbackID(callbackID); + } + } return msg; - } catch (JMSException e) { throw new JMSBindingException(e); - } - } + } + } protected Operation getTargetOperation(String operationName) { Operation operation = null; diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceProvider.java index a1cfa02072..54c3eb54eb 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceProvider.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceProvider.java @@ -53,7 +53,7 @@ public class OperationSelectorJMSDefaultServiceProvider implements OperationSele /** */ public String getPhase() { - return Phase.SERVICE_BINDING_TRANSPORT; + return Phase.SERVICE_BINDING_OPERATION_SELECTOR; } } diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java index 1bcc09e366..0fa67ccc62 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java @@ -25,26 +25,39 @@ import java.util.List; import javax.jms.JMSException; import org.apache.axiom.om.OMElement; +import org.apache.tuscany.sca.binding.jms.headers.HeaderReferenceInterceptor; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault.OperationSelectorJMSDefault; +import org.apache.tuscany.sca.binding.jms.transport.TransportReferenceInterceptor; +import org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault.WireFormatJMSDefault; import org.apache.tuscany.sca.binding.ws.WebServiceBinding; import org.apache.tuscany.sca.binding.ws.WebServiceBindingFactory; import org.apache.tuscany.sca.binding.ws.wsdlgen.BindingWSDLGenerator; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.interfacedef.InterfaceContract; import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Phase; +import org.apache.tuscany.sca.provider.OperationSelectorProvider; +import org.apache.tuscany.sca.provider.OperationSelectorProviderFactory; +import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint; import org.apache.tuscany.sca.provider.ReferenceBindingProvider; +import org.apache.tuscany.sca.provider.ReferenceBindingProviderRRB; +import org.apache.tuscany.sca.provider.WireFormatProvider; +import org.apache.tuscany.sca.provider.WireFormatProviderFactory; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeWire; /** * Implementation of the JMS reference binding provider. * * @version $Rev$ $Date$ */ -public class JMSBindingReferenceBindingProvider implements ReferenceBindingProvider { +public class JMSBindingReferenceBindingProvider implements ReferenceBindingProviderRRB { private RuntimeComponentReference reference; private JMSBinding jmsBinding; @@ -53,6 +66,14 @@ public class JMSBindingReferenceBindingProvider implements ReferenceBindingProvi private RuntimeComponent component; private InterfaceContract wsdlInterfaceContract; private ExtensionPointRegistry extensions; + + private ProviderFactoryExtensionPoint providerFactories; + + private WireFormatProviderFactory requestWireFormatProviderFactory; + private WireFormatProvider requestWireFormatProvider; + + private WireFormatProviderFactory responseWireFormatProviderFactory; + private WireFormatProvider responseWireFormatProvider; public JMSBindingReferenceBindingProvider(RuntimeComponent component, RuntimeComponentReference reference, JMSBinding binding, ExtensionPointRegistry extensions, JMSResourceFactory jmsResourceFactory) { this.reference = reference; @@ -64,6 +85,39 @@ public class JMSBindingReferenceBindingProvider implements ReferenceBindingProvi if (XMLTextMessageProcessor.class.isAssignableFrom(JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding).getClass())) { setXMLDataBinding(reference); } + + // Get the factories/providers for operation selection + + // if no operation selector is specified then assume the default + if (jmsBinding.getOperationSelector() == null){ + jmsBinding.setOperationSelector(new OperationSelectorJMSDefault()); + } + + this.providerFactories = extensions.getExtensionPoint(ProviderFactoryExtensionPoint.class); + + // Get the factories/providers for wire format + + // if no request wire format specified then assume the default + if (jmsBinding.getRequestWireFormat() == null){ + jmsBinding.setRequestWireFormat(new WireFormatJMSDefault()); + } + + // if no response wire format specific then assume the default + if (jmsBinding.getResponseWireFormat() == null){ + jmsBinding.setResponseWireFormat(new WireFormatJMSDefault()); + } + + this.requestWireFormatProviderFactory = + (WireFormatProviderFactory)providerFactories.getProviderFactory(jmsBinding.getRequestWireFormat().getClass()); + if (this.requestWireFormatProviderFactory != null){ + this.requestWireFormatProvider = requestWireFormatProviderFactory.createReferenceWireFormatProvider(component, reference, jmsBinding); + } + + this.responseWireFormatProviderFactory = + (WireFormatProviderFactory)providerFactories.getProviderFactory(jmsBinding.getResponseWireFormat().getClass()); + if (this.responseWireFormatProvider != null){ + this.responseWireFormatProvider = responseWireFormatProviderFactory.createReferenceWireFormatProvider(component, reference, jmsBinding); + } } @@ -98,8 +152,14 @@ public class JMSBindingReferenceBindingProvider implements ReferenceBindingProvi } } - JMSBindingInvoker invoker = new JMSBindingInvoker(jmsBinding, operation, jmsResourceFactory, reference); - jmsBindingInvokers.add(invoker); + /* + * TODO turn on RRB version of JMS binding + */ + Invoker invoker = null; + invoker = new RRBJMSBindingInvoker(jmsBinding, operation, jmsResourceFactory, reference); + //invoker = new JMSBindingInvoker(jmsBinding, operation, jmsResourceFactory, reference); + //jmsBindingInvokers.add((JMSBindingInvoker)invoker); + return invoker; } @@ -130,5 +190,35 @@ public class JMSBindingReferenceBindingProvider implements ReferenceBindingProvi throw new JMSBindingException(e); } } + + /* + * RRB test methods + */ + public void configureBindingChain(RuntimeWire runtimeWire) { + + InvocationChain bindingChain = runtimeWire.getBindingInvocationChain(); + + // add transport interceptor + bindingChain.addInterceptor(Phase.REFERENCE_BINDING_TRANSPORT, + new TransportReferenceInterceptor(jmsBinding, + jmsResourceFactory, + runtimeWire) ); + + // add request wire format + bindingChain.addInterceptor(requestWireFormatProvider.getPhase(), + requestWireFormatProvider.createInterceptor()); + + // add response wire format, but only add it if it's different from the request + if (!jmsBinding.getRequestWireFormat().equals(jmsBinding.getResponseWireFormat())){ + bindingChain.addInterceptor(responseWireFormatProvider.getPhase(), + responseWireFormatProvider.createInterceptor()); + } + + // add the header processor that comes after the wire formatter + bindingChain.addInterceptor(Phase.REFERENCE_BINDING_WIREFORMAT, + new HeaderReferenceInterceptor(jmsBinding, + jmsResourceFactory, + runtimeWire) ); + } } diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java index 60bca05982..2642d6d370 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java @@ -41,8 +41,9 @@ import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; import org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault.OperationSelectorJMSDefault; -import org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault.OperationSelectorJMSDefaultReferenceInterceptor; import org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault.OperationSelectorJMSDefaultServiceInterceptor; +import org.apache.tuscany.sca.binding.jms.transport.TransportReferenceInterceptor; +import org.apache.tuscany.sca.binding.jms.transport.TransportServiceInterceptor; import org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault.WireFormatJMSDefault; import org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault.WireFormatJMSDefaultReferenceInterceptor; import org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault.WireFormatJMSDefaultServiceInterceptor; @@ -261,15 +262,11 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProviderR MessageListener tmpListener = null; /* - * TODO a test to allow RRB experiments to take place without breaking everything else - * RRB stuff only happens if you add a wireFormat to a composite file + * TODO turn on RRB version of JMS binding */ - if (jmsBinding.getRequestWireFormat() != null ){ - tmpListener = new RRBJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding, messageFactory); - } else { - tmpListener = new DefaultJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding); - } - + tmpListener = new RRBJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding, messageFactory); + //tmpListener = new DefaultJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding); + final MessageListener listener = tmpListener; try { @@ -398,14 +395,17 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProviderR /* * RRB test methods - * Interceptor selection is hard coded to the default here but of course should - * pick up the appropriate interceptor based on wireFormat and operationSelector - * elements in the SCDL */ public void configureBindingChain(RuntimeWire runtimeWire) { InvocationChain bindingChain = runtimeWire.getBindingInvocationChain(); + // add transport interceptor + bindingChain.addInterceptor(Phase.SERVICE_BINDING_TRANSPORT, + new TransportServiceInterceptor(jmsBinding, + jmsResourceFactory, + runtimeWire) ); + // add operation selector interceptor bindingChain.addInterceptor(operationSelectorProvider.getPhase(), operationSelectorProvider.createInterceptor()); diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java new file mode 100644 index 0000000000..b78f1f2044 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.provider; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Map; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.NamingException; +import javax.security.auth.Subject; + +import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.policy.authentication.token.JMSTokenAuthenticationPolicy; +import org.apache.tuscany.sca.binding.jms.policy.header.JMSHeaderPolicy; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.java.JavaInterface; +import org.apache.tuscany.sca.invocation.DataExchangeSemantics; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.policy.PolicySet; +import org.apache.tuscany.sca.policy.PolicySetAttachPoint; +import org.apache.tuscany.sca.policy.SecurityUtil; +import org.apache.tuscany.sca.policy.authentication.token.TokenPrincipal; +import org.apache.tuscany.sca.runtime.ReferenceParameters; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeWire; +import org.osoa.sca.ServiceRuntimeException; + +/** + * Invoker for the JMS binding. + * + * @version $Rev$ $Date$ + */ +public class RRBJMSBindingInvoker implements Invoker, DataExchangeSemantics { + + protected Operation operation; + protected String operationName; + + protected JMSBinding jmsBinding; + protected JMSResourceFactory jmsResourceFactory; + protected JMSMessageProcessor requestMessageProcessor; + protected JMSMessageProcessor responseMessageProcessor; + protected Destination bindingRequestDest; + protected Destination bindingReplyDest; + protected RuntimeComponentReference reference; + protected RuntimeWire runtimeWire; + + public RRBJMSBindingInvoker(JMSBinding jmsBinding, Operation operation, JMSResourceFactory jmsResourceFactory, RuntimeComponentReference reference) { + + this.operation = operation; + operationName = operation.getName(); + + this.jmsBinding = jmsBinding; + this.jmsResourceFactory = jmsResourceFactory; + this.reference = reference; + this.runtimeWire = reference.getRuntimeWire(jmsBinding); + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + + try { + bindingRequestDest = lookupDestination(); + bindingReplyDest = lookupResponseDestination(); + } catch (NamingException e) { + throw new JMSBindingException(e); + } + } + + /** + * Looks up the Destination Queue for the JMS Binding + * + * @return The Destination Queue + * @throws NamingException Failed to lookup Destination Queue + * @throws JMSBindingException Failed to lookup Destination Queue + * @see #lookupDestinationQueue(boolean) + */ + protected Destination lookupDestination() throws NamingException, JMSBindingException { + return lookupDestinationQueue(false); + } + + /** + * Looks up the Destination Response Queue for the JMS Binding + * + * @return The Destination Response Queue + * @throws NamingException Failed to lookup Destination Response Queue + * @throws JMSBindingException Failed to lookup Destination Response Queue + * @see #lookupDestinationQueue(boolean) + */ + protected Destination lookupResponseDestination() throws NamingException, JMSBindingException { + return lookupDestinationQueue(true); + } + + /** + * Looks up the Destination Queue for the JMS Binding. + * <p> + * What happens in the look up will depend on the create mode specified for the JMS Binding: + * <ul> + * <li>always - the JMS queue is always created. It is an error if the queue already exists + * <li>ifnotexist - the JMS queue is created if it does not exist. It is not an error if the queue already exists + * <li>never - the JMS queue is never created. It is an error if the queue does not exist + * </ul> + * See the SCA JMS Binding specification for more information. + * <p> + * + * @param isReponseQueue <code>true</code> if we are creating a response queue. + * <code>false</code> if we are creating a request queue + * @return The Destination queue. + * @throws NamingException Failed to lookup JMS queue + * @throws JMSBindingException Failed to lookup JMS Queue. Probable cause is that + * the JMS queue's current existence/non-existence is not compatible with + * the create mode specified on the binding + */ + protected Destination lookupDestinationQueue(boolean isReponseQueue) throws NamingException, JMSBindingException { + String queueName; + String queueType; + String qCreateMode; + + if (isReponseQueue) { + queueName = jmsBinding.getResponseDestinationName(); + queueType = "JMS Response Destination "; + qCreateMode = jmsBinding.getResponseDestinationCreate(); + if (JMSBindingConstants.DEFAULT_RESPONSE_DESTINATION_NAME.equals(queueName)) { + return null; + } + } else { + queueName = jmsBinding.getDestinationName(); + queueType = "JMS Destination "; + qCreateMode = jmsBinding.getDestinationCreate(); + } + + Destination dest = jmsResourceFactory.lookupDestination(queueName); + + if (qCreateMode.equals(JMSBindingConstants.CREATE_ALWAYS)) { + // In this mode, the queue must not already exist as we are creating it + if (dest != null) { + throw new JMSBindingException(queueType + queueName + + " already exists but has create mode of \"" + + qCreateMode + + "\" while registering binding " + + jmsBinding.getName() + + " invoker"); + } + // Create the queue + dest = jmsResourceFactory.createDestination(queueName); + + } else if (qCreateMode.equals(JMSBindingConstants.CREATE_IF_NOT_EXIST)) { + // In this mode, the queue may nor may not exist. It will be created if it does not exist + if (dest == null) { + dest = jmsResourceFactory.createDestination(queueName); + } + + } else if (qCreateMode.equals(JMSBindingConstants.CREATE_NEVER)) { + // In this mode, the queue must have already been created. + if (dest == null) { + throw new JMSBindingException(queueType + queueName + + " not found but create mode of \"" + + qCreateMode + + "\" while registering binding " + + jmsBinding.getName() + + " invoker"); + } + } + + // Make sure we ended up with a queue + if (dest == null) { + throw new JMSBindingException(queueType + queueName + + " not found with create mode of \"" + + qCreateMode + + "\" while registering binding " + + jmsBinding.getName() + + " invoker"); + } + + return dest; + } + + public org.apache.tuscany.sca.invocation.Message invoke(org.apache.tuscany.sca.invocation.Message tuscanyMsg) { + try { + // create a jms session to cover the creation and sending of the message + Session session = jmsResourceFactory.createSession(); + + // populate the message context with JMS binding information + JMSBindingContext context = new JMSBindingContext(); + tuscanyMsg.getHeaders().add(JMSBindingConstants.MSG_CTXT_POSITION, context); + + context.setJmsSession(session); + context.setRequestDestination(getRequestDestination(tuscanyMsg, session)); + context.setReplyToDestination(getReplyToDestination(session)); + context.setJmsResourceFactory(jmsResourceFactory); + + try { + tuscanyMsg = runtimeWire.getBindingInvocationChain().getHeadInvoker().invoke(tuscanyMsg); + } catch (ServiceRuntimeException e) { + if (e.getCause() instanceof InvocationTargetException) { + if ((e.getCause().getCause() instanceof RuntimeException)) { + tuscanyMsg.setFaultBody(e.getCause()); + } else { + tuscanyMsg.setFaultBody(e.getCause().getCause()); + } + } else { + tuscanyMsg.setFaultBody(e); + } + } catch (IllegalStateException e) { + tuscanyMsg.setFaultBody(e); + } catch (Throwable e) { + tuscanyMsg.setFaultBody(e); + } finally { + session.close(); + } + + return tuscanyMsg; + } catch (Exception e) { + throw new JMSBindingException(e); + } + } + + protected Destination getRequestDestination(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Session session) throws JMSBindingException, NamingException, JMSException { + Destination requestDestination; + if (reference.isCallback()) { + String toURI = tuscanyMsg.getTo().getURI(); + if (toURI != null && toURI.startsWith("jms:")) { + // the msg to uri contains the callback destination name + // this is an jms physical name not a jndi name so need to use session.createQueue + requestDestination = session.createQueue(toURI.substring(4)); + } else { + requestDestination = lookupDestination(); + } + } else { + requestDestination = bindingRequestDest; + } + + return requestDestination; + } + + protected Destination getReplyToDestination(Session session) throws JMSException, JMSBindingException, NamingException { + Destination replyToDest; + if (operation.isNonBlocking()) { + replyToDest = null; + } else { + if (bindingReplyDest != null) { + replyToDest = bindingReplyDest; + } else { + replyToDest = session.createTemporaryQueue(); + } + } + return replyToDest; + } + + public boolean allowsPassByReference() { + // JMS always pass by value + return true; + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingListener.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingListener.java index 11c1127cf6..46578d0d30 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingListener.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingListener.java @@ -19,7 +19,6 @@ package org.apache.tuscany.sca.binding.jms.provider; import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -42,7 +41,6 @@ import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; import org.apache.tuscany.sca.binding.jms.policy.authentication.token.JMSTokenAuthenticationPolicy; import org.apache.tuscany.sca.core.assembly.EndpointReferenceImpl; -import org.apache.tuscany.sca.core.invocation.MessageImpl; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; @@ -51,7 +49,6 @@ import org.apache.tuscany.sca.policy.PolicySet; import org.apache.tuscany.sca.policy.PolicySetAttachPoint; import org.apache.tuscany.sca.policy.SecurityUtil; import org.apache.tuscany.sca.policy.authentication.token.TokenPrincipal; -import org.apache.tuscany.sca.provider.ServiceBindingProviderRRB; import org.apache.tuscany.sca.runtime.EndpointReference; import org.apache.tuscany.sca.runtime.ReferenceParameters; import org.apache.tuscany.sca.runtime.RuntimeComponentService; @@ -66,7 +63,6 @@ public class RRBJMSBindingListener implements MessageListener { private static final Logger logger = Logger.getLogger(RRBJMSBindingListener.class.getName()); - private static final String ON_MESSAGE_METHOD_NAME = "onMessage"; private JMSBinding jmsBinding; private Binding targetBinding; private JMSResourceFactory jmsResourceFactory; @@ -76,14 +72,7 @@ public class RRBJMSBindingListener implements MessageListener { private String correlationScheme; private List<Operation> serviceOperations; private MessageFactory messageFactory; - protected JMSTokenAuthenticationPolicy jmsTokenAuthenticationPolicy = null; - /* - * TODO binding chains could be treated generically (RuntimeWire?) but are - * here just now for experimental convenience - */ - private List<Invoker> bindingRequestChain; - private List<Invoker> bindingResponseChain; public RRBJMSBindingListener(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeComponentService service, Binding targetBinding, MessageFactory messageFactory) throws NamingException { this.jmsBinding = jmsBinding; @@ -97,45 +86,18 @@ public class RRBJMSBindingListener implements MessageListener { correlationScheme = jmsBinding.getCorrelationScheme(); serviceOperations = service.getInterfaceContract().getInterface().getOperations(); - // find out which policies are active - if (jmsBinding instanceof PolicySetAttachPoint) { - List<PolicySet> policySets = ((PolicySetAttachPoint)jmsBinding).getApplicablePolicySets(); - for (PolicySet ps : policySets) { - for (Object p : ps.getPolicies()) { - if (JMSTokenAuthenticationPolicy.class.isInstance(p)) { - jmsTokenAuthenticationPolicy = (JMSTokenAuthenticationPolicy)p; - }else { - // etc. check for other types of policy being present - } - } - } - } } public void onMessage(Message requestJMSMsg) { logger.log(Level.FINE, "JMS service '" + service.getName() + "' received message " + requestJMSMsg); try { invokeService(requestJMSMsg); - //sendReply(requestJMSMsg, responsePayload, false); } catch (Throwable e) { logger.log(Level.SEVERE, "Exception invoking service '" + service.getName(), e); - sendReply(requestJMSMsg, e, true); + sendFaultReply(requestJMSMsg, e); } } - /** - * TODO RRB Experiment. Explicity call all the binding interceptors on the wire. Looking at - * how to handle requests and responses independently. The binding wire should/could - * be handled generically outside of this binding class but it's here for the momement - * while we look at what form it takes - * - * Turn the JMS message back into a Tuscany message and invoke the target component - * - * @param requestJMSMsg - * @return - * @throws JMSException - * @throws InvocationTargetException - */ protected void invokeService(Message requestJMSMsg) throws JMSException, InvocationTargetException { try { @@ -148,78 +110,24 @@ public class RRBJMSBindingListener implements MessageListener { context.setJmsMsg(requestJMSMsg); context.setJmsSession(jmsResourceFactory.createSession()); + context.setReplyToDestination(requestJMSMsg.getJMSReplyTo()); // set the message body tuscanyMsg.setBody(requestJMSMsg); // call the runtime wire - setHeaderProperties(requestJMSMsg, tuscanyMsg, tuscanyMsg.getOperation()); InvocationChain chain = service.getRuntimeWire(targetBinding).getBindingInvocationChain(); chain.getHeadInvoker().invoke(tuscanyMsg); + } catch (NamingException e) { throw new JMSBindingException(e); - } - - } - - /** - * TODO - RRB experiment. Needs refactoring - */ - protected void setHeaderProperties(javax.jms.Message requestJMSMsg, org.apache.tuscany.sca.invocation.Message tuscanyMsg, Operation operation) throws JMSException { - - EndpointReference from = new EndpointReferenceImpl(null); - tuscanyMsg.setFrom(from); - from.setCallbackEndpoint(new EndpointReferenceImpl("/")); // TODO: whats this for? - ReferenceParameters parameters = from.getReferenceParameters(); - - String conversationID = requestJMSMsg.getStringProperty(JMSBindingConstants.CONVERSATION_ID_PROPERTY); - if (conversationID != null) { - parameters.setConversationID(conversationID); - } - - if (service.getInterfaceContract().getCallbackInterface() != null) { - - String callbackdestName = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY); - if (callbackdestName == null && operation.isNonBlocking()) { - // if the request has a replyTo but this service operation is oneway but the service uses callbacks - // then use the replyTo as the callback destination - Destination replyTo = requestJMSMsg.getJMSReplyTo(); - if (replyTo != null) { - callbackdestName = (replyTo instanceof Queue) ? ((Queue)replyTo).getQueueName() : ((Topic)replyTo).getTopicName(); - } - } - - if (callbackdestName != null) { - // append "jms:" to make it an absolute uri so the invoker can determine it came in on the request - // as otherwise the invoker should use the uri from the service callback binding - parameters.setCallbackReference(new EndpointReferenceImpl("jms:" + callbackdestName)); - } - - String callbackID = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY); - if (callbackID != null) { - parameters.setCallbackID(callbackID); - } - } - - - if (jmsTokenAuthenticationPolicy != null) { - String token = requestJMSMsg.getStringProperty(jmsTokenAuthenticationPolicy.getTokenName().toString()); - - Subject subject = SecurityUtil.getSubject(tuscanyMsg); - TokenPrincipal principal = SecurityUtil.getPrincipal(subject, TokenPrincipal.class); - - if (principal == null){ - principal = new TokenPrincipal(token); - subject.getPrincipals().add(principal); - } - - } - } + } + } /* - * TODO RRB experiment. Still used to handle errors. Needs refactoring + * Send a message back if a fault has occurred */ - protected void sendReply(Message requestJMSMsg, Object responsePayload, boolean isFault) { + protected void sendFaultReply(Message requestJMSMsg, Object responsePayload) { try { if (requestJMSMsg.getJMSReplyTo() == null) { @@ -231,13 +139,7 @@ public class RRBJMSBindingListener implements MessageListener { } Session session = jmsResourceFactory.createSession(); - Message replyJMSMsg; - if (isFault) { - replyJMSMsg = responseMessageProcessor.createFaultMessage(session, (Throwable)responsePayload); - } else { - replyJMSMsg = responseMessageProcessor.insertPayloadIntoJMSMessage(session, responsePayload); - } - + Message replyJMSMsg = responseMessageProcessor.createFaultMessage(session, (Throwable)responsePayload); replyJMSMsg.setJMSDeliveryMode(requestJMSMsg.getJMSDeliveryMode()); replyJMSMsg.setJMSPriority(requestJMSMsg.getJMSPriority()); diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportReferenceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportReferenceInterceptor.java new file mode 100644 index 0000000000..e84279cae1 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportReferenceInterceptor.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.transport; + +import java.util.List; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.NamingException; + + +import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.policy.authentication.token.JMSTokenAuthenticationPolicy; +import org.apache.tuscany.sca.binding.jms.policy.header.JMSHeaderPolicy; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.policy.PolicySet; +import org.apache.tuscany.sca.policy.PolicySetAttachPoint; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * Policy handler to handle PolicySet related to Logging with the QName + * {http://tuscany.apache.org/xmlns/sca/1.0/impl/java}LoggingPolicy + * + * @version $Rev$ $Date$ + */ +public class TransportReferenceInterceptor implements Interceptor { + + private static final String ON_MESSAGE_METHOD_NAME = "onMessage"; + + private Invoker next; + private RuntimeWire runtimeWire; + private JMSResourceFactory jmsResourceFactory; + private JMSBinding jmsBinding; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + private RuntimeComponentReference reference; + protected JMSHeaderPolicy jmsHeaderPolicy = null; + + + public TransportReferenceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + super(); + this.jmsBinding = jmsBinding; + this.runtimeWire = runtimeWire; + this.jmsResourceFactory = jmsResourceFactory; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + this.reference = (RuntimeComponentReference)runtimeWire.getSource().getContract(); + + // find out if the header policy is active + if (jmsBinding instanceof PolicySetAttachPoint) { + List<PolicySet> policySets = ((PolicySetAttachPoint)jmsBinding).getApplicablePolicySets(); + for (PolicySet ps : policySets) { + for (Object p : ps.getPolicies()) { + if (JMSHeaderPolicy.class.isInstance(p)) { + jmsHeaderPolicy = (JMSHeaderPolicy)p; + } + } + } + } + } + + public Message invoke(Message msg) { + Message responseMsg = invokeRequest(msg); + + // get the jms context + JMSBindingContext context = (JMSBindingContext)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_POSITION); + + if (context.getReplyToDestination() == null) { + responseMsg.setBody(null); + } else { + responseMsg = invokeResponse(msg); + } + + return responseMsg; + } + + public Message invokeRequest(Message msg) { + try { + // get the jms context + JMSBindingContext context = (JMSBindingContext)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_POSITION); + Session session = context.getJmsSession(); + + MessageProducer producer = session.createProducer(context.getRequestDestination()); + + if (jmsBinding.getOperationJMSTimeToLive(msg.getOperation().getName()) != null) { + producer.setTimeToLive(jmsBinding.getOperationJMSTimeToLive(msg.getOperation().getName())); + } + + try { + producer.send((javax.jms.Message)msg.getBody()); + } finally { + producer.close(); + } + return msg; + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } + + public Message invokeResponse(Message msg) { + try { + // get the jms context + JMSBindingContext context = (JMSBindingContext)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_POSITION); + Session session = context.getJmsSession(); + + javax.jms.Message requestMessage = (javax.jms.Message)msg.getBody(); + + String operationName = msg.getOperation().getName(); + + String msgSelector = "JMSCorrelationID = '" + + requestMessage.getJMSMessageID() + + "'"; + MessageConsumer consumer = session.createConsumer(context.getReplyToDestination(), msgSelector); + + long receiveWait; + + if ((jmsHeaderPolicy != null) && + (jmsHeaderPolicy.getTimeToLive() != null)) { + receiveWait = jmsHeaderPolicy.getTimeToLive(); + } else if (jmsBinding.getOperationJMSTimeToLive(operationName) != null) { + receiveWait = jmsBinding.getOperationJMSTimeToLive(operationName) * 2; + } else { + receiveWait = JMSBindingConstants.DEFAULT_TIME_TO_LIVE; + } + + javax.jms.Message replyMsg; + try { + context.getJmsResourceFactory().startConnection(); + //jmsResourceFactory.startConnection(); + replyMsg = consumer.receive(receiveWait); + } finally { + consumer.close(); + } + if (replyMsg == null) { + throw new JMSBindingException("No reply message received on " + + context.getReplyToDestination() + + " for message id " + + requestMessage.getJMSMessageID()); + } + + msg.setBody(replyMsg); + return msg; + } catch (JMSException e) { + throw new JMSBindingException(e); + } catch (NamingException e) { + throw new JMSBindingException(e); + } + } + + public Invoker getNext() { + return next; + } + + public void setNext(Invoker next) { + this.next = next; + } +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java new file mode 100644 index 0000000000..c0703bd495 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.transport; + +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.security.auth.Subject; + +import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.provider.DefaultJMSBindingListener; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.core.assembly.EndpointReferenceImpl; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.policy.SecurityUtil; +import org.apache.tuscany.sca.policy.authentication.token.TokenPrincipal; +import org.apache.tuscany.sca.runtime.EndpointReference; +import org.apache.tuscany.sca.runtime.ReferenceParameters; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * Policy handler to handle PolicySet related to Logging with the QName + * {http://tuscany.apache.org/xmlns/sca/1.0/impl/java}LoggingPolicy + * + * @version $Rev$ $Date$ + */ +public class TransportServiceInterceptor implements Interceptor { + private static final Logger logger = Logger.getLogger(TransportServiceInterceptor.class.getName()); + + private Invoker next; + private RuntimeWire runtimeWire; + private JMSResourceFactory jmsResourceFactory; + private JMSBinding jmsBinding; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + private RuntimeComponentService service; + + + public TransportServiceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + super(); + this.jmsBinding = jmsBinding; + this.runtimeWire = runtimeWire; + this.jmsResourceFactory = jmsResourceFactory; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + this.service = (RuntimeComponentService)runtimeWire.getTarget().getContract(); + } + + public Message invoke(Message msg) { + return invokeResponse(next.invoke(invokeRequest(msg))); + } + + public Message invokeRequest(Message msg) { + try { + JMSBindingContext context = (JMSBindingContext)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_POSITION); + javax.jms.Message requestJMSMsg = context.getJmsMsg(); + + EndpointReference from = new EndpointReferenceImpl(null); + msg.setFrom(from); + from.setCallbackEndpoint(new EndpointReferenceImpl("/")); // TODO: whats this for? + ReferenceParameters parameters = from.getReferenceParameters(); + + String conversationID = requestJMSMsg.getStringProperty(JMSBindingConstants.CONVERSATION_ID_PROPERTY); + if (conversationID != null) { + parameters.setConversationID(conversationID); + } + + return msg; + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } + + public Message invokeResponse(Message msg) { + try { + JMSBindingContext context = (JMSBindingContext)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_POSITION); + Session session = context.getJmsSession(); + javax.jms.Message requestJMSMsg = context.getJmsMsg(); + + if (requestJMSMsg.getJMSReplyTo() == null) { + // assume no reply is expected + if (msg.getBody() != null) { + logger.log(Level.FINE, "JMS service '" + service.getName() + "' dropped response as request has no replyTo"); + } + return msg; + } + + MessageProducer producer = session.createProducer(context.getReplyToDestination()); + + producer.send((javax.jms.Message)msg.getBody()); + + producer.close(); + session.close(); + + return msg; + + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } + + public Invoker getNext() { + return next; + } + + public void setNext(Invoker next) { + this.next = next; + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultProviderFactory.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultProviderFactory.java index c89638736b..fc06425e0d 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultProviderFactory.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultProviderFactory.java @@ -41,7 +41,6 @@ public class WireFormatJMSDefaultProviderFactory implements WireFormatProviderFa super(); this.registry = registry; jmsRFEP = (JMSResourceFactoryExtensionPoint)registry.getExtensionPoint(JMSResourceFactoryExtensionPoint.class); - } /** diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultReferenceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultReferenceInterceptor.java index 8df0349a21..b8b8d668ba 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultReferenceInterceptor.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultReferenceInterceptor.java @@ -21,14 +21,30 @@ package org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault; +import java.util.Map; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.tuscany.sca.assembly.Reference; import org.apache.tuscany.sca.assembly.WireFormat; +import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.provider.JMSBindingServiceBindingProvider; import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.java.JavaInterface; import org.apache.tuscany.sca.invocation.Interceptor; import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.ReferenceParameters; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.apache.tuscany.sca.runtime.RuntimeWire; /** @@ -73,20 +89,40 @@ public class WireFormatJMSDefaultReferenceInterceptor implements Interceptor { msg = getNext().invoke(msg); if (responseWireFormat != null){ - msg = invokeRequest(msg); + msg = invokeResponse(msg); } return msg; } public Message invokeRequest(Message msg) { - // TODO binding interceptor iface TBD - return null; + try { + // get the jms context + JMSBindingContext context = (JMSBindingContext)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_POSITION); + Session session = context.getJmsSession(); + + javax.jms.Message requestMsg = requestMessageProcessor.insertPayloadIntoJMSMessage(session, msg.getBody()); + msg.setBody(requestMsg); + + requestMsg.setJMSReplyTo(context.getReplyToDestination()); + + return msg; + } catch (JMSException e) { + throw new JMSBindingException(e); + } } public Message invokeResponse(Message msg) { - // TODO binding interceptor iface TBD - return null; + if (msg.getBody() != null){ + Object[] response = (Object[])responseMessageProcessor.extractPayloadFromJMSMessage((javax.jms.Message)msg.getBody()); + if (response != null && response.length > 0){ + msg.setBody(response[0]); + } else { + msg.setBody(null); + } + } + + return msg; } public Invoker getNext() { @@ -95,5 +131,5 @@ public class WireFormatJMSDefaultReferenceInterceptor implements Interceptor { public void setNext(Invoker next) { this.next = next; - } + } } diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultServiceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultServiceInterceptor.java index ad3ff95306..8dd2f58efe 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultServiceInterceptor.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultServiceInterceptor.java @@ -122,7 +122,7 @@ public class WireFormatJMSDefaultServiceInterceptor implements Interceptor { } else if (JMSBindingConstants.CORRELATE_CORRELATION_ID.equalsIgnoreCase(correlationScheme)) { responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSCorrelationID()); } - + msg.setBody(responseJMSMsg); return msg; |