From 9145d1479e838918317bc9d4c5e25fe537e5f6de Mon Sep 17 00:00:00 2001 From: antelder Date: Wed, 13 May 2009 12:39:53 +0000 Subject: Abandon trying to use the new Axis2 JMS transport for now as its proving too messy tryingto backport it to the 1.4.1 release. Now trying a new approach which modifies the JMS transport from Axis2 1.4.1 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@774293 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/binding/ws/axis2/jms/AxisJMSException.java | 31 - .../ws/axis2/jms/BytesMessageDataSource.java | 72 -- .../ws/axis2/jms/BytesMessageInputStream.java | 75 -- .../ws/axis2/jms/BytesMessageOutputStream.java | 56 - .../binding/ws/axis2/jms/JMSConnectionFactory.java | 393 ------- .../ws/axis2/jms/JMSConnectionFactoryManager.java | 122 -- .../sca/binding/ws/axis2/jms/JMSConstants.java | 273 ----- .../sca/binding/ws/axis2/jms/JMSEndpoint.java | 111 -- .../binding/ws/axis2/jms/JMSExceptionWrapper.java | 28 - .../sca/binding/ws/axis2/jms/JMSListener.java | 294 ----- .../binding/ws/axis2/jms/JMSMessageReceiver.java | 237 ---- .../sca/binding/ws/axis2/jms/JMSMessageSender.java | 332 ------ .../binding/ws/axis2/jms/JMSOutTransportInfo.java | 306 ----- .../sca/binding/ws/axis2/jms/JMSSender.java | 499 -------- .../tuscany/sca/binding/ws/axis2/jms/JMSUtils.java | 1115 ------------------ .../binding/ws/axis2/jms/ServiceTaskManager.java | 1217 -------------------- .../ws/axis2/jms/ctype/ContentTypeInfo.java | 49 - .../ws/axis2/jms/ctype/ContentTypeRule.java | 43 - .../ws/axis2/jms/ctype/ContentTypeRuleFactory.java | 74 -- .../ws/axis2/jms/ctype/ContentTypeRuleSet.java | 64 - .../binding/ws/axis2/jms/ctype/DefaultRule.java | 37 - .../ws/axis2/jms/ctype/MessageTypeRule.java | 39 - .../binding/ws/axis2/jms/ctype/PropertyRule.java | 39 - .../binding/ws/axis2/jms/ctype/package-info.java | 23 - .../tuscany/sca/binding/ws/axis2/jms/package.html | 356 ------ 25 files changed, 5885 deletions(-) delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms') diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java deleted file mode 100644 index ec53a2a1ca..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -public class AxisJMSException extends RuntimeException { - - AxisJMSException() { - super(); - } - - AxisJMSException(String msg) { - super(msg); - } - - AxisJMSException(String msg, Exception e) { - super(msg, e); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java deleted file mode 100644 index 5228efa154..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java +++ /dev/null @@ -1,72 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; - -/** - * Data source implementation wrapping a JMS {@link BytesMessage}. - *

- * Note that two input streams created by the same instance of this - * class can not be used at the same time. - */ -public class BytesMessageDataSource implements SizeAwareDataSource { - private final BytesMessage message; - private final String contentType; - - public BytesMessageDataSource(BytesMessage message, String contentType) { - this.message = message; - this.contentType = contentType; - } - - public BytesMessageDataSource(BytesMessage message) { - this(message, "application/octet-stream"); - } - - public long getSize() { - try { - return message.getBodyLength(); - } catch (JMSException ex) { - throw new RuntimeException(ex); - } - } - - public String getContentType() { - return contentType; - } - - public InputStream getInputStream() throws IOException { - try { - message.reset(); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - return new BytesMessageInputStream(message); - } - - public String getName() { - return null; - } - - public OutputStream getOutputStream() throws IOException { - throw new UnsupportedOperationException(); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java deleted file mode 100644 index 9080641572..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java +++ /dev/null @@ -1,75 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.InputStream; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; - -/** - * Input stream that reads data from a JMS {@link BytesMessage}. - * Note that since the current position in the message is managed by - * the underlying {@link BytesMessage} object, it is not possible to - * use several instances of this class operating on a single - * {@link BytesMessage} at the same time. - */ -public class BytesMessageInputStream extends InputStream { - private final BytesMessage message; - - public BytesMessageInputStream(BytesMessage message) { - this.message = message; - } - - @Override - public int read() throws JMSExceptionWrapper { - try { - return message.readByte() & 0xFF; - } catch (MessageEOFException ex) { - return -1; - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } - - @Override - public int read(byte[] b, int off, int len) throws JMSExceptionWrapper { - if (off == 0) { - try { - return message.readBytes(b, len); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } else { - byte[] b2 = new byte[len]; - int c = read(b2); - if (c > 0) { - System.arraycopy(b2, 0, b, off, c); - } - return c; - } - } - - @Override - public int read(byte[] b) throws JMSExceptionWrapper { - try { - return message.readBytes(b); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java deleted file mode 100644 index 4508d68280..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java +++ /dev/null @@ -1,56 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.OutputStream; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; - -public class BytesMessageOutputStream extends OutputStream { - private final BytesMessage message; - - public BytesMessageOutputStream(BytesMessage message) { - this.message = message; - } - - @Override - public void write(int b) throws JMSExceptionWrapper { - try { - message.writeByte((byte)b); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } - - @Override - public void write(byte[] b, int off, int len) throws JMSExceptionWrapper { - try { - message.writeBytes(b, off, len); - } catch (JMSException ex) { - new JMSExceptionWrapper(ex); - } - } - - @Override - public void write(byte[] b) throws JMSExceptionWrapper { - try { - message.writeBytes(b); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java deleted file mode 100644 index d5d164ce76..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java +++ /dev/null @@ -1,393 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.Hashtable; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.ParameterIncludeImpl; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Encapsulate a JMS Connection factory definition within an Axis2.xml - * - * JMS Connection Factory definitions, allows JNDI properties as well as other service - * level parameters to be defined, and re-used by each service that binds to it - * - * When used for sending messages out, the JMSConnectionFactory'ies are able to cache - * a Connection, Session or Producer - */ -public class JMSConnectionFactory { - - private static final Log log = LogFactory.getLog(JMSConnectionFactory.class); - - /** The name used for the connection factory definition within Axis2 */ - private String name = null; - /** The list of parameters from the axis2.xml definition */ - private Hashtable parameters = new Hashtable(); - - /** The cached InitialContext reference */ - private Context context = null; - /** The JMS ConnectionFactory this definition refers to */ - private ConnectionFactory conFactory = null; - /** The shared JMS Connection for this JMS connection factory */ - private Connection sharedConnection = null; - /** The shared JMS Session for this JMS connection factory */ - private Session sharedSession = null; - /** The shared JMS MessageProducer for this JMS connection factory */ - private MessageProducer sharedProducer = null; - /** The Shared Destination */ - private Destination sharedDestination = null; - /** The shared JMS connection for this JMS connection factory */ - private int cacheLevel = JMSConstants.CACHE_CONNECTION; - - /** - * Digest a JMS CF definition from an axis2.xml 'Parameter' and construct - * @param parameter the axis2.xml 'Parameter' that defined the JMS CF - */ - public JMSConnectionFactory(Parameter parameter) { - - this.name = parameter.getName(); - ParameterIncludeImpl pi = new ParameterIncludeImpl(); - - try { - pi.deserializeParameters((OMElement) parameter.getValue()); - } catch (AxisFault axisFault) { - handleException("Error reading parameters for JMS connection factory" + name, axisFault); - } - - for (Object o : pi.getParameters()) { - Parameter p = (Parameter) o; - parameters.put(p.getName(), (String) p.getValue()); - } - - digestCacheLevel(); - try { - context = new InitialContext(parameters); - conFactory = JMSUtils.lookup(context, ConnectionFactory.class, - parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME)); - if (parameters.get(JMSConstants.PARAM_DESTINATION) != null) { - sharedDestination = JMSUtils.lookup(context, Destination.class, - parameters.get(JMSConstants.PARAM_DESTINATION)); - } - log.info("JMS ConnectionFactory : " + name + " initialized"); - - } catch (NamingException e) { - throw new AxisJMSException("Cannot acquire JNDI context, JMS Connection factory : " + - parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME) + " or default destination : " + - parameters.get(JMSConstants.PARAM_DESTINATION) + - " for JMS CF : " + name + " using : " + parameters); - } - } - - /** - * Digest, the cache value iff specified - */ - private void digestCacheLevel() { - - String key = JMSConstants.PARAM_CACHE_LEVEL; - String val = parameters.get(key); - - if ("none".equalsIgnoreCase(val)) { - this.cacheLevel = JMSConstants.CACHE_NONE; - } else if ("connection".equalsIgnoreCase(val)) { - this.cacheLevel = JMSConstants.CACHE_CONNECTION; - } else if ("session".equals(val)){ - this.cacheLevel = JMSConstants.CACHE_SESSION; - } else if ("producer".equals(val)) { - this.cacheLevel = JMSConstants.CACHE_PRODUCER; - } else if (val != null) { - throw new AxisJMSException("Invalid cache level : " + val + " for JMS CF : " + name); - } - } - - /** - * Return the name assigned to this JMS CF definition - * @return name of the JMS CF - */ - public String getName() { - return name; - } - - /** - * The list of properties (including JNDI and non-JNDI) - * @return properties defined on the JMS CF - */ - public Hashtable getParameters() { - return parameters; - } - - /** - * Get cached InitialContext - * @return cache InitialContext - */ - public Context getContext() { - return context; - } - - /** - * Cache level applicable for this JMS CF - * @return applicable cache level - */ - public int getCacheLevel() { - return cacheLevel; - } - - /** - * Get the shared Destination - if defined - * @return - */ - public Destination getSharedDestination() { - return sharedDestination; - } - - /** - * Lookup a Destination using this JMS CF definitions and JNDI name - * @param name JNDI name of the Destionation - * @return JMS Destination for the given JNDI name or null - */ - public Destination getDestination(String name) { - try { - return JMSUtils.lookup(context, Destination.class, name); - } catch (NamingException e) { - handleException("Unknown JMS Destination : " + name + " using : " + parameters, e); - } - return null; - } - - /** - * Get the reply Destination from the PARAM_REPLY_DESTINATION parameter - * @return reply destination defined in the JMS CF - */ - public String getReplyToDestination() { - return parameters.get(JMSConstants.PARAM_REPLY_DESTINATION); - } - - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } - - /** - * Should the JMS 1.1 API be used? - defaults to yes - * @return true, if JMS 1.1 api should be used - */ - public boolean isJmsSpec11() { - return parameters.get(JMSConstants.PARAM_JMS_SPEC_VER) == null || - "1.1".equals(parameters.get(JMSConstants.PARAM_JMS_SPEC_VER)); - } - - /** - * Return the type of the JMS CF Destination - * @return TRUE if a Queue, FALSE for a Topic and NULL for a JMS 1.1 Generic Destination - */ - public Boolean isQueue() { - if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) == null && - parameters.get(JMSConstants.PARAM_DEST_TYPE) == null) { - return null; - } - - if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) != null) { - if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { - return true; - } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { - return false; - } else { - throw new AxisJMSException("Invalid " + JMSConstants.PARAM_CONFAC_TYPE + " : " + - parameters.get(JMSConstants.PARAM_CONFAC_TYPE) + " for JMS CF : " + name); - } - } else { - if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { - return true; - } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { - return false; - } else { - throw new AxisJMSException("Invalid " + JMSConstants.PARAM_DEST_TYPE + " : " + - parameters.get(JMSConstants.PARAM_DEST_TYPE) + " for JMS CF : " + name); - } - } - } - - /** - * Is a session transaction requested from users of this JMS CF? - * @return session transaction required by the clients of this? - */ - private boolean isSessionTransacted() { - return parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED) == null || - Boolean.valueOf(parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED)); - } - - /** - * Create a new Connection - * @return a new Connection - */ - private Connection createConnection() { - - Connection connection = null; - try { - connection = JMSUtils.createConnection( - conFactory, - parameters.get(JMSConstants.PARAM_JMS_USERNAME), - parameters.get(JMSConstants.PARAM_JMS_PASSWORD), - isJmsSpec11(), isQueue()); - - if (log.isDebugEnabled()) { - log.debug("New JMS Connection from JMS CF : " + name + " created"); - } - - } catch (JMSException e) { - handleException("Error acquiring a Connection from the JMS CF : " + name + - " using properties : " + parameters, e); - } - return connection; - } - - /** - * Create a new Session - * @param connection Connection to use - * @return A new Session - */ - private Session createSession(Connection connection) { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS Session from JMS CF : " + name); - } - return JMSUtils.createSession( - connection, isSessionTransacted(), Session.AUTO_ACKNOWLEDGE, isJmsSpec11(), isQueue()); - - } catch (JMSException e) { - handleException("Error creating JMS session from JMS CF : " + name, e); - } - return null; - } - - /** - * Create a new MessageProducer - * @param session Session to be used - * @param destination Destination to be used - * @return a new MessageProducer - */ - private MessageProducer createProducer(Session session, Destination destination) { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS MessageProducer from JMS CF : " + name); - } - - return JMSUtils.createProducer( - session, destination, isQueue(), isJmsSpec11()); - - } catch (JMSException e) { - handleException("Error creating JMS producer from JMS CF : " + name,e); - } - return null; - } - - /** - * Get a new Connection or shared Connection from this JMS CF - * @return new or shared Connection from this JMS CF - */ - public Connection getConnection() { - if (cacheLevel > JMSConstants.CACHE_NONE) { - return getSharedConnection(); - } else { - return createConnection(); - } - } - - /** - * Get a new Session or shared Session from this JMS CF - * @param connection the Connection to be used - * @return new or shared Session from this JMS CF - */ - public Session getSession(Connection connection) { - if (cacheLevel > JMSConstants.CACHE_CONNECTION) { - return getSharedSession(); - } else { - return createSession((connection == null ? getConnection() : connection)); - } - } - - /** - * Get a new MessageProducer or shared MessageProducer from this JMS CF - * @param connection the Connection to be used - * @param session the Session to be used - * @param destination the Destination to bind MessageProducer to - * @return new or shared MessageProducer from this JMS CF - */ - public MessageProducer getMessageProducer( - Connection connection, Session session, Destination destination) { - if (cacheLevel > JMSConstants.CACHE_SESSION) { - return getSharedProducer(); - } else { - return createProducer((session == null ? getSession(connection) : session), destination); - } - } - - /** - * Get a new Connection or shared Connection from this JMS CF - * @return new or shared Connection from this JMS CF - */ - private Connection getSharedConnection() { - if (sharedConnection == null) { - sharedConnection = createConnection(); - if (log.isDebugEnabled()) { - log.debug("Created shared JMS Connection for JMS CF : " + name); - } - } - return sharedConnection; - } - - /** - * Get a shared Session from this JMS CF - * @return shared Session from this JMS CF - */ - private Session getSharedSession() { - if (sharedSession == null) { - sharedSession = createSession(getSharedConnection()); - if (log.isDebugEnabled()) { - log.debug("Created shared JMS Session for JMS CF : " + name); - } - } - return sharedSession; - } - - /** - * Get a shared MessageProducer from this JMS CF - * @return shared MessageProducer from this JMS CF - */ - private MessageProducer getSharedProducer() { - if (sharedProducer == null) { - sharedProducer = createProducer(getSharedSession(), sharedDestination); - if (log.isDebugEnabled()) { - log.debug("Created shared JMS MessageConsumer for JMS CF : " + name); - } - } - return sharedProducer; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java deleted file mode 100644 index fb16500efc..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java +++ /dev/null @@ -1,122 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.HashMap; -import java.util.Map; - -import javax.naming.Context; - -import org.apache.axis2.AxisFault; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.ParameterInclude; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Class managing a set of {@link JMSConnectionFactory} objects. - */ -public class JMSConnectionFactoryManager { - - private static final Log log = LogFactory.getLog(JMSConnectionFactoryManager.class); - - /** A Map containing the JMS connection factories managed by this, keyed by name */ - private final Map connectionFactories = - new HashMap(); - - /** - * Construct a Connection factory manager for the JMS transport sender or receiver - * @param trpInDesc - */ - public JMSConnectionFactoryManager(ParameterInclude trpInDesc) { - loadConnectionFactoryDefinitions(trpInDesc); - } - - /** - * Create JMSConnectionFactory instances for the definitions in the transport configuration, - * and add these into our collection of connectionFactories map keyed by name - * - * @param trpDesc the transport description for JMS - */ - private void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) { - - for (Object o : trpDesc.getParameters()) { - Parameter p = (Parameter)o; - try { - JMSConnectionFactory jmsConFactory = new JMSConnectionFactory(p); - connectionFactories.put(jmsConFactory.getName(), jmsConFactory); - } catch (AxisJMSException e) { - log.error("Error setting up connection factory : " + p.getName(), e); - } - } - } - - /** - * Get the JMS connection factory with the given name. - * - * @param name the name of the JMS connection factory - * @return the JMS connection factory or null if no connection factory with - * the given name exists - */ - public JMSConnectionFactory getJMSConnectionFactory(String name) { - return connectionFactories.get(name); - } - - /** - * Get the JMS connection factory that matches the given properties, i.e. referring to - * the same underlying connection factory. Used by the JMSSender to determine if already - * available resources should be used for outgoing messages - * - * @param props a Map of connection factory JNDI properties and name - * @return the JMS connection factory or null if no connection factory compatible - * with the given properties exists - */ - public JMSConnectionFactory getJMSConnectionFactory(Map props) { - for (JMSConnectionFactory cf : connectionFactories.values()) { - Map cfProperties = cf.getParameters(); - - if (equals(props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME), - cfProperties.get(JMSConstants.PARAM_CONFAC_JNDI_NAME)) - && - equals(props.get(Context.INITIAL_CONTEXT_FACTORY), - cfProperties.get(Context.INITIAL_CONTEXT_FACTORY)) - && - equals(props.get(Context.PROVIDER_URL), - cfProperties.get(Context.PROVIDER_URL)) - && - equals(props.get(Context.SECURITY_PRINCIPAL), - cfProperties.get(Context.SECURITY_PRINCIPAL)) - && - equals(props.get(Context.SECURITY_CREDENTIALS), - cfProperties.get(Context.SECURITY_CREDENTIALS))) { - return cf; - } - } - return null; - } - - /** - * Compare two values preventing NPEs - */ - private static boolean equals(Object s1, Object s2) { - return s1 == s2 || s1 != null && s1.equals(s2); - } - - protected void handleException(String msg, Exception e) throws AxisFault { - log.error(msg, e); - throw new AxisFault(msg, e); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java deleted file mode 100644 index 6a11201625..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java +++ /dev/null @@ -1,273 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import org.apache.axis2.client.Options; - -public class JMSConstants { - - /** - * The prefix indicating an Axis JMS URL - */ - public static final String JMS_PREFIX = "jms:/"; - - //------------------------------------ defaults / constants ------------------------------------ - /** - * The local (Axis2) JMS connection factory name of the default connection - * factory to be used, if a service does not explicitly state the connection - * factory it should be using by a Parameter named JMSConstants.CONFAC_PARAM - */ - public static final String DEFAULT_CONFAC_NAME = "default"; - /** - * The default JMS time out waiting for a reply - also see {@link JMS_WAIT_REPLY} - */ - public static final long DEFAULT_JMS_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS; - /** - * Value indicating a Queue used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE} - */ - public static final String DESTINATION_TYPE_QUEUE = "queue"; - /** - * Value indicating a Topic used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE} - */ - public static final String DESTINATION_TYPE_TOPIC = "topic"; - /** - * Value indicating a JMS 1.1 Generic Destination used by {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE} - */ - public static final String DESTINATION_TYPE_GENERIC = "generic"; - - /** Do not cache any JMS resources between tasks (when sending) or JMS CF's (when sending) */ - public static final int CACHE_NONE = 0; - /** Cache only the JMS connection between tasks (when receiving), or JMS CF's (when sending)*/ - public static final int CACHE_CONNECTION = 1; - /** Cache only the JMS connection and Session between tasks (receiving), or JMS CF's (sending) */ - public static final int CACHE_SESSION = 2; - /** Cache the JMS connection, Session and Consumer between tasks when receiving*/ - public static final int CACHE_CONSUMER = 3; - /** Cache the JMS connection, Session and Producer within a JMSConnectionFactory when sending */ - public static final int CACHE_PRODUCER = 4; - /** automatic choice of an appropriate caching level (depending on the transaction strategy) */ - public static final int CACHE_AUTO = 5; - - /** A JMS 1.1 Generic Destination type or ConnectionFactory */ - public static final int GENERIC = 0; - /** A Queue Destination type or ConnectionFactory */ - public static final int QUEUE = 1; - /** A Topic Destination type or ConnectionFactory */ - public static final int TOPIC = 2; - - /** - * The EPR parameter name indicating the name of the message level property that indicated the content type. - */ - public static final String CONTENT_TYPE_PROPERTY_PARAM = "transport.jms.ContentTypeProperty"; - - //---------------------------------- services.xml parameters ----------------------------------- - /** - * The Service level Parameter name indicating the JMS destination for requests of a service - */ - public static final String PARAM_DESTINATION = "transport.jms.Destination"; - /** - * The Service level Parameter name indicating the destination type for requests. - * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC} - */ - public static final String PARAM_DEST_TYPE = "transport.jms.DestinationType"; - /** - * The Service level Parameter name indicating the [default] response destination of a service - */ - public static final String PARAM_REPLY_DESTINATION = "transport.jms.ReplyDestination"; - /** - * The Service level Parameter name indicating the response destination type - * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC} - */ - public static final String PARAM_REPLY_DEST_TYPE = "transport.jms.ReplyDestinationType"; - /** - * The Parameter name of an Axis2 service, indicating the JMS connection - * factory which should be used to listen for messages for it. This is - * the local (Axis2) name of the connection factory and not the JNDI name - */ - public static final String PARAM_JMS_CONFAC = "transport.jms.ConnectionFactory"; - /** - * Connection factory type if using JMS 1.0, either DESTINATION_TYPE_QUEUE or DESTINATION_TYPE_TOPIC - */ - public static final String PARAM_CONFAC_TYPE = "transport.jms.ConnectionFactoryType"; - /** - * The Parameter name indicating the JMS connection factory JNDI name - */ - public static final String PARAM_CONFAC_JNDI_NAME = "transport.jms.ConnectionFactoryJNDIName"; - /** - * The Parameter indicating the expected content type for messages received by the service. - */ - public static final String CONTENT_TYPE_PARAM = "transport.jms.ContentType"; - /** - * The Parameter indicating a final EPR as a String, to be published on the WSDL of a service - * Could occur more than once, and could provide additional connection properties or a subset - * of the properties auto computed. Also could replace IP addresses with hostnames, and expose - * public credentials clients. If a user specified this parameter, the auto generated EPR will - * not be exposed - unless an instance of this parameter is added with the string "legacy" - * This parameter could be used to expose EPR's conforming to the proposed SOAP/JMS spec - * until such time full support is implemented for it. - */ - public static final String PARAM_PUBLISH_EPR = "transport.jms.PublishEPR"; - /** The parameter indicating the JMS API specification to be used - if this is "1.1" the JMS - * 1.1 API would be used, else the JMS 1.0.2B - */ - public static final String PARAM_JMS_SPEC_VER = "transport.jms.JMSSpecVersion"; - - /** - * The Parameter indicating whether the JMS Session should be transacted for the service - * Specified as a "true" or "false" - */ - public static final String PARAM_SESSION_TRANSACTED = "transport.jms.SessionTransacted"; - /** - * The Parameter indicating the Session acknowledgement for the service. Must be one of the - * following Strings, or the appropriate Integer used by the JMS API - * "AUTO_ACKNOWLEDGE", "CLIENT_ACKNOWLEDGE", "DUPS_OK_ACKNOWLEDGE" or "SESSION_TRANSACTED" - */ - public static final String PARAM_SESSION_ACK = "transport.jms.SessionAcknowledgement"; - /** A message selector to be used when messages are sought for this service */ - public static final String PARAM_MSG_SELECTOR = "transport.jms.MessageSelector"; - /** Is the Subscription durable ? - "true" or "false" See {@link PARAM_DURABLE_SUB_NAME} */ - public static final String PARAM_SUB_DURABLE = "transport.jms.SubscriptionDurable"; - /** The name for the durable subscription See {@link PARAM_SUB_DURABLE}*/ - public static final String PARAM_DURABLE_SUB_NAME = "transport.jms.DurableSubscriberName"; - /** - * JMS Resource cachable level to be used for the service One of the following: - * {@link CACHE_NONE}, {@link CACHE_CONNECTION}, {@link CACHE_SESSION}, {@link CACHE_PRODUCER}, - * {@link CACHE_CONSUMER}, or {@link CACHE_AUTO} - to let the transport decide - */ - public static final String PARAM_CACHE_LEVEL = "transport.jms.CacheLevel"; - /** Should a pub-sub connection receive messages published by itself? */ - public static final String PARAM_PUBSUB_NO_LOCAL = "transport.jms.PubSubNoLocal"; - /** - * The number of milliseconds to wait for a message on a consumer.receive() call - * negative number - wait forever - * 0 - do not wait at all - * positive number - indicates the number of milliseconds to wait - */ - public static final String PARAM_RCV_TIMEOUT = "transport.jms.ReceiveTimeout"; - /** - *The number of concurrent consumers to be created to poll for messages for this service - * For Topics, this should be ONE, to prevent receipt of multiple copies of the same message - */ - public static final String PARAM_CONCURRENT_CONSUMERS = "transport.jms.ConcurrentConsumers"; - /** - * The maximum number of concurrent consumers for the service - See {@link PARAM_CONCURRENT_CONSUMERS} - */ - public static final String PARAM_MAX_CONSUMERS = "transport.jms.MaxConcurrentConsumers"; - /** - * The number of idle (i.e. message-less) polling attempts before a worker task commits suicide, - * to scale down resources, as load decreases - */ - public static final String PARAM_IDLE_TASK_LIMIT = "transport.jms.IdleTaskLimit"; - /** - * The maximum number of messages a polling worker task should process, before suicide - to - * prevent many longer running threads - default is unlimited (i.e. a worker task will live forever) - */ - public static final String PARAM_MAX_MSGS_PER_TASK = "transport.jms.MaxMessagesPerTask"; - /** - * Number of milliseconds before the first reconnection attempt is tried, on detection of an - * error. Subsequent retries follow a geometric series, where the - * duration = previous duration * factor - * This is further limited by the {@link PARAM_RECON_MAX_DURATION} to be meaningful - */ - public static final String PARAM_RECON_INIT_DURATION = "transport.jms.InitialReconnectDuration"; - /** @see PARAM_RECON_INIT_DURATION */ - public static final String PARAM_RECON_FACTOR = "transport.jms.ReconnectProgressFactor"; - /** @see PARAM_RECON_INIT_DURATION */ - public static final String PARAM_RECON_MAX_DURATION = "transport.jms.MaxReconnectDuration"; - - /** The username to use when obtaining a JMS Connection */ - public static final String PARAM_JMS_USERNAME = "transport.jms.UserName"; - /** The password to use when obtaining a JMS Connection */ - public static final String PARAM_JMS_PASSWORD = "transport.jms.Password"; - - //-------------- message context / transport header properties and client options -------------- - /** - * A MessageContext property or client Option indicating the JMS message type - */ - public static final String JMS_MESSAGE_TYPE = "JMS_MESSAGE_TYPE"; - /** - * The message type indicating a BytesMessage. See {@link JMS_MESSAGE_TYPE} - */ - public static final String JMS_BYTE_MESSAGE = "JMS_BYTE_MESSAGE"; - /** - * The message type indicating a TextMessage. See {@link JMS_MESSAGE_TYPE} - */ - public static final String JMS_TEXT_MESSAGE = "JMS_TEXT_MESSAGE"; - /** - * A MessageContext property or client Option indicating the time to wait for a response JMS message - */ - public static final String JMS_WAIT_REPLY = "JMS_WAIT_REPLY"; - /** - * A MessageContext property or client Option indicating the JMS correlation id - */ - public static final String JMS_COORELATION_ID = "JMS_COORELATION_ID"; - /** - * A MessageContext property or client Option indicating the JMS message id - */ - public static final String JMS_MESSAGE_ID = "JMS_MESSAGE_ID"; - /** - * A MessageContext property or client Option indicating the JMS delivery mode as an Integer or String - * Value 1 - javax.jms.DeliveryMode.NON_PERSISTENT - * Value 2 - javax.jms.DeliveryMode.PERSISTENT - */ - public static final String JMS_DELIVERY_MODE = "JMS_DELIVERY_MODE"; - /** - * A MessageContext property or client Option indicating the JMS destination to use on a Send - */ - public static final String JMS_DESTINATION = "JMS_DESTINATION"; - /** - * A MessageContext property or client Option indicating the JMS message expiration - a Long value - * specified as a String - */ - public static final String JMS_EXPIRATION = "JMS_EXPIRATION"; - /** - * A MessageContext property indicating if the message is a redelivery (Boolean as a String) - */ - public static final String JMS_REDELIVERED = "JMS_REDELIVERED"; - /** - * A MessageContext property or client Option indicating the JMS replyTo Destination - */ - public static final String JMS_REPLY_TO = "JMS_REPLY_TO"; - /** - * A MessageContext property or client Option indicating the JMS replyTo Destination type - * See {@link DESTINATION_TYPE_QUEUE} and {@link DESTINATION_TYPE_TOPIC} - */ - public static final String JMS_REPLY_TO_TYPE = "JMS_REPLY_TO_TYPE"; - /** - * A MessageContext property or client Option indicating the JMS timestamp (Long specified as String) - */ - public static final String JMS_TIMESTAMP = "JMS_TIMESTAMP"; - /** - * A MessageContext property indicating the JMS type String returned by {@link javax.jms.Message.getJMSType()} - */ - public static final String JMS_TYPE = "JMS_TYPE"; - /** - * A MessageContext property or client Option indicating the JMS priority - */ - public static final String JMS_PRIORITY = "JMS_PRIORITY"; - /** - * A MessageContext property or client Option indicating the JMS time to live for message sent - */ - public static final String JMS_TIME_TO_LIVE = "JMS_TIME_TO_LIVE"; - - /** The prefix that denotes JMSX properties */ - public static final String JMSX_PREFIX = "JMSX"; - /** The JMSXGroupID property */ - public static final String JMSX_GROUP_ID = "JMSXGroupID"; - /** The JMSXGroupSeq property */ - public static final String JMSX_GROUP_SEQ = "JMSXGroupSeq"; - -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java deleted file mode 100644 index c465b1d989..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java +++ /dev/null @@ -1,111 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.axis2.addressing.EndpointReference; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet; - -/** - * Class that links an Axis2 service to a JMS destination. Additionally, it contains - * all the required information to process incoming JMS messages and to inject them - * into Axis2. - */ -public class JMSEndpoint { - private JMSConnectionFactory cf; - private AxisService service; - private String jndiDestinationName; - private int destinationType = JMSConstants.GENERIC; - private Set endpointReferences = new HashSet(); - private ContentTypeRuleSet contentTypeRuleSet; - - public AxisService getService() { - return service; - } - - public void setService(AxisService service) { - this.service = service; - } - - public String getServiceName() { - return service.getName(); - } - - public String getJndiDestinationName() { - return jndiDestinationName; - } - - public void setJndiDestinationName(String destinationJNDIName) { - this.jndiDestinationName = destinationJNDIName; - } - - public void setDestinationType(String destinationType) { - if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) { - this.destinationType = JMSConstants.TOPIC; - } else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) { - this.destinationType = JMSConstants.QUEUE; - } else { - this.destinationType = JMSConstants.GENERIC; - } - } - - public EndpointReference[] getEndpointReferences() { - return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]); - } - - public void computeEPRs() { - List eprs = new ArrayList(); - for (Object o : getService().getParameters()) { - Parameter p = (Parameter) o; - if (JMSConstants.PARAM_PUBLISH_EPR.equals(p.getName()) && p.getValue() instanceof String) { - if ("legacy".equalsIgnoreCase((String) p.getValue())) { - // if "legacy" specified, compute and replace it - endpointReferences.add( - new EndpointReference(JMSUtils.getEPR(cf, destinationType, this))); - } else { - endpointReferences.add(new EndpointReference((String) p.getValue())); - } - } - } - - if (eprs.isEmpty()) { - // if nothing specified, compute and return legacy EPR - endpointReferences.add(new EndpointReference(JMSUtils.getEPR(cf, destinationType, this))); - } - } - - public ContentTypeRuleSet getContentTypeRuleSet() { - return contentTypeRuleSet; - } - - public void setContentTypeRuleSet(ContentTypeRuleSet contentTypeRuleSet) { - this.contentTypeRuleSet = contentTypeRuleSet; - } - - public JMSConnectionFactory getCf() { - return cf; - } - - public void setCf(JMSConnectionFactory cf) { - this.cf = cf; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java deleted file mode 100644 index ceeec4a6a3..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java +++ /dev/null @@ -1,28 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.IOException; - -import javax.jms.JMSException; - -public class JMSExceptionWrapper extends IOException { - private static final long serialVersionUID = 852441109009079511L; - - public JMSExceptionWrapper(JMSException ex) { - initCause(ex); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java deleted file mode 100644 index 8c9f66dfbf..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java +++ /dev/null @@ -1,294 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.BytesMessage; -import javax.jms.TextMessage; - -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.addressing.EndpointReference; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.TransportInDescription; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleFactory; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.MessageTypeRule; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.PropertyRule; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportListener; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorListener; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSource; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSourceSupport; - -/** - * The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances - * for each service requesting exposure over JMS, and stops these if they are undeployed / stopped. - *

- * A service indicates a JMS Connection factory definition by name, which would be defined in the - * JMSListner on the axis2.xml, and this provides a way to reuse common configuration between - * services, as well as to optimize resources utilized - *

- * If the connection factory name was not specified, it will default to the one named "default" - * {@see JMSConstants.DEFAULT_CONFAC_NAME} - *

- * If a destination JNDI name is not specified, a service will expect to use a Queue with the same - * JNDI name as of the service. Additional Parameters allows one to bind to a Topic or specify - * many more detailed control options. See package documentation for more details - *

- * All Destinations / JMS Administered objects used MUST be pre-created or already available - */ -public class JMSListener extends AbstractTransportListener implements ManagementSupport, - TransportErrorSource { - - public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; - - /** The JMSConnectionFactoryManager which centralizes the management of defined factories */ - private JMSConnectionFactoryManager connFacManager; - /** A Map of service name to the JMS endpoints */ - private Map serviceNameToEndpointMap = new HashMap(); - /** A Map of service name to its ServiceTaskManager instances */ - private Map serviceNameToSTMMap = - new HashMap(); - private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this); - - /** - * TransportListener initialization - * - * @param cfgCtx the Axis configuration context - * @param trpInDesc the TransportIn description - */ - public void init(ConfigurationContext cfgCtx, - TransportInDescription trpInDesc) throws AxisFault { - - super.init(cfgCtx, trpInDesc); - connFacManager = new JMSConnectionFactoryManager(trpInDesc); - log.info("JMS Transport Receiver/Listener initialized..."); - } - - /** - * Returns EPRs for the given service over the JMS transport - * - * @param serviceName service name - * @return the JMS EPRs for the service - */ - public EndpointReference[] getEPRsForService(String serviceName) { - //Strip out the operation name - if (serviceName.indexOf('/') != -1) { - serviceName = serviceName.substring(0, serviceName.indexOf('/')); - } - // strip out the endpoint name if present - if (serviceName.indexOf('.') != -1) { - serviceName = serviceName.substring(0, serviceName.indexOf('.')); - } - JMSEndpoint endpoint = serviceNameToEndpointMap.get(serviceName); - if (endpoint != null) { - return endpoint.getEndpointReferences(); - } else { - return null; - } - } - - /** - * Listen for JMS messages on behalf of the given service - * - * @param service the Axis service for which to listen for messages - */ - protected void startListeningForService(AxisService service) throws AxisFault { - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf == null) { - throw new AxisFault("The service doesn't specify a JMS connection factory or refers " + - "to an invalid factory."); - } - - JMSEndpoint endpoint = new JMSEndpoint(); - endpoint.setService(service); - endpoint.setCf(cf); - - Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION); - if (destParam != null) { - endpoint.setJndiDestinationName((String)destParam.getValue()); - } else { - // Assume that the JNDI destination name is the same as the service name - endpoint.setJndiDestinationName(service.getName()); - } - - Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE); - if (destTypeParam != null) { - String paramValue = (String) destTypeParam.getValue(); - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || - JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) { - endpoint.setDestinationType(paramValue); - } else { - throw new AxisFault("Invalid destinaton type value " + paramValue); - } - } else { - log.debug("JMS destination type not given. default queue"); - endpoint.setDestinationType(JMSConstants.DESTINATION_TYPE_QUEUE); - } - - Parameter contentTypeParam = service.getParameter(JMSConstants.CONTENT_TYPE_PARAM); - if (contentTypeParam == null) { - ContentTypeRuleSet contentTypeRuleSet = new ContentTypeRuleSet(); - contentTypeRuleSet.addRule(new PropertyRule(BaseConstants.CONTENT_TYPE)); - contentTypeRuleSet.addRule(new MessageTypeRule(BytesMessage.class, "application/octet-stream")); - contentTypeRuleSet.addRule(new MessageTypeRule(TextMessage.class, "text/plain")); - endpoint.setContentTypeRuleSet(contentTypeRuleSet); - } else { - endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam)); - } - - endpoint.computeEPRs(); // compute service EPR and keep for later use - serviceNameToEndpointMap.put(service.getName(), endpoint); - - ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool); - stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint)); - stm.start(); - serviceNameToSTMMap.put(service.getName(), stm); - - for (int i=0; i<3; i++) { - if (stm.getActiveTaskCount() > 0) { - log.info("Started to listen on destination : " + stm.getDestinationJNDIName() + - " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) + - " for service " + stm.getServiceName()); - return; - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) {} - } - - log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() + - " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) + - " for service " + stm.getServiceName() + " have not yet started after 3 seconds .."); - } - - /** - * Stops listening for messages for the service thats undeployed or stopped - * - * @param service the service that was undeployed or stopped - */ - protected void stopListeningForService(AxisService service) { - - ServiceTaskManager stm = serviceNameToSTMMap.get(service.getName()); - if (stm != null) { - if (log.isDebugEnabled()) { - log.debug("Stopping listening on destination : " + stm.getDestinationJNDIName() + - " for service : " + stm.getServiceName()); - } - - stm.stop(); - - serviceNameToSTMMap.remove(service.getName()); - serviceNameToEndpointMap.remove(service.getName()); - log.info("Stopped listening for JMS messages to service : " + service.getName()); - - } else { - log.error("Unable to stop service : " + service.getName() + - " - unable to find its ServiceTaskManager"); - } - } - /** - * Return the connection factory name for this service. If this service - * refers to an invalid factory or defaults to a non-existent default - * factory, this returns null - * - * @param service the AxisService - * @return the JMSConnectionFactory to be used, or null if reference is invalid - */ - public JMSConnectionFactory getConnectionFactory(AxisService service) { - - Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC); - // validate connection factory name (specified or default) - if (conFacParam != null) { - return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue()); - } else { - return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME); - } - } - - // -- jmx/management methods-- - /** - * Pause the listener - Stop accepting/processing new messages, but continues processing existing - * messages until they complete. This helps bring an instance into a maintenence mode - * @throws AxisFault on error - */ - public void pause() throws AxisFault { - if (state != BaseConstants.STARTED) return; - try { - for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { - stm.pause(); - } - state = BaseConstants.PAUSED; - log.info("Listener paused"); - } catch (AxisJMSException e) { - log.error("At least one service could not be paused", e); - } - } - - /** - * Resume the lister - Brings the lister into active mode back from a paused state - * @throws AxisFault on error - */ - public void resume() throws AxisFault { - if (state != BaseConstants.PAUSED) return; - try { - for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { - stm.resume(); - } - state = BaseConstants.STARTED; - log.info("Listener resumed"); - } catch (AxisJMSException e) { - log.error("At least one service could not be resumed", e); - } - } - - /** - * Stop processing new messages, and wait the specified maximum time for in-flight - * requests to complete before a controlled shutdown for maintenence - * - * @param millis a number of milliseconds to wait until pending requests are allowed to complete - * @throws AxisFault on error - */ - public void maintenenceShutdown(long millis) throws AxisFault { - if (state != BaseConstants.STARTED) return; - try { - long start = System.currentTimeMillis(); - stop(); - state = BaseConstants.STOPPED; - log.info("Listener shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s"); - } catch (Exception e) { - handleException("Error shutting down the listener for maintenence", e); - } - } - - public void addErrorListener(TransportErrorListener listener) { - tess.addErrorListener(listener); - } - - public void removeErrorListener(TransportErrorListener listener) { - tess.removeErrorListener(listener); - } - - void error(AxisService service, Throwable ex) { - tess.error(service, ex); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java deleted file mode 100644 index ebd67e53e1..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java +++ /dev/null @@ -1,237 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.TextMessage; -import javax.transaction.UserTransaction; -import javax.xml.namespace.QName; - -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.AxisOperation; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeInfo; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector; - -/** - * This is the JMS message receiver which is invoked when a message is received. This processes - * the message through the engine - */ -public class JMSMessageReceiver { - - private static final Log log = LogFactory.getLog(JMSMessageReceiver.class); - - /** The JMSListener */ - private JMSListener jmsListener = null; - /** A reference to the JMS Connection Factory */ - private JMSConnectionFactory jmsConnectionFactory = null; - /** The JMS metrics collector */ - private MetricsCollector metrics = null; - /** The endpoint this message receiver is bound to */ - final JMSEndpoint endpoint; - - /** - * Create a new JMSMessage receiver - * - * @param jmsListener the JMS transport Listener - * @param jmsConFac the JMS connection factory we are associated with - * @param workerPool the worker thread pool to be used - * @param cfgCtx the axis ConfigurationContext - * @param serviceName the name of the Axis service - * @param endpoint the JMSEndpoint definition to be used - */ - JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) { - this.jmsListener = jmsListener; - this.jmsConnectionFactory = jmsConFac; - this.endpoint = endpoint; - this.metrics = jmsListener.getMetricsCollector(); - } - - /** - * Process a new message received - * - * @param message the JMS message received - * @param ut UserTransaction which was used to receive the message - * @return true if caller should commit - */ - public boolean onMessage(Message message, UserTransaction ut) { - - try { - if (log.isDebugEnabled()) { - StringBuffer sb = new StringBuffer(); - sb.append("Received new JMS message for service :").append(endpoint.getServiceName()); - sb.append("\nDestination : ").append(message.getJMSDestination()); - sb.append("\nMessage ID : ").append(message.getJMSMessageID()); - sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID()); - sb.append("\nReplyTo : ").append(message.getJMSReplyTo()); - sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered()); - sb.append("\nPriority : ").append(message.getJMSPriority()); - sb.append("\nExpiration : ").append(message.getJMSExpiration()); - sb.append("\nTimestamp : ").append(message.getJMSTimestamp()); - sb.append("\nMessage Type : ").append(message.getJMSType()); - sb.append("\nPersistent ? : ").append( - DeliveryMode.PERSISTENT == message.getJMSDeliveryMode()); - - log.debug(sb.toString()); - if (log.isTraceEnabled() && message instanceof TextMessage) { - log.trace("\nMessage : " + ((TextMessage) message).getText()); - } - } - } catch (JMSException e) { - if (log.isDebugEnabled()) { - log.debug("Error reading JMS message headers for debug logging", e); - } - } - - // update transport level metrics - try { - metrics.incrementBytesReceived(JMSUtils.getMessageSize(message)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - - // has this message already expired? expiration time == 0 means never expires - try { - long expiryTime = message.getJMSExpiration(); - if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) { - if (log.isDebugEnabled()) { - log.debug("Discard expired message with ID : " + message.getJMSMessageID()); - } - return true; - } - } catch (JMSException ignore) {} - - - boolean successful = false; - try { - successful = processThoughEngine(message, ut); - - } catch (JMSException e) { - log.error("JMS Exception encountered while processing", e); - } catch (AxisFault e) { - log.error("Axis fault processing message", e); - } catch (Exception e) { - log.error("Unknown error processing message", e); - - } finally { - if (successful) { - metrics.incrementMessagesReceived(); - } else { - metrics.incrementFaultsReceiving(); - } - } - - return successful; - } - - /** - * Process the new message through Axis2 - * - * @param message the JMS message - * @param ut the UserTransaction used for receipt - * @return true if the caller should commit - * @throws JMSException, on JMS exceptions - * @throws AxisFault on Axis2 errors - */ - private boolean processThoughEngine(Message message, UserTransaction ut) - throws JMSException, AxisFault { - - MessageContext msgContext = jmsListener.createMessageContext(); - - // set the JMS Message ID as the Message ID of the MessageContext - try { - msgContext.setMessageID(message.getJMSMessageID()); - msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID()); - } catch (JMSException ignore) {} - - String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION); - - AxisService service = endpoint.getService(); - msgContext.setAxisService(service); - - // find the operation for the message, or default to one - Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); - QName operationQName = ( - operationParam != null ? - BaseUtils.getQNameFromString(operationParam.getValue()) : - BaseConstants.DEFAULT_OPERATION); - - AxisOperation operation = service.getOperation(operationQName); - if (operation != null) { - msgContext.setAxisOperation(operation); - msgContext.setSoapAction("urn:" + operation.getName().getLocalPart()); - } - - ContentTypeInfo contentTypeInfo = - endpoint.getContentTypeRuleSet().getContentTypeInfo(message); - if (contentTypeInfo == null) { - throw new AxisFault("Unable to determine content type for message " + - msgContext.getMessageID()); - } - - // set the message property OUT_TRANSPORT_INFO - // the reply is assumed to be over the JMSReplyTo destination, using - // the same incoming connection factory, if a JMSReplyTo is available - Destination replyTo = message.getJMSReplyTo(); - if (replyTo == null) { - // does the service specify a default reply destination ? - Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION); - if (param != null && param.getValue() != null) { - replyTo = jmsConnectionFactory.getDestination((String) param.getValue()); - } - - } - if (replyTo != null) { - msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, - new JMSOutTransportInfo(jmsConnectionFactory, replyTo, - contentTypeInfo.getPropertyName())); - } - - JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType()); - if (ut != null) { - msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut); - } - - try { - jmsListener.handleIncomingMessage( - msgContext, - JMSUtils.getTransportHeaders(message), - soapAction, - contentTypeInfo.getContentType()); - - } finally { - - Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY); - if (o != null) { - if ((o instanceof Boolean && ((Boolean) o)) || - (o instanceof String && Boolean.valueOf((String) o))) { - return false; - } - } - return true; - } - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java deleted file mode 100644 index 01fdee77dd..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.QueueSender; -import javax.jms.Session; -import javax.jms.TopicPublisher; -import javax.transaction.UserTransaction; - -import org.apache.axis2.context.MessageContext; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; - -/** - * Performs the actual sending of a JMS message, and the subsequent committing of a JTA transaction - * (if requested) or the local session transaction, if used. An instance of this class is unique - * to a single message send out operation and will not be shared. - */ -public class JMSMessageSender { - - private static final Log log = LogFactory.getLog(JMSMessageSender.class); - - /** The Connection to be used to send out */ - private Connection connection = null; - /** The Session to be used to send out */ - private Session session = null; - /** The MessageProducer used */ - private MessageProducer producer = null; - /** Target Destination */ - private Destination destination = null; - /** The level of cachability for resources */ - private int cacheLevel = JMSConstants.CACHE_CONNECTION; - /** Should this sender use JMS 1.1 ? (if false, defaults to 1.0.2b) */ - private boolean jmsSpec11 = true; - /** Are we sending to a Queue ? */ - private Boolean isQueue = null; - - /** - * This is a low-end method to support the one-time sends using JMS 1.0.2b - * @param connection the JMS Connection - * @param session JMS Session - * @param producer the MessageProducer - * @param destination the JMS Destination - * @param cacheLevel cacheLevel - None | Connection | Session | Producer - * @param jmsSpec11 true if the JMS 1.1 API should be used - * @param isQueue posting to a Queue? - */ - public JMSMessageSender(Connection connection, Session session, MessageProducer producer, - Destination destination, int cacheLevel, boolean jmsSpec11, Boolean isQueue) { - - this.connection = connection; - this.session = session; - this.producer = producer; - this.destination = destination; - this.cacheLevel = cacheLevel; - this.jmsSpec11 = jmsSpec11; - this.isQueue = isQueue; - } - - /** - * Create a JMSSender using a JMSConnectionFactory and target EPR - * - * @param jmsConnectionFactory the JMSConnectionFactory - * @param targetAddress target EPR - */ - public JMSMessageSender(JMSConnectionFactory jmsConnectionFactory, String targetAddress) { - - if (jmsConnectionFactory != null) { - this.cacheLevel = jmsConnectionFactory.getCacheLevel(); - this.jmsSpec11 = jmsConnectionFactory.isJmsSpec11(); - this.connection = jmsConnectionFactory.getConnection(); - this.session = jmsConnectionFactory.getSession(connection); - this.destination = - jmsConnectionFactory.getSharedDestination() == null ? - jmsConnectionFactory.getDestination(JMSUtils.getDestination(targetAddress)) : - jmsConnectionFactory.getSharedDestination(); - this.producer = jmsConnectionFactory.getMessageProducer(connection, session, destination); - - } else { - JMSOutTransportInfo jmsOut = new JMSOutTransportInfo(targetAddress); - jmsOut.loadConnectionFactoryFromProperies(); - } - } - - /** - * Perform actual send of JMS message to the Destination selected - * - * @param message the JMS message - * @param msgCtx the Axis2 MessageContext - */ - public void send(Message message, MessageContext msgCtx) { - - Boolean jtaCommit = getBooleanProperty(msgCtx, BaseConstants.JTA_COMMIT_AFTER_SEND); - Boolean rollbackOnly = getBooleanProperty(msgCtx, BaseConstants.SET_ROLLBACK_ONLY); - Boolean persistent = getBooleanProperty(msgCtx, JMSConstants.JMS_DELIVERY_MODE); - Integer priority = getIntegerProperty(msgCtx, JMSConstants.JMS_PRIORITY); - Integer timeToLive = getIntegerProperty(msgCtx, JMSConstants.JMS_TIME_TO_LIVE); - - // Do not commit, if message is marked for rollback - if (rollbackOnly != null && rollbackOnly) { - jtaCommit = Boolean.FALSE; - } - - if (persistent != null) { - try { - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - } catch (JMSException e) { - handleException("Error setting JMS Producer for PERSISTENT delivery", e); - } - } - if (priority != null) { - try { - producer.setPriority(priority); - } catch (JMSException e) { - handleException("Error setting JMS Producer priority to : " + priority, e); - } - } - if (timeToLive != null) { - try { - producer.setTimeToLive(timeToLive); - } catch (JMSException e) { - handleException("Error setting JMS Producer TTL to : " + timeToLive, e); - } - } - - boolean sendingSuccessful = false; - // perform actual message sending - try { - if (jmsSpec11 || isQueue == null) { - producer.send(message); - - } else { - if (isQueue) { - ((QueueSender) producer).send(message); - - } else { - ((TopicPublisher) producer).publish(message); - } - } - - // set the actual MessageID to the message context for use by any others down the line - String msgId = null; - try { - msgId = message.getJMSMessageID(); - if (msgId != null) { - msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId); - } - } catch (JMSException ignore) {} - - sendingSuccessful = true; - - if (log.isDebugEnabled()) { - log.debug("Sent Message Context ID : " + msgCtx.getMessageID() + - " with JMS Message ID : " + msgId + - " to destination : " + producer.getDestination()); - } - - } catch (JMSException e) { - log.error("Error sending message with MessageContext ID : " + - msgCtx.getMessageID() + " to destination : " + destination, e); - - } finally { - - if (jtaCommit != null) { - - UserTransaction ut = (UserTransaction) msgCtx.getProperty(BaseConstants.USER_TRANSACTION); - if (ut != null) { - - try { - if (sendingSuccessful && jtaCommit) { - ut.commit(); - } else { - ut.rollback(); - } - msgCtx.removeProperty(BaseConstants.USER_TRANSACTION); - - if (log.isDebugEnabled()) { - log.debug((sendingSuccessful ? "Committed" : "Rolled back") + - " JTA Transaction"); - } - - } catch (Exception e) { - handleException("Error committing/rolling back JTA transaction after " + - "sending of message with MessageContext ID : " + msgCtx.getMessageID() + - " to destination : " + destination, e); - } - } - - } else { - try { - if (session.getTransacted()) { - if (sendingSuccessful && (rollbackOnly == null || !rollbackOnly)) { - session.commit(); - } else { - session.rollback(); - } - } - - if (log.isDebugEnabled()) { - log.debug((sendingSuccessful ? "Committed" : "Rolled back") + - " local (JMS Session) Transaction"); - } - - } catch (JMSException e) { - handleException("Error committing/rolling back local (i.e. session) " + - "transaction after sending of message with MessageContext ID : " + - msgCtx.getMessageID() + " to destination : " + destination, e); - } - } - } - } - - /** - * Close non-shared producer, session and connection if any - */ - public void close() { - if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) { - try { - producer.close(); - } catch (JMSException e) { - log.error("Error closing JMS MessageProducer after send", e); - } finally { - producer = null; - } - } - - if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) { - try { - session.close(); - } catch (JMSException e) { - log.error("Error closing JMS Session after send", e); - } finally { - session = null; - } - } - - if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - connection.close(); - } catch (JMSException e) { - log.error("Error closing JMS Connection after send", e); - } finally { - connection = null; - } - } - } - - private void handleException(String message, Exception e) { - log.error(message, e); - throw new AxisJMSException(message, e); - } - - private Boolean getBooleanProperty(MessageContext msgCtx, String name) { - Object o = msgCtx.getProperty(name); - if (o != null) { - if (o instanceof Boolean) { - return (Boolean) o; - } else if (o instanceof String) { - return Boolean.valueOf((String) o); - } - } - return null; - } - - private Integer getIntegerProperty(MessageContext msgCtx, String name) { - Object o = msgCtx.getProperty(name); - if (o != null) { - if (o instanceof Integer) { - return (Integer) o; - } else if (o instanceof String) { - return Integer.parseInt((String) o); - } - } - return null; - } - - public void setConnection(Connection connection) { - this.connection = connection; - } - - public void setSession(Session session) { - this.session = session; - } - - public void setProducer(MessageProducer producer) { - this.producer = producer; - } - - public void setCacheLevel(int cacheLevel) { - this.cacheLevel = cacheLevel; - } - - public int getCacheLevel() { - return cacheLevel; - } - - public Connection getConnection() { - return connection; - } - - public MessageProducer getProducer() { - return producer; - } - - public Session getSession() { - return session; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java deleted file mode 100644 index 9e029b33e1..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java +++ /dev/null @@ -1,306 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.Hashtable; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameNotFoundException; -import javax.naming.NamingException; - -import org.apache.axis2.transport.OutTransportInfo; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; - -/** - * The JMS OutTransportInfo is a holder of information to send an outgoing message - * (e.g. a Response) to a JMS destination. Thus at a minimum a reference to a - * ConnectionFactory and a Destination are held - */ -public class JMSOutTransportInfo implements OutTransportInfo { - - private static final Log log = LogFactory.getLog(JMSOutTransportInfo.class); - - /** The naming context */ - private Context context; - /** - * this is a reference to the underlying JMS ConnectionFactory when sending messages - * through connection factories not defined at the TransportSender level - */ - private ConnectionFactory connectionFactory = null; - /** - * this is a reference to a JMS Connection Factory instance, which has a reference - * to the underlying actual connection factory, an open connection to the JMS provider - * and optionally a session already available for use - */ - private JMSConnectionFactory jmsConnectionFactory = null; - /** the Destination queue or topic for the outgoing message */ - private Destination destination = null; - /** the Destination queue or topic for the outgoing message - * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC - */ - private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC; - /** the Reply Destination queue or topic for the outgoing message */ - private Destination replyDestination = null; - /** the Reply Destination name */ - private String replyDestinationName = null; - /** the Reply Destination queue or topic for the outgoing message - * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC - */ - private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC; - /** the EPR properties when the out-transport info is generated from a target EPR */ - private Hashtable properties = null; - /** the target EPR string where applicable */ - private String targetEPR = null; - /** the message property name that stores the content type of the outgoing message */ - private String contentTypeProperty; - - /** - * Creates an instance using the given JMS connection factory and destination - * - * @param jmsConnectionFactory the JMS connection factory - * @param dest the destination - * @param contentTypeProperty - */ - JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest, - String contentTypeProperty) { - this.jmsConnectionFactory = jmsConnectionFactory; - this.destination = dest; - destinationType = dest instanceof Topic ? JMSConstants.DESTINATION_TYPE_TOPIC - : JMSConstants.DESTINATION_TYPE_QUEUE; - this.contentTypeProperty = contentTypeProperty; - } - - /** - * Creates and instance using the given URL - * - * @param targetEPR the target EPR - */ - JMSOutTransportInfo(String targetEPR) { - - this.targetEPR = targetEPR; - if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) { - handleException("Invalid prefix for a JMS EPR : " + targetEPR); - - } else { - properties = BaseUtils.getEPRProperties(targetEPR); - String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE); - if (destinationType != null) { - setDestinationType(destinationType); - } - - String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE); - if (replyDestinationType != null) { - setReplyDestinationType(replyDestinationType); - } - - replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); - contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - try { - context = new InitialContext(properties); - } catch (NamingException e) { - handleException("Could not get an initial context using " + properties, e); - } - - destination = getDestination(context, targetEPR); - replyDestination = getReplyDestination(context, targetEPR); - } - } - - /** - * Provides a lazy load when created with a target EPR. This method performs actual - * lookup for the connection factory and destination - */ - public void loadConnectionFactoryFromProperies() { - if (properties != null) { - connectionFactory = getConnectionFactory(context, properties); - } - } - - /** - * Get the referenced ConnectionFactory using the properties from the context - * - * @param context the context to use for lookup - * @param props the properties which contains the JNDI name of the factory - * @return the connection factory - */ - private ConnectionFactory getConnectionFactory(Context context, Hashtable props) { - try { - - String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME); - if (conFacJndiName != null) { - return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName); - } else { - handleException("Connection Factory JNDI name cannot be determined"); - } - } catch (NamingException e) { - handleException("Failed to look up connection factory from JNDI", e); - } - return null; - } - - /** - * Get the JMS destination specified by the given URL from the context - * - * @param context the Context to lookup - * @param url URL - * @return the JMS destination, or null if it does not exist - */ - private Destination getDestination(Context context, String url) { - String destinationName = JMSUtils.getDestination(url); - try { - return JMSUtils.lookup(context, Destination.class, destinationName); - } catch (NameNotFoundException e) { - try { - return JMSUtils.lookup(context, Destination.class, - (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ? - "dynamicTopics/" : "dynamicQueues/") + destinationName); - } catch (NamingException x) { - handleException("Cannot locate destination : " + destinationName + " using " + url); - } - } catch (NamingException e) { - handleException("Cannot locate destination : " + destinationName + " using " + url, e); - } - return null; - } - - /** - * Get the JMS reply destination specified by the given URL from the context - * - * @param context the Context to lookup - * @param url URL - * @return the JMS destination, or null if it does not exist - */ - private Destination getReplyDestination(Context context, String url) { - String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); - if(replyDestinationName == null) { - return null; - } - - try { - return JMSUtils.lookup(context, Destination.class, replyDestinationName); - } catch (NameNotFoundException e) { - if (log.isDebugEnabled()) { - log.debug("Cannot locate destination : " + replyDestinationName + " using " + url); - } - } catch (NamingException e) { - handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e); - } - - return null; - } - - /** - * Look up for the given destination - * @param replyDest the JNDI name to lookup Destination required - * @return Destination for the JNDI name passed - */ - public Destination getReplyDestination(String replyDest) { - try { - return JMSUtils.lookup(jmsConnectionFactory.getContext(), Destination.class, - replyDest); - } catch (NameNotFoundException e) { - if (log.isDebugEnabled()) { - log.debug("Cannot locate reply destination : " + replyDest, e); - } - } catch (NamingException e) { - handleException("Cannot locate reply destination : " + replyDest, e); - } - return null; - } - - - private void handleException(String s) { - log.error(s); - throw new AxisJMSException(s); - } - - private void handleException(String s, Exception e) { - log.error(s, e); - throw new AxisJMSException(s, e); - } - - public Destination getDestination() { - return destination; - } - - public ConnectionFactory getConnectionFactory() { - return connectionFactory; - } - - public JMSConnectionFactory getJmsConnectionFactory() { - return jmsConnectionFactory; - } - - public void setContentType(String contentType) { - // this is a useless Axis2 method imposed by the OutTransportInfo interface :( - } - - public Hashtable getProperties() { - return properties; - } - - public String getTargetEPR() { - return targetEPR; - } - - public String getDestinationType() { - return destinationType; - } - - public void setDestinationType(String destinationType) { - if (destinationType != null) { - this.destinationType = destinationType; - } - } - - public Destination getReplyDestination() { - return replyDestination; - } - - public void setReplyDestination(Destination replyDestination) { - this.replyDestination = replyDestination; - } - - public String getReplyDestinationType() { - return replyDestinationType; - } - - public void setReplyDestinationType(String replyDestinationType) { - this.replyDestinationType = replyDestinationType; - } - - public String getReplyDestinationName() { - return replyDestinationName; - } - - public void setReplyDestinationName(String replyDestinationName) { - this.replyDestinationName = replyDestinationName; - } - - public String getContentTypeProperty() { - return contentTypeProperty; - } - - public void setContentTypeProperty(String contentTypeProperty) { - this.contentTypeProperty = contentTypeProperty; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java deleted file mode 100644 index a5f77dc4c9..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java +++ /dev/null @@ -1,499 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.StringWriter; -import java.nio.charset.UnsupportedCharsetException; -import java.util.Map; - -import javax.activation.DataHandler; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.axiom.om.OMElement; -import org.apache.axiom.om.OMNode; -import org.apache.axiom.om.OMOutputFormat; -import org.apache.axiom.om.OMText; -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.TransportOutDescription; -import org.apache.axis2.transport.MessageFormatter; -import org.apache.axis2.transport.OutTransportInfo; -import org.apache.axis2.transport.TransportUtils; -import org.apache.axis2.transport.http.HTTPConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportSender; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams.WriterOutputStream; - -/** - * The TransportSender for JMS - */ -public class JMSSender extends AbstractTransportSender implements ManagementSupport { - - public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; - - /** The JMS connection factory manager to be used when sending messages out */ - private JMSConnectionFactoryManager connFacManager; - - /** - * Initialize the transport sender by reading pre-defined connection factories for - * outgoing messages. - * - * @param cfgCtx the configuration context - * @param transportOut the transport sender definition from axis2.xml - * @throws AxisFault on error - */ - public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { - super.init(cfgCtx, transportOut); - connFacManager = new JMSConnectionFactoryManager(transportOut); - log.info("JMS Transport Sender initialized..."); - } - - /** - * Get corresponding JMS connection factory defined within the transport sender for the - * transport-out information - usually constructed from a targetEPR - * - * @param trpInfo the transport-out information - * @return the corresponding JMS connection factory, if any - */ - private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { - Map props = trpInfo.getProperties(); - if (trpInfo.getProperties() != null) { - String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC); - if (jmsConnectionFactoryName != null) { - return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName); - } else { - return connFacManager.getJMSConnectionFactory(props); - } - } else { - return null; - } - } - - /** - * Performs the actual sending of the JMS message - */ - public void sendMessage(MessageContext msgCtx, String targetAddress, - OutTransportInfo outTransportInfo) throws AxisFault { - - JMSConnectionFactory jmsConnectionFactory = null; - JMSOutTransportInfo jmsOut = null; - JMSMessageSender messageSender = null; - - if (targetAddress != null) { - - jmsOut = new JMSOutTransportInfo(targetAddress); - // do we have a definition for a connection factory to use for this address? - jmsConnectionFactory = getJMSConnectionFactory(jmsOut); - - if (jmsConnectionFactory != null) { - messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress); - - } else { - try { - messageSender = JMSUtils.createJMSSender(jmsOut); - } catch (JMSException e) { - handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); - } - } - - } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { - - jmsOut = (JMSOutTransportInfo) outTransportInfo; - try { - messageSender = JMSUtils.createJMSSender(jmsOut); - } catch (JMSException e) { - handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); - } - } - - // The message property to be used to send the content type is determined by - // the out transport info, i.e. either from the EPR if we are sending a request, - // or, if we are sending a response, from the configuration of the service that - // received the request). The property name can be overridden by a message - // context property. - String contentTypeProperty = - (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - if (contentTypeProperty == null) { - contentTypeProperty = jmsOut.getContentTypeProperty(); - } - - // need to synchronize as Sessions are not thread safe - synchronized (messageSender.getSession()) { - try { - sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); - } finally { - messageSender.close(); - } - } - } - - /** - * Perform actual sending of the JMS message - */ - private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender, - String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory, - JMSOutTransportInfo jmsOut) throws AxisFault { - - // convert the axis message context into a JMS Message that we can send over JMS - Message message = null; - String correlationId = null; - try { - message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty); - } catch (JMSException e) { - handleException("Error creating a JMS message from the message context", e); - } - - // should we wait for a synchronous response on this same thread? - boolean waitForResponse = waitForSynchronousResponse(msgCtx); - Destination replyDestination = jmsOut.getReplyDestination(); - - // if this is a synchronous out-in, prepare to listen on the response destination - if (waitForResponse) { - - String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO); - if (replyDestName == null && jmsConnectionFactory != null) { - replyDestName = jmsConnectionFactory.getReplyToDestination(); - } - - if (replyDestName != null) { - if (jmsConnectionFactory != null) { - replyDestination = jmsConnectionFactory.getDestination(replyDestName); - } else { - replyDestination = jmsOut.getReplyDestination(replyDestName); - } - } - replyDestination = JMSUtils.setReplyDestination( - replyDestination, messageSender.getSession(), message); - } - - try { - messageSender.send(message, msgCtx); - metrics.incrementMessagesSent(msgCtx); - - } catch (AxisJMSException e) { - metrics.incrementFaultsSending(); - handleException("Error sending JMS message", e); - } - - try { - metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - - // if we are expecting a synchronous response back for the message sent out - if (waitForResponse) { - // TODO ******************************************************************************** - // TODO **** replace with asynchronous polling via a poller task to process this ******* - // information would be given. Then it should poll (until timeout) the - // requested destination for the response message and inject it from a - // asynchronous worker thread - try { - messageSender.getConnection().start(); // multiple calls are safely ignored - } catch (JMSException ignore) {} - - try { - correlationId = message.getJMSMessageID(); - } catch(JMSException ignore) {} - - // We assume here that the response uses the same message property to - // specify the content type of the message. - waitForResponseAndProcess(messageSender.getSession(), replyDestination, - msgCtx, correlationId, contentTypeProperty); - // TODO ******************************************************************************** - } - } - - /** - * Create a Consumer for the reply destination and wait for the response JMS message - * synchronously. If a message arrives within the specified time interval, process it - * through Axis2 - * @param session the session to use to listen for the response - * @param replyDestination the JMS reply Destination - * @param msgCtx the outgoing message for which we are expecting the response - * @param contentTypeProperty the message property used to determine the content type - * of the response message - * @throws AxisFault on error - */ - private void waitForResponseAndProcess(Session session, Destination replyDestination, - MessageContext msgCtx, String correlationId, - String contentTypeProperty) throws AxisFault { - - try { - MessageConsumer consumer; - consumer = JMSUtils.createConsumer(session, replyDestination, - "JMSCorrelationID = '" + correlationId + "'"); - - // how long are we willing to wait for the sync response - long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; - String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY); - if (waitReply != null) { - timeout = Long.valueOf(waitReply).longValue(); - } - - if (log.isDebugEnabled()) { - log.debug("Waiting for a maximum of " + timeout + - "ms for a response message to destination : " + replyDestination + - " with JMS correlation ID : " + correlationId); - } - - Message reply = consumer.receive(timeout); - - if (reply != null) { - - // update transport level metrics - metrics.incrementMessagesReceived(); - try { - metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - - try { - processSyncResponse(msgCtx, reply, contentTypeProperty); - metrics.incrementMessagesReceived(); - } catch (AxisFault e) { - metrics.incrementFaultsReceiving(); - throw e; - } - - } else { - log.warn("Did not receive a JMS response within " + - timeout + " ms to destination : " + replyDestination + - " with JMS correlation ID : " + correlationId); - metrics.incrementTimeoutsReceiving(); - } - - } catch (JMSException e) { - metrics.incrementFaultsReceiving(); - handleException("Error creating a consumer, or receiving a synchronous reply " + - "for outgoing MessageContext ID : " + msgCtx.getMessageID() + - " and reply Destination : " + replyDestination, e); - } - } - - /** - * Create a JMS Message from the given MessageContext and using the given - * session - * - * @param msgContext the MessageContext - * @param session the JMS session - * @param contentTypeProperty the message property to be used to store the - * content type - * @return a JMS message from the context and session - * @throws JMSException on exception - * @throws AxisFault on exception - */ - private Message createJMSMessage(MessageContext msgContext, Session session, - String contentTypeProperty) throws JMSException, AxisFault { - - Message message = null; - String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE); - - // check the first element of the SOAP body, do we have content wrapped using the - // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or - // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages - // for JMS but just get the payload in its native format - String jmsPayloadType = guessMessageType(msgContext); - - if (jmsPayloadType == null) { - - OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); - MessageFormatter messageFormatter = null; - try { - messageFormatter = TransportUtils.getMessageFormatter(msgContext); - } catch (AxisFault axisFault) { - throw new JMSException("Unable to get the message formatter to use"); - } - - String contentType = messageFormatter.getContentType( - msgContext, format, msgContext.getSoapAction()); - - boolean useBytesMessage = - msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) || - contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1; - - OutputStream out; - StringWriter sw; - if (useBytesMessage) { - BytesMessage bytesMsg = session.createBytesMessage(); - sw = null; - out = new BytesMessageOutputStream(bytesMsg); - message = bytesMsg; - } else { - sw = new StringWriter(); - try { - out = new WriterOutputStream(sw, format.getCharSetEncoding()); - } catch (UnsupportedCharsetException ex) { - handleException("Unsupported encoding " + format.getCharSetEncoding(), ex); - return null; - } - } - - try { - messageFormatter.writeTo(msgContext, format, out, true); - out.close(); - } catch (IOException e) { - handleException("IO Error while creating BytesMessage", e); - } - - if (!useBytesMessage) { - TextMessage txtMsg = session.createTextMessage(); - txtMsg.setText(sw.toString()); - message = txtMsg; - } - - if (contentTypeProperty != null) { - message.setStringProperty(contentTypeProperty, contentType); - } - - } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) { - message = session.createBytesMessage(); - BytesMessage bytesMsg = (BytesMessage) message; - OMElement wrapper = msgContext.getEnvelope().getBody(). - getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER); - OMNode omNode = wrapper.getFirstOMChild(); - if (omNode != null && omNode instanceof OMText) { - Object dh = ((OMText) omNode).getDataHandler(); - if (dh != null && dh instanceof DataHandler) { - try { - ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg)); - } catch (IOException e) { - handleException("Error serializing binary content of element : " + - BaseConstants.DEFAULT_BINARY_WRAPPER, e); - } - } - } - - } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) { - message = session.createTextMessage(); - TextMessage txtMsg = (TextMessage) message; - txtMsg.setText(msgContext.getEnvelope().getBody(). - getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText()); - } - - // set the JMS correlation ID if specified - String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID); - if (correlationId == null && msgContext.getRelatesTo() != null) { - correlationId = msgContext.getRelatesTo().getValue(); - } - - if (correlationId != null) { - message.setJMSCorrelationID(correlationId); - } - - if (msgContext.isServerSide()) { - // set SOAP Action as a property on the JMS message - setProperty(message, msgContext, BaseConstants.SOAPACTION); - } else { - String action = msgContext.getOptions().getAction(); - if (action != null) { - message.setStringProperty(BaseConstants.SOAPACTION, action); - } - } - - JMSUtils.setTransportHeaders(msgContext, message); - return message; - } - - /** - * Guess the message type to use for JMS looking at the message contexts' envelope - * @param msgContext the message context - * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null - */ - private String guessMessageType(MessageContext msgContext) { - OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement(); - if (firstChild != null) { - if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) { - return JMSConstants.JMS_BYTE_MESSAGE; - } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) { - return JMSConstants.JMS_TEXT_MESSAGE; - } - } - return null; - } - - /** - * Creates an Axis MessageContext for the received JMS message and - * sets up the transports and various properties - * - * @param outMsgCtx the outgoing message for which we are expecting the response - * @param message the JMS response message received - * @param contentTypeProperty the message property used to determine the content type - * of the response message - * @throws AxisFault on error - */ - private void processSyncResponse(MessageContext outMsgCtx, Message message, - String contentTypeProperty) throws AxisFault { - - MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx); - - // load any transport headers from received message - JMSUtils.loadTransportHeaders(message, responseMsgCtx); - - // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response - // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This - // question is still under debate and due to the timelines, I am commiting this - // workaround as Axis2 1.2 is about to be released and Synapse 1.0 - responseMsgCtx.setServerSide(false); - - String contentType = - contentTypeProperty == null ? null - : JMSUtils.getProperty(message, contentTypeProperty); - - try { - JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType); - } catch (JMSException ex) { - throw AxisFault.makeFault(ex); - } -// responseMsgCtx.setServerSide(true); - - handleIncomingMessage( - responseMsgCtx, - JMSUtils.getTransportHeaders(message), - JMSUtils.getProperty(message, BaseConstants.SOAPACTION), - contentType - ); - } - - private void setProperty(Message message, MessageContext msgCtx, String key) { - - String value = getProperty(msgCtx, key); - if (value != null) { - try { - message.setStringProperty(key, value); - } catch (JMSException e) { - log.warn("Couldn't set message property : " + key + " = " + value, e); - } - } - } - - private String getProperty(MessageContext mc, String key) { - return (String) mc.getProperty(key); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java deleted file mode 100644 index 63faa0b852..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java +++ /dev/null @@ -1,1115 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.lang.reflect.Method; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.jms.TopicSession; -import javax.mail.internet.ContentType; -import javax.mail.internet.ParseException; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.Reference; - -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.builder.Builder; -import org.apache.axis2.builder.BuilderUtil; -import org.apache.axis2.builder.SOAPBuilder; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.transport.TransportUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.format.DataSourceMessageBuilder; -import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilder; -import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilderAdapter; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; - -/** - * Miscallaneous methods used for the JMS transport - */ -public class JMSUtils extends BaseUtils { - - private static final Log log = LogFactory.getLog(JMSUtils.class); - private static final Class[] NOARGS = new Class[] {}; - private static final Object[] NOPARMS = new Object[] {}; - - /** - * Should this service be enabled over the JMS transport? - * - * @param service the Axis service - * @return true if JMS should be enabled - */ - public static boolean isJMSService(AxisService service) { - if (service.isEnableAllTransports()) { - return true; - - } else { - List transports = service.getExposedTransports(); - for (Object transport : transports) { - if (JMSListener.TRANSPORT_NAME.equals(transport)) { - return true; - } - } - } - return false; - } - - /** - * Get the EPR for the given JMS connection factory and destination - * the form of the URL is - * jms:/?[=&]* - * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS - * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are filtered - * - * @param cf the Axis2 JMS connection factory - * @param destinationType the type of destination - * @param endpoint JMSEndpoint - * @return the EPR as a String - */ - static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) { - StringBuffer sb = new StringBuffer(); - - sb.append( - JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName()); - sb.append("?"). - append(JMSConstants.PARAM_DEST_TYPE).append("=").append( - destinationType == JMSConstants.TOPIC ? - JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE); - - if (endpoint.getContentTypeRuleSet() != null) { - String contentTypeProperty = - endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty(); - if (contentTypeProperty != null) { - sb.append("&"); - sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - sb.append("="); - sb.append(contentTypeProperty); - } - } - - for (Map.Entry entry : cf.getParameters().entrySet()) { - if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) && - !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) && - !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) && - !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) { - sb.append("&").append( - entry.getKey()).append("=").append(entry.getValue()); - } - } - return sb.toString(); - } - - /** - * Get a String property from the JMS message - * - * @param message JMS message - * @param property property name - * @return property value - */ - public static String getProperty(Message message, String property) { - try { - return message.getStringProperty(property); - } catch (JMSException e) { - return null; - } - } - - /** - * Return the destination name from the given URL - * - * @param url the URL - * @return the destination name - */ - public static String getDestination(String url) { - String tempUrl = url.substring(JMSConstants.JMS_PREFIX.length()); - int propPos = tempUrl.indexOf("?"); - - if (propPos == -1) { - return tempUrl; - } else { - return tempUrl.substring(0, propPos); - } - } - - /** - * Set the SOAPEnvelope to the Axis2 MessageContext, from the JMS Message passed in - * @param message the JMS message read - * @param msgContext the Axis2 MessageContext to be populated - * @param contentType content type for the message - * @throws AxisFault - * @throws JMSException - */ - public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType) - throws AxisFault, JMSException { - - if (contentType == null) { - if (message instanceof TextMessage) { - contentType = "text/plain"; - } else { - contentType = "application/octet-stream"; - } - if (log.isDebugEnabled()) { - log.debug("No content type specified; assuming " + contentType); - } - } - - int index = contentType.indexOf(';'); - String type = index > 0 ? contentType.substring(0, index) : contentType; - Builder builder = BuilderUtil.getBuilderFromSelector(type, msgContext); - if (builder == null) { - if (log.isDebugEnabled()) { - log.debug("No message builder found for type '" + type + "'. Falling back to SOAP."); - } - builder = new SOAPBuilder(); - } - - OMElement documentElement; - if (message instanceof BytesMessage) { - // Extract the charset encoding from the content type and - // set the CHARACTER_SET_ENCODING property as e.g. SOAPBuilder relies on this. - String charSetEnc = null; - try { - if (contentType != null) { - charSetEnc = new ContentType(contentType).getParameter("charset"); - } - } catch (ParseException ex) { - // ignore - } - msgContext.setProperty(Constants.Configuration.CHARACTER_SET_ENCODING, charSetEnc); - - if (builder instanceof DataSourceMessageBuilder) { - documentElement = ((DataSourceMessageBuilder)builder).processDocument( - new BytesMessageDataSource((BytesMessage)message), contentType, - msgContext); - } else { - documentElement = builder.processDocument( - new BytesMessageInputStream((BytesMessage)message), contentType, - msgContext); - } - } else if (message instanceof TextMessage) { - TextMessageBuilder textMessageBuilder; - if (builder instanceof TextMessageBuilder) { - textMessageBuilder = (TextMessageBuilder)builder; - } else { - textMessageBuilder = new TextMessageBuilderAdapter(builder); - } - String content = ((TextMessage)message).getText(); - documentElement = textMessageBuilder.processDocument(content, contentType, msgContext); - } else { - handleException("Unsupported JMS message type " + message.getClass().getName()); - return; // Make compiler happy - } - msgContext.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement)); - } - - /** - * Set the JMS ReplyTo for the message - * - * @param replyDestination the JMS Destination where the reply is expected - * @param session the session to use to create a temp Queue if a response is expected - * but a Destination has not been specified - * @param message the JMS message where the final Destinatio would be set as the JMS ReplyTo - * @return the JMS ReplyTo Destination for the message - */ - public static Destination setReplyDestination(Destination replyDestination, Session session, - Message message) { - - if (replyDestination == null) { - try { - // create temporary queue to receive the reply - replyDestination = createTemporaryDestination(session); - } catch (JMSException e) { - handleException("Error creating temporary queue for response"); - } - } - - try { - message.setJMSReplyTo(replyDestination); - } catch (JMSException e) { - log.warn("Error setting JMS ReplyTo destination to : " + replyDestination, e); - } - - if (log.isDebugEnabled()) { - try { - assert replyDestination != null; - log.debug("Expecting a response to JMS Destination : " + - (replyDestination instanceof Queue ? - ((Queue) replyDestination).getQueueName() : - ((Topic) replyDestination).getTopicName())); - } catch (JMSException ignore) {} - } - return replyDestination; - } - - /** - * Set transport headers from the axis message context, into the JMS message - * - * @param msgContext the axis message context - * @param message the JMS Message - * @throws JMSException on exception - */ - public static void setTransportHeaders(MessageContext msgContext, Message message) - throws JMSException { - - Map headerMap = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS); - - if (headerMap == null) { - return; - } - - for (Object headerName : headerMap.keySet()) { - - String name = (String) headerName; - - if (name.startsWith(JMSConstants.JMSX_PREFIX) && - !(name.equals(JMSConstants.JMSX_GROUP_ID) || name.equals(JMSConstants.JMSX_GROUP_SEQ))) { - continue; - } - - if (JMSConstants.JMS_COORELATION_ID.equals(name)) { - message.setJMSCorrelationID( - (String) headerMap.get(JMSConstants.JMS_COORELATION_ID)); - } else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) { - Object o = headerMap.get(JMSConstants.JMS_DELIVERY_MODE); - if (o instanceof Integer) { - message.setJMSDeliveryMode((Integer) o); - } else if (o instanceof String) { - try { - message.setJMSDeliveryMode(Integer.parseInt((String) o)); - } catch (NumberFormatException nfe) { - log.warn("Invalid delivery mode ignored : " + o, nfe); - } - } else { - log.warn("Invalid delivery mode ignored : " + o); - } - - } else if (JMSConstants.JMS_EXPIRATION.equals(name)) { - message.setJMSExpiration( - Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION))); - } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) { - message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID)); - } else if (JMSConstants.JMS_PRIORITY.equals(name)) { - message.setJMSPriority( - Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY))); - } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) { - message.setJMSTimestamp( - Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP))); - } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) { - message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE)); - - } else { - Object value = headerMap.get(name); - if (value instanceof String) { - message.setStringProperty(name, (String) value); - } else if (value instanceof Boolean) { - message.setBooleanProperty(name, (Boolean) value); - } else if (value instanceof Integer) { - message.setIntProperty(name, (Integer) value); - } else if (value instanceof Long) { - message.setLongProperty(name, (Long) value); - } else if (value instanceof Double) { - message.setDoubleProperty(name, (Double) value); - } else if (value instanceof Float) { - message.setFloatProperty(name, (Float) value); - } - } - } - } - - /** - * Read the transport headers from the JMS Message and set them to the axis2 message context - * - * @param message the JMS Message received - * @param responseMsgCtx the axis message context - * @throws AxisFault on error - */ - public static void loadTransportHeaders(Message message, MessageContext responseMsgCtx) - throws AxisFault { - responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, getTransportHeaders(message)); - } - - /** - * Extract transport level headers for JMS from the given message into a Map - * - * @param message the JMS message - * @return a Map of the transport headers - */ - public static Map getTransportHeaders(Message message) { - // create a Map to hold transport headers - Map map = new HashMap(); - - // correlation ID - try { - if (message.getJMSCorrelationID() != null) { - map.put(JMSConstants.JMS_COORELATION_ID, message.getJMSCorrelationID()); - } - } catch (JMSException ignore) {} - - // set the delivery mode as persistent or not - try { - map.put(JMSConstants.JMS_DELIVERY_MODE, Integer.toString(message.getJMSDeliveryMode())); - } catch (JMSException ignore) {} - - // destination name - try { - if (message.getJMSDestination() != null) { - Destination dest = message.getJMSDestination(); - map.put(JMSConstants.JMS_DESTINATION, - dest instanceof Queue ? - ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); - } - } catch (JMSException ignore) {} - - // expiration - try { - map.put(JMSConstants.JMS_EXPIRATION, Long.toString(message.getJMSExpiration())); - } catch (JMSException ignore) {} - - // if a JMS message ID is found - try { - if (message.getJMSMessageID() != null) { - map.put(JMSConstants.JMS_MESSAGE_ID, message.getJMSMessageID()); - } - } catch (JMSException ignore) {} - - // priority - try { - map.put(JMSConstants.JMS_PRIORITY, Long.toString(message.getJMSPriority())); - } catch (JMSException ignore) {} - - // redelivered - try { - map.put(JMSConstants.JMS_REDELIVERED, Boolean.toString(message.getJMSRedelivered())); - } catch (JMSException ignore) {} - - // replyto destination name - try { - if (message.getJMSReplyTo() != null) { - Destination dest = message.getJMSReplyTo(); - map.put(JMSConstants.JMS_REPLY_TO, - dest instanceof Queue ? - ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); - } - } catch (JMSException ignore) {} - - // priority - try { - map.put(JMSConstants.JMS_TIMESTAMP, Long.toString(message.getJMSTimestamp())); - } catch (JMSException ignore) {} - - // message type - try { - if (message.getJMSType() != null) { - map.put(JMSConstants.JMS_TYPE, message.getJMSType()); - } - } catch (JMSException ignore) {} - - // any other transport properties / headers - Enumeration e = null; - try { - e = message.getPropertyNames(); - } catch (JMSException ignore) {} - - if (e != null) { - while (e.hasMoreElements()) { - String headerName = (String) e.nextElement(); - try { - map.put(headerName, message.getStringProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getBooleanProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getIntProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getLongProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getDoubleProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getFloatProperty(headerName)); - } catch (JMSException ignore) {} - } - } - - return map; - } - - - /** - * Create a MessageConsumer for the given Destination - * @param session JMS Session to use - * @param dest Destination for which the Consumer is to be created - * @param messageSelector the message selector to be used if any - * @return a MessageConsumer for the specified Destination - * @throws JMSException - */ - public static MessageConsumer createConsumer(Session session, Destination dest, String messageSelector) - throws JMSException { - - if (dest instanceof Queue) { - return ((QueueSession) session).createReceiver((Queue) dest, messageSelector); - } else { - return ((TopicSession) session).createSubscriber((Topic) dest, messageSelector, false); - } - } - - /** - * Create a temp queue or topic for synchronous receipt of responses, when a reply destination - * is not specified - * @param session the JMS Session to use - * @return a temporary Queue or Topic, depending on the session - * @throws JMSException - */ - public static Destination createTemporaryDestination(Session session) throws JMSException { - - if (session instanceof QueueSession) { - return session.createTemporaryQueue(); - } else { - return session.createTemporaryTopic(); - } - } - - /** - * Return the body length in bytes for a bytes message - * @param bMsg the JMS BytesMessage - * @return length of body in bytes - */ - public static long getBodyLength(BytesMessage bMsg) { - try { - Method mtd = bMsg.getClass().getMethod("getBodyLength", NOARGS); - if (mtd != null) { - return (Long) mtd.invoke(bMsg, NOPARMS); - } - } catch (Exception e) { - // JMS 1.0 - if (log.isDebugEnabled()) { - log.debug("Error trying to determine JMS BytesMessage body length", e); - } - } - - // if JMS 1.0 - long length = 0; - try { - byte[] buffer = new byte[2048]; - bMsg.reset(); - for (int bytesRead = bMsg.readBytes(buffer); bytesRead != -1; - bytesRead = bMsg.readBytes(buffer)) { - length += bytesRead; - } - } catch (JMSException ignore) {} - return length; - } - - /** - * Get the length of the message in bytes - * @param message - * @return message size (or approximation) in bytes - * @throws JMSException - */ - public static long getMessageSize(Message message) throws JMSException { - if (message instanceof BytesMessage) { - return JMSUtils.getBodyLength((BytesMessage) message); - } else if (message instanceof TextMessage) { - // TODO: Converting the whole message to a byte array is too much overhead just to determine the message size. - // Anyway, the result is not accurate since we don't know what encoding the JMS provider uses. - return ((TextMessage) message).getText().getBytes().length; - } else { - log.warn("Can't determine size of JMS message; unsupported message type : " - + message.getClass().getName()); - return 0; - } - } - - public static T lookup(Context context, Class clazz, String name) - throws NamingException { - - Object object = context.lookup(name); - try { - return clazz.cast(object); - } catch (ClassCastException ex) { - // Instead of a ClassCastException, throw an exception with some - // more information. - if (object instanceof Reference) { - Reference ref = (Reference)object; - handleException("JNDI failed to de-reference Reference with name " + - name + "; is the factory " + ref.getFactoryClassName() + - " in your classpath?"); - return null; - } else { - handleException("JNDI lookup of name " + name + " returned a " + - object.getClass().getName() + " while a " + clazz + " was expected"); - return null; - } - } - } - - /** - * Create a ServiceTaskManager for the service passed in and its corresponding JMSConnectionFactory - * @param jcf - * @param service - * @param workerPool - * @return - */ - public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf, - AxisService service, WorkerPool workerPool) { - - String name = service.getName(); - Map svc = getServiceStringParameters(service.getParameters()); - Map cf = jcf.getParameters(); - - ServiceTaskManager stm = new ServiceTaskManager(); - - stm.setServiceName(name); - stm.addJmsProperties(cf); - stm.addJmsProperties(svc); - - stm.setConnFactoryJNDIName( - getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf)); - String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf); - if (destName == null) { - destName = service.getName(); - } - stm.setDestinationJNDIName(destName); - stm.setDestinationType(getDestinationType(svc, cf)); - - stm.setJmsSpec11( - getJMSSpecVersion(svc, cf)); - stm.setTransactionality( - getTransactionality(svc, cf)); - stm.setCacheUserTransaction( - getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf)); - stm.setUserTransactionJNDIName( - getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf)); - stm.setSessionTransacted( - getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf)); - stm.setSessionAckMode( - getSessionAck(svc, cf)); - stm.setMessageSelector( - getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf)); - stm.setSubscriptionDurable( - getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf)); - stm.setDurableSubscriberName( - getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf)); - - stm.setCacheLevel( - getCacheLevel(svc, cf)); - stm.setPubSubNoLocal( - getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf)); - - Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf); - if (value != null) { - stm.setReceiveTimeout(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf); - if (value != null) { - stm.setConcurrentConsumers(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf); - if (value != null) { - stm.setMaxConcurrentConsumers(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf); - if (value != null) { - stm.setIdleTaskExecutionLimit(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf); - if (value != null) { - stm.setMaxMessagesPerTask(value); - } - - value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf); - if (value != null) { - stm.setInitialReconnectDuration(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf); - if (value != null) { - stm.setMaxReconnectDuration(value); - } - Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf); - if (dValue != null) { - stm.setReconnectionProgressionFactor(dValue); - } - - stm.setWorkerPool(workerPool); - - // remove processed properties from property bag - stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME); - stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION); - stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER); - stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY); - stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN); - stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME); - stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED); - stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR); - stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE); - stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME); - stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL); - stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL); - stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT); - stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS); - stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS); - stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT); - stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK); - stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION); - stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION); - stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR); - - return stm; - } - - private static Map getServiceStringParameters(List list) { - - Map map = new HashMap(); - for (Object o : list) { - Parameter p = (Parameter) o; - if (p.getValue() instanceof String) { - map.put(p.getName(), (String) p.getValue()); - } - } - return map; - } - - private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value == null) { - throw new AxisJMSException("Service/connection factory property : " + key); - } - return value; - } - - private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - return value; - } - - private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value == null) { - return null; - } else { - return Boolean.valueOf(value); - } - } - - private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value != null) { - try { - return Integer.parseInt(value); - } catch (NumberFormatException e) { - throw new AxisJMSException("Invalid value : " + value + " for " + key); - } - } - return null; - } - - private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value != null) { - try { - return Double.parseDouble(value); - } catch (NumberFormatException e) { - throw new AxisJMSException("Invalid value : " + value + " for " + key); - } - } - return null; - } - - private static int getTransactionality(Map svcMap, Map cfMap) { - - String key = BaseConstants.PARAM_TRANSACTIONALITY; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (val == null) { - return BaseConstants.TRANSACTION_NONE; - - } else { - if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) { - return BaseConstants.TRANSACTION_JTA; - } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) { - return BaseConstants.TRANSACTION_LOCAL; - } else { - throw new AxisJMSException("Invalid option : " + val + " for parameter : " + - BaseConstants.STR_TRANSACTION_JTA); - } - } - } - - private static int getDestinationType(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_DEST_TYPE; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) { - return JMSConstants.TOPIC; - } - return JMSConstants.QUEUE; - } - - private static int getSessionAck(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_SESSION_ACK; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) { - return Session.AUTO_ACKNOWLEDGE; - } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) { - return Session.CLIENT_ACKNOWLEDGE; - } else if ("DUPS_OK_ACKNOWLEDGE".equals(val)){ - return Session.DUPS_OK_ACKNOWLEDGE; - } else if ("SESSION_TRANSACTED".equals(val)) { - return 0; //Session.SESSION_TRANSACTED; - } else { - try { - return Integer.parseInt(val); - } catch (NumberFormatException ignore) { - throw new AxisJMSException("Invalid session acknowledgement mode : " + val); - } - } - } - - private static int getCacheLevel(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_CACHE_LEVEL; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if ("none".equalsIgnoreCase(val)) { - return JMSConstants.CACHE_NONE; - } else if ("connection".equalsIgnoreCase(val)) { - return JMSConstants.CACHE_CONNECTION; - } else if ("session".equals(val)){ - return JMSConstants.CACHE_SESSION; - } else if ("consumer".equals(val)) { - return JMSConstants.CACHE_CONSUMER; - } else if (val != null) { - throw new AxisJMSException("Invalid cache level : " + val); - } - return JMSConstants.CACHE_AUTO; - } - - private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_JMS_SPEC_VER; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (val == null || "1.1".equals(val)) { - return true; - } else { - return false; - } - } - - /** - * This is a JMS spec independent method to create a Connection. Please be cautious when - * making any changes - * - * @param conFac the ConnectionFactory to use - * @param user optional user name - * @param pass optional password - * @param jmsSpec11 should we use JMS 1.1 API ? - * @param isQueue is this to deal with a Queue? - * @return a JMS Connection as requested - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static Connection createConnection(ConnectionFactory conFac, - String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException { - - Connection connection = null; - if (log.isDebugEnabled()) { - log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") + - "Connection using credentials : (" + user + "/" + pass + ")"); - } - - if (jmsSpec11 || isQueue == null) { - if (user != null && pass != null) { - connection = conFac.createConnection(user, pass); - } else { - connection = conFac.createConnection(); - } - - } else { - QueueConnectionFactory qConFac = null; - TopicConnectionFactory tConFac = null; - if (isQueue) { - tConFac = (TopicConnectionFactory) conFac; - } else { - qConFac = (QueueConnectionFactory) conFac; - } - - if (user != null && pass != null) { - if (qConFac != null) { - connection = qConFac.createQueueConnection(user, pass); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(user, pass); - } - } else { - if (qConFac != null) { - connection = qConFac.createQueueConnection(); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(); - } - } - } - return connection; - } - - /** - * This is a JMS spec independent method to create a Session. Please be cautious when - * making any changes - * - * @param connection the JMS Connection - * @param transacted should the session be transacted? - * @param ackMode the ACK mode for the session - * @param jmsSpec11 should we use the JMS 1.1 API? - * @param isQueue is this Session to deal with a Queue? - * @return a Session created for the given information - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static Session createSession(Connection connection, boolean transacted, int ackMode, - boolean jmsSpec11, Boolean isQueue) throws JMSException { - - if (jmsSpec11 || isQueue == null) { - return connection.createSession(transacted, ackMode); - - } else { - if (isQueue) { - return ((QueueConnection) connection).createQueueSession(transacted, ackMode); - } else { - return ((TopicConnection) connection).createTopicSession(transacted, ackMode); - } - } - } - - /** - * This is a JMS spec independent method to create a MessageConsumer. Please be cautious when - * making any changes - * - * @param session JMS session - * @param destination the Destination - * @param isQueue is the Destination a queue? - * @param subscriberName optional client name to use for a durable subscription to a topic - * @param messageSelector optional message selector - * @param pubSubNoLocal should we receive messages sent by us during pub-sub? - * @param isDurable is this a durable topic subscription? - * @param jmsSpec11 should we use JMS 1.1 API ? - * @return a MessageConsumer to receive messages - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static MessageConsumer createConsumer( - Session session, Destination destination, Boolean isQueue, - String subscriberName, String messageSelector, boolean pubSubNoLocal, - boolean isDurable, boolean jmsSpec11) throws JMSException { - - if (jmsSpec11 || isQueue == null) { - if (isDurable) { - return session.createDurableSubscriber( - (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); - } else { - return session.createConsumer(destination, messageSelector, pubSubNoLocal); - } - } else { - if (isQueue) { - return ((QueueSession) session).createReceiver((Queue) destination, messageSelector); - } else { - if (isDurable) { - return ((TopicSession) session).createDurableSubscriber( - (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); - } else { - return ((TopicSession) session).createSubscriber( - (Topic) destination, messageSelector, pubSubNoLocal); - } - } - } - } - - /** - * This is a JMS spec independent method to create a MessageProducer. Please be cautious when - * making any changes - * - * @param session JMS session - * @param destination the Destination - * @param isQueue is the Destination a queue? - * @param jmsSpec11 should we use JMS 1.1 API ? - * @return a MessageProducer to send messages to the given Destination - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static MessageProducer createProducer( - Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException { - - if (jmsSpec11 || isQueue == null) { - return session.createProducer(destination); - } else { - if (isQueue) { - return ((QueueSession) session).createSender((Queue) destination); - } else { - return ((TopicSession) session).createPublisher((Topic) destination); - } - } - } - - /** - * Create a one time MessageProducer for the given JMS OutTransport information - * For simplicity and best compatibility, this method uses only JMS 1.0.2b API. - * Please be cautious when making any changes - * - * @param jmsOut the JMS OutTransport information (contains all properties) - * @return a JMSSender based on one-time use resources - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut) - throws JMSException { - - // digest the targetAddress and locate CF from the EPR - jmsOut.loadConnectionFactoryFromProperies(); - - // create a one time connection and session to be used - Hashtable jmsProps = jmsOut.getProperties(); - String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null; - String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null; - - QueueConnectionFactory qConFac = null; - TopicConnectionFactory tConFac = null; - - int destType = -1; - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) { - destType = JMSConstants.QUEUE; - qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory(); - - } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) { - destType = JMSConstants.TOPIC; - tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory(); - } - - Connection connection = null; - if (user != null && pass != null) { - if (qConFac != null) { - connection = qConFac.createQueueConnection(user, pass); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(user, pass); - } - } else { - if (qConFac != null) { - connection = qConFac.createQueueConnection(); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(); - } - } - - if (connection == null && jmsOut.getJmsConnectionFactory() != null) { - connection = jmsOut.getJmsConnectionFactory().getConnection(); - } - - Session session = null; - MessageProducer producer = null; - Destination destination = jmsOut.getDestination(); - - if (destType == JMSConstants.QUEUE) { - session = ((QueueConnection) connection). - createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - producer = ((QueueSession) session).createSender((Queue) destination); - } else { - session = ((TopicConnection) connection). - createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - producer = ((TopicSession) session).createPublisher((Topic) destination); - } - - return new JMSMessageSender(connection, session, producer, - destination, (jmsOut.getJmsConnectionFactory() == null ? - JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false, - destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE); - } - - /** - * Return a String representation of the destination type - * @param destType the destination type indicator int - * @return a descriptive String - */ - public static String getDestinationTypeAsString(int destType) { - if (destType == JMSConstants.QUEUE) { - return "Queue"; - } else if (destType == JMSConstants.TOPIC) { - return "Topic"; - } else { - return "Generic"; - } - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java deleted file mode 100644 index 28c8da2a8d..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java +++ /dev/null @@ -1,1217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import javax.transaction.NotSupportedException; -import javax.transaction.SystemException; -import javax.transaction.UserTransaction; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; - -/** - * Each service will have one ServiceTaskManager instance that will create, manage and also destroy - * idle tasks created for it, for message receipt. This will also allow individual tasks to cache - * the Connection, Session or Consumer as necessary, considering the transactionality required and - * user preference. - * - * This also acts as the ExceptionListener for all JMS connections made on behalf of the service. - * Since the ExceptionListener is notified by a JMS provider on a "serious" error, we simply try - * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh - * for the service, by discarding all connections. - */ -public class ServiceTaskManager { - - /** The logger */ - private static final Log log = LogFactory.getLog(ServiceTaskManager.class); - - /** The Task manager is stopped or has not started */ - private static final int STATE_STOPPED = 0; - /** The Task manager is started and active */ - private static final int STATE_STARTED = 1; - /** The Task manager is paused temporarily */ - private static final int STATE_PAUSED = 2; - /** The Task manager is started, but a shutdown has been requested */ - private static final int STATE_SHUTTING_DOWN = 3; - /** The Task manager has encountered an error */ - private static final int STATE_FAILURE = 4; - - /** The name of the service managed by this instance */ - private String serviceName; - /** The ConnectionFactory MUST refer to an XAConnectionFactory to use JTA */ - private String connFactoryJNDIName; - /** The JNDI name of the Destination Queue or Topic */ - private String destinationJNDIName; - /** JNDI location for the JTA UserTransaction */ - private String userTransactionJNDIName = "java:comp/UserTransaction"; - /** The type of destination - P2P or PubSub (or JMS 1.1 API generic?) */ - private int destinationType = JMSConstants.GENERIC; - /** An optional message selector */ - private String messageSelector = null; - - /** Should tasks run without transactions, using transacted Sessions (i.e. local), or JTA */ - private int transactionality = BaseConstants.TRANSACTION_NONE; - /** Should created Sessions be transactional ? - should be false when using JTA */ - private boolean sessionTransacted = true; - /** Session acknowledgement mode when transacted Sessions (i.e. local transactions) are used */ - private int sessionAckMode = Session.AUTO_ACKNOWLEDGE; - - /** Is the subscription durable ? */ - private boolean subscriptionDurable = false; - /** The name of the durable subscriber for this client */ - private String durableSubscriberName = null; - /** In PubSub mode, should I receive messages sent by me / my connection ? */ - private boolean pubSubNoLocal = false; - /** Number of concurrent consumers - for PubSub, this should be 1 to prevent multiple receipt */ - private int concurrentConsumers = 1; - /** Maximum number of consumers to create - see @concurrentConsumers */ - private int maxConcurrentConsumers = 1; - /** The number of idle (i.e. message-less) attempts to be tried before suicide, to scale down */ - private int idleTaskExecutionLimit = 10; - /** The maximum number of successful message receipts for a task - to limit thread life span */ - private int maxMessagesPerTask = -1; // default is unlimited - /** The default receive timeout - a negative value means wait forever, zero dont wait at all */ - private int receiveTimeout = 1000; - /** JMS Resource cache level - Connection, Session, Consumer. Auto will select safe default */ - private int cacheLevel = JMSConstants.CACHE_AUTO; - /** Should we cache the UserTransaction handle from JNDI - true for almost all app servers */ - private boolean cacheUserTransaction = true; - /** Shared UserTransactionHandle */ - private UserTransaction sharedUserTransaction = null; - /** Should this service use JMS 1.1 ? (when false, defaults to 1.0.2b) */ - private boolean jmsSpec11 = true; - - /** Initial duration to attempt re-connection to JMS provider after failure */ - private int initialReconnectDuration = 10000; - /** Progression factory for geometric series that calculates re-connection times */ - private double reconnectionProgressionFactor = 2.0; // default to [bounded] exponential - /** Upper limit on reconnection attempt duration */ - private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour - - /** The JNDI context properties and other general properties */ - private Hashtable jmsProperties = new Hashtable(); - /** The JNDI Context acuired */ - private Context context = null; - /** The ConnectionFactory to be used */ - private ConnectionFactory conFactory = null; - /** The JMS Destination */ - private Destination destination = null; - - /** The list of active tasks thats managed by this instance */ - private final List pollingTasks = - Collections.synchronizedList(new ArrayList()); - /** The per-service JMS message receiver to be invoked after receipt of messages */ - private JMSMessageReceiver jmsMessageReceiver = null; - - /** State of this Task Manager */ - private volatile int serviceTaskManagerState = STATE_STOPPED; - /** Number of invoker tasks active */ - private volatile int activeTaskCount = 0; - /** The shared thread pool from the Listener */ - private WorkerPool workerPool = null; - - /** The JMS Connection shared between multiple polling tasks - when enabled (reccomended) */ - private Connection sharedConnection = null; - - /** - * Start or re-start the Task Manager by shutting down any existing worker tasks and - * re-creating them. However, if this is STM is PAUSED, a start request is ignored. - * This applies for any connection failures during paused state as well, which then will - * not try to auto recover - */ - public synchronized void start() { - - if (serviceTaskManagerState == STATE_PAUSED) { - log.info("Attempt to re-start paused TaskManager is ignored. Please use resume instead"); - return; - } - - // if any tasks are running, stop whats running now - if (!pollingTasks.isEmpty()) { - stop(); - } - - if (cacheLevel == JMSConstants.CACHE_AUTO) { - cacheLevel = - transactionality == BaseConstants.TRANSACTION_NONE ? - JMSConstants.CACHE_CONSUMER : JMSConstants.CACHE_NONE; - } - switch (cacheLevel) { - case JMSConstants.CACHE_NONE: - log.debug("No JMS resources will be cached/shared between poller " + - "worker tasks of service : " + serviceName); - break; - case JMSConstants.CACHE_CONNECTION: - log.debug("Only the JMS Connection will be cached and shared between *all* " + - "poller task invocations"); - break; - case JMSConstants.CACHE_SESSION: - log.debug("The JMS Connection and Session will be cached and shared between " + - "successive poller task invocations"); - break; - case JMSConstants.CACHE_CONSUMER: - log.debug("The JMS Connection, Session and MessageConsumer will be cached and " + - "shared between successive poller task invocations"); - break; - default : { - handleException("Invalid cache level : " + cacheLevel + - " for service : " + serviceName); - } - } - - for (int i=0; i 0) { - log.warn("Unable to shutdown all polling tasks of service : " + serviceName); - } - - if (serviceTaskManagerState != STATE_FAILURE) { - serviceTaskManagerState = STATE_STOPPED; - } - log.info("Task manager for service : " + serviceName + " shutdown"); - } - - /** - * Temporarily suspend receipt and processing of messages. Accomplished by stopping the - * connection / or connections used by the poller tasks - */ - public synchronized void pause() { - for (MessageListenerTask lstTask : pollingTasks) { - lstTask.pause(); - } - if (sharedConnection != null) { - try { - sharedConnection.stop(); - } catch (JMSException e) { - logError("Error pausing shared Connection", e); - } - } - } - - /** - * Resume receipt and processing of messages of paused tasks - */ - public synchronized void resume() { - for (MessageListenerTask lstTask : pollingTasks) { - lstTask.resume(); - } - if (sharedConnection != null) { - try { - sharedConnection.start(); - } catch (JMSException e) { - logError("Error resuming shared Connection", e); - } - } - } - - /** - * Start a new MessageListenerTask if we are still active, the threshold is not reached, and w - * e do not have any idle tasks - i.e. scale up listening - */ - private void scheduleNewTaskIfAppropriate() { - if (serviceTaskManagerState == STATE_STARTED && - pollingTasks.size() < getMaxConcurrentConsumers() && getIdleTaskCount() == 0) { - workerPool.execute(new MessageListenerTask()); - } - } - - /** - * Get the number of MessageListenerTasks that are currently idle - * @return idle task count - */ - private int getIdleTaskCount() { - int count = 0; - for (MessageListenerTask lstTask : pollingTasks) { - if (lstTask.isTaskIdle()) { - count++; - } - } - return count; - } - - /** - * Get the number of MessageListenerTasks that are currently connected to the JMS provider - * @return connected task count - */ - private int getConnectedTaskCount() { - int count = 0; - for (MessageListenerTask lstTask : pollingTasks) { - if (lstTask.isConnected()) { - count++; - } - } - return count; - } - - /** - * The actual threads/tasks that perform message polling - */ - private class MessageListenerTask implements Runnable, ExceptionListener { - - /** The Connection used by the polling task */ - private Connection connection = null; - /** The Sesson used by the polling task */ - private Session session = null; - /** The MessageConsumer used by the polling task */ - private MessageConsumer consumer = null; - /** State of the worker polling task */ - private volatile int workerState = STATE_STOPPED; - /** The number of idle (i.e. without fetching a message) polls for this task */ - private int idleExecutionCount = 0; - /** Is this task idle right now? */ - private volatile boolean idle = false; - /** Is this task connected to the JMS provider successfully? */ - private boolean connected = false; - - /** As soon as we create a new polling task, add it to the STM for control later */ - MessageListenerTask() { - synchronized(pollingTasks) { - pollingTasks.add(this); - } - } - - /** - * Pause this polling worker task - */ - public void pause() { - if (isActive()) { - if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - connection.stop(); - } catch (JMSException e) { - log.warn("Error pausing Message Listener task for service : " + serviceName); - } - } - workerState = STATE_PAUSED; - } - } - - /** - * Resume this polling task - */ - public void resume() { - if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - connection.start(); - } catch (JMSException e) { - log.warn("Error resuming Message Listener task for service : " + serviceName); - } - } - workerState = STATE_STARTED; - } - - /** - * Execute the polling worker task - */ - public void run() { - workerState = STATE_STARTED; - activeTaskCount++; - int messageCount = 0; - - if (log.isDebugEnabled()) { - log.debug("New poll task starting : thread id = " + Thread.currentThread().getId()); - } - - try { - while (isActive() && - (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) && - (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) { - - UserTransaction ut = null; - try { - if (transactionality == BaseConstants.TRANSACTION_JTA) { - ut = getUserTransaction(); - ut.begin(); - } - } catch (NotSupportedException e) { - handleException("Listener Task is already associated with a transaction", e); - } catch (SystemException e) { - handleException("Error starting a JTA transaction", e); - } - - // Get a message by polling, or receive null - Message message = receiveMessage(); - - if (log.isTraceEnabled()) { - if (message != null) { - try { - log.trace("<<<<<<< READ message with Message ID : " + - message.getJMSMessageID() + " from : " + destination + - " by Thread ID : " + Thread.currentThread().getId()); - } catch (JMSException ignore) {} - } else { - log.trace("No message received by Thread ID : " + - Thread.currentThread().getId() + " for destination : " + destination); - } - } - - if (message != null) { - idle = false; - idleExecutionCount = 0; - messageCount++; - // I will be busy now while processing this message, so start another if needed - scheduleNewTaskIfAppropriate(); - handleMessage(message, ut); - - } else { - idle = true; - idleExecutionCount++; - } - } - - } finally { - workerState = STATE_STOPPED; - activeTaskCount--; - synchronized(pollingTasks) { - pollingTasks.remove(this); - } - } - - if (log.isTraceEnabled()) { - log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() + - " is stopping after processing : " + messageCount + " messages :: " + - " isActive : " + isActive() + " maxMessagesPerTask : " + - getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() + - " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " + - getIdleTaskExecutionLimit()); - } else if (log.isDebugEnabled()) { - log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() + - " is stopping after processing : " + messageCount + " messages"); - } - - closeConsumer(true); - closeSession(true); - closeConnection(); - - // My time is up, so if I am going away, create another - scheduleNewTaskIfAppropriate(); - } - - /** - * Poll for and return a message if available - * - * @return a message read, or null - */ - private Message receiveMessage() { - - // get a new connection, session and consumer to prevent a conflict. - // If idle, it means we can re-use what we already have - if (consumer == null) { - connection = getConnection(); - session = getSession(); - consumer = getMessageConsumer(); - if (log.isDebugEnabled()) { - log.debug("Preparing a Connection, Session and Consumer to read messages"); - } - } - - if (log.isDebugEnabled()) { - log.debug("Waiting for a message for service : " + serviceName + " - duration : " - + (getReceiveTimeout() < 0 ? "unlimited" : (getReceiveTimeout() + "ms"))); - } - - try { - if (getReceiveTimeout() < 0) { - return consumer.receive(); - } else { - return consumer.receive(getReceiveTimeout()); - } - } catch (IllegalStateException ignore) { - // probably the consumer (shared) was closed.. which is still ok.. as we didn't read - } catch (JMSException e) { - logError("Error receiving message for service : " + serviceName, e); - } - return null; - } - - /** - * Invoke ultimate message handler/listener and ack message and/or - * commit/rollback transactions - * @param message the JMS message received - * @param ut the UserTransaction used to receive this message, or null - */ - private void handleMessage(Message message, UserTransaction ut) { - - String messageId = null; - try { - messageId = message.getJMSMessageID(); - } catch (JMSException ignore) {} - - boolean commitOrAck = true; - try { - commitOrAck = jmsMessageReceiver.onMessage(message, ut); - - } finally { - - // if client acknowledgement is selected, and processing requested ACK - if (commitOrAck && getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) { - try { - message.acknowledge(); - if (log.isDebugEnabled()) { - log.debug("Message : " + messageId + " acknowledged"); - } - } catch (JMSException e) { - logError("Error acknowledging message : " + messageId, e); - } - } - - // close the consumer - closeConsumer(false); - - // if session was transacted, commit it or rollback - try { - if (session.getTransacted()) { - if (commitOrAck) { - session.commit(); - if (log.isDebugEnabled()) { - log.debug("Session for message : " + messageId + " committed"); - } - } else { - session.rollback(); - if (log.isDebugEnabled()) { - log.debug("Session for message : " + messageId + " rolled back"); - } - } - } - } catch (JMSException e) { - logError("Error " + (commitOrAck ? "committing" : "rolling back") + - " local session txn for message : " + messageId, e); - } - - // if a JTA transaction was being used, commit it or rollback - try { - if (ut != null) { - if (commitOrAck) { - ut.commit(); - if (log.isDebugEnabled()) { - log.debug("JTA txn for message : " + messageId + " committed"); - } - } else { - ut.rollback(); - if (log.isDebugEnabled()) { - log.debug("JTA txn for message : " + messageId + " rolled back"); - } - } - } - } catch (Exception e) { - logError("Error " + (commitOrAck ? "committing" : "rolling back") + - " JTA txn for message : " + messageId + " from the session", e); - } - - closeSession(false); - closeConnection(); - } - } - - /** Handle JMS Connection exceptions by re-initializing. A single connection failure could - * cause re-initialization of multiple MessageListenerTasks / Connections - */ - public void onException(JMSException j) { - - if (!isSTMActive()) { - requestShutdown(); - return; - } - - log.warn("JMS Connection failure : " + j.getMessage()); - setConnected(false); - - if (cacheLevel < JMSConstants.CACHE_CONNECTION) { - // failed Connection was not shared, thus no need to restart the whole STM - requestShutdown(); - return; - } - - // if we failed while active, update state to show failure - setServiceTaskManagerState(STATE_FAILURE); - log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks"); - - int r = 1; - long retryDuration = initialReconnectDuration; - - do { - try { - log.info("Reconnection attempt : " + r + " for service : " + serviceName); - start(); - } catch (Exception ignore) {} - - boolean connected = false; - for (int i=0; i<5; i++) { - if (getConnectedTaskCount() == concurrentConsumers) { - connected = true; - break; - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) {} - } - - if (!connected) { - log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName + - " failed. Next retry in " + (retryDuration/1000) + "seconds"); - retryDuration = (long) (retryDuration * reconnectionProgressionFactor); - if (retryDuration > maxReconnectDuration) { - retryDuration = maxReconnectDuration; - } - - try { - Thread.sleep(retryDuration); - } catch (InterruptedException ignore) {} - } - - } while (!isSTMActive() || getConnectedTaskCount() < concurrentConsumers); - } - - protected void requestShutdown() { - workerState = STATE_SHUTTING_DOWN; - } - - private boolean isActive() { - return workerState == STATE_STARTED; - } - - protected boolean isTaskIdle() { - return idle; - } - - public boolean isConnected() { - return connected; - } - - public void setConnected(boolean connected) { - this.connected = connected; - } - - /** - * Get a Connection that could/should be used by this task - depends on the cache level to reuse - * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection - */ - private Connection getConnection() { - if (cacheLevel < JMSConstants.CACHE_CONNECTION) { - // Connection is not shared - if (connection == null) { - connection = createConnection(); - } - } else { - if (sharedConnection != null) { - connection = sharedConnection; - } else { - synchronized(this) { - if (sharedConnection == null) { - sharedConnection = createConnection(); - } - connection = sharedConnection; - } - } - } - setConnected(true); - return connection; - } - - /** - * Get a Session that could/should be used by this task - depends on the cache level to reuse - * @param connection the connection (could be the shared connection) to use to create a Session - * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session - * created using the Connection passed, or a new/shared connection - */ - private Session getSession() { - if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) { - session = createSession(); - } - return session; - } - - /** - * Get a MessageConsumer that chould/should be used by this task - depends on the cache - * level to reuse - * @param connection option Connection to be used - * @param session optional Session to be used - * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new - * MessageConsumer possibly using the Connection and Session passed in - */ - private MessageConsumer getMessageConsumer() { - if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) { - consumer = createConsumer(); - } - return consumer; - } - - /** - * Close the given Connection, hiding exceptions if any which are logged - * @param connection the Connection to be closed - */ - private void closeConnection() { - if (connection != null && - cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS connection for service : " + serviceName); - } - connection.close(); - } catch (JMSException e) { - logError("Error closing JMS connection", e); - } finally { - connection = null; - } - } - } - - /** - * Close the given Session, hiding exceptions if any which are logged - * @param session the Session to be closed - */ - private void closeSession(boolean forced) { - if (session != null && - (cacheLevel < JMSConstants.CACHE_SESSION || forced)) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS session for service : " + serviceName); - } - session.close(); - } catch (JMSException e) { - logError("Error closing JMS session", e); - } finally { - session = null; - } - } - } - - /** - * Close the given Consumer, hiding exceptions if any which are logged - * @param consumer the Consumer to be closed - */ - private void closeConsumer(boolean forced) { - if (consumer != null && - (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS consumer for service : " + serviceName); - } - consumer.close(); - } catch (JMSException e) { - logError("Error closing JMS consumer", e); - } finally { - consumer = null; - } - } - } - - /** - * Create a new Connection for this STM, using JNDI properties and credentials provided - * @return a new Connection for this STM, using JNDI properties and credentials provided - */ - private Connection createConnection() { - - try { - conFactory = JMSUtils.lookup( - getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName()); - log.debug("Connected to the JMS connection factory : " + getConnFactoryJNDIName()); - } catch (NamingException e) { - handleException("Error looking up connection factory : " + getConnFactoryJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - - Connection connection = null; - try { - connection = JMSUtils.createConnection( - conFactory, - jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME), - jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD), - isJmsSpec11(), isQueue()); - - connection.setExceptionListener(this); - connection.start(); - log.debug("JMS Connection for service : " + serviceName + " created and started"); - - } catch (JMSException e) { - handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - return connection; - } - - /** - * Create a new Session for this STM - * @param connection the Connection to be used - * @return a new Session created using the Connection passed in - */ - private Session createSession() { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS Session for service : " + serviceName); - } - return JMSUtils.createSession( - connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue()); - - } catch (JMSException e) { - handleException("Error creating JMS session for service : " + serviceName, e); - } - return null; - } - - /** - * Create a new MessageConsumer for this STM - * @param session the Session to be used - * @return a new MessageConsumer created using the Session passed in - */ - private MessageConsumer createConsumer() { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS MessageConsumer for service : " + serviceName); - } - - return JMSUtils.createConsumer( - session, getDestination(session), isQueue(), - (isSubscriptionDurable() && getDurableSubscriberName() == null ? - getDurableSubscriberName() : serviceName), - getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11()); - - } catch (JMSException e) { - handleException("Error creating JMS consumer for service : " + serviceName,e); - } - return null; - } - } - - // -------------- mundane private methods ---------------- - /** - * Get the InitialContext for lookup using the JNDI parameters applicable to the service - * @return the InitialContext to be used - * @throws NamingException - */ - private Context getInitialContext() throws NamingException { - if (context == null) { - context = new InitialContext(jmsProperties); - } - return context; - } - - /** - * Return the JMS Destination for the JNDI name of the Destination from the InitialContext - * @return the JMS Destination to which this STM listens for messages - */ - private Destination getDestination(Session session) { - if (destination == null) { - try { - context = getInitialContext(); - destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName()); - if (log.isDebugEnabled()) { - log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() + - " found for service " + serviceName); - } - } catch (NamingException e) { - try { - switch (destinationType) { - case JMSConstants.QUEUE: { - destination = session.createQueue(getDestinationJNDIName()); - break; - } - case JMSConstants.TOPIC: { - destination = session.createTopic(getDestinationJNDIName()); - break; - } - default: { - handleException("Error looking up JMS destination : " + - getDestinationJNDIName() + " using JNDI properties : " + - jmsProperties, e); - } - } - } catch (JMSException j) { - handleException("Error looking up and creating JMS destination : " + - getDestinationJNDIName() + " using JNDI properties : " + jmsProperties, e); - } - } - } - return destination; - } - - /** - * The UserTransaction to be used, looked up from the JNDI - * @return The UserTransaction to be used, looked up from the JNDI - */ - private UserTransaction getUserTransaction() { - if (!cacheUserTransaction) { - if (log.isDebugEnabled()) { - log.debug("Acquiring a new UserTransaction for service : " + serviceName); - } - - try { - context = getInitialContext(); - return - JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); - } catch (NamingException e) { - handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - } - - if (sharedUserTransaction == null) { - try { - context = getInitialContext(); - sharedUserTransaction = - JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); - if (log.isDebugEnabled()) { - log.debug("Acquired shared UserTransaction for service : " + serviceName); - } - } catch (NamingException e) { - handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - } - return sharedUserTransaction; - } - - // -------------------- trivial methods --------------------- - private boolean isSTMActive() { - return serviceTaskManagerState == STATE_STARTED; - } - - /** - * Is this STM bound to a Queue, Topic or a JMS 1.1 Generic Destination? - * @return TRUE for a Queue, FALSE for a Topic and NULL for a Generic Destination - */ - private Boolean isQueue() { - if (destinationType == JMSConstants.GENERIC) { - return null; - } else { - return destinationType == JMSConstants.QUEUE; - } - } - - private void logError(String msg, Exception e) { - log.error(msg, e); - } - - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } - - private void handleException(String msg) { - log.error(msg); - throw new AxisJMSException(msg); - } - - // -------------- getters and setters ------------------ - public String getServiceName() { - return serviceName; - } - - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - - public String getConnFactoryJNDIName() { - return connFactoryJNDIName; - } - - public void setConnFactoryJNDIName(String connFactoryJNDIName) { - this.connFactoryJNDIName = connFactoryJNDIName; - } - - public String getDestinationJNDIName() { - return destinationJNDIName; - } - - public void setDestinationJNDIName(String destinationJNDIName) { - this.destinationJNDIName = destinationJNDIName; - } - - public int getDestinationType() { - return destinationType; - } - - public void setDestinationType(int destinationType) { - this.destinationType = destinationType; - } - - public String getMessageSelector() { - return messageSelector; - } - - public void setMessageSelector(String messageSelector) { - this.messageSelector = messageSelector; - } - - public int getTransactionality() { - return transactionality; - } - - public void setTransactionality(int transactionality) { - this.transactionality = transactionality; - sessionTransacted = (transactionality == BaseConstants.TRANSACTION_LOCAL); - } - - public boolean isSessionTransacted() { - return sessionTransacted; - } - - public void setSessionTransacted(Boolean sessionTransacted) { - if (sessionTransacted != null) { - this.sessionTransacted = sessionTransacted; - // sesstionTransacted means local transactions are used, however !sessionTransacted does - // not mean that JTA is used - if (sessionTransacted) { - transactionality = BaseConstants.TRANSACTION_LOCAL; - } - } - } - - public int getSessionAckMode() { - return sessionAckMode; - } - - public void setSessionAckMode(int sessionAckMode) { - this.sessionAckMode = sessionAckMode; - } - - public boolean isSubscriptionDurable() { - return subscriptionDurable; - } - - public void setSubscriptionDurable(Boolean subscriptionDurable) { - if (subscriptionDurable != null) { - this.subscriptionDurable = subscriptionDurable; - } - } - - public String getDurableSubscriberName() { - return durableSubscriberName; - } - - public void setDurableSubscriberName(String durableSubscriberName) { - this.durableSubscriberName = durableSubscriberName; - } - - public boolean isPubSubNoLocal() { - return pubSubNoLocal; - } - - public void setPubSubNoLocal(Boolean pubSubNoLocal) { - if (pubSubNoLocal != null) { - this.pubSubNoLocal = pubSubNoLocal; - } - } - - public int getConcurrentConsumers() { - return concurrentConsumers; - } - - public void setConcurrentConsumers(int concurrentConsumers) { - this.concurrentConsumers = concurrentConsumers; - } - - public int getMaxConcurrentConsumers() { - return maxConcurrentConsumers; - } - - public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { - this.maxConcurrentConsumers = maxConcurrentConsumers; - } - - public int getIdleTaskExecutionLimit() { - return idleTaskExecutionLimit; - } - - public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { - this.idleTaskExecutionLimit = idleTaskExecutionLimit; - } - - public int getReceiveTimeout() { - return receiveTimeout; - } - - public void setReceiveTimeout(int receiveTimeout) { - this.receiveTimeout = receiveTimeout; - } - - public int getCacheLevel() { - return cacheLevel; - } - - public void setCacheLevel(int cacheLevel) { - this.cacheLevel = cacheLevel; - } - - public int getInitialReconnectDuration() { - return initialReconnectDuration; - } - - public void setInitialReconnectDuration(int initialReconnectDuration) { - this.initialReconnectDuration = initialReconnectDuration; - } - - public double getReconnectionProgressionFactor() { - return reconnectionProgressionFactor; - } - - public void setReconnectionProgressionFactor(double reconnectionProgressionFactor) { - this.reconnectionProgressionFactor = reconnectionProgressionFactor; - } - - public long getMaxReconnectDuration() { - return maxReconnectDuration; - } - - public void setMaxReconnectDuration(long maxReconnectDuration) { - this.maxReconnectDuration = maxReconnectDuration; - } - - public int getMaxMessagesPerTask() { - return maxMessagesPerTask; - } - - public void setMaxMessagesPerTask(int maxMessagesPerTask) { - this.maxMessagesPerTask = maxMessagesPerTask; - } - - public String getUserTransactionJNDIName() { - return userTransactionJNDIName; - } - - public void setUserTransactionJNDIName(String userTransactionJNDIName) { - if (userTransactionJNDIName != null) { - this.userTransactionJNDIName = userTransactionJNDIName; - } - } - - public boolean isCacheUserTransaction() { - return cacheUserTransaction; - } - - public void setCacheUserTransaction(Boolean cacheUserTransaction) { - if (cacheUserTransaction != null) { - this.cacheUserTransaction = cacheUserTransaction; - } - } - - public boolean isJmsSpec11() { - return jmsSpec11; - } - - public void setJmsSpec11(boolean jmsSpec11) { - this.jmsSpec11 = jmsSpec11; - } - - public Hashtable getJmsProperties() { - return jmsProperties; - } - - public void addJmsProperties(Map jmsProperties) { - this.jmsProperties.putAll(jmsProperties); - } - - public void removeJmsProperties(String key) { - this.jmsProperties.remove(key); - } - - public Context getContext() { - return context; - } - - public ConnectionFactory getConnectionFactory() { - return conFactory; - } - - public List getPollingTasks() { - return pollingTasks; - } - - public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) { - this.jmsMessageReceiver = jmsMessageReceiver; - } - - public void setWorkerPool(WorkerPool workerPool) { - this.workerPool = workerPool; - } - - public int getActiveTaskCount() { - return activeTaskCount; - } - - public void setServiceTaskManagerState(int serviceTaskManagerState) { - this.serviceTaskManagerState = serviceTaskManagerState; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java deleted file mode 100644 index 2b415df64d..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java +++ /dev/null @@ -1,49 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -/** - * Class encapsulating the content type information for a given message. - */ -public class ContentTypeInfo { - private final String propertyName; - private final String contentType; - - public ContentTypeInfo(String propertyName, String contentType) { - this.propertyName = propertyName; - this.contentType = contentType; - } - - /** - * Get the name of the message property from which the content type - * has been extracted. - * - * @return the property name or null if the content type was not determined - * by extracting it from a message property - */ - public String getPropertyName() { - return propertyName; - } - - /** - * Get the content type of the message. - * - * @return The content type of the message. The return value is never null. - */ - public String getContentType() { - return contentType; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java deleted file mode 100644 index 0dba93356f..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java +++ /dev/null @@ -1,43 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import javax.jms.JMSException; -import javax.jms.Message; - -/** - * Interface implemented by content type rules. - */ -public interface ContentTypeRule { - /** - * Attempt to determine the content type of the given JMS message. - * - * @param message the message - * @return If the rule matches, the return value encapsulates the content type of the - * message and the message property name from which is was extracted - * (if applicable). If the rule doesn't match, the method returns null. - * @throws JMSException - */ - ContentTypeInfo getContentType(Message message) throws JMSException; - - /** - * Get the name of the message property used to extract the content type from, - * if applicable. - * - * @return the property name or null if not applicable - */ - String getExpectedContentTypeProperty(); -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java deleted file mode 100644 index a9fd25ef1b..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java +++ /dev/null @@ -1,74 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import java.util.Iterator; - -import javax.jms.BytesMessage; -import javax.jms.TextMessage; - -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; -import org.apache.axis2.description.Parameter; - -/** - * Utility class to create content type rules and rule sets from XML. - */ -public class ContentTypeRuleFactory { - private ContentTypeRuleFactory() {} - - public static ContentTypeRule parse(OMElement element) throws AxisFault { - String name = element.getLocalName(); - String value = element.getText(); - if (name.equals("jmsProperty")) { - return new PropertyRule(value); - } else if (name.equals("textMessage")) { - return new MessageTypeRule(TextMessage.class, value); - } else if (name.equals("bytesMessage")) { - return new MessageTypeRule(BytesMessage.class, value); - } else if (name.equals("default")) { - return new DefaultRule(value); - } else { - throw new AxisFault("Unknown content rule type '" + name + "'"); - } - } - - public static ContentTypeRuleSet parse(Parameter param) throws AxisFault { - ContentTypeRuleSet ruleSet = new ContentTypeRuleSet(); - Object value = param.getValue(); - if (value instanceof OMElement) { - OMElement element = (OMElement)value; - - // DescriptionBuilder#processParameters actually sets the parameter element - // itself as the value. We need to support this case. - // TODO: seems like a bug in Axis2 and is inconsistent with Synapse's way of parsing parameter in proxy definitions - if (element == param.getParameterElement()) { - element = element.getFirstElement(); - } - - if (element.getLocalName().equals("rules")) { - for (Iterator it = element.getChildElements(); it.hasNext(); ) { - ruleSet.addRule(parse((OMElement)it.next())); - } - } else { - throw new AxisFault("Expected element"); - } - } else { - ruleSet.addRule(new DefaultRule((String)value)); - } - return ruleSet; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java deleted file mode 100644 index 90383a42f8..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java +++ /dev/null @@ -1,64 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.JMSException; -import javax.jms.Message; - -/** - * A set of content type rules. - */ -public class ContentTypeRuleSet { - private final List rules = new ArrayList(); - private String defaultContentTypeProperty; - - /** - * Add a content type rule to this set. - * - * @param rule the rule to add - */ - public void addRule(ContentTypeRule rule) { - rules.add(rule); - if (defaultContentTypeProperty == null) { - defaultContentTypeProperty = rule.getExpectedContentTypeProperty(); - } - } - - /** - * Determine the content type of the given message. - * This method will try the registered rules in turn until the first rule matches. - * - * @param message the message - * @return the content type information for the message or null if none of the rules matches - * @throws JMSException - */ - public ContentTypeInfo getContentTypeInfo(Message message) throws JMSException { - for (ContentTypeRule rule : rules) { - ContentTypeInfo contentTypeInfo = rule.getContentType(message); - if (contentTypeInfo != null) { - return contentTypeInfo; - } - } - return null; - } - - public String getDefaultContentTypeProperty() { - return defaultContentTypeProperty; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java deleted file mode 100644 index a158f6ec74..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java +++ /dev/null @@ -1,37 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import javax.jms.Message; - -/** - * Content type rule that always matches and that returns a fixed (default) content type. - */ -public class DefaultRule implements ContentTypeRule { - private final String contentType; - - public DefaultRule(String contentType) { - this.contentType = contentType; - } - - public ContentTypeInfo getContentType(Message message) { - return new ContentTypeInfo(null, contentType); - } - - public String getExpectedContentTypeProperty() { - return null; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java deleted file mode 100644 index cb25ab93d4..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java +++ /dev/null @@ -1,39 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import javax.jms.Message; - -/** - * Content type rule that matches a given message type and returns a fixed content type. - */ -public class MessageTypeRule implements ContentTypeRule { - private final Class messageType; - private final String contentType; - - public MessageTypeRule(Class messageType, String contentType) { - this.messageType = messageType; - this.contentType = contentType; - } - - public ContentTypeInfo getContentType(Message message) { - return messageType.isInstance(message) ? new ContentTypeInfo(null, contentType) : null; - } - - public String getExpectedContentTypeProperty() { - return null; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java deleted file mode 100644 index c8d13ba462..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java +++ /dev/null @@ -1,39 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import javax.jms.JMSException; -import javax.jms.Message; - -/** - * Content type rule that attempts to extract the content type from a message property. - */ -public class PropertyRule implements ContentTypeRule { - private final String propertyName; - - public PropertyRule(String propertyName) { - this.propertyName = propertyName; - } - - public ContentTypeInfo getContentType(Message message) throws JMSException { - String value = message.getStringProperty(propertyName); - return value == null ? null : new ContentTypeInfo(propertyName, value); - } - - public String getExpectedContentTypeProperty() { - return propertyName; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java deleted file mode 100644 index 750170edd7..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -/** - * Provides classes and interfaces to define content type rules. - * - * Content type rules are used to determine the content type of a - * received message based on JMS properties, message type, etc. - */ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; \ No newline at end of file diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html deleted file mode 100644 index b95b49eb69..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html +++ /dev/null @@ -1,356 +0,0 @@ - -JMS Transport Configuration - - -

JMS Listener Configuration (axis2.xml)

- -e.g: - -
-    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
-        <parameter name="myTopicConnectionFactory">
-            <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
-            <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
-            <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
-            <parameter name="transport.jms.ConnectionFactoryType">topic</parameter>
-            <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter>
-        </parameter>
-
-        <parameter name="myQueueConnectionFactory">
-            <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
-            <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
-            <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
-            <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
-            <parameter name="transport.jms.JMSSpecVersion">1.1</parameter>
-        </parameter>
-
-        <parameter name="default">
-            <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
-            <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
-            <parameter name="transport.jms.ConnectionFactoryJNDIName">ConnectionFactory</parameter>
-        </parameter>
-    </transportReceiver>
-
-    <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
-        <parameter name="myTopicConnectionFactory">
-            <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
-            <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
-            <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
-            <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter>
-            <parameter name="transport.jms.CacheLevel">producer</parameter>
-        </parameter>
-
-        <parameter name="myQueueConnectionFactory">
-            <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
-            <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
-            <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
-            <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter>
-            <parameter name="transport.jms.CacheLevel">producer</parameter>
-        </parameter>
-
-        <parameter name="default">
-            <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
-            <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
-            <parameter name="transport.jms.ConnectionFactoryJNDIName">ConnectionFactory</parameter>
-            <parameter name="transport.jms.CacheLevel">connection</parameter>
-        </parameter>
-    </transportSender>
-
- -

- The Transport Listener and Sender both allows the user to configure one or more logical JMS Connection - Factories, which are named definitions as shown above. Any remaining parameters maybe defined at the - service level via the services.xml. The applicable set of parameters for a service would be the - union of the properties from the services.xml and the corresponding logical connection factory. -

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Transport level

ListeningSending
JNDIjava.naming.factory.initialThe JNDI InitialContext factory classRequired

java.naming.provider.urlJNDI Provider URLRequired

java.naming.security.principalUsername for JNDI access


java.naming.security.credentialsPassword for JNDI access

Transactionstransport.TransactionalityDesired transactionality. One of none / local / jtaDefaults to none

transport.UserTxnJNDINameJNDI name to be used to obtain a UserTransactionDefaults to "java:comp/UserTransaction"

transport.CacheUserTxnGenerally its safe and more efficient to cache the - UserTransaction reference from JNDI. One of true/ falseDefaults to true

transport.jms.SessionTransactedShould the JMS Session be transacted. One of true/ falseDefaults to true when local transactions are used

transport.jms.SessionAcknowledgementJMS Session acknowledgement mode to be used. One of AUTO_ACKNOWLEDGE | CLIENT_ACKNOWLEDGE | DUPS_OK_ACKNOWLEDGE | SESSION_TRANSACTEDDefaults to AUTO_ACKNOWLEDGE
Connectiontransport.jms.ConnectionFactoryName of the logical connection factory this service will useDefaults to "default"

transport.jms.ConnectionFactoryJNDINameThe JNDI name of the JMS ConnectionFactoryRequired

transport.jms.ConnectionFactoryType Type of ConnectionFactory – queue / topicSuggested to be specified

transport.jms.JMSSpecVersionJMS API Version One of "1.1" or "1.0.2b"Defaults to 1.1

transport.jms.UserNameThe JMS connection username


transport.jms.PasswordThe JMS connection password

Destinationstransport.jms.DestinationJNDI Name of the Destination Defaults to Service name

transport.jms.DestinationTypeType of Destination – queue / topicDefaults to a queue

transport.jms.DefaultReplyDestinationJNDI Name of the default reply Destination


transport.jms.DefaultReplyDestinationTypeType of the reply Destination – queue / topicSame type as of Destination
Advancedtransport.jms.MessageSelectorOptional message selector to be applied


transport.jms.SubscriptionDurableIs the subscription durable? (For Topics) – true / falseDefaults to false

transport.jms.DurableSubscriberNameName to be used for the durable subscriptionRequired when subscription is durable

transport.jms.PubSubNoLocalShould messages published by the same connection (for Topics) - be received? – true / falseDefaults to false

transport.jms.CacheLevelThe JMS resource cache level. One of none / connection / - session / consumer / producer / autoDefaults to auto

transport.jms.ReceiveTimeoutTime to wait for a JMS message during polling. Negative means - wait forever, while 0 means do not wait at all. Anything else, is - a millisecond value for the pollDefaults to 1000ms

transport.jms.ConcurrentConsumersNumber of concurrent consumer tasks (~threads) to be started to - poll for messages for this service. For Topics, this should be - always 1, to prevent the same message being processed multiple - timesDefaults to 1

transport.jms.MaxConcurrentConsumersWill dynamically scale the number of concurrent consumer tasks - (~threads) until this value; as the load increases. Should always - be 1 for Topics.Defaults to 1

transport.jms.IdleTaskLimitThe number of idle (i.e. poll without receipt of a message) - runs per task, before it dies – to recycle resources, and to - allow dynamic scale down.Defaults to 10

transport.jms.MaxMessagesPerTaskThe maximum number of successful message receipts to limit per - Task lifetime. Defaults to –1 which implies unlimited messages
Reconnectiontransport.jms.InitialReconnectDurationInitial reconnection attempt durationDefaults to 10,000ms

transport.jms.ReconnectProgressFactorFactor used to compute consecutive reconnection attempt - durations, in a geometric seriesDefaults to 2 (i.e. exponential)

transport.jms.MaxReconnectDurationMaximum limit for a reconnection durationDefaults to 1 hour

transport.jms.PublishEPROne or more JMS URL's to be showed as the JMS EPRs on the WSDL - for the service. Allows the specification of a custom EPR, and/or - hiding of internal properties from a public EPR (e.g. - credentials). Add one as LEGACY to retain auto generated EPR, when - adding new EPRs






Legacy Mode and Payload handlingWrapperBinary and Text payload wrapper element to be specified as "{ns}name" where ns refers to a namespace and name the name of the elementDefault binary wrapper
  • {http://ws.apache.org/commons/ns/payload}binary
- Default text wrapper
  • {http://ws.apache.org/commons/ns/payload}text


Operationoperation name to be specified as "{ns}name" where ns refers to the namespace and name the name of the operationDefaults to urn:mediate
- - - \ No newline at end of file -- cgit v1.2.3