summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms
diff options
context:
space:
mode:
authorantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2009-05-13 12:39:53 +0000
committerantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2009-05-13 12:39:53 +0000
commit9145d1479e838918317bc9d4c5e25fe537e5f6de (patch)
tree12c504c96900ee6ebeb19e71627cf3e97df4ab78 /branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms
parenta0d58fe9ff6b7babfa8fd076779fbd374fe1db19 (diff)
Abandon trying to use the new Axis2 JMS transport for now as its proving too messy tryingto backport it to the 1.4.1 release. Now trying a new approach which modifies the JMS transport from Axis2 1.4.1
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@774293 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java31
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java72
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java75
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java56
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java393
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java122
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java273
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java111
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java28
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java294
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java237
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java332
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java306
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java499
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java1115
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java1217
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java49
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java43
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java74
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java64
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java37
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java39
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java39
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java23
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html356
25 files changed, 0 insertions, 5885 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java
deleted file mode 100644
index ec53a2a1ca..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-public class AxisJMSException extends RuntimeException {
-
- AxisJMSException() {
- super();
- }
-
- AxisJMSException(String msg) {
- super(msg);
- }
-
- AxisJMSException(String msg, Exception e) {
- super(msg, e);
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java
deleted file mode 100644
index 5228efa154..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-
-/**
- * Data source implementation wrapping a JMS {@link BytesMessage}.
- * <p>
- * Note that two input streams created by the same instance of this
- * class can not be used at the same time.
- */
-public class BytesMessageDataSource implements SizeAwareDataSource {
- private final BytesMessage message;
- private final String contentType;
-
- public BytesMessageDataSource(BytesMessage message, String contentType) {
- this.message = message;
- this.contentType = contentType;
- }
-
- public BytesMessageDataSource(BytesMessage message) {
- this(message, "application/octet-stream");
- }
-
- public long getSize() {
- try {
- return message.getBodyLength();
- } catch (JMSException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- public String getContentType() {
- return contentType;
- }
-
- public InputStream getInputStream() throws IOException {
- try {
- message.reset();
- } catch (JMSException ex) {
- throw new JMSExceptionWrapper(ex);
- }
- return new BytesMessageInputStream(message);
- }
-
- public String getName() {
- return null;
- }
-
- public OutputStream getOutputStream() throws IOException {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java
deleted file mode 100644
index 9080641572..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.io.InputStream;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-
-/**
- * Input stream that reads data from a JMS {@link BytesMessage}.
- * Note that since the current position in the message is managed by
- * the underlying {@link BytesMessage} object, it is not possible to
- * use several instances of this class operating on a single
- * {@link BytesMessage} at the same time.
- */
-public class BytesMessageInputStream extends InputStream {
- private final BytesMessage message;
-
- public BytesMessageInputStream(BytesMessage message) {
- this.message = message;
- }
-
- @Override
- public int read() throws JMSExceptionWrapper {
- try {
- return message.readByte() & 0xFF;
- } catch (MessageEOFException ex) {
- return -1;
- } catch (JMSException ex) {
- throw new JMSExceptionWrapper(ex);
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws JMSExceptionWrapper {
- if (off == 0) {
- try {
- return message.readBytes(b, len);
- } catch (JMSException ex) {
- throw new JMSExceptionWrapper(ex);
- }
- } else {
- byte[] b2 = new byte[len];
- int c = read(b2);
- if (c > 0) {
- System.arraycopy(b2, 0, b, off, c);
- }
- return c;
- }
- }
-
- @Override
- public int read(byte[] b) throws JMSExceptionWrapper {
- try {
- return message.readBytes(b);
- } catch (JMSException ex) {
- throw new JMSExceptionWrapper(ex);
- }
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java
deleted file mode 100644
index 4508d68280..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.io.OutputStream;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-
-public class BytesMessageOutputStream extends OutputStream {
- private final BytesMessage message;
-
- public BytesMessageOutputStream(BytesMessage message) {
- this.message = message;
- }
-
- @Override
- public void write(int b) throws JMSExceptionWrapper {
- try {
- message.writeByte((byte)b);
- } catch (JMSException ex) {
- throw new JMSExceptionWrapper(ex);
- }
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws JMSExceptionWrapper {
- try {
- message.writeBytes(b, off, len);
- } catch (JMSException ex) {
- new JMSExceptionWrapper(ex);
- }
- }
-
- @Override
- public void write(byte[] b) throws JMSExceptionWrapper {
- try {
- message.writeBytes(b);
- } catch (JMSException ex) {
- throw new JMSExceptionWrapper(ex);
- }
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
deleted file mode 100644
index d5d164ce76..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java
+++ /dev/null
@@ -1,393 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.util.Hashtable;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.ParameterIncludeImpl;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Encapsulate a JMS Connection factory definition within an Axis2.xml
- *
- * JMS Connection Factory definitions, allows JNDI properties as well as other service
- * level parameters to be defined, and re-used by each service that binds to it
- *
- * When used for sending messages out, the JMSConnectionFactory'ies are able to cache
- * a Connection, Session or Producer
- */
-public class JMSConnectionFactory {
-
- private static final Log log = LogFactory.getLog(JMSConnectionFactory.class);
-
- /** The name used for the connection factory definition within Axis2 */
- private String name = null;
- /** The list of parameters from the axis2.xml definition */
- private Hashtable<String, String> parameters = new Hashtable<String, String>();
-
- /** The cached InitialContext reference */
- private Context context = null;
- /** The JMS ConnectionFactory this definition refers to */
- private ConnectionFactory conFactory = null;
- /** The shared JMS Connection for this JMS connection factory */
- private Connection sharedConnection = null;
- /** The shared JMS Session for this JMS connection factory */
- private Session sharedSession = null;
- /** The shared JMS MessageProducer for this JMS connection factory */
- private MessageProducer sharedProducer = null;
- /** The Shared Destination */
- private Destination sharedDestination = null;
- /** The shared JMS connection for this JMS connection factory */
- private int cacheLevel = JMSConstants.CACHE_CONNECTION;
-
- /**
- * Digest a JMS CF definition from an axis2.xml 'Parameter' and construct
- * @param parameter the axis2.xml 'Parameter' that defined the JMS CF
- */
- public JMSConnectionFactory(Parameter parameter) {
-
- this.name = parameter.getName();
- ParameterIncludeImpl pi = new ParameterIncludeImpl();
-
- try {
- pi.deserializeParameters((OMElement) parameter.getValue());
- } catch (AxisFault axisFault) {
- handleException("Error reading parameters for JMS connection factory" + name, axisFault);
- }
-
- for (Object o : pi.getParameters()) {
- Parameter p = (Parameter) o;
- parameters.put(p.getName(), (String) p.getValue());
- }
-
- digestCacheLevel();
- try {
- context = new InitialContext(parameters);
- conFactory = JMSUtils.lookup(context, ConnectionFactory.class,
- parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME));
- if (parameters.get(JMSConstants.PARAM_DESTINATION) != null) {
- sharedDestination = JMSUtils.lookup(context, Destination.class,
- parameters.get(JMSConstants.PARAM_DESTINATION));
- }
- log.info("JMS ConnectionFactory : " + name + " initialized");
-
- } catch (NamingException e) {
- throw new AxisJMSException("Cannot acquire JNDI context, JMS Connection factory : " +
- parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME) + " or default destination : " +
- parameters.get(JMSConstants.PARAM_DESTINATION) +
- " for JMS CF : " + name + " using : " + parameters);
- }
- }
-
- /**
- * Digest, the cache value iff specified
- */
- private void digestCacheLevel() {
-
- String key = JMSConstants.PARAM_CACHE_LEVEL;
- String val = parameters.get(key);
-
- if ("none".equalsIgnoreCase(val)) {
- this.cacheLevel = JMSConstants.CACHE_NONE;
- } else if ("connection".equalsIgnoreCase(val)) {
- this.cacheLevel = JMSConstants.CACHE_CONNECTION;
- } else if ("session".equals(val)){
- this.cacheLevel = JMSConstants.CACHE_SESSION;
- } else if ("producer".equals(val)) {
- this.cacheLevel = JMSConstants.CACHE_PRODUCER;
- } else if (val != null) {
- throw new AxisJMSException("Invalid cache level : " + val + " for JMS CF : " + name);
- }
- }
-
- /**
- * Return the name assigned to this JMS CF definition
- * @return name of the JMS CF
- */
- public String getName() {
- return name;
- }
-
- /**
- * The list of properties (including JNDI and non-JNDI)
- * @return properties defined on the JMS CF
- */
- public Hashtable<String, String> getParameters() {
- return parameters;
- }
-
- /**
- * Get cached InitialContext
- * @return cache InitialContext
- */
- public Context getContext() {
- return context;
- }
-
- /**
- * Cache level applicable for this JMS CF
- * @return applicable cache level
- */
- public int getCacheLevel() {
- return cacheLevel;
- }
-
- /**
- * Get the shared Destination - if defined
- * @return
- */
- public Destination getSharedDestination() {
- return sharedDestination;
- }
-
- /**
- * Lookup a Destination using this JMS CF definitions and JNDI name
- * @param name JNDI name of the Destionation
- * @return JMS Destination for the given JNDI name or null
- */
- public Destination getDestination(String name) {
- try {
- return JMSUtils.lookup(context, Destination.class, name);
- } catch (NamingException e) {
- handleException("Unknown JMS Destination : " + name + " using : " + parameters, e);
- }
- return null;
- }
-
- /**
- * Get the reply Destination from the PARAM_REPLY_DESTINATION parameter
- * @return reply destination defined in the JMS CF
- */
- public String getReplyToDestination() {
- return parameters.get(JMSConstants.PARAM_REPLY_DESTINATION);
- }
-
- private void handleException(String msg, Exception e) {
- log.error(msg, e);
- throw new AxisJMSException(msg, e);
- }
-
- /**
- * Should the JMS 1.1 API be used? - defaults to yes
- * @return true, if JMS 1.1 api should be used
- */
- public boolean isJmsSpec11() {
- return parameters.get(JMSConstants.PARAM_JMS_SPEC_VER) == null ||
- "1.1".equals(parameters.get(JMSConstants.PARAM_JMS_SPEC_VER));
- }
-
- /**
- * Return the type of the JMS CF Destination
- * @return TRUE if a Queue, FALSE for a Topic and NULL for a JMS 1.1 Generic Destination
- */
- public Boolean isQueue() {
- if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) == null &&
- parameters.get(JMSConstants.PARAM_DEST_TYPE) == null) {
- return null;
- }
-
- if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) != null) {
- if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) {
- return true;
- } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) {
- return false;
- } else {
- throw new AxisJMSException("Invalid " + JMSConstants.PARAM_CONFAC_TYPE + " : " +
- parameters.get(JMSConstants.PARAM_CONFAC_TYPE) + " for JMS CF : " + name);
- }
- } else {
- if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) {
- return true;
- } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) {
- return false;
- } else {
- throw new AxisJMSException("Invalid " + JMSConstants.PARAM_DEST_TYPE + " : " +
- parameters.get(JMSConstants.PARAM_DEST_TYPE) + " for JMS CF : " + name);
- }
- }
- }
-
- /**
- * Is a session transaction requested from users of this JMS CF?
- * @return session transaction required by the clients of this?
- */
- private boolean isSessionTransacted() {
- return parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED) == null ||
- Boolean.valueOf(parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED));
- }
-
- /**
- * Create a new Connection
- * @return a new Connection
- */
- private Connection createConnection() {
-
- Connection connection = null;
- try {
- connection = JMSUtils.createConnection(
- conFactory,
- parameters.get(JMSConstants.PARAM_JMS_USERNAME),
- parameters.get(JMSConstants.PARAM_JMS_PASSWORD),
- isJmsSpec11(), isQueue());
-
- if (log.isDebugEnabled()) {
- log.debug("New JMS Connection from JMS CF : " + name + " created");
- }
-
- } catch (JMSException e) {
- handleException("Error acquiring a Connection from the JMS CF : " + name +
- " using properties : " + parameters, e);
- }
- return connection;
- }
-
- /**
- * Create a new Session
- * @param connection Connection to use
- * @return A new Session
- */
- private Session createSession(Connection connection) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Creating a new JMS Session from JMS CF : " + name);
- }
- return JMSUtils.createSession(
- connection, isSessionTransacted(), Session.AUTO_ACKNOWLEDGE, isJmsSpec11(), isQueue());
-
- } catch (JMSException e) {
- handleException("Error creating JMS session from JMS CF : " + name, e);
- }
- return null;
- }
-
- /**
- * Create a new MessageProducer
- * @param session Session to be used
- * @param destination Destination to be used
- * @return a new MessageProducer
- */
- private MessageProducer createProducer(Session session, Destination destination) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Creating a new JMS MessageProducer from JMS CF : " + name);
- }
-
- return JMSUtils.createProducer(
- session, destination, isQueue(), isJmsSpec11());
-
- } catch (JMSException e) {
- handleException("Error creating JMS producer from JMS CF : " + name,e);
- }
- return null;
- }
-
- /**
- * Get a new Connection or shared Connection from this JMS CF
- * @return new or shared Connection from this JMS CF
- */
- public Connection getConnection() {
- if (cacheLevel > JMSConstants.CACHE_NONE) {
- return getSharedConnection();
- } else {
- return createConnection();
- }
- }
-
- /**
- * Get a new Session or shared Session from this JMS CF
- * @param connection the Connection to be used
- * @return new or shared Session from this JMS CF
- */
- public Session getSession(Connection connection) {
- if (cacheLevel > JMSConstants.CACHE_CONNECTION) {
- return getSharedSession();
- } else {
- return createSession((connection == null ? getConnection() : connection));
- }
- }
-
- /**
- * Get a new MessageProducer or shared MessageProducer from this JMS CF
- * @param connection the Connection to be used
- * @param session the Session to be used
- * @param destination the Destination to bind MessageProducer to
- * @return new or shared MessageProducer from this JMS CF
- */
- public MessageProducer getMessageProducer(
- Connection connection, Session session, Destination destination) {
- if (cacheLevel > JMSConstants.CACHE_SESSION) {
- return getSharedProducer();
- } else {
- return createProducer((session == null ? getSession(connection) : session), destination);
- }
- }
-
- /**
- * Get a new Connection or shared Connection from this JMS CF
- * @return new or shared Connection from this JMS CF
- */
- private Connection getSharedConnection() {
- if (sharedConnection == null) {
- sharedConnection = createConnection();
- if (log.isDebugEnabled()) {
- log.debug("Created shared JMS Connection for JMS CF : " + name);
- }
- }
- return sharedConnection;
- }
-
- /**
- * Get a shared Session from this JMS CF
- * @return shared Session from this JMS CF
- */
- private Session getSharedSession() {
- if (sharedSession == null) {
- sharedSession = createSession(getSharedConnection());
- if (log.isDebugEnabled()) {
- log.debug("Created shared JMS Session for JMS CF : " + name);
- }
- }
- return sharedSession;
- }
-
- /**
- * Get a shared MessageProducer from this JMS CF
- * @return shared MessageProducer from this JMS CF
- */
- private MessageProducer getSharedProducer() {
- if (sharedProducer == null) {
- sharedProducer = createProducer(getSharedSession(), sharedDestination);
- if (log.isDebugEnabled()) {
- log.debug("Created shared JMS MessageConsumer for JMS CF : " + name);
- }
- }
- return sharedProducer;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java
deleted file mode 100644
index fb16500efc..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.naming.Context;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.ParameterInclude;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Class managing a set of {@link JMSConnectionFactory} objects.
- */
-public class JMSConnectionFactoryManager {
-
- private static final Log log = LogFactory.getLog(JMSConnectionFactoryManager.class);
-
- /** A Map containing the JMS connection factories managed by this, keyed by name */
- private final Map<String,JMSConnectionFactory> connectionFactories =
- new HashMap<String,JMSConnectionFactory>();
-
- /**
- * Construct a Connection factory manager for the JMS transport sender or receiver
- * @param trpInDesc
- */
- public JMSConnectionFactoryManager(ParameterInclude trpInDesc) {
- loadConnectionFactoryDefinitions(trpInDesc);
- }
-
- /**
- * Create JMSConnectionFactory instances for the definitions in the transport configuration,
- * and add these into our collection of connectionFactories map keyed by name
- *
- * @param trpDesc the transport description for JMS
- */
- private void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) {
-
- for (Object o : trpDesc.getParameters()) {
- Parameter p = (Parameter)o;
- try {
- JMSConnectionFactory jmsConFactory = new JMSConnectionFactory(p);
- connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
- } catch (AxisJMSException e) {
- log.error("Error setting up connection factory : " + p.getName(), e);
- }
- }
- }
-
- /**
- * Get the JMS connection factory with the given name.
- *
- * @param name the name of the JMS connection factory
- * @return the JMS connection factory or null if no connection factory with
- * the given name exists
- */
- public JMSConnectionFactory getJMSConnectionFactory(String name) {
- return connectionFactories.get(name);
- }
-
- /**
- * Get the JMS connection factory that matches the given properties, i.e. referring to
- * the same underlying connection factory. Used by the JMSSender to determine if already
- * available resources should be used for outgoing messages
- *
- * @param props a Map of connection factory JNDI properties and name
- * @return the JMS connection factory or null if no connection factory compatible
- * with the given properties exists
- */
- public JMSConnectionFactory getJMSConnectionFactory(Map<String,String> props) {
- for (JMSConnectionFactory cf : connectionFactories.values()) {
- Map<String,String> cfProperties = cf.getParameters();
-
- if (equals(props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME),
- cfProperties.get(JMSConstants.PARAM_CONFAC_JNDI_NAME))
- &&
- equals(props.get(Context.INITIAL_CONTEXT_FACTORY),
- cfProperties.get(Context.INITIAL_CONTEXT_FACTORY))
- &&
- equals(props.get(Context.PROVIDER_URL),
- cfProperties.get(Context.PROVIDER_URL))
- &&
- equals(props.get(Context.SECURITY_PRINCIPAL),
- cfProperties.get(Context.SECURITY_PRINCIPAL))
- &&
- equals(props.get(Context.SECURITY_CREDENTIALS),
- cfProperties.get(Context.SECURITY_CREDENTIALS))) {
- return cf;
- }
- }
- return null;
- }
-
- /**
- * Compare two values preventing NPEs
- */
- private static boolean equals(Object s1, Object s2) {
- return s1 == s2 || s1 != null && s1.equals(s2);
- }
-
- protected void handleException(String msg, Exception e) throws AxisFault {
- log.error(msg, e);
- throw new AxisFault(msg, e);
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java
deleted file mode 100644
index 6a11201625..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import org.apache.axis2.client.Options;
-
-public class JMSConstants {
-
- /**
- * The prefix indicating an Axis JMS URL
- */
- public static final String JMS_PREFIX = "jms:/";
-
- //------------------------------------ defaults / constants ------------------------------------
- /**
- * The local (Axis2) JMS connection factory name of the default connection
- * factory to be used, if a service does not explicitly state the connection
- * factory it should be using by a Parameter named JMSConstants.CONFAC_PARAM
- */
- public static final String DEFAULT_CONFAC_NAME = "default";
- /**
- * The default JMS time out waiting for a reply - also see {@link JMS_WAIT_REPLY}
- */
- public static final long DEFAULT_JMS_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS;
- /**
- * Value indicating a Queue used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE}
- */
- public static final String DESTINATION_TYPE_QUEUE = "queue";
- /**
- * Value indicating a Topic used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE}
- */
- public static final String DESTINATION_TYPE_TOPIC = "topic";
- /**
- * Value indicating a JMS 1.1 Generic Destination used by {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE}
- */
- public static final String DESTINATION_TYPE_GENERIC = "generic";
-
- /** Do not cache any JMS resources between tasks (when sending) or JMS CF's (when sending) */
- public static final int CACHE_NONE = 0;
- /** Cache only the JMS connection between tasks (when receiving), or JMS CF's (when sending)*/
- public static final int CACHE_CONNECTION = 1;
- /** Cache only the JMS connection and Session between tasks (receiving), or JMS CF's (sending) */
- public static final int CACHE_SESSION = 2;
- /** Cache the JMS connection, Session and Consumer between tasks when receiving*/
- public static final int CACHE_CONSUMER = 3;
- /** Cache the JMS connection, Session and Producer within a JMSConnectionFactory when sending */
- public static final int CACHE_PRODUCER = 4;
- /** automatic choice of an appropriate caching level (depending on the transaction strategy) */
- public static final int CACHE_AUTO = 5;
-
- /** A JMS 1.1 Generic Destination type or ConnectionFactory */
- public static final int GENERIC = 0;
- /** A Queue Destination type or ConnectionFactory */
- public static final int QUEUE = 1;
- /** A Topic Destination type or ConnectionFactory */
- public static final int TOPIC = 2;
-
- /**
- * The EPR parameter name indicating the name of the message level property that indicated the content type.
- */
- public static final String CONTENT_TYPE_PROPERTY_PARAM = "transport.jms.ContentTypeProperty";
-
- //---------------------------------- services.xml parameters -----------------------------------
- /**
- * The Service level Parameter name indicating the JMS destination for requests of a service
- */
- public static final String PARAM_DESTINATION = "transport.jms.Destination";
- /**
- * The Service level Parameter name indicating the destination type for requests.
- * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC}
- */
- public static final String PARAM_DEST_TYPE = "transport.jms.DestinationType";
- /**
- * The Service level Parameter name indicating the [default] response destination of a service
- */
- public static final String PARAM_REPLY_DESTINATION = "transport.jms.ReplyDestination";
- /**
- * The Service level Parameter name indicating the response destination type
- * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC}
- */
- public static final String PARAM_REPLY_DEST_TYPE = "transport.jms.ReplyDestinationType";
- /**
- * The Parameter name of an Axis2 service, indicating the JMS connection
- * factory which should be used to listen for messages for it. This is
- * the local (Axis2) name of the connection factory and not the JNDI name
- */
- public static final String PARAM_JMS_CONFAC = "transport.jms.ConnectionFactory";
- /**
- * Connection factory type if using JMS 1.0, either DESTINATION_TYPE_QUEUE or DESTINATION_TYPE_TOPIC
- */
- public static final String PARAM_CONFAC_TYPE = "transport.jms.ConnectionFactoryType";
- /**
- * The Parameter name indicating the JMS connection factory JNDI name
- */
- public static final String PARAM_CONFAC_JNDI_NAME = "transport.jms.ConnectionFactoryJNDIName";
- /**
- * The Parameter indicating the expected content type for messages received by the service.
- */
- public static final String CONTENT_TYPE_PARAM = "transport.jms.ContentType";
- /**
- * The Parameter indicating a final EPR as a String, to be published on the WSDL of a service
- * Could occur more than once, and could provide additional connection properties or a subset
- * of the properties auto computed. Also could replace IP addresses with hostnames, and expose
- * public credentials clients. If a user specified this parameter, the auto generated EPR will
- * not be exposed - unless an instance of this parameter is added with the string "legacy"
- * This parameter could be used to expose EPR's conforming to the proposed SOAP/JMS spec
- * until such time full support is implemented for it.
- */
- public static final String PARAM_PUBLISH_EPR = "transport.jms.PublishEPR";
- /** The parameter indicating the JMS API specification to be used - if this is "1.1" the JMS
- * 1.1 API would be used, else the JMS 1.0.2B
- */
- public static final String PARAM_JMS_SPEC_VER = "transport.jms.JMSSpecVersion";
-
- /**
- * The Parameter indicating whether the JMS Session should be transacted for the service
- * Specified as a "true" or "false"
- */
- public static final String PARAM_SESSION_TRANSACTED = "transport.jms.SessionTransacted";
- /**
- * The Parameter indicating the Session acknowledgement for the service. Must be one of the
- * following Strings, or the appropriate Integer used by the JMS API
- * "AUTO_ACKNOWLEDGE", "CLIENT_ACKNOWLEDGE", "DUPS_OK_ACKNOWLEDGE" or "SESSION_TRANSACTED"
- */
- public static final String PARAM_SESSION_ACK = "transport.jms.SessionAcknowledgement";
- /** A message selector to be used when messages are sought for this service */
- public static final String PARAM_MSG_SELECTOR = "transport.jms.MessageSelector";
- /** Is the Subscription durable ? - "true" or "false" See {@link PARAM_DURABLE_SUB_NAME} */
- public static final String PARAM_SUB_DURABLE = "transport.jms.SubscriptionDurable";
- /** The name for the durable subscription See {@link PARAM_SUB_DURABLE}*/
- public static final String PARAM_DURABLE_SUB_NAME = "transport.jms.DurableSubscriberName";
- /**
- * JMS Resource cachable level to be used for the service One of the following:
- * {@link CACHE_NONE}, {@link CACHE_CONNECTION}, {@link CACHE_SESSION}, {@link CACHE_PRODUCER},
- * {@link CACHE_CONSUMER}, or {@link CACHE_AUTO} - to let the transport decide
- */
- public static final String PARAM_CACHE_LEVEL = "transport.jms.CacheLevel";
- /** Should a pub-sub connection receive messages published by itself? */
- public static final String PARAM_PUBSUB_NO_LOCAL = "transport.jms.PubSubNoLocal";
- /**
- * The number of milliseconds to wait for a message on a consumer.receive() call
- * negative number - wait forever
- * 0 - do not wait at all
- * positive number - indicates the number of milliseconds to wait
- */
- public static final String PARAM_RCV_TIMEOUT = "transport.jms.ReceiveTimeout";
- /**
- *The number of concurrent consumers to be created to poll for messages for this service
- * For Topics, this should be ONE, to prevent receipt of multiple copies of the same message
- */
- public static final String PARAM_CONCURRENT_CONSUMERS = "transport.jms.ConcurrentConsumers";
- /**
- * The maximum number of concurrent consumers for the service - See {@link PARAM_CONCURRENT_CONSUMERS}
- */
- public static final String PARAM_MAX_CONSUMERS = "transport.jms.MaxConcurrentConsumers";
- /**
- * The number of idle (i.e. message-less) polling attempts before a worker task commits suicide,
- * to scale down resources, as load decreases
- */
- public static final String PARAM_IDLE_TASK_LIMIT = "transport.jms.IdleTaskLimit";
- /**
- * The maximum number of messages a polling worker task should process, before suicide - to
- * prevent many longer running threads - default is unlimited (i.e. a worker task will live forever)
- */
- public static final String PARAM_MAX_MSGS_PER_TASK = "transport.jms.MaxMessagesPerTask";
- /**
- * Number of milliseconds before the first reconnection attempt is tried, on detection of an
- * error. Subsequent retries follow a geometric series, where the
- * duration = previous duration * factor
- * This is further limited by the {@link PARAM_RECON_MAX_DURATION} to be meaningful
- */
- public static final String PARAM_RECON_INIT_DURATION = "transport.jms.InitialReconnectDuration";
- /** @see PARAM_RECON_INIT_DURATION */
- public static final String PARAM_RECON_FACTOR = "transport.jms.ReconnectProgressFactor";
- /** @see PARAM_RECON_INIT_DURATION */
- public static final String PARAM_RECON_MAX_DURATION = "transport.jms.MaxReconnectDuration";
-
- /** The username to use when obtaining a JMS Connection */
- public static final String PARAM_JMS_USERNAME = "transport.jms.UserName";
- /** The password to use when obtaining a JMS Connection */
- public static final String PARAM_JMS_PASSWORD = "transport.jms.Password";
-
- //-------------- message context / transport header properties and client options --------------
- /**
- * A MessageContext property or client Option indicating the JMS message type
- */
- public static final String JMS_MESSAGE_TYPE = "JMS_MESSAGE_TYPE";
- /**
- * The message type indicating a BytesMessage. See {@link JMS_MESSAGE_TYPE}
- */
- public static final String JMS_BYTE_MESSAGE = "JMS_BYTE_MESSAGE";
- /**
- * The message type indicating a TextMessage. See {@link JMS_MESSAGE_TYPE}
- */
- public static final String JMS_TEXT_MESSAGE = "JMS_TEXT_MESSAGE";
- /**
- * A MessageContext property or client Option indicating the time to wait for a response JMS message
- */
- public static final String JMS_WAIT_REPLY = "JMS_WAIT_REPLY";
- /**
- * A MessageContext property or client Option indicating the JMS correlation id
- */
- public static final String JMS_COORELATION_ID = "JMS_COORELATION_ID";
- /**
- * A MessageContext property or client Option indicating the JMS message id
- */
- public static final String JMS_MESSAGE_ID = "JMS_MESSAGE_ID";
- /**
- * A MessageContext property or client Option indicating the JMS delivery mode as an Integer or String
- * Value 1 - javax.jms.DeliveryMode.NON_PERSISTENT
- * Value 2 - javax.jms.DeliveryMode.PERSISTENT
- */
- public static final String JMS_DELIVERY_MODE = "JMS_DELIVERY_MODE";
- /**
- * A MessageContext property or client Option indicating the JMS destination to use on a Send
- */
- public static final String JMS_DESTINATION = "JMS_DESTINATION";
- /**
- * A MessageContext property or client Option indicating the JMS message expiration - a Long value
- * specified as a String
- */
- public static final String JMS_EXPIRATION = "JMS_EXPIRATION";
- /**
- * A MessageContext property indicating if the message is a redelivery (Boolean as a String)
- */
- public static final String JMS_REDELIVERED = "JMS_REDELIVERED";
- /**
- * A MessageContext property or client Option indicating the JMS replyTo Destination
- */
- public static final String JMS_REPLY_TO = "JMS_REPLY_TO";
- /**
- * A MessageContext property or client Option indicating the JMS replyTo Destination type
- * See {@link DESTINATION_TYPE_QUEUE} and {@link DESTINATION_TYPE_TOPIC}
- */
- public static final String JMS_REPLY_TO_TYPE = "JMS_REPLY_TO_TYPE";
- /**
- * A MessageContext property or client Option indicating the JMS timestamp (Long specified as String)
- */
- public static final String JMS_TIMESTAMP = "JMS_TIMESTAMP";
- /**
- * A MessageContext property indicating the JMS type String returned by {@link javax.jms.Message.getJMSType()}
- */
- public static final String JMS_TYPE = "JMS_TYPE";
- /**
- * A MessageContext property or client Option indicating the JMS priority
- */
- public static final String JMS_PRIORITY = "JMS_PRIORITY";
- /**
- * A MessageContext property or client Option indicating the JMS time to live for message sent
- */
- public static final String JMS_TIME_TO_LIVE = "JMS_TIME_TO_LIVE";
-
- /** The prefix that denotes JMSX properties */
- public static final String JMSX_PREFIX = "JMSX";
- /** The JMSXGroupID property */
- public static final String JMSX_GROUP_ID = "JMSXGroupID";
- /** The JMSXGroupSeq property */
- public static final String JMSX_GROUP_SEQ = "JMSXGroupSeq";
-
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java
deleted file mode 100644
index c465b1d989..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.Parameter;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet;
-
-/**
- * Class that links an Axis2 service to a JMS destination. Additionally, it contains
- * all the required information to process incoming JMS messages and to inject them
- * into Axis2.
- */
-public class JMSEndpoint {
- private JMSConnectionFactory cf;
- private AxisService service;
- private String jndiDestinationName;
- private int destinationType = JMSConstants.GENERIC;
- private Set<EndpointReference> endpointReferences = new HashSet<EndpointReference>();
- private ContentTypeRuleSet contentTypeRuleSet;
-
- public AxisService getService() {
- return service;
- }
-
- public void setService(AxisService service) {
- this.service = service;
- }
-
- public String getServiceName() {
- return service.getName();
- }
-
- public String getJndiDestinationName() {
- return jndiDestinationName;
- }
-
- public void setJndiDestinationName(String destinationJNDIName) {
- this.jndiDestinationName = destinationJNDIName;
- }
-
- public void setDestinationType(String destinationType) {
- if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) {
- this.destinationType = JMSConstants.TOPIC;
- } else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) {
- this.destinationType = JMSConstants.QUEUE;
- } else {
- this.destinationType = JMSConstants.GENERIC;
- }
- }
-
- public EndpointReference[] getEndpointReferences() {
- return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]);
- }
-
- public void computeEPRs() {
- List<EndpointReference> eprs = new ArrayList<EndpointReference>();
- for (Object o : getService().getParameters()) {
- Parameter p = (Parameter) o;
- if (JMSConstants.PARAM_PUBLISH_EPR.equals(p.getName()) && p.getValue() instanceof String) {
- if ("legacy".equalsIgnoreCase((String) p.getValue())) {
- // if "legacy" specified, compute and replace it
- endpointReferences.add(
- new EndpointReference(JMSUtils.getEPR(cf, destinationType, this)));
- } else {
- endpointReferences.add(new EndpointReference((String) p.getValue()));
- }
- }
- }
-
- if (eprs.isEmpty()) {
- // if nothing specified, compute and return legacy EPR
- endpointReferences.add(new EndpointReference(JMSUtils.getEPR(cf, destinationType, this)));
- }
- }
-
- public ContentTypeRuleSet getContentTypeRuleSet() {
- return contentTypeRuleSet;
- }
-
- public void setContentTypeRuleSet(ContentTypeRuleSet contentTypeRuleSet) {
- this.contentTypeRuleSet = contentTypeRuleSet;
- }
-
- public JMSConnectionFactory getCf() {
- return cf;
- }
-
- public void setCf(JMSConnectionFactory cf) {
- this.cf = cf;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java
deleted file mode 100644
index ceeec4a6a3..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.io.IOException;
-
-import javax.jms.JMSException;
-
-public class JMSExceptionWrapper extends IOException {
- private static final long serialVersionUID = 852441109009079511L;
-
- public JMSExceptionWrapper(JMSException ex) {
- initCause(ex);
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
deleted file mode 100644
index 8c9f66dfbf..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.BytesMessage;
-import javax.jms.TextMessage;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.TransportInDescription;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleFactory;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.MessageTypeRule;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.PropertyRule;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportListener;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorListener;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSource;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSourceSupport;
-
-/**
- * The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances
- * for each service requesting exposure over JMS, and stops these if they are undeployed / stopped.
- * <p>
- * A service indicates a JMS Connection factory definition by name, which would be defined in the
- * JMSListner on the axis2.xml, and this provides a way to reuse common configuration between
- * services, as well as to optimize resources utilized
- * <p>
- * If the connection factory name was not specified, it will default to the one named "default"
- * {@see JMSConstants.DEFAULT_CONFAC_NAME}
- * <p>
- * If a destination JNDI name is not specified, a service will expect to use a Queue with the same
- * JNDI name as of the service. Additional Parameters allows one to bind to a Topic or specify
- * many more detailed control options. See package documentation for more details
- * <p>
- * All Destinations / JMS Administered objects used MUST be pre-created or already available
- */
-public class JMSListener extends AbstractTransportListener implements ManagementSupport,
- TransportErrorSource {
-
- public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
-
- /** The JMSConnectionFactoryManager which centralizes the management of defined factories */
- private JMSConnectionFactoryManager connFacManager;
- /** A Map of service name to the JMS endpoints */
- private Map<String,JMSEndpoint> serviceNameToEndpointMap = new HashMap<String,JMSEndpoint>();
- /** A Map of service name to its ServiceTaskManager instances */
- private Map<String, ServiceTaskManager> serviceNameToSTMMap =
- new HashMap<String, ServiceTaskManager>();
- private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this);
-
- /**
- * TransportListener initialization
- *
- * @param cfgCtx the Axis configuration context
- * @param trpInDesc the TransportIn description
- */
- public void init(ConfigurationContext cfgCtx,
- TransportInDescription trpInDesc) throws AxisFault {
-
- super.init(cfgCtx, trpInDesc);
- connFacManager = new JMSConnectionFactoryManager(trpInDesc);
- log.info("JMS Transport Receiver/Listener initialized...");
- }
-
- /**
- * Returns EPRs for the given service over the JMS transport
- *
- * @param serviceName service name
- * @return the JMS EPRs for the service
- */
- public EndpointReference[] getEPRsForService(String serviceName) {
- //Strip out the operation name
- if (serviceName.indexOf('/') != -1) {
- serviceName = serviceName.substring(0, serviceName.indexOf('/'));
- }
- // strip out the endpoint name if present
- if (serviceName.indexOf('.') != -1) {
- serviceName = serviceName.substring(0, serviceName.indexOf('.'));
- }
- JMSEndpoint endpoint = serviceNameToEndpointMap.get(serviceName);
- if (endpoint != null) {
- return endpoint.getEndpointReferences();
- } else {
- return null;
- }
- }
-
- /**
- * Listen for JMS messages on behalf of the given service
- *
- * @param service the Axis service for which to listen for messages
- */
- protected void startListeningForService(AxisService service) throws AxisFault {
- JMSConnectionFactory cf = getConnectionFactory(service);
- if (cf == null) {
- throw new AxisFault("The service doesn't specify a JMS connection factory or refers " +
- "to an invalid factory.");
- }
-
- JMSEndpoint endpoint = new JMSEndpoint();
- endpoint.setService(service);
- endpoint.setCf(cf);
-
- Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION);
- if (destParam != null) {
- endpoint.setJndiDestinationName((String)destParam.getValue());
- } else {
- // Assume that the JNDI destination name is the same as the service name
- endpoint.setJndiDestinationName(service.getName());
- }
-
- Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE);
- if (destTypeParam != null) {
- String paramValue = (String) destTypeParam.getValue();
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
- JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) {
- endpoint.setDestinationType(paramValue);
- } else {
- throw new AxisFault("Invalid destinaton type value " + paramValue);
- }
- } else {
- log.debug("JMS destination type not given. default queue");
- endpoint.setDestinationType(JMSConstants.DESTINATION_TYPE_QUEUE);
- }
-
- Parameter contentTypeParam = service.getParameter(JMSConstants.CONTENT_TYPE_PARAM);
- if (contentTypeParam == null) {
- ContentTypeRuleSet contentTypeRuleSet = new ContentTypeRuleSet();
- contentTypeRuleSet.addRule(new PropertyRule(BaseConstants.CONTENT_TYPE));
- contentTypeRuleSet.addRule(new MessageTypeRule(BytesMessage.class, "application/octet-stream"));
- contentTypeRuleSet.addRule(new MessageTypeRule(TextMessage.class, "text/plain"));
- endpoint.setContentTypeRuleSet(contentTypeRuleSet);
- } else {
- endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam));
- }
-
- endpoint.computeEPRs(); // compute service EPR and keep for later use
- serviceNameToEndpointMap.put(service.getName(), endpoint);
-
- ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool);
- stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint));
- stm.start();
- serviceNameToSTMMap.put(service.getName(), stm);
-
- for (int i=0; i<3; i++) {
- if (stm.getActiveTaskCount() > 0) {
- log.info("Started to listen on destination : " + stm.getDestinationJNDIName() +
- " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
- " for service " + stm.getServiceName());
- return;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {}
- }
-
- log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() +
- " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
- " for service " + stm.getServiceName() + " have not yet started after 3 seconds ..");
- }
-
- /**
- * Stops listening for messages for the service thats undeployed or stopped
- *
- * @param service the service that was undeployed or stopped
- */
- protected void stopListeningForService(AxisService service) {
-
- ServiceTaskManager stm = serviceNameToSTMMap.get(service.getName());
- if (stm != null) {
- if (log.isDebugEnabled()) {
- log.debug("Stopping listening on destination : " + stm.getDestinationJNDIName() +
- " for service : " + stm.getServiceName());
- }
-
- stm.stop();
-
- serviceNameToSTMMap.remove(service.getName());
- serviceNameToEndpointMap.remove(service.getName());
- log.info("Stopped listening for JMS messages to service : " + service.getName());
-
- } else {
- log.error("Unable to stop service : " + service.getName() +
- " - unable to find its ServiceTaskManager");
- }
- }
- /**
- * Return the connection factory name for this service. If this service
- * refers to an invalid factory or defaults to a non-existent default
- * factory, this returns null
- *
- * @param service the AxisService
- * @return the JMSConnectionFactory to be used, or null if reference is invalid
- */
- public JMSConnectionFactory getConnectionFactory(AxisService service) {
-
- Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC);
- // validate connection factory name (specified or default)
- if (conFacParam != null) {
- return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue());
- } else {
- return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME);
- }
- }
-
- // -- jmx/management methods--
- /**
- * Pause the listener - Stop accepting/processing new messages, but continues processing existing
- * messages until they complete. This helps bring an instance into a maintenence mode
- * @throws AxisFault on error
- */
- public void pause() throws AxisFault {
- if (state != BaseConstants.STARTED) return;
- try {
- for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
- stm.pause();
- }
- state = BaseConstants.PAUSED;
- log.info("Listener paused");
- } catch (AxisJMSException e) {
- log.error("At least one service could not be paused", e);
- }
- }
-
- /**
- * Resume the lister - Brings the lister into active mode back from a paused state
- * @throws AxisFault on error
- */
- public void resume() throws AxisFault {
- if (state != BaseConstants.PAUSED) return;
- try {
- for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
- stm.resume();
- }
- state = BaseConstants.STARTED;
- log.info("Listener resumed");
- } catch (AxisJMSException e) {
- log.error("At least one service could not be resumed", e);
- }
- }
-
- /**
- * Stop processing new messages, and wait the specified maximum time for in-flight
- * requests to complete before a controlled shutdown for maintenence
- *
- * @param millis a number of milliseconds to wait until pending requests are allowed to complete
- * @throws AxisFault on error
- */
- public void maintenenceShutdown(long millis) throws AxisFault {
- if (state != BaseConstants.STARTED) return;
- try {
- long start = System.currentTimeMillis();
- stop();
- state = BaseConstants.STOPPED;
- log.info("Listener shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s");
- } catch (Exception e) {
- handleException("Error shutting down the listener for maintenence", e);
- }
- }
-
- public void addErrorListener(TransportErrorListener listener) {
- tess.addErrorListener(listener);
- }
-
- public void removeErrorListener(TransportErrorListener listener) {
- tess.removeErrorListener(listener);
- }
-
- void error(AxisService service, Throwable ex) {
- tess.error(service, ex);
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
deleted file mode 100644
index ebd67e53e1..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-import javax.transaction.UserTransaction;
-import javax.xml.namespace.QName;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.Parameter;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeInfo;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector;
-
-/**
- * This is the JMS message receiver which is invoked when a message is received. This processes
- * the message through the engine
- */
-public class JMSMessageReceiver {
-
- private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);
-
- /** The JMSListener */
- private JMSListener jmsListener = null;
- /** A reference to the JMS Connection Factory */
- private JMSConnectionFactory jmsConnectionFactory = null;
- /** The JMS metrics collector */
- private MetricsCollector metrics = null;
- /** The endpoint this message receiver is bound to */
- final JMSEndpoint endpoint;
-
- /**
- * Create a new JMSMessage receiver
- *
- * @param jmsListener the JMS transport Listener
- * @param jmsConFac the JMS connection factory we are associated with
- * @param workerPool the worker thread pool to be used
- * @param cfgCtx the axis ConfigurationContext
- * @param serviceName the name of the Axis service
- * @param endpoint the JMSEndpoint definition to be used
- */
- JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) {
- this.jmsListener = jmsListener;
- this.jmsConnectionFactory = jmsConFac;
- this.endpoint = endpoint;
- this.metrics = jmsListener.getMetricsCollector();
- }
-
- /**
- * Process a new message received
- *
- * @param message the JMS message received
- * @param ut UserTransaction which was used to receive the message
- * @return true if caller should commit
- */
- public boolean onMessage(Message message, UserTransaction ut) {
-
- try {
- if (log.isDebugEnabled()) {
- StringBuffer sb = new StringBuffer();
- sb.append("Received new JMS message for service :").append(endpoint.getServiceName());
- sb.append("\nDestination : ").append(message.getJMSDestination());
- sb.append("\nMessage ID : ").append(message.getJMSMessageID());
- sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID());
- sb.append("\nReplyTo : ").append(message.getJMSReplyTo());
- sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered());
- sb.append("\nPriority : ").append(message.getJMSPriority());
- sb.append("\nExpiration : ").append(message.getJMSExpiration());
- sb.append("\nTimestamp : ").append(message.getJMSTimestamp());
- sb.append("\nMessage Type : ").append(message.getJMSType());
- sb.append("\nPersistent ? : ").append(
- DeliveryMode.PERSISTENT == message.getJMSDeliveryMode());
-
- log.debug(sb.toString());
- if (log.isTraceEnabled() && message instanceof TextMessage) {
- log.trace("\nMessage : " + ((TextMessage) message).getText());
- }
- }
- } catch (JMSException e) {
- if (log.isDebugEnabled()) {
- log.debug("Error reading JMS message headers for debug logging", e);
- }
- }
-
- // update transport level metrics
- try {
- metrics.incrementBytesReceived(JMSUtils.getMessageSize(message));
- } catch (JMSException e) {
- log.warn("Error reading JMS message size to update transport metrics", e);
- }
-
- // has this message already expired? expiration time == 0 means never expires
- try {
- long expiryTime = message.getJMSExpiration();
- if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
- if (log.isDebugEnabled()) {
- log.debug("Discard expired message with ID : " + message.getJMSMessageID());
- }
- return true;
- }
- } catch (JMSException ignore) {}
-
-
- boolean successful = false;
- try {
- successful = processThoughEngine(message, ut);
-
- } catch (JMSException e) {
- log.error("JMS Exception encountered while processing", e);
- } catch (AxisFault e) {
- log.error("Axis fault processing message", e);
- } catch (Exception e) {
- log.error("Unknown error processing message", e);
-
- } finally {
- if (successful) {
- metrics.incrementMessagesReceived();
- } else {
- metrics.incrementFaultsReceiving();
- }
- }
-
- return successful;
- }
-
- /**
- * Process the new message through Axis2
- *
- * @param message the JMS message
- * @param ut the UserTransaction used for receipt
- * @return true if the caller should commit
- * @throws JMSException, on JMS exceptions
- * @throws AxisFault on Axis2 errors
- */
- private boolean processThoughEngine(Message message, UserTransaction ut)
- throws JMSException, AxisFault {
-
- MessageContext msgContext = jmsListener.createMessageContext();
-
- // set the JMS Message ID as the Message ID of the MessageContext
- try {
- msgContext.setMessageID(message.getJMSMessageID());
- msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
- } catch (JMSException ignore) {}
-
- String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION);
-
- AxisService service = endpoint.getService();
- msgContext.setAxisService(service);
-
- // find the operation for the message, or default to one
- Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
- QName operationQName = (
- operationParam != null ?
- BaseUtils.getQNameFromString(operationParam.getValue()) :
- BaseConstants.DEFAULT_OPERATION);
-
- AxisOperation operation = service.getOperation(operationQName);
- if (operation != null) {
- msgContext.setAxisOperation(operation);
- msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
- }
-
- ContentTypeInfo contentTypeInfo =
- endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
- if (contentTypeInfo == null) {
- throw new AxisFault("Unable to determine content type for message " +
- msgContext.getMessageID());
- }
-
- // set the message property OUT_TRANSPORT_INFO
- // the reply is assumed to be over the JMSReplyTo destination, using
- // the same incoming connection factory, if a JMSReplyTo is available
- Destination replyTo = message.getJMSReplyTo();
- if (replyTo == null) {
- // does the service specify a default reply destination ?
- Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION);
- if (param != null && param.getValue() != null) {
- replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
- }
-
- }
- if (replyTo != null) {
- msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
- new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
- contentTypeInfo.getPropertyName()));
- }
-
- JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
- if (ut != null) {
- msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut);
- }
-
- try {
- jmsListener.handleIncomingMessage(
- msgContext,
- JMSUtils.getTransportHeaders(message),
- soapAction,
- contentTypeInfo.getContentType());
-
- } finally {
-
- Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY);
- if (o != null) {
- if ((o instanceof Boolean && ((Boolean) o)) ||
- (o instanceof String && Boolean.valueOf((String) o))) {
- return false;
- }
- }
- return true;
- }
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java
deleted file mode 100644
index 01fdee77dd..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.QueueSender;
-import javax.jms.Session;
-import javax.jms.TopicPublisher;
-import javax.transaction.UserTransaction;
-
-import org.apache.axis2.context.MessageContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
-
-/**
- * Performs the actual sending of a JMS message, and the subsequent committing of a JTA transaction
- * (if requested) or the local session transaction, if used. An instance of this class is unique
- * to a single message send out operation and will not be shared.
- */
-public class JMSMessageSender {
-
- private static final Log log = LogFactory.getLog(JMSMessageSender.class);
-
- /** The Connection to be used to send out */
- private Connection connection = null;
- /** The Session to be used to send out */
- private Session session = null;
- /** The MessageProducer used */
- private MessageProducer producer = null;
- /** Target Destination */
- private Destination destination = null;
- /** The level of cachability for resources */
- private int cacheLevel = JMSConstants.CACHE_CONNECTION;
- /** Should this sender use JMS 1.1 ? (if false, defaults to 1.0.2b) */
- private boolean jmsSpec11 = true;
- /** Are we sending to a Queue ? */
- private Boolean isQueue = null;
-
- /**
- * This is a low-end method to support the one-time sends using JMS 1.0.2b
- * @param connection the JMS Connection
- * @param session JMS Session
- * @param producer the MessageProducer
- * @param destination the JMS Destination
- * @param cacheLevel cacheLevel - None | Connection | Session | Producer
- * @param jmsSpec11 true if the JMS 1.1 API should be used
- * @param isQueue posting to a Queue?
- */
- public JMSMessageSender(Connection connection, Session session, MessageProducer producer,
- Destination destination, int cacheLevel, boolean jmsSpec11, Boolean isQueue) {
-
- this.connection = connection;
- this.session = session;
- this.producer = producer;
- this.destination = destination;
- this.cacheLevel = cacheLevel;
- this.jmsSpec11 = jmsSpec11;
- this.isQueue = isQueue;
- }
-
- /**
- * Create a JMSSender using a JMSConnectionFactory and target EPR
- *
- * @param jmsConnectionFactory the JMSConnectionFactory
- * @param targetAddress target EPR
- */
- public JMSMessageSender(JMSConnectionFactory jmsConnectionFactory, String targetAddress) {
-
- if (jmsConnectionFactory != null) {
- this.cacheLevel = jmsConnectionFactory.getCacheLevel();
- this.jmsSpec11 = jmsConnectionFactory.isJmsSpec11();
- this.connection = jmsConnectionFactory.getConnection();
- this.session = jmsConnectionFactory.getSession(connection);
- this.destination =
- jmsConnectionFactory.getSharedDestination() == null ?
- jmsConnectionFactory.getDestination(JMSUtils.getDestination(targetAddress)) :
- jmsConnectionFactory.getSharedDestination();
- this.producer = jmsConnectionFactory.getMessageProducer(connection, session, destination);
-
- } else {
- JMSOutTransportInfo jmsOut = new JMSOutTransportInfo(targetAddress);
- jmsOut.loadConnectionFactoryFromProperies();
- }
- }
-
- /**
- * Perform actual send of JMS message to the Destination selected
- *
- * @param message the JMS message
- * @param msgCtx the Axis2 MessageContext
- */
- public void send(Message message, MessageContext msgCtx) {
-
- Boolean jtaCommit = getBooleanProperty(msgCtx, BaseConstants.JTA_COMMIT_AFTER_SEND);
- Boolean rollbackOnly = getBooleanProperty(msgCtx, BaseConstants.SET_ROLLBACK_ONLY);
- Boolean persistent = getBooleanProperty(msgCtx, JMSConstants.JMS_DELIVERY_MODE);
- Integer priority = getIntegerProperty(msgCtx, JMSConstants.JMS_PRIORITY);
- Integer timeToLive = getIntegerProperty(msgCtx, JMSConstants.JMS_TIME_TO_LIVE);
-
- // Do not commit, if message is marked for rollback
- if (rollbackOnly != null && rollbackOnly) {
- jtaCommit = Boolean.FALSE;
- }
-
- if (persistent != null) {
- try {
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- } catch (JMSException e) {
- handleException("Error setting JMS Producer for PERSISTENT delivery", e);
- }
- }
- if (priority != null) {
- try {
- producer.setPriority(priority);
- } catch (JMSException e) {
- handleException("Error setting JMS Producer priority to : " + priority, e);
- }
- }
- if (timeToLive != null) {
- try {
- producer.setTimeToLive(timeToLive);
- } catch (JMSException e) {
- handleException("Error setting JMS Producer TTL to : " + timeToLive, e);
- }
- }
-
- boolean sendingSuccessful = false;
- // perform actual message sending
- try {
- if (jmsSpec11 || isQueue == null) {
- producer.send(message);
-
- } else {
- if (isQueue) {
- ((QueueSender) producer).send(message);
-
- } else {
- ((TopicPublisher) producer).publish(message);
- }
- }
-
- // set the actual MessageID to the message context for use by any others down the line
- String msgId = null;
- try {
- msgId = message.getJMSMessageID();
- if (msgId != null) {
- msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId);
- }
- } catch (JMSException ignore) {}
-
- sendingSuccessful = true;
-
- if (log.isDebugEnabled()) {
- log.debug("Sent Message Context ID : " + msgCtx.getMessageID() +
- " with JMS Message ID : " + msgId +
- " to destination : " + producer.getDestination());
- }
-
- } catch (JMSException e) {
- log.error("Error sending message with MessageContext ID : " +
- msgCtx.getMessageID() + " to destination : " + destination, e);
-
- } finally {
-
- if (jtaCommit != null) {
-
- UserTransaction ut = (UserTransaction) msgCtx.getProperty(BaseConstants.USER_TRANSACTION);
- if (ut != null) {
-
- try {
- if (sendingSuccessful && jtaCommit) {
- ut.commit();
- } else {
- ut.rollback();
- }
- msgCtx.removeProperty(BaseConstants.USER_TRANSACTION);
-
- if (log.isDebugEnabled()) {
- log.debug((sendingSuccessful ? "Committed" : "Rolled back") +
- " JTA Transaction");
- }
-
- } catch (Exception e) {
- handleException("Error committing/rolling back JTA transaction after " +
- "sending of message with MessageContext ID : " + msgCtx.getMessageID() +
- " to destination : " + destination, e);
- }
- }
-
- } else {
- try {
- if (session.getTransacted()) {
- if (sendingSuccessful && (rollbackOnly == null || !rollbackOnly)) {
- session.commit();
- } else {
- session.rollback();
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug((sendingSuccessful ? "Committed" : "Rolled back") +
- " local (JMS Session) Transaction");
- }
-
- } catch (JMSException e) {
- handleException("Error committing/rolling back local (i.e. session) " +
- "transaction after sending of message with MessageContext ID : " +
- msgCtx.getMessageID() + " to destination : " + destination, e);
- }
- }
- }
- }
-
- /**
- * Close non-shared producer, session and connection if any
- */
- public void close() {
- if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) {
- try {
- producer.close();
- } catch (JMSException e) {
- log.error("Error closing JMS MessageProducer after send", e);
- } finally {
- producer = null;
- }
- }
-
- if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) {
- try {
- session.close();
- } catch (JMSException e) {
- log.error("Error closing JMS Session after send", e);
- } finally {
- session = null;
- }
- }
-
- if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
- try {
- connection.close();
- } catch (JMSException e) {
- log.error("Error closing JMS Connection after send", e);
- } finally {
- connection = null;
- }
- }
- }
-
- private void handleException(String message, Exception e) {
- log.error(message, e);
- throw new AxisJMSException(message, e);
- }
-
- private Boolean getBooleanProperty(MessageContext msgCtx, String name) {
- Object o = msgCtx.getProperty(name);
- if (o != null) {
- if (o instanceof Boolean) {
- return (Boolean) o;
- } else if (o instanceof String) {
- return Boolean.valueOf((String) o);
- }
- }
- return null;
- }
-
- private Integer getIntegerProperty(MessageContext msgCtx, String name) {
- Object o = msgCtx.getProperty(name);
- if (o != null) {
- if (o instanceof Integer) {
- return (Integer) o;
- } else if (o instanceof String) {
- return Integer.parseInt((String) o);
- }
- }
- return null;
- }
-
- public void setConnection(Connection connection) {
- this.connection = connection;
- }
-
- public void setSession(Session session) {
- this.session = session;
- }
-
- public void setProducer(MessageProducer producer) {
- this.producer = producer;
- }
-
- public void setCacheLevel(int cacheLevel) {
- this.cacheLevel = cacheLevel;
- }
-
- public int getCacheLevel() {
- return cacheLevel;
- }
-
- public Connection getConnection() {
- return connection;
- }
-
- public MessageProducer getProducer() {
- return producer;
- }
-
- public Session getSession() {
- return session;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java
deleted file mode 100644
index 9e029b33e1..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.util.Hashtable;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
-import javax.naming.NamingException;
-
-import org.apache.axis2.transport.OutTransportInfo;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
-
-/**
- * The JMS OutTransportInfo is a holder of information to send an outgoing message
- * (e.g. a Response) to a JMS destination. Thus at a minimum a reference to a
- * ConnectionFactory and a Destination are held
- */
-public class JMSOutTransportInfo implements OutTransportInfo {
-
- private static final Log log = LogFactory.getLog(JMSOutTransportInfo.class);
-
- /** The naming context */
- private Context context;
- /**
- * this is a reference to the underlying JMS ConnectionFactory when sending messages
- * through connection factories not defined at the TransportSender level
- */
- private ConnectionFactory connectionFactory = null;
- /**
- * this is a reference to a JMS Connection Factory instance, which has a reference
- * to the underlying actual connection factory, an open connection to the JMS provider
- * and optionally a session already available for use
- */
- private JMSConnectionFactory jmsConnectionFactory = null;
- /** the Destination queue or topic for the outgoing message */
- private Destination destination = null;
- /** the Destination queue or topic for the outgoing message
- * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
- */
- private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
- /** the Reply Destination queue or topic for the outgoing message */
- private Destination replyDestination = null;
- /** the Reply Destination name */
- private String replyDestinationName = null;
- /** the Reply Destination queue or topic for the outgoing message
- * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC
- */
- private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
- /** the EPR properties when the out-transport info is generated from a target EPR */
- private Hashtable<String,String> properties = null;
- /** the target EPR string where applicable */
- private String targetEPR = null;
- /** the message property name that stores the content type of the outgoing message */
- private String contentTypeProperty;
-
- /**
- * Creates an instance using the given JMS connection factory and destination
- *
- * @param jmsConnectionFactory the JMS connection factory
- * @param dest the destination
- * @param contentTypeProperty
- */
- JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest,
- String contentTypeProperty) {
- this.jmsConnectionFactory = jmsConnectionFactory;
- this.destination = dest;
- destinationType = dest instanceof Topic ? JMSConstants.DESTINATION_TYPE_TOPIC
- : JMSConstants.DESTINATION_TYPE_QUEUE;
- this.contentTypeProperty = contentTypeProperty;
- }
-
- /**
- * Creates and instance using the given URL
- *
- * @param targetEPR the target EPR
- */
- JMSOutTransportInfo(String targetEPR) {
-
- this.targetEPR = targetEPR;
- if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) {
- handleException("Invalid prefix for a JMS EPR : " + targetEPR);
-
- } else {
- properties = BaseUtils.getEPRProperties(targetEPR);
- String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE);
- if (destinationType != null) {
- setDestinationType(destinationType);
- }
-
- String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE);
- if (replyDestinationType != null) {
- setReplyDestinationType(replyDestinationType);
- }
-
- replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
- contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
- try {
- context = new InitialContext(properties);
- } catch (NamingException e) {
- handleException("Could not get an initial context using " + properties, e);
- }
-
- destination = getDestination(context, targetEPR);
- replyDestination = getReplyDestination(context, targetEPR);
- }
- }
-
- /**
- * Provides a lazy load when created with a target EPR. This method performs actual
- * lookup for the connection factory and destination
- */
- public void loadConnectionFactoryFromProperies() {
- if (properties != null) {
- connectionFactory = getConnectionFactory(context, properties);
- }
- }
-
- /**
- * Get the referenced ConnectionFactory using the properties from the context
- *
- * @param context the context to use for lookup
- * @param props the properties which contains the JNDI name of the factory
- * @return the connection factory
- */
- private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) {
- try {
-
- String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME);
- if (conFacJndiName != null) {
- return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName);
- } else {
- handleException("Connection Factory JNDI name cannot be determined");
- }
- } catch (NamingException e) {
- handleException("Failed to look up connection factory from JNDI", e);
- }
- return null;
- }
-
- /**
- * Get the JMS destination specified by the given URL from the context
- *
- * @param context the Context to lookup
- * @param url URL
- * @return the JMS destination, or null if it does not exist
- */
- private Destination getDestination(Context context, String url) {
- String destinationName = JMSUtils.getDestination(url);
- try {
- return JMSUtils.lookup(context, Destination.class, destinationName);
- } catch (NameNotFoundException e) {
- try {
- return JMSUtils.lookup(context, Destination.class,
- (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ?
- "dynamicTopics/" : "dynamicQueues/") + destinationName);
- } catch (NamingException x) {
- handleException("Cannot locate destination : " + destinationName + " using " + url);
- }
- } catch (NamingException e) {
- handleException("Cannot locate destination : " + destinationName + " using " + url, e);
- }
- return null;
- }
-
- /**
- * Get the JMS reply destination specified by the given URL from the context
- *
- * @param context the Context to lookup
- * @param url URL
- * @return the JMS destination, or null if it does not exist
- */
- private Destination getReplyDestination(Context context, String url) {
- String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION);
- if(replyDestinationName == null) {
- return null;
- }
-
- try {
- return JMSUtils.lookup(context, Destination.class, replyDestinationName);
- } catch (NameNotFoundException e) {
- if (log.isDebugEnabled()) {
- log.debug("Cannot locate destination : " + replyDestinationName + " using " + url);
- }
- } catch (NamingException e) {
- handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e);
- }
-
- return null;
- }
-
- /**
- * Look up for the given destination
- * @param replyDest the JNDI name to lookup Destination required
- * @return Destination for the JNDI name passed
- */
- public Destination getReplyDestination(String replyDest) {
- try {
- return JMSUtils.lookup(jmsConnectionFactory.getContext(), Destination.class,
- replyDest);
- } catch (NameNotFoundException e) {
- if (log.isDebugEnabled()) {
- log.debug("Cannot locate reply destination : " + replyDest, e);
- }
- } catch (NamingException e) {
- handleException("Cannot locate reply destination : " + replyDest, e);
- }
- return null;
- }
-
-
- private void handleException(String s) {
- log.error(s);
- throw new AxisJMSException(s);
- }
-
- private void handleException(String s, Exception e) {
- log.error(s, e);
- throw new AxisJMSException(s, e);
- }
-
- public Destination getDestination() {
- return destination;
- }
-
- public ConnectionFactory getConnectionFactory() {
- return connectionFactory;
- }
-
- public JMSConnectionFactory getJmsConnectionFactory() {
- return jmsConnectionFactory;
- }
-
- public void setContentType(String contentType) {
- // this is a useless Axis2 method imposed by the OutTransportInfo interface :(
- }
-
- public Hashtable<String,String> getProperties() {
- return properties;
- }
-
- public String getTargetEPR() {
- return targetEPR;
- }
-
- public String getDestinationType() {
- return destinationType;
- }
-
- public void setDestinationType(String destinationType) {
- if (destinationType != null) {
- this.destinationType = destinationType;
- }
- }
-
- public Destination getReplyDestination() {
- return replyDestination;
- }
-
- public void setReplyDestination(Destination replyDestination) {
- this.replyDestination = replyDestination;
- }
-
- public String getReplyDestinationType() {
- return replyDestinationType;
- }
-
- public void setReplyDestinationType(String replyDestinationType) {
- this.replyDestinationType = replyDestinationType;
- }
-
- public String getReplyDestinationName() {
- return replyDestinationName;
- }
-
- public void setReplyDestinationName(String replyDestinationName) {
- this.replyDestinationName = replyDestinationName;
- }
-
- public String getContentTypeProperty() {
- return contentTypeProperty;
- }
-
- public void setContentTypeProperty(String contentTypeProperty) {
- this.contentTypeProperty = contentTypeProperty;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
deleted file mode 100644
index a5f77dc4c9..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.StringWriter;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-
-import javax.activation.DataHandler;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMNode;
-import org.apache.axiom.om.OMOutputFormat;
-import org.apache.axiom.om.OMText;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.TransportOutDescription;
-import org.apache.axis2.transport.MessageFormatter;
-import org.apache.axis2.transport.OutTransportInfo;
-import org.apache.axis2.transport.TransportUtils;
-import org.apache.axis2.transport.http.HTTPConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportSender;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams.WriterOutputStream;
-
-/**
- * The TransportSender for JMS
- */
-public class JMSSender extends AbstractTransportSender implements ManagementSupport {
-
- public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
-
- /** The JMS connection factory manager to be used when sending messages out */
- private JMSConnectionFactoryManager connFacManager;
-
- /**
- * Initialize the transport sender by reading pre-defined connection factories for
- * outgoing messages.
- *
- * @param cfgCtx the configuration context
- * @param transportOut the transport sender definition from axis2.xml
- * @throws AxisFault on error
- */
- public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
- super.init(cfgCtx, transportOut);
- connFacManager = new JMSConnectionFactoryManager(transportOut);
- log.info("JMS Transport Sender initialized...");
- }
-
- /**
- * Get corresponding JMS connection factory defined within the transport sender for the
- * transport-out information - usually constructed from a targetEPR
- *
- * @param trpInfo the transport-out information
- * @return the corresponding JMS connection factory, if any
- */
- private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) {
- Map<String,String> props = trpInfo.getProperties();
- if (trpInfo.getProperties() != null) {
- String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC);
- if (jmsConnectionFactoryName != null) {
- return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
- } else {
- return connFacManager.getJMSConnectionFactory(props);
- }
- } else {
- return null;
- }
- }
-
- /**
- * Performs the actual sending of the JMS message
- */
- public void sendMessage(MessageContext msgCtx, String targetAddress,
- OutTransportInfo outTransportInfo) throws AxisFault {
-
- JMSConnectionFactory jmsConnectionFactory = null;
- JMSOutTransportInfo jmsOut = null;
- JMSMessageSender messageSender = null;
-
- if (targetAddress != null) {
-
- jmsOut = new JMSOutTransportInfo(targetAddress);
- // do we have a definition for a connection factory to use for this address?
- jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
-
- if (jmsConnectionFactory != null) {
- messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress);
-
- } else {
- try {
- messageSender = JMSUtils.createJMSSender(jmsOut);
- } catch (JMSException e) {
- handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
- }
- }
-
- } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
-
- jmsOut = (JMSOutTransportInfo) outTransportInfo;
- try {
- messageSender = JMSUtils.createJMSSender(jmsOut);
- } catch (JMSException e) {
- handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
- }
- }
-
- // The message property to be used to send the content type is determined by
- // the out transport info, i.e. either from the EPR if we are sending a request,
- // or, if we are sending a response, from the configuration of the service that
- // received the request). The property name can be overridden by a message
- // context property.
- String contentTypeProperty =
- (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
- if (contentTypeProperty == null) {
- contentTypeProperty = jmsOut.getContentTypeProperty();
- }
-
- // need to synchronize as Sessions are not thread safe
- synchronized (messageSender.getSession()) {
- try {
- sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
- } finally {
- messageSender.close();
- }
- }
- }
-
- /**
- * Perform actual sending of the JMS message
- */
- private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender,
- String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory,
- JMSOutTransportInfo jmsOut) throws AxisFault {
-
- // convert the axis message context into a JMS Message that we can send over JMS
- Message message = null;
- String correlationId = null;
- try {
- message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty);
- } catch (JMSException e) {
- handleException("Error creating a JMS message from the message context", e);
- }
-
- // should we wait for a synchronous response on this same thread?
- boolean waitForResponse = waitForSynchronousResponse(msgCtx);
- Destination replyDestination = jmsOut.getReplyDestination();
-
- // if this is a synchronous out-in, prepare to listen on the response destination
- if (waitForResponse) {
-
- String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
- if (replyDestName == null && jmsConnectionFactory != null) {
- replyDestName = jmsConnectionFactory.getReplyToDestination();
- }
-
- if (replyDestName != null) {
- if (jmsConnectionFactory != null) {
- replyDestination = jmsConnectionFactory.getDestination(replyDestName);
- } else {
- replyDestination = jmsOut.getReplyDestination(replyDestName);
- }
- }
- replyDestination = JMSUtils.setReplyDestination(
- replyDestination, messageSender.getSession(), message);
- }
-
- try {
- messageSender.send(message, msgCtx);
- metrics.incrementMessagesSent(msgCtx);
-
- } catch (AxisJMSException e) {
- metrics.incrementFaultsSending();
- handleException("Error sending JMS message", e);
- }
-
- try {
- metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message));
- } catch (JMSException e) {
- log.warn("Error reading JMS message size to update transport metrics", e);
- }
-
- // if we are expecting a synchronous response back for the message sent out
- if (waitForResponse) {
- // TODO ********************************************************************************
- // TODO **** replace with asynchronous polling via a poller task to process this *******
- // information would be given. Then it should poll (until timeout) the
- // requested destination for the response message and inject it from a
- // asynchronous worker thread
- try {
- messageSender.getConnection().start(); // multiple calls are safely ignored
- } catch (JMSException ignore) {}
-
- try {
- correlationId = message.getJMSMessageID();
- } catch(JMSException ignore) {}
-
- // We assume here that the response uses the same message property to
- // specify the content type of the message.
- waitForResponseAndProcess(messageSender.getSession(), replyDestination,
- msgCtx, correlationId, contentTypeProperty);
- // TODO ********************************************************************************
- }
- }
-
- /**
- * Create a Consumer for the reply destination and wait for the response JMS message
- * synchronously. If a message arrives within the specified time interval, process it
- * through Axis2
- * @param session the session to use to listen for the response
- * @param replyDestination the JMS reply Destination
- * @param msgCtx the outgoing message for which we are expecting the response
- * @param contentTypeProperty the message property used to determine the content type
- * of the response message
- * @throws AxisFault on error
- */
- private void waitForResponseAndProcess(Session session, Destination replyDestination,
- MessageContext msgCtx, String correlationId,
- String contentTypeProperty) throws AxisFault {
-
- try {
- MessageConsumer consumer;
- consumer = JMSUtils.createConsumer(session, replyDestination,
- "JMSCorrelationID = '" + correlationId + "'");
-
- // how long are we willing to wait for the sync response
- long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
- String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY);
- if (waitReply != null) {
- timeout = Long.valueOf(waitReply).longValue();
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Waiting for a maximum of " + timeout +
- "ms for a response message to destination : " + replyDestination +
- " with JMS correlation ID : " + correlationId);
- }
-
- Message reply = consumer.receive(timeout);
-
- if (reply != null) {
-
- // update transport level metrics
- metrics.incrementMessagesReceived();
- try {
- metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply));
- } catch (JMSException e) {
- log.warn("Error reading JMS message size to update transport metrics", e);
- }
-
- try {
- processSyncResponse(msgCtx, reply, contentTypeProperty);
- metrics.incrementMessagesReceived();
- } catch (AxisFault e) {
- metrics.incrementFaultsReceiving();
- throw e;
- }
-
- } else {
- log.warn("Did not receive a JMS response within " +
- timeout + " ms to destination : " + replyDestination +
- " with JMS correlation ID : " + correlationId);
- metrics.incrementTimeoutsReceiving();
- }
-
- } catch (JMSException e) {
- metrics.incrementFaultsReceiving();
- handleException("Error creating a consumer, or receiving a synchronous reply " +
- "for outgoing MessageContext ID : " + msgCtx.getMessageID() +
- " and reply Destination : " + replyDestination, e);
- }
- }
-
- /**
- * Create a JMS Message from the given MessageContext and using the given
- * session
- *
- * @param msgContext the MessageContext
- * @param session the JMS session
- * @param contentTypeProperty the message property to be used to store the
- * content type
- * @return a JMS message from the context and session
- * @throws JMSException on exception
- * @throws AxisFault on exception
- */
- private Message createJMSMessage(MessageContext msgContext, Session session,
- String contentTypeProperty) throws JMSException, AxisFault {
-
- Message message = null;
- String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE);
-
- // check the first element of the SOAP body, do we have content wrapped using the
- // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or
- // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages
- // for JMS but just get the payload in its native format
- String jmsPayloadType = guessMessageType(msgContext);
-
- if (jmsPayloadType == null) {
-
- OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
- MessageFormatter messageFormatter = null;
- try {
- messageFormatter = TransportUtils.getMessageFormatter(msgContext);
- } catch (AxisFault axisFault) {
- throw new JMSException("Unable to get the message formatter to use");
- }
-
- String contentType = messageFormatter.getContentType(
- msgContext, format, msgContext.getSoapAction());
-
- boolean useBytesMessage =
- msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
- contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1;
-
- OutputStream out;
- StringWriter sw;
- if (useBytesMessage) {
- BytesMessage bytesMsg = session.createBytesMessage();
- sw = null;
- out = new BytesMessageOutputStream(bytesMsg);
- message = bytesMsg;
- } else {
- sw = new StringWriter();
- try {
- out = new WriterOutputStream(sw, format.getCharSetEncoding());
- } catch (UnsupportedCharsetException ex) {
- handleException("Unsupported encoding " + format.getCharSetEncoding(), ex);
- return null;
- }
- }
-
- try {
- messageFormatter.writeTo(msgContext, format, out, true);
- out.close();
- } catch (IOException e) {
- handleException("IO Error while creating BytesMessage", e);
- }
-
- if (!useBytesMessage) {
- TextMessage txtMsg = session.createTextMessage();
- txtMsg.setText(sw.toString());
- message = txtMsg;
- }
-
- if (contentTypeProperty != null) {
- message.setStringProperty(contentTypeProperty, contentType);
- }
-
- } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) {
- message = session.createBytesMessage();
- BytesMessage bytesMsg = (BytesMessage) message;
- OMElement wrapper = msgContext.getEnvelope().getBody().
- getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER);
- OMNode omNode = wrapper.getFirstOMChild();
- if (omNode != null && omNode instanceof OMText) {
- Object dh = ((OMText) omNode).getDataHandler();
- if (dh != null && dh instanceof DataHandler) {
- try {
- ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg));
- } catch (IOException e) {
- handleException("Error serializing binary content of element : " +
- BaseConstants.DEFAULT_BINARY_WRAPPER, e);
- }
- }
- }
-
- } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) {
- message = session.createTextMessage();
- TextMessage txtMsg = (TextMessage) message;
- txtMsg.setText(msgContext.getEnvelope().getBody().
- getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText());
- }
-
- // set the JMS correlation ID if specified
- String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID);
- if (correlationId == null && msgContext.getRelatesTo() != null) {
- correlationId = msgContext.getRelatesTo().getValue();
- }
-
- if (correlationId != null) {
- message.setJMSCorrelationID(correlationId);
- }
-
- if (msgContext.isServerSide()) {
- // set SOAP Action as a property on the JMS message
- setProperty(message, msgContext, BaseConstants.SOAPACTION);
- } else {
- String action = msgContext.getOptions().getAction();
- if (action != null) {
- message.setStringProperty(BaseConstants.SOAPACTION, action);
- }
- }
-
- JMSUtils.setTransportHeaders(msgContext, message);
- return message;
- }
-
- /**
- * Guess the message type to use for JMS looking at the message contexts' envelope
- * @param msgContext the message context
- * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null
- */
- private String guessMessageType(MessageContext msgContext) {
- OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement();
- if (firstChild != null) {
- if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) {
- return JMSConstants.JMS_BYTE_MESSAGE;
- } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) {
- return JMSConstants.JMS_TEXT_MESSAGE;
- }
- }
- return null;
- }
-
- /**
- * Creates an Axis MessageContext for the received JMS message and
- * sets up the transports and various properties
- *
- * @param outMsgCtx the outgoing message for which we are expecting the response
- * @param message the JMS response message received
- * @param contentTypeProperty the message property used to determine the content type
- * of the response message
- * @throws AxisFault on error
- */
- private void processSyncResponse(MessageContext outMsgCtx, Message message,
- String contentTypeProperty) throws AxisFault {
-
- MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx);
-
- // load any transport headers from received message
- JMSUtils.loadTransportHeaders(message, responseMsgCtx);
-
- // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response
- // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This
- // question is still under debate and due to the timelines, I am commiting this
- // workaround as Axis2 1.2 is about to be released and Synapse 1.0
- responseMsgCtx.setServerSide(false);
-
- String contentType =
- contentTypeProperty == null ? null
- : JMSUtils.getProperty(message, contentTypeProperty);
-
- try {
- JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType);
- } catch (JMSException ex) {
- throw AxisFault.makeFault(ex);
- }
-// responseMsgCtx.setServerSide(true);
-
- handleIncomingMessage(
- responseMsgCtx,
- JMSUtils.getTransportHeaders(message),
- JMSUtils.getProperty(message, BaseConstants.SOAPACTION),
- contentType
- );
- }
-
- private void setProperty(Message message, MessageContext msgCtx, String key) {
-
- String value = getProperty(msgCtx, key);
- if (value != null) {
- try {
- message.setStringProperty(key, value);
- } catch (JMSException e) {
- log.warn("Couldn't set message property : " + key + " = " + value, e);
- }
- }
- }
-
- private String getProperty(MessageContext mc, String key) {
- return (String) mc.getProperty(key);
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java
deleted file mode 100644
index 63faa0b852..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java
+++ /dev/null
@@ -1,1115 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.lang.reflect.Method;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.TopicSession;
-import javax.mail.internet.ContentType;
-import javax.mail.internet.ParseException;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.builder.Builder;
-import org.apache.axis2.builder.BuilderUtil;
-import org.apache.axis2.builder.SOAPBuilder;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.Parameter;
-import org.apache.axis2.transport.TransportUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tuscany.sca.binding.ws.axis2.format.DataSourceMessageBuilder;
-import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilder;
-import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilderAdapter;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool;
-
-/**
- * Miscallaneous methods used for the JMS transport
- */
-public class JMSUtils extends BaseUtils {
-
- private static final Log log = LogFactory.getLog(JMSUtils.class);
- private static final Class[] NOARGS = new Class[] {};
- private static final Object[] NOPARMS = new Object[] {};
-
- /**
- * Should this service be enabled over the JMS transport?
- *
- * @param service the Axis service
- * @return true if JMS should be enabled
- */
- public static boolean isJMSService(AxisService service) {
- if (service.isEnableAllTransports()) {
- return true;
-
- } else {
- List transports = service.getExposedTransports();
- for (Object transport : transports) {
- if (JMSListener.TRANSPORT_NAME.equals(transport)) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Get the EPR for the given JMS connection factory and destination
- * the form of the URL is
- * jms:/<destination>?[<key>=<value>&]*
- * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS
- * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are filtered
- *
- * @param cf the Axis2 JMS connection factory
- * @param destinationType the type of destination
- * @param endpoint JMSEndpoint
- * @return the EPR as a String
- */
- static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) {
- StringBuffer sb = new StringBuffer();
-
- sb.append(
- JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName());
- sb.append("?").
- append(JMSConstants.PARAM_DEST_TYPE).append("=").append(
- destinationType == JMSConstants.TOPIC ?
- JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE);
-
- if (endpoint.getContentTypeRuleSet() != null) {
- String contentTypeProperty =
- endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty();
- if (contentTypeProperty != null) {
- sb.append("&");
- sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
- sb.append("=");
- sb.append(contentTypeProperty);
- }
- }
-
- for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) {
- if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) &&
- !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) &&
- !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) &&
- !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) {
- sb.append("&").append(
- entry.getKey()).append("=").append(entry.getValue());
- }
- }
- return sb.toString();
- }
-
- /**
- * Get a String property from the JMS message
- *
- * @param message JMS message
- * @param property property name
- * @return property value
- */
- public static String getProperty(Message message, String property) {
- try {
- return message.getStringProperty(property);
- } catch (JMSException e) {
- return null;
- }
- }
-
- /**
- * Return the destination name from the given URL
- *
- * @param url the URL
- * @return the destination name
- */
- public static String getDestination(String url) {
- String tempUrl = url.substring(JMSConstants.JMS_PREFIX.length());
- int propPos = tempUrl.indexOf("?");
-
- if (propPos == -1) {
- return tempUrl;
- } else {
- return tempUrl.substring(0, propPos);
- }
- }
-
- /**
- * Set the SOAPEnvelope to the Axis2 MessageContext, from the JMS Message passed in
- * @param message the JMS message read
- * @param msgContext the Axis2 MessageContext to be populated
- * @param contentType content type for the message
- * @throws AxisFault
- * @throws JMSException
- */
- public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType)
- throws AxisFault, JMSException {
-
- if (contentType == null) {
- if (message instanceof TextMessage) {
- contentType = "text/plain";
- } else {
- contentType = "application/octet-stream";
- }
- if (log.isDebugEnabled()) {
- log.debug("No content type specified; assuming " + contentType);
- }
- }
-
- int index = contentType.indexOf(';');
- String type = index > 0 ? contentType.substring(0, index) : contentType;
- Builder builder = BuilderUtil.getBuilderFromSelector(type, msgContext);
- if (builder == null) {
- if (log.isDebugEnabled()) {
- log.debug("No message builder found for type '" + type + "'. Falling back to SOAP.");
- }
- builder = new SOAPBuilder();
- }
-
- OMElement documentElement;
- if (message instanceof BytesMessage) {
- // Extract the charset encoding from the content type and
- // set the CHARACTER_SET_ENCODING property as e.g. SOAPBuilder relies on this.
- String charSetEnc = null;
- try {
- if (contentType != null) {
- charSetEnc = new ContentType(contentType).getParameter("charset");
- }
- } catch (ParseException ex) {
- // ignore
- }
- msgContext.setProperty(Constants.Configuration.CHARACTER_SET_ENCODING, charSetEnc);
-
- if (builder instanceof DataSourceMessageBuilder) {
- documentElement = ((DataSourceMessageBuilder)builder).processDocument(
- new BytesMessageDataSource((BytesMessage)message), contentType,
- msgContext);
- } else {
- documentElement = builder.processDocument(
- new BytesMessageInputStream((BytesMessage)message), contentType,
- msgContext);
- }
- } else if (message instanceof TextMessage) {
- TextMessageBuilder textMessageBuilder;
- if (builder instanceof TextMessageBuilder) {
- textMessageBuilder = (TextMessageBuilder)builder;
- } else {
- textMessageBuilder = new TextMessageBuilderAdapter(builder);
- }
- String content = ((TextMessage)message).getText();
- documentElement = textMessageBuilder.processDocument(content, contentType, msgContext);
- } else {
- handleException("Unsupported JMS message type " + message.getClass().getName());
- return; // Make compiler happy
- }
- msgContext.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement));
- }
-
- /**
- * Set the JMS ReplyTo for the message
- *
- * @param replyDestination the JMS Destination where the reply is expected
- * @param session the session to use to create a temp Queue if a response is expected
- * but a Destination has not been specified
- * @param message the JMS message where the final Destinatio would be set as the JMS ReplyTo
- * @return the JMS ReplyTo Destination for the message
- */
- public static Destination setReplyDestination(Destination replyDestination, Session session,
- Message message) {
-
- if (replyDestination == null) {
- try {
- // create temporary queue to receive the reply
- replyDestination = createTemporaryDestination(session);
- } catch (JMSException e) {
- handleException("Error creating temporary queue for response");
- }
- }
-
- try {
- message.setJMSReplyTo(replyDestination);
- } catch (JMSException e) {
- log.warn("Error setting JMS ReplyTo destination to : " + replyDestination, e);
- }
-
- if (log.isDebugEnabled()) {
- try {
- assert replyDestination != null;
- log.debug("Expecting a response to JMS Destination : " +
- (replyDestination instanceof Queue ?
- ((Queue) replyDestination).getQueueName() :
- ((Topic) replyDestination).getTopicName()));
- } catch (JMSException ignore) {}
- }
- return replyDestination;
- }
-
- /**
- * Set transport headers from the axis message context, into the JMS message
- *
- * @param msgContext the axis message context
- * @param message the JMS Message
- * @throws JMSException on exception
- */
- public static void setTransportHeaders(MessageContext msgContext, Message message)
- throws JMSException {
-
- Map headerMap = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
-
- if (headerMap == null) {
- return;
- }
-
- for (Object headerName : headerMap.keySet()) {
-
- String name = (String) headerName;
-
- if (name.startsWith(JMSConstants.JMSX_PREFIX) &&
- !(name.equals(JMSConstants.JMSX_GROUP_ID) || name.equals(JMSConstants.JMSX_GROUP_SEQ))) {
- continue;
- }
-
- if (JMSConstants.JMS_COORELATION_ID.equals(name)) {
- message.setJMSCorrelationID(
- (String) headerMap.get(JMSConstants.JMS_COORELATION_ID));
- } else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) {
- Object o = headerMap.get(JMSConstants.JMS_DELIVERY_MODE);
- if (o instanceof Integer) {
- message.setJMSDeliveryMode((Integer) o);
- } else if (o instanceof String) {
- try {
- message.setJMSDeliveryMode(Integer.parseInt((String) o));
- } catch (NumberFormatException nfe) {
- log.warn("Invalid delivery mode ignored : " + o, nfe);
- }
- } else {
- log.warn("Invalid delivery mode ignored : " + o);
- }
-
- } else if (JMSConstants.JMS_EXPIRATION.equals(name)) {
- message.setJMSExpiration(
- Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION)));
- } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) {
- message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID));
- } else if (JMSConstants.JMS_PRIORITY.equals(name)) {
- message.setJMSPriority(
- Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY)));
- } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) {
- message.setJMSTimestamp(
- Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP)));
- } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) {
- message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE));
-
- } else {
- Object value = headerMap.get(name);
- if (value instanceof String) {
- message.setStringProperty(name, (String) value);
- } else if (value instanceof Boolean) {
- message.setBooleanProperty(name, (Boolean) value);
- } else if (value instanceof Integer) {
- message.setIntProperty(name, (Integer) value);
- } else if (value instanceof Long) {
- message.setLongProperty(name, (Long) value);
- } else if (value instanceof Double) {
- message.setDoubleProperty(name, (Double) value);
- } else if (value instanceof Float) {
- message.setFloatProperty(name, (Float) value);
- }
- }
- }
- }
-
- /**
- * Read the transport headers from the JMS Message and set them to the axis2 message context
- *
- * @param message the JMS Message received
- * @param responseMsgCtx the axis message context
- * @throws AxisFault on error
- */
- public static void loadTransportHeaders(Message message, MessageContext responseMsgCtx)
- throws AxisFault {
- responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, getTransportHeaders(message));
- }
-
- /**
- * Extract transport level headers for JMS from the given message into a Map
- *
- * @param message the JMS message
- * @return a Map of the transport headers
- */
- public static Map<String, Object> getTransportHeaders(Message message) {
- // create a Map to hold transport headers
- Map<String, Object> map = new HashMap<String, Object>();
-
- // correlation ID
- try {
- if (message.getJMSCorrelationID() != null) {
- map.put(JMSConstants.JMS_COORELATION_ID, message.getJMSCorrelationID());
- }
- } catch (JMSException ignore) {}
-
- // set the delivery mode as persistent or not
- try {
- map.put(JMSConstants.JMS_DELIVERY_MODE, Integer.toString(message.getJMSDeliveryMode()));
- } catch (JMSException ignore) {}
-
- // destination name
- try {
- if (message.getJMSDestination() != null) {
- Destination dest = message.getJMSDestination();
- map.put(JMSConstants.JMS_DESTINATION,
- dest instanceof Queue ?
- ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName());
- }
- } catch (JMSException ignore) {}
-
- // expiration
- try {
- map.put(JMSConstants.JMS_EXPIRATION, Long.toString(message.getJMSExpiration()));
- } catch (JMSException ignore) {}
-
- // if a JMS message ID is found
- try {
- if (message.getJMSMessageID() != null) {
- map.put(JMSConstants.JMS_MESSAGE_ID, message.getJMSMessageID());
- }
- } catch (JMSException ignore) {}
-
- // priority
- try {
- map.put(JMSConstants.JMS_PRIORITY, Long.toString(message.getJMSPriority()));
- } catch (JMSException ignore) {}
-
- // redelivered
- try {
- map.put(JMSConstants.JMS_REDELIVERED, Boolean.toString(message.getJMSRedelivered()));
- } catch (JMSException ignore) {}
-
- // replyto destination name
- try {
- if (message.getJMSReplyTo() != null) {
- Destination dest = message.getJMSReplyTo();
- map.put(JMSConstants.JMS_REPLY_TO,
- dest instanceof Queue ?
- ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName());
- }
- } catch (JMSException ignore) {}
-
- // priority
- try {
- map.put(JMSConstants.JMS_TIMESTAMP, Long.toString(message.getJMSTimestamp()));
- } catch (JMSException ignore) {}
-
- // message type
- try {
- if (message.getJMSType() != null) {
- map.put(JMSConstants.JMS_TYPE, message.getJMSType());
- }
- } catch (JMSException ignore) {}
-
- // any other transport properties / headers
- Enumeration e = null;
- try {
- e = message.getPropertyNames();
- } catch (JMSException ignore) {}
-
- if (e != null) {
- while (e.hasMoreElements()) {
- String headerName = (String) e.nextElement();
- try {
- map.put(headerName, message.getStringProperty(headerName));
- continue;
- } catch (JMSException ignore) {}
- try {
- map.put(headerName, message.getBooleanProperty(headerName));
- continue;
- } catch (JMSException ignore) {}
- try {
- map.put(headerName, message.getIntProperty(headerName));
- continue;
- } catch (JMSException ignore) {}
- try {
- map.put(headerName, message.getLongProperty(headerName));
- continue;
- } catch (JMSException ignore) {}
- try {
- map.put(headerName, message.getDoubleProperty(headerName));
- continue;
- } catch (JMSException ignore) {}
- try {
- map.put(headerName, message.getFloatProperty(headerName));
- } catch (JMSException ignore) {}
- }
- }
-
- return map;
- }
-
-
- /**
- * Create a MessageConsumer for the given Destination
- * @param session JMS Session to use
- * @param dest Destination for which the Consumer is to be created
- * @param messageSelector the message selector to be used if any
- * @return a MessageConsumer for the specified Destination
- * @throws JMSException
- */
- public static MessageConsumer createConsumer(Session session, Destination dest, String messageSelector)
- throws JMSException {
-
- if (dest instanceof Queue) {
- return ((QueueSession) session).createReceiver((Queue) dest, messageSelector);
- } else {
- return ((TopicSession) session).createSubscriber((Topic) dest, messageSelector, false);
- }
- }
-
- /**
- * Create a temp queue or topic for synchronous receipt of responses, when a reply destination
- * is not specified
- * @param session the JMS Session to use
- * @return a temporary Queue or Topic, depending on the session
- * @throws JMSException
- */
- public static Destination createTemporaryDestination(Session session) throws JMSException {
-
- if (session instanceof QueueSession) {
- return session.createTemporaryQueue();
- } else {
- return session.createTemporaryTopic();
- }
- }
-
- /**
- * Return the body length in bytes for a bytes message
- * @param bMsg the JMS BytesMessage
- * @return length of body in bytes
- */
- public static long getBodyLength(BytesMessage bMsg) {
- try {
- Method mtd = bMsg.getClass().getMethod("getBodyLength", NOARGS);
- if (mtd != null) {
- return (Long) mtd.invoke(bMsg, NOPARMS);
- }
- } catch (Exception e) {
- // JMS 1.0
- if (log.isDebugEnabled()) {
- log.debug("Error trying to determine JMS BytesMessage body length", e);
- }
- }
-
- // if JMS 1.0
- long length = 0;
- try {
- byte[] buffer = new byte[2048];
- bMsg.reset();
- for (int bytesRead = bMsg.readBytes(buffer); bytesRead != -1;
- bytesRead = bMsg.readBytes(buffer)) {
- length += bytesRead;
- }
- } catch (JMSException ignore) {}
- return length;
- }
-
- /**
- * Get the length of the message in bytes
- * @param message
- * @return message size (or approximation) in bytes
- * @throws JMSException
- */
- public static long getMessageSize(Message message) throws JMSException {
- if (message instanceof BytesMessage) {
- return JMSUtils.getBodyLength((BytesMessage) message);
- } else if (message instanceof TextMessage) {
- // TODO: Converting the whole message to a byte array is too much overhead just to determine the message size.
- // Anyway, the result is not accurate since we don't know what encoding the JMS provider uses.
- return ((TextMessage) message).getText().getBytes().length;
- } else {
- log.warn("Can't determine size of JMS message; unsupported message type : "
- + message.getClass().getName());
- return 0;
- }
- }
-
- public static <T> T lookup(Context context, Class<T> clazz, String name)
- throws NamingException {
-
- Object object = context.lookup(name);
- try {
- return clazz.cast(object);
- } catch (ClassCastException ex) {
- // Instead of a ClassCastException, throw an exception with some
- // more information.
- if (object instanceof Reference) {
- Reference ref = (Reference)object;
- handleException("JNDI failed to de-reference Reference with name " +
- name + "; is the factory " + ref.getFactoryClassName() +
- " in your classpath?");
- return null;
- } else {
- handleException("JNDI lookup of name " + name + " returned a " +
- object.getClass().getName() + " while a " + clazz + " was expected");
- return null;
- }
- }
- }
-
- /**
- * Create a ServiceTaskManager for the service passed in and its corresponding JMSConnectionFactory
- * @param jcf
- * @param service
- * @param workerPool
- * @return
- */
- public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf,
- AxisService service, WorkerPool workerPool) {
-
- String name = service.getName();
- Map<String, String> svc = getServiceStringParameters(service.getParameters());
- Map<String, String> cf = jcf.getParameters();
-
- ServiceTaskManager stm = new ServiceTaskManager();
-
- stm.setServiceName(name);
- stm.addJmsProperties(cf);
- stm.addJmsProperties(svc);
-
- stm.setConnFactoryJNDIName(
- getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf));
- String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf);
- if (destName == null) {
- destName = service.getName();
- }
- stm.setDestinationJNDIName(destName);
- stm.setDestinationType(getDestinationType(svc, cf));
-
- stm.setJmsSpec11(
- getJMSSpecVersion(svc, cf));
- stm.setTransactionality(
- getTransactionality(svc, cf));
- stm.setCacheUserTransaction(
- getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf));
- stm.setUserTransactionJNDIName(
- getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf));
- stm.setSessionTransacted(
- getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf));
- stm.setSessionAckMode(
- getSessionAck(svc, cf));
- stm.setMessageSelector(
- getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf));
- stm.setSubscriptionDurable(
- getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf));
- stm.setDurableSubscriberName(
- getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf));
-
- stm.setCacheLevel(
- getCacheLevel(svc, cf));
- stm.setPubSubNoLocal(
- getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf));
-
- Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf);
- if (value != null) {
- stm.setReceiveTimeout(value);
- }
- value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf);
- if (value != null) {
- stm.setConcurrentConsumers(value);
- }
- value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf);
- if (value != null) {
- stm.setMaxConcurrentConsumers(value);
- }
- value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf);
- if (value != null) {
- stm.setIdleTaskExecutionLimit(value);
- }
- value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf);
- if (value != null) {
- stm.setMaxMessagesPerTask(value);
- }
-
- value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf);
- if (value != null) {
- stm.setInitialReconnectDuration(value);
- }
- value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf);
- if (value != null) {
- stm.setMaxReconnectDuration(value);
- }
- Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf);
- if (dValue != null) {
- stm.setReconnectionProgressionFactor(dValue);
- }
-
- stm.setWorkerPool(workerPool);
-
- // remove processed properties from property bag
- stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME);
- stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION);
- stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER);
- stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY);
- stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN);
- stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME);
- stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED);
- stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR);
- stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE);
- stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME);
- stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL);
- stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL);
- stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT);
- stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS);
- stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS);
- stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT);
- stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK);
- stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION);
- stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION);
- stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR);
-
- return stm;
- }
-
- private static Map<String, String> getServiceStringParameters(List list) {
-
- Map<String, String> map = new HashMap<String, String>();
- for (Object o : list) {
- Parameter p = (Parameter) o;
- if (p.getValue() instanceof String) {
- map.put(p.getName(), (String) p.getValue());
- }
- }
- return map;
- }
-
- private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) {
- String value = (String) svcMap.get(key);
- if (value == null) {
- value = (String) cfMap.get(key);
- }
- if (value == null) {
- throw new AxisJMSException("Service/connection factory property : " + key);
- }
- return value;
- }
-
- private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) {
- String value = (String) svcMap.get(key);
- if (value == null) {
- value = (String) cfMap.get(key);
- }
- return value;
- }
-
- private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) {
- String value = (String) svcMap.get(key);
- if (value == null) {
- value = (String) cfMap.get(key);
- }
- if (value == null) {
- return null;
- } else {
- return Boolean.valueOf(value);
- }
- }
-
- private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) {
- String value = (String) svcMap.get(key);
- if (value == null) {
- value = (String) cfMap.get(key);
- }
- if (value != null) {
- try {
- return Integer.parseInt(value);
- } catch (NumberFormatException e) {
- throw new AxisJMSException("Invalid value : " + value + " for " + key);
- }
- }
- return null;
- }
-
- private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) {
- String value = (String) svcMap.get(key);
- if (value == null) {
- value = (String) cfMap.get(key);
- }
- if (value != null) {
- try {
- return Double.parseDouble(value);
- } catch (NumberFormatException e) {
- throw new AxisJMSException("Invalid value : " + value + " for " + key);
- }
- }
- return null;
- }
-
- private static int getTransactionality(Map svcMap, Map cfMap) {
-
- String key = BaseConstants.PARAM_TRANSACTIONALITY;
- String val = (String) svcMap.get(key);
- if (val == null) {
- val = (String) cfMap.get(key);
- }
-
- if (val == null) {
- return BaseConstants.TRANSACTION_NONE;
-
- } else {
- if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) {
- return BaseConstants.TRANSACTION_JTA;
- } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) {
- return BaseConstants.TRANSACTION_LOCAL;
- } else {
- throw new AxisJMSException("Invalid option : " + val + " for parameter : " +
- BaseConstants.STR_TRANSACTION_JTA);
- }
- }
- }
-
- private static int getDestinationType(Map svcMap, Map cfMap) {
-
- String key = JMSConstants.PARAM_DEST_TYPE;
- String val = (String) svcMap.get(key);
- if (val == null) {
- val = (String) cfMap.get(key);
- }
-
- if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) {
- return JMSConstants.TOPIC;
- }
- return JMSConstants.QUEUE;
- }
-
- private static int getSessionAck(Map svcMap, Map cfMap) {
-
- String key = JMSConstants.PARAM_SESSION_ACK;
- String val = (String) svcMap.get(key);
- if (val == null) {
- val = (String) cfMap.get(key);
- }
-
- if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) {
- return Session.AUTO_ACKNOWLEDGE;
- } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) {
- return Session.CLIENT_ACKNOWLEDGE;
- } else if ("DUPS_OK_ACKNOWLEDGE".equals(val)){
- return Session.DUPS_OK_ACKNOWLEDGE;
- } else if ("SESSION_TRANSACTED".equals(val)) {
- return 0; //Session.SESSION_TRANSACTED;
- } else {
- try {
- return Integer.parseInt(val);
- } catch (NumberFormatException ignore) {
- throw new AxisJMSException("Invalid session acknowledgement mode : " + val);
- }
- }
- }
-
- private static int getCacheLevel(Map svcMap, Map cfMap) {
-
- String key = JMSConstants.PARAM_CACHE_LEVEL;
- String val = (String) svcMap.get(key);
- if (val == null) {
- val = (String) cfMap.get(key);
- }
-
- if ("none".equalsIgnoreCase(val)) {
- return JMSConstants.CACHE_NONE;
- } else if ("connection".equalsIgnoreCase(val)) {
- return JMSConstants.CACHE_CONNECTION;
- } else if ("session".equals(val)){
- return JMSConstants.CACHE_SESSION;
- } else if ("consumer".equals(val)) {
- return JMSConstants.CACHE_CONSUMER;
- } else if (val != null) {
- throw new AxisJMSException("Invalid cache level : " + val);
- }
- return JMSConstants.CACHE_AUTO;
- }
-
- private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) {
-
- String key = JMSConstants.PARAM_JMS_SPEC_VER;
- String val = (String) svcMap.get(key);
- if (val == null) {
- val = (String) cfMap.get(key);
- }
-
- if (val == null || "1.1".equals(val)) {
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * This is a JMS spec independent method to create a Connection. Please be cautious when
- * making any changes
- *
- * @param conFac the ConnectionFactory to use
- * @param user optional user name
- * @param pass optional password
- * @param jmsSpec11 should we use JMS 1.1 API ?
- * @param isQueue is this to deal with a Queue?
- * @return a JMS Connection as requested
- * @throws JMSException on errors, to be handled and logged by the caller
- */
- public static Connection createConnection(ConnectionFactory conFac,
- String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException {
-
- Connection connection = null;
- if (log.isDebugEnabled()) {
- log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") +
- "Connection using credentials : (" + user + "/" + pass + ")");
- }
-
- if (jmsSpec11 || isQueue == null) {
- if (user != null && pass != null) {
- connection = conFac.createConnection(user, pass);
- } else {
- connection = conFac.createConnection();
- }
-
- } else {
- QueueConnectionFactory qConFac = null;
- TopicConnectionFactory tConFac = null;
- if (isQueue) {
- tConFac = (TopicConnectionFactory) conFac;
- } else {
- qConFac = (QueueConnectionFactory) conFac;
- }
-
- if (user != null && pass != null) {
- if (qConFac != null) {
- connection = qConFac.createQueueConnection(user, pass);
- } else if (tConFac != null) {
- connection = tConFac.createTopicConnection(user, pass);
- }
- } else {
- if (qConFac != null) {
- connection = qConFac.createQueueConnection();
- } else if (tConFac != null) {
- connection = tConFac.createTopicConnection();
- }
- }
- }
- return connection;
- }
-
- /**
- * This is a JMS spec independent method to create a Session. Please be cautious when
- * making any changes
- *
- * @param connection the JMS Connection
- * @param transacted should the session be transacted?
- * @param ackMode the ACK mode for the session
- * @param jmsSpec11 should we use the JMS 1.1 API?
- * @param isQueue is this Session to deal with a Queue?
- * @return a Session created for the given information
- * @throws JMSException on errors, to be handled and logged by the caller
- */
- public static Session createSession(Connection connection, boolean transacted, int ackMode,
- boolean jmsSpec11, Boolean isQueue) throws JMSException {
-
- if (jmsSpec11 || isQueue == null) {
- return connection.createSession(transacted, ackMode);
-
- } else {
- if (isQueue) {
- return ((QueueConnection) connection).createQueueSession(transacted, ackMode);
- } else {
- return ((TopicConnection) connection).createTopicSession(transacted, ackMode);
- }
- }
- }
-
- /**
- * This is a JMS spec independent method to create a MessageConsumer. Please be cautious when
- * making any changes
- *
- * @param session JMS session
- * @param destination the Destination
- * @param isQueue is the Destination a queue?
- * @param subscriberName optional client name to use for a durable subscription to a topic
- * @param messageSelector optional message selector
- * @param pubSubNoLocal should we receive messages sent by us during pub-sub?
- * @param isDurable is this a durable topic subscription?
- * @param jmsSpec11 should we use JMS 1.1 API ?
- * @return a MessageConsumer to receive messages
- * @throws JMSException on errors, to be handled and logged by the caller
- */
- public static MessageConsumer createConsumer(
- Session session, Destination destination, Boolean isQueue,
- String subscriberName, String messageSelector, boolean pubSubNoLocal,
- boolean isDurable, boolean jmsSpec11) throws JMSException {
-
- if (jmsSpec11 || isQueue == null) {
- if (isDurable) {
- return session.createDurableSubscriber(
- (Topic) destination, subscriberName, messageSelector, pubSubNoLocal);
- } else {
- return session.createConsumer(destination, messageSelector, pubSubNoLocal);
- }
- } else {
- if (isQueue) {
- return ((QueueSession) session).createReceiver((Queue) destination, messageSelector);
- } else {
- if (isDurable) {
- return ((TopicSession) session).createDurableSubscriber(
- (Topic) destination, subscriberName, messageSelector, pubSubNoLocal);
- } else {
- return ((TopicSession) session).createSubscriber(
- (Topic) destination, messageSelector, pubSubNoLocal);
- }
- }
- }
- }
-
- /**
- * This is a JMS spec independent method to create a MessageProducer. Please be cautious when
- * making any changes
- *
- * @param session JMS session
- * @param destination the Destination
- * @param isQueue is the Destination a queue?
- * @param jmsSpec11 should we use JMS 1.1 API ?
- * @return a MessageProducer to send messages to the given Destination
- * @throws JMSException on errors, to be handled and logged by the caller
- */
- public static MessageProducer createProducer(
- Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException {
-
- if (jmsSpec11 || isQueue == null) {
- return session.createProducer(destination);
- } else {
- if (isQueue) {
- return ((QueueSession) session).createSender((Queue) destination);
- } else {
- return ((TopicSession) session).createPublisher((Topic) destination);
- }
- }
- }
-
- /**
- * Create a one time MessageProducer for the given JMS OutTransport information
- * For simplicity and best compatibility, this method uses only JMS 1.0.2b API.
- * Please be cautious when making any changes
- *
- * @param jmsOut the JMS OutTransport information (contains all properties)
- * @return a JMSSender based on one-time use resources
- * @throws JMSException on errors, to be handled and logged by the caller
- */
- public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut)
- throws JMSException {
-
- // digest the targetAddress and locate CF from the EPR
- jmsOut.loadConnectionFactoryFromProperies();
-
- // create a one time connection and session to be used
- Hashtable<String,String> jmsProps = jmsOut.getProperties();
- String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null;
- String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null;
-
- QueueConnectionFactory qConFac = null;
- TopicConnectionFactory tConFac = null;
-
- int destType = -1;
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) {
- destType = JMSConstants.QUEUE;
- qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory();
-
- } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) {
- destType = JMSConstants.TOPIC;
- tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory();
- }
-
- Connection connection = null;
- if (user != null && pass != null) {
- if (qConFac != null) {
- connection = qConFac.createQueueConnection(user, pass);
- } else if (tConFac != null) {
- connection = tConFac.createTopicConnection(user, pass);
- }
- } else {
- if (qConFac != null) {
- connection = qConFac.createQueueConnection();
- } else if (tConFac != null) {
- connection = tConFac.createTopicConnection();
- }
- }
-
- if (connection == null && jmsOut.getJmsConnectionFactory() != null) {
- connection = jmsOut.getJmsConnectionFactory().getConnection();
- }
-
- Session session = null;
- MessageProducer producer = null;
- Destination destination = jmsOut.getDestination();
-
- if (destType == JMSConstants.QUEUE) {
- session = ((QueueConnection) connection).
- createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = ((QueueSession) session).createSender((Queue) destination);
- } else {
- session = ((TopicConnection) connection).
- createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = ((TopicSession) session).createPublisher((Topic) destination);
- }
-
- return new JMSMessageSender(connection, session, producer,
- destination, (jmsOut.getJmsConnectionFactory() == null ?
- JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false,
- destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE);
- }
-
- /**
- * Return a String representation of the destination type
- * @param destType the destination type indicator int
- * @return a descriptive String
- */
- public static String getDestinationTypeAsString(int destType) {
- if (destType == JMSConstants.QUEUE) {
- return "Queue";
- } else if (destType == JMSConstants.TOPIC) {
- return "Topic";
- } else {
- return "Generic";
- }
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java
deleted file mode 100644
index 28c8da2a8d..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java
+++ /dev/null
@@ -1,1217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.SystemException;
-import javax.transaction.UserTransaction;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool;
-
-/**
- * Each service will have one ServiceTaskManager instance that will create, manage and also destroy
- * idle tasks created for it, for message receipt. This will also allow individual tasks to cache
- * the Connection, Session or Consumer as necessary, considering the transactionality required and
- * user preference.
- *
- * This also acts as the ExceptionListener for all JMS connections made on behalf of the service.
- * Since the ExceptionListener is notified by a JMS provider on a "serious" error, we simply try
- * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh
- * for the service, by discarding all connections.
- */
-public class ServiceTaskManager {
-
- /** The logger */
- private static final Log log = LogFactory.getLog(ServiceTaskManager.class);
-
- /** The Task manager is stopped or has not started */
- private static final int STATE_STOPPED = 0;
- /** The Task manager is started and active */
- private static final int STATE_STARTED = 1;
- /** The Task manager is paused temporarily */
- private static final int STATE_PAUSED = 2;
- /** The Task manager is started, but a shutdown has been requested */
- private static final int STATE_SHUTTING_DOWN = 3;
- /** The Task manager has encountered an error */
- private static final int STATE_FAILURE = 4;
-
- /** The name of the service managed by this instance */
- private String serviceName;
- /** The ConnectionFactory MUST refer to an XAConnectionFactory to use JTA */
- private String connFactoryJNDIName;
- /** The JNDI name of the Destination Queue or Topic */
- private String destinationJNDIName;
- /** JNDI location for the JTA UserTransaction */
- private String userTransactionJNDIName = "java:comp/UserTransaction";
- /** The type of destination - P2P or PubSub (or JMS 1.1 API generic?) */
- private int destinationType = JMSConstants.GENERIC;
- /** An optional message selector */
- private String messageSelector = null;
-
- /** Should tasks run without transactions, using transacted Sessions (i.e. local), or JTA */
- private int transactionality = BaseConstants.TRANSACTION_NONE;
- /** Should created Sessions be transactional ? - should be false when using JTA */
- private boolean sessionTransacted = true;
- /** Session acknowledgement mode when transacted Sessions (i.e. local transactions) are used */
- private int sessionAckMode = Session.AUTO_ACKNOWLEDGE;
-
- /** Is the subscription durable ? */
- private boolean subscriptionDurable = false;
- /** The name of the durable subscriber for this client */
- private String durableSubscriberName = null;
- /** In PubSub mode, should I receive messages sent by me / my connection ? */
- private boolean pubSubNoLocal = false;
- /** Number of concurrent consumers - for PubSub, this should be 1 to prevent multiple receipt */
- private int concurrentConsumers = 1;
- /** Maximum number of consumers to create - see @concurrentConsumers */
- private int maxConcurrentConsumers = 1;
- /** The number of idle (i.e. message-less) attempts to be tried before suicide, to scale down */
- private int idleTaskExecutionLimit = 10;
- /** The maximum number of successful message receipts for a task - to limit thread life span */
- private int maxMessagesPerTask = -1; // default is unlimited
- /** The default receive timeout - a negative value means wait forever, zero dont wait at all */
- private int receiveTimeout = 1000;
- /** JMS Resource cache level - Connection, Session, Consumer. Auto will select safe default */
- private int cacheLevel = JMSConstants.CACHE_AUTO;
- /** Should we cache the UserTransaction handle from JNDI - true for almost all app servers */
- private boolean cacheUserTransaction = true;
- /** Shared UserTransactionHandle */
- private UserTransaction sharedUserTransaction = null;
- /** Should this service use JMS 1.1 ? (when false, defaults to 1.0.2b) */
- private boolean jmsSpec11 = true;
-
- /** Initial duration to attempt re-connection to JMS provider after failure */
- private int initialReconnectDuration = 10000;
- /** Progression factory for geometric series that calculates re-connection times */
- private double reconnectionProgressionFactor = 2.0; // default to [bounded] exponential
- /** Upper limit on reconnection attempt duration */
- private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour
-
- /** The JNDI context properties and other general properties */
- private Hashtable<String,String> jmsProperties = new Hashtable<String, String>();
- /** The JNDI Context acuired */
- private Context context = null;
- /** The ConnectionFactory to be used */
- private ConnectionFactory conFactory = null;
- /** The JMS Destination */
- private Destination destination = null;
-
- /** The list of active tasks thats managed by this instance */
- private final List<MessageListenerTask> pollingTasks =
- Collections.synchronizedList(new ArrayList<MessageListenerTask>());
- /** The per-service JMS message receiver to be invoked after receipt of messages */
- private JMSMessageReceiver jmsMessageReceiver = null;
-
- /** State of this Task Manager */
- private volatile int serviceTaskManagerState = STATE_STOPPED;
- /** Number of invoker tasks active */
- private volatile int activeTaskCount = 0;
- /** The shared thread pool from the Listener */
- private WorkerPool workerPool = null;
-
- /** The JMS Connection shared between multiple polling tasks - when enabled (reccomended) */
- private Connection sharedConnection = null;
-
- /**
- * Start or re-start the Task Manager by shutting down any existing worker tasks and
- * re-creating them. However, if this is STM is PAUSED, a start request is ignored.
- * This applies for any connection failures during paused state as well, which then will
- * not try to auto recover
- */
- public synchronized void start() {
-
- if (serviceTaskManagerState == STATE_PAUSED) {
- log.info("Attempt to re-start paused TaskManager is ignored. Please use resume instead");
- return;
- }
-
- // if any tasks are running, stop whats running now
- if (!pollingTasks.isEmpty()) {
- stop();
- }
-
- if (cacheLevel == JMSConstants.CACHE_AUTO) {
- cacheLevel =
- transactionality == BaseConstants.TRANSACTION_NONE ?
- JMSConstants.CACHE_CONSUMER : JMSConstants.CACHE_NONE;
- }
- switch (cacheLevel) {
- case JMSConstants.CACHE_NONE:
- log.debug("No JMS resources will be cached/shared between poller " +
- "worker tasks of service : " + serviceName);
- break;
- case JMSConstants.CACHE_CONNECTION:
- log.debug("Only the JMS Connection will be cached and shared between *all* " +
- "poller task invocations");
- break;
- case JMSConstants.CACHE_SESSION:
- log.debug("The JMS Connection and Session will be cached and shared between " +
- "successive poller task invocations");
- break;
- case JMSConstants.CACHE_CONSUMER:
- log.debug("The JMS Connection, Session and MessageConsumer will be cached and " +
- "shared between successive poller task invocations");
- break;
- default : {
- handleException("Invalid cache level : " + cacheLevel +
- " for service : " + serviceName);
- }
- }
-
- for (int i=0; i<concurrentConsumers; i++) {
- workerPool.execute(new MessageListenerTask());
- }
-
- serviceTaskManagerState = STATE_STARTED;
- log.info("Task manager for service : " + serviceName + " [re-]initialized");
- }
-
- /**
- * Shutdown the tasks and release any shared resources
- */
- public synchronized void stop() {
-
- if (log.isDebugEnabled()) {
- log.debug("Stopping ServiceTaskManager for service : " + serviceName);
- }
-
- if (serviceTaskManagerState != STATE_FAILURE) {
- serviceTaskManagerState = STATE_SHUTTING_DOWN;
- }
-
- synchronized(pollingTasks) {
- for (MessageListenerTask lstTask : pollingTasks) {
- lstTask.requestShutdown();
- }
- }
-
- // try to wait a bit for task shutdown
- for (int i=0; i<5; i++) {
- if (activeTaskCount == 0) {
- break;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {}
- }
-
- if (sharedConnection != null) {
- try {
- sharedConnection.stop();
- } catch (JMSException e) {
- logError("Error stopping shared Connection", e);
- } finally {
- sharedConnection = null;
- }
- }
-
- if (activeTaskCount > 0) {
- log.warn("Unable to shutdown all polling tasks of service : " + serviceName);
- }
-
- if (serviceTaskManagerState != STATE_FAILURE) {
- serviceTaskManagerState = STATE_STOPPED;
- }
- log.info("Task manager for service : " + serviceName + " shutdown");
- }
-
- /**
- * Temporarily suspend receipt and processing of messages. Accomplished by stopping the
- * connection / or connections used by the poller tasks
- */
- public synchronized void pause() {
- for (MessageListenerTask lstTask : pollingTasks) {
- lstTask.pause();
- }
- if (sharedConnection != null) {
- try {
- sharedConnection.stop();
- } catch (JMSException e) {
- logError("Error pausing shared Connection", e);
- }
- }
- }
-
- /**
- * Resume receipt and processing of messages of paused tasks
- */
- public synchronized void resume() {
- for (MessageListenerTask lstTask : pollingTasks) {
- lstTask.resume();
- }
- if (sharedConnection != null) {
- try {
- sharedConnection.start();
- } catch (JMSException e) {
- logError("Error resuming shared Connection", e);
- }
- }
- }
-
- /**
- * Start a new MessageListenerTask if we are still active, the threshold is not reached, and w
- * e do not have any idle tasks - i.e. scale up listening
- */
- private void scheduleNewTaskIfAppropriate() {
- if (serviceTaskManagerState == STATE_STARTED &&
- pollingTasks.size() < getMaxConcurrentConsumers() && getIdleTaskCount() == 0) {
- workerPool.execute(new MessageListenerTask());
- }
- }
-
- /**
- * Get the number of MessageListenerTasks that are currently idle
- * @return idle task count
- */
- private int getIdleTaskCount() {
- int count = 0;
- for (MessageListenerTask lstTask : pollingTasks) {
- if (lstTask.isTaskIdle()) {
- count++;
- }
- }
- return count;
- }
-
- /**
- * Get the number of MessageListenerTasks that are currently connected to the JMS provider
- * @return connected task count
- */
- private int getConnectedTaskCount() {
- int count = 0;
- for (MessageListenerTask lstTask : pollingTasks) {
- if (lstTask.isConnected()) {
- count++;
- }
- }
- return count;
- }
-
- /**
- * The actual threads/tasks that perform message polling
- */
- private class MessageListenerTask implements Runnable, ExceptionListener {
-
- /** The Connection used by the polling task */
- private Connection connection = null;
- /** The Sesson used by the polling task */
- private Session session = null;
- /** The MessageConsumer used by the polling task */
- private MessageConsumer consumer = null;
- /** State of the worker polling task */
- private volatile int workerState = STATE_STOPPED;
- /** The number of idle (i.e. without fetching a message) polls for this task */
- private int idleExecutionCount = 0;
- /** Is this task idle right now? */
- private volatile boolean idle = false;
- /** Is this task connected to the JMS provider successfully? */
- private boolean connected = false;
-
- /** As soon as we create a new polling task, add it to the STM for control later */
- MessageListenerTask() {
- synchronized(pollingTasks) {
- pollingTasks.add(this);
- }
- }
-
- /**
- * Pause this polling worker task
- */
- public void pause() {
- if (isActive()) {
- if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
- try {
- connection.stop();
- } catch (JMSException e) {
- log.warn("Error pausing Message Listener task for service : " + serviceName);
- }
- }
- workerState = STATE_PAUSED;
- }
- }
-
- /**
- * Resume this polling task
- */
- public void resume() {
- if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) {
- try {
- connection.start();
- } catch (JMSException e) {
- log.warn("Error resuming Message Listener task for service : " + serviceName);
- }
- }
- workerState = STATE_STARTED;
- }
-
- /**
- * Execute the polling worker task
- */
- public void run() {
- workerState = STATE_STARTED;
- activeTaskCount++;
- int messageCount = 0;
-
- if (log.isDebugEnabled()) {
- log.debug("New poll task starting : thread id = " + Thread.currentThread().getId());
- }
-
- try {
- while (isActive() &&
- (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) &&
- (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) {
-
- UserTransaction ut = null;
- try {
- if (transactionality == BaseConstants.TRANSACTION_JTA) {
- ut = getUserTransaction();
- ut.begin();
- }
- } catch (NotSupportedException e) {
- handleException("Listener Task is already associated with a transaction", e);
- } catch (SystemException e) {
- handleException("Error starting a JTA transaction", e);
- }
-
- // Get a message by polling, or receive null
- Message message = receiveMessage();
-
- if (log.isTraceEnabled()) {
- if (message != null) {
- try {
- log.trace("<<<<<<< READ message with Message ID : " +
- message.getJMSMessageID() + " from : " + destination +
- " by Thread ID : " + Thread.currentThread().getId());
- } catch (JMSException ignore) {}
- } else {
- log.trace("No message received by Thread ID : " +
- Thread.currentThread().getId() + " for destination : " + destination);
- }
- }
-
- if (message != null) {
- idle = false;
- idleExecutionCount = 0;
- messageCount++;
- // I will be busy now while processing this message, so start another if needed
- scheduleNewTaskIfAppropriate();
- handleMessage(message, ut);
-
- } else {
- idle = true;
- idleExecutionCount++;
- }
- }
-
- } finally {
- workerState = STATE_STOPPED;
- activeTaskCount--;
- synchronized(pollingTasks) {
- pollingTasks.remove(this);
- }
- }
-
- if (log.isTraceEnabled()) {
- log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() +
- " is stopping after processing : " + messageCount + " messages :: " +
- " isActive : " + isActive() + " maxMessagesPerTask : " +
- getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() +
- " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " +
- getIdleTaskExecutionLimit());
- } else if (log.isDebugEnabled()) {
- log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() +
- " is stopping after processing : " + messageCount + " messages");
- }
-
- closeConsumer(true);
- closeSession(true);
- closeConnection();
-
- // My time is up, so if I am going away, create another
- scheduleNewTaskIfAppropriate();
- }
-
- /**
- * Poll for and return a message if available
- *
- * @return a message read, or null
- */
- private Message receiveMessage() {
-
- // get a new connection, session and consumer to prevent a conflict.
- // If idle, it means we can re-use what we already have
- if (consumer == null) {
- connection = getConnection();
- session = getSession();
- consumer = getMessageConsumer();
- if (log.isDebugEnabled()) {
- log.debug("Preparing a Connection, Session and Consumer to read messages");
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Waiting for a message for service : " + serviceName + " - duration : "
- + (getReceiveTimeout() < 0 ? "unlimited" : (getReceiveTimeout() + "ms")));
- }
-
- try {
- if (getReceiveTimeout() < 0) {
- return consumer.receive();
- } else {
- return consumer.receive(getReceiveTimeout());
- }
- } catch (IllegalStateException ignore) {
- // probably the consumer (shared) was closed.. which is still ok.. as we didn't read
- } catch (JMSException e) {
- logError("Error receiving message for service : " + serviceName, e);
- }
- return null;
- }
-
- /**
- * Invoke ultimate message handler/listener and ack message and/or
- * commit/rollback transactions
- * @param message the JMS message received
- * @param ut the UserTransaction used to receive this message, or null
- */
- private void handleMessage(Message message, UserTransaction ut) {
-
- String messageId = null;
- try {
- messageId = message.getJMSMessageID();
- } catch (JMSException ignore) {}
-
- boolean commitOrAck = true;
- try {
- commitOrAck = jmsMessageReceiver.onMessage(message, ut);
-
- } finally {
-
- // if client acknowledgement is selected, and processing requested ACK
- if (commitOrAck && getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) {
- try {
- message.acknowledge();
- if (log.isDebugEnabled()) {
- log.debug("Message : " + messageId + " acknowledged");
- }
- } catch (JMSException e) {
- logError("Error acknowledging message : " + messageId, e);
- }
- }
-
- // close the consumer
- closeConsumer(false);
-
- // if session was transacted, commit it or rollback
- try {
- if (session.getTransacted()) {
- if (commitOrAck) {
- session.commit();
- if (log.isDebugEnabled()) {
- log.debug("Session for message : " + messageId + " committed");
- }
- } else {
- session.rollback();
- if (log.isDebugEnabled()) {
- log.debug("Session for message : " + messageId + " rolled back");
- }
- }
- }
- } catch (JMSException e) {
- logError("Error " + (commitOrAck ? "committing" : "rolling back") +
- " local session txn for message : " + messageId, e);
- }
-
- // if a JTA transaction was being used, commit it or rollback
- try {
- if (ut != null) {
- if (commitOrAck) {
- ut.commit();
- if (log.isDebugEnabled()) {
- log.debug("JTA txn for message : " + messageId + " committed");
- }
- } else {
- ut.rollback();
- if (log.isDebugEnabled()) {
- log.debug("JTA txn for message : " + messageId + " rolled back");
- }
- }
- }
- } catch (Exception e) {
- logError("Error " + (commitOrAck ? "committing" : "rolling back") +
- " JTA txn for message : " + messageId + " from the session", e);
- }
-
- closeSession(false);
- closeConnection();
- }
- }
-
- /** Handle JMS Connection exceptions by re-initializing. A single connection failure could
- * cause re-initialization of multiple MessageListenerTasks / Connections
- */
- public void onException(JMSException j) {
-
- if (!isSTMActive()) {
- requestShutdown();
- return;
- }
-
- log.warn("JMS Connection failure : " + j.getMessage());
- setConnected(false);
-
- if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
- // failed Connection was not shared, thus no need to restart the whole STM
- requestShutdown();
- return;
- }
-
- // if we failed while active, update state to show failure
- setServiceTaskManagerState(STATE_FAILURE);
- log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks");
-
- int r = 1;
- long retryDuration = initialReconnectDuration;
-
- do {
- try {
- log.info("Reconnection attempt : " + r + " for service : " + serviceName);
- start();
- } catch (Exception ignore) {}
-
- boolean connected = false;
- for (int i=0; i<5; i++) {
- if (getConnectedTaskCount() == concurrentConsumers) {
- connected = true;
- break;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {}
- }
-
- if (!connected) {
- log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName +
- " failed. Next retry in " + (retryDuration/1000) + "seconds");
- retryDuration = (long) (retryDuration * reconnectionProgressionFactor);
- if (retryDuration > maxReconnectDuration) {
- retryDuration = maxReconnectDuration;
- }
-
- try {
- Thread.sleep(retryDuration);
- } catch (InterruptedException ignore) {}
- }
-
- } while (!isSTMActive() || getConnectedTaskCount() < concurrentConsumers);
- }
-
- protected void requestShutdown() {
- workerState = STATE_SHUTTING_DOWN;
- }
-
- private boolean isActive() {
- return workerState == STATE_STARTED;
- }
-
- protected boolean isTaskIdle() {
- return idle;
- }
-
- public boolean isConnected() {
- return connected;
- }
-
- public void setConnected(boolean connected) {
- this.connected = connected;
- }
-
- /**
- * Get a Connection that could/should be used by this task - depends on the cache level to reuse
- * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection
- */
- private Connection getConnection() {
- if (cacheLevel < JMSConstants.CACHE_CONNECTION) {
- // Connection is not shared
- if (connection == null) {
- connection = createConnection();
- }
- } else {
- if (sharedConnection != null) {
- connection = sharedConnection;
- } else {
- synchronized(this) {
- if (sharedConnection == null) {
- sharedConnection = createConnection();
- }
- connection = sharedConnection;
- }
- }
- }
- setConnected(true);
- return connection;
- }
-
- /**
- * Get a Session that could/should be used by this task - depends on the cache level to reuse
- * @param connection the connection (could be the shared connection) to use to create a Session
- * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session
- * created using the Connection passed, or a new/shared connection
- */
- private Session getSession() {
- if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) {
- session = createSession();
- }
- return session;
- }
-
- /**
- * Get a MessageConsumer that chould/should be used by this task - depends on the cache
- * level to reuse
- * @param connection option Connection to be used
- * @param session optional Session to be used
- * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new
- * MessageConsumer possibly using the Connection and Session passed in
- */
- private MessageConsumer getMessageConsumer() {
- if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) {
- consumer = createConsumer();
- }
- return consumer;
- }
-
- /**
- * Close the given Connection, hiding exceptions if any which are logged
- * @param connection the Connection to be closed
- */
- private void closeConnection() {
- if (connection != null &&
- cacheLevel < JMSConstants.CACHE_CONNECTION) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Closing non-shared JMS connection for service : " + serviceName);
- }
- connection.close();
- } catch (JMSException e) {
- logError("Error closing JMS connection", e);
- } finally {
- connection = null;
- }
- }
- }
-
- /**
- * Close the given Session, hiding exceptions if any which are logged
- * @param session the Session to be closed
- */
- private void closeSession(boolean forced) {
- if (session != null &&
- (cacheLevel < JMSConstants.CACHE_SESSION || forced)) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Closing non-shared JMS session for service : " + serviceName);
- }
- session.close();
- } catch (JMSException e) {
- logError("Error closing JMS session", e);
- } finally {
- session = null;
- }
- }
- }
-
- /**
- * Close the given Consumer, hiding exceptions if any which are logged
- * @param consumer the Consumer to be closed
- */
- private void closeConsumer(boolean forced) {
- if (consumer != null &&
- (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Closing non-shared JMS consumer for service : " + serviceName);
- }
- consumer.close();
- } catch (JMSException e) {
- logError("Error closing JMS consumer", e);
- } finally {
- consumer = null;
- }
- }
- }
-
- /**
- * Create a new Connection for this STM, using JNDI properties and credentials provided
- * @return a new Connection for this STM, using JNDI properties and credentials provided
- */
- private Connection createConnection() {
-
- try {
- conFactory = JMSUtils.lookup(
- getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName());
- log.debug("Connected to the JMS connection factory : " + getConnFactoryJNDIName());
- } catch (NamingException e) {
- handleException("Error looking up connection factory : " + getConnFactoryJNDIName() +
- " using JNDI properties : " + jmsProperties, e);
- }
-
- Connection connection = null;
- try {
- connection = JMSUtils.createConnection(
- conFactory,
- jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME),
- jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD),
- isJmsSpec11(), isQueue());
-
- connection.setExceptionListener(this);
- connection.start();
- log.debug("JMS Connection for service : " + serviceName + " created and started");
-
- } catch (JMSException e) {
- handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() +
- " using JNDI properties : " + jmsProperties, e);
- }
- return connection;
- }
-
- /**
- * Create a new Session for this STM
- * @param connection the Connection to be used
- * @return a new Session created using the Connection passed in
- */
- private Session createSession() {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Creating a new JMS Session for service : " + serviceName);
- }
- return JMSUtils.createSession(
- connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue());
-
- } catch (JMSException e) {
- handleException("Error creating JMS session for service : " + serviceName, e);
- }
- return null;
- }
-
- /**
- * Create a new MessageConsumer for this STM
- * @param session the Session to be used
- * @return a new MessageConsumer created using the Session passed in
- */
- private MessageConsumer createConsumer() {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Creating a new JMS MessageConsumer for service : " + serviceName);
- }
-
- return JMSUtils.createConsumer(
- session, getDestination(session), isQueue(),
- (isSubscriptionDurable() && getDurableSubscriberName() == null ?
- getDurableSubscriberName() : serviceName),
- getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11());
-
- } catch (JMSException e) {
- handleException("Error creating JMS consumer for service : " + serviceName,e);
- }
- return null;
- }
- }
-
- // -------------- mundane private methods ----------------
- /**
- * Get the InitialContext for lookup using the JNDI parameters applicable to the service
- * @return the InitialContext to be used
- * @throws NamingException
- */
- private Context getInitialContext() throws NamingException {
- if (context == null) {
- context = new InitialContext(jmsProperties);
- }
- return context;
- }
-
- /**
- * Return the JMS Destination for the JNDI name of the Destination from the InitialContext
- * @return the JMS Destination to which this STM listens for messages
- */
- private Destination getDestination(Session session) {
- if (destination == null) {
- try {
- context = getInitialContext();
- destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName());
- if (log.isDebugEnabled()) {
- log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() +
- " found for service " + serviceName);
- }
- } catch (NamingException e) {
- try {
- switch (destinationType) {
- case JMSConstants.QUEUE: {
- destination = session.createQueue(getDestinationJNDIName());
- break;
- }
- case JMSConstants.TOPIC: {
- destination = session.createTopic(getDestinationJNDIName());
- break;
- }
- default: {
- handleException("Error looking up JMS destination : " +
- getDestinationJNDIName() + " using JNDI properties : " +
- jmsProperties, e);
- }
- }
- } catch (JMSException j) {
- handleException("Error looking up and creating JMS destination : " +
- getDestinationJNDIName() + " using JNDI properties : " + jmsProperties, e);
- }
- }
- }
- return destination;
- }
-
- /**
- * The UserTransaction to be used, looked up from the JNDI
- * @return The UserTransaction to be used, looked up from the JNDI
- */
- private UserTransaction getUserTransaction() {
- if (!cacheUserTransaction) {
- if (log.isDebugEnabled()) {
- log.debug("Acquiring a new UserTransaction for service : " + serviceName);
- }
-
- try {
- context = getInitialContext();
- return
- JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName());
- } catch (NamingException e) {
- handleException("Error looking up UserTransaction : " + getDestinationJNDIName() +
- " using JNDI properties : " + jmsProperties, e);
- }
- }
-
- if (sharedUserTransaction == null) {
- try {
- context = getInitialContext();
- sharedUserTransaction =
- JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName());
- if (log.isDebugEnabled()) {
- log.debug("Acquired shared UserTransaction for service : " + serviceName);
- }
- } catch (NamingException e) {
- handleException("Error looking up UserTransaction : " + getDestinationJNDIName() +
- " using JNDI properties : " + jmsProperties, e);
- }
- }
- return sharedUserTransaction;
- }
-
- // -------------------- trivial methods ---------------------
- private boolean isSTMActive() {
- return serviceTaskManagerState == STATE_STARTED;
- }
-
- /**
- * Is this STM bound to a Queue, Topic or a JMS 1.1 Generic Destination?
- * @return TRUE for a Queue, FALSE for a Topic and NULL for a Generic Destination
- */
- private Boolean isQueue() {
- if (destinationType == JMSConstants.GENERIC) {
- return null;
- } else {
- return destinationType == JMSConstants.QUEUE;
- }
- }
-
- private void logError(String msg, Exception e) {
- log.error(msg, e);
- }
-
- private void handleException(String msg, Exception e) {
- log.error(msg, e);
- throw new AxisJMSException(msg, e);
- }
-
- private void handleException(String msg) {
- log.error(msg);
- throw new AxisJMSException(msg);
- }
-
- // -------------- getters and setters ------------------
- public String getServiceName() {
- return serviceName;
- }
-
- public void setServiceName(String serviceName) {
- this.serviceName = serviceName;
- }
-
- public String getConnFactoryJNDIName() {
- return connFactoryJNDIName;
- }
-
- public void setConnFactoryJNDIName(String connFactoryJNDIName) {
- this.connFactoryJNDIName = connFactoryJNDIName;
- }
-
- public String getDestinationJNDIName() {
- return destinationJNDIName;
- }
-
- public void setDestinationJNDIName(String destinationJNDIName) {
- this.destinationJNDIName = destinationJNDIName;
- }
-
- public int getDestinationType() {
- return destinationType;
- }
-
- public void setDestinationType(int destinationType) {
- this.destinationType = destinationType;
- }
-
- public String getMessageSelector() {
- return messageSelector;
- }
-
- public void setMessageSelector(String messageSelector) {
- this.messageSelector = messageSelector;
- }
-
- public int getTransactionality() {
- return transactionality;
- }
-
- public void setTransactionality(int transactionality) {
- this.transactionality = transactionality;
- sessionTransacted = (transactionality == BaseConstants.TRANSACTION_LOCAL);
- }
-
- public boolean isSessionTransacted() {
- return sessionTransacted;
- }
-
- public void setSessionTransacted(Boolean sessionTransacted) {
- if (sessionTransacted != null) {
- this.sessionTransacted = sessionTransacted;
- // sesstionTransacted means local transactions are used, however !sessionTransacted does
- // not mean that JTA is used
- if (sessionTransacted) {
- transactionality = BaseConstants.TRANSACTION_LOCAL;
- }
- }
- }
-
- public int getSessionAckMode() {
- return sessionAckMode;
- }
-
- public void setSessionAckMode(int sessionAckMode) {
- this.sessionAckMode = sessionAckMode;
- }
-
- public boolean isSubscriptionDurable() {
- return subscriptionDurable;
- }
-
- public void setSubscriptionDurable(Boolean subscriptionDurable) {
- if (subscriptionDurable != null) {
- this.subscriptionDurable = subscriptionDurable;
- }
- }
-
- public String getDurableSubscriberName() {
- return durableSubscriberName;
- }
-
- public void setDurableSubscriberName(String durableSubscriberName) {
- this.durableSubscriberName = durableSubscriberName;
- }
-
- public boolean isPubSubNoLocal() {
- return pubSubNoLocal;
- }
-
- public void setPubSubNoLocal(Boolean pubSubNoLocal) {
- if (pubSubNoLocal != null) {
- this.pubSubNoLocal = pubSubNoLocal;
- }
- }
-
- public int getConcurrentConsumers() {
- return concurrentConsumers;
- }
-
- public void setConcurrentConsumers(int concurrentConsumers) {
- this.concurrentConsumers = concurrentConsumers;
- }
-
- public int getMaxConcurrentConsumers() {
- return maxConcurrentConsumers;
- }
-
- public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
- this.maxConcurrentConsumers = maxConcurrentConsumers;
- }
-
- public int getIdleTaskExecutionLimit() {
- return idleTaskExecutionLimit;
- }
-
- public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
- this.idleTaskExecutionLimit = idleTaskExecutionLimit;
- }
-
- public int getReceiveTimeout() {
- return receiveTimeout;
- }
-
- public void setReceiveTimeout(int receiveTimeout) {
- this.receiveTimeout = receiveTimeout;
- }
-
- public int getCacheLevel() {
- return cacheLevel;
- }
-
- public void setCacheLevel(int cacheLevel) {
- this.cacheLevel = cacheLevel;
- }
-
- public int getInitialReconnectDuration() {
- return initialReconnectDuration;
- }
-
- public void setInitialReconnectDuration(int initialReconnectDuration) {
- this.initialReconnectDuration = initialReconnectDuration;
- }
-
- public double getReconnectionProgressionFactor() {
- return reconnectionProgressionFactor;
- }
-
- public void setReconnectionProgressionFactor(double reconnectionProgressionFactor) {
- this.reconnectionProgressionFactor = reconnectionProgressionFactor;
- }
-
- public long getMaxReconnectDuration() {
- return maxReconnectDuration;
- }
-
- public void setMaxReconnectDuration(long maxReconnectDuration) {
- this.maxReconnectDuration = maxReconnectDuration;
- }
-
- public int getMaxMessagesPerTask() {
- return maxMessagesPerTask;
- }
-
- public void setMaxMessagesPerTask(int maxMessagesPerTask) {
- this.maxMessagesPerTask = maxMessagesPerTask;
- }
-
- public String getUserTransactionJNDIName() {
- return userTransactionJNDIName;
- }
-
- public void setUserTransactionJNDIName(String userTransactionJNDIName) {
- if (userTransactionJNDIName != null) {
- this.userTransactionJNDIName = userTransactionJNDIName;
- }
- }
-
- public boolean isCacheUserTransaction() {
- return cacheUserTransaction;
- }
-
- public void setCacheUserTransaction(Boolean cacheUserTransaction) {
- if (cacheUserTransaction != null) {
- this.cacheUserTransaction = cacheUserTransaction;
- }
- }
-
- public boolean isJmsSpec11() {
- return jmsSpec11;
- }
-
- public void setJmsSpec11(boolean jmsSpec11) {
- this.jmsSpec11 = jmsSpec11;
- }
-
- public Hashtable<String, String> getJmsProperties() {
- return jmsProperties;
- }
-
- public void addJmsProperties(Map<String, String> jmsProperties) {
- this.jmsProperties.putAll(jmsProperties);
- }
-
- public void removeJmsProperties(String key) {
- this.jmsProperties.remove(key);
- }
-
- public Context getContext() {
- return context;
- }
-
- public ConnectionFactory getConnectionFactory() {
- return conFactory;
- }
-
- public List<MessageListenerTask> getPollingTasks() {
- return pollingTasks;
- }
-
- public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) {
- this.jmsMessageReceiver = jmsMessageReceiver;
- }
-
- public void setWorkerPool(WorkerPool workerPool) {
- this.workerPool = workerPool;
- }
-
- public int getActiveTaskCount() {
- return activeTaskCount;
- }
-
- public void setServiceTaskManagerState(int serviceTaskManagerState) {
- this.serviceTaskManagerState = serviceTaskManagerState;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java
deleted file mode 100644
index 2b415df64d..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
-
-/**
- * Class encapsulating the content type information for a given message.
- */
-public class ContentTypeInfo {
- private final String propertyName;
- private final String contentType;
-
- public ContentTypeInfo(String propertyName, String contentType) {
- this.propertyName = propertyName;
- this.contentType = contentType;
- }
-
- /**
- * Get the name of the message property from which the content type
- * has been extracted.
- *
- * @return the property name or null if the content type was not determined
- * by extracting it from a message property
- */
- public String getPropertyName() {
- return propertyName;
- }
-
- /**
- * Get the content type of the message.
- *
- * @return The content type of the message. The return value is never null.
- */
- public String getContentType() {
- return contentType;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java
deleted file mode 100644
index 0dba93356f..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-/**
- * Interface implemented by content type rules.
- */
-public interface ContentTypeRule {
- /**
- * Attempt to determine the content type of the given JMS message.
- *
- * @param message the message
- * @return If the rule matches, the return value encapsulates the content type of the
- * message and the message property name from which is was extracted
- * (if applicable). If the rule doesn't match, the method returns null.
- * @throws JMSException
- */
- ContentTypeInfo getContentType(Message message) throws JMSException;
-
- /**
- * Get the name of the message property used to extract the content type from,
- * if applicable.
- *
- * @return the property name or null if not applicable
- */
- String getExpectedContentTypeProperty();
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java
deleted file mode 100644
index a9fd25ef1b..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
-
-import java.util.Iterator;
-
-import javax.jms.BytesMessage;
-import javax.jms.TextMessage;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.description.Parameter;
-
-/**
- * Utility class to create content type rules and rule sets from XML.
- */
-public class ContentTypeRuleFactory {
- private ContentTypeRuleFactory() {}
-
- public static ContentTypeRule parse(OMElement element) throws AxisFault {
- String name = element.getLocalName();
- String value = element.getText();
- if (name.equals("jmsProperty")) {
- return new PropertyRule(value);
- } else if (name.equals("textMessage")) {
- return new MessageTypeRule(TextMessage.class, value);
- } else if (name.equals("bytesMessage")) {
- return new MessageTypeRule(BytesMessage.class, value);
- } else if (name.equals("default")) {
- return new DefaultRule(value);
- } else {
- throw new AxisFault("Unknown content rule type '" + name + "'");
- }
- }
-
- public static ContentTypeRuleSet parse(Parameter param) throws AxisFault {
- ContentTypeRuleSet ruleSet = new ContentTypeRuleSet();
- Object value = param.getValue();
- if (value instanceof OMElement) {
- OMElement element = (OMElement)value;
-
- // DescriptionBuilder#processParameters actually sets the parameter element
- // itself as the value. We need to support this case.
- // TODO: seems like a bug in Axis2 and is inconsistent with Synapse's way of parsing parameter in proxy definitions
- if (element == param.getParameterElement()) {
- element = element.getFirstElement();
- }
-
- if (element.getLocalName().equals("rules")) {
- for (Iterator it = element.getChildElements(); it.hasNext(); ) {
- ruleSet.addRule(parse((OMElement)it.next()));
- }
- } else {
- throw new AxisFault("Expected <rules> element");
- }
- } else {
- ruleSet.addRule(new DefaultRule((String)value));
- }
- return ruleSet;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java
deleted file mode 100644
index 90383a42f8..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-/**
- * A set of content type rules.
- */
-public class ContentTypeRuleSet {
- private final List<ContentTypeRule> rules = new ArrayList<ContentTypeRule>();
- private String defaultContentTypeProperty;
-
- /**
- * Add a content type rule to this set.
- *
- * @param rule the rule to add
- */
- public void addRule(ContentTypeRule rule) {
- rules.add(rule);
- if (defaultContentTypeProperty == null) {
- defaultContentTypeProperty = rule.getExpectedContentTypeProperty();
- }
- }
-
- /**
- * Determine the content type of the given message.
- * This method will try the registered rules in turn until the first rule matches.
- *
- * @param message the message
- * @return the content type information for the message or null if none of the rules matches
- * @throws JMSException
- */
- public ContentTypeInfo getContentTypeInfo(Message message) throws JMSException {
- for (ContentTypeRule rule : rules) {
- ContentTypeInfo contentTypeInfo = rule.getContentType(message);
- if (contentTypeInfo != null) {
- return contentTypeInfo;
- }
- }
- return null;
- }
-
- public String getDefaultContentTypeProperty() {
- return defaultContentTypeProperty;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java
deleted file mode 100644
index a158f6ec74..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
-
-import javax.jms.Message;
-
-/**
- * Content type rule that always matches and that returns a fixed (default) content type.
- */
-public class DefaultRule implements ContentTypeRule {
- private final String contentType;
-
- public DefaultRule(String contentType) {
- this.contentType = contentType;
- }
-
- public ContentTypeInfo getContentType(Message message) {
- return new ContentTypeInfo(null, contentType);
- }
-
- public String getExpectedContentTypeProperty() {
- return null;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java
deleted file mode 100644
index cb25ab93d4..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
-
-import javax.jms.Message;
-
-/**
- * Content type rule that matches a given message type and returns a fixed content type.
- */
-public class MessageTypeRule implements ContentTypeRule {
- private final Class<? extends Message> messageType;
- private final String contentType;
-
- public MessageTypeRule(Class<? extends Message> messageType, String contentType) {
- this.messageType = messageType;
- this.contentType = contentType;
- }
-
- public ContentTypeInfo getContentType(Message message) {
- return messageType.isInstance(message) ? new ContentTypeInfo(null, contentType) : null;
- }
-
- public String getExpectedContentTypeProperty() {
- return null;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java
deleted file mode 100644
index c8d13ba462..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-/**
- * Content type rule that attempts to extract the content type from a message property.
- */
-public class PropertyRule implements ContentTypeRule {
- private final String propertyName;
-
- public PropertyRule(String propertyName) {
- this.propertyName = propertyName;
- }
-
- public ContentTypeInfo getContentType(Message message) throws JMSException {
- String value = message.getStringProperty(propertyName);
- return value == null ? null : new ContentTypeInfo(propertyName, value);
- }
-
- public String getExpectedContentTypeProperty() {
- return propertyName;
- }
-}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java
deleted file mode 100644
index 750170edd7..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-/**
- * Provides classes and interfaces to define content type rules.
- *
- * Content type rules are used to determine the content type of a
- * received message based on JMS properties, message type, etc.
- */
-package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; \ No newline at end of file
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html
deleted file mode 100644
index b95b49eb69..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html
+++ /dev/null
@@ -1,356 +0,0 @@
-<html>
-<title>JMS Transport Configuration</title>
-<body>
-
-<h2>JMS Listener Configuration (axis2.xml)</h2>
-
-e.g:
-
-<pre>
- &lt;transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"&gt;
- &lt;parameter name="myTopicConnectionFactory"&gt;
- &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
- &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
- &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;TopicConnectionFactory&lt;/parameter&gt;
- &lt;parameter name="transport.jms.ConnectionFactoryType"&gt;topic&lt;/parameter&gt;
- &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.0.2b&lt;/parameter&gt;
- &lt;/parameter&gt;
-
- &lt;parameter name="myQueueConnectionFactory"&gt;
- &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
- &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
- &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;QueueConnectionFactory&lt;/parameter&gt;
- &lt;parameter name="transport.jms.ConnectionFactoryType"&gt;queue&lt;/parameter&gt;
- &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.1&lt;/parameter&gt;
- &lt;/parameter&gt;
-
- &lt;parameter name="default"&gt;
- &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
- &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
- &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;ConnectionFactory&lt;/parameter&gt;
- &lt;/parameter&gt;
- &lt;/transportReceiver&gt;
-
- &lt;transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"&gt;
- &lt;parameter name="myTopicConnectionFactory"&gt;
- &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
- &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
- &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;TopicConnectionFactory&lt;/parameter&gt;
- &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.0.2b&lt;/parameter&gt;
- &lt;parameter name="transport.jms.CacheLevel"&gt;producer&lt;/parameter&gt;
- &lt;/parameter&gt;
-
- &lt;parameter name="myQueueConnectionFactory"&gt;
- &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
- &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
- &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;QueueConnectionFactory&lt;/parameter&gt;
- &lt;parameter name="transport.jms.JMSSpecVersion"&gt;1.0.2b&lt;/parameter&gt;
- &lt;parameter name="transport.jms.CacheLevel"&gt;producer&lt;/parameter&gt;
- &lt;/parameter&gt;
-
- &lt;parameter name="default"&gt;
- &lt;parameter name="java.naming.factory.initial"&gt;org.apache.activemq.jndi.ActiveMQInitialContextFactory&lt;/parameter&gt;
- &lt;parameter name="java.naming.provider.url"&gt;tcp://localhost:61616&lt;/parameter&gt;
- &lt;parameter name="transport.jms.ConnectionFactoryJNDIName"&gt;ConnectionFactory&lt;/parameter&gt;
- &lt;parameter name="transport.jms.CacheLevel"&gt;connection&lt;/parameter&gt;
- &lt;/parameter&gt;
- &lt;/transportSender&gt;
-</pre>
-
-<p>
- The Transport Listener and Sender both allows the user to configure one or more logical JMS Connection
- Factories, which are named definitions as shown above. Any remaining parameters maybe defined at the
- service level via the services.xml. The applicable set of parameters for a service would be the
- union of the properties from the services.xml and the corresponding logical connection factory.
-</p>
-
-<TABLE WIDTH="100%" BORDER=1 BORDERCOLOR="#000000" CELLPADDING=4 CELLSPACING=0>
- <COL WIDTH="10%">
- <COL WIDTH="20%">
- <COL WIDTH="60%">
- <COL WIDTH="5%">
- <COL WIDTH="5%">
- <tr>
- <td>Transport level</td>
- <td><BR></td>
- <td><BR></td>
- <td>Listening</td>
- <td>Sending</td>
- </tr>
- <tr>
- <td>JNDI</td>
- <td>java.naming.factory.initial</td>
- <td>The JNDI InitialContext factory class</td>
- <td>Required</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>java.naming.provider.url</td>
- <td>JNDI Provider URL</td>
- <td>Required</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>java.naming.security.principal</td>
- <td>Username for JNDI access</td>
- <td><BR></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>java.naming.security.credentials</td>
- <td>Password for JNDI access</td>
- <td><BR></td>
- <td><BR></td>
- </tr>
- <tr>
- <td>Transactions</td>
- <td>transport.Transactionality</td>
- <td>Desired transactionality. One of none / local / jta</td>
- <td>Defaults to <B>none</B></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.UserTxnJNDIName</td>
- <td>JNDI name to be used to obtain a UserTransaction</td>
- <td>Defaults to &quot;java:comp/UserTransaction&quot;</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.CacheUserTxn</td>
- <td>Generally its safe and more efficient to cache the
- UserTransaction reference from JNDI. One of true/ false</td>
- <td>Defaults to <B>true</B></td>
- <td><BR></td>
- </tr>
-
- <tr>
- <td><BR></td>
- <td>transport.jms.SessionTransacted</td>
- <td>Should the JMS Session be transacted. One of true/ false</td>
- <td>Defaults to <B>true</B> when local transactions are used</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.SessionAcknowledgement</td>
- <td>JMS Session acknowledgement mode to be used. One of AUTO_ACKNOWLEDGE | CLIENT_ACKNOWLEDGE | DUPS_OK_ACKNOWLEDGE | SESSION_TRANSACTED</td>
- <td>Defaults to <B>AUTO_ACKNOWLEDGE</B></td>
- <td><BR></td>
- </tr>
-
- <tr>
- <td>Connection</td>
- <td>transport.jms.ConnectionFactory</td>
- <td>Name of the logical connection factory this service will use</td>
- <td>Defaults to &quot;default&quot;</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.ConnectionFactoryJNDIName</td>
- <td>The JNDI name of the JMS ConnectionFactory</td>
- <td>Required</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.ConnectionFactoryType</td>
- <td> Type of ConnectionFactory &ndash; queue / topic</td>
- <td>Suggested to be specified</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.JMSSpecVersion</td>
- <td>JMS API Version One of &quot;1.1&quot; or &quot;1.0.2b&quot;</td>
- <td>Defaults to 1.1</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.UserName</td>
- <td>The JMS connection username</td>
- <td><BR></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.Password</td>
- <td>The JMS connection password</td>
- <td><BR></td>
- <td><BR></td>
- </tr>
- <tr>
- <td>Destinations</td>
- <td>transport.jms.Destination</td>
- <td>JNDI Name of the Destination </td>
- <td>Defaults to Service name</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.DestinationType</td>
- <td>Type of Destination &ndash; queue / topic</td>
- <td>Defaults to a queue</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.DefaultReplyDestination</td>
- <td>JNDI Name of the default reply Destination</td>
- <td><BR></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.DefaultReplyDestinationType</td>
- <td>Type of the reply Destination &ndash; queue / topic</td>
- <td>Same type as of Destination</td>
- <td><BR></td>
- </tr>
- <tr>
- <td>Advanced</td>
- <td>transport.jms.MessageSelector</td>
- <td>Optional message selector to be applied</td>
- <td><BR></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.SubscriptionDurable</td>
- <td>Is the subscription durable? (For Topics) &ndash; true / false</td>
- <td>Defaults to <B>false</B></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.DurableSubscriberName</td>
- <td>Name to be used for the durable subscription</td>
- <td>Required when subscription is durable</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.PubSubNoLocal</td>
- <td>Should messages published by the same connection (for Topics)
- be received? &ndash; true / false</td>
- <td>Defaults to <B>false</B></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.CacheLevel</td>
- <td>The JMS resource cache level. One of none / connection /
- session / consumer / producer / auto</td>
- <td>Defaults to <B>auto</B> </td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.ReceiveTimeout</td>
- <td>Time to wait for a JMS message during polling. Negative means
- wait forever, while 0 means do not wait at all. Anything else, is
- a millisecond value for the poll</td>
- <td>Defaults to 1000ms</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.ConcurrentConsumers</td>
- <td>Number of concurrent consumer tasks (~threads) to be started to
- poll for messages for this service. For Topics, this should be
- always 1, to prevent the same message being processed multiple
- times</td>
- <td>Defaults to <B>1</B></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.MaxConcurrentConsumers</td>
- <td>Will dynamically scale the number of concurrent consumer tasks
- (~threads) until this value; as the load increases. Should always
- be 1 for Topics.</td>
- <td>Defaults to <B>1</B></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.IdleTaskLimit</td>
- <td>The number of idle (i.e. poll without receipt of a message)
- runs per task, before it dies &ndash; to recycle resources, and to
- allow dynamic scale down.</td>
- <td>Defaults to 10</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.MaxMessagesPerTask</td>
- <td>The maximum number of successful message receipts to limit per
- Task lifetime. </td>
- <td>Defaults to <B>&ndash;1</B> which implies unlimited messages</td>
- <td><BR></td>
- </tr>
- <tr>
- <td>Reconnection</td>
- <td>transport.jms.InitialReconnectDuration</td>
- <td>Initial reconnection attempt duration</td>
- <td>Defaults to 10,000ms</td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.ReconnectProgressFactor</td>
- <td>Factor used to compute consecutive reconnection attempt
- durations, in a geometric series</td>
- <td>Defaults to <B>2 (i.e. exponential)</B></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.MaxReconnectDuration</td>
- <td>Maximum limit for a reconnection duration</td>
- <td>Defaults to <B>1 hour</B></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>transport.jms.PublishEPR</td>
- <td>One or more JMS URL's to be showed as the JMS EPRs on the WSDL
- for the service. Allows the specification of a custom EPR, and/or
- hiding of internal properties from a public EPR (e.g.
- credentials). Add one as LEGACY to retain auto generated EPR, when
- adding new EPRs</td>
- <td><BR></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td><BR></td>
- <td><BR></td>
- <td><BR></td>
- <td><BR></td>
- </tr>
- <tr>
- <td>Legacy Mode and Payload handling</td>
- <td>Wrapper</td>
- <td>Binary and Text payload wrapper element to be specified as &quot;{ns}name&quot; where ns refers to a namespace and name the name of the element</td>
- <td>Default binary wrapper<ul><li>{http://ws.apache.org/commons/ns/payload}binary</li></ul>
- Default text wrapper <ul><li>{http://ws.apache.org/commons/ns/payload}text</li></ul></td>
- <td><BR></td>
- </tr>
- <tr>
- <td><BR></td>
- <td>Operation</td>
- <td>operation name to be specified as &quot;{ns}name&quot; where ns refers to the namespace and name the name of the operation</td>
- <td>Defaults to urn:mediate</td>
- <td><BR></td>
- </tr>
-</TABLE>
-
-</body>
-</html> \ No newline at end of file