summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java59
1 files changed, 54 insertions, 5 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
index fc650e2099..a96ec0b1c4 100644
--- a/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
@@ -28,6 +28,7 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
@@ -42,6 +43,7 @@ import org.apache.axis2.transport.jms.JMSConstants;
import org.apache.axis2.transport.jms.JMSUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
/**
* Encapsulate a JMS Connection factory definition within an Axis2.xml
@@ -147,19 +149,23 @@ public class JMSConnectionFactory {
*/
private String pass = null;
+ private WorkScheduler workScheduler;
+ private boolean consumerRunning;
+
/**
* Create a JMSConnectionFactory for the given Axis2 name and JNDI name
*
* @param name the local Axis2 name of the connection factory
* @param jndiName the JNDI name of the actual connection factory used
*/
- JMSConnectionFactory(String name, String jndiName) {
+ JMSConnectionFactory(String name, String jndiName, WorkScheduler workScheduler) {
this.name = name;
this.jndiName = jndiName;
serviceJNDINameMapping = new HashMap();
serviceDestinationMapping = new HashMap();
properties = new Hashtable();
jmsSessions = new HashMap();
+ this.workScheduler = workScheduler;
}
/**
@@ -167,8 +173,8 @@ public class JMSConnectionFactory {
*
* @param name the local Axis2 name of the connection factory
*/
- JMSConnectionFactory(String name) {
- this(name, null);
+ JMSConnectionFactory(String name, WorkScheduler workScheduler) {
+ this(name, null, workScheduler);
}
/**
@@ -425,7 +431,9 @@ public class JMSConnectionFactory {
}
// start the connection
- connection.start();
+ if (!consumerRunning) {
+ connection.start();
+ }
log.info("Connection factory : " + name + " initialized...");
}
@@ -458,10 +466,50 @@ public class JMSConnectionFactory {
}
MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(this.msgRcvr);
+// consumer.setMessageListener(this.msgRcvr); replace with new Tuscany method:
+ registerMessageReceiver(consumer, this.msgRcvr);
jmsSessions.put(destinationJndi, session);
}
+ private void registerMessageReceiver(final MessageConsumer consumer, final JMSMessageReceiver messageReceiver) throws JMSException {
+
+ try {
+
+ consumer.setMessageListener(messageReceiver);
+
+ } catch (javax.jms.JMSException e) {
+
+ // setMessageListener not allowed in JEE container so use Tuscany threads
+
+ connection.start();
+ consumerRunning = true;
+
+ workScheduler.scheduleWork(new Runnable() {
+
+ public void run() {
+ try {
+ while (consumerRunning) {
+ final Message msg = consumer.receive();
+ if (msg != null) {
+ workScheduler.scheduleWork(new Runnable() {
+ public void run() {
+ try {
+ messageReceiver.onMessage(msg);
+ } catch (Exception e) {
+ log.error("Exception on message receiver thread", e);
+ }
+ }
+ });
+ }
+ }
+ } catch (Exception e) {
+ log.error("Exception on consumer receive thread", e);
+ }
+ }
+ });
+ }
+ }
+
/**
* Stop listening on the given destination - for undeployment of services
*
@@ -488,6 +536,7 @@ public class JMSConnectionFactory {
*/
public void stop() {
try {
+ consumerRunning = false;
connection.close();
} catch (JMSException e) {
log.warn("Error shutting down connection factory : " + name, e);