summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java91
1 files changed, 84 insertions, 7 deletions
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java
index d371bbba86..df8f33bac3 100644
--- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java
+++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java
@@ -40,6 +40,7 @@ import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil;
import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.Invoker;
@@ -51,7 +52,7 @@ import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
*
* @version $Rev$ $Date$
*/
-public class TransportServiceInterceptor implements Interceptor {
+public class TransportServiceInterceptor extends InterceptorAsyncImpl {
private static final Logger logger = Logger.getLogger(TransportServiceInterceptor.class.getName());
private Invoker next;
@@ -96,7 +97,6 @@ public class TransportServiceInterceptor implements Interceptor {
}
public Message invokeRequest(Message msg) {
-// try {
EndpointReference from = assemblyFactory.createEndpointReference();
Endpoint fromEndpoint = assemblyFactory.createEndpoint();
@@ -109,10 +109,8 @@ public class TransportServiceInterceptor implements Interceptor {
from.setCallbackEndpoint(callbackEndpoint);
return msg;
-// } catch (JMSException e) {
-// throw new JMSBindingException(e);
-// }
- }
+
+ } // end method invokeRequest
public Message invokeResponse(Message msg) {
JMSBindingContext context = msg.getBindingContext();
@@ -202,6 +200,85 @@ public class TransportServiceInterceptor implements Interceptor {
public void setNext(Invoker next) {
this.next = next;
- }
+ }
+
+ public Message processRequest(Message msg) {
+ return invokeRequest( msg );
+ } // end method processRequest
+
+ public Message processResponse(Message msg) {
+ JMSBindingContext context = msg.getBindingContext();
+ try {
+ Session session = context.getJmsResponseSession();
+ javax.jms.Message requestJMSMsg = context.getJmsMsg();
+ javax.jms.Message responseJMSMsg = msg.getBody();
+
+ Destination replyDest = requestJMSMsg.getJMSReplyTo();
+ if (replyDest == null) {
+ if (jmsBinding.getResponseDestinationName() != null) {
+ try {
+ replyDest = jmsResourceFactory.lookupDestination(jmsBinding.getResponseDestinationName());
+ } catch (NamingException e) {
+ throw new JMSBindingException("Exception lookingup response destination", e);
+ }
+ }
+ } // end if
+
+ if (replyDest == null) {
+ // assume no reply is expected
+ if (msg.getBody() != null) {
+ logger.log(Level.FINE, "JMS service '" + service.getName() + "' dropped response as request has no replyTo");
+ }
+ return msg;
+ } // end if
+
+ if ((msg.getOperation() != null)) {
+ String operationName = msg.getOperation().getName();
+ if (jmsBinding.getEffectiveJMSPriority(operationName) != null) {
+ responseJMSMsg.setJMSPriority(jmsBinding.getEffectiveJMSPriority(operationName));
+ }
+
+ if ( jmsBinding.getEffectiveJMSType(operationName) != null) {
+ responseJMSMsg.setJMSType(jmsBinding.getEffectiveJMSType(operationName));
+ }
+
+ if ((jmsBinding.getEffectiveJMSDeliveryMode(operationName) != null)) {
+ responseJMSMsg.setJMSDeliveryMode(jmsBinding.getEffectiveJMSDeliveryMode(operationName) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ }
+
+ if ((jmsBinding.getEffectiveJMSTimeToLive(operationName) != null)) {
+ responseJMSMsg.setJMSExpiration(jmsBinding.getEffectiveJMSTimeToLive(operationName).longValue());
+ }
+ }
+
+ if (correlationScheme == null ||
+ JMSBindingConstants.CORRELATE_MSG_ID.equalsIgnoreCase(correlationScheme)) {
+ responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSMessageID());
+ } else if (JMSBindingConstants.CORRELATE_CORRELATION_ID.equalsIgnoreCase(correlationScheme)) {
+ responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSCorrelationID());
+ }
+
+ MessageProducer producer = session.createProducer(replyDest);
+
+ // Set jms header attributes in producer, not message.
+ int deliveryMode = requestJMSMsg.getJMSDeliveryMode();
+ producer.setDeliveryMode(deliveryMode);
+ int deliveryPriority = requestJMSMsg.getJMSPriority();
+ producer.setPriority(deliveryPriority);
+ long timeToLive = requestJMSMsg.getJMSExpiration();
+ producer.setTimeToLive(timeToLive);
+
+ producer.send((javax.jms.Message)msg.getBody());
+
+ producer.close();
+
+ return msg;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ } finally {
+ context.closeJmsResponseSession();
+ } // end try
+ } // end method processResponse
}