summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java
diff options
context:
space:
mode:
authorantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2009-01-28 14:56:56 +0000
committerantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2009-01-28 14:56:56 +0000
commit1a3cec01524fc7d4c85793de9a60bca23d8025af (patch)
tree55901adf26810e781f23baa0fc8bec8820c3c8d6 /branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java
parent4c20594aea3ead4b143eb785259830b07e69ec63 (diff)
Intial cut at changing the jms binding to use the jms host for the service listener
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@738502 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java')
-rw-r--r--branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java13
-rw-r--r--branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java184
2 files changed, 21 insertions, 176 deletions
diff --git a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java
index a96e53d9e1..c9a11dc41e 100644
--- a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java
+++ b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java
@@ -21,14 +21,14 @@ package org.apache.tuscany.sca.binding.jms.provider;
import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
-import org.apache.tuscany.sca.core.UtilityExtensionPoint;
+import org.apache.tuscany.sca.host.jms.JMSHostExtensionPoint;
+import org.apache.tuscany.sca.host.jms.JMSServiceListenerFactory;
import org.apache.tuscany.sca.provider.BindingProviderFactory;
import org.apache.tuscany.sca.provider.ReferenceBindingProvider;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
-import org.apache.tuscany.sca.work.WorkScheduler;
/**
* A factory from creating the JMS binding provider.
@@ -37,20 +37,21 @@ import org.apache.tuscany.sca.work.WorkScheduler;
*/
public class JMSBindingProviderFactory implements BindingProviderFactory<JMSBinding> {
- private WorkScheduler workScheduler;
private ExtensionPointRegistry extensionPoints;
private JMSResourceFactoryExtensionPoint jmsRFEP;
+ private JMSServiceListenerFactory serviceListenerFactory;
public JMSBindingProviderFactory(ExtensionPointRegistry extensionPoints) {
this.extensionPoints = extensionPoints;
- UtilityExtensionPoint utilities = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class);
- workScheduler = utilities.getUtility(WorkScheduler.class);
jmsRFEP = (JMSResourceFactoryExtensionPoint)extensionPoints.getExtensionPoint(JMSResourceFactoryExtensionPoint.class);
if (jmsRFEP == null) {
jmsRFEP = new DefaultJMSResourceFactoryExtensionPoint();
extensionPoints.addExtensionPoint(jmsRFEP);
}
+
+ JMSHostExtensionPoint jmsHostExtensionPoint = (JMSHostExtensionPoint)extensionPoints.getExtensionPoint(JMSHostExtensionPoint.class);
+ serviceListenerFactory = jmsHostExtensionPoint.getJMSServiceListenerFactory();
}
public ReferenceBindingProvider createReferenceBindingProvider(RuntimeComponent component, RuntimeComponentReference reference, JMSBinding binding) {
@@ -60,7 +61,7 @@ public class JMSBindingProviderFactory implements BindingProviderFactory<JMSBind
public ServiceBindingProvider createServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, JMSBinding binding) {
JMSResourceFactory jmsRF = jmsRFEP.createJMSResourceFactory(binding);
- return new JMSBindingServiceBindingProvider(component, service, binding, binding, workScheduler, extensionPoints, jmsRF);
+ return new JMSBindingServiceBindingProvider(component, service, binding, binding, serviceListenerFactory, extensionPoints, jmsRF);
}
public Class<JMSBinding> getModelType() {
diff --git a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
index 0aabc9a7a5..dd0a2c0b3e 100644
--- a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
+++ b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
@@ -19,18 +19,13 @@
package org.apache.tuscany.sca.binding.jms.provider;
-import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
-import javax.jms.Session;
import javax.jms.Topic;
-import javax.naming.NamingException;
import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
@@ -39,6 +34,8 @@ import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
import org.apache.tuscany.sca.binding.jms.transport.TransportServiceInterceptor;
import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.host.jms.JMSServiceListener;
+import org.apache.tuscany.sca.host.jms.JMSServiceListenerFactory;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.MessageFactory;
@@ -52,7 +49,6 @@ import org.apache.tuscany.sca.provider.WireFormatProviderFactory;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeWire;
-import org.apache.tuscany.sca.work.WorkScheduler;
/**
* Implementation of the JMS service binding provider.
@@ -66,11 +62,8 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProviderR
private Binding targetBinding;
private JMSBinding jmsBinding;
private JMSResourceFactory jmsResourceFactory;
- private MessageConsumer consumer;
- private WorkScheduler workScheduler;
- private boolean running;
-
- private Destination destination;
+ private JMSServiceListenerFactory serviceListenerFactory;
+ private JMSServiceListener serviceListener;
private ExtensionPointRegistry extensionPoints;
@@ -92,11 +85,11 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProviderR
private WireFormatProviderFactory responseWireFormatProviderFactory;
private WireFormatProvider responseWireFormatProvider;
- public JMSBindingServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, Binding targetBinding, JMSBinding binding, WorkScheduler workScheduler, ExtensionPointRegistry extensionPoints, JMSResourceFactory jmsResourceFactory) {
+ public JMSBindingServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, Binding targetBinding, JMSBinding binding, JMSServiceListenerFactory serviceListenerFactory, ExtensionPointRegistry extensionPoints, JMSResourceFactory jmsResourceFactory) {
this.component = component;
this.service = service;
this.jmsBinding = binding;
- this.workScheduler = workScheduler;
+ this.serviceListenerFactory = serviceListenerFactory;
this.targetBinding = targetBinding;
this.extensionPoints = extensionPoints;
this.jmsResourceFactory = jmsResourceFactory;
@@ -143,176 +136,27 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProviderR
}
public void start() {
- this.running = true;
-
try {
- registerListerner();
+
+ MessageListener listener = new RRBJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding, messageFactory);
+ this.serviceListener = serviceListenerFactory.createJMSServiceListener(service.getName(), service.isCallback(), jmsBinding, listener);
+ serviceListener.start();
+
} catch (Exception e) {
throw new JMSBindingException("Error starting JMSServiceBinding", e);
}
}
public void stop() {
- this.running = false;
try {
- consumer.close();
- jmsResourceFactory.closeConnection();
+ serviceListener.stop();
} catch (Exception e) {
- // if using an embedded broker then when shutting down Tuscany the broker may get closed
- // before this stop method is called. I can't see how to detect that so for now just
- // ignore the exception if the message is that the transport is already disposed
- if (!"Transport disposed.".equals(e.getMessage())) {
- throw new JMSBindingException("Error stopping JMSServiceBinding", e);
- }
- }
- }
-
- private void registerListerner() throws NamingException, JMSException {
-
- Session session = jmsResourceFactory.createSession();
- destination = lookupDestinationQueue();
- if (destination == null) {
- destination = session.createTemporaryQueue();
- }
-
- if (jmsBinding.getJMSSelector() != null) {
- consumer = session.createConsumer(destination, jmsBinding.getJMSSelector());
- } else {
- consumer = session.createConsumer(destination);
- }
-
- MessageListener tmpListener = null;
-
- /*
- * TODO turn on RRB version of JMS binding
- */
- tmpListener = new RRBJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding, messageFactory);
- //tmpListener = new DefaultJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding);
-
- final MessageListener listener = tmpListener;
-
- try {
-
- consumer.setMessageListener(listener);
- jmsResourceFactory.startConnection();
-
- } catch (javax.jms.IllegalStateException e) {
-
- // setMessageListener not allowed in JEE container so use Tuscany threads
-
- jmsResourceFactory.startConnection();
- workScheduler.scheduleWork(new Runnable() {
- public void run() {
- try {
- while (running) {
- final Message msg = consumer.receive();
- workScheduler.scheduleWork(new Runnable() {
- public void run() {
- try {
- listener.onMessage(msg);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
+ throw new JMSBindingException("Error stopping JMSServiceBinding", e);
}
- logger.log(Level.INFO, "JMS " + (service.isCallback() ? "callback service" : "service")
- + " '"
- + service.getName()
- + "' listening on destination "
- + ((destination instanceof Queue) ? ((Queue)destination).getQueueName() : ((Topic)destination).getTopicName()));
- }
-
- /**
- * Looks up the Destination Queue for the JMS Binding.
- * <p>
- * What happens in the look up will depend on the create mode specified for the JMS Binding:
- * <ul>
- * <li>always - the JMS queue is always created. It is an error if the queue already exists
- * <li>ifnotexist - the JMS queue is created if it does not exist. It is not an error if the queue already exists
- * <li>never - the JMS queue is never created. It is an error if the queue does not exist
- * </ul>
- * See the SCA JMS Binding specification for more information.
- * <p>
- *
- * @return The Destination queue.
- * @throws NamingException Failed to lookup JMS queue
- * @throws JMSBindingException Failed to lookup JMS Queue. Probable cause is that the JMS queue's current existence/non-existence is not
- * compatible with the create mode specified on the binding
- */
- private Destination lookupDestinationQueue() throws NamingException, JMSBindingException {
-
- if (service.isCallback() && JMSBindingConstants.DEFAULT_DESTINATION_NAME.equals(jmsBinding.getDestinationName())) {
- // if its a callback service returning null indicates to use a temporary queue
- return null;
- }
-
- Destination destination = jmsResourceFactory.lookupDestination(jmsBinding.getDestinationName());
-
- String qCreateMode = jmsBinding.getDestinationCreate();
- if (qCreateMode.equals(JMSBindingConstants.CREATE_ALWAYS)) {
- // In this mode, the queue must not already exist as we are creating it
- if (destination != null) {
- throw new JMSBindingException("JMS Destination " + jmsBinding.getDestinationName()
- + " already exists but has create mode of \""
- + qCreateMode
- + "\" while registering service "
- + service.getName()
- + " listener");
- }
-
- // Create the queue
- destination = jmsResourceFactory.createDestination(jmsBinding.getDestinationName());
-
- } else if (qCreateMode.equals(JMSBindingConstants.CREATE_IF_NOT_EXIST)) {
- // In this mode, the queue may nor may not exist. It will be created if it does not exist
- if (destination == null) {
- destination = jmsResourceFactory.createDestination(jmsBinding.getDestinationName());
- }
-
- } else if (qCreateMode.equals(JMSBindingConstants.CREATE_NEVER)) {
- // In this mode, the queue must have already been created.
- if (destination == null) {
- throw new JMSBindingException("JMS Destination " + jmsBinding.getDestinationName()
- + " not found but create mode of \""
- + qCreateMode
- + "\" while registering service "
- + service.getName()
- + " listener");
- }
- }
-
- // Make sure we ended up with a queue
- if (destination == null) {
- throw new JMSBindingException("JMS Destination " + jmsBinding.getDestinationName()
- + " not found with create mode of \""
- + qCreateMode
- + "\" while registering service "
- + service.getName()
- + " listener");
- }
-
- return destination;
}
public String getDestinationName() {
- try {
- if (destination instanceof Queue) {
- return ((Queue)destination).getQueueName();
- } else if (destination instanceof Topic) {
- return ((Topic)destination).getTopicName();
- } else {
- return null;
- }
- } catch (JMSException e) {
- throw new JMSBindingException(e);
- }
+ return serviceListener.getDestinationName();
}
/*