diff options
Diffstat (limited to '')
2 files changed, 108 insertions, 12 deletions
diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportReferenceInterceptor.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportReferenceInterceptor.java index 3ed8021107..cba6022fb1 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportReferenceInterceptor.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportReferenceInterceptor.java @@ -30,6 +30,7 @@ import org.apache.tuscany.sca.binding.jms.JMSBindingConstants; import org.apache.tuscany.sca.binding.jms.JMSBindingException; import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext; import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl; import org.apache.tuscany.sca.invocation.Interceptor; import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.Message; @@ -41,7 +42,7 @@ import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; * * @version $Rev$ $Date$ */ -public class TransportReferenceInterceptor implements Interceptor { +public class TransportReferenceInterceptor extends InterceptorAsyncImpl { private Invoker next; private JMSResourceFactory jmsResourceFactory; @@ -91,8 +92,7 @@ public class TransportReferenceInterceptor implements Interceptor { Boolean deliveryModePersistent = jmsBinding.getEffectiveJMSDeliveryMode(opName); if (deliveryModePersistent != null) { producer.setDeliveryMode( deliveryModePersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - } - + } try { producer.send((javax.jms.Message)msg.getBody()); @@ -102,8 +102,8 @@ public class TransportReferenceInterceptor implements Interceptor { return msg; } catch (JMSException e) { throw new JMSBindingException(e); - } - } + } // end try + } // end method invokeRequest public Message invokeResponse(Message msg) { JMSBindingContext context = msg.getBindingContext(); @@ -165,4 +165,23 @@ public class TransportReferenceInterceptor implements Interceptor { public void setNext(Invoker next) { this.next = next; } + + /** + * Process forward request message + * @param tuscanyMsg - the request message + * @return the processed version of the request message + */ + public Message processRequest(Message tuscanyMsg) { + return invokeRequest(tuscanyMsg); + } // end method processRequest + + /** + * Process response message + * @param tuscanyMsg - the response message + * @return the processed version of the response message + */ + public Message processResponse(Message tuscanyMsg) { + // TODO Auto-generated method stub + return tuscanyMsg; + } // end method processResponse } diff --git a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java index d371bbba86..df8f33bac3 100644 --- a/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java +++ b/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java @@ -40,6 +40,7 @@ import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.Interceptor; import org.apache.tuscany.sca.invocation.Invoker; @@ -51,7 +52,7 @@ import org.apache.tuscany.sca.runtime.RuntimeEndpoint; * * @version $Rev$ $Date$ */ -public class TransportServiceInterceptor implements Interceptor { +public class TransportServiceInterceptor extends InterceptorAsyncImpl { private static final Logger logger = Logger.getLogger(TransportServiceInterceptor.class.getName()); private Invoker next; @@ -96,7 +97,6 @@ public class TransportServiceInterceptor implements Interceptor { } public Message invokeRequest(Message msg) { -// try { EndpointReference from = assemblyFactory.createEndpointReference(); Endpoint fromEndpoint = assemblyFactory.createEndpoint(); @@ -109,10 +109,8 @@ public class TransportServiceInterceptor implements Interceptor { from.setCallbackEndpoint(callbackEndpoint); return msg; -// } catch (JMSException e) { -// throw new JMSBindingException(e); -// } - } + + } // end method invokeRequest public Message invokeResponse(Message msg) { JMSBindingContext context = msg.getBindingContext(); @@ -202,6 +200,85 @@ public class TransportServiceInterceptor implements Interceptor { public void setNext(Invoker next) { this.next = next; - } + } + + public Message processRequest(Message msg) { + return invokeRequest( msg ); + } // end method processRequest + + public Message processResponse(Message msg) { + JMSBindingContext context = msg.getBindingContext(); + try { + Session session = context.getJmsResponseSession(); + javax.jms.Message requestJMSMsg = context.getJmsMsg(); + javax.jms.Message responseJMSMsg = msg.getBody(); + + Destination replyDest = requestJMSMsg.getJMSReplyTo(); + if (replyDest == null) { + if (jmsBinding.getResponseDestinationName() != null) { + try { + replyDest = jmsResourceFactory.lookupDestination(jmsBinding.getResponseDestinationName()); + } catch (NamingException e) { + throw new JMSBindingException("Exception lookingup response destination", e); + } + } + } // end if + + if (replyDest == null) { + // assume no reply is expected + if (msg.getBody() != null) { + logger.log(Level.FINE, "JMS service '" + service.getName() + "' dropped response as request has no replyTo"); + } + return msg; + } // end if + + if ((msg.getOperation() != null)) { + String operationName = msg.getOperation().getName(); + if (jmsBinding.getEffectiveJMSPriority(operationName) != null) { + responseJMSMsg.setJMSPriority(jmsBinding.getEffectiveJMSPriority(operationName)); + } + + if ( jmsBinding.getEffectiveJMSType(operationName) != null) { + responseJMSMsg.setJMSType(jmsBinding.getEffectiveJMSType(operationName)); + } + + if ((jmsBinding.getEffectiveJMSDeliveryMode(operationName) != null)) { + responseJMSMsg.setJMSDeliveryMode(jmsBinding.getEffectiveJMSDeliveryMode(operationName) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + } + + if ((jmsBinding.getEffectiveJMSTimeToLive(operationName) != null)) { + responseJMSMsg.setJMSExpiration(jmsBinding.getEffectiveJMSTimeToLive(operationName).longValue()); + } + } + + if (correlationScheme == null || + JMSBindingConstants.CORRELATE_MSG_ID.equalsIgnoreCase(correlationScheme)) { + responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSMessageID()); + } else if (JMSBindingConstants.CORRELATE_CORRELATION_ID.equalsIgnoreCase(correlationScheme)) { + responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSCorrelationID()); + } + + MessageProducer producer = session.createProducer(replyDest); + + // Set jms header attributes in producer, not message. + int deliveryMode = requestJMSMsg.getJMSDeliveryMode(); + producer.setDeliveryMode(deliveryMode); + int deliveryPriority = requestJMSMsg.getJMSPriority(); + producer.setPriority(deliveryPriority); + long timeToLive = requestJMSMsg.getJMSExpiration(); + producer.setTimeToLive(timeToLive); + + producer.send((javax.jms.Message)msg.getBody()); + + producer.close(); + + return msg; + + } catch (JMSException e) { + throw new JMSBindingException(e); + } finally { + context.closeJmsResponseSession(); + } // end try + } // end method processResponse } |