diff options
author | antelder <antelder@13f79535-47bb-0310-9956-ffa450edef68> | 2009-05-14 10:16:25 +0000 |
---|---|---|
committer | antelder <antelder@13f79535-47bb-0310-9956-ffa450edef68> | 2009-05-14 10:16:25 +0000 |
commit | e4510026a79a0878be92ab775edc381d3fbf4cfa (patch) | |
tree | 6f345c8058cf1c6b3672e70ad61a6fdc2c7a95d5 /branches/sca-java-1.x/modules/binding-ws-axis2 | |
parent | 87017c6cc4874d3cb6c42567fb3484fdbed7bbbf (diff) |
Add support for using Tuscany threads instead on setMessageListener in a JEE container environment
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@774720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2')
5 files changed, 78 insertions, 12 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java index c7eb2b7794..e6ca8e20a8 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.tuscany.sca.binding.ws.WebServiceBinding; import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint; import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.databinding.DataBindingExtensionPoint; import org.apache.tuscany.sca.host.http.ServletHost; import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint; @@ -34,7 +35,7 @@ import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentReference; import org.apache.tuscany.sca.runtime.RuntimeComponentService; -import org.osoa.sca.ServiceRuntimeException; +import org.apache.tuscany.sca.work.WorkScheduler; /** * Axis2BindingProviderFactory @@ -48,6 +49,7 @@ public class Axis2BindingProviderFactory implements BindingProviderFactory<WebSe private ServletHost servletHost; private List<PolicyHandlerTuple> policyHandlerClassnames = null; private DataBindingExtensionPoint dataBindings; + private WorkScheduler workScheduler; public Axis2BindingProviderFactory(ExtensionPointRegistry extensionPoints) { ServletHostExtensionPoint servletHosts = extensionPoints.getExtensionPoint(ServletHostExtensionPoint.class); @@ -58,6 +60,8 @@ public class Axis2BindingProviderFactory implements BindingProviderFactory<WebSe modelFactories = extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class); policyHandlerClassnames = PolicyHandlerDefinitionsLoader.loadPolicyHandlerClassnames(); dataBindings = extensionPoints.getExtensionPoint(DataBindingExtensionPoint.class); + UtilityExtensionPoint utilities = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class); + workScheduler = utilities.getUtility(WorkScheduler.class); } public ReferenceBindingProvider createReferenceBindingProvider(RuntimeComponent component, @@ -72,7 +76,7 @@ public class Axis2BindingProviderFactory implements BindingProviderFactory<WebSe WebServiceBinding binding) { return new Axis2ServiceBindingProvider(component, service, binding, servletHost, modelFactories, - policyHandlerClassnames, dataBindings); + policyHandlerClassnames, dataBindings, workScheduler); } public Class<WebServiceBinding> getModelType() { diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java index 5b26290732..8a4b64d266 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java @@ -31,6 +31,7 @@ import org.apache.tuscany.sca.policy.util.PolicyHandlerTuple; import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.work.WorkScheduler; import org.osoa.sca.ServiceRuntimeException; public class Axis2ServiceBindingProvider implements ServiceBindingProvider { @@ -44,7 +45,8 @@ public class Axis2ServiceBindingProvider implements ServiceBindingProvider { ServletHost servletHost, ModelFactoryExtensionPoint modelFactories, List<PolicyHandlerTuple> policyHandlerClassnames, - DataBindingExtensionPoint dataBindings) { + DataBindingExtensionPoint dataBindings, + WorkScheduler workScheduler) { if (servletHost == null) { throw new ServiceRuntimeException("No Servlet host is avaible for HTTP web services"); @@ -62,7 +64,7 @@ public class Axis2ServiceBindingProvider implements ServiceBindingProvider { InterfaceContract contract = wsBinding.getBindingInterfaceContract(); contract.getInterface().resetDataBinding(OMElement.class.getName()); - axisProvider = new Axis2ServiceProvider(component, service, wsBinding, servletHost, messageFactory, policyHandlerClassnames); + axisProvider = new Axis2ServiceProvider(component, service, wsBinding, servletHost, messageFactory, policyHandlerClassnames, workScheduler); } public void start() { diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java index a562604115..64c0c66a2e 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java @@ -100,6 +100,7 @@ import org.apache.tuscany.sca.runtime.ReferenceParameters; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.apache.tuscany.sca.runtime.RuntimeWire; +import org.apache.tuscany.sca.work.WorkScheduler; import org.apache.tuscany.sca.xsd.XSDefinition; import org.apache.ws.commons.schema.XmlSchema; import org.apache.ws.commons.schema.XmlSchemaExternal; @@ -134,6 +135,7 @@ public class Axis2ServiceProvider { private BasicAuthenticationPolicy basicAuthenticationPolicy = null; private Axis2TokenAuthenticationPolicy axis2TokenAuthenticationPolicy = null; private List<Axis2HeaderPolicy> axis2HeaderPolicies = new ArrayList<Axis2HeaderPolicy>(); + private WorkScheduler workScheduler; public static final QName QNAME_WSA_ADDRESS = new QName(AddressingConstants.Final.WSA_NAMESPACE, AddressingConstants.EPR_ADDRESS); @@ -166,7 +168,8 @@ public class Axis2ServiceProvider { WebServiceBinding wsBinding, ServletHost servletHost, MessageFactory messageFactory, - List<PolicyHandlerTuple> policyHandlerClassnames) { + List<PolicyHandlerTuple> policyHandlerClassnames, + WorkScheduler workScheduler) { this.component = component; this.contract = contract; @@ -174,6 +177,7 @@ public class Axis2ServiceProvider { this.servletHost = servletHost; this.messageFactory = messageFactory; this.policyHandlerClassnames = policyHandlerClassnames; + this.workScheduler = workScheduler; final boolean isRampartRequired = AxisPolicyHelper.isRampartRequired(wsBinding); try { @@ -324,7 +328,7 @@ public class Axis2ServiceProvider { } else if (endpointURL.startsWith("jms")) { logger.log(Level.INFO,"Axis2 JMS URL=" + endpointURL); - jmsListener = new JMSListener(); + jmsListener = new JMSListener(workScheduler); jmsSender = new JMSSender(); ListenerManager listenerManager = configContext.getListenerManager(); TransportInDescription trsIn = configContext.getAxisConfiguration().getTransportIn(Constants.TRANSPORT_JMS); diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java index fc650e2099..a96ec0b1c4 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java @@ -28,6 +28,7 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
@@ -42,6 +43,7 @@ import org.apache.axis2.transport.jms.JMSConstants; import org.apache.axis2.transport.jms.JMSUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* Encapsulate a JMS Connection factory definition within an Axis2.xml
@@ -147,19 +149,23 @@ public class JMSConnectionFactory { */
private String pass = null;
+ private WorkScheduler workScheduler;
+ private boolean consumerRunning;
+
/**
* Create a JMSConnectionFactory for the given Axis2 name and JNDI name
*
* @param name the local Axis2 name of the connection factory
* @param jndiName the JNDI name of the actual connection factory used
*/
- JMSConnectionFactory(String name, String jndiName) {
+ JMSConnectionFactory(String name, String jndiName, WorkScheduler workScheduler) {
this.name = name;
this.jndiName = jndiName;
serviceJNDINameMapping = new HashMap();
serviceDestinationMapping = new HashMap();
properties = new Hashtable();
jmsSessions = new HashMap();
+ this.workScheduler = workScheduler;
}
/**
@@ -167,8 +173,8 @@ public class JMSConnectionFactory { *
* @param name the local Axis2 name of the connection factory
*/
- JMSConnectionFactory(String name) {
- this(name, null);
+ JMSConnectionFactory(String name, WorkScheduler workScheduler) {
+ this(name, null, workScheduler);
}
/**
@@ -425,7 +431,9 @@ public class JMSConnectionFactory { }
// start the connection
- connection.start();
+ if (!consumerRunning) {
+ connection.start();
+ }
log.info("Connection factory : " + name + " initialized...");
}
@@ -458,10 +466,50 @@ public class JMSConnectionFactory { }
MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(this.msgRcvr);
+// consumer.setMessageListener(this.msgRcvr); replace with new Tuscany method:
+ registerMessageReceiver(consumer, this.msgRcvr);
jmsSessions.put(destinationJndi, session);
}
+ private void registerMessageReceiver(final MessageConsumer consumer, final JMSMessageReceiver messageReceiver) throws JMSException {
+
+ try {
+
+ consumer.setMessageListener(messageReceiver);
+
+ } catch (javax.jms.JMSException e) {
+
+ // setMessageListener not allowed in JEE container so use Tuscany threads
+
+ connection.start();
+ consumerRunning = true;
+
+ workScheduler.scheduleWork(new Runnable() {
+
+ public void run() {
+ try {
+ while (consumerRunning) {
+ final Message msg = consumer.receive();
+ if (msg != null) {
+ workScheduler.scheduleWork(new Runnable() {
+ public void run() {
+ try {
+ messageReceiver.onMessage(msg);
+ } catch (Exception e) {
+ log.error("Exception on message receiver thread", e);
+ }
+ }
+ });
+ }
+ }
+ } catch (Exception e) {
+ log.error("Exception on consumer receive thread", e);
+ }
+ }
+ });
+ }
+ }
+
/**
* Stop listening on the given destination - for undeployment of services
*
@@ -488,6 +536,7 @@ public class JMSConnectionFactory { */
public void stop() {
try {
+ consumerRunning = false;
connection.close();
} catch (JMSException e) {
log.warn("Error shutting down connection factory : " + name, e);
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java index 5758a1f6f2..08190fb57c 100644 --- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java +++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java @@ -49,6 +49,7 @@ import org.apache.axis2.transport.jms.JMSConstants; import org.apache.axis2.transport.jms.JMSUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
@@ -109,6 +110,12 @@ public class JMSListener implements TransportListener { private ExecutorService workerPool;
+ private WorkScheduler workScheduler;
+
+ public JMSListener(WorkScheduler workScheduler) {
+ this.workScheduler = workScheduler;
+ }
+
/**
* This is the TransportListener initialization method invoked by Axis2
*
@@ -221,7 +228,7 @@ public class JMSListener implements TransportListener { Parameter param = (Parameter) conFacIter.next();
JMSConnectionFactory jmsConFactory =
- new JMSConnectionFactory(param.getName());
+ new JMSConnectionFactory(param.getName(), workScheduler);
ParameterIncludeImpl pi = new ParameterIncludeImpl();
try {
|