From e4510026a79a0878be92ab775edc381d3fbf4cfa Mon Sep 17 00:00:00 2001 From: antelder Date: Thu, 14 May 2009 10:16:25 +0000 Subject: Add support for using Tuscany threads instead on setMessageListener in a JEE container environment git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@774720 13f79535-47bb-0310-9956-ffa450edef68 --- .../axis2/impl/Axis2SCAServiceBindingProvider.java | 7 ++- .../sca/axis2/impl/Axis2SCAServiceProvider.java | 8 ++- .../ws/axis2/Axis2BindingProviderFactory.java | 8 ++- .../ws/axis2/Axis2ServiceBindingProvider.java | 6 ++- .../sca/binding/ws/axis2/Axis2ServiceProvider.java | 8 ++- .../binding/ws/axis2/jms/JMSConnectionFactory.java | 59 ++++++++++++++++++++-- .../sca/binding/ws/axis2/jms/JMSListener.java | 9 +++- 7 files changed, 90 insertions(+), 15 deletions(-) (limited to 'branches') diff --git a/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java b/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java index 58a50d46dc..0450d07071 100644 --- a/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java +++ b/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java @@ -31,6 +31,7 @@ import org.apache.tuscany.sca.binding.ws.axis2.Axis2ServiceProvider; import org.apache.tuscany.sca.binding.ws.wsdlgen.BindingWSDLGenerator; import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint; import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.databinding.DataBindingExtensionPoint; import org.apache.tuscany.sca.host.http.ServletHost; import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint; @@ -40,6 +41,7 @@ import org.apache.tuscany.sca.policy.util.PolicyHandlerTuple; import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.work.WorkScheduler; /** * The service binding provider for the remote sca binding implementation. Relies on the @@ -69,6 +71,8 @@ public class Axis2SCAServiceBindingProvider implements ServiceBindingProvider { ModelFactoryExtensionPoint modelFactories = extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class); MessageFactory messageFactory = modelFactories.getFactory(MessageFactory.class); DataBindingExtensionPoint dataBindings = extensionPoints.getExtensionPoint(DataBindingExtensionPoint.class); + UtilityExtensionPoint utilities = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class); + WorkScheduler workScheduler = utilities.getUtility(WorkScheduler.class); this.binding = binding.getSCABinding(); wsBinding = modelFactories.getFactory(WebServiceBindingFactory.class).createWebServiceBinding(); @@ -88,7 +92,8 @@ public class Axis2SCAServiceBindingProvider implements ServiceBindingProvider { wsBinding, servletHost, messageFactory, - policyHandlerClassnames); + policyHandlerClassnames, + workScheduler); } public InterfaceContract getBindingInterfaceContract() { diff --git a/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java b/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java index cf13d821f7..dd66415cca 100644 --- a/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java +++ b/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java @@ -25,11 +25,13 @@ import org.apache.tuscany.sca.assembly.Binding; import org.apache.tuscany.sca.assembly.SCABinding; import org.apache.tuscany.sca.binding.ws.WebServiceBinding; import org.apache.tuscany.sca.binding.ws.axis2.Axis2ServiceProvider; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.host.http.ServletHost; import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.policy.util.PolicyHandlerTuple; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.work.WorkScheduler; /** * A specialization of the Axis2BindingProvider that just switches in the SCABinding model @@ -58,14 +60,16 @@ public class Axis2SCAServiceProvider extends Axis2ServiceProvider { WebServiceBinding wsBinding, ServletHost servletHost, MessageFactory messageFactory, - List policyHandlerClassnames) { + List policyHandlerClassnames, + WorkScheduler workScheduler) { super(component, service, wsBinding, servletHost, messageFactory, - policyHandlerClassnames); + policyHandlerClassnames, + workScheduler); this.binding = binding; } diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java index c7eb2b7794..e6ca8e20a8 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.tuscany.sca.binding.ws.WebServiceBinding; import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint; import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.databinding.DataBindingExtensionPoint; import org.apache.tuscany.sca.host.http.ServletHost; import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint; @@ -34,7 +35,7 @@ import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentReference; import org.apache.tuscany.sca.runtime.RuntimeComponentService; -import org.osoa.sca.ServiceRuntimeException; +import org.apache.tuscany.sca.work.WorkScheduler; /** * Axis2BindingProviderFactory @@ -48,6 +49,7 @@ public class Axis2BindingProviderFactory implements BindingProviderFactory policyHandlerClassnames = null; private DataBindingExtensionPoint dataBindings; + private WorkScheduler workScheduler; public Axis2BindingProviderFactory(ExtensionPointRegistry extensionPoints) { ServletHostExtensionPoint servletHosts = extensionPoints.getExtensionPoint(ServletHostExtensionPoint.class); @@ -58,6 +60,8 @@ public class Axis2BindingProviderFactory implements BindingProviderFactory getModelType() { diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java index 5b26290732..8a4b64d266 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java @@ -31,6 +31,7 @@ import org.apache.tuscany.sca.policy.util.PolicyHandlerTuple; import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.work.WorkScheduler; import org.osoa.sca.ServiceRuntimeException; public class Axis2ServiceBindingProvider implements ServiceBindingProvider { @@ -44,7 +45,8 @@ public class Axis2ServiceBindingProvider implements ServiceBindingProvider { ServletHost servletHost, ModelFactoryExtensionPoint modelFactories, List policyHandlerClassnames, - DataBindingExtensionPoint dataBindings) { + DataBindingExtensionPoint dataBindings, + WorkScheduler workScheduler) { if (servletHost == null) { throw new ServiceRuntimeException("No Servlet host is avaible for HTTP web services"); @@ -62,7 +64,7 @@ public class Axis2ServiceBindingProvider implements ServiceBindingProvider { InterfaceContract contract = wsBinding.getBindingInterfaceContract(); contract.getInterface().resetDataBinding(OMElement.class.getName()); - axisProvider = new Axis2ServiceProvider(component, service, wsBinding, servletHost, messageFactory, policyHandlerClassnames); + axisProvider = new Axis2ServiceProvider(component, service, wsBinding, servletHost, messageFactory, policyHandlerClassnames, workScheduler); } public void start() { diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java index a562604115..64c0c66a2e 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java @@ -100,6 +100,7 @@ import org.apache.tuscany.sca.runtime.ReferenceParameters; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.apache.tuscany.sca.runtime.RuntimeWire; +import org.apache.tuscany.sca.work.WorkScheduler; import org.apache.tuscany.sca.xsd.XSDefinition; import org.apache.ws.commons.schema.XmlSchema; import org.apache.ws.commons.schema.XmlSchemaExternal; @@ -134,6 +135,7 @@ public class Axis2ServiceProvider { private BasicAuthenticationPolicy basicAuthenticationPolicy = null; private Axis2TokenAuthenticationPolicy axis2TokenAuthenticationPolicy = null; private List axis2HeaderPolicies = new ArrayList(); + private WorkScheduler workScheduler; public static final QName QNAME_WSA_ADDRESS = new QName(AddressingConstants.Final.WSA_NAMESPACE, AddressingConstants.EPR_ADDRESS); @@ -166,7 +168,8 @@ public class Axis2ServiceProvider { WebServiceBinding wsBinding, ServletHost servletHost, MessageFactory messageFactory, - List policyHandlerClassnames) { + List policyHandlerClassnames, + WorkScheduler workScheduler) { this.component = component; this.contract = contract; @@ -174,6 +177,7 @@ public class Axis2ServiceProvider { this.servletHost = servletHost; this.messageFactory = messageFactory; this.policyHandlerClassnames = policyHandlerClassnames; + this.workScheduler = workScheduler; final boolean isRampartRequired = AxisPolicyHelper.isRampartRequired(wsBinding); try { @@ -324,7 +328,7 @@ public class Axis2ServiceProvider { } else if (endpointURL.startsWith("jms")) { logger.log(Level.INFO,"Axis2 JMS URL=" + endpointURL); - jmsListener = new JMSListener(); + jmsListener = new JMSListener(workScheduler); jmsSender = new JMSSender(); ListenerManager listenerManager = configContext.getListenerManager(); TransportInDescription trsIn = configContext.getAxisConfiguration().getTransportIn(Constants.TRANSPORT_JMS); diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java index fc650e2099..a96ec0b1c4 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java @@ -28,6 +28,7 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; @@ -42,6 +43,7 @@ import org.apache.axis2.transport.jms.JMSConstants; import org.apache.axis2.transport.jms.JMSUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.work.WorkScheduler; /** * Encapsulate a JMS Connection factory definition within an Axis2.xml @@ -147,19 +149,23 @@ public class JMSConnectionFactory { */ private String pass = null; + private WorkScheduler workScheduler; + private boolean consumerRunning; + /** * Create a JMSConnectionFactory for the given Axis2 name and JNDI name * * @param name the local Axis2 name of the connection factory * @param jndiName the JNDI name of the actual connection factory used */ - JMSConnectionFactory(String name, String jndiName) { + JMSConnectionFactory(String name, String jndiName, WorkScheduler workScheduler) { this.name = name; this.jndiName = jndiName; serviceJNDINameMapping = new HashMap(); serviceDestinationMapping = new HashMap(); properties = new Hashtable(); jmsSessions = new HashMap(); + this.workScheduler = workScheduler; } /** @@ -167,8 +173,8 @@ public class JMSConnectionFactory { * * @param name the local Axis2 name of the connection factory */ - JMSConnectionFactory(String name) { - this(name, null); + JMSConnectionFactory(String name, WorkScheduler workScheduler) { + this(name, null, workScheduler); } /** @@ -425,7 +431,9 @@ public class JMSConnectionFactory { } // start the connection - connection.start(); + if (!consumerRunning) { + connection.start(); + } log.info("Connection factory : " + name + " initialized..."); } @@ -458,10 +466,50 @@ public class JMSConnectionFactory { } MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(this.msgRcvr); +// consumer.setMessageListener(this.msgRcvr); replace with new Tuscany method: + registerMessageReceiver(consumer, this.msgRcvr); jmsSessions.put(destinationJndi, session); } + private void registerMessageReceiver(final MessageConsumer consumer, final JMSMessageReceiver messageReceiver) throws JMSException { + + try { + + consumer.setMessageListener(messageReceiver); + + } catch (javax.jms.JMSException e) { + + // setMessageListener not allowed in JEE container so use Tuscany threads + + connection.start(); + consumerRunning = true; + + workScheduler.scheduleWork(new Runnable() { + + public void run() { + try { + while (consumerRunning) { + final Message msg = consumer.receive(); + if (msg != null) { + workScheduler.scheduleWork(new Runnable() { + public void run() { + try { + messageReceiver.onMessage(msg); + } catch (Exception e) { + log.error("Exception on message receiver thread", e); + } + } + }); + } + } + } catch (Exception e) { + log.error("Exception on consumer receive thread", e); + } + } + }); + } + } + /** * Stop listening on the given destination - for undeployment of services * @@ -488,6 +536,7 @@ public class JMSConnectionFactory { */ public void stop() { try { + consumerRunning = false; connection.close(); } catch (JMSException e) { log.warn("Error shutting down connection factory : " + name, e); diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java index 5758a1f6f2..08190fb57c 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java @@ -49,6 +49,7 @@ import org.apache.axis2.transport.jms.JMSConstants; import org.apache.axis2.transport.jms.JMSUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.work.WorkScheduler; import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; @@ -109,6 +110,12 @@ public class JMSListener implements TransportListener { private ExecutorService workerPool; + private WorkScheduler workScheduler; + + public JMSListener(WorkScheduler workScheduler) { + this.workScheduler = workScheduler; + } + /** * This is the TransportListener initialization method invoked by Axis2 * @@ -221,7 +228,7 @@ public class JMSListener implements TransportListener { Parameter param = (Parameter) conFacIter.next(); JMSConnectionFactory jmsConFactory = - new JMSConnectionFactory(param.getName()); + new JMSConnectionFactory(param.getName(), workScheduler); ParameterIncludeImpl pi = new ParameterIncludeImpl(); try { -- cgit v1.2.3