summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms
diff options
context:
space:
mode:
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java31
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java72
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java75
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java56
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java393
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java122
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java273
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java111
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java28
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java294
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java237
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java332
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java306
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java499
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java1115
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java1217
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java49
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java43
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java74
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java64
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java37
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java39
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java39
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java23
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html356
25 files changed, 5885 insertions, 0 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java
new file mode 100644
index 0000000000..ec53a2a1ca
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java
@@ -0,0 +1,31 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+public class AxisJMSException extends RuntimeException {
+
+ AxisJMSException() {
+ super();
+ }
+
+ AxisJMSException(String msg) {
+ super(msg);
+ }
+
+ AxisJMSException(String msg, Exception e) {
+ super(msg, e);
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java
new file mode 100644
index 0000000000..5228efa154
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java
@@ -0,0 +1,72 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+/**
+ * Data source implementation wrapping a JMS {@link BytesMessage}.
+ * <p>
+ * Note that two input streams created by the same instance of this
+ * class can not be used at the same time.
+ */
+public class BytesMessageDataSource implements SizeAwareDataSource {
+ private final BytesMessage message;
+ private final String contentType;
+
+ public BytesMessageDataSource(BytesMessage message, String contentType) {
+ this.message = message;
+ this.contentType = contentType;
+ }
+
+ public BytesMessageDataSource(BytesMessage message) {
+ this(message, "application/octet-stream");
+ }
+
+ public long getSize() {
+ try {
+ return message.getBodyLength();
+ } catch (JMSException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public InputStream getInputStream() throws IOException {
+ try {
+ message.reset();
+ } catch (JMSException ex) {
+ throw new JMSExceptionWrapper(ex);
+ }
+ return new BytesMessageInputStream(message);
+ }
+
+ public String getName() {
+ return null;
+ }
+
+ public OutputStream getOutputStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java
new file mode 100644
index 0000000000..9080641572
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java
@@ -0,0 +1,75 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.io.InputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+
+/**
+ * Input stream that reads data from a JMS {@link BytesMessage}.
+ * Note that since the current position in the message is managed by
+ * the underlying {@link BytesMessage} object, it is not possible to
+ * use several instances of this class operating on a single
+ * {@link BytesMessage} at the same time.
+ */
+public class BytesMessageInputStream extends InputStream {
+ private final BytesMessage message;
+
+ public BytesMessageInputStream(BytesMessage message) {
+ this.message = message;
+ }
+
+ @Override
+ public int read() throws JMSExceptionWrapper {
+ try {
+ return message.readByte() & 0xFF;
+ } catch (MessageEOFException ex) {
+ return -1;
+ } catch (JMSException ex) {
+ throw new JMSExceptionWrapper(ex);
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws JMSExceptionWrapper {
+ if (off == 0) {
+ try {
+ return message.readBytes(b, len);
+ } catch (JMSException ex) {
+ throw new JMSExceptionWrapper(ex);
+ }
+ } else {
+ byte[] b2 = new byte[len];
+ int c = read(b2);
+ if (c > 0) {
+ System.arraycopy(b2, 0, b, off, c);
+ }
+ return c;
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws JMSExceptionWrapper {
+ try {
+ return message.readBytes(b);
+ } catch (JMSException ex) {
+ throw new JMSExceptionWrapper(ex);
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java
new file mode 100644
index 0000000000..4508d68280
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java
@@ -0,0 +1,56 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.io.OutputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+public class BytesMessageOutputStream extends OutputStream {
+ private final BytesMessage message;
+
+ public BytesMessageOutputStream(BytesMessage message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(int b) throws JMSExceptionWrapper {
+ try {
+ message.writeByte((byte)b);
+ } catch (JMSException ex) {
+ throw new JMSExceptionWrapper(ex);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws JMSExceptionWrapper {
+ try {
+ message.writeBytes(b, off, len);
+ } catch (JMSException ex) {
+ new JMSExceptionWrapper(ex);
+ }
+ }
+
+ @Override
+ public void write(byte[] b) throws JMSExceptionWrapper {
+ try {
+ message.writeBytes(b);
+ } catch (JMSException ex) {
+ throw new JMSExceptionWrapper(ex);
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
new file mode 100644
index 0000000000..d5d164ce76
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
@@ -0,0 +1,393 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.util.Hashtable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterIncludeImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Encapsulate a JMS Connection factory definition within an Axis2.xml
+ *
+ * JMS Connection Factory definitions, allows JNDI properties as well as other service
+ * level parameters to be defined, and re-used by each service that binds to it
+ *
+ * When used for sending messages out, the JMSConnectionFactory'ies are able to cache
+ * a Connection, Session or Producer
+ */
+public class JMSConnectionFactory {
+
+ private static final Log log = LogFactory.getLog(JMSConnectionFactory.class);
+
+ /** The name used for the connection factory definition within Axis2 */
+ private String name = null;
+ /** The list of parameters from the axis2.xml definition */
+ private Hashtable<String, String> parameters = new Hashtable<String, String>();
+
+ /** The cached InitialContext reference */
+ private Context context = null;
+ /** The JMS ConnectionFactory this definition refers to */
+ private ConnectionFactory conFactory = null;
+ /** The shared JMS Connection for this JMS connection factory */
+ private Connection sharedConnection = null;
+ /** The shared JMS Session for this JMS connection factory */
+ private Session sharedSession = null;
+ /** The shared JMS MessageProducer for this JMS connection factory */
+ private MessageProducer sharedProducer = null;
+ /** The Shared Destination */
+ private Destination sharedDestination = null;
+ /** The shared JMS connection for this JMS connection factory */
+ private int cacheLevel = JMSConstants.CACHE_CONNECTION;
+
+ /**
+ * Digest a JMS CF definition from an axis2.xml 'Parameter' and construct
+ * @param parameter the axis2.xml 'Parameter' that defined the JMS CF
+ */
+ public JMSConnectionFactory(Parameter parameter) {
+
+ this.name = parameter.getName();
+ ParameterIncludeImpl pi = new ParameterIncludeImpl();
+
+ try {
+ pi.deserializeParameters((OMElement) parameter.getValue());
+ } catch (AxisFault axisFault) {
+ handleException("Error reading parameters for JMS connection factory" + name, axisFault);
+ }
+
+ for (Object o : pi.getParameters()) {
+ Parameter p = (Parameter) o;
+ parameters.put(p.getName(), (String) p.getValue());
+ }
+
+ digestCacheLevel();
+ try {
+ context = new InitialContext(parameters);
+ conFactory = JMSUtils.lookup(context, ConnectionFactory.class,
+ parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME));
+ if (parameters.get(JMSConstants.PARAM_DESTINATION) != null) {
+ sharedDestination = JMSUtils.lookup(context, Destination.class,
+ parameters.get(JMSConstants.PARAM_DESTINATION));
+ }
+ log.info("JMS ConnectionFactory : " + name + " initialized");
+
+ } catch (NamingException e) {
+ throw new AxisJMSException("Cannot acquire JNDI context, JMS Connection factory : " +
+ parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME) + " or default destination : " +
+ parameters.get(JMSConstants.PARAM_DESTINATION) +
+ " for JMS CF : " + name + " using : " + parameters);
+ }
+ }
+
+ /**
+ * Digest, the cache value iff specified
+ */
+ private void digestCacheLevel() {
+
+ String key = JMSConstants.PARAM_CACHE_LEVEL;
+ String val = parameters.get(key);
+
+ if ("none".equalsIgnoreCase(val)) {
+ this.cacheLevel = JMSConstants.CACHE_NONE;
+ } else if ("connection".equalsIgnoreCase(val)) {
+ this.cacheLevel = JMSConstants.CACHE_CONNECTION;
+ } else if ("session".equals(val)){
+ this.cacheLevel = JMSConstants.CACHE_SESSION;
+ } else if ("producer".equals(val)) {
+ this.cacheLevel = JMSConstants.CACHE_PRODUCER;
+ } else if (val != null) {
+ throw new AxisJMSException("Invalid cache level : " + val + " for JMS CF : " + name);
+ }
+ }
+
+ /**
+ * Return the name assigned to this JMS CF definition
+ * @return name of the JMS CF
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * The list of properties (including JNDI and non-JNDI)
+ * @return properties defined on the JMS CF
+ */
+ public Hashtable<String, String> getParameters() {
+ return parameters;
+ }
+
+ /**
+ * Get cached InitialContext
+ * @return cache InitialContext
+ */
+ public Context getContext() {
+ return context;
+ }
+
+ /**
+ * Cache level applicable for this JMS CF
+ * @return applicable cache level
+ */
+ public int getCacheLevel() {
+ return cacheLevel;
+ }
+
+ /**
+ * Get the shared Destination - if defined
+ * @return
+ */
+ public Destination getSharedDestination() {
+ return sharedDestination;
+ }
+
+ /**
+ * Lookup a Destination using this JMS CF definitions and JNDI name
+ * @param name JNDI name of the Destionation
+ * @return JMS Destination for the given JNDI name or null
+ */
+ public Destination getDestination(String name) {
+ try {
+ return JMSUtils.lookup(context, Destination.class, name);
+ } catch (NamingException e) {
+ handleException("Unknown JMS Destination : " + name + " using : " + parameters, e);
+ }
+ return null;
+ }
+
+ /**
+ * Get the reply Destination from the PARAM_REPLY_DESTINATION parameter
+ * @return reply destination defined in the JMS CF
+ */
+ public String getReplyToDestination() {
+ return parameters.get(JMSConstants.PARAM_REPLY_DESTINATION);
+ }
+
+ private void handleException(String msg, Exception e) {
+ log.error(msg, e);
+ throw new AxisJMSException(msg, e);
+ }
+
+ /**
+ * Should the JMS 1.1 API be used? - defaults to yes
+ * @return true, if JMS 1.1 api should be used
+ */
+ public boolean isJmsSpec11() {
+ return parameters.get(JMSConstants.PARAM_JMS_SPEC_VER) == null ||
+ "1.1".equals(parameters.get(JMSConstants.PARAM_JMS_SPEC_VER));
+ }
+
+ /**
+ * Return the type of the JMS CF Destination
+ * @return TRUE if a Queue, FALSE for a Topic and NULL for a JMS 1.1 Generic Destination
+ */
+ public Boolean isQueue() {
+ if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) == null &&
+ parameters.get(JMSConstants.PARAM_DEST_TYPE) == null) {
+ return null;
+ }
+
+ if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) != null) {
+ if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) {
+ return true;
+ } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) {
+ return false;
+ } else {
+ throw new AxisJMSException("Invalid " + JMSConstants.PARAM_CONFAC_TYPE + " : " +
+ parameters.get(JMSConstants.PARAM_CONFAC_TYPE) + " for JMS CF : " + name);
+ }
+ } else {
+ if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) {
+ return true;
+ } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) {
+ return false;
+ } else {
+ throw new AxisJMSException("Invalid " + JMSConstants.PARAM_DEST_TYPE + " : " +
+ parameters.get(JMSConstants.PARAM_DEST_TYPE) + " for JMS CF : " + name);
+ }
+ }
+ }
+
+ /**
+ * Is a session transaction requested from users of this JMS CF?
+ * @return session transaction required by the clients of this?
+ */
+ private boolean isSessionTransacted() {
+ return parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED) == null ||
+ Boolean.valueOf(parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED));
+ }
+
+ /**
+ * Create a new Connection
+ * @return a new Connection
+ */
+ private Connection createConnection() {
+
+ Connection connection = null;
+ try {
+ connection = JMSUtils.createConnection(
+ conFactory,
+ parameters.get(JMSConstants.PARAM_JMS_USERNAME),
+ parameters.get(JMSConstants.PARAM_JMS_PASSWORD),
+ isJmsSpec11(), isQueue());
+
+ if (log.isDebugEnabled()) {
+ log.debug("New JMS Connection from JMS CF : " + name + " created");
+ }
+
+ } catch (JMSException e) {
+ handleException("Error acquiring a Connection from the JMS CF : " + name +
+ " using properties : " + parameters, e);
+ }
+ return connection;
+ }
+
+ /**
+ * Create a new Session
+ * @param connection Connection to use
+ * @return A new Session
+ */
+ private Session createSession(Connection connection) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS Session from JMS CF : " + name);
+ }
+ return JMSUtils.createSession(
+ connection, isSessionTransacted(), Session.AUTO_ACKNOWLEDGE, isJmsSpec11(), isQueue());
+
+ } catch (JMSException e) {
+ handleException("Error creating JMS session from JMS CF : " + name, e);
+ }
+ return null;
+ }
+
+ /**
+ * Create a new MessageProducer
+ * @param session Session to be used
+ * @param destination Destination to be used
+ * @return a new MessageProducer
+ */
+ private MessageProducer createProducer(Session session, Destination destination) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS MessageProducer from JMS CF : " + name);
+ }
+
+ return JMSUtils.createProducer(
+ session, destination, isQueue(), isJmsSpec11());
+
+ } catch (JMSException e) {
+ handleException("Error creating JMS producer from JMS CF : " + name,e);
+ }
+ return null;
+ }
+
+ /**
+ * Get a new Connection or shared Connection from this JMS CF
+ * @return new or shared Connection from this JMS CF
+ */
+ public Connection getConnection() {
+ if (cacheLevel > JMSConstants.CACHE_NONE) {
+ return getSharedConnection();
+ } else {
+ return createConnection();
+ }
+ }
+
+ /**
+ * Get a new Session or shared Session from this JMS CF
+ * @param connection the Connection to be used
+ * @return new or shared Session from this JMS CF
+ */
+ public Session getSession(Connection connection) {
+ if (cacheLevel > JMSConstants.CACHE_CONNECTION) {
+ return getSharedSession();
+ } else {
+ return createSession((connection == null ? getConnection() : connection));
+ }
+ }
+
+ /**
+ * Get a new MessageProducer or shared MessageProducer from this JMS CF
+ * @param connection the Connection to be used
+ * @param session the Session to be used
+ * @param destination the Destination to bind MessageProducer to
+ * @return new or shared MessageProducer from this JMS CF
+ */
+ public MessageProducer getMessageProducer(
+ Connection connection, Session session, Destination destination) {
+ if (cacheLevel > JMSConstants.CACHE_SESSION) {
+ return getSharedProducer();
+ } else {
+ return createProducer((session == null ? getSession(connection) : session), destination);
+ }
+ }
+
+ /**
+ * Get a new Connection or shared Connection from this JMS CF
+ * @return new or shared Connection from this JMS CF
+ */
+ private Connection getSharedConnection() {
+ if (sharedConnection == null) {
+ sharedConnection = createConnection();
+ if (log.isDebugEnabled()) {
+ log.debug("Created shared JMS Connection for JMS CF : " + name);
+ }
+ }
+ return sharedConnection;
+ }
+
+ /**
+ * Get a shared Session from this JMS CF
+ * @return shared Session from this JMS CF
+ */
+ private Session getSharedSession() {
+ if (sharedSession == null) {
+ sharedSession = createSession(getSharedConnection());
+ if (log.isDebugEnabled()) {
+ log.debug("Created shared JMS Session for JMS CF : " + name);
+ }
+ }
+ return sharedSession;
+ }
+
+ /**
+ * Get a shared MessageProducer from this JMS CF
+ * @return shared MessageProducer from this JMS CF
+ */
+ private MessageProducer getSharedProducer() {
+ if (sharedProducer == null) {
+ sharedProducer = createProducer(getSharedSession(), sharedDestination);
+ if (log.isDebugEnabled()) {
+ log.debug("Created shared JMS MessageConsumer for JMS CF : " + name);
+ }
+ }
+ return sharedProducer;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java
new file mode 100644
index 0000000000..fb16500efc
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java
@@ -0,0 +1,122 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.naming.Context;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterInclude;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Class managing a set of {@link JMSConnectionFactory} objects.
+ */
+public class JMSConnectionFactoryManager {
+
+ private static final Log log = LogFactory.getLog(JMSConnectionFactoryManager.class);
+
+ /** A Map containing the JMS connection factories managed by this, keyed by name */
+ private final Map<String,JMSConnectionFactory> connectionFactories =
+ new HashMap<String,JMSConnectionFactory>();
+
+ /**
+ * Construct a Connection factory manager for the JMS transport sender or receiver
+ * @param trpInDesc
+ */
+ public JMSConnectionFactoryManager(ParameterInclude trpInDesc) {
+ loadConnectionFactoryDefinitions(trpInDesc);
+ }
+
+ /**
+ * Create JMSConnectionFactory instances for the definitions in the transport configuration,
+ * and add these into our collection of connectionFactories map keyed by name
+ *
+ * @param trpDesc the transport description for JMS
+ */
+ private void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) {
+
+ for (Object o : trpDesc.getParameters()) {
+ Parameter p = (Parameter)o;
+ try {
+ JMSConnectionFactory jmsConFactory = new JMSConnectionFactory(p);
+ connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
+ } catch (AxisJMSException e) {
+ log.error("Error setting up connection factory : " + p.getName(), e);
+ }
+ }
+ }
+
+ /**
+ * Get the JMS connection factory with the given name.
+ *
+ * @param name the name of the JMS connection factory
+ * @return the JMS connection factory or null if no connection factory with
+ * the given name exists
+ */
+ public JMSConnectionFactory getJMSConnectionFactory(String name) {
+ return connectionFactories.get(name);
+ }
+
+ /**
+ * Get the JMS connection factory that matches the given properties, i.e. referring to
+ * the same underlying connection factory. Used by the JMSSender to determine if already
+ * available resources should be used for outgoing messages
+ *
+ * @param props a Map of connection factory JNDI properties and name
+ * @return the JMS connection factory or null if no connection factory compatible
+ * with the given properties exists
+ */
+ public JMSConnectionFactory getJMSConnectionFactory(Map<String,String> props) {
+ for (JMSConnectionFactory cf : connectionFactories.values()) {
+ Map<String,String> cfProperties = cf.getParameters();
+
+ if (equals(props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME),
+ cfProperties.get(JMSConstants.PARAM_CONFAC_JNDI_NAME))
+ &&
+ equals(props.get(Context.INITIAL_CONTEXT_FACTORY),
+ cfProperties.get(Context.INITIAL_CONTEXT_FACTORY))
+ &&
+ equals(props.get(Context.PROVIDER_URL),
+ cfProperties.get(Context.PROVIDER_URL))
+ &&
+ equals(props.get(Context.SECURITY_PRINCIPAL),
+ cfProperties.get(Context.SECURITY_PRINCIPAL))
+ &&
+ equals(props.get(Context.SECURITY_CREDENTIALS),
+ cfProperties.get(Context.SECURITY_CREDENTIALS))) {
+ return cf;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Compare two values preventing NPEs
+ */
+ private static boolean equals(Object s1, Object s2) {
+ return s1 == s2 || s1 != null && s1.equals(s2);
+ }
+
+ protected void handleException(String msg, Exception e) throws AxisFault {
+ log.error(msg, e);
+ throw new AxisFault(msg, e);
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java
new file mode 100644
index 0000000000..6a11201625
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java
@@ -0,0 +1,273 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import org.apache.axis2.client.Options;
+
+public class JMSConstants {
+
+ /**
+ * The prefix indicating an Axis JMS URL
+ */
+ public static final String JMS_PREFIX = "jms:/";
+
+ //------------------------------------ defaults / constants ------------------------------------
+ /**
+ * The local (Axis2) JMS connection factory name of the default connection
+ * factory to be used, if a service does not explicitly state the connection
+ * factory it should be using by a Parameter named JMSConstants.CONFAC_PARAM
+ */
+ public static final String DEFAULT_CONFAC_NAME = "default";
+ /**
+ * The default JMS time out waiting for a reply - also see {@link JMS_WAIT_REPLY}
+ */
+ public static final long DEFAULT_JMS_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS;
+ /**
+ * Value indicating a Queue used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE}
+ */
+ public static final String DESTINATION_TYPE_QUEUE = "queue";
+ /**
+ * Value indicating a Topic used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE}
+ */
+ public static final String DESTINATION_TYPE_TOPIC = "topic";
+ /**
+ * Value indicating a JMS 1.1 Generic Destination used by {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE}
+ */
+ public static final String DESTINATION_TYPE_GENERIC = "generic";
+
+ /** Do not cache any JMS resources between tasks (when sending) or JMS CF's (when sending) */
+ public static final int CACHE_NONE = 0;
+ /** Cache only the JMS connection between tasks (when receiving), or JMS CF's (when sending)*/
+ public static final int CACHE_CONNECTION = 1;
+ /** Cache only the JMS connection and Session between tasks (receiving), or JMS CF's (sending) */
+ public static final int CACHE_SESSION = 2;
+ /** Cache the JMS connection, Session and Consumer between tasks when receiving*/
+ public static final int CACHE_CONSUMER = 3;
+ /** Cache the JMS connection, Session and Producer within a JMSConnectionFactory when sending */
+ public static final int CACHE_PRODUCER = 4;
+ /** automatic choice of an appropriate caching level (depending on the transaction strategy) */
+ public static final int CACHE_AUTO = 5;
+
+ /** A JMS 1.1 Generic Destination type or ConnectionFactory */
+ public static final int GENERIC = 0;
+ /** A Queue Destination type or ConnectionFactory */
+ public static final int QUEUE = 1;
+ /** A Topic Destination type or ConnectionFactory */
+ public static final int TOPIC = 2;
+
+ /**
+ * The EPR parameter name indicating the name of the message level property that indicated the content type.
+ */
+ public static final String CONTENT_TYPE_PROPERTY_PARAM = "transport.jms.ContentTypeProperty";
+
+ //---------------------------------- services.xml parameters -----------------------------------
+ /**
+ * The Service level Parameter name indicating the JMS destination for requests of a service
+ */
+ public static final String PARAM_DESTINATION = "transport.jms.Destination";
+ /**
+ * The Service level Parameter name indicating the destination type for requests.
+ * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC}
+ */
+ public static final String PARAM_DEST_TYPE = "transport.jms.DestinationType";
+ /**
+ * The Service level Parameter name indicating the [default] response destination of a service
+ */
+ public static final String PARAM_REPLY_DESTINATION = "transport.jms.ReplyDestination";
+ /**
+ * The Service level Parameter name indicating the response destination type
+ * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC}
+ */
+ public static final String PARAM_REPLY_DEST_TYPE = "transport.jms.ReplyDestinationType";
+ /**
+ * The Parameter name of an Axis2 service, indicating the JMS connection
+ * factory which should be used to listen for messages for it. This is
+ * the local (Axis2) name of the connection factory and not the JNDI name
+ */
+ public static final String PARAM_JMS_CONFAC = "transport.jms.ConnectionFactory";
+ /**
+ * Connection factory type if using JMS 1.0, either DESTINATION_TYPE_QUEUE or DESTINATION_TYPE_TOPIC
+ */
+ public static final String PARAM_CONFAC_TYPE = "transport.jms.ConnectionFactoryType";
+ /**
+ * The Parameter name indicating the JMS connection factory JNDI name
+ */
+ public static final String PARAM_CONFAC_JNDI_NAME = "transport.jms.ConnectionFactoryJNDIName";
+ /**
+ * The Parameter indicating the expected content type for messages received by the service.
+ */
+ public static final String CONTENT_TYPE_PARAM = "transport.jms.ContentType";
+ /**
+ * The Parameter indicating a final EPR as a String, to be published on the WSDL of a service
+ * Could occur more than once, and could provide additional connection properties or a subset
+ * of the properties auto computed. Also could replace IP addresses with hostnames, and expose
+ * public credentials clients. If a user specified this parameter, the auto generated EPR will
+ * not be exposed - unless an instance of this parameter is added with the string "legacy"
+ * This parameter could be used to expose EPR's conforming to the proposed SOAP/JMS spec
+ * until such time full support is implemented for it.
+ */
+ public static final String PARAM_PUBLISH_EPR = "transport.jms.PublishEPR";
+ /** The parameter indicating the JMS API specification to be used - if this is "1.1" the JMS
+ * 1.1 API would be used, else the JMS 1.0.2B
+ */
+ public static final String PARAM_JMS_SPEC_VER = "transport.jms.JMSSpecVersion";
+
+ /**
+ * The Parameter indicating whether the JMS Session should be transacted for the service
+ * Specified as a "true" or "false"
+ */
+ public static final String PARAM_SESSION_TRANSACTED = "transport.jms.SessionTransacted";
+ /**
+ * The Parameter indicating the Session acknowledgement for the service. Must be one of the
+ * following Strings, or the appropriate Integer used by the JMS API
+ * "AUTO_ACKNOWLEDGE", "CLIENT_ACKNOWLEDGE", "DUPS_OK_ACKNOWLEDGE" or "SESSION_TRANSACTED"
+ */
+ public static final String PARAM_SESSION_ACK = "transport.jms.SessionAcknowledgement";
+ /** A message selector to be used when messages are sought for this service */
+ public static final String PARAM_MSG_SELECTOR = "transport.jms.MessageSelector";
+ /** Is the Subscription durable ? - "true" or "false" See {@link PARAM_DURABLE_SUB_NAME} */
+ public static final String PARAM_SUB_DURABLE = "transport.jms.SubscriptionDurable";
+ /** The name for the durable subscription See {@link PARAM_SUB_DURABLE}*/
+ public static final String PARAM_DURABLE_SUB_NAME = "transport.jms.DurableSubscriberName";
+ /**
+ * JMS Resource cachable level to be used for the service One of the following:
+ * {@link CACHE_NONE}, {@link CACHE_CONNECTION}, {@link CACHE_SESSION}, {@link CACHE_PRODUCER},
+ * {@link CACHE_CONSUMER}, or {@link CACHE_AUTO} - to let the transport decide
+ */
+ public static final String PARAM_CACHE_LEVEL = "transport.jms.CacheLevel";
+ /** Should a pub-sub connection receive messages published by itself? */
+ public static final String PARAM_PUBSUB_NO_LOCAL = "transport.jms.PubSubNoLocal";
+ /**
+ * The number of milliseconds to wait for a message on a consumer.receive() call
+ * negative number - wait forever
+ * 0 - do not wait at all
+ * positive number - indicates the number of milliseconds to wait
+ */
+ public static final String PARAM_RCV_TIMEOUT = "transport.jms.ReceiveTimeout";
+ /**
+ *The number of concurrent consumers to be created to poll for messages for this service
+ * For Topics, this should be ONE, to prevent receipt of multiple copies of the same message
+ */
+ public static final String PARAM_CONCURRENT_CONSUMERS = "transport.jms.ConcurrentConsumers";
+ /**
+ * The maximum number of concurrent consumers for the service - See {@link PARAM_CONCURRENT_CONSUMERS}
+ */
+ public static final String PARAM_MAX_CONSUMERS = "transport.jms.MaxConcurrentConsumers";
+ /**
+ * The number of idle (i.e. message-less) polling attempts before a worker task commits suicide,
+ * to scale down resources, as load decreases
+ */
+ public static final String PARAM_IDLE_TASK_LIMIT = "transport.jms.IdleTaskLimit";
+ /**
+ * The maximum number of messages a polling worker task should process, before suicide - to
+ * prevent many longer running threads - default is unlimited (i.e. a worker task will live forever)
+ */
+ public static final String PARAM_MAX_MSGS_PER_TASK = "transport.jms.MaxMessagesPerTask";
+ /**
+ * Number of milliseconds before the first reconnection attempt is tried, on detection of an
+ * error. Subsequent retries follow a geometric series, where the
+ * duration = previous duration * factor
+ * This is further limited by the {@link PARAM_RECON_MAX_DURATION} to be meaningful
+ */
+ public static final String PARAM_RECON_INIT_DURATION = "transport.jms.InitialReconnectDuration";
+ /** @see PARAM_RECON_INIT_DURATION */
+ public static final String PARAM_RECON_FACTOR = "transport.jms.ReconnectProgressFactor";
+ /** @see PARAM_RECON_INIT_DURATION */
+ public static final String PARAM_RECON_MAX_DURATION = "transport.jms.MaxReconnectDuration";
+
+ /** The username to use when obtaining a JMS Connection */
+ public static final String PARAM_JMS_USERNAME = "transport.jms.UserName";
+ /** The password to use when obtaining a JMS Connection */
+ public static final String PARAM_JMS_PASSWORD = "transport.jms.Password";
+
+ //-------------- message context / transport header properties and client options --------------
+ /**
+ * A MessageContext property or client Option indicating the JMS message type
+ */
+ public static final String JMS_MESSAGE_TYPE = "JMS_MESSAGE_TYPE";
+ /**
+ * The message type indicating a BytesMessage. See {@link JMS_MESSAGE_TYPE}
+ */
+ public static final String JMS_BYTE_MESSAGE = "JMS_BYTE_MESSAGE";
+ /**
+ * The message type indicating a TextMessage. See {@link JMS_MESSAGE_TYPE}
+ */
+ public static final String JMS_TEXT_MESSAGE = "JMS_TEXT_MESSAGE";
+ /**
+ * A MessageContext property or client Option indicating the time to wait for a response JMS message
+ */
+ public static final String JMS_WAIT_REPLY = "JMS_WAIT_REPLY";
+ /**
+ * A MessageContext property or client Option indicating the JMS correlation id
+ */
+ public static final String JMS_COORELATION_ID = "JMS_COORELATION_ID";
+ /**
+ * A MessageContext property or client Option indicating the JMS message id
+ */
+ public static final String JMS_MESSAGE_ID = "JMS_MESSAGE_ID";
+ /**
+ * A MessageContext property or client Option indicating the JMS delivery mode as an Integer or String
+ * Value 1 - javax.jms.DeliveryMode.NON_PERSISTENT
+ * Value 2 - javax.jms.DeliveryMode.PERSISTENT
+ */
+ public static final String JMS_DELIVERY_MODE = "JMS_DELIVERY_MODE";
+ /**
+ * A MessageContext property or client Option indicating the JMS destination to use on a Send
+ */
+ public static final String JMS_DESTINATION = "JMS_DESTINATION";
+ /**
+ * A MessageContext property or client Option indicating the JMS message expiration - a Long value
+ * specified as a String
+ */
+ public static final String JMS_EXPIRATION = "JMS_EXPIRATION";
+ /**
+ * A MessageContext property indicating if the message is a redelivery (Boolean as a String)
+ */
+ public static final String JMS_REDELIVERED = "JMS_REDELIVERED";
+ /**
+ * A MessageContext property or client Option indicating the JMS replyTo Destination
+ */
+ public static final String JMS_REPLY_TO = "JMS_REPLY_TO";
+ /**
+ * A MessageContext property or client Option indicating the JMS replyTo Destination type
+ * See {@link DESTINATION_TYPE_QUEUE} and {@link DESTINATION_TYPE_TOPIC}
+ */
+ public static final String JMS_REPLY_TO_TYPE = "JMS_REPLY_TO_TYPE";
+ /**
+ * A MessageContext property or client Option indicating the JMS timestamp (Long specified as String)
+ */
+ public static final String JMS_TIMESTAMP = "JMS_TIMESTAMP";
+ /**
+ * A MessageContext property indicating the JMS type String returned by {@link javax.jms.Message.getJMSType()}
+ */
+ public static final String JMS_TYPE = "JMS_TYPE";
+ /**
+ * A MessageContext property or client Option indicating the JMS priority
+ */
+ public static final String JMS_PRIORITY = "JMS_PRIORITY";
+ /**
+ * A MessageContext property or client Option indicating the JMS time to live for message sent
+ */
+ public static final String JMS_TIME_TO_LIVE = "JMS_TIME_TO_LIVE";
+
+ /** The prefix that denotes JMSX properties */
+ public static final String JMSX_PREFIX = "JMSX";
+ /** The JMSXGroupID property */
+ public static final String JMSX_GROUP_ID = "JMSXGroupID";
+ /** The JMSXGroupSeq property */
+ public static final String JMSX_GROUP_SEQ = "JMSXGroupSeq";
+
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java
new file mode 100644
index 0000000000..c465b1d989
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java
@@ -0,0 +1,111 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet;
+
+/**
+ * Class that links an Axis2 service to a JMS destination. Additionally, it contains
+ * all the required information to process incoming JMS messages and to inject them
+ * into Axis2.
+ */
+public class JMSEndpoint {
+ private JMSConnectionFactory cf;
+ private AxisService service;
+ private String jndiDestinationName;
+ private int destinationType = JMSConstants.GENERIC;
+ private Set<EndpointReference> endpointReferences = new HashSet<EndpointReference>();
+ private ContentTypeRuleSet contentTypeRuleSet;
+
+ public AxisService getService() {
+ return service;
+ }
+
+ public void setService(AxisService service) {
+ this.service = service;
+ }
+
+ public String getServiceName() {
+ return service.getName();
+ }
+
+ public String getJndiDestinationName() {
+ return jndiDestinationName;
+ }
+
+ public void setJndiDestinationName(String destinationJNDIName) {
+ this.jndiDestinationName = destinationJNDIName;
+ }
+
+ public void setDestinationType(String destinationType) {
+ if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) {
+ this.destinationType = JMSConstants.TOPIC;
+ } else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) {
+ this.destinationType = JMSConstants.QUEUE;
+ } else {
+ this.destinationType = JMSConstants.GENERIC;
+ }
+ }
+
+ public EndpointReference[] getEndpointReferences() {
+ return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]);
+ }
+
+ public void computeEPRs() {
+ List<EndpointReference> eprs = new ArrayList<EndpointReference>();
+ for (Object o : getService().getParameters()) {
+ Parameter p = (Parameter) o;
+ if (JMSConstants.PARAM_PUBLISH_EPR.equals(p.getName()) && p.getValue() instanceof String) {
+ if ("legacy".equalsIgnoreCase((String) p.getValue())) {
+ // if "legacy" specified, compute and replace it
+ endpointReferences.add(
+ new EndpointReference(JMSUtils.getEPR(cf, destinationType, this)));
+ } else {
+ endpointReferences.add(new EndpointReference((String) p.getValue()));
+ }
+ }
+ }
+
+ if (eprs.isEmpty()) {
+ // if nothing specified, compute and return legacy EPR
+ endpointReferences.add(new EndpointReference(JMSUtils.getEPR(cf, destinationType, this)));
+ }
+ }
+
+ public ContentTypeRuleSet getContentTypeRuleSet() {
+ return contentTypeRuleSet;
+ }
+
+ public void setContentTypeRuleSet(ContentTypeRuleSet contentTypeRuleSet) {
+ this.contentTypeRuleSet = contentTypeRuleSet;
+ }
+
+ public JMSConnectionFactory getCf() {
+ return cf;
+ }
+
+ public void setCf(JMSConnectionFactory cf) {
+ this.cf = cf;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java
new file mode 100644
index 0000000000..ceeec4a6a3
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java
@@ -0,0 +1,28 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+public class JMSExceptionWrapper extends IOException {
+ private static final long serialVersionUID = 852441109009079511L;
+
+ public JMSExceptionWrapper(JMSException ex) {
+ initCause(ex);
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
new file mode 100644
index 0000000000..8c9f66dfbf
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
@@ -0,0 +1,294 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.TextMessage;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet;
+import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.MessageTypeRule;
+import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.PropertyRule;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportListener;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorListener;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSource;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSourceSupport;
+
+/**
+ * The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances
+ * for each service requesting exposure over JMS, and stops these if they are undeployed / stopped.
+ * <p>
+ * A service indicates a JMS Connection factory definition by name, which would be defined in the
+ * JMSListner on the axis2.xml, and this provides a way to reuse common configuration between
+ * services, as well as to optimize resources utilized
+ * <p>
+ * If the connection factory name was not specified, it will default to the one named "default"
+ * {@see JMSConstants.DEFAULT_CONFAC_NAME}
+ * <p>
+ * If a destination JNDI name is not specified, a service will expect to use a Queue with the same
+ * JNDI name as of the service. Additional Parameters allows one to bind to a Topic or specify
+ * many more detailed control options. See package documentation for more details
+ * <p>
+ * All Destinations / JMS Administered objects used MUST be pre-created or already available
+ */
+public class JMSListener extends AbstractTransportListener implements ManagementSupport,
+ TransportErrorSource {
+
+ public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
+
+ /** The JMSConnectionFactoryManager which centralizes the management of defined factories */
+ private JMSConnectionFactoryManager connFacManager;
+ /** A Map of service name to the JMS endpoints */
+ private Map<String,JMSEndpoint> serviceNameToEndpointMap = new HashMap<String,JMSEndpoint>();
+ /** A Map of service name to its ServiceTaskManager instances */
+ private Map<String, ServiceTaskManager> serviceNameToSTMMap =
+ new HashMap<String, ServiceTaskManager>();
+ private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this);
+
+ /**
+ * TransportListener initialization
+ *
+ * @param cfgCtx the Axis configuration context
+ * @param trpInDesc the TransportIn description
+ */
+ public void init(ConfigurationContext cfgCtx,
+ TransportInDescription trpInDesc) throws AxisFault {
+
+ super.init(cfgCtx, trpInDesc);
+ connFacManager = new JMSConnectionFactoryManager(trpInDesc);
+ log.info("JMS Transport Receiver/Listener initialized...");
+ }
+
+ /**
+ * Returns EPRs for the given service over the JMS transport
+ *
+ * @param serviceName service name
+ * @return the JMS EPRs for the service
+ */
+ public EndpointReference[] getEPRsForService(String serviceName) {
+ //Strip out the operation name
+ if (serviceName.indexOf('/') != -1) {
+ serviceName = serviceName.substring(0, serviceName.indexOf('/'));
+ }
+ // strip out the endpoint name if present
+ if (serviceName.indexOf('.') != -1) {
+ serviceName = serviceName.substring(0, serviceName.indexOf('.'));
+ }
+ JMSEndpoint endpoint = serviceNameToEndpointMap.get(serviceName);
+ if (endpoint != null) {
+ return endpoint.getEndpointReferences();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Listen for JMS messages on behalf of the given service
+ *
+ * @param service the Axis service for which to listen for messages
+ */
+ protected void startListeningForService(AxisService service) throws AxisFault {
+ JMSConnectionFactory cf = getConnectionFactory(service);
+ if (cf == null) {
+ throw new AxisFault("The service doesn't specify a JMS connection factory or refers " +
+ "to an invalid factory.");
+ }
+
+ JMSEndpoint endpoint = new JMSEndpoint();
+ endpoint.setService(service);
+ endpoint.setCf(cf);
+
+ Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION);
+ if (destParam != null) {
+ endpoint.setJndiDestinationName((String)destParam.getValue());
+ } else {
+ // Assume that the JNDI destination name is the same as the service name
+ endpoint.setJndiDestinationName(service.getName());
+ }
+
+ Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE);
+ if (destTypeParam != null) {
+ String paramValue = (String) destTypeParam.getValue();
+ if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
+ JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) {
+ endpoint.setDestinationType(paramValue);
+ } else {
+ throw new AxisFault("Invalid destinaton type value " + paramValue);
+ }
+ } else {
+ log.debug("JMS destination type not given. default queue");
+ endpoint.setDestinationType(JMSConstants.DESTINATION_TYPE_QUEUE);
+ }
+
+ Parameter contentTypeParam = service.getParameter(JMSConstants.CONTENT_TYPE_PARAM);
+ if (contentTypeParam == null) {
+ ContentTypeRuleSet contentTypeRuleSet = new ContentTypeRuleSet();
+ contentTypeRuleSet.addRule(new PropertyRule(BaseConstants.CONTENT_TYPE));
+ contentTypeRuleSet.addRule(new MessageTypeRule(BytesMessage.class, "application/octet-stream"));
+ contentTypeRuleSet.addRule(new MessageTypeRule(TextMessage.class, "text/plain"));
+ endpoint.setContentTypeRuleSet(contentTypeRuleSet);
+ } else {
+ endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam));
+ }
+
+ endpoint.computeEPRs(); // compute service EPR and keep for later use
+ serviceNameToEndpointMap.put(service.getName(), endpoint);
+
+ ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool);
+ stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint));
+ stm.start();
+ serviceNameToSTMMap.put(service.getName(), stm);
+
+ for (int i=0; i<3; i++) {
+ if (stm.getActiveTaskCount() > 0) {
+ log.info("Started to listen on destination : " + stm.getDestinationJNDIName() +
+ " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
+ " for service " + stm.getServiceName());
+ return;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {}
+ }
+
+ log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() +
+ " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
+ " for service " + stm.getServiceName() + " have not yet started after 3 seconds ..");
+ }
+
+ /**
+ * Stops listening for messages for the service thats undeployed or stopped
+ *
+ * @param service the service that was undeployed or stopped
+ */
+ protected void stopListeningForService(AxisService service) {
+
+ ServiceTaskManager stm = serviceNameToSTMMap.get(service.getName());
+ if (stm != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Stopping listening on destination : " + stm.getDestinationJNDIName() +
+ " for service : " + stm.getServiceName());
+ }
+
+ stm.stop();
+
+ serviceNameToSTMMap.remove(service.getName());
+ serviceNameToEndpointMap.remove(service.getName());
+ log.info("Stopped listening for JMS messages to service : " + service.getName());
+
+ } else {
+ log.error("Unable to stop service : " + service.getName() +
+ " - unable to find its ServiceTaskManager");
+ }
+ }
+ /**
+ * Return the connection factory name for this service. If this service
+ * refers to an invalid factory or defaults to a non-existent default
+ * factory, this returns null
+ *
+ * @param service the AxisService
+ * @return the JMSConnectionFactory to be used, or null if reference is invalid
+ */
+ public JMSConnectionFactory getConnectionFactory(AxisService service) {
+
+ Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC);
+ // validate connection factory name (specified or default)
+ if (conFacParam != null) {
+ return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue());
+ } else {
+ return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME);
+ }
+ }
+
+ // -- jmx/management methods--
+ /**
+ * Pause the listener - Stop accepting/processing new messages, but continues processing existing
+ * messages until they complete. This helps bring an instance into a maintenence mode
+ * @throws AxisFault on error
+ */
+ public void pause() throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
+ try {
+ for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
+ stm.pause();
+ }
+ state = BaseConstants.PAUSED;
+ log.info("Listener paused");
+ } catch (AxisJMSException e) {
+ log.error("At least one service could not be paused", e);
+ }
+ }
+
+ /**
+ * Resume the lister - Brings the lister into active mode back from a paused state
+ * @throws AxisFault on error
+ */
+ public void resume() throws AxisFault {
+ if (state != BaseConstants.PAUSED) return;
+ try {
+ for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
+ stm.resume();
+ }
+ state = BaseConstants.STARTED;
+ log.info("Listener resumed");
+ } catch (AxisJMSException e) {
+ log.error("At least one service could not be resumed", e);
+ }
+ }
+
+ /**
+ * Stop processing new messages, and wait the specified maximum time for in-flight
+ * requests to complete before a controlled shutdown for maintenence
+ *
+ * @param millis a number of milliseconds to wait until pending requests are allowed to complete
+ * @throws AxisFault on error
+ */
+ public void maintenenceShutdown(long millis) throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
+ try {
+ long start = System.currentTimeMillis();
+ stop();
+ state = BaseConstants.STOPPED;
+ log.info("Listener shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s");
+ } catch (Exception e) {
+ handleException("Error shutting down the listener for maintenence", e);
+ }
+ }
+
+ public void addErrorListener(TransportErrorListener listener) {
+ tess.addErrorListener(listener);
+ }
+
+ public void removeErrorListener(TransportErrorListener listener) {
+ tess.removeErrorListener(listener);
+ }
+
+ void error(AxisService service, Throwable ex) {
+ tess.error(service, ex);
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
new file mode 100644
index 0000000000..ebd67e53e1
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
@@ -0,0 +1,237 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.transaction.UserTransaction;
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeInfo;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector;
+
+/**
+ * This is the JMS message receiver which is invoked when a message is received. This processes
+ * the message through the engine
+ */
+public class JMSMessageReceiver {
+
+ private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);
+
+ /** The JMSListener */
+ private JMSListener jmsListener = null;
+ /** A reference to the JMS Connection Factory */
+ private JMSConnectionFactory jmsConnectionFactory = null;
+ /** The JMS metrics collector */
+ private MetricsCollector metrics = null;
+ /** The endpoint this message receiver is bound to */
+ final JMSEndpoint endpoint;
+
+ /**
+ * Create a new JMSMessage receiver
+ *
+ * @param jmsListener the JMS transport Listener
+ * @param jmsConFac the JMS connection factory we are associated with
+ * @param workerPool the worker thread pool to be used
+ * @param cfgCtx the axis ConfigurationContext
+ * @param serviceName the name of the Axis service
+ * @param endpoint the JMSEndpoint definition to be used
+ */
+ JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) {
+ this.jmsListener = jmsListener;
+ this.jmsConnectionFactory = jmsConFac;
+ this.endpoint = endpoint;
+ this.metrics = jmsListener.getMetricsCollector();
+ }
+
+ /**
+ * Process a new message received
+ *
+ * @param message the JMS message received
+ * @param ut UserTransaction which was used to receive the message
+ * @return true if caller should commit
+ */
+ public boolean onMessage(Message message, UserTransaction ut) {
+
+ try {
+ if (log.isDebugEnabled()) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Received new JMS message for service :").append(endpoint.getServiceName());
+ sb.append("\nDestination : ").append(message.getJMSDestination());
+ sb.append("\nMessage ID : ").append(message.getJMSMessageID());
+ sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID());
+ sb.append("\nReplyTo : ").append(message.getJMSReplyTo());
+ sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered());
+ sb.append("\nPriority : ").append(message.getJMSPriority());
+ sb.append("\nExpiration : ").append(message.getJMSExpiration());
+ sb.append("\nTimestamp : ").append(message.getJMSTimestamp());
+ sb.append("\nMessage Type : ").append(message.getJMSType());
+ sb.append("\nPersistent ? : ").append(
+ DeliveryMode.PERSISTENT == message.getJMSDeliveryMode());
+
+ log.debug(sb.toString());
+ if (log.isTraceEnabled() && message instanceof TextMessage) {
+ log.trace("\nMessage : " + ((TextMessage) message).getText());
+ }
+ }
+ } catch (JMSException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Error reading JMS message headers for debug logging", e);
+ }
+ }
+
+ // update transport level metrics
+ try {
+ metrics.incrementBytesReceived(JMSUtils.getMessageSize(message));
+ } catch (JMSException e) {
+ log.warn("Error reading JMS message size to update transport metrics", e);
+ }
+
+ // has this message already expired? expiration time == 0 means never expires
+ try {
+ long expiryTime = message.getJMSExpiration();
+ if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
+ if (log.isDebugEnabled()) {
+ log.debug("Discard expired message with ID : " + message.getJMSMessageID());
+ }
+ return true;
+ }
+ } catch (JMSException ignore) {}
+
+
+ boolean successful = false;
+ try {
+ successful = processThoughEngine(message, ut);
+
+ } catch (JMSException e) {
+ log.error("JMS Exception encountered while processing", e);
+ } catch (AxisFault e) {
+ log.error("Axis fault processing message", e);
+ } catch (Exception e) {
+ log.error("Unknown error processing message", e);
+
+ } finally {
+ if (successful) {
+ metrics.incrementMessagesReceived();
+ } else {
+ metrics.incrementFaultsReceiving();
+ }
+ }
+
+ return successful;
+ }
+
+ /**
+ * Process the new message through Axis2
+ *
+ * @param message the JMS message
+ * @param ut the UserTransaction used for receipt
+ * @return true if the caller should commit
+ * @throws JMSException, on JMS exceptions
+ * @throws AxisFault on Axis2 errors
+ */
+ private boolean processThoughEngine(Message message, UserTransaction ut)
+ throws JMSException, AxisFault {
+
+ MessageContext msgContext = jmsListener.createMessageContext();
+
+ // set the JMS Message ID as the Message ID of the MessageContext
+ try {
+ msgContext.setMessageID(message.getJMSMessageID());
+ msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
+ } catch (JMSException ignore) {}
+
+ String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION);
+
+ AxisService service = endpoint.getService();
+ msgContext.setAxisService(service);
+
+ // find the operation for the message, or default to one
+ Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
+ QName operationQName = (
+ operationParam != null ?
+ BaseUtils.getQNameFromString(operationParam.getValue()) :
+ BaseConstants.DEFAULT_OPERATION);
+
+ AxisOperation operation = service.getOperation(operationQName);
+ if (operation != null) {
+ msgContext.setAxisOperation(operation);
+ msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
+ }
+
+ ContentTypeInfo contentTypeInfo =
+ endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
+ if (contentTypeInfo == null) {
+ throw new AxisFault("Unable to determine content type for message " +
+ msgContext.getMessageID());
+ }
+
+ // set the message property OUT_TRANSPORT_INFO
+ // the reply is assumed to be over the JMSReplyTo destination, using
+ // the same incoming connection factory, if a JMSReplyTo is available
+ Destination replyTo = message.getJMSReplyTo();
+ if (replyTo == null) {
+ // does the service specify a default reply destination ?
+ Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION);
+ if (param != null && param.getValue() != null) {
+ replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
+ }
+
+ }
+ if (replyTo != null) {
+ msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
+ new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
+ contentTypeInfo.getPropertyName()));
+ }
+
+ JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
+ if (ut != null) {
+ msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut);
+ }
+
+ try {
+ jmsListener.handleIncomingMessage(
+ msgContext,
+ JMSUtils.getTransportHeaders(message),
+ soapAction,
+ contentTypeInfo.getContentType());
+
+ } finally {
+
+ Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY);
+ if (o != null) {
+ if ((o instanceof Boolean && ((Boolean) o)) ||
+ (o instanceof String && Boolean.valueOf((String) o))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java
new file mode 100644
index 0000000000..01fdee77dd
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.QueueSender;
+import javax.jms.Session;
+import javax.jms.TopicPublisher;
+import javax.transaction.UserTransaction;
+
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
+
+/**
+ * Performs the actual sending of a JMS message, and the subsequent committing of a JTA transaction
+ * (if requested) or the local session transaction, if used. An instance of this class is unique
+ * to a single message send out operation and will not be shared.
+ */
+public class JMSMessageSender {
+
+ private static final Log log = LogFactory.getLog(JMSMessageSender.class);
+
+ /** The Connection to be used to send out */
+ private Connection connection = null;
+ /** The Session to be used to send out */
+ private Session session = null;
+ /** The MessageProducer used */
+ private MessageProducer producer = null;
+ /** Target Destination */
+ private Destination destination = null;
+ /** The level of cachability for resources */
+ private int cacheLevel = JMSConstants.CACHE_CONNECTION;
+ /** Should this sender use JMS 1.1 ? (if false, defaults to 1.0.2b) */
+ private boolean jmsSpec11 = true;
+ /** Are we sending to a Queue ? */
+ private Boolean isQueue = null;
+
+ /**
+ * This is a low-end method to support the one-time sends using JMS 1.0.2b
+ * @param connection the JMS Connection
+ * @param session JMS Session
+ * @param producer the MessageProducer
+ * @param destination the JMS Destination
+ * @param cacheLevel cacheLevel - None | Connection | Session | Producer
+ * @param jmsSpec11 true if the JMS 1.1 API should be used
+ * @param isQueue posting to a Queue?
+ */
+ public JMSMessageSender(Connection connection, Session session, MessageProducer producer,
+ Destination destination, int cacheLevel, boolean jmsSpec11, Boolean isQueue) {
+
+ this.connection = connection;
+ this.session = session;
+ this.producer = producer;
+ this.destination = destination;
+ this.cacheLevel = cacheLevel;
+ this.jmsSpec11 = jmsSpec11;
+ this.isQueue = isQueue;
+ }
+
+ /**
+ * Create a JMSSender using a JMSConnectionFactory and target EPR
+ *
+ * @param jmsConnectionFactory the JMSConnectionFactory
+ * @param targetAddress target EPR
+ */
+ public JMSMessageSender(JMSConnectionFactory jmsConnectionFactory, String targetAddress) {
+
+ if (jmsConnectionFactory != null) {
+ this.cacheLevel = jmsConnectionFactory.getCacheLevel();
+ this.jmsSpec11 = jmsConnectionFactory.isJmsSpec11();
+ this.connection = jmsConnectionFactory.getConnection();
+ this.session = jmsConnectionFactory.getSession(connection);
+ this.destination =
+ jmsConnectionFactory.getSharedDestination() == null ?
+ jmsConnectionFactory.getDestination(JMSUtils.getDestination(targetAddress)) :
+ jmsConnectionFactory.getSharedDestination();
+ this.producer = jmsConnectionFactory.getMessageProducer(connection, session, destination);
+
+ } else {
+ JMSOutTransportInfo jmsOut = new JMSOutTransportInfo(targetAddress);
+ jmsOut.loadConnectionFactoryFromProperies();
+ }
+ }
+
+ /**
+ * Perform actual send of JMS message to the Destination selected
+ *
+ * @param message the JMS message
+ * @param msgCtx the Axis2 MessageContext
+ */
+ public void send(Message message, MessageContext msgCtx) {
+
+ Boolean jtaCommit = getBooleanProperty(msgCtx, BaseConstants.JTA_COMMIT_AFTER_SEND);
+ Boolean rollbackOnly = getBooleanProperty(msgCtx, BaseConstants.SET_ROLLBACK_ONLY);
+ Boolean persistent = getBooleanProperty(msgCtx, JMSConstants.JMS_DELIVERY_MODE);
+ Integer priority = getIntegerProperty(msgCtx, JMSConstants.JMS_PRIORITY);
+ Integer timeToLive = getIntegerProperty(msgCtx, JMSConstants.JMS_TIME_TO_LIVE);
+
+ // Do not commit, if message is marked for rollback
+ if (rollbackOnly != null && rollbackOnly) {
+ jtaCommit = Boolean.FALSE;
+ }
+
+ if (persistent != null) {
+ try {
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ } catch (JMSException e) {
+ handleException("Error setting JMS Producer for PERSISTENT delivery", e);
+ }
+ }
+ if (priority != null) {
+ try {
+ producer.setPriority(priority);
+ } catch (JMSException e) {
+ handleException("Error setting JMS Producer priority to : " + priority, e);
+ }
+ }
+ if (timeToLive != null) {
+ try {
+ producer.setTimeToLive(timeToLive);
+ } catch (JMSException e) {
+ handleException("Error setting JMS Producer TTL to : " + timeToLive, e);
+ }
+ }
+
+ boolean sendingSuccessful = false;
+ // perform actual message sending
+ try {
+ if (jmsSpec11 || isQueue == null) {
+ producer.send(message);
+
+ } else {
+ if (isQueue) {
+ ((QueueSender) producer).send(message);
+
+ } else {
+ ((TopicPublisher) producer).publish(message);
+ }
+ }
+
+ // set the actual MessageID to the message context for use by any others down the line
+ String msgId = null;
+ try {
+ msgId = message.getJMSMessageID();
+ if (msgId != null) {
+ msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId);
+ }
+ } catch (JMSException ignore) {}
+
+ sendingSuccessful = true;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Sent Message Context ID : " + msgCtx.getMessageID() +
+ " with JMS Message ID : " + msgId +
+ " to destination : " + producer.getDestination());
+ }
+
+ } catch (JMSException e) {
+ log.error("Error sending message with MessageContext ID : " +
+ msgCtx.getMessageID() + " to destination : " + destination, e);
+
+ } finally {
+
+ if (jtaCommit != null) {
+
+ UserTransaction ut = (UserTransaction) msgCtx.getProperty(BaseConstants.USER_TRANSACTION);
+ if (ut != null) {
+
+ try {
+ if (sendingSuccessful && jtaCommit) {
+ ut.commit();
+ } else {
+ ut.rollback();
+ }
+ msgCtx.removeProperty(BaseConstants.USER_TRANSACTION);
+
+ if (log.isDebugEnabled()) {
+ log.debug((sendingSuccessful ? "Committed" : "Rolled back") +
+ " JTA Transaction");
+ }
+
+ } catch (Exception e) {
+ handleException("Error committing/rolling back JTA transaction after " +
+ "sending of message with MessageContext ID : " + msgCtx.getMessageID() +
+ " to destination : " + destination, e);
+ }
+ }
+
+ } else {
+ try {
+ if (session.getTransacted()) {
+ if (sendingSuccessful && (rollbackOnly == null || !rollbackOnly)) {
+ session.commit();
+ } else {
+ session.rollback();
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug((sendingSuccessful ? "Committed" : "Rolled back") +
+ " local (JMS Session) Transaction");
+ }
+
+ } catch (JMSException e) {
+ handleException("Error committing/rolling back local (i.e. session) " +
+ "transaction after sending of message with MessageContext ID : " +
+ msgCtx.getMessageID() + " to destination : " + destination, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Close non-shared producer, session and connection if any
+ */
+ public void close() {
+ if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) {
+ try {
+ producer.close();
+ } catch (JMSException e) {
+ log.error("Error closing JMS MessageProducer after send", e);
+ } finally {
+ producer = null;
+ }
+ }
+
+ if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ log.error("Error closing JMS Session after send", e);
+ } finally {
+ session = null;
+ }
+ }
+
+ if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ try {
+ connection.close();
+ } catch (JMSException e) {
+ log.error("Error closing JMS Connection after send", e);
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ private void handleException(String message, Exception e) {
+ log.error(message, e);
+ throw new AxisJMSException(message, e);
+ }
+
+ private Boolean getBooleanProperty(MessageContext msgCtx, String name) {
+ Object o = msgCtx.getProperty(name);
+ if (o != null) {
+ if (o instanceof Boolean) {
+ return (Boolean) o;
+ } else if (o instanceof String) {
+ return Boolean.valueOf((String) o);
+ }
+ }
+ return null;
+ }
+
+ private Integer getIntegerProperty(MessageContext msgCtx, String name) {
+ Object o = msgCtx.getProperty(name);
+ if (o != null) {
+ if (o instanceof Integer) {
+ return (Integer) o;
+ } else if (o instanceof String) {
+ return Integer.parseInt((String) o);
+ }
+ }
+ return null;
+ }
+
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+
+ public void setSession(Session session) {
+ this.session = session;
+ }
+
+ public void setProducer(MessageProducer producer) {
+ this.producer = producer;
+ }
+
+ public void setCacheLevel(int cacheLevel) {
+ this.cacheLevel = cacheLevel;
+ }
+
+ public int getCacheLevel() {
+ return cacheLevel;
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public MessageProducer getProducer() {
+ return producer;
+ }
+
+ public Session getSession() {
+ return session;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java
new file mode 100644
index 0000000000..9e029b33e1
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java
@@ -0,0 +1,306 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.util.Hashtable;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+import javax.naming.NamingException;
+
+import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
+
+/**
+ * The JMS OutTransportInfo is a holder of information to send an outgoing message
+ * (e.g. a Response) to a JMS destination. Thus at a minimum a reference to a
+ * ConnectionFactory and a Destination are held
+ */
+public class JMSOutTransportInfo implements OutTransportInfo {
+
+ private static final Log log = LogFactory.getLog(JMSOutTransportInfo.class);
+
+ /** The naming context */
+ private Context context;
+ /**
+ * this is a reference to the underlying JMS ConnectionFactory when sending messages
+ * through connection factories not defined at the TransportSender level
+ */
+ private ConnectionFactory connectionFactory = null;
+ /**
+ * this is a reference to a JMS Connection Factory instance, which has a reference
+ * to the underlying actual connection factory, an open connection to the JMS provider
+ * and optionally a session already available for use
+ */
+ private JMSConnectionFactory jmsConnectionFactory = null;
+ /** the Destination queue or topic for the outgoing message */
+ private Destination destination = null;
+ /** the Destination queue or topic for the outgoing message
+ * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+ */
+ private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
+ /** the Reply Destination queue or topic for the outgoing message */
+ private Destination replyDestination = null;
+ /** the Reply Destination name */
+ private String replyDestinationName = null;
+ /** the Reply Destination queue or topic for the outgoing message
+ * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
+ */
+ private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
+ /** the EPR properties when the out-transport info is generated from a target EPR */
+ private Hashtable<String,String> properties = null;
+ /** the target EPR string where applicable */
+ private String targetEPR = null;
+ /** the message property name that stores the content type of the outgoing message */
+ private String contentTypeProperty;
+
+ /**
+ * Creates an instance using the given JMS connection factory and destination
+ *
+ * @param jmsConnectionFactory the JMS connection factory
+ * @param dest the destination
+ * @param contentTypeProperty
+ */
+ JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest,
+ String contentTypeProperty) {
+ this.jmsConnectionFactory = jmsConnectionFactory;
+ this.destination = dest;
+ destinationType = dest instanceof Topic ? JMSConstants.DESTINATION_TYPE_TOPIC
+ : JMSConstants.DESTINATION_TYPE_QUEUE;
+ this.contentTypeProperty = contentTypeProperty;
+ }
+
+ /**
+ * Creates and instance using the given URL
+ *
+ * @param targetEPR the target EPR
+ */
+ JMSOutTransportInfo(String targetEPR) {
+
+ this.targetEPR = targetEPR;
+ if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) {
+ handleException("Invalid prefix for a JMS EPR : " + targetEPR);
+
+ } else {
+ properties = BaseUtils.getEPRProperties(targetEPR);
+ String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE);
+ if (destinationType != null) {
+ setDestinationType(destinationType);
+ }
+
+ String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE);
+ if (replyDestinationType != null) {
+ setReplyDestinationType(replyDestinationType);
+ }
+
+ replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
+ contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
+ try {
+ context = new InitialContext(properties);
+ } catch (NamingException e) {
+ handleException("Could not get an initial context using " + properties, e);
+ }
+
+ destination = getDestination(context, targetEPR);
+ replyDestination = getReplyDestination(context, targetEPR);
+ }
+ }
+
+ /**
+ * Provides a lazy load when created with a target EPR. This method performs actual
+ * lookup for the connection factory and destination
+ */
+ public void loadConnectionFactoryFromProperies() {
+ if (properties != null) {
+ connectionFactory = getConnectionFactory(context, properties);
+ }
+ }
+
+ /**
+ * Get the referenced ConnectionFactory using the properties from the context
+ *
+ * @param context the context to use for lookup
+ * @param props the properties which contains the JNDI name of the factory
+ * @return the connection factory
+ */
+ private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) {
+ try {
+
+ String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME);
+ if (conFacJndiName != null) {
+ return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName);
+ } else {
+ handleException("Connection Factory JNDI name cannot be determined");
+ }
+ } catch (NamingException e) {
+ handleException("Failed to look up connection factory from JNDI", e);
+ }
+ return null;
+ }
+
+ /**
+ * Get the JMS destination specified by the given URL from the context
+ *
+ * @param context the Context to lookup
+ * @param url URL
+ * @return the JMS destination, or null if it does not exist
+ */
+ private Destination getDestination(Context context, String url) {
+ String destinationName = JMSUtils.getDestination(url);
+ try {
+ return JMSUtils.lookup(context, Destination.class, destinationName);
+ } catch (NameNotFoundException e) {
+ try {
+ return JMSUtils.lookup(context, Destination.class,
+ (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ?
+ "dynamicTopics/" : "dynamicQueues/") + destinationName);
+ } catch (NamingException x) {
+ handleException("Cannot locate destination : " + destinationName + " using " + url);
+ }
+ } catch (NamingException e) {
+ handleException("Cannot locate destination : " + destinationName + " using " + url, e);
+ }
+ return null;
+ }
+
+ /**
+ * Get the JMS reply destination specified by the given URL from the context
+ *
+ * @param context the Context to lookup
+ * @param url URL
+ * @return the JMS destination, or null if it does not exist
+ */
+ private Destination getReplyDestination(Context context, String url) {
+ String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
+ if(replyDestinationName == null) {
+ return null;
+ }
+
+ try {
+ return JMSUtils.lookup(context, Destination.class, replyDestinationName);
+ } catch (NameNotFoundException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cannot locate destination : " + replyDestinationName + " using " + url);
+ }
+ } catch (NamingException e) {
+ handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e);
+ }
+
+ return null;
+ }
+
+ /**
+ * Look up for the given destination
+ * @param replyDest the JNDI name to lookup Destination required
+ * @return Destination for the JNDI name passed
+ */
+ public Destination getReplyDestination(String replyDest) {
+ try {
+ return JMSUtils.lookup(jmsConnectionFactory.getContext(), Destination.class,
+ replyDest);
+ } catch (NameNotFoundException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cannot locate reply destination : " + replyDest, e);
+ }
+ } catch (NamingException e) {
+ handleException("Cannot locate reply destination : " + replyDest, e);
+ }
+ return null;
+ }
+
+
+ private void handleException(String s) {
+ log.error(s);
+ throw new AxisJMSException(s);
+ }
+
+ private void handleException(String s, Exception e) {
+ log.error(s, e);
+ throw new AxisJMSException(s, e);
+ }
+
+ public Destination getDestination() {
+ return destination;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ public JMSConnectionFactory getJmsConnectionFactory() {
+ return jmsConnectionFactory;
+ }
+
+ public void setContentType(String contentType) {
+ // this is a useless Axis2 method imposed by the OutTransportInfo interface :(
+ }
+
+ public Hashtable<String,String> getProperties() {
+ return properties;
+ }
+
+ public String getTargetEPR() {
+ return targetEPR;
+ }
+
+ public String getDestinationType() {
+ return destinationType;
+ }
+
+ public void setDestinationType(String destinationType) {
+ if (destinationType != null) {
+ this.destinationType = destinationType;
+ }
+ }
+
+ public Destination getReplyDestination() {
+ return replyDestination;
+ }
+
+ public void setReplyDestination(Destination replyDestination) {
+ this.replyDestination = replyDestination;
+ }
+
+ public String getReplyDestinationType() {
+ return replyDestinationType;
+ }
+
+ public void setReplyDestinationType(String replyDestinationType) {
+ this.replyDestinationType = replyDestinationType;
+ }
+
+ public String getReplyDestinationName() {
+ return replyDestinationName;
+ }
+
+ public void setReplyDestinationName(String replyDestinationName) {
+ this.replyDestinationName = replyDestinationName;
+ }
+
+ public String getContentTypeProperty() {
+ return contentTypeProperty;
+ }
+
+ public void setContentTypeProperty(String contentTypeProperty) {
+ this.contentTypeProperty = contentTypeProperty;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
new file mode 100644
index 0000000000..a5f77dc4c9
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
@@ -0,0 +1,499 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.Map;
+
+import javax.activation.DataHandler;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMNode;
+import org.apache.axiom.om.OMOutputFormat;
+import org.apache.axiom.om.OMText;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.transport.MessageFormatter;
+import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportSender;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams.WriterOutputStream;
+
+/**
+ * The TransportSender for JMS
+ */
+public class JMSSender extends AbstractTransportSender implements ManagementSupport {
+
+ public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
+
+ /** The JMS connection factory manager to be used when sending messages out */
+ private JMSConnectionFactoryManager connFacManager;
+
+ /**
+ * Initialize the transport sender by reading pre-defined connection factories for
+ * outgoing messages.
+ *
+ * @param cfgCtx the configuration context
+ * @param transportOut the transport sender definition from axis2.xml
+ * @throws AxisFault on error
+ */
+ public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
+ super.init(cfgCtx, transportOut);
+ connFacManager = new JMSConnectionFactoryManager(transportOut);
+ log.info("JMS Transport Sender initialized...");
+ }
+
+ /**
+ * Get corresponding JMS connection factory defined within the transport sender for the
+ * transport-out information - usually constructed from a targetEPR
+ *
+ * @param trpInfo the transport-out information
+ * @return the corresponding JMS connection factory, if any
+ */
+ private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) {
+ Map<String,String> props = trpInfo.getProperties();
+ if (trpInfo.getProperties() != null) {
+ String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC);
+ if (jmsConnectionFactoryName != null) {
+ return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
+ } else {
+ return connFacManager.getJMSConnectionFactory(props);
+ }
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Performs the actual sending of the JMS message
+ */
+ public void sendMessage(MessageContext msgCtx, String targetAddress,
+ OutTransportInfo outTransportInfo) throws AxisFault {
+
+ JMSConnectionFactory jmsConnectionFactory = null;
+ JMSOutTransportInfo jmsOut = null;
+ JMSMessageSender messageSender = null;
+
+ if (targetAddress != null) {
+
+ jmsOut = new JMSOutTransportInfo(targetAddress);
+ // do we have a definition for a connection factory to use for this address?
+ jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
+
+ if (jmsConnectionFactory != null) {
+ messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress);
+
+ } else {
+ try {
+ messageSender = JMSUtils.createJMSSender(jmsOut);
+ } catch (JMSException e) {
+ handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+ }
+ }
+
+ } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
+
+ jmsOut = (JMSOutTransportInfo) outTransportInfo;
+ try {
+ messageSender = JMSUtils.createJMSSender(jmsOut);
+ } catch (JMSException e) {
+ handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
+ }
+ }
+
+ // The message property to be used to send the content type is determined by
+ // the out transport info, i.e. either from the EPR if we are sending a request,
+ // or, if we are sending a response, from the configuration of the service that
+ // received the request). The property name can be overridden by a message
+ // context property.
+ String contentTypeProperty =
+ (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
+ if (contentTypeProperty == null) {
+ contentTypeProperty = jmsOut.getContentTypeProperty();
+ }
+
+ // need to synchronize as Sessions are not thread safe
+ synchronized (messageSender.getSession()) {
+ try {
+ sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
+ } finally {
+ messageSender.close();
+ }
+ }
+ }
+
+ /**
+ * Perform actual sending of the JMS message
+ */
+ private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender,
+ String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory,
+ JMSOutTransportInfo jmsOut) throws AxisFault {
+
+ // convert the axis message context into a JMS Message that we can send over JMS
+ Message message = null;
+ String correlationId = null;
+ try {
+ message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty);
+ } catch (JMSException e) {
+ handleException("Error creating a JMS message from the message context", e);
+ }
+
+ // should we wait for a synchronous response on this same thread?
+ boolean waitForResponse = waitForSynchronousResponse(msgCtx);
+ Destination replyDestination = jmsOut.getReplyDestination();
+
+ // if this is a synchronous out-in, prepare to listen on the response destination
+ if (waitForResponse) {
+
+ String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
+ if (replyDestName == null && jmsConnectionFactory != null) {
+ replyDestName = jmsConnectionFactory.getReplyToDestination();
+ }
+
+ if (replyDestName != null) {
+ if (jmsConnectionFactory != null) {
+ replyDestination = jmsConnectionFactory.getDestination(replyDestName);
+ } else {
+ replyDestination = jmsOut.getReplyDestination(replyDestName);
+ }
+ }
+ replyDestination = JMSUtils.setReplyDestination(
+ replyDestination, messageSender.getSession(), message);
+ }
+
+ try {
+ messageSender.send(message, msgCtx);
+ metrics.incrementMessagesSent(msgCtx);
+
+ } catch (AxisJMSException e) {
+ metrics.incrementFaultsSending();
+ handleException("Error sending JMS message", e);
+ }
+
+ try {
+ metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message));
+ } catch (JMSException e) {
+ log.warn("Error reading JMS message size to update transport metrics", e);
+ }
+
+ // if we are expecting a synchronous response back for the message sent out
+ if (waitForResponse) {
+ // TODO ********************************************************************************
+ // TODO **** replace with asynchronous polling via a poller task to process this *******
+ // information would be given. Then it should poll (until timeout) the
+ // requested destination for the response message and inject it from a
+ // asynchronous worker thread
+ try {
+ messageSender.getConnection().start(); // multiple calls are safely ignored
+ } catch (JMSException ignore) {}
+
+ try {
+ correlationId = message.getJMSMessageID();
+ } catch(JMSException ignore) {}
+
+ // We assume here that the response uses the same message property to
+ // specify the content type of the message.
+ waitForResponseAndProcess(messageSender.getSession(), replyDestination,
+ msgCtx, correlationId, contentTypeProperty);
+ // TODO ********************************************************************************
+ }
+ }
+
+ /**
+ * Create a Consumer for the reply destination and wait for the response JMS message
+ * synchronously. If a message arrives within the specified time interval, process it
+ * through Axis2
+ * @param session the session to use to listen for the response
+ * @param replyDestination the JMS reply Destination
+ * @param msgCtx the outgoing message for which we are expecting the response
+ * @param contentTypeProperty the message property used to determine the content type
+ * of the response message
+ * @throws AxisFault on error
+ */
+ private void waitForResponseAndProcess(Session session, Destination replyDestination,
+ MessageContext msgCtx, String correlationId,
+ String contentTypeProperty) throws AxisFault {
+
+ try {
+ MessageConsumer consumer;
+ consumer = JMSUtils.createConsumer(session, replyDestination,
+ "JMSCorrelationID = '" + correlationId + "'");
+
+ // how long are we willing to wait for the sync response
+ long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
+ String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY);
+ if (waitReply != null) {
+ timeout = Long.valueOf(waitReply).longValue();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Waiting for a maximum of " + timeout +
+ "ms for a response message to destination : " + replyDestination +
+ " with JMS correlation ID : " + correlationId);
+ }
+
+ Message reply = consumer.receive(timeout);
+
+ if (reply != null) {
+
+ // update transport level metrics
+ metrics.incrementMessagesReceived();
+ try {
+ metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply));
+ } catch (JMSException e) {
+ log.warn("Error reading JMS message size to update transport metrics", e);
+ }
+
+ try {
+ processSyncResponse(msgCtx, reply, contentTypeProperty);
+ metrics.incrementMessagesReceived();
+ } catch (AxisFault e) {
+ metrics.incrementFaultsReceiving();
+ throw e;
+ }
+
+ } else {
+ log.warn("Did not receive a JMS response within " +
+ timeout + " ms to destination : " + replyDestination +
+ " with JMS correlation ID : " + correlationId);
+ metrics.incrementTimeoutsReceiving();
+ }
+
+ } catch (JMSException e) {
+ metrics.incrementFaultsReceiving();
+ handleException("Error creating a consumer, or receiving a synchronous reply " +
+ "for outgoing MessageContext ID : " + msgCtx.getMessageID() +
+ " and reply Destination : " + replyDestination, e);
+ }
+ }
+
+ /**
+ * Create a JMS Message from the given MessageContext and using the given
+ * session
+ *
+ * @param msgContext the MessageContext
+ * @param session the JMS session
+ * @param contentTypeProperty the message property to be used to store the
+ * content type
+ * @return a JMS message from the context and session
+ * @throws JMSException on exception
+ * @throws AxisFault on exception
+ */
+ private Message createJMSMessage(MessageContext msgContext, Session session,
+ String contentTypeProperty) throws JMSException, AxisFault {
+
+ Message message = null;
+ String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE);
+
+ // check the first element of the SOAP body, do we have content wrapped using the
+ // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or
+ // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages
+ // for JMS but just get the payload in its native format
+ String jmsPayloadType = guessMessageType(msgContext);
+
+ if (jmsPayloadType == null) {
+
+ OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
+ MessageFormatter messageFormatter = null;
+ try {
+ messageFormatter = TransportUtils.getMessageFormatter(msgContext);
+ } catch (AxisFault axisFault) {
+ throw new JMSException("Unable to get the message formatter to use");
+ }
+
+ String contentType = messageFormatter.getContentType(
+ msgContext, format, msgContext.getSoapAction());
+
+ boolean useBytesMessage =
+ msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
+ contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1;
+
+ OutputStream out;
+ StringWriter sw;
+ if (useBytesMessage) {
+ BytesMessage bytesMsg = session.createBytesMessage();
+ sw = null;
+ out = new BytesMessageOutputStream(bytesMsg);
+ message = bytesMsg;
+ } else {
+ sw = new StringWriter();
+ try {
+ out = new WriterOutputStream(sw, format.getCharSetEncoding());
+ } catch (UnsupportedCharsetException ex) {
+ handleException("Unsupported encoding " + format.getCharSetEncoding(), ex);
+ return null;
+ }
+ }
+
+ try {
+ messageFormatter.writeTo(msgContext, format, out, true);
+ out.close();
+ } catch (IOException e) {
+ handleException("IO Error while creating BytesMessage", e);
+ }
+
+ if (!useBytesMessage) {
+ TextMessage txtMsg = session.createTextMessage();
+ txtMsg.setText(sw.toString());
+ message = txtMsg;
+ }
+
+ if (contentTypeProperty != null) {
+ message.setStringProperty(contentTypeProperty, contentType);
+ }
+
+ } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) {
+ message = session.createBytesMessage();
+ BytesMessage bytesMsg = (BytesMessage) message;
+ OMElement wrapper = msgContext.getEnvelope().getBody().
+ getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER);
+ OMNode omNode = wrapper.getFirstOMChild();
+ if (omNode != null && omNode instanceof OMText) {
+ Object dh = ((OMText) omNode).getDataHandler();
+ if (dh != null && dh instanceof DataHandler) {
+ try {
+ ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg));
+ } catch (IOException e) {
+ handleException("Error serializing binary content of element : " +
+ BaseConstants.DEFAULT_BINARY_WRAPPER, e);
+ }
+ }
+ }
+
+ } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) {
+ message = session.createTextMessage();
+ TextMessage txtMsg = (TextMessage) message;
+ txtMsg.setText(msgContext.getEnvelope().getBody().
+ getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText());
+ }
+
+ // set the JMS correlation ID if specified
+ String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID);
+ if (correlationId == null && msgContext.getRelatesTo() != null) {
+ correlationId = msgContext.getRelatesTo().getValue();
+ }
+
+ if (correlationId != null) {
+ message.setJMSCorrelationID(correlationId);
+ }
+
+ if (msgContext.isServerSide()) {
+ // set SOAP Action as a property on the JMS message
+ setProperty(message, msgContext, BaseConstants.SOAPACTION);
+ } else {
+ String action = msgContext.getOptions().getAction();
+ if (action != null) {
+ message.setStringProperty(BaseConstants.SOAPACTION, action);
+ }
+ }
+
+ JMSUtils.setTransportHeaders(msgContext, message);
+ return message;
+ }
+
+ /**
+ * Guess the message type to use for JMS looking at the message contexts' envelope
+ * @param msgContext the message context
+ * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null
+ */
+ private String guessMessageType(MessageContext msgContext) {
+ OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement();
+ if (firstChild != null) {
+ if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) {
+ return JMSConstants.JMS_BYTE_MESSAGE;
+ } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) {
+ return JMSConstants.JMS_TEXT_MESSAGE;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates an Axis MessageContext for the received JMS message and
+ * sets up the transports and various properties
+ *
+ * @param outMsgCtx the outgoing message for which we are expecting the response
+ * @param message the JMS response message received
+ * @param contentTypeProperty the message property used to determine the content type
+ * of the response message
+ * @throws AxisFault on error
+ */
+ private void processSyncResponse(MessageContext outMsgCtx, Message message,
+ String contentTypeProperty) throws AxisFault {
+
+ MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx);
+
+ // load any transport headers from received message
+ JMSUtils.loadTransportHeaders(message, responseMsgCtx);
+
+ // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response
+ // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This
+ // question is still under debate and due to the timelines, I am commiting this
+ // workaround as Axis2 1.2 is about to be released and Synapse 1.0
+ responseMsgCtx.setServerSide(false);
+
+ String contentType =
+ contentTypeProperty == null ? null
+ : JMSUtils.getProperty(message, contentTypeProperty);
+
+ try {
+ JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType);
+ } catch (JMSException ex) {
+ throw AxisFault.makeFault(ex);
+ }
+// responseMsgCtx.setServerSide(true);
+
+ handleIncomingMessage(
+ responseMsgCtx,
+ JMSUtils.getTransportHeaders(message),
+ JMSUtils.getProperty(message, BaseConstants.SOAPACTION),
+ contentType
+ );
+ }
+
+ private void setProperty(Message message, MessageContext msgCtx, String key) {
+
+ String value = getProperty(msgCtx, key);
+ if (value != null) {
+ try {
+ message.setStringProperty(key, value);
+ } catch (JMSException e) {
+ log.warn("Couldn't set message property : " + key + " = " + value, e);
+ }
+ }
+ }
+
+ private String getProperty(MessageContext mc, String key) {
+ return (String) mc.getProperty(key);
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java
new file mode 100644
index 0000000000..63faa0b852
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java
@@ -0,0 +1,1115 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.lang.reflect.Method;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.mail.internet.ContentType;
+import javax.mail.internet.ParseException;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.builder.Builder;
+import org.apache.axis2.builder.BuilderUtil;
+import org.apache.axis2.builder.SOAPBuilder;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.format.DataSourceMessageBuilder;
+import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilder;
+import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilderAdapter;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool;
+
+/**
+ * Miscallaneous methods used for the JMS transport
+ */
+public class JMSUtils extends BaseUtils {
+
+ private static final Log log = LogFactory.getLog(JMSUtils.class);
+ private static final Class[] NOARGS = new Class[] {};
+ private static final Object[] NOPARMS = new Object[] {};
+
+ /**
+ * Should this service be enabled over the JMS transport?
+ *
+ * @param service the Axis service
+ * @return true if JMS should be enabled
+ */
+ public static boolean isJMSService(AxisService service) {
+ if (service.isEnableAllTransports()) {
+ return true;
+
+ } else {
+ List transports = service.getExposedTransports();
+ for (Object transport : transports) {
+ if (JMSListener.TRANSPORT_NAME.equals(transport)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Get the EPR for the given JMS connection factory and destination
+ * the form of the URL is
+ * jms:/<destination>?[<key>=<value>&]*
+ * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS
+ * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are filtered
+ *
+ * @param cf the Axis2 JMS connection factory
+ * @param destinationType the type of destination
+ * @param endpoint JMSEndpoint
+ * @return the EPR as a String
+ */
+ static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(
+ JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName());
+ sb.append("?").
+ append(JMSConstants.PARAM_DEST_TYPE).append("=").append(
+ destinationType == JMSConstants.TOPIC ?
+ JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE);
+
+ if (endpoint.getContentTypeRuleSet() != null) {
+ String contentTypeProperty =
+ endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty();
+ if (contentTypeProperty != null) {
+ sb.append("&");
+ sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
+ sb.append("=");
+ sb.append(contentTypeProperty);
+ }
+ }
+
+ for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) {
+ if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) &&
+ !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) &&
+ !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) &&
+ !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) {
+ sb.append("&").append(
+ entry.getKey()).append("=").append(entry.getValue());
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Get a String property from the JMS message
+ *
+ * @param message JMS message
+ * @param property property name
+ * @return property value
+ */
+ public static String getProperty(Message message, String property) {
+ try {
+ return message.getStringProperty(property);
+ } catch (JMSException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Return the destination name from the given URL
+ *
+ * @param url the URL
+ * @return the destination name
+ */
+ public static String getDestination(String url) {
+ String tempUrl = url.substring(JMSConstants.JMS_PREFIX.length());
+ int propPos = tempUrl.indexOf("?");
+
+ if (propPos == -1) {
+ return tempUrl;
+ } else {
+ return tempUrl.substring(0, propPos);
+ }
+ }
+
+ /**
+ * Set the SOAPEnvelope to the Axis2 MessageContext, from the JMS Message passed in
+ * @param message the JMS message read
+ * @param msgContext the Axis2 MessageContext to be populated
+ * @param contentType content type for the message
+ * @throws AxisFault
+ * @throws JMSException
+ */
+ public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType)
+ throws AxisFault, JMSException {
+
+ if (contentType == null) {
+ if (message instanceof TextMessage) {
+ contentType = "text/plain";
+ } else {
+ contentType = "application/octet-stream";
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("No content type specified; assuming " + contentType);
+ }
+ }
+
+ int index = contentType.indexOf(';');
+ String type = index > 0 ? contentType.substring(0, index) : contentType;
+ Builder builder = BuilderUtil.getBuilderFromSelector(type, msgContext);
+ if (builder == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("No message builder found for type '" + type + "'. Falling back to SOAP.");
+ }
+ builder = new SOAPBuilder();
+ }
+
+ OMElement documentElement;
+ if (message instanceof BytesMessage) {
+ // Extract the charset encoding from the content type and
+ // set the CHARACTER_SET_ENCODING property as e.g. SOAPBuilder relies on this.
+ String charSetEnc = null;
+ try {
+ if (contentType != null) {
+ charSetEnc = new ContentType(contentType).getParameter("charset");
+ }
+ } catch (ParseException ex) {
+ // ignore
+ }
+ msgContext.setProperty(Constants.Configuration.CHARACTER_SET_ENCODING, charSetEnc);
+
+ if (builder instanceof DataSourceMessageBuilder) {
+ documentElement = ((DataSourceMessageBuilder)builder).processDocument(
+ new BytesMessageDataSource((BytesMessage)message), contentType,
+ msgContext);
+ } else {
+ documentElement = builder.processDocument(
+ new BytesMessageInputStream((BytesMessage)message), contentType,
+ msgContext);
+ }
+ } else if (message instanceof TextMessage) {
+ TextMessageBuilder textMessageBuilder;
+ if (builder instanceof TextMessageBuilder) {
+ textMessageBuilder = (TextMessageBuilder)builder;
+ } else {
+ textMessageBuilder = new TextMessageBuilderAdapter(builder);
+ }
+ String content = ((TextMessage)message).getText();
+ documentElement = textMessageBuilder.processDocument(content, contentType, msgContext);
+ } else {
+ handleException("Unsupported JMS message type " + message.getClass().getName());
+ return; // Make compiler happy
+ }
+ msgContext.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement));
+ }
+
+ /**
+ * Set the JMS ReplyTo for the message
+ *
+ * @param replyDestination the JMS Destination where the reply is expected
+ * @param session the session to use to create a temp Queue if a response is expected
+ * but a Destination has not been specified
+ * @param message the JMS message where the final Destinatio would be set as the JMS ReplyTo
+ * @return the JMS ReplyTo Destination for the message
+ */
+ public static Destination setReplyDestination(Destination replyDestination, Session session,
+ Message message) {
+
+ if (replyDestination == null) {
+ try {
+ // create temporary queue to receive the reply
+ replyDestination = createTemporaryDestination(session);
+ } catch (JMSException e) {
+ handleException("Error creating temporary queue for response");
+ }
+ }
+
+ try {
+ message.setJMSReplyTo(replyDestination);
+ } catch (JMSException e) {
+ log.warn("Error setting JMS ReplyTo destination to : " + replyDestination, e);
+ }
+
+ if (log.isDebugEnabled()) {
+ try {
+ assert replyDestination != null;
+ log.debug("Expecting a response to JMS Destination : " +
+ (replyDestination instanceof Queue ?
+ ((Queue) replyDestination).getQueueName() :
+ ((Topic) replyDestination).getTopicName()));
+ } catch (JMSException ignore) {}
+ }
+ return replyDestination;
+ }
+
+ /**
+ * Set transport headers from the axis message context, into the JMS message
+ *
+ * @param msgContext the axis message context
+ * @param message the JMS Message
+ * @throws JMSException on exception
+ */
+ public static void setTransportHeaders(MessageContext msgContext, Message message)
+ throws JMSException {
+
+ Map headerMap = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
+
+ if (headerMap == null) {
+ return;
+ }
+
+ for (Object headerName : headerMap.keySet()) {
+
+ String name = (String) headerName;
+
+ if (name.startsWith(JMSConstants.JMSX_PREFIX) &&
+ !(name.equals(JMSConstants.JMSX_GROUP_ID) || name.equals(JMSConstants.JMSX_GROUP_SEQ))) {
+ continue;
+ }
+
+ if (JMSConstants.JMS_COORELATION_ID.equals(name)) {
+ message.setJMSCorrelationID(
+ (String) headerMap.get(JMSConstants.JMS_COORELATION_ID));
+ } else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) {
+ Object o = headerMap.get(JMSConstants.JMS_DELIVERY_MODE);
+ if (o instanceof Integer) {
+ message.setJMSDeliveryMode((Integer) o);
+ } else if (o instanceof String) {
+ try {
+ message.setJMSDeliveryMode(Integer.parseInt((String) o));
+ } catch (NumberFormatException nfe) {
+ log.warn("Invalid delivery mode ignored : " + o, nfe);
+ }
+ } else {
+ log.warn("Invalid delivery mode ignored : " + o);
+ }
+
+ } else if (JMSConstants.JMS_EXPIRATION.equals(name)) {
+ message.setJMSExpiration(
+ Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION)));
+ } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) {
+ message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID));
+ } else if (JMSConstants.JMS_PRIORITY.equals(name)) {
+ message.setJMSPriority(
+ Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY)));
+ } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) {
+ message.setJMSTimestamp(
+ Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP)));
+ } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) {
+ message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE));
+
+ } else {
+ Object value = headerMap.get(name);
+ if (value instanceof String) {
+ message.setStringProperty(name, (String) value);
+ } else if (value instanceof Boolean) {
+ message.setBooleanProperty(name, (Boolean) value);
+ } else if (value instanceof Integer) {
+ message.setIntProperty(name, (Integer) value);
+ } else if (value instanceof Long) {
+ message.setLongProperty(name, (Long) value);
+ } else if (value instanceof Double) {
+ message.setDoubleProperty(name, (Double) value);
+ } else if (value instanceof Float) {
+ message.setFloatProperty(name, (Float) value);
+ }
+ }
+ }
+ }
+
+ /**
+ * Read the transport headers from the JMS Message and set them to the axis2 message context
+ *
+ * @param message the JMS Message received
+ * @param responseMsgCtx the axis message context
+ * @throws AxisFault on error
+ */
+ public static void loadTransportHeaders(Message message, MessageContext responseMsgCtx)
+ throws AxisFault {
+ responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, getTransportHeaders(message));
+ }
+
+ /**
+ * Extract transport level headers for JMS from the given message into a Map
+ *
+ * @param message the JMS message
+ * @return a Map of the transport headers
+ */
+ public static Map<String, Object> getTransportHeaders(Message message) {
+ // create a Map to hold transport headers
+ Map<String, Object> map = new HashMap<String, Object>();
+
+ // correlation ID
+ try {
+ if (message.getJMSCorrelationID() != null) {
+ map.put(JMSConstants.JMS_COORELATION_ID, message.getJMSCorrelationID());
+ }
+ } catch (JMSException ignore) {}
+
+ // set the delivery mode as persistent or not
+ try {
+ map.put(JMSConstants.JMS_DELIVERY_MODE, Integer.toString(message.getJMSDeliveryMode()));
+ } catch (JMSException ignore) {}
+
+ // destination name
+ try {
+ if (message.getJMSDestination() != null) {
+ Destination dest = message.getJMSDestination();
+ map.put(JMSConstants.JMS_DESTINATION,
+ dest instanceof Queue ?
+ ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName());
+ }
+ } catch (JMSException ignore) {}
+
+ // expiration
+ try {
+ map.put(JMSConstants.JMS_EXPIRATION, Long.toString(message.getJMSExpiration()));
+ } catch (JMSException ignore) {}
+
+ // if a JMS message ID is found
+ try {
+ if (message.getJMSMessageID() != null) {
+ map.put(JMSConstants.JMS_MESSAGE_ID, message.getJMSMessageID());
+ }
+ } catch (JMSException ignore) {}
+
+ // priority
+ try {
+ map.put(JMSConstants.JMS_PRIORITY, Long.toString(message.getJMSPriority()));
+ } catch (JMSException ignore) {}
+
+ // redelivered
+ try {
+ map.put(JMSConstants.JMS_REDELIVERED, Boolean.toString(message.getJMSRedelivered()));
+ } catch (JMSException ignore) {}
+
+ // replyto destination name
+ try {
+ if (message.getJMSReplyTo() != null) {
+ Destination dest = message.getJMSReplyTo();
+ map.put(JMSConstants.JMS_REPLY_TO,
+ dest instanceof Queue ?
+ ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName());
+ }
+ } catch (JMSException ignore) {}
+
+ // priority
+ try {
+ map.put(JMSConstants.JMS_TIMESTAMP, Long.toString(message.getJMSTimestamp()));
+ } catch (JMSException ignore) {}
+
+ // message type
+ try {
+ if (message.getJMSType() != null) {
+ map.put(JMSConstants.JMS_TYPE, message.getJMSType());
+ }
+ } catch (JMSException ignore) {}
+
+ // any other transport properties / headers
+ Enumeration e = null;
+ try {
+ e = message.getPropertyNames();
+ } catch (JMSException ignore) {}
+
+ if (e != null) {
+ while (e.hasMoreElements()) {
+ String headerName = (String) e.nextElement();
+ try {
+ map.put(headerName, message.getStringProperty(headerName));
+ continue;
+ } catch (JMSException ignore) {}
+ try {
+ map.put(headerName, message.getBooleanProperty(headerName));
+ continue;
+ } catch (JMSException ignore) {}
+ try {
+ map.put(headerName, message.getIntProperty(headerName));
+ continue;
+ } catch (JMSException ignore) {}
+ try {
+ map.put(headerName, message.getLongProperty(headerName));
+ continue;
+ } catch (JMSException ignore) {}
+ try {
+ map.put(headerName, message.getDoubleProperty(headerName));
+ continue;
+ } catch (JMSException ignore) {}
+ try {
+ map.put(headerName, message.getFloatProperty(headerName));
+ } catch (JMSException ignore) {}
+ }
+ }
+
+ return map;
+ }
+
+
+ /**
+ * Create a MessageConsumer for the given Destination
+ * @param session JMS Session to use
+ * @param dest Destination for which the Consumer is to be created
+ * @param messageSelector the message selector to be used if any
+ * @return a MessageConsumer for the specified Destination
+ * @throws JMSException
+ */
+ public static MessageConsumer createConsumer(Session session, Destination dest, String messageSelector)
+ throws JMSException {
+
+ if (dest instanceof Queue) {
+ return ((QueueSession) session).createReceiver((Queue) dest, messageSelector);
+ } else {
+ return ((TopicSession) session).createSubscriber((Topic) dest, messageSelector, false);
+ }
+ }
+
+ /**
+ * Create a temp queue or topic for synchronous receipt of responses, when a reply destination
+ * is not specified
+ * @param session the JMS Session to use
+ * @return a temporary Queue or Topic, depending on the session
+ * @throws JMSException
+ */
+ public static Destination createTemporaryDestination(Session session) throws JMSException {
+
+ if (session instanceof QueueSession) {
+ return session.createTemporaryQueue();
+ } else {
+ return session.createTemporaryTopic();
+ }
+ }
+
+ /**
+ * Return the body length in bytes for a bytes message
+ * @param bMsg the JMS BytesMessage
+ * @return length of body in bytes
+ */
+ public static long getBodyLength(BytesMessage bMsg) {
+ try {
+ Method mtd = bMsg.getClass().getMethod("getBodyLength", NOARGS);
+ if (mtd != null) {
+ return (Long) mtd.invoke(bMsg, NOPARMS);
+ }
+ } catch (Exception e) {
+ // JMS 1.0
+ if (log.isDebugEnabled()) {
+ log.debug("Error trying to determine JMS BytesMessage body length", e);
+ }
+ }
+
+ // if JMS 1.0
+ long length = 0;
+ try {
+ byte[] buffer = new byte[2048];
+ bMsg.reset();
+ for (int bytesRead = bMsg.readBytes(buffer); bytesRead != -1;
+ bytesRead = bMsg.readBytes(buffer)) {
+ length += bytesRead;
+ }
+ } catch (JMSException ignore) {}
+ return length;
+ }
+
+ /**
+ * Get the length of the message in bytes
+ * @param message
+ * @return message size (or approximation) in bytes
+ * @throws JMSException
+ */
+ public static long getMessageSize(Message message) throws JMSException {
+ if (message instanceof BytesMessage) {
+ return JMSUtils.getBodyLength((BytesMessage) message);
+ } else if (message instanceof TextMessage) {
+ // TODO: Converting the whole message to a byte array is too much overhead just to determine the message size.
+ // Anyway, the result is not accurate since we don't know what encoding the JMS provider uses.
+ return ((TextMessage) message).getText().getBytes().length;
+ } else {
+ log.warn("Can't determine size of JMS message; unsupported message type : "
+ + message.getClass().getName());
+ return 0;
+ }
+ }
+
+ public static <T> T lookup(Context context, Class<T> clazz, String name)
+ throws NamingException {
+
+ Object object = context.lookup(name);
+ try {
+ return clazz.cast(object);
+ } catch (ClassCastException ex) {
+ // Instead of a ClassCastException, throw an exception with some
+ // more information.
+ if (object instanceof Reference) {
+ Reference ref = (Reference)object;
+ handleException("JNDI failed to de-reference Reference with name " +
+ name + "; is the factory " + ref.getFactoryClassName() +
+ " in your classpath?");
+ return null;
+ } else {
+ handleException("JNDI lookup of name " + name + " returned a " +
+ object.getClass().getName() + " while a " + clazz + " was expected");
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Create a ServiceTaskManager for the service passed in and its corresponding JMSConnectionFactory
+ * @param jcf
+ * @param service
+ * @param workerPool
+ * @return
+ */
+ public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf,
+ AxisService service, WorkerPool workerPool) {
+
+ String name = service.getName();
+ Map<String, String> svc = getServiceStringParameters(service.getParameters());
+ Map<String, String> cf = jcf.getParameters();
+
+ ServiceTaskManager stm = new ServiceTaskManager();
+
+ stm.setServiceName(name);
+ stm.addJmsProperties(cf);
+ stm.addJmsProperties(svc);
+
+ stm.setConnFactoryJNDIName(
+ getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf));
+ String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf);
+ if (destName == null) {
+ destName = service.getName();
+ }
+ stm.setDestinationJNDIName(destName);
+ stm.setDestinationType(getDestinationType(svc, cf));
+
+ stm.setJmsSpec11(
+ getJMSSpecVersion(svc, cf));
+ stm.setTransactionality(
+ getTransactionality(svc, cf));
+ stm.setCacheUserTransaction(
+ getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf));
+ stm.setUserTransactionJNDIName(
+ getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf));
+ stm.setSessionTransacted(
+ getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf));
+ stm.setSessionAckMode(
+ getSessionAck(svc, cf));
+ stm.setMessageSelector(
+ getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf));
+ stm.setSubscriptionDurable(
+ getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf));
+ stm.setDurableSubscriberName(
+ getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf));
+
+ stm.setCacheLevel(
+ getCacheLevel(svc, cf));
+ stm.setPubSubNoLocal(
+ getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf));
+
+ Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf);
+ if (value != null) {
+ stm.setReceiveTimeout(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf);
+ if (value != null) {
+ stm.setConcurrentConsumers(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf);
+ if (value != null) {
+ stm.setMaxConcurrentConsumers(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf);
+ if (value != null) {
+ stm.setIdleTaskExecutionLimit(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf);
+ if (value != null) {
+ stm.setMaxMessagesPerTask(value);
+ }
+
+ value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf);
+ if (value != null) {
+ stm.setInitialReconnectDuration(value);
+ }
+ value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf);
+ if (value != null) {
+ stm.setMaxReconnectDuration(value);
+ }
+ Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf);
+ if (dValue != null) {
+ stm.setReconnectionProgressionFactor(dValue);
+ }
+
+ stm.setWorkerPool(workerPool);
+
+ // remove processed properties from property bag
+ stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME);
+ stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION);
+ stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER);
+ stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY);
+ stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN);
+ stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME);
+ stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED);
+ stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR);
+ stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE);
+ stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME);
+ stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL);
+ stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL);
+ stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT);
+ stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS);
+ stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS);
+ stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT);
+ stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK);
+ stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION);
+ stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION);
+ stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR);
+
+ return stm;
+ }
+
+ private static Map<String, String> getServiceStringParameters(List list) {
+
+ Map<String, String> map = new HashMap<String, String>();
+ for (Object o : list) {
+ Parameter p = (Parameter) o;
+ if (p.getValue() instanceof String) {
+ map.put(p.getName(), (String) p.getValue());
+ }
+ }
+ return map;
+ }
+
+ private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ if (value == null) {
+ throw new AxisJMSException("Service/connection factory property : " + key);
+ }
+ return value;
+ }
+
+ private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ return value;
+ }
+
+ private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ if (value == null) {
+ return null;
+ } else {
+ return Boolean.valueOf(value);
+ }
+ }
+
+ private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ if (value != null) {
+ try {
+ return Integer.parseInt(value);
+ } catch (NumberFormatException e) {
+ throw new AxisJMSException("Invalid value : " + value + " for " + key);
+ }
+ }
+ return null;
+ }
+
+ private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) {
+ String value = (String) svcMap.get(key);
+ if (value == null) {
+ value = (String) cfMap.get(key);
+ }
+ if (value != null) {
+ try {
+ return Double.parseDouble(value);
+ } catch (NumberFormatException e) {
+ throw new AxisJMSException("Invalid value : " + value + " for " + key);
+ }
+ }
+ return null;
+ }
+
+ private static int getTransactionality(Map svcMap, Map cfMap) {
+
+ String key = BaseConstants.PARAM_TRANSACTIONALITY;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if (val == null) {
+ return BaseConstants.TRANSACTION_NONE;
+
+ } else {
+ if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) {
+ return BaseConstants.TRANSACTION_JTA;
+ } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) {
+ return BaseConstants.TRANSACTION_LOCAL;
+ } else {
+ throw new AxisJMSException("Invalid option : " + val + " for parameter : " +
+ BaseConstants.STR_TRANSACTION_JTA);
+ }
+ }
+ }
+
+ private static int getDestinationType(Map svcMap, Map cfMap) {
+
+ String key = JMSConstants.PARAM_DEST_TYPE;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) {
+ return JMSConstants.TOPIC;
+ }
+ return JMSConstants.QUEUE;
+ }
+
+ private static int getSessionAck(Map svcMap, Map cfMap) {
+
+ String key = JMSConstants.PARAM_SESSION_ACK;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+ return Session.AUTO_ACKNOWLEDGE;
+ } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) {
+ return Session.CLIENT_ACKNOWLEDGE;
+ } else if ("DUPS_OK_ACKNOWLEDGE".equals(val)){
+ return Session.DUPS_OK_ACKNOWLEDGE;
+ } else if ("SESSION_TRANSACTED".equals(val)) {
+ return 0; //Session.SESSION_TRANSACTED;
+ } else {
+ try {
+ return Integer.parseInt(val);
+ } catch (NumberFormatException ignore) {
+ throw new AxisJMSException("Invalid session acknowledgement mode : " + val);
+ }
+ }
+ }
+
+ private static int getCacheLevel(Map svcMap, Map cfMap) {
+
+ String key = JMSConstants.PARAM_CACHE_LEVEL;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if ("none".equalsIgnoreCase(val)) {
+ return JMSConstants.CACHE_NONE;
+ } else if ("connection".equalsIgnoreCase(val)) {
+ return JMSConstants.CACHE_CONNECTION;
+ } else if ("session".equals(val)){
+ return JMSConstants.CACHE_SESSION;
+ } else if ("consumer".equals(val)) {
+ return JMSConstants.CACHE_CONSUMER;
+ } else if (val != null) {
+ throw new AxisJMSException("Invalid cache level : " + val);
+ }
+ return JMSConstants.CACHE_AUTO;
+ }
+
+ private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) {
+
+ String key = JMSConstants.PARAM_JMS_SPEC_VER;
+ String val = (String) svcMap.get(key);
+ if (val == null) {
+ val = (String) cfMap.get(key);
+ }
+
+ if (val == null || "1.1".equals(val)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * This is a JMS spec independent method to create a Connection. Please be cautious when
+ * making any changes
+ *
+ * @param conFac the ConnectionFactory to use
+ * @param user optional user name
+ * @param pass optional password
+ * @param jmsSpec11 should we use JMS 1.1 API ?
+ * @param isQueue is this to deal with a Queue?
+ * @return a JMS Connection as requested
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static Connection createConnection(ConnectionFactory conFac,
+ String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException {
+
+ Connection connection = null;
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") +
+ "Connection using credentials : (" + user + "/" + pass + ")");
+ }
+
+ if (jmsSpec11 || isQueue == null) {
+ if (user != null && pass != null) {
+ connection = conFac.createConnection(user, pass);
+ } else {
+ connection = conFac.createConnection();
+ }
+
+ } else {
+ QueueConnectionFactory qConFac = null;
+ TopicConnectionFactory tConFac = null;
+ if (isQueue) {
+ tConFac = (TopicConnectionFactory) conFac;
+ } else {
+ qConFac = (QueueConnectionFactory) conFac;
+ }
+
+ if (user != null && pass != null) {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection(user, pass);
+ } else if (tConFac != null) {
+ connection = tConFac.createTopicConnection(user, pass);
+ }
+ } else {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection();
+ } else if (tConFac != null) {
+ connection = tConFac.createTopicConnection();
+ }
+ }
+ }
+ return connection;
+ }
+
+ /**
+ * This is a JMS spec independent method to create a Session. Please be cautious when
+ * making any changes
+ *
+ * @param connection the JMS Connection
+ * @param transacted should the session be transacted?
+ * @param ackMode the ACK mode for the session
+ * @param jmsSpec11 should we use the JMS 1.1 API?
+ * @param isQueue is this Session to deal with a Queue?
+ * @return a Session created for the given information
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static Session createSession(Connection connection, boolean transacted, int ackMode,
+ boolean jmsSpec11, Boolean isQueue) throws JMSException {
+
+ if (jmsSpec11 || isQueue == null) {
+ return connection.createSession(transacted, ackMode);
+
+ } else {
+ if (isQueue) {
+ return ((QueueConnection) connection).createQueueSession(transacted, ackMode);
+ } else {
+ return ((TopicConnection) connection).createTopicSession(transacted, ackMode);
+ }
+ }
+ }
+
+ /**
+ * This is a JMS spec independent method to create a MessageConsumer. Please be cautious when
+ * making any changes
+ *
+ * @param session JMS session
+ * @param destination the Destination
+ * @param isQueue is the Destination a queue?
+ * @param subscriberName optional client name to use for a durable subscription to a topic
+ * @param messageSelector optional message selector
+ * @param pubSubNoLocal should we receive messages sent by us during pub-sub?
+ * @param isDurable is this a durable topic subscription?
+ * @param jmsSpec11 should we use JMS 1.1 API ?
+ * @return a MessageConsumer to receive messages
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static MessageConsumer createConsumer(
+ Session session, Destination destination, Boolean isQueue,
+ String subscriberName, String messageSelector, boolean pubSubNoLocal,
+ boolean isDurable, boolean jmsSpec11) throws JMSException {
+
+ if (jmsSpec11 || isQueue == null) {
+ if (isDurable) {
+ return session.createDurableSubscriber(
+ (Topic) destination, subscriberName, messageSelector, pubSubNoLocal);
+ } else {
+ return session.createConsumer(destination, messageSelector, pubSubNoLocal);
+ }
+ } else {
+ if (isQueue) {
+ return ((QueueSession) session).createReceiver((Queue) destination, messageSelector);
+ } else {
+ if (isDurable) {
+ return ((TopicSession) session).createDurableSubscriber(
+ (Topic) destination, subscriberName, messageSelector, pubSubNoLocal);
+ } else {
+ return ((TopicSession) session).createSubscriber(
+ (Topic) destination, messageSelector, pubSubNoLocal);
+ }
+ }
+ }
+ }
+
+ /**
+ * This is a JMS spec independent method to create a MessageProducer. Please be cautious when
+ * making any changes
+ *
+ * @param session JMS session
+ * @param destination the Destination
+ * @param isQueue is the Destination a queue?
+ * @param jmsSpec11 should we use JMS 1.1 API ?
+ * @return a MessageProducer to send messages to the given Destination
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static MessageProducer createProducer(
+ Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException {
+
+ if (jmsSpec11 || isQueue == null) {
+ return session.createProducer(destination);
+ } else {
+ if (isQueue) {
+ return ((QueueSession) session).createSender((Queue) destination);
+ } else {
+ return ((TopicSession) session).createPublisher((Topic) destination);
+ }
+ }
+ }
+
+ /**
+ * Create a one time MessageProducer for the given JMS OutTransport information
+ * For simplicity and best compatibility, this method uses only JMS 1.0.2b API.
+ * Please be cautious when making any changes
+ *
+ * @param jmsOut the JMS OutTransport information (contains all properties)
+ * @return a JMSSender based on one-time use resources
+ * @throws JMSException on errors, to be handled and logged by the caller
+ */
+ public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut)
+ throws JMSException {
+
+ // digest the targetAddress and locate CF from the EPR
+ jmsOut.loadConnectionFactoryFromProperies();
+
+ // create a one time connection and session to be used
+ Hashtable<String,String> jmsProps = jmsOut.getProperties();
+ String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null;
+ String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null;
+
+ QueueConnectionFactory qConFac = null;
+ TopicConnectionFactory tConFac = null;
+
+ int destType = -1;
+ if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
+ destType = JMSConstants.QUEUE;
+ qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
+
+ } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
+ destType = JMSConstants.TOPIC;
+ tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
+ }
+
+ Connection connection = null;
+ if (user != null && pass != null) {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection(user, pass);
+ } else if (tConFac != null) {
+ connection = tConFac.createTopicConnection(user, pass);
+ }
+ } else {
+ if (qConFac != null) {
+ connection = qConFac.createQueueConnection();
+ } else if (tConFac != null) {
+ connection = tConFac.createTopicConnection();
+ }
+ }
+
+ if (connection == null && jmsOut.getJmsConnectionFactory() != null) {
+ connection = jmsOut.getJmsConnectionFactory().getConnection();
+ }
+
+ Session session = null;
+ MessageProducer producer = null;
+ Destination destination = jmsOut.getDestination();
+
+ if (destType == JMSConstants.QUEUE) {
+ session = ((QueueConnection) connection).
+ createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = ((QueueSession) session).createSender((Queue) destination);
+ } else {
+ session = ((TopicConnection) connection).
+ createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = ((TopicSession) session).createPublisher((Topic) destination);
+ }
+
+ return new JMSMessageSender(connection, session, producer,
+ destination, (jmsOut.getJmsConnectionFactory() == null ?
+ JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false,
+ destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE);
+ }
+
+ /**
+ * Return a String representation of the destination type
+ * @param destType the destination type indicator int
+ * @return a descriptive String
+ */
+ public static String getDestinationTypeAsString(int destType) {
+ if (destType == JMSConstants.QUEUE) {
+ return "Queue";
+ } else if (destType == JMSConstants.TOPIC) {
+ return "Topic";
+ } else {
+ return "Generic";
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java
new file mode 100644
index 0000000000..28c8da2a8d
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java
@@ -0,0 +1,1217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.SystemException;
+import javax.transaction.UserTransaction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool;
+
+/**
+ * Each service will have one ServiceTaskManager instance that will create, manage and also destroy
+ * idle tasks created for it, for message receipt. This will also allow individual tasks to cache
+ * the Connection, Session or Consumer as necessary, considering the transactionality required and
+ * user preference.
+ *
+ * This also acts as the ExceptionListener for all JMS connections made on behalf of the service.
+ * Since the ExceptionListener is notified by a JMS provider on a "serious" error, we simply try
+ * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh
+ * for the service, by discarding all connections.
+ */
+public class ServiceTaskManager {
+
+ /** The logger */
+ private static final Log log = LogFactory.getLog(ServiceTaskManager.class);
+
+ /** The Task manager is stopped or has not started */
+ private static final int STATE_STOPPED = 0;
+ /** The Task manager is started and active */
+ private static final int STATE_STARTED = 1;
+ /** The Task manager is paused temporarily */
+ private static final int STATE_PAUSED = 2;
+ /** The Task manager is started, but a shutdown has been requested */
+ private static final int STATE_SHUTTING_DOWN = 3;
+ /** The Task manager has encountered an error */
+ private static final int STATE_FAILURE = 4;
+
+ /** The name of the service managed by this instance */
+ private String serviceName;
+ /** The ConnectionFactory MUST refer to an XAConnectionFactory to use JTA */
+ private String connFactoryJNDIName;
+ /** The JNDI name of the Destination Queue or Topic */
+ private String destinationJNDIName;
+ /** JNDI location for the JTA UserTransaction */
+ private String userTransactionJNDIName = "java:comp/UserTransaction";
+ /** The type of destination - P2P or PubSub (or JMS 1.1 API generic?) */
+ private int destinationType = JMSConstants.GENERIC;
+ /** An optional message selector */
+ private String messageSelector = null;
+
+ /** Should tasks run without transactions, using transacted Sessions (i.e. local), or JTA */
+ private int transactionality = BaseConstants.TRANSACTION_NONE;
+ /** Should created Sessions be transactional ? - should be false when using JTA */
+ private boolean sessionTransacted = true;
+ /** Session acknowledgement mode when transacted Sessions (i.e. local transactions) are used */
+ private int sessionAckMode = Session.AUTO_ACKNOWLEDGE;
+
+ /** Is the subscription durable ? */
+ private boolean subscriptionDurable = false;
+ /** The name of the durable subscriber for this client */
+ private String durableSubscriberName = null;
+ /** In PubSub mode, should I receive messages sent by me / my connection ? */
+ private boolean pubSubNoLocal = false;
+ /** Number of concurrent consumers - for PubSub, this should be 1 to prevent multiple receipt */
+ private int concurrentConsumers = 1;
+ /** Maximum number of consumers to create - see @concurrentConsumers */
+ private int maxConcurrentConsumers = 1;
+ /** The number of idle (i.e. message-less) attempts to be tried before suicide, to scale down */
+ private int idleTaskExecutionLimit = 10;
+ /** The maximum number of successful message receipts for a task - to limit thread life span */
+ private int maxMessagesPerTask = -1; // default is unlimited
+ /** The default receive timeout - a negative value means wait forever, zero dont wait at all */
+ private int receiveTimeout = 1000;
+ /** JMS Resource cache level - Connection, Session, Consumer. Auto will select safe default */
+ private int cacheLevel = JMSConstants.CACHE_AUTO;
+ /** Should we cache the UserTransaction handle from JNDI - true for almost all app servers */
+ private boolean cacheUserTransaction = true;
+ /** Shared UserTransactionHandle */
+ private UserTransaction sharedUserTransaction = null;
+ /** Should this service use JMS 1.1 ? (when false, defaults to 1.0.2b) */
+ private boolean jmsSpec11 = true;
+
+ /** Initial duration to attempt re-connection to JMS provider after failure */
+ private int initialReconnectDuration = 10000;
+ /** Progression factory for geometric series that calculates re-connection times */
+ private double reconnectionProgressionFactor = 2.0; // default to [bounded] exponential
+ /** Upper limit on reconnection attempt duration */
+ private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour
+
+ /** The JNDI context properties and other general properties */
+ private Hashtable<String,String> jmsProperties = new Hashtable<String, String>();
+ /** The JNDI Context acuired */
+ private Context context = null;
+ /** The ConnectionFactory to be used */
+ private ConnectionFactory conFactory = null;
+ /** The JMS Destination */
+ private Destination destination = null;
+
+ /** The list of active tasks thats managed by this instance */
+ private final List<MessageListenerTask> pollingTasks =
+ Collections.synchronizedList(new ArrayList<MessageListenerTask>());
+ /** The per-service JMS message receiver to be invoked after receipt of messages */
+ private JMSMessageReceiver jmsMessageReceiver = null;
+
+ /** State of this Task Manager */
+ private volatile int serviceTaskManagerState = STATE_STOPPED;
+ /** Number of invoker tasks active */
+ private volatile int activeTaskCount = 0;
+ /** The shared thread pool from the Listener */
+ private WorkerPool workerPool = null;
+
+ /** The JMS Connection shared between multiple polling tasks - when enabled (reccomended) */
+ private Connection sharedConnection = null;
+
+ /**
+ * Start or re-start the Task Manager by shutting down any existing worker tasks and
+ * re-creating them. However, if this is STM is PAUSED, a start request is ignored.
+ * This applies for any connection failures during paused state as well, which then will
+ * not try to auto recover
+ */
+ public synchronized void start() {
+
+ if (serviceTaskManagerState == STATE_PAUSED) {
+ log.info("Attempt to re-start paused TaskManager is ignored. Please use resume instead");
+ return;
+ }
+
+ // if any tasks are running, stop whats running now
+ if (!pollingTasks.isEmpty()) {
+ stop();
+ }
+
+ if (cacheLevel == JMSConstants.CACHE_AUTO) {
+ cacheLevel =
+ transactionality == BaseConstants.TRANSACTION_NONE ?
+ JMSConstants.CACHE_CONSUMER : JMSConstants.CACHE_NONE;
+ }
+ switch (cacheLevel) {
+ case JMSConstants.CACHE_NONE:
+ log.debug("No JMS resources will be cached/shared between poller " +
+ "worker tasks of service : " + serviceName);
+ break;
+ case JMSConstants.CACHE_CONNECTION:
+ log.debug("Only the JMS Connection will be cached and shared between *all* " +
+ "poller task invocations");
+ break;
+ case JMSConstants.CACHE_SESSION:
+ log.debug("The JMS Connection and Session will be cached and shared between " +
+ "successive poller task invocations");
+ break;
+ case JMSConstants.CACHE_CONSUMER:
+ log.debug("The JMS Connection, Session and MessageConsumer will be cached and " +
+ "shared between successive poller task invocations");
+ break;
+ default : {
+ handleException("Invalid cache level : " + cacheLevel +
+ " for service : " + serviceName);
+ }
+ }
+
+ for (int i=0; i<concurrentConsumers; i++) {
+ workerPool.execute(new MessageListenerTask());
+ }
+
+ serviceTaskManagerState = STATE_STARTED;
+ log.info("Task manager for service : " + serviceName + " [re-]initialized");
+ }
+
+ /**
+ * Shutdown the tasks and release any shared resources
+ */
+ public synchronized void stop() {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Stopping ServiceTaskManager for service : " + serviceName);
+ }
+
+ if (serviceTaskManagerState != STATE_FAILURE) {
+ serviceTaskManagerState = STATE_SHUTTING_DOWN;
+ }
+
+ synchronized(pollingTasks) {
+ for (MessageListenerTask lstTask : pollingTasks) {
+ lstTask.requestShutdown();
+ }
+ }
+
+ // try to wait a bit for task shutdown
+ for (int i=0; i<5; i++) {
+ if (activeTaskCount == 0) {
+ break;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {}
+ }
+
+ if (sharedConnection != null) {
+ try {
+ sharedConnection.stop();
+ } catch (JMSException e) {
+ logError("Error stopping shared Connection", e);
+ } finally {
+ sharedConnection = null;
+ }
+ }
+
+ if (activeTaskCount > 0) {
+ log.warn("Unable to shutdown all polling tasks of service : " + serviceName);
+ }
+
+ if (serviceTaskManagerState != STATE_FAILURE) {
+ serviceTaskManagerState = STATE_STOPPED;
+ }
+ log.info("Task manager for service : " + serviceName + " shutdown");
+ }
+
+ /**
+ * Temporarily suspend receipt and processing of messages. Accomplished by stopping the
+ * connection / or connections used by the poller tasks
+ */
+ public synchronized void pause() {
+ for (MessageListenerTask lstTask : pollingTasks) {
+ lstTask.pause();
+ }
+ if (sharedConnection != null) {
+ try {
+ sharedConnection.stop();
+ } catch (JMSException e) {
+ logError("Error pausing shared Connection", e);
+ }
+ }
+ }
+
+ /**
+ * Resume receipt and processing of messages of paused tasks
+ */
+ public synchronized void resume() {
+ for (MessageListenerTask lstTask : pollingTasks) {
+ lstTask.resume();
+ }
+ if (sharedConnection != null) {
+ try {
+ sharedConnection.start();
+ } catch (JMSException e) {
+ logError("Error resuming shared Connection", e);
+ }
+ }
+ }
+
+ /**
+ * Start a new MessageListenerTask if we are still active, the threshold is not reached, and w
+ * e do not have any idle tasks - i.e. scale up listening
+ */
+ private void scheduleNewTaskIfAppropriate() {
+ if (serviceTaskManagerState == STATE_STARTED &&
+ pollingTasks.size() < getMaxConcurrentConsumers() && getIdleTaskCount() == 0) {
+ workerPool.execute(new MessageListenerTask());
+ }
+ }
+
+ /**
+ * Get the number of MessageListenerTasks that are currently idle
+ * @return idle task count
+ */
+ private int getIdleTaskCount() {
+ int count = 0;
+ for (MessageListenerTask lstTask : pollingTasks) {
+ if (lstTask.isTaskIdle()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Get the number of MessageListenerTasks that are currently connected to the JMS provider
+ * @return connected task count
+ */
+ private int getConnectedTaskCount() {
+ int count = 0;
+ for (MessageListenerTask lstTask : pollingTasks) {
+ if (lstTask.isConnected()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * The actual threads/tasks that perform message polling
+ */
+ private class MessageListenerTask implements Runnable, ExceptionListener {
+
+ /** The Connection used by the polling task */
+ private Connection connection = null;
+ /** The Sesson used by the polling task */
+ private Session session = null;
+ /** The MessageConsumer used by the polling task */
+ private MessageConsumer consumer = null;
+ /** State of the worker polling task */
+ private volatile int workerState = STATE_STOPPED;
+ /** The number of idle (i.e. without fetching a message) polls for this task */
+ private int idleExecutionCount = 0;
+ /** Is this task idle right now? */
+ private volatile boolean idle = false;
+ /** Is this task connected to the JMS provider successfully? */
+ private boolean connected = false;
+
+ /** As soon as we create a new polling task, add it to the STM for control later */
+ MessageListenerTask() {
+ synchronized(pollingTasks) {
+ pollingTasks.add(this);
+ }
+ }
+
+ /**
+ * Pause this polling worker task
+ */
+ public void pause() {
+ if (isActive()) {
+ if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ try {
+ connection.stop();
+ } catch (JMSException e) {
+ log.warn("Error pausing Message Listener task for service : " + serviceName);
+ }
+ }
+ workerState = STATE_PAUSED;
+ }
+ }
+
+ /**
+ * Resume this polling task
+ */
+ public void resume() {
+ if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ try {
+ connection.start();
+ } catch (JMSException e) {
+ log.warn("Error resuming Message Listener task for service : " + serviceName);
+ }
+ }
+ workerState = STATE_STARTED;
+ }
+
+ /**
+ * Execute the polling worker task
+ */
+ public void run() {
+ workerState = STATE_STARTED;
+ activeTaskCount++;
+ int messageCount = 0;
+
+ if (log.isDebugEnabled()) {
+ log.debug("New poll task starting : thread id = " + Thread.currentThread().getId());
+ }
+
+ try {
+ while (isActive() &&
+ (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) &&
+ (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) {
+
+ UserTransaction ut = null;
+ try {
+ if (transactionality == BaseConstants.TRANSACTION_JTA) {
+ ut = getUserTransaction();
+ ut.begin();
+ }
+ } catch (NotSupportedException e) {
+ handleException("Listener Task is already associated with a transaction", e);
+ } catch (SystemException e) {
+ handleException("Error starting a JTA transaction", e);
+ }
+
+ // Get a message by polling, or receive null
+ Message message = receiveMessage();
+
+ if (log.isTraceEnabled()) {
+ if (message != null) {
+ try {
+ log.trace("<<<<<<< READ message with Message ID : " +
+ message.getJMSMessageID() + " from : " + destination +
+ " by Thread ID : " + Thread.currentThread().getId());
+ } catch (JMSException ignore) {}
+ } else {
+ log.trace("No message received by Thread ID : " +
+ Thread.currentThread().getId() + " for destination : " + destination);
+ }
+ }
+
+ if (message != null) {
+ idle = false;
+ idleExecutionCount = 0;
+ messageCount++;
+ // I will be busy now while processing this message, so start another if needed
+ scheduleNewTaskIfAppropriate();
+ handleMessage(message, ut);
+
+ } else {
+ idle = true;
+ idleExecutionCount++;
+ }
+ }
+
+ } finally {
+ workerState = STATE_STOPPED;
+ activeTaskCount--;
+ synchronized(pollingTasks) {
+ pollingTasks.remove(this);
+ }
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() +
+ " is stopping after processing : " + messageCount + " messages :: " +
+ " isActive : " + isActive() + " maxMessagesPerTask : " +
+ getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() +
+ " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " +
+ getIdleTaskExecutionLimit());
+ } else if (log.isDebugEnabled()) {
+ log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() +
+ " is stopping after processing : " + messageCount + " messages");
+ }
+
+ closeConsumer(true);
+ closeSession(true);
+ closeConnection();
+
+ // My time is up, so if I am going away, create another
+ scheduleNewTaskIfAppropriate();
+ }
+
+ /**
+ * Poll for and return a message if available
+ *
+ * @return a message read, or null
+ */
+ private Message receiveMessage() {
+
+ // get a new connection, session and consumer to prevent a conflict.
+ // If idle, it means we can re-use what we already have
+ if (consumer == null) {
+ connection = getConnection();
+ session = getSession();
+ consumer = getMessageConsumer();
+ if (log.isDebugEnabled()) {
+ log.debug("Preparing a Connection, Session and Consumer to read messages");
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Waiting for a message for service : " + serviceName + " - duration : "
+ + (getReceiveTimeout() < 0 ? "unlimited" : (getReceiveTimeout() + "ms")));
+ }
+
+ try {
+ if (getReceiveTimeout() < 0) {
+ return consumer.receive();
+ } else {
+ return consumer.receive(getReceiveTimeout());
+ }
+ } catch (IllegalStateException ignore) {
+ // probably the consumer (shared) was closed.. which is still ok.. as we didn't read
+ } catch (JMSException e) {
+ logError("Error receiving message for service : " + serviceName, e);
+ }
+ return null;
+ }
+
+ /**
+ * Invoke ultimate message handler/listener and ack message and/or
+ * commit/rollback transactions
+ * @param message the JMS message received
+ * @param ut the UserTransaction used to receive this message, or null
+ */
+ private void handleMessage(Message message, UserTransaction ut) {
+
+ String messageId = null;
+ try {
+ messageId = message.getJMSMessageID();
+ } catch (JMSException ignore) {}
+
+ boolean commitOrAck = true;
+ try {
+ commitOrAck = jmsMessageReceiver.onMessage(message, ut);
+
+ } finally {
+
+ // if client acknowledgement is selected, and processing requested ACK
+ if (commitOrAck && getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) {
+ try {
+ message.acknowledge();
+ if (log.isDebugEnabled()) {
+ log.debug("Message : " + messageId + " acknowledged");
+ }
+ } catch (JMSException e) {
+ logError("Error acknowledging message : " + messageId, e);
+ }
+ }
+
+ // close the consumer
+ closeConsumer(false);
+
+ // if session was transacted, commit it or rollback
+ try {
+ if (session.getTransacted()) {
+ if (commitOrAck) {
+ session.commit();
+ if (log.isDebugEnabled()) {
+ log.debug("Session for message : " + messageId + " committed");
+ }
+ } else {
+ session.rollback();
+ if (log.isDebugEnabled()) {
+ log.debug("Session for message : " + messageId + " rolled back");
+ }
+ }
+ }
+ } catch (JMSException e) {
+ logError("Error " + (commitOrAck ? "committing" : "rolling back") +
+ " local session txn for message : " + messageId, e);
+ }
+
+ // if a JTA transaction was being used, commit it or rollback
+ try {
+ if (ut != null) {
+ if (commitOrAck) {
+ ut.commit();
+ if (log.isDebugEnabled()) {
+ log.debug("JTA txn for message : " + messageId + " committed");
+ }
+ } else {
+ ut.rollback();
+ if (log.isDebugEnabled()) {
+ log.debug("JTA txn for message : " + messageId + " rolled back");
+ }
+ }
+ }
+ } catch (Exception e) {
+ logError("Error " + (commitOrAck ? "committing" : "rolling back") +
+ " JTA txn for message : " + messageId + " from the session", e);
+ }
+
+ closeSession(false);
+ closeConnection();
+ }
+ }
+
+ /** Handle JMS Connection exceptions by re-initializing. A single connection failure could
+ * cause re-initialization of multiple MessageListenerTasks / Connections
+ */
+ public void onException(JMSException j) {
+
+ if (!isSTMActive()) {
+ requestShutdown();
+ return;
+ }
+
+ log.warn("JMS Connection failure : " + j.getMessage());
+ setConnected(false);
+
+ if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ // failed Connection was not shared, thus no need to restart the whole STM
+ requestShutdown();
+ return;
+ }
+
+ // if we failed while active, update state to show failure
+ setServiceTaskManagerState(STATE_FAILURE);
+ log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks");
+
+ int r = 1;
+ long retryDuration = initialReconnectDuration;
+
+ do {
+ try {
+ log.info("Reconnection attempt : " + r + " for service : " + serviceName);
+ start();
+ } catch (Exception ignore) {}
+
+ boolean connected = false;
+ for (int i=0; i<5; i++) {
+ if (getConnectedTaskCount() == concurrentConsumers) {
+ connected = true;
+ break;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {}
+ }
+
+ if (!connected) {
+ log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName +
+ " failed. Next retry in " + (retryDuration/1000) + "seconds");
+ retryDuration = (long) (retryDuration * reconnectionProgressionFactor);
+ if (retryDuration > maxReconnectDuration) {
+ retryDuration = maxReconnectDuration;
+ }
+
+ try {
+ Thread.sleep(retryDuration);
+ } catch (InterruptedException ignore) {}
+ }
+
+ } while (!isSTMActive() || getConnectedTaskCount() < concurrentConsumers);
+ }
+
+ protected void requestShutdown() {
+ workerState = STATE_SHUTTING_DOWN;
+ }
+
+ private boolean isActive() {
+ return workerState == STATE_STARTED;
+ }
+
+ protected boolean isTaskIdle() {
+ return idle;
+ }
+
+ public boolean isConnected() {
+ return connected;
+ }
+
+ public void setConnected(boolean connected) {
+ this.connected = connected;
+ }
+
+ /**
+ * Get a Connection that could/should be used by this task - depends on the cache level to reuse
+ * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection
+ */
+ private Connection getConnection() {
+ if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ // Connection is not shared
+ if (connection == null) {
+ connection = createConnection();
+ }
+ } else {
+ if (sharedConnection != null) {
+ connection = sharedConnection;
+ } else {
+ synchronized(this) {
+ if (sharedConnection == null) {
+ sharedConnection = createConnection();
+ }
+ connection = sharedConnection;
+ }
+ }
+ }
+ setConnected(true);
+ return connection;
+ }
+
+ /**
+ * Get a Session that could/should be used by this task - depends on the cache level to reuse
+ * @param connection the connection (could be the shared connection) to use to create a Session
+ * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session
+ * created using the Connection passed, or a new/shared connection
+ */
+ private Session getSession() {
+ if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) {
+ session = createSession();
+ }
+ return session;
+ }
+
+ /**
+ * Get a MessageConsumer that chould/should be used by this task - depends on the cache
+ * level to reuse
+ * @param connection option Connection to be used
+ * @param session optional Session to be used
+ * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new
+ * MessageConsumer possibly using the Connection and Session passed in
+ */
+ private MessageConsumer getMessageConsumer() {
+ if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) {
+ consumer = createConsumer();
+ }
+ return consumer;
+ }
+
+ /**
+ * Close the given Connection, hiding exceptions if any which are logged
+ * @param connection the Connection to be closed
+ */
+ private void closeConnection() {
+ if (connection != null &&
+ cacheLevel < JMSConstants.CACHE_CONNECTION) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing non-shared JMS connection for service : " + serviceName);
+ }
+ connection.close();
+ } catch (JMSException e) {
+ logError("Error closing JMS connection", e);
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ /**
+ * Close the given Session, hiding exceptions if any which are logged
+ * @param session the Session to be closed
+ */
+ private void closeSession(boolean forced) {
+ if (session != null &&
+ (cacheLevel < JMSConstants.CACHE_SESSION || forced)) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing non-shared JMS session for service : " + serviceName);
+ }
+ session.close();
+ } catch (JMSException e) {
+ logError("Error closing JMS session", e);
+ } finally {
+ session = null;
+ }
+ }
+ }
+
+ /**
+ * Close the given Consumer, hiding exceptions if any which are logged
+ * @param consumer the Consumer to be closed
+ */
+ private void closeConsumer(boolean forced) {
+ if (consumer != null &&
+ (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Closing non-shared JMS consumer for service : " + serviceName);
+ }
+ consumer.close();
+ } catch (JMSException e) {
+ logError("Error closing JMS consumer", e);
+ } finally {
+ consumer = null;
+ }
+ }
+ }
+
+ /**
+ * Create a new Connection for this STM, using JNDI properties and credentials provided
+ * @return a new Connection for this STM, using JNDI properties and credentials provided
+ */
+ private Connection createConnection() {
+
+ try {
+ conFactory = JMSUtils.lookup(
+ getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName());
+ log.debug("Connected to the JMS connection factory : " + getConnFactoryJNDIName());
+ } catch (NamingException e) {
+ handleException("Error looking up connection factory : " + getConnFactoryJNDIName() +
+ " using JNDI properties : " + jmsProperties, e);
+ }
+
+ Connection connection = null;
+ try {
+ connection = JMSUtils.createConnection(
+ conFactory,
+ jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME),
+ jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD),
+ isJmsSpec11(), isQueue());
+
+ connection.setExceptionListener(this);
+ connection.start();
+ log.debug("JMS Connection for service : " + serviceName + " created and started");
+
+ } catch (JMSException e) {
+ handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() +
+ " using JNDI properties : " + jmsProperties, e);
+ }
+ return connection;
+ }
+
+ /**
+ * Create a new Session for this STM
+ * @param connection the Connection to be used
+ * @return a new Session created using the Connection passed in
+ */
+ private Session createSession() {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS Session for service : " + serviceName);
+ }
+ return JMSUtils.createSession(
+ connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue());
+
+ } catch (JMSException e) {
+ handleException("Error creating JMS session for service : " + serviceName, e);
+ }
+ return null;
+ }
+
+ /**
+ * Create a new MessageConsumer for this STM
+ * @param session the Session to be used
+ * @return a new MessageConsumer created using the Session passed in
+ */
+ private MessageConsumer createConsumer() {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new JMS MessageConsumer for service : " + serviceName);
+ }
+
+ return JMSUtils.createConsumer(
+ session, getDestination(session), isQueue(),
+ (isSubscriptionDurable() && getDurableSubscriberName() == null ?
+ getDurableSubscriberName() : serviceName),
+ getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11());
+
+ } catch (JMSException e) {
+ handleException("Error creating JMS consumer for service : " + serviceName,e);
+ }
+ return null;
+ }
+ }
+
+ // -------------- mundane private methods ----------------
+ /**
+ * Get the InitialContext for lookup using the JNDI parameters applicable to the service
+ * @return the InitialContext to be used
+ * @throws NamingException
+ */
+ private Context getInitialContext() throws NamingException {
+ if (context == null) {
+ context = new InitialContext(jmsProperties);
+ }
+ return context;
+ }
+
+ /**
+ * Return the JMS Destination for the JNDI name of the Destination from the InitialContext
+ * @return the JMS Destination to which this STM listens for messages
+ */
+ private Destination getDestination(Session session) {
+ if (destination == null) {
+ try {
+ context = getInitialContext();
+ destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName());
+ if (log.isDebugEnabled()) {
+ log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() +
+ " found for service " + serviceName);
+ }
+ } catch (NamingException e) {
+ try {
+ switch (destinationType) {
+ case JMSConstants.QUEUE: {
+ destination = session.createQueue(getDestinationJNDIName());
+ break;
+ }
+ case JMSConstants.TOPIC: {
+ destination = session.createTopic(getDestinationJNDIName());
+ break;
+ }
+ default: {
+ handleException("Error looking up JMS destination : " +
+ getDestinationJNDIName() + " using JNDI properties : " +
+ jmsProperties, e);
+ }
+ }
+ } catch (JMSException j) {
+ handleException("Error looking up and creating JMS destination : " +
+ getDestinationJNDIName() + " using JNDI properties : " + jmsProperties, e);
+ }
+ }
+ }
+ return destination;
+ }
+
+ /**
+ * The UserTransaction to be used, looked up from the JNDI
+ * @return The UserTransaction to be used, looked up from the JNDI
+ */
+ private UserTransaction getUserTransaction() {
+ if (!cacheUserTransaction) {
+ if (log.isDebugEnabled()) {
+ log.debug("Acquiring a new UserTransaction for service : " + serviceName);
+ }
+
+ try {
+ context = getInitialContext();
+ return
+ JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName());
+ } catch (NamingException e) {
+ handleException("Error looking up UserTransaction : " + getDestinationJNDIName() +
+ " using JNDI properties : " + jmsProperties, e);
+ }
+ }
+
+ if (sharedUserTransaction == null) {
+ try {
+ context = getInitialContext();
+ sharedUserTransaction =
+ JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName());
+ if (log.isDebugEnabled()) {
+ log.debug("Acquired shared UserTransaction for service : " + serviceName);
+ }
+ } catch (NamingException e) {
+ handleException("Error looking up UserTransaction : " + getDestinationJNDIName() +
+ " using JNDI properties : " + jmsProperties, e);
+ }
+ }
+ return sharedUserTransaction;
+ }
+
+ // -------------------- trivial methods ---------------------
+ private boolean isSTMActive() {
+ return serviceTaskManagerState == STATE_STARTED;
+ }
+
+ /**
+ * Is this STM bound to a Queue, Topic or a JMS 1.1 Generic Destination?
+ * @return TRUE for a Queue, FALSE for a Topic and NULL for a Generic Destination
+ */
+ private Boolean isQueue() {
+ if (destinationType == JMSConstants.GENERIC) {
+ return null;
+ } else {
+ return destinationType == JMSConstants.QUEUE;
+ }
+ }
+
+ private void logError(String msg, Exception e) {
+ log.error(msg, e);
+ }
+
+ private void handleException(String msg, Exception e) {
+ log.error(msg, e);
+ throw new AxisJMSException(msg, e);
+ }
+
+ private void handleException(String msg) {
+ log.error(msg);
+ throw new AxisJMSException(msg);
+ }
+
+ // -------------- getters and setters ------------------
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public String getConnFactoryJNDIName() {
+ return connFactoryJNDIName;
+ }
+
+ public void setConnFactoryJNDIName(String connFactoryJNDIName) {
+ this.connFactoryJNDIName = connFactoryJNDIName;
+ }
+
+ public String getDestinationJNDIName() {
+ return destinationJNDIName;
+ }
+
+ public void setDestinationJNDIName(String destinationJNDIName) {
+ this.destinationJNDIName = destinationJNDIName;
+ }
+
+ public int getDestinationType() {
+ return destinationType;
+ }
+
+ public void setDestinationType(int destinationType) {
+ this.destinationType = destinationType;
+ }
+
+ public String getMessageSelector() {
+ return messageSelector;
+ }
+
+ public void setMessageSelector(String messageSelector) {
+ this.messageSelector = messageSelector;
+ }
+
+ public int getTransactionality() {
+ return transactionality;
+ }
+
+ public void setTransactionality(int transactionality) {
+ this.transactionality = transactionality;
+ sessionTransacted = (transactionality == BaseConstants.TRANSACTION_LOCAL);
+ }
+
+ public boolean isSessionTransacted() {
+ return sessionTransacted;
+ }
+
+ public void setSessionTransacted(Boolean sessionTransacted) {
+ if (sessionTransacted != null) {
+ this.sessionTransacted = sessionTransacted;
+ // sesstionTransacted means local transactions are used, however !sessionTransacted does
+ // not mean that JTA is used
+ if (sessionTransacted) {
+ transactionality = BaseConstants.TRANSACTION_LOCAL;
+ }
+ }
+ }
+
+ public int getSessionAckMode() {
+ return sessionAckMode;
+ }
+
+ public void setSessionAckMode(int sessionAckMode) {
+ this.sessionAckMode = sessionAckMode;
+ }
+
+ public boolean isSubscriptionDurable() {
+ return subscriptionDurable;
+ }
+
+ public void setSubscriptionDurable(Boolean subscriptionDurable) {
+ if (subscriptionDurable != null) {
+ this.subscriptionDurable = subscriptionDurable;
+ }
+ }
+
+ public String getDurableSubscriberName() {
+ return durableSubscriberName;
+ }
+
+ public void setDurableSubscriberName(String durableSubscriberName) {
+ this.durableSubscriberName = durableSubscriberName;
+ }
+
+ public boolean isPubSubNoLocal() {
+ return pubSubNoLocal;
+ }
+
+ public void setPubSubNoLocal(Boolean pubSubNoLocal) {
+ if (pubSubNoLocal != null) {
+ this.pubSubNoLocal = pubSubNoLocal;
+ }
+ }
+
+ public int getConcurrentConsumers() {
+ return concurrentConsumers;
+ }
+
+ public void setConcurrentConsumers(int concurrentConsumers) {
+ this.concurrentConsumers = concurrentConsumers;
+ }
+
+ public int getMaxConcurrentConsumers() {
+ return maxConcurrentConsumers;
+ }
+
+ public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
+ this.maxConcurrentConsumers = maxConcurrentConsumers;
+ }
+
+ public int getIdleTaskExecutionLimit() {
+ return idleTaskExecutionLimit;
+ }
+
+ public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
+ this.idleTaskExecutionLimit = idleTaskExecutionLimit;
+ }
+
+ public int getReceiveTimeout() {
+ return receiveTimeout;
+ }
+
+ public void setReceiveTimeout(int receiveTimeout) {
+ this.receiveTimeout = receiveTimeout;
+ }
+
+ public int getCacheLevel() {
+ return cacheLevel;
+ }
+
+ public void setCacheLevel(int cacheLevel) {
+ this.cacheLevel = cacheLevel;
+ }
+
+ public int getInitialReconnectDuration() {
+ return initialReconnectDuration;
+ }
+
+ public void setInitialReconnectDuration(int initialReconnectDuration) {
+ this.initialReconnectDuration = initialReconnectDuration;
+ }
+
+ public double getReconnectionProgressionFactor() {
+ return reconnectionProgressionFactor;
+ }
+
+ public void setReconnectionProgressionFactor(double reconnectionProgressionFactor) {
+ this.reconnectionProgressionFactor = reconnectionProgressionFactor;
+ }
+
+ public long getMaxReconnectDuration() {
+ return maxReconnectDuration;
+ }
+
+ public void setMaxReconnectDuration(long maxReconnectDuration) {
+ this.maxReconnectDuration = maxReconnectDuration;
+ }
+
+ public int getMaxMessagesPerTask() {
+ return maxMessagesPerTask;
+ }
+
+ public void setMaxMessagesPerTask(int maxMessagesPerTask) {
+ this.maxMessagesPerTask = maxMessagesPerTask;
+ }
+
+ public String getUserTransactionJNDIName() {
+ return userTransactionJNDIName;
+ }
+
+ public void setUserTransactionJNDIName(String userTransactionJNDIName) {
+ if (userTransactionJNDIName != null) {
+ this.userTransactionJNDIName = userTransactionJNDIName;
+ }
+ }
+
+ public boolean isCacheUserTransaction() {
+ return cacheUserTransaction;
+ }
+
+ public void setCacheUserTransaction(Boolean cacheUserTransaction) {
+ if (cacheUserTransaction != null) {
+ this.cacheUserTransaction = cacheUserTransaction;
+ }
+ }
+
+ public boolean isJmsSpec11() {
+ return jmsSpec11;
+ }
+
+ public void setJmsSpec11(boolean jmsSpec11) {
+ this.jmsSpec11 = jmsSpec11;
+ }
+
+ public Hashtable<String, String> getJmsProperties() {
+ return jmsProperties;
+ }
+
+ public void addJmsProperties(Map<String, String> jmsProperties) {
+ this.jmsProperties.putAll(jmsProperties);
+ }
+
+ public void removeJmsProperties(String key) {
+ this.jmsProperties.remove(key);
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return conFactory;
+ }
+
+ public List<MessageListenerTask> getPollingTasks() {
+ return pollingTasks;
+ }
+
+ public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) {
+ this.jmsMessageReceiver = jmsMessageReceiver;
+ }
+
+ public void setWorkerPool(WorkerPool workerPool) {
+ this.workerPool = workerPool;
+ }
+
+ public int getActiveTaskCount() {
+ return activeTaskCount;
+ }
+
+ public void setServiceTaskManagerState(int serviceTaskManagerState) {
+ this.serviceTaskManagerState = serviceTaskManagerState;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java
new file mode 100644
index 0000000000..2b415df64d
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java
@@ -0,0 +1,49 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
+
+/**
+ * Class encapsulating the content type information for a given message.
+ */
+public class ContentTypeInfo {
+ private final String propertyName;
+ private final String contentType;
+
+ public ContentTypeInfo(String propertyName, String contentType) {
+ this.propertyName = propertyName;
+ this.contentType = contentType;
+ }
+
+ /**
+ * Get the name of the message property from which the content type
+ * has been extracted.
+ *
+ * @return the property name or null if the content type was not determined
+ * by extracting it from a message property
+ */
+ public String getPropertyName() {
+ return propertyName;
+ }
+
+ /**
+ * Get the content type of the message.
+ *
+ * @return The content type of the message. The return value is never null.
+ */
+ public String getContentType() {
+ return contentType;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java
new file mode 100644
index 0000000000..0dba93356f
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java
@@ -0,0 +1,43 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * Interface implemented by content type rules.
+ */
+public interface ContentTypeRule {
+ /**
+ * Attempt to determine the content type of the given JMS message.
+ *
+ * @param message the message
+ * @return If the rule matches, the return value encapsulates the content type of the
+ * message and the message property name from which is was extracted
+ * (if applicable). If the rule doesn't match, the method returns null.
+ * @throws JMSException
+ */
+ ContentTypeInfo getContentType(Message message) throws JMSException;
+
+ /**
+ * Get the name of the message property used to extract the content type from,
+ * if applicable.
+ *
+ * @return the property name or null if not applicable
+ */
+ String getExpectedContentTypeProperty();
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java
new file mode 100644
index 0000000000..a9fd25ef1b
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java
@@ -0,0 +1,74 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
+
+import java.util.Iterator;
+
+import javax.jms.BytesMessage;
+import javax.jms.TextMessage;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.Parameter;
+
+/**
+ * Utility class to create content type rules and rule sets from XML.
+ */
+public class ContentTypeRuleFactory {
+ private ContentTypeRuleFactory() {}
+
+ public static ContentTypeRule parse(OMElement element) throws AxisFault {
+ String name = element.getLocalName();
+ String value = element.getText();
+ if (name.equals("jmsProperty")) {
+ return new PropertyRule(value);
+ } else if (name.equals("textMessage")) {
+ return new MessageTypeRule(TextMessage.class, value);
+ } else if (name.equals("bytesMessage")) {
+ return new MessageTypeRule(BytesMessage.class, value);
+ } else if (name.equals("default")) {
+ return new DefaultRule(value);
+ } else {
+ throw new AxisFault("Unknown content rule type '" + name + "'");
+ }
+ }
+
+ public static ContentTypeRuleSet parse(Parameter param) throws AxisFault {
+ ContentTypeRuleSet ruleSet = new ContentTypeRuleSet();
+ Object value = param.getValue();
+ if (value instanceof OMElement) {
+ OMElement element = (OMElement)value;
+
+ // DescriptionBuilder#processParameters actually sets the parameter element
+ // itself as the value. We need to support this case.
+ // TODO: seems like a bug in Axis2 and is inconsistent with Synapse's way of parsing parameter in proxy definitions
+ if (element == param.getParameterElement()) {
+ element = element.getFirstElement();
+ }
+
+ if (element.getLocalName().equals("rules")) {
+ for (Iterator it = element.getChildElements(); it.hasNext(); ) {
+ ruleSet.addRule(parse((OMElement)it.next()));
+ }
+ } else {
+ throw new AxisFault("Expected <rules> element");
+ }
+ } else {
+ ruleSet.addRule(new DefaultRule((String)value));
+ }
+ return ruleSet;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java
new file mode 100644
index 0000000000..90383a42f8
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java
@@ -0,0 +1,64 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A set of content type rules.
+ */
+public class ContentTypeRuleSet {
+ private final List<ContentTypeRule> rules = new ArrayList<ContentTypeRule>();
+ private String defaultContentTypeProperty;
+
+ /**
+ * Add a content type rule to this set.
+ *
+ * @param rule the rule to add
+ */
+ public void addRule(ContentTypeRule rule) {
+ rules.add(rule);
+ if (defaultContentTypeProperty == null) {
+ defaultContentTypeProperty = rule.getExpectedContentTypeProperty();
+ }
+ }
+
+ /**
+ * Determine the content type of the given message.
+ * This method will try the registered rules in turn until the first rule matches.
+ *
+ * @param message the message
+ * @return the content type information for the message or null if none of the rules matches
+ * @throws JMSException
+ */
+ public ContentTypeInfo getContentTypeInfo(Message message) throws JMSException {
+ for (ContentTypeRule rule : rules) {
+ ContentTypeInfo contentTypeInfo = rule.getContentType(message);
+ if (contentTypeInfo != null) {
+ return contentTypeInfo;
+ }
+ }
+ return null;
+ }
+
+ public String getDefaultContentTypeProperty() {
+ return defaultContentTypeProperty;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java
new file mode 100644
index 0000000000..a158f6ec74
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java
@@ -0,0 +1,37 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
+
+import javax.jms.Message;
+
+/**
+ * Content type rule that always matches and that returns a fixed (default) content type.
+ */
+public class DefaultRule implements ContentTypeRule {
+ private final String contentType;
+
+ public DefaultRule(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public ContentTypeInfo getContentType(Message message) {
+ return new ContentTypeInfo(null, contentType);
+ }
+
+ public String getExpectedContentTypeProperty() {
+ return null;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java
new file mode 100644
index 0000000000..cb25ab93d4
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java
@@ -0,0 +1,39 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
+
+import javax.jms.Message;
+
+/**
+ * Content type rule that matches a given message type and returns a fixed content type.
+ */
+public class MessageTypeRule implements ContentTypeRule {
+ private final Class<? extends Message> messageType;
+ private final String contentType;
+
+ public MessageTypeRule(Class<? extends Message> messageType, String contentType) {
+ this.messageType = messageType;
+ this.contentType = contentType;
+ }
+
+ public ContentTypeInfo getContentType(Message message) {
+ return messageType.isInstance(message) ? new ContentTypeInfo(null, contentType) : null;
+ }
+
+ public String getExpectedContentTypeProperty() {
+ return null;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java
new file mode 100644
index 0000000000..c8d13ba462
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java
@@ -0,0 +1,39 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * Content type rule that attempts to extract the content type from a message property.
+ */
+public class PropertyRule implements ContentTypeRule {
+ private final String propertyName;
+
+ public PropertyRule(String propertyName) {
+ this.propertyName = propertyName;
+ }
+
+ public ContentTypeInfo getContentType(Message message) throws JMSException {
+ String value = message.getStringProperty(propertyName);
+ return value == null ? null : new ContentTypeInfo(propertyName, value);
+ }
+
+ public String getExpectedContentTypeProperty() {
+ return propertyName;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java
new file mode 100644
index 0000000000..750170edd7
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java
@@ -0,0 +1,23 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+ * Provides classes and interfaces to define content type rules.
+ *
+ * Content type rules are used to determine the content type of a
+ * received message based on JMS properties, message type, etc.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; \ No newline at end of file
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html
new file mode 100644
index 0000000000..b95b49eb69
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html
@@ -0,0 +1,356 @@
+<html>
+<title>JMS Transport Configuration</title>
+<body>
+
+<h2>JMS Listener Configuration (axis2.xml)</h2>
+
+e.g:
+
+<pre>
+ &lt;transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"&gt;
+ &lt;parameter name="myTopicConnectionFactory"&gt;
+ &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+ &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;TopicConnectionFactory&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.ConnectionFactoryType"&gt;topic&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.0.2b&lt;/parameter&gt;
+ &lt;/parameter&gt;
+
+ &lt;parameter name="myQueueConnectionFactory"&gt;
+ &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+ &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;QueueConnectionFactory&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.ConnectionFactoryType"&gt;queue&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.1&lt;/parameter&gt;
+ &lt;/parameter&gt;
+
+ &lt;parameter name="default"&gt;
+ &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+ &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;ConnectionFactory&lt;/parameter&gt;
+ &lt;/parameter&gt;
+ &lt;/transportReceiver&gt;
+
+ &lt;transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"&gt;
+ &lt;parameter name="myTopicConnectionFactory"&gt;
+ &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+ &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;TopicConnectionFactory&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.0.2b&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.CacheLevel"&gt;producer&lt;/parameter&gt;
+ &lt;/parameter&gt;
+
+ &lt;parameter name="myQueueConnectionFactory"&gt;
+ &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+ &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;QueueConnectionFactory&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.0.2b&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.CacheLevel"&gt;producer&lt;/parameter&gt;
+ &lt;/parameter&gt;
+
+ &lt;parameter name="default"&gt;
+ &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
+ &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;ConnectionFactory&lt;/parameter&gt;
+ &lt;parameter name="transport.jms.CacheLevel"&gt;connection&lt;/parameter&gt;
+ &lt;/parameter&gt;
+ &lt;/transportSender&gt;
+</pre>
+
+<p>
+ The Transport Listener and Sender both allows the user to configure one or more logical JMS Connection
+ Factories, which are named definitions as shown above. Any remaining parameters maybe defined at the
+ service level via the services.xml. The applicable set of parameters for a service would be the
+ union of the properties from the services.xml and the corresponding logical connection factory.
+</p>
+
+<TABLE WIDTH="100%" BORDER=1 BORDERCOLOR="#000000" CELLPADDING=4 CELLSPACING=0>
+ <COL WIDTH="10%">
+ <COL WIDTH="20%">
+ <COL WIDTH="60%">
+ <COL WIDTH="5%">
+ <COL WIDTH="5%">
+ <tr>
+ <td>Transport level</td>
+ <td><BR></td>
+ <td><BR></td>
+ <td>Listening</td>
+ <td>Sending</td>
+ </tr>
+ <tr>
+ <td>JNDI</td>
+ <td>java.naming.factory.initial</td>
+ <td>The JNDI InitialContext factory class</td>
+ <td>Required</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>java.naming.provider.url</td>
+ <td>JNDI Provider URL</td>
+ <td>Required</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>java.naming.security.principal</td>
+ <td>Username for JNDI access</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>java.naming.security.credentials</td>
+ <td>Password for JNDI access</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Transactions</td>
+ <td>transport.Transactionality</td>
+ <td>Desired transactionality. One of none / local / jta</td>
+ <td>Defaults to <B>none</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.UserTxnJNDIName</td>
+ <td>JNDI name to be used to obtain a UserTransaction</td>
+ <td>Defaults to &quot;java:comp/UserTransaction&quot;</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.CacheUserTxn</td>
+ <td>Generally its safe and more efficient to cache the
+ UserTransaction reference from JNDI. One of true/ false</td>
+ <td>Defaults to <B>true</B></td>
+ <td><BR></td>
+ </tr>
+
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.SessionTransacted</td>
+ <td>Should the JMS Session be transacted. One of true/ false</td>
+ <td>Defaults to <B>true</B> when local transactions are used</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.SessionAcknowledgement</td>
+ <td>JMS Session acknowledgement mode to be used. One of AUTO_ACKNOWLEDGE | CLIENT_ACKNOWLEDGE | DUPS_OK_ACKNOWLEDGE | SESSION_TRANSACTED</td>
+ <td>Defaults to <B>AUTO_ACKNOWLEDGE</B></td>
+ <td><BR></td>
+ </tr>
+
+ <tr>
+ <td>Connection</td>
+ <td>transport.jms.ConnectionFactory</td>
+ <td>Name of the logical connection factory this service will use</td>
+ <td>Defaults to &quot;default&quot;</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ConnectionFactoryJNDIName</td>
+ <td>The JNDI name of the JMS ConnectionFactory</td>
+ <td>Required</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ConnectionFactoryType</td>
+ <td> Type of ConnectionFactory &ndash; queue / topic</td>
+ <td>Suggested to be specified</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.JMSSpecVersion</td>
+ <td>JMS API Version One of &quot;1.1&quot; or &quot;1.0.2b&quot;</td>
+ <td>Defaults to 1.1</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.UserName</td>
+ <td>The JMS connection username</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.Password</td>
+ <td>The JMS connection password</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Destinations</td>
+ <td>transport.jms.Destination</td>
+ <td>JNDI Name of the Destination </td>
+ <td>Defaults to Service name</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.DestinationType</td>
+ <td>Type of Destination &ndash; queue / topic</td>
+ <td>Defaults to a queue</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.DefaultReplyDestination</td>
+ <td>JNDI Name of the default reply Destination</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.DefaultReplyDestinationType</td>
+ <td>Type of the reply Destination &ndash; queue / topic</td>
+ <td>Same type as of Destination</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Advanced</td>
+ <td>transport.jms.MessageSelector</td>
+ <td>Optional message selector to be applied</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.SubscriptionDurable</td>
+ <td>Is the subscription durable? (For Topics) &ndash; true / false</td>
+ <td>Defaults to <B>false</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.DurableSubscriberName</td>
+ <td>Name to be used for the durable subscription</td>
+ <td>Required when subscription is durable</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.PubSubNoLocal</td>
+ <td>Should messages published by the same connection (for Topics)
+ be received? &ndash; true / false</td>
+ <td>Defaults to <B>false</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.CacheLevel</td>
+ <td>The JMS resource cache level. One of none / connection /
+ session / consumer / producer / auto</td>
+ <td>Defaults to <B>auto</B> </td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ReceiveTimeout</td>
+ <td>Time to wait for a JMS message during polling. Negative means
+ wait forever, while 0 means do not wait at all. Anything else, is
+ a millisecond value for the poll</td>
+ <td>Defaults to 1000ms</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ConcurrentConsumers</td>
+ <td>Number of concurrent consumer tasks (~threads) to be started to
+ poll for messages for this service. For Topics, this should be
+ always 1, to prevent the same message being processed multiple
+ times</td>
+ <td>Defaults to <B>1</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.MaxConcurrentConsumers</td>
+ <td>Will dynamically scale the number of concurrent consumer tasks
+ (~threads) until this value; as the load increases. Should always
+ be 1 for Topics.</td>
+ <td>Defaults to <B>1</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.IdleTaskLimit</td>
+ <td>The number of idle (i.e. poll without receipt of a message)
+ runs per task, before it dies &ndash; to recycle resources, and to
+ allow dynamic scale down.</td>
+ <td>Defaults to 10</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.MaxMessagesPerTask</td>
+ <td>The maximum number of successful message receipts to limit per
+ Task lifetime. </td>
+ <td>Defaults to <B>&ndash;1</B> which implies unlimited messages</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Reconnection</td>
+ <td>transport.jms.InitialReconnectDuration</td>
+ <td>Initial reconnection attempt duration</td>
+ <td>Defaults to 10,000ms</td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.ReconnectProgressFactor</td>
+ <td>Factor used to compute consecutive reconnection attempt
+ durations, in a geometric series</td>
+ <td>Defaults to <B>2 (i.e. exponential)</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.MaxReconnectDuration</td>
+ <td>Maximum limit for a reconnection duration</td>
+ <td>Defaults to <B>1 hour</B></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>transport.jms.PublishEPR</td>
+ <td>One or more JMS URL's to be showed as the JMS EPRs on the WSDL
+ for the service. Allows the specification of a custom EPR, and/or
+ hiding of internal properties from a public EPR (e.g.
+ credentials). Add one as LEGACY to retain auto generated EPR, when
+ adding new EPRs</td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td><BR></td>
+ <td><BR></td>
+ <td><BR></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td>Legacy Mode and Payload handling</td>
+ <td>Wrapper</td>
+ <td>Binary and Text payload wrapper element to be specified as &quot;{ns}name&quot; where ns refers to a namespace and name the name of the element</td>
+ <td>Default binary wrapper<ul><li>{http://ws.apache.org/commons/ns/payload}binary</li></ul>
+ Default text wrapper <ul><li>{http://ws.apache.org/commons/ns/payload}text</li></ul></td>
+ <td><BR></td>
+ </tr>
+ <tr>
+ <td><BR></td>
+ <td>Operation</td>
+ <td>operation name to be specified as &quot;{ns}name&quot; where ns refers to the namespace and name the name of the operation</td>
+ <td>Defaults to urn:mediate</td>
+ <td><BR></td>
+ </tr>
+</TABLE>
+
+</body>
+</html> \ No newline at end of file