summaryrefslogtreecommitdiffstats
path: root/java/sca/modules/binding-jms-runtime
diff options
context:
space:
mode:
authorantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2008-07-13 08:48:50 +0000
committerantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2008-07-13 08:48:50 +0000
commit65df16c669f9edbf295c402d4687165691b42a48 (patch)
tree16c80a97f548358693b457745bb6fdd2d0920f9d /java/sca/modules/binding-jms-runtime
parent81c4167d789e9f525af7a7bfc6a881810bb6fce7 (diff)
More jms callback work, support physical jms destination names in the call back headers and temporary callback queues
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@676280 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/sca/modules/binding-jms-runtime')
-rw-r--r--java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java127
-rw-r--r--java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java22
-rw-r--r--java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java38
-rw-r--r--java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java26
4 files changed, 122 insertions, 91 deletions
diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java
index 2cf4875da8..a94df5a166 100644
--- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java
+++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java
@@ -56,7 +56,7 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
protected JMSMessageProcessor responseMessageProcessor;
protected Destination requestDest;
protected Destination replyDest;
- private String callbackDestName;
+ protected RuntimeComponentReference reference;
public JMSBindingInvoker(JMSBinding jmsBinding, Operation operation, JMSResourceFactory jmsResourceFactory, RuntimeComponentReference reference) {
@@ -65,15 +65,14 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
this.jmsBinding = jmsBinding;
this.jmsResourceFactory = jmsResourceFactory;
- requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding);
- responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding);
+ this.reference = reference;
+ this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding);
+ this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding);
+
try {
+
requestDest = lookupDestination();
replyDest = lookupResponseDestination();
-
- if (hasCallback()) {
- callbackDestName = getCallbackDestinationName(reference);
- }
} catch (NamingException e) {
throw new JMSBindingException(e);
@@ -81,26 +80,6 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
}
- protected String getCallbackDestinationName(RuntimeComponentReference reference) {
- RuntimeComponentService s = (RuntimeComponentService) reference.getCallbackService();
- JMSBinding b = s.getBinding(JMSBinding.class);
- if (b != null) {
- JMSBindingServiceBindingProvider bp = (JMSBindingServiceBindingProvider) s.getBindingProvider(b);
- return bp.getDestinationName();
- }
- return null;
- }
-
- protected boolean hasCallback() {
- if (operation.getInterface() instanceof JavaInterface) {
- JavaInterface jiface = (JavaInterface) operation.getInterface();
- if (jiface.getCallbackClass() != null) {
- return true;
- }
- }
- return false;
- }
-
/**
* Looks up the Destination Queue for the JMS Binding
*
@@ -109,7 +88,7 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
* @throws JMSBindingException Failed to lookup Destination Queue
* @see #lookupDestinationQueue(boolean)
*/
- private Destination lookupDestination() throws NamingException, JMSBindingException {
+ protected Destination lookupDestination() throws NamingException, JMSBindingException {
return lookupDestinationQueue(false);
}
@@ -121,7 +100,7 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
* @throws JMSBindingException Failed to lookup Destination Response Queue
* @see #lookupDestinationQueue(boolean)
*/
- private Destination lookupResponseDestination() throws NamingException, JMSBindingException {
+ protected Destination lookupResponseDestination() throws NamingException, JMSBindingException {
return lookupDestinationQueue(true);
}
@@ -144,10 +123,11 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
* @throws JMSBindingException Failed to lookup JMS Queue. Probable cause is that the JMS queue's current
* existence/non-existence is not compatible with the create mode specified on the binding
*/
- private Destination lookupDestinationQueue(boolean isReponseQueue) throws NamingException, JMSBindingException {
+ protected Destination lookupDestinationQueue(boolean isReponseQueue) throws NamingException, JMSBindingException {
String queueName;
String queueType;
String qCreateMode;
+
if (isReponseQueue) {
queueName = jmsBinding.getResponseDestinationName();
queueType = "JMS Response Destination ";
@@ -236,12 +216,7 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
Session session = jmsResourceFactory.createSession();
try {
- Destination replyToDest;
- if (operation.isNonBlocking()) {
- replyToDest = null;
- } else {
- replyToDest = (replyDest != null) ? replyDest : session.createTemporaryQueue();
- }
+ Destination replyToDest = getReplyToDestination(session);
Message requestMsg = sendRequest(tuscanyMsg, session, replyToDest);
@@ -262,7 +237,21 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
}
}
- protected Message sendRequest(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Session session, Destination replyToDest) throws JMSException {
+ protected Destination getReplyToDestination(Session session) throws JMSException, JMSBindingException, NamingException {
+ Destination replyToDest;
+ if (operation.isNonBlocking()) {
+ replyToDest = null;
+ } else {
+ if (replyDest != null) {
+ replyToDest = replyDest;
+ } else {
+ replyToDest = session.createTemporaryQueue();
+ }
+ }
+ return replyToDest;
+ }
+
+ protected Message sendRequest(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Session session, Destination replyToDest) throws JMSException, JMSBindingException, NamingException {
Message requestMsg = requestMessageProcessor.insertPayloadIntoJMSMessage(session, tuscanyMsg.getBody());
@@ -274,7 +263,9 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
requestMessageProcessor.setOperationName(operationName, requestMsg);
requestMsg.setJMSReplyTo(replyToDest);
- MessageProducer producer = session.createProducer(requestDest);
+ Destination requestDest = getRequestDestination(tuscanyMsg, session);
+
+ MessageProducer producer = session.createProducer(requestDest);
try {
producer.send(requestMsg);
} finally {
@@ -283,21 +274,49 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
return requestMsg;
}
- private void setCallbackHeaders(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Message jmsMsg) throws JMSException {
+ protected Destination getRequestDestination(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Session session) throws JMSBindingException, NamingException, JMSException {
+ Destination requestDestination;
+ if (reference.isCallback()) {
+ String toURI = tuscanyMsg.getTo().getURI();
+ if (toURI != null && toURI.startsWith("jms:")) {
+ // the msg to uri contains the callback destination name
+ // this is an jms physical name not a jndi name so need to use session.createQueue
+ requestDestination = session.createQueue(toURI.substring(4));
+ } else {
+ requestDestination = lookupDestination();
+ }
+ } else {
+ requestDestination = requestDest;
+ }
+
+ return requestDestination;
+ }
- ReferenceParameters parameters = tuscanyMsg.getFrom().getReferenceParameters();
+ protected void setCallbackHeaders(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Message jmsMsg) throws JMSException {
+ if (hasCallback()) {
-// if (parameters.getCallbackReference() != null) {
-// jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_EPR_PROPERTY, parameters.getCallbackReference().getBinding().getURI());
-// }
+ ReferenceParameters parameters = tuscanyMsg.getFrom().getReferenceParameters();
- if (parameters.getCallbackID() != null) {
- jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY, parameters.getCallbackID().toString());
- }
+ if (parameters.getCallbackID() != null) {
+ jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY, parameters.getCallbackID().toString());
+ }
- if (callbackDestName != null) {
- jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY, callbackDestName);
- }
+ String callbackDestName = getCallbackDestinationName(reference);
+ if (callbackDestName != null) {
+ jmsMsg.setStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY, callbackDestName);
+ }
+
+ }
+ }
+
+ protected boolean hasCallback() {
+ if (operation.getInterface() instanceof JavaInterface) {
+ JavaInterface jiface = (JavaInterface) operation.getInterface();
+ if (jiface.getCallbackClass() != null) {
+ return true;
+ }
+ }
+ return false;
}
protected Message receiveReply(Session session, Destination replyToDest, String requestMsgId) throws JMSException,
@@ -317,6 +336,16 @@ public class JMSBindingInvoker implements Invoker, DataExchangeSemantics {
return replyMsg;
}
+ protected String getCallbackDestinationName(RuntimeComponentReference reference) {
+ RuntimeComponentService s = (RuntimeComponentService)reference.getCallbackService();
+ JMSBinding b = s.getBinding(JMSBinding.class);
+ if (b != null) {
+ JMSBindingServiceBindingProvider bp = (JMSBindingServiceBindingProvider)s.getBindingProvider(b);
+ return bp.getDestinationName();
+ }
+ return null;
+ }
+
public boolean allowsPassByReference() {
// JMS always pass by value
return true;
diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java
index c7907a0df5..1f47cc8fb9 100644
--- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java
+++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java
@@ -28,7 +28,9 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.Topic;
import javax.naming.NamingException;
import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
@@ -127,21 +129,35 @@ public class JMSBindingListener implements MessageListener {
tuscanyMsg.setBody(requestPayload);
tuscanyMsg.setOperation(operation);
- setCallbackProperties(requestJMSMsg, tuscanyMsg);
+ setCallbackProperties(requestJMSMsg, tuscanyMsg, operation);
return service.getRuntimeWire(jmsBinding).invoke(operation, tuscanyMsg);
}
- protected void setCallbackProperties(Message requestJMSMsg, MessageImpl tuscanyMsg) throws JMSException {
+ protected void setCallbackProperties(Message requestJMSMsg, MessageImpl tuscanyMsg, Operation operation) throws JMSException {
if (service.getInterfaceContract().getCallbackInterface() != null) {
EndpointReference from = new EndpointReferenceImpl(null);
tuscanyMsg.setFrom(from);
+
+ from.setCallbackEndpoint(new EndpointReferenceImpl("/")); // TODO: whats this for?
+
ReferenceParameters parameters = from.getReferenceParameters();
String callbackdestName = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY);
+ if (callbackdestName == null && operation.isNonBlocking()) {
+ // if the request has a replyTo but this service operation is oneway but the service uses callbacks
+ // then use the replyTo as the callback destination
+ Destination replyTo = requestJMSMsg.getJMSReplyTo();
+ if (replyTo != null) {
+ callbackdestName = (replyTo instanceof Queue) ? ((Queue)replyTo).getQueueName() : ((Topic)replyTo).getTopicName();
+ }
+ }
+
if (callbackdestName != null) {
- parameters.setCallbackReference(new EndpointReferenceImpl(callbackdestName));
+ // append "jms:" to make it an absolute uri so the invoker can determine it came in on the request
+ // as otherwise the invoker should use the uri from the service callback binding
+ parameters.setCallbackReference(new EndpointReferenceImpl("jms:" + callbackdestName));
}
String callbackID = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY);
diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java
index 34d60da35d..cf61dbceb2 100644
--- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java
+++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java
@@ -77,43 +77,11 @@ public class JMSBindingReferenceBindingProvider implements ReferenceBindingProvi
public Invoker createInvoker(Operation operation) {
if (jmsBinding.getDestinationName().equals(JMSBindingConstants.DEFAULT_DESTINATION_NAME)) {
- throw new JMSBindingException("No destination specified for reference " + reference.getName());
+ if (!reference.isCallback()) {
+ throw new JMSBindingException("No destination specified for reference " + reference.getName());
+ }
}
- /* The following doesn't work as I can't get to the
- * target list on the composite reference
- // if the default destination queue name is set
- // set the destination queue name to the wired service name
- // so that any wires can be assured a unique endpoint.
-
- if (jmsBinding.getDestinationName().equals(JMSBindingConstants.DEFAULT_DESTINATION_NAME)){
- // get the name of the target service
- List<ComponentService> targets = reference.getTargets();
-
- if (targets.size() < 1){
- throw new JMSBindingException("No target specified for reference " +
- reference.getName() +
- " so destination queue name can't be determined");
- }
-
- if (targets.size() > 1){
- throw new JMSBindingException("More than one target specified for reference " +
- reference.getName() +
- " so destination queue name can't be determined");
- }
-
- ComponentService service = targets.get(0);
- jmsBinding.setDestinationName(service.getName());
- }
-
-
- // if the default response queue name is set
- // set the response queue to the names of this
- // reference
- if (jmsBinding.getResponseDestinationName().equals(JMSBindingConstants.DEFAULT_RESPONSE_DESTINATION_NAME)){
- jmsBinding.setResponseDestinationName(reference.getName());
- }
- */
JMSBindingInvoker invoker = new JMSBindingInvoker(jmsBinding, operation, jmsResourceFactory, reference);
jmsBindingInvokers.add(invoker);
return invoker;
diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
index 381d3f9465..b21ae81cc4 100644
--- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
+++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
@@ -69,8 +69,10 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider
jmsResourceFactory = new JMSResourceFactory(binding.getConnectionFactoryName(), binding.getInitialContextFactoryName(), binding.getJndiURL());
if (jmsBinding.getDestinationName().equals(JMSBindingConstants.DEFAULT_DESTINATION_NAME)) {
- // use the SCA service name as the default destination name
- jmsBinding.setDestinationName(service.getName());
+ if (!service.isCallback()) {
+ // use the SCA service name as the default destination name
+ jmsBinding.setDestinationName(service.getName());
+ }
}
if (XMLTextMessageProcessor.class.isAssignableFrom(JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding).getClass())) {
@@ -127,6 +129,11 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider
Session session = jmsResourceFactory.createSession();
destination = lookupDestinationQueue();
+ if (destination == null) {
+ // TODO: temporary callback queues don't work yet as i can't see how to get the
+ // serice side to look up the temporary destination name
+ destination = session.createTemporaryQueue();
+ }
consumer = session.createConsumer(destination);
@@ -160,7 +167,10 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider
}
}});
}
- logger.log(Level.INFO, "JMS service '" + service.getName() + "' listening on destination " + jmsBinding.getDestinationName());
+ logger.log(Level.INFO,
+ "JMS " + (service.isCallback() ? "callback service" : "service") +
+ " '" + service.getName() + "' listening on destination " +
+ ((destination instanceof Queue) ? ((Queue)destination).getQueueName() : ((Topic)destination).getTopicName()));
}
/**
@@ -181,7 +191,15 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider
* existence/non-existence is not compatible with the create mode specified on the binding
*/
private Destination lookupDestinationQueue() throws NamingException, JMSBindingException {
- Destination destination = jmsResourceFactory.lookupDestination(jmsBinding.getDestinationName());
+
+ if (service.isCallback() && JMSBindingConstants.DEFAULT_DESTINATION_NAME.equals(jmsBinding.getDestinationName())) {
+ // if its a callback service returning null indicates to use a temporary queue
+ // TODO: temporary callback queues don't work yet as i can't see how to get the
+ // serice side to look up the temporary destination name
+ return null;
+ }
+
+ Destination destination = jmsResourceFactory.lookupDestination(jmsBinding.getDestinationName());
String qCreateMode = jmsBinding.getDestinationCreate();
if (qCreateMode.equals(JMSBindingConstants.CREATE_ALWAYS)) {