diff options
3 files changed, 26 insertions, 181 deletions
diff --git a/branches/sca-java-1.x/modules/binding-jms-runtime/pom.xml b/branches/sca-java-1.x/modules/binding-jms-runtime/pom.xml index 9dfcdd5ce6..7952d794a4 100644 --- a/branches/sca-java-1.x/modules/binding-jms-runtime/pom.xml +++ b/branches/sca-java-1.x/modules/binding-jms-runtime/pom.xml @@ -36,11 +36,11 @@ <version>1.5-SNAPSHOT</version> </dependency> - <!-- dependency> - <groupId>org.apache.tuscany.sca</groupId> - <artifactId>tuscany-binding-jms-policy</artifactId> - <version>1.5-SNAPSHOT</version> - </dependency--> + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-host-jms</artifactId> + <version>1.5-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.apache.tuscany.sca</groupId> diff --git a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java index a96e53d9e1..c9a11dc41e 100644 --- a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java +++ b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java @@ -21,14 +21,14 @@ package org.apache.tuscany.sca.binding.jms.provider; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.host.jms.JMSHostExtensionPoint; +import org.apache.tuscany.sca.host.jms.JMSServiceListenerFactory; import org.apache.tuscany.sca.provider.BindingProviderFactory; import org.apache.tuscany.sca.provider.ReferenceBindingProvider; import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentReference; import org.apache.tuscany.sca.runtime.RuntimeComponentService; -import org.apache.tuscany.sca.work.WorkScheduler; /** * A factory from creating the JMS binding provider. @@ -37,20 +37,21 @@ import org.apache.tuscany.sca.work.WorkScheduler; */ public class JMSBindingProviderFactory implements BindingProviderFactory<JMSBinding> { - private WorkScheduler workScheduler; private ExtensionPointRegistry extensionPoints; private JMSResourceFactoryExtensionPoint jmsRFEP; + private JMSServiceListenerFactory serviceListenerFactory; public JMSBindingProviderFactory(ExtensionPointRegistry extensionPoints) { this.extensionPoints = extensionPoints; - UtilityExtensionPoint utilities = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class); - workScheduler = utilities.getUtility(WorkScheduler.class); jmsRFEP = (JMSResourceFactoryExtensionPoint)extensionPoints.getExtensionPoint(JMSResourceFactoryExtensionPoint.class); if (jmsRFEP == null) { jmsRFEP = new DefaultJMSResourceFactoryExtensionPoint(); extensionPoints.addExtensionPoint(jmsRFEP); } + + JMSHostExtensionPoint jmsHostExtensionPoint = (JMSHostExtensionPoint)extensionPoints.getExtensionPoint(JMSHostExtensionPoint.class); + serviceListenerFactory = jmsHostExtensionPoint.getJMSServiceListenerFactory(); } public ReferenceBindingProvider createReferenceBindingProvider(RuntimeComponent component, RuntimeComponentReference reference, JMSBinding binding) { @@ -60,7 +61,7 @@ public class JMSBindingProviderFactory implements BindingProviderFactory<JMSBind public ServiceBindingProvider createServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, JMSBinding binding) { JMSResourceFactory jmsRF = jmsRFEP.createJMSResourceFactory(binding); - return new JMSBindingServiceBindingProvider(component, service, binding, binding, workScheduler, extensionPoints, jmsRF); + return new JMSBindingServiceBindingProvider(component, service, binding, binding, serviceListenerFactory, extensionPoints, jmsRF); } public Class<JMSBinding> getModelType() { diff --git a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java index 0aabc9a7a5..dd0a2c0b3e 100644 --- a/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java +++ b/branches/sca-java-1.x/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java @@ -19,18 +19,13 @@ package org.apache.tuscany.sca.binding.jms.provider; -import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; -import javax.jms.Session; import javax.jms.Topic; -import javax.naming.NamingException; import org.apache.tuscany.sca.assembly.Binding; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; @@ -39,6 +34,8 @@ import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; import org.apache.tuscany.sca.binding.jms.transport.TransportServiceInterceptor; import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint; import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.host.jms.JMSServiceListener; +import org.apache.tuscany.sca.host.jms.JMSServiceListenerFactory; import org.apache.tuscany.sca.interfacedef.InterfaceContract; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.MessageFactory; @@ -52,7 +49,6 @@ import org.apache.tuscany.sca.provider.WireFormatProviderFactory; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.apache.tuscany.sca.runtime.RuntimeWire; -import org.apache.tuscany.sca.work.WorkScheduler; /** * Implementation of the JMS service binding provider. @@ -66,11 +62,8 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProviderR private Binding targetBinding; private JMSBinding jmsBinding; private JMSResourceFactory jmsResourceFactory; - private MessageConsumer consumer; - private WorkScheduler workScheduler; - private boolean running; - - private Destination destination; + private JMSServiceListenerFactory serviceListenerFactory; + private JMSServiceListener serviceListener; private ExtensionPointRegistry extensionPoints; @@ -92,11 +85,11 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProviderR private WireFormatProviderFactory responseWireFormatProviderFactory; private WireFormatProvider responseWireFormatProvider; - public JMSBindingServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, Binding targetBinding, JMSBinding binding, WorkScheduler workScheduler, ExtensionPointRegistry extensionPoints, JMSResourceFactory jmsResourceFactory) { + public JMSBindingServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, Binding targetBinding, JMSBinding binding, JMSServiceListenerFactory serviceListenerFactory, ExtensionPointRegistry extensionPoints, JMSResourceFactory jmsResourceFactory) { this.component = component; this.service = service; this.jmsBinding = binding; - this.workScheduler = workScheduler; + this.serviceListenerFactory = serviceListenerFactory; this.targetBinding = targetBinding; this.extensionPoints = extensionPoints; this.jmsResourceFactory = jmsResourceFactory; @@ -143,176 +136,27 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProviderR } public void start() { - this.running = true; - try { - registerListerner(); + + MessageListener listener = new RRBJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding, messageFactory); + this.serviceListener = serviceListenerFactory.createJMSServiceListener(service.getName(), service.isCallback(), jmsBinding, listener); + serviceListener.start(); + } catch (Exception e) { throw new JMSBindingException("Error starting JMSServiceBinding", e); } } public void stop() { - this.running = false; try { - consumer.close(); - jmsResourceFactory.closeConnection(); + serviceListener.stop(); } catch (Exception e) { - // if using an embedded broker then when shutting down Tuscany the broker may get closed - // before this stop method is called. I can't see how to detect that so for now just - // ignore the exception if the message is that the transport is already disposed - if (!"Transport disposed.".equals(e.getMessage())) { - throw new JMSBindingException("Error stopping JMSServiceBinding", e); - } - } - } - - private void registerListerner() throws NamingException, JMSException { - - Session session = jmsResourceFactory.createSession(); - destination = lookupDestinationQueue(); - if (destination == null) { - destination = session.createTemporaryQueue(); - } - - if (jmsBinding.getJMSSelector() != null) { - consumer = session.createConsumer(destination, jmsBinding.getJMSSelector()); - } else { - consumer = session.createConsumer(destination); - } - - MessageListener tmpListener = null; - - /* - * TODO turn on RRB version of JMS binding - */ - tmpListener = new RRBJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding, messageFactory); - //tmpListener = new DefaultJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding); - - final MessageListener listener = tmpListener; - - try { - - consumer.setMessageListener(listener); - jmsResourceFactory.startConnection(); - - } catch (javax.jms.IllegalStateException e) { - - // setMessageListener not allowed in JEE container so use Tuscany threads - - jmsResourceFactory.startConnection(); - workScheduler.scheduleWork(new Runnable() { - public void run() { - try { - while (running) { - final Message msg = consumer.receive(); - workScheduler.scheduleWork(new Runnable() { - public void run() { - try { - listener.onMessage(msg); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - }); + throw new JMSBindingException("Error stopping JMSServiceBinding", e); } - logger.log(Level.INFO, "JMS " + (service.isCallback() ? "callback service" : "service") - + " '" - + service.getName() - + "' listening on destination " - + ((destination instanceof Queue) ? ((Queue)destination).getQueueName() : ((Topic)destination).getTopicName())); - } - - /** - * Looks up the Destination Queue for the JMS Binding. - * <p> - * What happens in the look up will depend on the create mode specified for the JMS Binding: - * <ul> - * <li>always - the JMS queue is always created. It is an error if the queue already exists - * <li>ifnotexist - the JMS queue is created if it does not exist. It is not an error if the queue already exists - * <li>never - the JMS queue is never created. It is an error if the queue does not exist - * </ul> - * See the SCA JMS Binding specification for more information. - * <p> - * - * @return The Destination queue. - * @throws NamingException Failed to lookup JMS queue - * @throws JMSBindingException Failed to lookup JMS Queue. Probable cause is that the JMS queue's current existence/non-existence is not - * compatible with the create mode specified on the binding - */ - private Destination lookupDestinationQueue() throws NamingException, JMSBindingException { - - if (service.isCallback() && JMSBindingConstants.DEFAULT_DESTINATION_NAME.equals(jmsBinding.getDestinationName())) { - // if its a callback service returning null indicates to use a temporary queue - return null; - } - - Destination destination = jmsResourceFactory.lookupDestination(jmsBinding.getDestinationName()); - - String qCreateMode = jmsBinding.getDestinationCreate(); - if (qCreateMode.equals(JMSBindingConstants.CREATE_ALWAYS)) { - // In this mode, the queue must not already exist as we are creating it - if (destination != null) { - throw new JMSBindingException("JMS Destination " + jmsBinding.getDestinationName() - + " already exists but has create mode of \"" - + qCreateMode - + "\" while registering service " - + service.getName() - + " listener"); - } - - // Create the queue - destination = jmsResourceFactory.createDestination(jmsBinding.getDestinationName()); - - } else if (qCreateMode.equals(JMSBindingConstants.CREATE_IF_NOT_EXIST)) { - // In this mode, the queue may nor may not exist. It will be created if it does not exist - if (destination == null) { - destination = jmsResourceFactory.createDestination(jmsBinding.getDestinationName()); - } - - } else if (qCreateMode.equals(JMSBindingConstants.CREATE_NEVER)) { - // In this mode, the queue must have already been created. - if (destination == null) { - throw new JMSBindingException("JMS Destination " + jmsBinding.getDestinationName() - + " not found but create mode of \"" - + qCreateMode - + "\" while registering service " - + service.getName() - + " listener"); - } - } - - // Make sure we ended up with a queue - if (destination == null) { - throw new JMSBindingException("JMS Destination " + jmsBinding.getDestinationName() - + " not found with create mode of \"" - + qCreateMode - + "\" while registering service " - + service.getName() - + " listener"); - } - - return destination; } public String getDestinationName() { - try { - if (destination instanceof Queue) { - return ((Queue)destination).getQueueName(); - } else if (destination instanceof Topic) { - return ((Topic)destination).getTopicName(); - } else { - return null; - } - } catch (JMSException e) { - throw new JMSBindingException(e); - } + return serviceListener.getDestinationName(); } /* |