diff options
author | antelder <antelder@13f79535-47bb-0310-9956-ffa450edef68> | 2008-07-11 13:32:31 +0000 |
---|---|---|
committer | antelder <antelder@13f79535-47bb-0310-9956-ffa450edef68> | 2008-07-11 13:32:31 +0000 |
commit | fa46111ff3b9e4136baa79a7c43df564e3ae743b (patch) | |
tree | bdf665829e89535db07df21a3bbda4a6dfee7065 /java/sca/modules/binding-jms-runtime/src | |
parent | 1c688308fe026ffb9354465a5e9ee5e8c666b431 (diff) |
Start getting callbacks over jms to work
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@675946 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/sca/modules/binding-jms-runtime/src')
5 files changed, 120 insertions, 22 deletions
diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java index 3cfc2d192a..2cf4875da8 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java @@ -32,8 +32,12 @@ import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; import org.apache.tuscany.sca.interfacedef.Operation; -import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.interfacedef.java.JavaInterface; import org.apache.tuscany.sca.invocation.DataExchangeSemantics; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.runtime.ReferenceParameters; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.osoa.sca.ServiceRuntimeException; /** @@ -52,8 +56,9 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { protected JMSMessageProcessor responseMessageProcessor; protected Destination requestDest; protected Destination replyDest; + private String callbackDestName; - public JMSBindingInvoker(JMSBinding jmsBinding, Operation operation, JMSResourceFactory jmsResourceFactory) { + public JMSBindingInvoker(JMSBinding jmsBinding, Operation operation, JMSResourceFactory jmsResourceFactory, RuntimeComponentReference reference) { this.operation = operation; operationName = operation.getName(); @@ -65,13 +70,38 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { try { requestDest = lookupDestination(); replyDest = lookupResponseDestination(); + + if (hasCallback()) { + callbackDestName = getCallbackDestinationName(reference); + } + } catch (NamingException e) { throw new JMSBindingException(e); } } - /** + protected String getCallbackDestinationName(RuntimeComponentReference reference) { + RuntimeComponentService s = (RuntimeComponentService) reference.getCallbackService(); + JMSBinding b = s.getBinding(JMSBinding.class); + if (b != null) { + JMSBindingServiceBindingProvider bp = (JMSBindingServiceBindingProvider) s.getBindingProvider(b); + return bp.getDestinationName(); + } + return null; + } + + protected boolean hasCallback() { + if (operation.getInterface() instanceof JavaInterface) { + JavaInterface jiface = (JavaInterface) operation.getInterface(); + if (jiface.getCallbackClass() != null) { + return true; + } + } + return false; + } + + /** * Looks up the Destination Queue for the JMS Binding * * @return The Destination Queue @@ -177,29 +207,31 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { return dest; } - public org.apache.tuscany.sca.invocation.Message invoke(org.apache.tuscany.sca.invocation.Message msg) { + public org.apache.tuscany.sca.invocation.Message invoke(org.apache.tuscany.sca.invocation.Message tuscanyMsg) { try { - Object resp = invokeTarget((Object[])msg.getBody(), (short)0); - msg.setBody(resp); + + Object resp = invokeTarget(tuscanyMsg); + tuscanyMsg.setBody(resp); + } catch (InvocationTargetException e) { - msg.setFaultBody(e.getCause()); + tuscanyMsg.setFaultBody(e.getCause()); } catch (ServiceRuntimeException e) { if (e.getCause() instanceof InvocationTargetException) { if ((e.getCause().getCause() instanceof RuntimeException)) { - msg.setFaultBody(e.getCause()); + tuscanyMsg.setFaultBody(e.getCause()); } else { - msg.setFaultBody(e.getCause().getCause()); + tuscanyMsg.setFaultBody(e.getCause().getCause()); } } else { - msg.setFaultBody(e); + tuscanyMsg.setFaultBody(e); } } catch (Throwable e) { - msg.setFaultBody(e); + tuscanyMsg.setFaultBody(e); } - return msg; + return tuscanyMsg; } - public Object invokeTarget(Object payload, final short sequence) throws InvocationTargetException { + public Object invokeTarget(org.apache.tuscany.sca.invocation.Message tuscanyMsg) throws InvocationTargetException { try { Session session = jmsResourceFactory.createSession(); try { @@ -211,7 +243,8 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { replyToDest = (replyDest != null) ? replyDest : session.createTemporaryQueue(); } - Message requestMsg = sendRequest((Object[])payload, session, replyToDest); + Message requestMsg = sendRequest(tuscanyMsg, session, replyToDest); + if (replyToDest == null) { return null; } else { @@ -229,13 +262,15 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { } } - protected Message sendRequest(Object payload, Session session, Destination replyToDest) throws JMSException { + protected Message sendRequest(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Session session, Destination replyToDest) throws JMSException { - Message requestMsg = requestMessageProcessor.insertPayloadIntoJMSMessage(session, payload); + Message requestMsg = requestMessageProcessor.insertPayloadIntoJMSMessage(session, tuscanyMsg.getBody()); requestMsg.setJMSDeliveryMode(jmsBinding.getDeliveryMode()); requestMsg.setJMSPriority(jmsBinding.getPriority()); + setCallbackHeaders(tuscanyMsg, requestMsg); + requestMessageProcessor.setOperationName(operationName, requestMsg); requestMsg.setJMSReplyTo(replyToDest); @@ -248,6 +283,23 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics { return requestMsg; } + private void setCallbackHeaders(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Message jmsMsg) throws JMSException { + + ReferenceParameters parameters = tuscanyMsg.getFrom().getReferenceParameters(); + +// if (parameters.getCallbackReference() != null) { +// jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_EPR_PROPERTY, parameters.getCallbackReference().getBinding().getURI()); +// } + + if (parameters.getCallbackID() != null) { + jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY, parameters.getCallbackID().toString()); + } + + if (callbackDestName != null) { + jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY, callbackDestName); + } + } + protected Message receiveReply(Session session, Destination replyToDest, String requestMsgId) throws JMSException, NamingException { String msgSelector = "JMSCorrelationID = '" + requestMsgId + "'"; diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java index 70bc4de9b0..c7907a0df5 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java @@ -34,7 +34,11 @@ import javax.naming.NamingException; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.core.assembly.EndpointReferenceImpl; +import org.apache.tuscany.sca.core.invocation.MessageImpl; import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.runtime.EndpointReference; +import org.apache.tuscany.sca.runtime.ReferenceParameters; import org.apache.tuscany.sca.runtime.RuntimeComponentService; /** @@ -114,15 +118,39 @@ public class JMSBindingListener implements MessageListener { } } - if (operation != null) { - return service.getRuntimeWire(jmsBinding).invoke(operation, (Object[])requestPayload); - } else { + if (operation == null) { throw new JMSBindingException("Can't find operation " + (operationName != null ? operationName : ON_MESSAGE_METHOD_NAME)); } + MessageImpl tuscanyMsg = new MessageImpl(); + tuscanyMsg.setBody(requestPayload); + tuscanyMsg.setOperation(operation); + + setCallbackProperties(requestJMSMsg, tuscanyMsg); + + return service.getRuntimeWire(jmsBinding).invoke(operation, tuscanyMsg); } + protected void setCallbackProperties(Message requestJMSMsg, MessageImpl tuscanyMsg) throws JMSException { + if (service.getInterfaceContract().getCallbackInterface() != null) { + + EndpointReference from = new EndpointReferenceImpl(null); + tuscanyMsg.setFrom(from); + ReferenceParameters parameters = from.getReferenceParameters(); + + String callbackdestName = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY); + if (callbackdestName != null) { + parameters.setCallbackReference(new EndpointReferenceImpl(callbackdestName)); + } + + String callbackID = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY); + if (callbackID != null) { + parameters.setCallbackID(callbackID); + } + } + } + protected void sendReply(Message requestJMSMsg, Object responsePayload, boolean isFault) { try { diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java index 451ac64813..34d60da35d 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java @@ -114,7 +114,7 @@ public class JMSBindingReferenceBindingProvider implements ReferenceBindingProvi jmsBinding.setResponseDestinationName(reference.getName()); } */ - JMSBindingInvoker invoker = new JMSBindingInvoker(jmsBinding, operation, jmsResourceFactory); + JMSBindingInvoker invoker = new JMSBindingInvoker(jmsBinding, operation, jmsResourceFactory, reference); jmsBindingInvokers.add(invoker); return invoker; } diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java index ce5ade3a11..381d3f9465 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java @@ -26,7 +26,9 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.Queue; import javax.jms.Session; +import javax.jms.Topic; import javax.naming.NamingException; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; @@ -54,6 +56,8 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider private WorkScheduler workScheduler; private boolean running; + private Destination destination; + public JMSBindingServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, JMSBinding binding, @@ -122,7 +126,7 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider private void registerListerner() throws NamingException, JMSException { Session session = jmsResourceFactory.createSession(); - Destination destination = lookupDestinationQueue(); + destination = lookupDestinationQueue(); consumer = session.createConsumer(destination); @@ -224,4 +228,18 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider return destination; } + + public String getDestinationName() { + try { + if (destination instanceof Queue) { + return ((Queue)destination).getQueueName(); + } else if (destination instanceof Topic){ + return ((Topic)destination).getTopicName(); + } else { + return null; + } + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } } diff --git a/java/sca/modules/binding-jms-runtime/src/test/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceQueueCreateModeTestCaseFIXME.java b/java/sca/modules/binding-jms-runtime/src/test/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceQueueCreateModeTestCaseFIXME.java index 96f0497303..e10e3785ab 100644 --- a/java/sca/modules/binding-jms-runtime/src/test/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceQueueCreateModeTestCaseFIXME.java +++ b/java/sca/modules/binding-jms-runtime/src/test/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceQueueCreateModeTestCaseFIXME.java @@ -296,7 +296,7 @@ public class JMSBindingReferenceQueueCreateModeTestCaseFIXME { // Try and create the JMS Binding Invoker for the JMS Binding
try {
- new JMSBindingInvoker(jmsBinding, operation, null);
+ new JMSBindingInvoker(jmsBinding, operation, null, null);
// Check whether we were expecting an exception
if (expectingRequestException || expectingResponseException) {
|