diff options
Diffstat (limited to 'java/sca/modules/binding-jms-runtime')
4 files changed, 122 insertions, 91 deletions
diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java index 2cf4875da8..a94df5a166 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java @@ -56,7 +56,7 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { protected JMSMessageProcessor responseMessageProcessor; protected Destination requestDest; protected Destination replyDest; - private String callbackDestName; + protected RuntimeComponentReference reference; public JMSBindingInvoker(JMSBinding jmsBinding, Operation operation, JMSResourceFactory jmsResourceFactory, RuntimeComponentReference reference) { @@ -65,15 +65,14 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { this.jmsBinding = jmsBinding; this.jmsResourceFactory = jmsResourceFactory; - requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); - responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + this.reference = reference; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + try { + requestDest = lookupDestination(); replyDest = lookupResponseDestination(); - - if (hasCallback()) { - callbackDestName = getCallbackDestinationName(reference); - } } catch (NamingException e) { throw new JMSBindingException(e); @@ -81,26 +80,6 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { } - protected String getCallbackDestinationName(RuntimeComponentReference reference) { - RuntimeComponentService s = (RuntimeComponentService) reference.getCallbackService(); - JMSBinding b = s.getBinding(JMSBinding.class); - if (b != null) { - JMSBindingServiceBindingProvider bp = (JMSBindingServiceBindingProvider) s.getBindingProvider(b); - return bp.getDestinationName(); - } - return null; - } - - protected boolean hasCallback() { - if (operation.getInterface() instanceof JavaInterface) { - JavaInterface jiface = (JavaInterface) operation.getInterface(); - if (jiface.getCallbackClass() != null) { - return true; - } - } - return false; - } - /** * Looks up the Destination Queue for the JMS Binding * @@ -109,7 +88,7 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { * @throws JMSBindingException Failed to lookup Destination Queue * @see #lookupDestinationQueue(boolean) */ - private Destination lookupDestination() throws NamingException, JMSBindingException { + protected Destination lookupDestination() throws NamingException, JMSBindingException { return lookupDestinationQueue(false); } @@ -121,7 +100,7 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { * @throws JMSBindingException Failed to lookup Destination Response Queue * @see #lookupDestinationQueue(boolean) */ - private Destination lookupResponseDestination() throws NamingException, JMSBindingException { + protected Destination lookupResponseDestination() throws NamingException, JMSBindingException { return lookupDestinationQueue(true); } @@ -144,10 +123,11 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { * @throws JMSBindingException Failed to lookup JMS Queue. Probable cause is that the JMS queue's current * existence/non-existence is not compatible with the create mode specified on the binding */ - private Destination lookupDestinationQueue(boolean isReponseQueue) throws NamingException, JMSBindingException { + protected Destination lookupDestinationQueue(boolean isReponseQueue) throws NamingException, JMSBindingException { String queueName; String queueType; String qCreateMode; + if (isReponseQueue) { queueName = jmsBinding.getResponseDestinationName(); queueType = "JMS Response Destination "; @@ -236,12 +216,7 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { Session session = jmsResourceFactory.createSession(); try { - Destination replyToDest; - if (operation.isNonBlocking()) { - replyToDest = null; - } else { - replyToDest = (replyDest != null) ? replyDest : session.createTemporaryQueue(); - } + Destination replyToDest = getReplyToDestination(session); Message requestMsg = sendRequest(tuscanyMsg, session, replyToDest); @@ -262,7 +237,21 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { } } - protected Message sendRequest(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Session session, Destination replyToDest) throws JMSException { + protected Destination getReplyToDestination(Session session) throws JMSException, JMSBindingException, NamingException { + Destination replyToDest; + if (operation.isNonBlocking()) { + replyToDest = null; + } else { + if (replyDest != null) { + replyToDest = replyDest; + } else { + replyToDest = session.createTemporaryQueue(); + } + } + return replyToDest; + } + + protected Message sendRequest(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Session session, Destination replyToDest) throws JMSException, JMSBindingException, NamingException { Message requestMsg = requestMessageProcessor.insertPayloadIntoJMSMessage(session, tuscanyMsg.getBody()); @@ -274,7 +263,9 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { requestMessageProcessor.setOperationName(operationName, requestMsg); requestMsg.setJMSReplyTo(replyToDest); - MessageProducer producer = session.createProducer(requestDest); + Destination requestDest = getRequestDestination(tuscanyMsg, session); + + MessageProducer producer = session.createProducer(requestDest); try { producer.send(requestMsg); } finally { @@ -283,21 +274,49 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { return requestMsg; } - private void setCallbackHeaders(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Message jmsMsg) throws JMSException { + protected Destination getRequestDestination(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Session session) throws JMSBindingException, NamingException, JMSException { + Destination requestDestination; + if (reference.isCallback()) { + String toURI = tuscanyMsg.getTo().getURI(); + if (toURI != null && toURI.startsWith("jms:")) { + // the msg to uri contains the callback destination name + // this is an jms physical name not a jndi name so need to use session.createQueue + requestDestination = session.createQueue(toURI.substring(4)); + } else { + requestDestination = lookupDestination(); + } + } else { + requestDestination = requestDest; + } + + return requestDestination; + } - ReferenceParameters parameters = tuscanyMsg.getFrom().getReferenceParameters(); + protected void setCallbackHeaders(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Message jmsMsg) throws JMSException { + if (hasCallback()) { -// if (parameters.getCallbackReference() != null) { -// jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_EPR_PROPERTY, parameters.getCallbackReference().getBinding().getURI()); -// } + ReferenceParameters parameters = tuscanyMsg.getFrom().getReferenceParameters(); - if (parameters.getCallbackID() != null) { - jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY, parameters.getCallbackID().toString()); - } + if (parameters.getCallbackID() != null) { + jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY, parameters.getCallbackID().toString()); + } - if (callbackDestName != null) { - jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY, callbackDestName); - } + String callbackDestName = getCallbackDestinationName(reference); + if (callbackDestName != null) { + jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY, callbackDestName); + } + + } + } + + protected boolean hasCallback() { + if (operation.getInterface() instanceof JavaInterface) { + JavaInterface jiface = (JavaInterface) operation.getInterface(); + if (jiface.getCallbackClass() != null) { + return true; + } + } + return false; } protected Message receiveReply(Session session, Destination replyToDest, String requestMsgId) throws JMSException, @@ -317,6 +336,16 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { return replyMsg; } + protected String getCallbackDestinationName(RuntimeComponentReference reference) { + RuntimeComponentService s = (RuntimeComponentService)reference.getCallbackService(); + JMSBinding b = s.getBinding(JMSBinding.class); + if (b != null) { + JMSBindingServiceBindingProvider bp = (JMSBindingServiceBindingProvider)s.getBindingProvider(b); + return bp.getDestinationName(); + } + return null; + } + public boolean allowsPassByReference() { // JMS always pass by value return true; diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java index c7907a0df5..1f47cc8fb9 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java @@ -28,7 +28,9 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; +import javax.jms.Topic; import javax.naming.NamingException; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; @@ -127,21 +129,35 @@ public class JMSBindingListener implements MessageListener { tuscanyMsg.setBody(requestPayload); tuscanyMsg.setOperation(operation); - setCallbackProperties(requestJMSMsg, tuscanyMsg); + setCallbackProperties(requestJMSMsg, tuscanyMsg, operation); return service.getRuntimeWire(jmsBinding).invoke(operation, tuscanyMsg); } - protected void setCallbackProperties(Message requestJMSMsg, MessageImpl tuscanyMsg) throws JMSException { + protected void setCallbackProperties(Message requestJMSMsg, MessageImpl tuscanyMsg, Operation operation) throws JMSException { if (service.getInterfaceContract().getCallbackInterface() != null) { EndpointReference from = new EndpointReferenceImpl(null); tuscanyMsg.setFrom(from); + + from.setCallbackEndpoint(new EndpointReferenceImpl("/")); // TODO: whats this for? + ReferenceParameters parameters = from.getReferenceParameters(); String callbackdestName = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY); + if (callbackdestName == null && operation.isNonBlocking()) { + // if the request has a replyTo but this service operation is oneway but the service uses callbacks + // then use the replyTo as the callback destination + Destination replyTo = requestJMSMsg.getJMSReplyTo(); + if (replyTo != null) { + callbackdestName = (replyTo instanceof Queue) ? ((Queue)replyTo).getQueueName() : ((Topic)replyTo).getTopicName(); + } + } + if (callbackdestName != null) { - parameters.setCallbackReference(new EndpointReferenceImpl(callbackdestName)); + // append "jms:" to make it an absolute uri so the invoker can determine it came in on the request + // as otherwise the invoker should use the uri from the service callback binding + parameters.setCallbackReference(new EndpointReferenceImpl("jms:" + callbackdestName)); } String callbackID = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY); diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java index 34d60da35d..cf61dbceb2 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java @@ -77,43 +77,11 @@ public class JMSBindingReferenceBindingProvider implements ReferenceBindingProvi public Invoker createInvoker(Operation operation) { if (jmsBinding.getDestinationName().equals(JMSBindingConstants.DEFAULT_DESTINATION_NAME)) { - throw new JMSBindingException("No destination specified for reference " + reference.getName()); + if (!reference.isCallback()) { + throw new JMSBindingException("No destination specified for reference " + reference.getName()); + } } - /* The following doesn't work as I can't get to the - * target list on the composite reference - // if the default destination queue name is set - // set the destination queue name to the wired service name - // so that any wires can be assured a unique endpoint. - - if (jmsBinding.getDestinationName().equals(JMSBindingConstants.DEFAULT_DESTINATION_NAME)){ - // get the name of the target service - List<ComponentService> targets = reference.getTargets(); - - if (targets.size() < 1){ - throw new JMSBindingException("No target specified for reference " + - reference.getName() + - " so destination queue name can't be determined"); - } - - if (targets.size() > 1){ - throw new JMSBindingException("More than one target specified for reference " + - reference.getName() + - " so destination queue name can't be determined"); - } - - ComponentService service = targets.get(0); - jmsBinding.setDestinationName(service.getName()); - } - - - // if the default response queue name is set - // set the response queue to the names of this - // reference - if (jmsBinding.getResponseDestinationName().equals(JMSBindingConstants.DEFAULT_RESPONSE_DESTINATION_NAME)){ - jmsBinding.setResponseDestinationName(reference.getName()); - } - */ JMSBindingInvoker invoker = new JMSBindingInvoker(jmsBinding, operation, jmsResourceFactory, reference); jmsBindingInvokers.add(invoker); return invoker; diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java index 381d3f9465..b21ae81cc4 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java @@ -69,8 +69,10 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider jmsResourceFactory = new JMSResourceFactory(binding.getConnectionFactoryName(), binding.getInitialContextFactoryName(), binding.getJndiURL()); if (jmsBinding.getDestinationName().equals(JMSBindingConstants.DEFAULT_DESTINATION_NAME)) { - // use the SCA service name as the default destination name - jmsBinding.setDestinationName(service.getName()); + if (!service.isCallback()) { + // use the SCA service name as the default destination name + jmsBinding.setDestinationName(service.getName()); + } } if (XMLTextMessageProcessor.class.isAssignableFrom(JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding).getClass())) { @@ -127,6 +129,11 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider Session session = jmsResourceFactory.createSession(); destination = lookupDestinationQueue(); + if (destination == null) { + // TODO: temporary callback queues don't work yet as i can't see how to get the + // serice side to look up the temporary destination name + destination = session.createTemporaryQueue(); + } consumer = session.createConsumer(destination); @@ -160,7 +167,10 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider } }}); } - logger.log(Level.INFO, "JMS service '" + service.getName() + "' listening on destination " + jmsBinding.getDestinationName()); + logger.log(Level.INFO, + "JMS " + (service.isCallback() ? "callback service" : "service") + + " '" + service.getName() + "' listening on destination " + + ((destination instanceof Queue) ? ((Queue)destination).getQueueName() : ((Topic)destination).getTopicName())); } /** @@ -181,7 +191,15 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider * existence/non-existence is not compatible with the create mode specified on the binding */ private Destination lookupDestinationQueue() throws NamingException, JMSBindingException { - Destination destination = jmsResourceFactory.lookupDestination(jmsBinding.getDestinationName()); + + if (service.isCallback() && JMSBindingConstants.DEFAULT_DESTINATION_NAME.equals(jmsBinding.getDestinationName())) { + // if its a callback service returning null indicates to use a temporary queue + // TODO: temporary callback queues don't work yet as i can't see how to get the + // serice side to look up the temporary destination name + return null; + } + + Destination destination = jmsResourceFactory.lookupDestination(jmsBinding.getDestinationName()); String qCreateMode = jmsBinding.getDestinationCreate(); if (qCreateMode.equals(JMSBindingConstants.CREATE_ALWAYS)) { |