diff options
3 files changed, 110 insertions, 37 deletions
diff --git a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/runtime/OperationSelectorJMSDefaultServiceInterceptor.java b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/runtime/OperationSelectorJMSDefaultServiceInterceptor.java index bb67c3a67a..16d6dcefcd 100644 --- a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/runtime/OperationSelectorJMSDefaultServiceInterceptor.java +++ b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/runtime/OperationSelectorJMSDefaultServiceInterceptor.java @@ -25,10 +25,8 @@ import java.util.List; import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.TextMessage; -import javax.jms.Session; import javax.jms.Topic; import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLStreamException; @@ -43,6 +41,7 @@ import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault.WireFormatJMSDefault; import org.apache.tuscany.sca.binding.jms.wireformat.jmstextxml.WireFormatJMSTextXML; import org.apache.tuscany.sca.core.assembly.EndpointReferenceImpl; import org.apache.tuscany.sca.interfacedef.Operation; @@ -73,7 +72,8 @@ public class OperationSelectorJMSDefaultServiceInterceptor implements Intercepto private List<Operation> serviceOperations; - public OperationSelectorJMSDefaultServiceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + public OperationSelectorJMSDefaultServiceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, + RuntimeWire runtimeWire) { super(); this.jmsBinding = jmsBinding; this.runtimeWire = runtimeWire; @@ -89,7 +89,6 @@ public class OperationSelectorJMSDefaultServiceInterceptor implements Intercepto } public Message invokeRequest(Message msg) { - try { // get the jms context JMSBindingContext context = msg.getBindingContext(); javax.jms.Message jmsMsg = context.getJmsMsg(); @@ -97,39 +96,10 @@ public class OperationSelectorJMSDefaultServiceInterceptor implements Intercepto String operationName = requestMessageProcessor.getOperationName(jmsMsg); Operation operation = getTargetOperation(operationName, jmsMsg); msg.setOperation(operation); - - ReferenceParameters parameters = msg.getFrom().getReferenceParameters(); - - if (service.getInterfaceContract().getCallbackInterface() != null) { - - String callbackdestName = jmsMsg.getStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY); - if (callbackdestName == null && msg.getOperation().isNonBlocking()) { - // if the request has a replyTo but this service operation is oneway but the service uses callbacks - // then use the replyTo as the callback destination - Destination replyTo = jmsMsg.getJMSReplyTo(); - if (replyTo != null) { - callbackdestName = (replyTo instanceof Queue) ? ((Queue)replyTo).getQueueName() : ((Topic)replyTo).getTopicName(); - } - } - - if (callbackdestName != null) { - // append "jms:" to make it an absolute uri so the invoker can determine it came in on the request - // as otherwise the invoker should use the uri from the service callback binding - parameters.setCallbackReference(new EndpointReferenceImpl("jms:" + callbackdestName)); - } - - String callbackID = jmsMsg.getStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY); - if (callbackID != null) { - parameters.setCallbackID(callbackID); - } - } - + return msg; - } catch (JMSException e) { - throw new JMSBindingException(e); - } - } - + } + protected Operation getTargetOperation(String operationName, javax.jms.Message jmsMsg) { Operation operation = null; @@ -147,7 +117,8 @@ public class OperationSelectorJMSDefaultServiceInterceptor implements Intercepto break; } } - } else if (jmsBinding.getRequestWireFormat() instanceof WireFormatJMSTextXML) { + } else if (jmsBinding.getRequestWireFormat() instanceof WireFormatJMSDefault + || jmsBinding.getRequestWireFormat() instanceof WireFormatJMSTextXML) { OMElement rootElement; String operationFromPayload; diff --git a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java index 4a86a92b0e..5a0b2e8823 100644 --- a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java +++ b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java @@ -25,6 +25,8 @@ import org.apache.tuscany.sca.assembly.Binding; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; import org.apache.tuscany.sca.binding.jms.transport.TransportServiceInterceptor; +import org.apache.tuscany.sca.binding.jms.wire.CallbackDestinationInterceptor; +import org.apache.tuscany.sca.binding.jms.wire.OperationPropertiesInterceptor; import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.host.jms.JMSServiceListener; @@ -180,6 +182,10 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProviderR bindingChain.addInterceptor(operationSelectorProvider.getPhase(), operationSelectorProvider.createInterceptor()); + // add callback destination interceptor after operation selector + bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, + new CallbackDestinationInterceptor(runtimeWire)); + // add request wire format bindingChain.addInterceptor(requestWireFormatProvider.getPhase(), requestWireFormatProvider.createInterceptor()); diff --git a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/CallbackDestinationInterceptor.java b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/CallbackDestinationInterceptor.java new file mode 100644 index 0000000000..64badf9002 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/CallbackDestinationInterceptor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.wire; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Topic; + +import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.core.assembly.EndpointReferenceImpl; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.ReferenceParameters; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +public class CallbackDestinationInterceptor implements Interceptor { + private Invoker next; + private RuntimeComponentService service; + + public CallbackDestinationInterceptor(RuntimeWire runtimeWire) { + super(); + this.service = (RuntimeComponentService) runtimeWire.getTarget().getContract(); + } + + public Invoker getNext() { + return next; + } + + public void setNext(Invoker next) { + this.next = next; + } + + public Message invoke(Message msg) { + return next.invoke(invokeRequest(msg)); + } + + public Message invokeRequest(Message msg) { + try { + // get the jms context + JMSBindingContext context = msg.getBindingContext(); + javax.jms.Message jmsMsg = context.getJmsMsg(); + + ReferenceParameters parameters = msg.getFrom().getReferenceParameters(); + + if (service.getInterfaceContract().getCallbackInterface() != null) { + + String callbackdestName = jmsMsg.getStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY); + if (callbackdestName == null && msg.getOperation().isNonBlocking()) { + // if the request has a replyTo but this service operation is oneway but the service uses callbacks + // then use the replyTo as the callback destination + Destination replyTo = jmsMsg.getJMSReplyTo(); + if (replyTo != null) { + callbackdestName = (replyTo instanceof Queue) ? ((Queue) replyTo).getQueueName() : ((Topic) replyTo).getTopicName(); + } + } + + if (callbackdestName != null) { + // append "jms:" to make it an absolute uri so the invoker can determine it came in on the request + // as otherwise the invoker should use the uri from the service callback binding + parameters.setCallbackReference(new EndpointReferenceImpl("jms:" + callbackdestName)); + } + + String callbackID = jmsMsg.getStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY); + if (callbackID != null) { + parameters.setCallbackID(callbackID); + } + } + + } catch (JMSException e) { + throw new JMSBindingException(e); + } + + return msg; + } +}
\ No newline at end of file |