summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java9
-rw-r--r--sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java3
-rw-r--r--sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java4
-rw-r--r--sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java21
-rw-r--r--sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java18
5 files changed, 40 insertions, 15 deletions
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java
index 6dac182d9a..6f1c6ed10e 100644
--- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java
+++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java
@@ -24,6 +24,9 @@ import javax.jms.JMSException;
import org.apache.tuscany.sca.binding.jms.JMSBinding;
import org.apache.tuscany.sca.binding.jms.JMSBindingException;
+import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor;
+import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Interceptor;
@@ -34,10 +37,12 @@ public class HeaderServiceInterceptor extends InterceptorAsyncImpl {
private Invoker next;
private JMSBinding jmsBinding;
+ private JMSMessageProcessor responseMessageProcessor;
- public HeaderServiceInterceptor(JMSBinding jmsBinding) {
+ public HeaderServiceInterceptor(ExtensionPointRegistry extensions, JMSBinding jmsBinding) {
super();
this.jmsBinding = jmsBinding;
+ this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(extensions, jmsBinding);
}
public Message invoke(Message msg) {
@@ -72,6 +77,8 @@ public class HeaderServiceInterceptor extends InterceptorAsyncImpl {
Operation operation = tuscanyMsg.getOperation();
String operationName = operation.getName();
+
+ responseMessageProcessor.setOperationName(operationName, jmsMsg);
for (String propName : jmsBinding.getPropertyNames()) {
Object value = jmsBinding.getProperty(propName);
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java
index f07e9de29f..1df3403451 100644
--- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java
+++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java
@@ -38,6 +38,7 @@ public class JMSBindingAsyncResponseInvoker implements InvokerAsyncResponse {
} // end constructor
public void invokeAsyncResponse(Message msg) {
- // TODO
+ // Deliberately left null since in JMS the TransportServiceInterceptor does all the work
} // end method invokeAsyncResponse
+
} // end class JMSBindingAsyncResponseInvoker \ No newline at end of file
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
index b96376eb15..2fdbf2afbe 100644
--- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
+++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
@@ -205,11 +205,11 @@ public class JMSBindingServiceBindingProvider implements EndpointAsyncProvider,
bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT,
new CallbackDestinationInterceptor(endpoint));
- bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, new HeaderServiceInterceptor(jmsBinding));
+ bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, new HeaderServiceInterceptor(registry, jmsBinding));
// add async response interceptor after header interceptor
bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT,
- new AsyncResponseDestinationInterceptor(endpoint));
+ new AsyncResponseDestinationInterceptor(endpoint, registry) );
// add request wire format
bindingChain.addInterceptor(requestWireFormatProvider.getPhase(),
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java
index df8f33bac3..266f02b9f1 100644
--- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java
+++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java
@@ -210,10 +210,11 @@ public class TransportServiceInterceptor extends InterceptorAsyncImpl {
JMSBindingContext context = msg.getBindingContext();
try {
Session session = context.getJmsResponseSession();
- javax.jms.Message requestJMSMsg = context.getJmsMsg();
+ //javax.jms.Message requestJMSMsg = context.getJmsMsg();
javax.jms.Message responseJMSMsg = msg.getBody();
- Destination replyDest = requestJMSMsg.getJMSReplyTo();
+ //Destination replyDest = requestJMSMsg.getJMSReplyTo();
+ Destination replyDest = context.getReplyToDestination();
if (replyDest == null) {
if (jmsBinding.getResponseDestinationName() != null) {
try {
@@ -251,22 +252,24 @@ public class TransportServiceInterceptor extends InterceptorAsyncImpl {
}
}
+ /*
if (correlationScheme == null ||
JMSBindingConstants.CORRELATE_MSG_ID.equalsIgnoreCase(correlationScheme)) {
responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSMessageID());
} else if (JMSBindingConstants.CORRELATE_CORRELATION_ID.equalsIgnoreCase(correlationScheme)) {
responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSCorrelationID());
- }
+ }
+ */
MessageProducer producer = session.createProducer(replyDest);
// Set jms header attributes in producer, not message.
- int deliveryMode = requestJMSMsg.getJMSDeliveryMode();
- producer.setDeliveryMode(deliveryMode);
- int deliveryPriority = requestJMSMsg.getJMSPriority();
- producer.setPriority(deliveryPriority);
- long timeToLive = requestJMSMsg.getJMSExpiration();
- producer.setTimeToLive(timeToLive);
+ //int deliveryMode = requestJMSMsg.getJMSDeliveryMode();
+ //producer.setDeliveryMode(deliveryMode);
+ //int deliveryPriority = requestJMSMsg.getJMSPriority();
+ //producer.setPriority(deliveryPriority);
+ //long timeToLive = requestJMSMsg.getJMSExpiration();
+ //producer.setTimeToLive(timeToLive);
producer.send((javax.jms.Message)msg.getBody());
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java
index 43bf090796..0ecee7777c 100644
--- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java
+++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java
@@ -33,11 +33,14 @@ import org.apache.tuscany.sca.binding.jms.JMSBindingConstants;
import org.apache.tuscany.sca.binding.jms.JMSBindingException;
import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext;
import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.policy.Intent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
@@ -50,11 +53,13 @@ public class AsyncResponseDestinationInterceptor extends InterceptorAsyncImpl {
private Invoker next;
private RuntimeComponentService service;
private RuntimeEndpoint endpoint;
+ private ExtensionPointRegistry registry;
- public AsyncResponseDestinationInterceptor(RuntimeEndpoint endpoint) {
+ public AsyncResponseDestinationInterceptor(RuntimeEndpoint endpoint, ExtensionPointRegistry registry) {
super();
this.service = (RuntimeComponentService) endpoint.getService();
this.endpoint = endpoint;
+ this.registry = registry;
}
public Invoker getNext() {
@@ -104,8 +109,11 @@ public class AsyncResponseDestinationInterceptor extends InterceptorAsyncImpl {
// than this interceptor
String msgID = (String)msg.getHeaders().get("MESSAGE_ID");
+ String operationName = msg.getOperation().getName();
+
// Create a response invoker and add it to the message headers
- AsyncResponseInvoker<String> respInvoker = new AsyncResponseInvoker<String>(endpoint, null, asyncRespAddr, msgID);
+ AsyncResponseInvoker<String> respInvoker =
+ new AsyncResponseInvoker<String>(endpoint, null, asyncRespAddr, msgID, operationName, getMessageFactory());
msg.getHeaders().put("ASYNC_RESPONSE_INVOKER", respInvoker);
} catch (JMSException e) {
@@ -191,4 +199,10 @@ public class AsyncResponseDestinationInterceptor extends InterceptorAsyncImpl {
} // end while
return false;
} // end method isAsync
+
+ private MessageFactory getMessageFactory() {
+ FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
+ return modelFactories.getFactory(MessageFactory.class);
+ } // end method getMessageFactory
+
} // end class \ No newline at end of file