summaryrefslogtreecommitdiffstats
path: root/branches
diff options
context:
space:
mode:
authorantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2009-05-14 10:16:25 +0000
committerantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2009-05-14 10:16:25 +0000
commite4510026a79a0878be92ab775edc381d3fbf4cfa (patch)
tree6f345c8058cf1c6b3672e70ad61a6fdc2c7a95d5 /branches
parent87017c6cc4874d3cb6c42567fb3484fdbed7bbbf (diff)
Add support for using Tuscany threads instead on setMessageListener in a JEE container environment
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@774720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches')
-rw-r--r--branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java7
-rw-r--r--branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java8
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java8
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java6
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java8
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java59
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java9
7 files changed, 90 insertions, 15 deletions
diff --git a/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java b/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
index 58a50d46dc..0450d07071 100644
--- a/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
+++ b/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceBindingProvider.java
@@ -31,6 +31,7 @@ import org.apache.tuscany.sca.binding.ws.axis2.Axis2ServiceProvider;
import org.apache.tuscany.sca.binding.ws.wsdlgen.BindingWSDLGenerator;
import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.databinding.DataBindingExtensionPoint;
import org.apache.tuscany.sca.host.http.ServletHost;
import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint;
@@ -40,6 +41,7 @@ import org.apache.tuscany.sca.policy.util.PolicyHandlerTuple;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* The service binding provider for the remote sca binding implementation. Relies on the
@@ -69,6 +71,8 @@ public class Axis2SCAServiceBindingProvider implements ServiceBindingProvider {
ModelFactoryExtensionPoint modelFactories = extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class);
MessageFactory messageFactory = modelFactories.getFactory(MessageFactory.class);
DataBindingExtensionPoint dataBindings = extensionPoints.getExtensionPoint(DataBindingExtensionPoint.class);
+ UtilityExtensionPoint utilities = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class);
+ WorkScheduler workScheduler = utilities.getUtility(WorkScheduler.class);
this.binding = binding.getSCABinding();
wsBinding = modelFactories.getFactory(WebServiceBindingFactory.class).createWebServiceBinding();
@@ -88,7 +92,8 @@ public class Axis2SCAServiceBindingProvider implements ServiceBindingProvider {
wsBinding,
servletHost,
messageFactory,
- policyHandlerClassnames);
+ policyHandlerClassnames,
+ workScheduler);
}
public InterfaceContract getBindingInterfaceContract() {
diff --git a/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java b/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
index cf13d821f7..dd66415cca 100644
--- a/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
+++ b/branches/sca-java-1.x/modules/binding-sca-axis2/src/main/java/org/apache/tuscany/sca/binding/sca/axis2/impl/Axis2SCAServiceProvider.java
@@ -25,11 +25,13 @@ import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.assembly.SCABinding;
import org.apache.tuscany.sca.binding.ws.WebServiceBinding;
import org.apache.tuscany.sca.binding.ws.axis2.Axis2ServiceProvider;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.host.http.ServletHost;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.policy.util.PolicyHandlerTuple;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* A specialization of the Axis2BindingProvider that just switches in the SCABinding model
@@ -58,14 +60,16 @@ public class Axis2SCAServiceProvider extends Axis2ServiceProvider {
WebServiceBinding wsBinding,
ServletHost servletHost,
MessageFactory messageFactory,
- List<PolicyHandlerTuple> policyHandlerClassnames) {
+ List<PolicyHandlerTuple> policyHandlerClassnames,
+ WorkScheduler workScheduler) {
super(component,
service,
wsBinding,
servletHost,
messageFactory,
- policyHandlerClassnames);
+ policyHandlerClassnames,
+ workScheduler);
this.binding = binding;
}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
index c7eb2b7794..e6ca8e20a8 100644
--- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2BindingProviderFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.tuscany.sca.binding.ws.WebServiceBinding;
import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.databinding.DataBindingExtensionPoint;
import org.apache.tuscany.sca.host.http.ServletHost;
import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint;
@@ -34,7 +35,7 @@ import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
-import org.osoa.sca.ServiceRuntimeException;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* Axis2BindingProviderFactory
@@ -48,6 +49,7 @@ public class Axis2BindingProviderFactory implements BindingProviderFactory<WebSe
private ServletHost servletHost;
private List<PolicyHandlerTuple> policyHandlerClassnames = null;
private DataBindingExtensionPoint dataBindings;
+ private WorkScheduler workScheduler;
public Axis2BindingProviderFactory(ExtensionPointRegistry extensionPoints) {
ServletHostExtensionPoint servletHosts = extensionPoints.getExtensionPoint(ServletHostExtensionPoint.class);
@@ -58,6 +60,8 @@ public class Axis2BindingProviderFactory implements BindingProviderFactory<WebSe
modelFactories = extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class);
policyHandlerClassnames = PolicyHandlerDefinitionsLoader.loadPolicyHandlerClassnames();
dataBindings = extensionPoints.getExtensionPoint(DataBindingExtensionPoint.class);
+ UtilityExtensionPoint utilities = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class);
+ workScheduler = utilities.getUtility(WorkScheduler.class);
}
public ReferenceBindingProvider createReferenceBindingProvider(RuntimeComponent component,
@@ -72,7 +76,7 @@ public class Axis2BindingProviderFactory implements BindingProviderFactory<WebSe
WebServiceBinding binding) {
return new Axis2ServiceBindingProvider(component, service, binding,
servletHost, modelFactories,
- policyHandlerClassnames, dataBindings);
+ policyHandlerClassnames, dataBindings, workScheduler);
}
public Class<WebServiceBinding> getModelType() {
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
index 5b26290732..8a4b64d266 100644
--- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceBindingProvider.java
@@ -31,6 +31,7 @@ import org.apache.tuscany.sca.policy.util.PolicyHandlerTuple;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
import org.osoa.sca.ServiceRuntimeException;
public class Axis2ServiceBindingProvider implements ServiceBindingProvider {
@@ -44,7 +45,8 @@ public class Axis2ServiceBindingProvider implements ServiceBindingProvider {
ServletHost servletHost,
ModelFactoryExtensionPoint modelFactories,
List<PolicyHandlerTuple> policyHandlerClassnames,
- DataBindingExtensionPoint dataBindings) {
+ DataBindingExtensionPoint dataBindings,
+ WorkScheduler workScheduler) {
if (servletHost == null) {
throw new ServiceRuntimeException("No Servlet host is avaible for HTTP web services");
@@ -62,7 +64,7 @@ public class Axis2ServiceBindingProvider implements ServiceBindingProvider {
InterfaceContract contract = wsBinding.getBindingInterfaceContract();
contract.getInterface().resetDataBinding(OMElement.class.getName());
- axisProvider = new Axis2ServiceProvider(component, service, wsBinding, servletHost, messageFactory, policyHandlerClassnames);
+ axisProvider = new Axis2ServiceProvider(component, service, wsBinding, servletHost, messageFactory, policyHandlerClassnames, workScheduler);
}
public void start() {
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
index a562604115..64c0c66a2e 100644
--- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/Axis2ServiceProvider.java
@@ -100,6 +100,7 @@ import org.apache.tuscany.sca.runtime.ReferenceParameters;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeWire;
+import org.apache.tuscany.sca.work.WorkScheduler;
import org.apache.tuscany.sca.xsd.XSDefinition;
import org.apache.ws.commons.schema.XmlSchema;
import org.apache.ws.commons.schema.XmlSchemaExternal;
@@ -134,6 +135,7 @@ public class Axis2ServiceProvider {
private BasicAuthenticationPolicy basicAuthenticationPolicy = null;
private Axis2TokenAuthenticationPolicy axis2TokenAuthenticationPolicy = null;
private List<Axis2HeaderPolicy> axis2HeaderPolicies = new ArrayList<Axis2HeaderPolicy>();
+ private WorkScheduler workScheduler;
public static final QName QNAME_WSA_ADDRESS =
new QName(AddressingConstants.Final.WSA_NAMESPACE, AddressingConstants.EPR_ADDRESS);
@@ -166,7 +168,8 @@ public class Axis2ServiceProvider {
WebServiceBinding wsBinding,
ServletHost servletHost,
MessageFactory messageFactory,
- List<PolicyHandlerTuple> policyHandlerClassnames) {
+ List<PolicyHandlerTuple> policyHandlerClassnames,
+ WorkScheduler workScheduler) {
this.component = component;
this.contract = contract;
@@ -174,6 +177,7 @@ public class Axis2ServiceProvider {
this.servletHost = servletHost;
this.messageFactory = messageFactory;
this.policyHandlerClassnames = policyHandlerClassnames;
+ this.workScheduler = workScheduler;
final boolean isRampartRequired = AxisPolicyHelper.isRampartRequired(wsBinding);
try {
@@ -324,7 +328,7 @@ public class Axis2ServiceProvider {
} else if (endpointURL.startsWith("jms")) {
logger.log(Level.INFO,"Axis2 JMS URL=" + endpointURL);
- jmsListener = new JMSListener();
+ jmsListener = new JMSListener(workScheduler);
jmsSender = new JMSSender();
ListenerManager listenerManager = configContext.getListenerManager();
TransportInDescription trsIn = configContext.getAxisConfiguration().getTransportIn(Constants.TRANSPORT_JMS);
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
index fc650e2099..a96ec0b1c4 100644
--- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
@@ -28,6 +28,7 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
@@ -42,6 +43,7 @@ import org.apache.axis2.transport.jms.JMSConstants;
import org.apache.axis2.transport.jms.JMSUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* Encapsulate a JMS Connection factory definition within an Axis2.xml
@@ -147,19 +149,23 @@ public class JMSConnectionFactory {
*/
private String pass = null;
+ private WorkScheduler workScheduler;
+ private boolean consumerRunning;
+
/**
* Create a JMSConnectionFactory for the given Axis2 name and JNDI name
*
* @param name the local Axis2 name of the connection factory
* @param jndiName the JNDI name of the actual connection factory used
*/
- JMSConnectionFactory(String name, String jndiName) {
+ JMSConnectionFactory(String name, String jndiName, WorkScheduler workScheduler) {
this.name = name;
this.jndiName = jndiName;
serviceJNDINameMapping = new HashMap();
serviceDestinationMapping = new HashMap();
properties = new Hashtable();
jmsSessions = new HashMap();
+ this.workScheduler = workScheduler;
}
/**
@@ -167,8 +173,8 @@ public class JMSConnectionFactory {
*
* @param name the local Axis2 name of the connection factory
*/
- JMSConnectionFactory(String name) {
- this(name, null);
+ JMSConnectionFactory(String name, WorkScheduler workScheduler) {
+ this(name, null, workScheduler);
}
/**
@@ -425,7 +431,9 @@ public class JMSConnectionFactory {
}
// start the connection
- connection.start();
+ if (!consumerRunning) {
+ connection.start();
+ }
log.info("Connection factory : " + name + " initialized...");
}
@@ -458,10 +466,50 @@ public class JMSConnectionFactory {
}
MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(this.msgRcvr);
+// consumer.setMessageListener(this.msgRcvr); replace with new Tuscany method:
+ registerMessageReceiver(consumer, this.msgRcvr);
jmsSessions.put(destinationJndi, session);
}
+ private void registerMessageReceiver(final MessageConsumer consumer, final JMSMessageReceiver messageReceiver) throws JMSException {
+
+ try {
+
+ consumer.setMessageListener(messageReceiver);
+
+ } catch (javax.jms.JMSException e) {
+
+ // setMessageListener not allowed in JEE container so use Tuscany threads
+
+ connection.start();
+ consumerRunning = true;
+
+ workScheduler.scheduleWork(new Runnable() {
+
+ public void run() {
+ try {
+ while (consumerRunning) {
+ final Message msg = consumer.receive();
+ if (msg != null) {
+ workScheduler.scheduleWork(new Runnable() {
+ public void run() {
+ try {
+ messageReceiver.onMessage(msg);
+ } catch (Exception e) {
+ log.error("Exception on message receiver thread", e);
+ }
+ }
+ });
+ }
+ }
+ } catch (Exception e) {
+ log.error("Exception on consumer receive thread", e);
+ }
+ }
+ });
+ }
+ }
+
/**
* Stop listening on the given destination - for undeployment of services
*
@@ -488,6 +536,7 @@ public class JMSConnectionFactory {
*/
public void stop() {
try {
+ consumerRunning = false;
connection.close();
} catch (JMSException e) {
log.warn("Error shutting down connection factory : " + name, e);
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
index 5758a1f6f2..08190fb57c 100644
--- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
@@ -49,6 +49,7 @@ import org.apache.axis2.transport.jms.JMSConstants;
import org.apache.axis2.transport.jms.JMSUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
@@ -109,6 +110,12 @@ public class JMSListener implements TransportListener {
private ExecutorService workerPool;
+ private WorkScheduler workScheduler;
+
+ public JMSListener(WorkScheduler workScheduler) {
+ this.workScheduler = workScheduler;
+ }
+
/**
* This is the TransportListener initialization method invoked by Axis2
*
@@ -221,7 +228,7 @@ public class JMSListener implements TransportListener {
Parameter param = (Parameter) conFacIter.next();
JMSConnectionFactory jmsConFactory =
- new JMSConnectionFactory(param.getName());
+ new JMSConnectionFactory(param.getName(), workScheduler);
ParameterIncludeImpl pi = new ParameterIncludeImpl();
try {