diff options
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms')
25 files changed, 0 insertions, 5885 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java deleted file mode 100644 index ec53a2a1ca..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -public class AxisJMSException extends RuntimeException { - - AxisJMSException() { - super(); - } - - AxisJMSException(String msg) { - super(msg); - } - - AxisJMSException(String msg, Exception e) { - super(msg, e); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java deleted file mode 100644 index 5228efa154..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java +++ /dev/null @@ -1,72 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; - -/** - * Data source implementation wrapping a JMS {@link BytesMessage}. - * <p> - * Note that two input streams created by the same instance of this - * class can not be used at the same time. - */ -public class BytesMessageDataSource implements SizeAwareDataSource { - private final BytesMessage message; - private final String contentType; - - public BytesMessageDataSource(BytesMessage message, String contentType) { - this.message = message; - this.contentType = contentType; - } - - public BytesMessageDataSource(BytesMessage message) { - this(message, "application/octet-stream"); - } - - public long getSize() { - try { - return message.getBodyLength(); - } catch (JMSException ex) { - throw new RuntimeException(ex); - } - } - - public String getContentType() { - return contentType; - } - - public InputStream getInputStream() throws IOException { - try { - message.reset(); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - return new BytesMessageInputStream(message); - } - - public String getName() { - return null; - } - - public OutputStream getOutputStream() throws IOException { - throw new UnsupportedOperationException(); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java deleted file mode 100644 index 9080641572..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java +++ /dev/null @@ -1,75 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.InputStream; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; - -/** - * Input stream that reads data from a JMS {@link BytesMessage}. - * Note that since the current position in the message is managed by - * the underlying {@link BytesMessage} object, it is not possible to - * use several instances of this class operating on a single - * {@link BytesMessage} at the same time. - */ -public class BytesMessageInputStream extends InputStream { - private final BytesMessage message; - - public BytesMessageInputStream(BytesMessage message) { - this.message = message; - } - - @Override - public int read() throws JMSExceptionWrapper { - try { - return message.readByte() & 0xFF; - } catch (MessageEOFException ex) { - return -1; - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } - - @Override - public int read(byte[] b, int off, int len) throws JMSExceptionWrapper { - if (off == 0) { - try { - return message.readBytes(b, len); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } else { - byte[] b2 = new byte[len]; - int c = read(b2); - if (c > 0) { - System.arraycopy(b2, 0, b, off, c); - } - return c; - } - } - - @Override - public int read(byte[] b) throws JMSExceptionWrapper { - try { - return message.readBytes(b); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java deleted file mode 100644 index 4508d68280..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java +++ /dev/null @@ -1,56 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.OutputStream; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; - -public class BytesMessageOutputStream extends OutputStream { - private final BytesMessage message; - - public BytesMessageOutputStream(BytesMessage message) { - this.message = message; - } - - @Override - public void write(int b) throws JMSExceptionWrapper { - try { - message.writeByte((byte)b); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } - - @Override - public void write(byte[] b, int off, int len) throws JMSExceptionWrapper { - try { - message.writeBytes(b, off, len); - } catch (JMSException ex) { - new JMSExceptionWrapper(ex); - } - } - - @Override - public void write(byte[] b) throws JMSExceptionWrapper { - try { - message.writeBytes(b); - } catch (JMSException ex) { - throw new JMSExceptionWrapper(ex); - } - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java deleted file mode 100644 index d5d164ce76..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java +++ /dev/null @@ -1,393 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.Hashtable; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.ParameterIncludeImpl; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Encapsulate a JMS Connection factory definition within an Axis2.xml - * - * JMS Connection Factory definitions, allows JNDI properties as well as other service - * level parameters to be defined, and re-used by each service that binds to it - * - * When used for sending messages out, the JMSConnectionFactory'ies are able to cache - * a Connection, Session or Producer - */ -public class JMSConnectionFactory { - - private static final Log log = LogFactory.getLog(JMSConnectionFactory.class); - - /** The name used for the connection factory definition within Axis2 */ - private String name = null; - /** The list of parameters from the axis2.xml definition */ - private Hashtable<String, String> parameters = new Hashtable<String, String>(); - - /** The cached InitialContext reference */ - private Context context = null; - /** The JMS ConnectionFactory this definition refers to */ - private ConnectionFactory conFactory = null; - /** The shared JMS Connection for this JMS connection factory */ - private Connection sharedConnection = null; - /** The shared JMS Session for this JMS connection factory */ - private Session sharedSession = null; - /** The shared JMS MessageProducer for this JMS connection factory */ - private MessageProducer sharedProducer = null; - /** The Shared Destination */ - private Destination sharedDestination = null; - /** The shared JMS connection for this JMS connection factory */ - private int cacheLevel = JMSConstants.CACHE_CONNECTION; - - /** - * Digest a JMS CF definition from an axis2.xml 'Parameter' and construct - * @param parameter the axis2.xml 'Parameter' that defined the JMS CF - */ - public JMSConnectionFactory(Parameter parameter) { - - this.name = parameter.getName(); - ParameterIncludeImpl pi = new ParameterIncludeImpl(); - - try { - pi.deserializeParameters((OMElement) parameter.getValue()); - } catch (AxisFault axisFault) { - handleException("Error reading parameters for JMS connection factory" + name, axisFault); - } - - for (Object o : pi.getParameters()) { - Parameter p = (Parameter) o; - parameters.put(p.getName(), (String) p.getValue()); - } - - digestCacheLevel(); - try { - context = new InitialContext(parameters); - conFactory = JMSUtils.lookup(context, ConnectionFactory.class, - parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME)); - if (parameters.get(JMSConstants.PARAM_DESTINATION) != null) { - sharedDestination = JMSUtils.lookup(context, Destination.class, - parameters.get(JMSConstants.PARAM_DESTINATION)); - } - log.info("JMS ConnectionFactory : " + name + " initialized"); - - } catch (NamingException e) { - throw new AxisJMSException("Cannot acquire JNDI context, JMS Connection factory : " + - parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME) + " or default destination : " + - parameters.get(JMSConstants.PARAM_DESTINATION) + - " for JMS CF : " + name + " using : " + parameters); - } - } - - /** - * Digest, the cache value iff specified - */ - private void digestCacheLevel() { - - String key = JMSConstants.PARAM_CACHE_LEVEL; - String val = parameters.get(key); - - if ("none".equalsIgnoreCase(val)) { - this.cacheLevel = JMSConstants.CACHE_NONE; - } else if ("connection".equalsIgnoreCase(val)) { - this.cacheLevel = JMSConstants.CACHE_CONNECTION; - } else if ("session".equals(val)){ - this.cacheLevel = JMSConstants.CACHE_SESSION; - } else if ("producer".equals(val)) { - this.cacheLevel = JMSConstants.CACHE_PRODUCER; - } else if (val != null) { - throw new AxisJMSException("Invalid cache level : " + val + " for JMS CF : " + name); - } - } - - /** - * Return the name assigned to this JMS CF definition - * @return name of the JMS CF - */ - public String getName() { - return name; - } - - /** - * The list of properties (including JNDI and non-JNDI) - * @return properties defined on the JMS CF - */ - public Hashtable<String, String> getParameters() { - return parameters; - } - - /** - * Get cached InitialContext - * @return cache InitialContext - */ - public Context getContext() { - return context; - } - - /** - * Cache level applicable for this JMS CF - * @return applicable cache level - */ - public int getCacheLevel() { - return cacheLevel; - } - - /** - * Get the shared Destination - if defined - * @return - */ - public Destination getSharedDestination() { - return sharedDestination; - } - - /** - * Lookup a Destination using this JMS CF definitions and JNDI name - * @param name JNDI name of the Destionation - * @return JMS Destination for the given JNDI name or null - */ - public Destination getDestination(String name) { - try { - return JMSUtils.lookup(context, Destination.class, name); - } catch (NamingException e) { - handleException("Unknown JMS Destination : " + name + " using : " + parameters, e); - } - return null; - } - - /** - * Get the reply Destination from the PARAM_REPLY_DESTINATION parameter - * @return reply destination defined in the JMS CF - */ - public String getReplyToDestination() { - return parameters.get(JMSConstants.PARAM_REPLY_DESTINATION); - } - - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } - - /** - * Should the JMS 1.1 API be used? - defaults to yes - * @return true, if JMS 1.1 api should be used - */ - public boolean isJmsSpec11() { - return parameters.get(JMSConstants.PARAM_JMS_SPEC_VER) == null || - "1.1".equals(parameters.get(JMSConstants.PARAM_JMS_SPEC_VER)); - } - - /** - * Return the type of the JMS CF Destination - * @return TRUE if a Queue, FALSE for a Topic and NULL for a JMS 1.1 Generic Destination - */ - public Boolean isQueue() { - if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) == null && - parameters.get(JMSConstants.PARAM_DEST_TYPE) == null) { - return null; - } - - if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) != null) { - if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { - return true; - } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { - return false; - } else { - throw new AxisJMSException("Invalid " + JMSConstants.PARAM_CONFAC_TYPE + " : " + - parameters.get(JMSConstants.PARAM_CONFAC_TYPE) + " for JMS CF : " + name); - } - } else { - if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { - return true; - } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { - return false; - } else { - throw new AxisJMSException("Invalid " + JMSConstants.PARAM_DEST_TYPE + " : " + - parameters.get(JMSConstants.PARAM_DEST_TYPE) + " for JMS CF : " + name); - } - } - } - - /** - * Is a session transaction requested from users of this JMS CF? - * @return session transaction required by the clients of this? - */ - private boolean isSessionTransacted() { - return parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED) == null || - Boolean.valueOf(parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED)); - } - - /** - * Create a new Connection - * @return a new Connection - */ - private Connection createConnection() { - - Connection connection = null; - try { - connection = JMSUtils.createConnection( - conFactory, - parameters.get(JMSConstants.PARAM_JMS_USERNAME), - parameters.get(JMSConstants.PARAM_JMS_PASSWORD), - isJmsSpec11(), isQueue()); - - if (log.isDebugEnabled()) { - log.debug("New JMS Connection from JMS CF : " + name + " created"); - } - - } catch (JMSException e) { - handleException("Error acquiring a Connection from the JMS CF : " + name + - " using properties : " + parameters, e); - } - return connection; - } - - /** - * Create a new Session - * @param connection Connection to use - * @return A new Session - */ - private Session createSession(Connection connection) { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS Session from JMS CF : " + name); - } - return JMSUtils.createSession( - connection, isSessionTransacted(), Session.AUTO_ACKNOWLEDGE, isJmsSpec11(), isQueue()); - - } catch (JMSException e) { - handleException("Error creating JMS session from JMS CF : " + name, e); - } - return null; - } - - /** - * Create a new MessageProducer - * @param session Session to be used - * @param destination Destination to be used - * @return a new MessageProducer - */ - private MessageProducer createProducer(Session session, Destination destination) { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS MessageProducer from JMS CF : " + name); - } - - return JMSUtils.createProducer( - session, destination, isQueue(), isJmsSpec11()); - - } catch (JMSException e) { - handleException("Error creating JMS producer from JMS CF : " + name,e); - } - return null; - } - - /** - * Get a new Connection or shared Connection from this JMS CF - * @return new or shared Connection from this JMS CF - */ - public Connection getConnection() { - if (cacheLevel > JMSConstants.CACHE_NONE) { - return getSharedConnection(); - } else { - return createConnection(); - } - } - - /** - * Get a new Session or shared Session from this JMS CF - * @param connection the Connection to be used - * @return new or shared Session from this JMS CF - */ - public Session getSession(Connection connection) { - if (cacheLevel > JMSConstants.CACHE_CONNECTION) { - return getSharedSession(); - } else { - return createSession((connection == null ? getConnection() : connection)); - } - } - - /** - * Get a new MessageProducer or shared MessageProducer from this JMS CF - * @param connection the Connection to be used - * @param session the Session to be used - * @param destination the Destination to bind MessageProducer to - * @return new or shared MessageProducer from this JMS CF - */ - public MessageProducer getMessageProducer( - Connection connection, Session session, Destination destination) { - if (cacheLevel > JMSConstants.CACHE_SESSION) { - return getSharedProducer(); - } else { - return createProducer((session == null ? getSession(connection) : session), destination); - } - } - - /** - * Get a new Connection or shared Connection from this JMS CF - * @return new or shared Connection from this JMS CF - */ - private Connection getSharedConnection() { - if (sharedConnection == null) { - sharedConnection = createConnection(); - if (log.isDebugEnabled()) { - log.debug("Created shared JMS Connection for JMS CF : " + name); - } - } - return sharedConnection; - } - - /** - * Get a shared Session from this JMS CF - * @return shared Session from this JMS CF - */ - private Session getSharedSession() { - if (sharedSession == null) { - sharedSession = createSession(getSharedConnection()); - if (log.isDebugEnabled()) { - log.debug("Created shared JMS Session for JMS CF : " + name); - } - } - return sharedSession; - } - - /** - * Get a shared MessageProducer from this JMS CF - * @return shared MessageProducer from this JMS CF - */ - private MessageProducer getSharedProducer() { - if (sharedProducer == null) { - sharedProducer = createProducer(getSharedSession(), sharedDestination); - if (log.isDebugEnabled()) { - log.debug("Created shared JMS MessageConsumer for JMS CF : " + name); - } - } - return sharedProducer; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java deleted file mode 100644 index fb16500efc..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java +++ /dev/null @@ -1,122 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.HashMap; -import java.util.Map; - -import javax.naming.Context; - -import org.apache.axis2.AxisFault; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.ParameterInclude; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Class managing a set of {@link JMSConnectionFactory} objects. - */ -public class JMSConnectionFactoryManager { - - private static final Log log = LogFactory.getLog(JMSConnectionFactoryManager.class); - - /** A Map containing the JMS connection factories managed by this, keyed by name */ - private final Map<String,JMSConnectionFactory> connectionFactories = - new HashMap<String,JMSConnectionFactory>(); - - /** - * Construct a Connection factory manager for the JMS transport sender or receiver - * @param trpInDesc - */ - public JMSConnectionFactoryManager(ParameterInclude trpInDesc) { - loadConnectionFactoryDefinitions(trpInDesc); - } - - /** - * Create JMSConnectionFactory instances for the definitions in the transport configuration, - * and add these into our collection of connectionFactories map keyed by name - * - * @param trpDesc the transport description for JMS - */ - private void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) { - - for (Object o : trpDesc.getParameters()) { - Parameter p = (Parameter)o; - try { - JMSConnectionFactory jmsConFactory = new JMSConnectionFactory(p); - connectionFactories.put(jmsConFactory.getName(), jmsConFactory); - } catch (AxisJMSException e) { - log.error("Error setting up connection factory : " + p.getName(), e); - } - } - } - - /** - * Get the JMS connection factory with the given name. - * - * @param name the name of the JMS connection factory - * @return the JMS connection factory or null if no connection factory with - * the given name exists - */ - public JMSConnectionFactory getJMSConnectionFactory(String name) { - return connectionFactories.get(name); - } - - /** - * Get the JMS connection factory that matches the given properties, i.e. referring to - * the same underlying connection factory. Used by the JMSSender to determine if already - * available resources should be used for outgoing messages - * - * @param props a Map of connection factory JNDI properties and name - * @return the JMS connection factory or null if no connection factory compatible - * with the given properties exists - */ - public JMSConnectionFactory getJMSConnectionFactory(Map<String,String> props) { - for (JMSConnectionFactory cf : connectionFactories.values()) { - Map<String,String> cfProperties = cf.getParameters(); - - if (equals(props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME), - cfProperties.get(JMSConstants.PARAM_CONFAC_JNDI_NAME)) - && - equals(props.get(Context.INITIAL_CONTEXT_FACTORY), - cfProperties.get(Context.INITIAL_CONTEXT_FACTORY)) - && - equals(props.get(Context.PROVIDER_URL), - cfProperties.get(Context.PROVIDER_URL)) - && - equals(props.get(Context.SECURITY_PRINCIPAL), - cfProperties.get(Context.SECURITY_PRINCIPAL)) - && - equals(props.get(Context.SECURITY_CREDENTIALS), - cfProperties.get(Context.SECURITY_CREDENTIALS))) { - return cf; - } - } - return null; - } - - /** - * Compare two values preventing NPEs - */ - private static boolean equals(Object s1, Object s2) { - return s1 == s2 || s1 != null && s1.equals(s2); - } - - protected void handleException(String msg, Exception e) throws AxisFault { - log.error(msg, e); - throw new AxisFault(msg, e); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java deleted file mode 100644 index 6a11201625..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java +++ /dev/null @@ -1,273 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import org.apache.axis2.client.Options; - -public class JMSConstants { - - /** - * The prefix indicating an Axis JMS URL - */ - public static final String JMS_PREFIX = "jms:/"; - - //------------------------------------ defaults / constants ------------------------------------ - /** - * The local (Axis2) JMS connection factory name of the default connection - * factory to be used, if a service does not explicitly state the connection - * factory it should be using by a Parameter named JMSConstants.CONFAC_PARAM - */ - public static final String DEFAULT_CONFAC_NAME = "default"; - /** - * The default JMS time out waiting for a reply - also see {@link JMS_WAIT_REPLY} - */ - public static final long DEFAULT_JMS_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS; - /** - * Value indicating a Queue used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE} - */ - public static final String DESTINATION_TYPE_QUEUE = "queue"; - /** - * Value indicating a Topic used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE} - */ - public static final String DESTINATION_TYPE_TOPIC = "topic"; - /** - * Value indicating a JMS 1.1 Generic Destination used by {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE} - */ - public static final String DESTINATION_TYPE_GENERIC = "generic"; - - /** Do not cache any JMS resources between tasks (when sending) or JMS CF's (when sending) */ - public static final int CACHE_NONE = 0; - /** Cache only the JMS connection between tasks (when receiving), or JMS CF's (when sending)*/ - public static final int CACHE_CONNECTION = 1; - /** Cache only the JMS connection and Session between tasks (receiving), or JMS CF's (sending) */ - public static final int CACHE_SESSION = 2; - /** Cache the JMS connection, Session and Consumer between tasks when receiving*/ - public static final int CACHE_CONSUMER = 3; - /** Cache the JMS connection, Session and Producer within a JMSConnectionFactory when sending */ - public static final int CACHE_PRODUCER = 4; - /** automatic choice of an appropriate caching level (depending on the transaction strategy) */ - public static final int CACHE_AUTO = 5; - - /** A JMS 1.1 Generic Destination type or ConnectionFactory */ - public static final int GENERIC = 0; - /** A Queue Destination type or ConnectionFactory */ - public static final int QUEUE = 1; - /** A Topic Destination type or ConnectionFactory */ - public static final int TOPIC = 2; - - /** - * The EPR parameter name indicating the name of the message level property that indicated the content type. - */ - public static final String CONTENT_TYPE_PROPERTY_PARAM = "transport.jms.ContentTypeProperty"; - - //---------------------------------- services.xml parameters ----------------------------------- - /** - * The Service level Parameter name indicating the JMS destination for requests of a service - */ - public static final String PARAM_DESTINATION = "transport.jms.Destination"; - /** - * The Service level Parameter name indicating the destination type for requests. - * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC} - */ - public static final String PARAM_DEST_TYPE = "transport.jms.DestinationType"; - /** - * The Service level Parameter name indicating the [default] response destination of a service - */ - public static final String PARAM_REPLY_DESTINATION = "transport.jms.ReplyDestination"; - /** - * The Service level Parameter name indicating the response destination type - * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC} - */ - public static final String PARAM_REPLY_DEST_TYPE = "transport.jms.ReplyDestinationType"; - /** - * The Parameter name of an Axis2 service, indicating the JMS connection - * factory which should be used to listen for messages for it. This is - * the local (Axis2) name of the connection factory and not the JNDI name - */ - public static final String PARAM_JMS_CONFAC = "transport.jms.ConnectionFactory"; - /** - * Connection factory type if using JMS 1.0, either DESTINATION_TYPE_QUEUE or DESTINATION_TYPE_TOPIC - */ - public static final String PARAM_CONFAC_TYPE = "transport.jms.ConnectionFactoryType"; - /** - * The Parameter name indicating the JMS connection factory JNDI name - */ - public static final String PARAM_CONFAC_JNDI_NAME = "transport.jms.ConnectionFactoryJNDIName"; - /** - * The Parameter indicating the expected content type for messages received by the service. - */ - public static final String CONTENT_TYPE_PARAM = "transport.jms.ContentType"; - /** - * The Parameter indicating a final EPR as a String, to be published on the WSDL of a service - * Could occur more than once, and could provide additional connection properties or a subset - * of the properties auto computed. Also could replace IP addresses with hostnames, and expose - * public credentials clients. If a user specified this parameter, the auto generated EPR will - * not be exposed - unless an instance of this parameter is added with the string "legacy" - * This parameter could be used to expose EPR's conforming to the proposed SOAP/JMS spec - * until such time full support is implemented for it. - */ - public static final String PARAM_PUBLISH_EPR = "transport.jms.PublishEPR"; - /** The parameter indicating the JMS API specification to be used - if this is "1.1" the JMS - * 1.1 API would be used, else the JMS 1.0.2B - */ - public static final String PARAM_JMS_SPEC_VER = "transport.jms.JMSSpecVersion"; - - /** - * The Parameter indicating whether the JMS Session should be transacted for the service - * Specified as a "true" or "false" - */ - public static final String PARAM_SESSION_TRANSACTED = "transport.jms.SessionTransacted"; - /** - * The Parameter indicating the Session acknowledgement for the service. Must be one of the - * following Strings, or the appropriate Integer used by the JMS API - * "AUTO_ACKNOWLEDGE", "CLIENT_ACKNOWLEDGE", "DUPS_OK_ACKNOWLEDGE" or "SESSION_TRANSACTED" - */ - public static final String PARAM_SESSION_ACK = "transport.jms.SessionAcknowledgement"; - /** A message selector to be used when messages are sought for this service */ - public static final String PARAM_MSG_SELECTOR = "transport.jms.MessageSelector"; - /** Is the Subscription durable ? - "true" or "false" See {@link PARAM_DURABLE_SUB_NAME} */ - public static final String PARAM_SUB_DURABLE = "transport.jms.SubscriptionDurable"; - /** The name for the durable subscription See {@link PARAM_SUB_DURABLE}*/ - public static final String PARAM_DURABLE_SUB_NAME = "transport.jms.DurableSubscriberName"; - /** - * JMS Resource cachable level to be used for the service One of the following: - * {@link CACHE_NONE}, {@link CACHE_CONNECTION}, {@link CACHE_SESSION}, {@link CACHE_PRODUCER}, - * {@link CACHE_CONSUMER}, or {@link CACHE_AUTO} - to let the transport decide - */ - public static final String PARAM_CACHE_LEVEL = "transport.jms.CacheLevel"; - /** Should a pub-sub connection receive messages published by itself? */ - public static final String PARAM_PUBSUB_NO_LOCAL = "transport.jms.PubSubNoLocal"; - /** - * The number of milliseconds to wait for a message on a consumer.receive() call - * negative number - wait forever - * 0 - do not wait at all - * positive number - indicates the number of milliseconds to wait - */ - public static final String PARAM_RCV_TIMEOUT = "transport.jms.ReceiveTimeout"; - /** - *The number of concurrent consumers to be created to poll for messages for this service - * For Topics, this should be ONE, to prevent receipt of multiple copies of the same message - */ - public static final String PARAM_CONCURRENT_CONSUMERS = "transport.jms.ConcurrentConsumers"; - /** - * The maximum number of concurrent consumers for the service - See {@link PARAM_CONCURRENT_CONSUMERS} - */ - public static final String PARAM_MAX_CONSUMERS = "transport.jms.MaxConcurrentConsumers"; - /** - * The number of idle (i.e. message-less) polling attempts before a worker task commits suicide, - * to scale down resources, as load decreases - */ - public static final String PARAM_IDLE_TASK_LIMIT = "transport.jms.IdleTaskLimit"; - /** - * The maximum number of messages a polling worker task should process, before suicide - to - * prevent many longer running threads - default is unlimited (i.e. a worker task will live forever) - */ - public static final String PARAM_MAX_MSGS_PER_TASK = "transport.jms.MaxMessagesPerTask"; - /** - * Number of milliseconds before the first reconnection attempt is tried, on detection of an - * error. Subsequent retries follow a geometric series, where the - * duration = previous duration * factor - * This is further limited by the {@link PARAM_RECON_MAX_DURATION} to be meaningful - */ - public static final String PARAM_RECON_INIT_DURATION = "transport.jms.InitialReconnectDuration"; - /** @see PARAM_RECON_INIT_DURATION */ - public static final String PARAM_RECON_FACTOR = "transport.jms.ReconnectProgressFactor"; - /** @see PARAM_RECON_INIT_DURATION */ - public static final String PARAM_RECON_MAX_DURATION = "transport.jms.MaxReconnectDuration"; - - /** The username to use when obtaining a JMS Connection */ - public static final String PARAM_JMS_USERNAME = "transport.jms.UserName"; - /** The password to use when obtaining a JMS Connection */ - public static final String PARAM_JMS_PASSWORD = "transport.jms.Password"; - - //-------------- message context / transport header properties and client options -------------- - /** - * A MessageContext property or client Option indicating the JMS message type - */ - public static final String JMS_MESSAGE_TYPE = "JMS_MESSAGE_TYPE"; - /** - * The message type indicating a BytesMessage. See {@link JMS_MESSAGE_TYPE} - */ - public static final String JMS_BYTE_MESSAGE = "JMS_BYTE_MESSAGE"; - /** - * The message type indicating a TextMessage. See {@link JMS_MESSAGE_TYPE} - */ - public static final String JMS_TEXT_MESSAGE = "JMS_TEXT_MESSAGE"; - /** - * A MessageContext property or client Option indicating the time to wait for a response JMS message - */ - public static final String JMS_WAIT_REPLY = "JMS_WAIT_REPLY"; - /** - * A MessageContext property or client Option indicating the JMS correlation id - */ - public static final String JMS_COORELATION_ID = "JMS_COORELATION_ID"; - /** - * A MessageContext property or client Option indicating the JMS message id - */ - public static final String JMS_MESSAGE_ID = "JMS_MESSAGE_ID"; - /** - * A MessageContext property or client Option indicating the JMS delivery mode as an Integer or String - * Value 1 - javax.jms.DeliveryMode.NON_PERSISTENT - * Value 2 - javax.jms.DeliveryMode.PERSISTENT - */ - public static final String JMS_DELIVERY_MODE = "JMS_DELIVERY_MODE"; - /** - * A MessageContext property or client Option indicating the JMS destination to use on a Send - */ - public static final String JMS_DESTINATION = "JMS_DESTINATION"; - /** - * A MessageContext property or client Option indicating the JMS message expiration - a Long value - * specified as a String - */ - public static final String JMS_EXPIRATION = "JMS_EXPIRATION"; - /** - * A MessageContext property indicating if the message is a redelivery (Boolean as a String) - */ - public static final String JMS_REDELIVERED = "JMS_REDELIVERED"; - /** - * A MessageContext property or client Option indicating the JMS replyTo Destination - */ - public static final String JMS_REPLY_TO = "JMS_REPLY_TO"; - /** - * A MessageContext property or client Option indicating the JMS replyTo Destination type - * See {@link DESTINATION_TYPE_QUEUE} and {@link DESTINATION_TYPE_TOPIC} - */ - public static final String JMS_REPLY_TO_TYPE = "JMS_REPLY_TO_TYPE"; - /** - * A MessageContext property or client Option indicating the JMS timestamp (Long specified as String) - */ - public static final String JMS_TIMESTAMP = "JMS_TIMESTAMP"; - /** - * A MessageContext property indicating the JMS type String returned by {@link javax.jms.Message.getJMSType()} - */ - public static final String JMS_TYPE = "JMS_TYPE"; - /** - * A MessageContext property or client Option indicating the JMS priority - */ - public static final String JMS_PRIORITY = "JMS_PRIORITY"; - /** - * A MessageContext property or client Option indicating the JMS time to live for message sent - */ - public static final String JMS_TIME_TO_LIVE = "JMS_TIME_TO_LIVE"; - - /** The prefix that denotes JMSX properties */ - public static final String JMSX_PREFIX = "JMSX"; - /** The JMSXGroupID property */ - public static final String JMSX_GROUP_ID = "JMSXGroupID"; - /** The JMSXGroupSeq property */ - public static final String JMSX_GROUP_SEQ = "JMSXGroupSeq"; - -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java deleted file mode 100644 index c465b1d989..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java +++ /dev/null @@ -1,111 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.axis2.addressing.EndpointReference; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet; - -/** - * Class that links an Axis2 service to a JMS destination. Additionally, it contains - * all the required information to process incoming JMS messages and to inject them - * into Axis2. - */ -public class JMSEndpoint { - private JMSConnectionFactory cf; - private AxisService service; - private String jndiDestinationName; - private int destinationType = JMSConstants.GENERIC; - private Set<EndpointReference> endpointReferences = new HashSet<EndpointReference>(); - private ContentTypeRuleSet contentTypeRuleSet; - - public AxisService getService() { - return service; - } - - public void setService(AxisService service) { - this.service = service; - } - - public String getServiceName() { - return service.getName(); - } - - public String getJndiDestinationName() { - return jndiDestinationName; - } - - public void setJndiDestinationName(String destinationJNDIName) { - this.jndiDestinationName = destinationJNDIName; - } - - public void setDestinationType(String destinationType) { - if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) { - this.destinationType = JMSConstants.TOPIC; - } else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) { - this.destinationType = JMSConstants.QUEUE; - } else { - this.destinationType = JMSConstants.GENERIC; - } - } - - public EndpointReference[] getEndpointReferences() { - return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]); - } - - public void computeEPRs() { - List<EndpointReference> eprs = new ArrayList<EndpointReference>(); - for (Object o : getService().getParameters()) { - Parameter p = (Parameter) o; - if (JMSConstants.PARAM_PUBLISH_EPR.equals(p.getName()) && p.getValue() instanceof String) { - if ("legacy".equalsIgnoreCase((String) p.getValue())) { - // if "legacy" specified, compute and replace it - endpointReferences.add( - new EndpointReference(JMSUtils.getEPR(cf, destinationType, this))); - } else { - endpointReferences.add(new EndpointReference((String) p.getValue())); - } - } - } - - if (eprs.isEmpty()) { - // if nothing specified, compute and return legacy EPR - endpointReferences.add(new EndpointReference(JMSUtils.getEPR(cf, destinationType, this))); - } - } - - public ContentTypeRuleSet getContentTypeRuleSet() { - return contentTypeRuleSet; - } - - public void setContentTypeRuleSet(ContentTypeRuleSet contentTypeRuleSet) { - this.contentTypeRuleSet = contentTypeRuleSet; - } - - public JMSConnectionFactory getCf() { - return cf; - } - - public void setCf(JMSConnectionFactory cf) { - this.cf = cf; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java deleted file mode 100644 index ceeec4a6a3..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java +++ /dev/null @@ -1,28 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.IOException; - -import javax.jms.JMSException; - -public class JMSExceptionWrapper extends IOException { - private static final long serialVersionUID = 852441109009079511L; - - public JMSExceptionWrapper(JMSException ex) { - initCause(ex); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java deleted file mode 100644 index 8c9f66dfbf..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java +++ /dev/null @@ -1,294 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.BytesMessage; -import javax.jms.TextMessage; - -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.addressing.EndpointReference; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.TransportInDescription; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleFactory; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.MessageTypeRule; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.PropertyRule; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportListener; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorListener; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSource; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSourceSupport; - -/** - * The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances - * for each service requesting exposure over JMS, and stops these if they are undeployed / stopped. - * <p> - * A service indicates a JMS Connection factory definition by name, which would be defined in the - * JMSListner on the axis2.xml, and this provides a way to reuse common configuration between - * services, as well as to optimize resources utilized - * <p> - * If the connection factory name was not specified, it will default to the one named "default" - * {@see JMSConstants.DEFAULT_CONFAC_NAME} - * <p> - * If a destination JNDI name is not specified, a service will expect to use a Queue with the same - * JNDI name as of the service. Additional Parameters allows one to bind to a Topic or specify - * many more detailed control options. See package documentation for more details - * <p> - * All Destinations / JMS Administered objects used MUST be pre-created or already available - */ -public class JMSListener extends AbstractTransportListener implements ManagementSupport, - TransportErrorSource { - - public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; - - /** The JMSConnectionFactoryManager which centralizes the management of defined factories */ - private JMSConnectionFactoryManager connFacManager; - /** A Map of service name to the JMS endpoints */ - private Map<String,JMSEndpoint> serviceNameToEndpointMap = new HashMap<String,JMSEndpoint>(); - /** A Map of service name to its ServiceTaskManager instances */ - private Map<String, ServiceTaskManager> serviceNameToSTMMap = - new HashMap<String, ServiceTaskManager>(); - private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this); - - /** - * TransportListener initialization - * - * @param cfgCtx the Axis configuration context - * @param trpInDesc the TransportIn description - */ - public void init(ConfigurationContext cfgCtx, - TransportInDescription trpInDesc) throws AxisFault { - - super.init(cfgCtx, trpInDesc); - connFacManager = new JMSConnectionFactoryManager(trpInDesc); - log.info("JMS Transport Receiver/Listener initialized..."); - } - - /** - * Returns EPRs for the given service over the JMS transport - * - * @param serviceName service name - * @return the JMS EPRs for the service - */ - public EndpointReference[] getEPRsForService(String serviceName) { - //Strip out the operation name - if (serviceName.indexOf('/') != -1) { - serviceName = serviceName.substring(0, serviceName.indexOf('/')); - } - // strip out the endpoint name if present - if (serviceName.indexOf('.') != -1) { - serviceName = serviceName.substring(0, serviceName.indexOf('.')); - } - JMSEndpoint endpoint = serviceNameToEndpointMap.get(serviceName); - if (endpoint != null) { - return endpoint.getEndpointReferences(); - } else { - return null; - } - } - - /** - * Listen for JMS messages on behalf of the given service - * - * @param service the Axis service for which to listen for messages - */ - protected void startListeningForService(AxisService service) throws AxisFault { - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf == null) { - throw new AxisFault("The service doesn't specify a JMS connection factory or refers " + - "to an invalid factory."); - } - - JMSEndpoint endpoint = new JMSEndpoint(); - endpoint.setService(service); - endpoint.setCf(cf); - - Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION); - if (destParam != null) { - endpoint.setJndiDestinationName((String)destParam.getValue()); - } else { - // Assume that the JNDI destination name is the same as the service name - endpoint.setJndiDestinationName(service.getName()); - } - - Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE); - if (destTypeParam != null) { - String paramValue = (String) destTypeParam.getValue(); - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || - JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) { - endpoint.setDestinationType(paramValue); - } else { - throw new AxisFault("Invalid destinaton type value " + paramValue); - } - } else { - log.debug("JMS destination type not given. default queue"); - endpoint.setDestinationType(JMSConstants.DESTINATION_TYPE_QUEUE); - } - - Parameter contentTypeParam = service.getParameter(JMSConstants.CONTENT_TYPE_PARAM); - if (contentTypeParam == null) { - ContentTypeRuleSet contentTypeRuleSet = new ContentTypeRuleSet(); - contentTypeRuleSet.addRule(new PropertyRule(BaseConstants.CONTENT_TYPE)); - contentTypeRuleSet.addRule(new MessageTypeRule(BytesMessage.class, "application/octet-stream")); - contentTypeRuleSet.addRule(new MessageTypeRule(TextMessage.class, "text/plain")); - endpoint.setContentTypeRuleSet(contentTypeRuleSet); - } else { - endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam)); - } - - endpoint.computeEPRs(); // compute service EPR and keep for later use - serviceNameToEndpointMap.put(service.getName(), endpoint); - - ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool); - stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint)); - stm.start(); - serviceNameToSTMMap.put(service.getName(), stm); - - for (int i=0; i<3; i++) { - if (stm.getActiveTaskCount() > 0) { - log.info("Started to listen on destination : " + stm.getDestinationJNDIName() + - " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) + - " for service " + stm.getServiceName()); - return; - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) {} - } - - log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() + - " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) + - " for service " + stm.getServiceName() + " have not yet started after 3 seconds .."); - } - - /** - * Stops listening for messages for the service thats undeployed or stopped - * - * @param service the service that was undeployed or stopped - */ - protected void stopListeningForService(AxisService service) { - - ServiceTaskManager stm = serviceNameToSTMMap.get(service.getName()); - if (stm != null) { - if (log.isDebugEnabled()) { - log.debug("Stopping listening on destination : " + stm.getDestinationJNDIName() + - " for service : " + stm.getServiceName()); - } - - stm.stop(); - - serviceNameToSTMMap.remove(service.getName()); - serviceNameToEndpointMap.remove(service.getName()); - log.info("Stopped listening for JMS messages to service : " + service.getName()); - - } else { - log.error("Unable to stop service : " + service.getName() + - " - unable to find its ServiceTaskManager"); - } - } - /** - * Return the connection factory name for this service. If this service - * refers to an invalid factory or defaults to a non-existent default - * factory, this returns null - * - * @param service the AxisService - * @return the JMSConnectionFactory to be used, or null if reference is invalid - */ - public JMSConnectionFactory getConnectionFactory(AxisService service) { - - Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC); - // validate connection factory name (specified or default) - if (conFacParam != null) { - return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue()); - } else { - return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME); - } - } - - // -- jmx/management methods-- - /** - * Pause the listener - Stop accepting/processing new messages, but continues processing existing - * messages until they complete. This helps bring an instance into a maintenence mode - * @throws AxisFault on error - */ - public void pause() throws AxisFault { - if (state != BaseConstants.STARTED) return; - try { - for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { - stm.pause(); - } - state = BaseConstants.PAUSED; - log.info("Listener paused"); - } catch (AxisJMSException e) { - log.error("At least one service could not be paused", e); - } - } - - /** - * Resume the lister - Brings the lister into active mode back from a paused state - * @throws AxisFault on error - */ - public void resume() throws AxisFault { - if (state != BaseConstants.PAUSED) return; - try { - for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { - stm.resume(); - } - state = BaseConstants.STARTED; - log.info("Listener resumed"); - } catch (AxisJMSException e) { - log.error("At least one service could not be resumed", e); - } - } - - /** - * Stop processing new messages, and wait the specified maximum time for in-flight - * requests to complete before a controlled shutdown for maintenence - * - * @param millis a number of milliseconds to wait until pending requests are allowed to complete - * @throws AxisFault on error - */ - public void maintenenceShutdown(long millis) throws AxisFault { - if (state != BaseConstants.STARTED) return; - try { - long start = System.currentTimeMillis(); - stop(); - state = BaseConstants.STOPPED; - log.info("Listener shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s"); - } catch (Exception e) { - handleException("Error shutting down the listener for maintenence", e); - } - } - - public void addErrorListener(TransportErrorListener listener) { - tess.addErrorListener(listener); - } - - public void removeErrorListener(TransportErrorListener listener) { - tess.removeErrorListener(listener); - } - - void error(AxisService service, Throwable ex) { - tess.error(service, ex); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java deleted file mode 100644 index ebd67e53e1..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java +++ /dev/null @@ -1,237 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.TextMessage; -import javax.transaction.UserTransaction; -import javax.xml.namespace.QName; - -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.AxisOperation; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeInfo; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector; - -/** - * This is the JMS message receiver which is invoked when a message is received. This processes - * the message through the engine - */ -public class JMSMessageReceiver { - - private static final Log log = LogFactory.getLog(JMSMessageReceiver.class); - - /** The JMSListener */ - private JMSListener jmsListener = null; - /** A reference to the JMS Connection Factory */ - private JMSConnectionFactory jmsConnectionFactory = null; - /** The JMS metrics collector */ - private MetricsCollector metrics = null; - /** The endpoint this message receiver is bound to */ - final JMSEndpoint endpoint; - - /** - * Create a new JMSMessage receiver - * - * @param jmsListener the JMS transport Listener - * @param jmsConFac the JMS connection factory we are associated with - * @param workerPool the worker thread pool to be used - * @param cfgCtx the axis ConfigurationContext - * @param serviceName the name of the Axis service - * @param endpoint the JMSEndpoint definition to be used - */ - JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) { - this.jmsListener = jmsListener; - this.jmsConnectionFactory = jmsConFac; - this.endpoint = endpoint; - this.metrics = jmsListener.getMetricsCollector(); - } - - /** - * Process a new message received - * - * @param message the JMS message received - * @param ut UserTransaction which was used to receive the message - * @return true if caller should commit - */ - public boolean onMessage(Message message, UserTransaction ut) { - - try { - if (log.isDebugEnabled()) { - StringBuffer sb = new StringBuffer(); - sb.append("Received new JMS message for service :").append(endpoint.getServiceName()); - sb.append("\nDestination : ").append(message.getJMSDestination()); - sb.append("\nMessage ID : ").append(message.getJMSMessageID()); - sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID()); - sb.append("\nReplyTo : ").append(message.getJMSReplyTo()); - sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered()); - sb.append("\nPriority : ").append(message.getJMSPriority()); - sb.append("\nExpiration : ").append(message.getJMSExpiration()); - sb.append("\nTimestamp : ").append(message.getJMSTimestamp()); - sb.append("\nMessage Type : ").append(message.getJMSType()); - sb.append("\nPersistent ? : ").append( - DeliveryMode.PERSISTENT == message.getJMSDeliveryMode()); - - log.debug(sb.toString()); - if (log.isTraceEnabled() && message instanceof TextMessage) { - log.trace("\nMessage : " + ((TextMessage) message).getText()); - } - } - } catch (JMSException e) { - if (log.isDebugEnabled()) { - log.debug("Error reading JMS message headers for debug logging", e); - } - } - - // update transport level metrics - try { - metrics.incrementBytesReceived(JMSUtils.getMessageSize(message)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - - // has this message already expired? expiration time == 0 means never expires - try { - long expiryTime = message.getJMSExpiration(); - if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) { - if (log.isDebugEnabled()) { - log.debug("Discard expired message with ID : " + message.getJMSMessageID()); - } - return true; - } - } catch (JMSException ignore) {} - - - boolean successful = false; - try { - successful = processThoughEngine(message, ut); - - } catch (JMSException e) { - log.error("JMS Exception encountered while processing", e); - } catch (AxisFault e) { - log.error("Axis fault processing message", e); - } catch (Exception e) { - log.error("Unknown error processing message", e); - - } finally { - if (successful) { - metrics.incrementMessagesReceived(); - } else { - metrics.incrementFaultsReceiving(); - } - } - - return successful; - } - - /** - * Process the new message through Axis2 - * - * @param message the JMS message - * @param ut the UserTransaction used for receipt - * @return true if the caller should commit - * @throws JMSException, on JMS exceptions - * @throws AxisFault on Axis2 errors - */ - private boolean processThoughEngine(Message message, UserTransaction ut) - throws JMSException, AxisFault { - - MessageContext msgContext = jmsListener.createMessageContext(); - - // set the JMS Message ID as the Message ID of the MessageContext - try { - msgContext.setMessageID(message.getJMSMessageID()); - msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID()); - } catch (JMSException ignore) {} - - String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION); - - AxisService service = endpoint.getService(); - msgContext.setAxisService(service); - - // find the operation for the message, or default to one - Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); - QName operationQName = ( - operationParam != null ? - BaseUtils.getQNameFromString(operationParam.getValue()) : - BaseConstants.DEFAULT_OPERATION); - - AxisOperation operation = service.getOperation(operationQName); - if (operation != null) { - msgContext.setAxisOperation(operation); - msgContext.setSoapAction("urn:" + operation.getName().getLocalPart()); - } - - ContentTypeInfo contentTypeInfo = - endpoint.getContentTypeRuleSet().getContentTypeInfo(message); - if (contentTypeInfo == null) { - throw new AxisFault("Unable to determine content type for message " + - msgContext.getMessageID()); - } - - // set the message property OUT_TRANSPORT_INFO - // the reply is assumed to be over the JMSReplyTo destination, using - // the same incoming connection factory, if a JMSReplyTo is available - Destination replyTo = message.getJMSReplyTo(); - if (replyTo == null) { - // does the service specify a default reply destination ? - Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION); - if (param != null && param.getValue() != null) { - replyTo = jmsConnectionFactory.getDestination((String) param.getValue()); - } - - } - if (replyTo != null) { - msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, - new JMSOutTransportInfo(jmsConnectionFactory, replyTo, - contentTypeInfo.getPropertyName())); - } - - JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType()); - if (ut != null) { - msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut); - } - - try { - jmsListener.handleIncomingMessage( - msgContext, - JMSUtils.getTransportHeaders(message), - soapAction, - contentTypeInfo.getContentType()); - - } finally { - - Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY); - if (o != null) { - if ((o instanceof Boolean && ((Boolean) o)) || - (o instanceof String && Boolean.valueOf((String) o))) { - return false; - } - } - return true; - } - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java deleted file mode 100644 index 01fdee77dd..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.QueueSender; -import javax.jms.Session; -import javax.jms.TopicPublisher; -import javax.transaction.UserTransaction; - -import org.apache.axis2.context.MessageContext; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; - -/** - * Performs the actual sending of a JMS message, and the subsequent committing of a JTA transaction - * (if requested) or the local session transaction, if used. An instance of this class is unique - * to a single message send out operation and will not be shared. - */ -public class JMSMessageSender { - - private static final Log log = LogFactory.getLog(JMSMessageSender.class); - - /** The Connection to be used to send out */ - private Connection connection = null; - /** The Session to be used to send out */ - private Session session = null; - /** The MessageProducer used */ - private MessageProducer producer = null; - /** Target Destination */ - private Destination destination = null; - /** The level of cachability for resources */ - private int cacheLevel = JMSConstants.CACHE_CONNECTION; - /** Should this sender use JMS 1.1 ? (if false, defaults to 1.0.2b) */ - private boolean jmsSpec11 = true; - /** Are we sending to a Queue ? */ - private Boolean isQueue = null; - - /** - * This is a low-end method to support the one-time sends using JMS 1.0.2b - * @param connection the JMS Connection - * @param session JMS Session - * @param producer the MessageProducer - * @param destination the JMS Destination - * @param cacheLevel cacheLevel - None | Connection | Session | Producer - * @param jmsSpec11 true if the JMS 1.1 API should be used - * @param isQueue posting to a Queue? - */ - public JMSMessageSender(Connection connection, Session session, MessageProducer producer, - Destination destination, int cacheLevel, boolean jmsSpec11, Boolean isQueue) { - - this.connection = connection; - this.session = session; - this.producer = producer; - this.destination = destination; - this.cacheLevel = cacheLevel; - this.jmsSpec11 = jmsSpec11; - this.isQueue = isQueue; - } - - /** - * Create a JMSSender using a JMSConnectionFactory and target EPR - * - * @param jmsConnectionFactory the JMSConnectionFactory - * @param targetAddress target EPR - */ - public JMSMessageSender(JMSConnectionFactory jmsConnectionFactory, String targetAddress) { - - if (jmsConnectionFactory != null) { - this.cacheLevel = jmsConnectionFactory.getCacheLevel(); - this.jmsSpec11 = jmsConnectionFactory.isJmsSpec11(); - this.connection = jmsConnectionFactory.getConnection(); - this.session = jmsConnectionFactory.getSession(connection); - this.destination = - jmsConnectionFactory.getSharedDestination() == null ? - jmsConnectionFactory.getDestination(JMSUtils.getDestination(targetAddress)) : - jmsConnectionFactory.getSharedDestination(); - this.producer = jmsConnectionFactory.getMessageProducer(connection, session, destination); - - } else { - JMSOutTransportInfo jmsOut = new JMSOutTransportInfo(targetAddress); - jmsOut.loadConnectionFactoryFromProperies(); - } - } - - /** - * Perform actual send of JMS message to the Destination selected - * - * @param message the JMS message - * @param msgCtx the Axis2 MessageContext - */ - public void send(Message message, MessageContext msgCtx) { - - Boolean jtaCommit = getBooleanProperty(msgCtx, BaseConstants.JTA_COMMIT_AFTER_SEND); - Boolean rollbackOnly = getBooleanProperty(msgCtx, BaseConstants.SET_ROLLBACK_ONLY); - Boolean persistent = getBooleanProperty(msgCtx, JMSConstants.JMS_DELIVERY_MODE); - Integer priority = getIntegerProperty(msgCtx, JMSConstants.JMS_PRIORITY); - Integer timeToLive = getIntegerProperty(msgCtx, JMSConstants.JMS_TIME_TO_LIVE); - - // Do not commit, if message is marked for rollback - if (rollbackOnly != null && rollbackOnly) { - jtaCommit = Boolean.FALSE; - } - - if (persistent != null) { - try { - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - } catch (JMSException e) { - handleException("Error setting JMS Producer for PERSISTENT delivery", e); - } - } - if (priority != null) { - try { - producer.setPriority(priority); - } catch (JMSException e) { - handleException("Error setting JMS Producer priority to : " + priority, e); - } - } - if (timeToLive != null) { - try { - producer.setTimeToLive(timeToLive); - } catch (JMSException e) { - handleException("Error setting JMS Producer TTL to : " + timeToLive, e); - } - } - - boolean sendingSuccessful = false; - // perform actual message sending - try { - if (jmsSpec11 || isQueue == null) { - producer.send(message); - - } else { - if (isQueue) { - ((QueueSender) producer).send(message); - - } else { - ((TopicPublisher) producer).publish(message); - } - } - - // set the actual MessageID to the message context for use by any others down the line - String msgId = null; - try { - msgId = message.getJMSMessageID(); - if (msgId != null) { - msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId); - } - } catch (JMSException ignore) {} - - sendingSuccessful = true; - - if (log.isDebugEnabled()) { - log.debug("Sent Message Context ID : " + msgCtx.getMessageID() + - " with JMS Message ID : " + msgId + - " to destination : " + producer.getDestination()); - } - - } catch (JMSException e) { - log.error("Error sending message with MessageContext ID : " + - msgCtx.getMessageID() + " to destination : " + destination, e); - - } finally { - - if (jtaCommit != null) { - - UserTransaction ut = (UserTransaction) msgCtx.getProperty(BaseConstants.USER_TRANSACTION); - if (ut != null) { - - try { - if (sendingSuccessful && jtaCommit) { - ut.commit(); - } else { - ut.rollback(); - } - msgCtx.removeProperty(BaseConstants.USER_TRANSACTION); - - if (log.isDebugEnabled()) { - log.debug((sendingSuccessful ? "Committed" : "Rolled back") + - " JTA Transaction"); - } - - } catch (Exception e) { - handleException("Error committing/rolling back JTA transaction after " + - "sending of message with MessageContext ID : " + msgCtx.getMessageID() + - " to destination : " + destination, e); - } - } - - } else { - try { - if (session.getTransacted()) { - if (sendingSuccessful && (rollbackOnly == null || !rollbackOnly)) { - session.commit(); - } else { - session.rollback(); - } - } - - if (log.isDebugEnabled()) { - log.debug((sendingSuccessful ? "Committed" : "Rolled back") + - " local (JMS Session) Transaction"); - } - - } catch (JMSException e) { - handleException("Error committing/rolling back local (i.e. session) " + - "transaction after sending of message with MessageContext ID : " + - msgCtx.getMessageID() + " to destination : " + destination, e); - } - } - } - } - - /** - * Close non-shared producer, session and connection if any - */ - public void close() { - if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) { - try { - producer.close(); - } catch (JMSException e) { - log.error("Error closing JMS MessageProducer after send", e); - } finally { - producer = null; - } - } - - if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) { - try { - session.close(); - } catch (JMSException e) { - log.error("Error closing JMS Session after send", e); - } finally { - session = null; - } - } - - if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - connection.close(); - } catch (JMSException e) { - log.error("Error closing JMS Connection after send", e); - } finally { - connection = null; - } - } - } - - private void handleException(String message, Exception e) { - log.error(message, e); - throw new AxisJMSException(message, e); - } - - private Boolean getBooleanProperty(MessageContext msgCtx, String name) { - Object o = msgCtx.getProperty(name); - if (o != null) { - if (o instanceof Boolean) { - return (Boolean) o; - } else if (o instanceof String) { - return Boolean.valueOf((String) o); - } - } - return null; - } - - private Integer getIntegerProperty(MessageContext msgCtx, String name) { - Object o = msgCtx.getProperty(name); - if (o != null) { - if (o instanceof Integer) { - return (Integer) o; - } else if (o instanceof String) { - return Integer.parseInt((String) o); - } - } - return null; - } - - public void setConnection(Connection connection) { - this.connection = connection; - } - - public void setSession(Session session) { - this.session = session; - } - - public void setProducer(MessageProducer producer) { - this.producer = producer; - } - - public void setCacheLevel(int cacheLevel) { - this.cacheLevel = cacheLevel; - } - - public int getCacheLevel() { - return cacheLevel; - } - - public Connection getConnection() { - return connection; - } - - public MessageProducer getProducer() { - return producer; - } - - public Session getSession() { - return session; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java deleted file mode 100644 index 9e029b33e1..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java +++ /dev/null @@ -1,306 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.Hashtable; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameNotFoundException; -import javax.naming.NamingException; - -import org.apache.axis2.transport.OutTransportInfo; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; - -/** - * The JMS OutTransportInfo is a holder of information to send an outgoing message - * (e.g. a Response) to a JMS destination. Thus at a minimum a reference to a - * ConnectionFactory and a Destination are held - */ -public class JMSOutTransportInfo implements OutTransportInfo { - - private static final Log log = LogFactory.getLog(JMSOutTransportInfo.class); - - /** The naming context */ - private Context context; - /** - * this is a reference to the underlying JMS ConnectionFactory when sending messages - * through connection factories not defined at the TransportSender level - */ - private ConnectionFactory connectionFactory = null; - /** - * this is a reference to a JMS Connection Factory instance, which has a reference - * to the underlying actual connection factory, an open connection to the JMS provider - * and optionally a session already available for use - */ - private JMSConnectionFactory jmsConnectionFactory = null; - /** the Destination queue or topic for the outgoing message */ - private Destination destination = null; - /** the Destination queue or topic for the outgoing message - * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC - */ - private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC; - /** the Reply Destination queue or topic for the outgoing message */ - private Destination replyDestination = null; - /** the Reply Destination name */ - private String replyDestinationName = null; - /** the Reply Destination queue or topic for the outgoing message - * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC - */ - private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC; - /** the EPR properties when the out-transport info is generated from a target EPR */ - private Hashtable<String,String> properties = null; - /** the target EPR string where applicable */ - private String targetEPR = null; - /** the message property name that stores the content type of the outgoing message */ - private String contentTypeProperty; - - /** - * Creates an instance using the given JMS connection factory and destination - * - * @param jmsConnectionFactory the JMS connection factory - * @param dest the destination - * @param contentTypeProperty - */ - JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest, - String contentTypeProperty) { - this.jmsConnectionFactory = jmsConnectionFactory; - this.destination = dest; - destinationType = dest instanceof Topic ? JMSConstants.DESTINATION_TYPE_TOPIC - : JMSConstants.DESTINATION_TYPE_QUEUE; - this.contentTypeProperty = contentTypeProperty; - } - - /** - * Creates and instance using the given URL - * - * @param targetEPR the target EPR - */ - JMSOutTransportInfo(String targetEPR) { - - this.targetEPR = targetEPR; - if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) { - handleException("Invalid prefix for a JMS EPR : " + targetEPR); - - } else { - properties = BaseUtils.getEPRProperties(targetEPR); - String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE); - if (destinationType != null) { - setDestinationType(destinationType); - } - - String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE); - if (replyDestinationType != null) { - setReplyDestinationType(replyDestinationType); - } - - replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); - contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - try { - context = new InitialContext(properties); - } catch (NamingException e) { - handleException("Could not get an initial context using " + properties, e); - } - - destination = getDestination(context, targetEPR); - replyDestination = getReplyDestination(context, targetEPR); - } - } - - /** - * Provides a lazy load when created with a target EPR. This method performs actual - * lookup for the connection factory and destination - */ - public void loadConnectionFactoryFromProperies() { - if (properties != null) { - connectionFactory = getConnectionFactory(context, properties); - } - } - - /** - * Get the referenced ConnectionFactory using the properties from the context - * - * @param context the context to use for lookup - * @param props the properties which contains the JNDI name of the factory - * @return the connection factory - */ - private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) { - try { - - String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME); - if (conFacJndiName != null) { - return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName); - } else { - handleException("Connection Factory JNDI name cannot be determined"); - } - } catch (NamingException e) { - handleException("Failed to look up connection factory from JNDI", e); - } - return null; - } - - /** - * Get the JMS destination specified by the given URL from the context - * - * @param context the Context to lookup - * @param url URL - * @return the JMS destination, or null if it does not exist - */ - private Destination getDestination(Context context, String url) { - String destinationName = JMSUtils.getDestination(url); - try { - return JMSUtils.lookup(context, Destination.class, destinationName); - } catch (NameNotFoundException e) { - try { - return JMSUtils.lookup(context, Destination.class, - (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ? - "dynamicTopics/" : "dynamicQueues/") + destinationName); - } catch (NamingException x) { - handleException("Cannot locate destination : " + destinationName + " using " + url); - } - } catch (NamingException e) { - handleException("Cannot locate destination : " + destinationName + " using " + url, e); - } - return null; - } - - /** - * Get the JMS reply destination specified by the given URL from the context - * - * @param context the Context to lookup - * @param url URL - * @return the JMS destination, or null if it does not exist - */ - private Destination getReplyDestination(Context context, String url) { - String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); - if(replyDestinationName == null) { - return null; - } - - try { - return JMSUtils.lookup(context, Destination.class, replyDestinationName); - } catch (NameNotFoundException e) { - if (log.isDebugEnabled()) { - log.debug("Cannot locate destination : " + replyDestinationName + " using " + url); - } - } catch (NamingException e) { - handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e); - } - - return null; - } - - /** - * Look up for the given destination - * @param replyDest the JNDI name to lookup Destination required - * @return Destination for the JNDI name passed - */ - public Destination getReplyDestination(String replyDest) { - try { - return JMSUtils.lookup(jmsConnectionFactory.getContext(), Destination.class, - replyDest); - } catch (NameNotFoundException e) { - if (log.isDebugEnabled()) { - log.debug("Cannot locate reply destination : " + replyDest, e); - } - } catch (NamingException e) { - handleException("Cannot locate reply destination : " + replyDest, e); - } - return null; - } - - - private void handleException(String s) { - log.error(s); - throw new AxisJMSException(s); - } - - private void handleException(String s, Exception e) { - log.error(s, e); - throw new AxisJMSException(s, e); - } - - public Destination getDestination() { - return destination; - } - - public ConnectionFactory getConnectionFactory() { - return connectionFactory; - } - - public JMSConnectionFactory getJmsConnectionFactory() { - return jmsConnectionFactory; - } - - public void setContentType(String contentType) { - // this is a useless Axis2 method imposed by the OutTransportInfo interface :( - } - - public Hashtable<String,String> getProperties() { - return properties; - } - - public String getTargetEPR() { - return targetEPR; - } - - public String getDestinationType() { - return destinationType; - } - - public void setDestinationType(String destinationType) { - if (destinationType != null) { - this.destinationType = destinationType; - } - } - - public Destination getReplyDestination() { - return replyDestination; - } - - public void setReplyDestination(Destination replyDestination) { - this.replyDestination = replyDestination; - } - - public String getReplyDestinationType() { - return replyDestinationType; - } - - public void setReplyDestinationType(String replyDestinationType) { - this.replyDestinationType = replyDestinationType; - } - - public String getReplyDestinationName() { - return replyDestinationName; - } - - public void setReplyDestinationName(String replyDestinationName) { - this.replyDestinationName = replyDestinationName; - } - - public String getContentTypeProperty() { - return contentTypeProperty; - } - - public void setContentTypeProperty(String contentTypeProperty) { - this.contentTypeProperty = contentTypeProperty; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java deleted file mode 100644 index a5f77dc4c9..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java +++ /dev/null @@ -1,499 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.StringWriter; -import java.nio.charset.UnsupportedCharsetException; -import java.util.Map; - -import javax.activation.DataHandler; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.axiom.om.OMElement; -import org.apache.axiom.om.OMNode; -import org.apache.axiom.om.OMOutputFormat; -import org.apache.axiom.om.OMText; -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.TransportOutDescription; -import org.apache.axis2.transport.MessageFormatter; -import org.apache.axis2.transport.OutTransportInfo; -import org.apache.axis2.transport.TransportUtils; -import org.apache.axis2.transport.http.HTTPConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportSender; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams.WriterOutputStream; - -/** - * The TransportSender for JMS - */ -public class JMSSender extends AbstractTransportSender implements ManagementSupport { - - public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; - - /** The JMS connection factory manager to be used when sending messages out */ - private JMSConnectionFactoryManager connFacManager; - - /** - * Initialize the transport sender by reading pre-defined connection factories for - * outgoing messages. - * - * @param cfgCtx the configuration context - * @param transportOut the transport sender definition from axis2.xml - * @throws AxisFault on error - */ - public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { - super.init(cfgCtx, transportOut); - connFacManager = new JMSConnectionFactoryManager(transportOut); - log.info("JMS Transport Sender initialized..."); - } - - /** - * Get corresponding JMS connection factory defined within the transport sender for the - * transport-out information - usually constructed from a targetEPR - * - * @param trpInfo the transport-out information - * @return the corresponding JMS connection factory, if any - */ - private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { - Map<String,String> props = trpInfo.getProperties(); - if (trpInfo.getProperties() != null) { - String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC); - if (jmsConnectionFactoryName != null) { - return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName); - } else { - return connFacManager.getJMSConnectionFactory(props); - } - } else { - return null; - } - } - - /** - * Performs the actual sending of the JMS message - */ - public void sendMessage(MessageContext msgCtx, String targetAddress, - OutTransportInfo outTransportInfo) throws AxisFault { - - JMSConnectionFactory jmsConnectionFactory = null; - JMSOutTransportInfo jmsOut = null; - JMSMessageSender messageSender = null; - - if (targetAddress != null) { - - jmsOut = new JMSOutTransportInfo(targetAddress); - // do we have a definition for a connection factory to use for this address? - jmsConnectionFactory = getJMSConnectionFactory(jmsOut); - - if (jmsConnectionFactory != null) { - messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress); - - } else { - try { - messageSender = JMSUtils.createJMSSender(jmsOut); - } catch (JMSException e) { - handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); - } - } - - } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { - - jmsOut = (JMSOutTransportInfo) outTransportInfo; - try { - messageSender = JMSUtils.createJMSSender(jmsOut); - } catch (JMSException e) { - handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); - } - } - - // The message property to be used to send the content type is determined by - // the out transport info, i.e. either from the EPR if we are sending a request, - // or, if we are sending a response, from the configuration of the service that - // received the request). The property name can be overridden by a message - // context property. - String contentTypeProperty = - (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - if (contentTypeProperty == null) { - contentTypeProperty = jmsOut.getContentTypeProperty(); - } - - // need to synchronize as Sessions are not thread safe - synchronized (messageSender.getSession()) { - try { - sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); - } finally { - messageSender.close(); - } - } - } - - /** - * Perform actual sending of the JMS message - */ - private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender, - String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory, - JMSOutTransportInfo jmsOut) throws AxisFault { - - // convert the axis message context into a JMS Message that we can send over JMS - Message message = null; - String correlationId = null; - try { - message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty); - } catch (JMSException e) { - handleException("Error creating a JMS message from the message context", e); - } - - // should we wait for a synchronous response on this same thread? - boolean waitForResponse = waitForSynchronousResponse(msgCtx); - Destination replyDestination = jmsOut.getReplyDestination(); - - // if this is a synchronous out-in, prepare to listen on the response destination - if (waitForResponse) { - - String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO); - if (replyDestName == null && jmsConnectionFactory != null) { - replyDestName = jmsConnectionFactory.getReplyToDestination(); - } - - if (replyDestName != null) { - if (jmsConnectionFactory != null) { - replyDestination = jmsConnectionFactory.getDestination(replyDestName); - } else { - replyDestination = jmsOut.getReplyDestination(replyDestName); - } - } - replyDestination = JMSUtils.setReplyDestination( - replyDestination, messageSender.getSession(), message); - } - - try { - messageSender.send(message, msgCtx); - metrics.incrementMessagesSent(msgCtx); - - } catch (AxisJMSException e) { - metrics.incrementFaultsSending(); - handleException("Error sending JMS message", e); - } - - try { - metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - - // if we are expecting a synchronous response back for the message sent out - if (waitForResponse) { - // TODO ******************************************************************************** - // TODO **** replace with asynchronous polling via a poller task to process this ******* - // information would be given. Then it should poll (until timeout) the - // requested destination for the response message and inject it from a - // asynchronous worker thread - try { - messageSender.getConnection().start(); // multiple calls are safely ignored - } catch (JMSException ignore) {} - - try { - correlationId = message.getJMSMessageID(); - } catch(JMSException ignore) {} - - // We assume here that the response uses the same message property to - // specify the content type of the message. - waitForResponseAndProcess(messageSender.getSession(), replyDestination, - msgCtx, correlationId, contentTypeProperty); - // TODO ******************************************************************************** - } - } - - /** - * Create a Consumer for the reply destination and wait for the response JMS message - * synchronously. If a message arrives within the specified time interval, process it - * through Axis2 - * @param session the session to use to listen for the response - * @param replyDestination the JMS reply Destination - * @param msgCtx the outgoing message for which we are expecting the response - * @param contentTypeProperty the message property used to determine the content type - * of the response message - * @throws AxisFault on error - */ - private void waitForResponseAndProcess(Session session, Destination replyDestination, - MessageContext msgCtx, String correlationId, - String contentTypeProperty) throws AxisFault { - - try { - MessageConsumer consumer; - consumer = JMSUtils.createConsumer(session, replyDestination, - "JMSCorrelationID = '" + correlationId + "'"); - - // how long are we willing to wait for the sync response - long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; - String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY); - if (waitReply != null) { - timeout = Long.valueOf(waitReply).longValue(); - } - - if (log.isDebugEnabled()) { - log.debug("Waiting for a maximum of " + timeout + - "ms for a response message to destination : " + replyDestination + - " with JMS correlation ID : " + correlationId); - } - - Message reply = consumer.receive(timeout); - - if (reply != null) { - - // update transport level metrics - metrics.incrementMessagesReceived(); - try { - metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - - try { - processSyncResponse(msgCtx, reply, contentTypeProperty); - metrics.incrementMessagesReceived(); - } catch (AxisFault e) { - metrics.incrementFaultsReceiving(); - throw e; - } - - } else { - log.warn("Did not receive a JMS response within " + - timeout + " ms to destination : " + replyDestination + - " with JMS correlation ID : " + correlationId); - metrics.incrementTimeoutsReceiving(); - } - - } catch (JMSException e) { - metrics.incrementFaultsReceiving(); - handleException("Error creating a consumer, or receiving a synchronous reply " + - "for outgoing MessageContext ID : " + msgCtx.getMessageID() + - " and reply Destination : " + replyDestination, e); - } - } - - /** - * Create a JMS Message from the given MessageContext and using the given - * session - * - * @param msgContext the MessageContext - * @param session the JMS session - * @param contentTypeProperty the message property to be used to store the - * content type - * @return a JMS message from the context and session - * @throws JMSException on exception - * @throws AxisFault on exception - */ - private Message createJMSMessage(MessageContext msgContext, Session session, - String contentTypeProperty) throws JMSException, AxisFault { - - Message message = null; - String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE); - - // check the first element of the SOAP body, do we have content wrapped using the - // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or - // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages - // for JMS but just get the payload in its native format - String jmsPayloadType = guessMessageType(msgContext); - - if (jmsPayloadType == null) { - - OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); - MessageFormatter messageFormatter = null; - try { - messageFormatter = TransportUtils.getMessageFormatter(msgContext); - } catch (AxisFault axisFault) { - throw new JMSException("Unable to get the message formatter to use"); - } - - String contentType = messageFormatter.getContentType( - msgContext, format, msgContext.getSoapAction()); - - boolean useBytesMessage = - msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) || - contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1; - - OutputStream out; - StringWriter sw; - if (useBytesMessage) { - BytesMessage bytesMsg = session.createBytesMessage(); - sw = null; - out = new BytesMessageOutputStream(bytesMsg); - message = bytesMsg; - } else { - sw = new StringWriter(); - try { - out = new WriterOutputStream(sw, format.getCharSetEncoding()); - } catch (UnsupportedCharsetException ex) { - handleException("Unsupported encoding " + format.getCharSetEncoding(), ex); - return null; - } - } - - try { - messageFormatter.writeTo(msgContext, format, out, true); - out.close(); - } catch (IOException e) { - handleException("IO Error while creating BytesMessage", e); - } - - if (!useBytesMessage) { - TextMessage txtMsg = session.createTextMessage(); - txtMsg.setText(sw.toString()); - message = txtMsg; - } - - if (contentTypeProperty != null) { - message.setStringProperty(contentTypeProperty, contentType); - } - - } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) { - message = session.createBytesMessage(); - BytesMessage bytesMsg = (BytesMessage) message; - OMElement wrapper = msgContext.getEnvelope().getBody(). - getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER); - OMNode omNode = wrapper.getFirstOMChild(); - if (omNode != null && omNode instanceof OMText) { - Object dh = ((OMText) omNode).getDataHandler(); - if (dh != null && dh instanceof DataHandler) { - try { - ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg)); - } catch (IOException e) { - handleException("Error serializing binary content of element : " + - BaseConstants.DEFAULT_BINARY_WRAPPER, e); - } - } - } - - } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) { - message = session.createTextMessage(); - TextMessage txtMsg = (TextMessage) message; - txtMsg.setText(msgContext.getEnvelope().getBody(). - getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText()); - } - - // set the JMS correlation ID if specified - String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID); - if (correlationId == null && msgContext.getRelatesTo() != null) { - correlationId = msgContext.getRelatesTo().getValue(); - } - - if (correlationId != null) { - message.setJMSCorrelationID(correlationId); - } - - if (msgContext.isServerSide()) { - // set SOAP Action as a property on the JMS message - setProperty(message, msgContext, BaseConstants.SOAPACTION); - } else { - String action = msgContext.getOptions().getAction(); - if (action != null) { - message.setStringProperty(BaseConstants.SOAPACTION, action); - } - } - - JMSUtils.setTransportHeaders(msgContext, message); - return message; - } - - /** - * Guess the message type to use for JMS looking at the message contexts' envelope - * @param msgContext the message context - * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null - */ - private String guessMessageType(MessageContext msgContext) { - OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement(); - if (firstChild != null) { - if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) { - return JMSConstants.JMS_BYTE_MESSAGE; - } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) { - return JMSConstants.JMS_TEXT_MESSAGE; - } - } - return null; - } - - /** - * Creates an Axis MessageContext for the received JMS message and - * sets up the transports and various properties - * - * @param outMsgCtx the outgoing message for which we are expecting the response - * @param message the JMS response message received - * @param contentTypeProperty the message property used to determine the content type - * of the response message - * @throws AxisFault on error - */ - private void processSyncResponse(MessageContext outMsgCtx, Message message, - String contentTypeProperty) throws AxisFault { - - MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx); - - // load any transport headers from received message - JMSUtils.loadTransportHeaders(message, responseMsgCtx); - - // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response - // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This - // question is still under debate and due to the timelines, I am commiting this - // workaround as Axis2 1.2 is about to be released and Synapse 1.0 - responseMsgCtx.setServerSide(false); - - String contentType = - contentTypeProperty == null ? null - : JMSUtils.getProperty(message, contentTypeProperty); - - try { - JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType); - } catch (JMSException ex) { - throw AxisFault.makeFault(ex); - } -// responseMsgCtx.setServerSide(true); - - handleIncomingMessage( - responseMsgCtx, - JMSUtils.getTransportHeaders(message), - JMSUtils.getProperty(message, BaseConstants.SOAPACTION), - contentType - ); - } - - private void setProperty(Message message, MessageContext msgCtx, String key) { - - String value = getProperty(msgCtx, key); - if (value != null) { - try { - message.setStringProperty(key, value); - } catch (JMSException e) { - log.warn("Couldn't set message property : " + key + " = " + value, e); - } - } - } - - private String getProperty(MessageContext mc, String key) { - return (String) mc.getProperty(key); - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java deleted file mode 100644 index 63faa0b852..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java +++ /dev/null @@ -1,1115 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.lang.reflect.Method; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.jms.TopicSession; -import javax.mail.internet.ContentType; -import javax.mail.internet.ParseException; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.Reference; - -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.builder.Builder; -import org.apache.axis2.builder.BuilderUtil; -import org.apache.axis2.builder.SOAPBuilder; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.transport.TransportUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.format.DataSourceMessageBuilder; -import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilder; -import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilderAdapter; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; - -/** - * Miscallaneous methods used for the JMS transport - */ -public class JMSUtils extends BaseUtils { - - private static final Log log = LogFactory.getLog(JMSUtils.class); - private static final Class[] NOARGS = new Class[] {}; - private static final Object[] NOPARMS = new Object[] {}; - - /** - * Should this service be enabled over the JMS transport? - * - * @param service the Axis service - * @return true if JMS should be enabled - */ - public static boolean isJMSService(AxisService service) { - if (service.isEnableAllTransports()) { - return true; - - } else { - List transports = service.getExposedTransports(); - for (Object transport : transports) { - if (JMSListener.TRANSPORT_NAME.equals(transport)) { - return true; - } - } - } - return false; - } - - /** - * Get the EPR for the given JMS connection factory and destination - * the form of the URL is - * jms:/<destination>?[<key>=<value>&]* - * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS - * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are filtered - * - * @param cf the Axis2 JMS connection factory - * @param destinationType the type of destination - * @param endpoint JMSEndpoint - * @return the EPR as a String - */ - static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) { - StringBuffer sb = new StringBuffer(); - - sb.append( - JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName()); - sb.append("?"). - append(JMSConstants.PARAM_DEST_TYPE).append("=").append( - destinationType == JMSConstants.TOPIC ? - JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE); - - if (endpoint.getContentTypeRuleSet() != null) { - String contentTypeProperty = - endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty(); - if (contentTypeProperty != null) { - sb.append("&"); - sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - sb.append("="); - sb.append(contentTypeProperty); - } - } - - for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) { - if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) && - !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) && - !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) && - !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) { - sb.append("&").append( - entry.getKey()).append("=").append(entry.getValue()); - } - } - return sb.toString(); - } - - /** - * Get a String property from the JMS message - * - * @param message JMS message - * @param property property name - * @return property value - */ - public static String getProperty(Message message, String property) { - try { - return message.getStringProperty(property); - } catch (JMSException e) { - return null; - } - } - - /** - * Return the destination name from the given URL - * - * @param url the URL - * @return the destination name - */ - public static String getDestination(String url) { - String tempUrl = url.substring(JMSConstants.JMS_PREFIX.length()); - int propPos = tempUrl.indexOf("?"); - - if (propPos == -1) { - return tempUrl; - } else { - return tempUrl.substring(0, propPos); - } - } - - /** - * Set the SOAPEnvelope to the Axis2 MessageContext, from the JMS Message passed in - * @param message the JMS message read - * @param msgContext the Axis2 MessageContext to be populated - * @param contentType content type for the message - * @throws AxisFault - * @throws JMSException - */ - public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType) - throws AxisFault, JMSException { - - if (contentType == null) { - if (message instanceof TextMessage) { - contentType = "text/plain"; - } else { - contentType = "application/octet-stream"; - } - if (log.isDebugEnabled()) { - log.debug("No content type specified; assuming " + contentType); - } - } - - int index = contentType.indexOf(';'); - String type = index > 0 ? contentType.substring(0, index) : contentType; - Builder builder = BuilderUtil.getBuilderFromSelector(type, msgContext); - if (builder == null) { - if (log.isDebugEnabled()) { - log.debug("No message builder found for type '" + type + "'. Falling back to SOAP."); - } - builder = new SOAPBuilder(); - } - - OMElement documentElement; - if (message instanceof BytesMessage) { - // Extract the charset encoding from the content type and - // set the CHARACTER_SET_ENCODING property as e.g. SOAPBuilder relies on this. - String charSetEnc = null; - try { - if (contentType != null) { - charSetEnc = new ContentType(contentType).getParameter("charset"); - } - } catch (ParseException ex) { - // ignore - } - msgContext.setProperty(Constants.Configuration.CHARACTER_SET_ENCODING, charSetEnc); - - if (builder instanceof DataSourceMessageBuilder) { - documentElement = ((DataSourceMessageBuilder)builder).processDocument( - new BytesMessageDataSource((BytesMessage)message), contentType, - msgContext); - } else { - documentElement = builder.processDocument( - new BytesMessageInputStream((BytesMessage)message), contentType, - msgContext); - } - } else if (message instanceof TextMessage) { - TextMessageBuilder textMessageBuilder; - if (builder instanceof TextMessageBuilder) { - textMessageBuilder = (TextMessageBuilder)builder; - } else { - textMessageBuilder = new TextMessageBuilderAdapter(builder); - } - String content = ((TextMessage)message).getText(); - documentElement = textMessageBuilder.processDocument(content, contentType, msgContext); - } else { - handleException("Unsupported JMS message type " + message.getClass().getName()); - return; // Make compiler happy - } - msgContext.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement)); - } - - /** - * Set the JMS ReplyTo for the message - * - * @param replyDestination the JMS Destination where the reply is expected - * @param session the session to use to create a temp Queue if a response is expected - * but a Destination has not been specified - * @param message the JMS message where the final Destinatio would be set as the JMS ReplyTo - * @return the JMS ReplyTo Destination for the message - */ - public static Destination setReplyDestination(Destination replyDestination, Session session, - Message message) { - - if (replyDestination == null) { - try { - // create temporary queue to receive the reply - replyDestination = createTemporaryDestination(session); - } catch (JMSException e) { - handleException("Error creating temporary queue for response"); - } - } - - try { - message.setJMSReplyTo(replyDestination); - } catch (JMSException e) { - log.warn("Error setting JMS ReplyTo destination to : " + replyDestination, e); - } - - if (log.isDebugEnabled()) { - try { - assert replyDestination != null; - log.debug("Expecting a response to JMS Destination : " + - (replyDestination instanceof Queue ? - ((Queue) replyDestination).getQueueName() : - ((Topic) replyDestination).getTopicName())); - } catch (JMSException ignore) {} - } - return replyDestination; - } - - /** - * Set transport headers from the axis message context, into the JMS message - * - * @param msgContext the axis message context - * @param message the JMS Message - * @throws JMSException on exception - */ - public static void setTransportHeaders(MessageContext msgContext, Message message) - throws JMSException { - - Map headerMap = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS); - - if (headerMap == null) { - return; - } - - for (Object headerName : headerMap.keySet()) { - - String name = (String) headerName; - - if (name.startsWith(JMSConstants.JMSX_PREFIX) && - !(name.equals(JMSConstants.JMSX_GROUP_ID) || name.equals(JMSConstants.JMSX_GROUP_SEQ))) { - continue; - } - - if (JMSConstants.JMS_COORELATION_ID.equals(name)) { - message.setJMSCorrelationID( - (String) headerMap.get(JMSConstants.JMS_COORELATION_ID)); - } else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) { - Object o = headerMap.get(JMSConstants.JMS_DELIVERY_MODE); - if (o instanceof Integer) { - message.setJMSDeliveryMode((Integer) o); - } else if (o instanceof String) { - try { - message.setJMSDeliveryMode(Integer.parseInt((String) o)); - } catch (NumberFormatException nfe) { - log.warn("Invalid delivery mode ignored : " + o, nfe); - } - } else { - log.warn("Invalid delivery mode ignored : " + o); - } - - } else if (JMSConstants.JMS_EXPIRATION.equals(name)) { - message.setJMSExpiration( - Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION))); - } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) { - message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID)); - } else if (JMSConstants.JMS_PRIORITY.equals(name)) { - message.setJMSPriority( - Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY))); - } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) { - message.setJMSTimestamp( - Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP))); - } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) { - message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE)); - - } else { - Object value = headerMap.get(name); - if (value instanceof String) { - message.setStringProperty(name, (String) value); - } else if (value instanceof Boolean) { - message.setBooleanProperty(name, (Boolean) value); - } else if (value instanceof Integer) { - message.setIntProperty(name, (Integer) value); - } else if (value instanceof Long) { - message.setLongProperty(name, (Long) value); - } else if (value instanceof Double) { - message.setDoubleProperty(name, (Double) value); - } else if (value instanceof Float) { - message.setFloatProperty(name, (Float) value); - } - } - } - } - - /** - * Read the transport headers from the JMS Message and set them to the axis2 message context - * - * @param message the JMS Message received - * @param responseMsgCtx the axis message context - * @throws AxisFault on error - */ - public static void loadTransportHeaders(Message message, MessageContext responseMsgCtx) - throws AxisFault { - responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, getTransportHeaders(message)); - } - - /** - * Extract transport level headers for JMS from the given message into a Map - * - * @param message the JMS message - * @return a Map of the transport headers - */ - public static Map<String, Object> getTransportHeaders(Message message) { - // create a Map to hold transport headers - Map<String, Object> map = new HashMap<String, Object>(); - - // correlation ID - try { - if (message.getJMSCorrelationID() != null) { - map.put(JMSConstants.JMS_COORELATION_ID, message.getJMSCorrelationID()); - } - } catch (JMSException ignore) {} - - // set the delivery mode as persistent or not - try { - map.put(JMSConstants.JMS_DELIVERY_MODE, Integer.toString(message.getJMSDeliveryMode())); - } catch (JMSException ignore) {} - - // destination name - try { - if (message.getJMSDestination() != null) { - Destination dest = message.getJMSDestination(); - map.put(JMSConstants.JMS_DESTINATION, - dest instanceof Queue ? - ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); - } - } catch (JMSException ignore) {} - - // expiration - try { - map.put(JMSConstants.JMS_EXPIRATION, Long.toString(message.getJMSExpiration())); - } catch (JMSException ignore) {} - - // if a JMS message ID is found - try { - if (message.getJMSMessageID() != null) { - map.put(JMSConstants.JMS_MESSAGE_ID, message.getJMSMessageID()); - } - } catch (JMSException ignore) {} - - // priority - try { - map.put(JMSConstants.JMS_PRIORITY, Long.toString(message.getJMSPriority())); - } catch (JMSException ignore) {} - - // redelivered - try { - map.put(JMSConstants.JMS_REDELIVERED, Boolean.toString(message.getJMSRedelivered())); - } catch (JMSException ignore) {} - - // replyto destination name - try { - if (message.getJMSReplyTo() != null) { - Destination dest = message.getJMSReplyTo(); - map.put(JMSConstants.JMS_REPLY_TO, - dest instanceof Queue ? - ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); - } - } catch (JMSException ignore) {} - - // priority - try { - map.put(JMSConstants.JMS_TIMESTAMP, Long.toString(message.getJMSTimestamp())); - } catch (JMSException ignore) {} - - // message type - try { - if (message.getJMSType() != null) { - map.put(JMSConstants.JMS_TYPE, message.getJMSType()); - } - } catch (JMSException ignore) {} - - // any other transport properties / headers - Enumeration e = null; - try { - e = message.getPropertyNames(); - } catch (JMSException ignore) {} - - if (e != null) { - while (e.hasMoreElements()) { - String headerName = (String) e.nextElement(); - try { - map.put(headerName, message.getStringProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getBooleanProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getIntProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getLongProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getDoubleProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getFloatProperty(headerName)); - } catch (JMSException ignore) {} - } - } - - return map; - } - - - /** - * Create a MessageConsumer for the given Destination - * @param session JMS Session to use - * @param dest Destination for which the Consumer is to be created - * @param messageSelector the message selector to be used if any - * @return a MessageConsumer for the specified Destination - * @throws JMSException - */ - public static MessageConsumer createConsumer(Session session, Destination dest, String messageSelector) - throws JMSException { - - if (dest instanceof Queue) { - return ((QueueSession) session).createReceiver((Queue) dest, messageSelector); - } else { - return ((TopicSession) session).createSubscriber((Topic) dest, messageSelector, false); - } - } - - /** - * Create a temp queue or topic for synchronous receipt of responses, when a reply destination - * is not specified - * @param session the JMS Session to use - * @return a temporary Queue or Topic, depending on the session - * @throws JMSException - */ - public static Destination createTemporaryDestination(Session session) throws JMSException { - - if (session instanceof QueueSession) { - return session.createTemporaryQueue(); - } else { - return session.createTemporaryTopic(); - } - } - - /** - * Return the body length in bytes for a bytes message - * @param bMsg the JMS BytesMessage - * @return length of body in bytes - */ - public static long getBodyLength(BytesMessage bMsg) { - try { - Method mtd = bMsg.getClass().getMethod("getBodyLength", NOARGS); - if (mtd != null) { - return (Long) mtd.invoke(bMsg, NOPARMS); - } - } catch (Exception e) { - // JMS 1.0 - if (log.isDebugEnabled()) { - log.debug("Error trying to determine JMS BytesMessage body length", e); - } - } - - // if JMS 1.0 - long length = 0; - try { - byte[] buffer = new byte[2048]; - bMsg.reset(); - for (int bytesRead = bMsg.readBytes(buffer); bytesRead != -1; - bytesRead = bMsg.readBytes(buffer)) { - length += bytesRead; - } - } catch (JMSException ignore) {} - return length; - } - - /** - * Get the length of the message in bytes - * @param message - * @return message size (or approximation) in bytes - * @throws JMSException - */ - public static long getMessageSize(Message message) throws JMSException { - if (message instanceof BytesMessage) { - return JMSUtils.getBodyLength((BytesMessage) message); - } else if (message instanceof TextMessage) { - // TODO: Converting the whole message to a byte array is too much overhead just to determine the message size. - // Anyway, the result is not accurate since we don't know what encoding the JMS provider uses. - return ((TextMessage) message).getText().getBytes().length; - } else { - log.warn("Can't determine size of JMS message; unsupported message type : " - + message.getClass().getName()); - return 0; - } - } - - public static <T> T lookup(Context context, Class<T> clazz, String name) - throws NamingException { - - Object object = context.lookup(name); - try { - return clazz.cast(object); - } catch (ClassCastException ex) { - // Instead of a ClassCastException, throw an exception with some - // more information. - if (object instanceof Reference) { - Reference ref = (Reference)object; - handleException("JNDI failed to de-reference Reference with name " + - name + "; is the factory " + ref.getFactoryClassName() + - " in your classpath?"); - return null; - } else { - handleException("JNDI lookup of name " + name + " returned a " + - object.getClass().getName() + " while a " + clazz + " was expected"); - return null; - } - } - } - - /** - * Create a ServiceTaskManager for the service passed in and its corresponding JMSConnectionFactory - * @param jcf - * @param service - * @param workerPool - * @return - */ - public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf, - AxisService service, WorkerPool workerPool) { - - String name = service.getName(); - Map<String, String> svc = getServiceStringParameters(service.getParameters()); - Map<String, String> cf = jcf.getParameters(); - - ServiceTaskManager stm = new ServiceTaskManager(); - - stm.setServiceName(name); - stm.addJmsProperties(cf); - stm.addJmsProperties(svc); - - stm.setConnFactoryJNDIName( - getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf)); - String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf); - if (destName == null) { - destName = service.getName(); - } - stm.setDestinationJNDIName(destName); - stm.setDestinationType(getDestinationType(svc, cf)); - - stm.setJmsSpec11( - getJMSSpecVersion(svc, cf)); - stm.setTransactionality( - getTransactionality(svc, cf)); - stm.setCacheUserTransaction( - getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf)); - stm.setUserTransactionJNDIName( - getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf)); - stm.setSessionTransacted( - getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf)); - stm.setSessionAckMode( - getSessionAck(svc, cf)); - stm.setMessageSelector( - getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf)); - stm.setSubscriptionDurable( - getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf)); - stm.setDurableSubscriberName( - getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf)); - - stm.setCacheLevel( - getCacheLevel(svc, cf)); - stm.setPubSubNoLocal( - getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf)); - - Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf); - if (value != null) { - stm.setReceiveTimeout(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf); - if (value != null) { - stm.setConcurrentConsumers(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf); - if (value != null) { - stm.setMaxConcurrentConsumers(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf); - if (value != null) { - stm.setIdleTaskExecutionLimit(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf); - if (value != null) { - stm.setMaxMessagesPerTask(value); - } - - value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf); - if (value != null) { - stm.setInitialReconnectDuration(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf); - if (value != null) { - stm.setMaxReconnectDuration(value); - } - Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf); - if (dValue != null) { - stm.setReconnectionProgressionFactor(dValue); - } - - stm.setWorkerPool(workerPool); - - // remove processed properties from property bag - stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME); - stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION); - stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER); - stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY); - stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN); - stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME); - stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED); - stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR); - stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE); - stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME); - stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL); - stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL); - stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT); - stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS); - stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS); - stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT); - stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK); - stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION); - stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION); - stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR); - - return stm; - } - - private static Map<String, String> getServiceStringParameters(List list) { - - Map<String, String> map = new HashMap<String, String>(); - for (Object o : list) { - Parameter p = (Parameter) o; - if (p.getValue() instanceof String) { - map.put(p.getName(), (String) p.getValue()); - } - } - return map; - } - - private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value == null) { - throw new AxisJMSException("Service/connection factory property : " + key); - } - return value; - } - - private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - return value; - } - - private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value == null) { - return null; - } else { - return Boolean.valueOf(value); - } - } - - private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value != null) { - try { - return Integer.parseInt(value); - } catch (NumberFormatException e) { - throw new AxisJMSException("Invalid value : " + value + " for " + key); - } - } - return null; - } - - private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value != null) { - try { - return Double.parseDouble(value); - } catch (NumberFormatException e) { - throw new AxisJMSException("Invalid value : " + value + " for " + key); - } - } - return null; - } - - private static int getTransactionality(Map svcMap, Map cfMap) { - - String key = BaseConstants.PARAM_TRANSACTIONALITY; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (val == null) { - return BaseConstants.TRANSACTION_NONE; - - } else { - if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) { - return BaseConstants.TRANSACTION_JTA; - } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) { - return BaseConstants.TRANSACTION_LOCAL; - } else { - throw new AxisJMSException("Invalid option : " + val + " for parameter : " + - BaseConstants.STR_TRANSACTION_JTA); - } - } - } - - private static int getDestinationType(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_DEST_TYPE; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) { - return JMSConstants.TOPIC; - } - return JMSConstants.QUEUE; - } - - private static int getSessionAck(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_SESSION_ACK; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) { - return Session.AUTO_ACKNOWLEDGE; - } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) { - return Session.CLIENT_ACKNOWLEDGE; - } else if ("DUPS_OK_ACKNOWLEDGE".equals(val)){ - return Session.DUPS_OK_ACKNOWLEDGE; - } else if ("SESSION_TRANSACTED".equals(val)) { - return 0; //Session.SESSION_TRANSACTED; - } else { - try { - return Integer.parseInt(val); - } catch (NumberFormatException ignore) { - throw new AxisJMSException("Invalid session acknowledgement mode : " + val); - } - } - } - - private static int getCacheLevel(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_CACHE_LEVEL; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if ("none".equalsIgnoreCase(val)) { - return JMSConstants.CACHE_NONE; - } else if ("connection".equalsIgnoreCase(val)) { - return JMSConstants.CACHE_CONNECTION; - } else if ("session".equals(val)){ - return JMSConstants.CACHE_SESSION; - } else if ("consumer".equals(val)) { - return JMSConstants.CACHE_CONSUMER; - } else if (val != null) { - throw new AxisJMSException("Invalid cache level : " + val); - } - return JMSConstants.CACHE_AUTO; - } - - private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_JMS_SPEC_VER; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (val == null || "1.1".equals(val)) { - return true; - } else { - return false; - } - } - - /** - * This is a JMS spec independent method to create a Connection. Please be cautious when - * making any changes - * - * @param conFac the ConnectionFactory to use - * @param user optional user name - * @param pass optional password - * @param jmsSpec11 should we use JMS 1.1 API ? - * @param isQueue is this to deal with a Queue? - * @return a JMS Connection as requested - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static Connection createConnection(ConnectionFactory conFac, - String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException { - - Connection connection = null; - if (log.isDebugEnabled()) { - log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") + - "Connection using credentials : (" + user + "/" + pass + ")"); - } - - if (jmsSpec11 || isQueue == null) { - if (user != null && pass != null) { - connection = conFac.createConnection(user, pass); - } else { - connection = conFac.createConnection(); - } - - } else { - QueueConnectionFactory qConFac = null; - TopicConnectionFactory tConFac = null; - if (isQueue) { - tConFac = (TopicConnectionFactory) conFac; - } else { - qConFac = (QueueConnectionFactory) conFac; - } - - if (user != null && pass != null) { - if (qConFac != null) { - connection = qConFac.createQueueConnection(user, pass); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(user, pass); - } - } else { - if (qConFac != null) { - connection = qConFac.createQueueConnection(); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(); - } - } - } - return connection; - } - - /** - * This is a JMS spec independent method to create a Session. Please be cautious when - * making any changes - * - * @param connection the JMS Connection - * @param transacted should the session be transacted? - * @param ackMode the ACK mode for the session - * @param jmsSpec11 should we use the JMS 1.1 API? - * @param isQueue is this Session to deal with a Queue? - * @return a Session created for the given information - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static Session createSession(Connection connection, boolean transacted, int ackMode, - boolean jmsSpec11, Boolean isQueue) throws JMSException { - - if (jmsSpec11 || isQueue == null) { - return connection.createSession(transacted, ackMode); - - } else { - if (isQueue) { - return ((QueueConnection) connection).createQueueSession(transacted, ackMode); - } else { - return ((TopicConnection) connection).createTopicSession(transacted, ackMode); - } - } - } - - /** - * This is a JMS spec independent method to create a MessageConsumer. Please be cautious when - * making any changes - * - * @param session JMS session - * @param destination the Destination - * @param isQueue is the Destination a queue? - * @param subscriberName optional client name to use for a durable subscription to a topic - * @param messageSelector optional message selector - * @param pubSubNoLocal should we receive messages sent by us during pub-sub? - * @param isDurable is this a durable topic subscription? - * @param jmsSpec11 should we use JMS 1.1 API ? - * @return a MessageConsumer to receive messages - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static MessageConsumer createConsumer( - Session session, Destination destination, Boolean isQueue, - String subscriberName, String messageSelector, boolean pubSubNoLocal, - boolean isDurable, boolean jmsSpec11) throws JMSException { - - if (jmsSpec11 || isQueue == null) { - if (isDurable) { - return session.createDurableSubscriber( - (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); - } else { - return session.createConsumer(destination, messageSelector, pubSubNoLocal); - } - } else { - if (isQueue) { - return ((QueueSession) session).createReceiver((Queue) destination, messageSelector); - } else { - if (isDurable) { - return ((TopicSession) session).createDurableSubscriber( - (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); - } else { - return ((TopicSession) session).createSubscriber( - (Topic) destination, messageSelector, pubSubNoLocal); - } - } - } - } - - /** - * This is a JMS spec independent method to create a MessageProducer. Please be cautious when - * making any changes - * - * @param session JMS session - * @param destination the Destination - * @param isQueue is the Destination a queue? - * @param jmsSpec11 should we use JMS 1.1 API ? - * @return a MessageProducer to send messages to the given Destination - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static MessageProducer createProducer( - Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException { - - if (jmsSpec11 || isQueue == null) { - return session.createProducer(destination); - } else { - if (isQueue) { - return ((QueueSession) session).createSender((Queue) destination); - } else { - return ((TopicSession) session).createPublisher((Topic) destination); - } - } - } - - /** - * Create a one time MessageProducer for the given JMS OutTransport information - * For simplicity and best compatibility, this method uses only JMS 1.0.2b API. - * Please be cautious when making any changes - * - * @param jmsOut the JMS OutTransport information (contains all properties) - * @return a JMSSender based on one-time use resources - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut) - throws JMSException { - - // digest the targetAddress and locate CF from the EPR - jmsOut.loadConnectionFactoryFromProperies(); - - // create a one time connection and session to be used - Hashtable<String,String> jmsProps = jmsOut.getProperties(); - String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null; - String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null; - - QueueConnectionFactory qConFac = null; - TopicConnectionFactory tConFac = null; - - int destType = -1; - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) { - destType = JMSConstants.QUEUE; - qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory(); - - } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) { - destType = JMSConstants.TOPIC; - tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory(); - } - - Connection connection = null; - if (user != null && pass != null) { - if (qConFac != null) { - connection = qConFac.createQueueConnection(user, pass); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(user, pass); - } - } else { - if (qConFac != null) { - connection = qConFac.createQueueConnection(); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(); - } - } - - if (connection == null && jmsOut.getJmsConnectionFactory() != null) { - connection = jmsOut.getJmsConnectionFactory().getConnection(); - } - - Session session = null; - MessageProducer producer = null; - Destination destination = jmsOut.getDestination(); - - if (destType == JMSConstants.QUEUE) { - session = ((QueueConnection) connection). - createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - producer = ((QueueSession) session).createSender((Queue) destination); - } else { - session = ((TopicConnection) connection). - createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - producer = ((TopicSession) session).createPublisher((Topic) destination); - } - - return new JMSMessageSender(connection, session, producer, - destination, (jmsOut.getJmsConnectionFactory() == null ? - JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false, - destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE); - } - - /** - * Return a String representation of the destination type - * @param destType the destination type indicator int - * @return a descriptive String - */ - public static String getDestinationTypeAsString(int destType) { - if (destType == JMSConstants.QUEUE) { - return "Queue"; - } else if (destType == JMSConstants.TOPIC) { - return "Topic"; - } else { - return "Generic"; - } - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java deleted file mode 100644 index 28c8da2a8d..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java +++ /dev/null @@ -1,1217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import javax.transaction.NotSupportedException; -import javax.transaction.SystemException; -import javax.transaction.UserTransaction; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; - -/** - * Each service will have one ServiceTaskManager instance that will create, manage and also destroy - * idle tasks created for it, for message receipt. This will also allow individual tasks to cache - * the Connection, Session or Consumer as necessary, considering the transactionality required and - * user preference. - * - * This also acts as the ExceptionListener for all JMS connections made on behalf of the service. - * Since the ExceptionListener is notified by a JMS provider on a "serious" error, we simply try - * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh - * for the service, by discarding all connections. - */ -public class ServiceTaskManager { - - /** The logger */ - private static final Log log = LogFactory.getLog(ServiceTaskManager.class); - - /** The Task manager is stopped or has not started */ - private static final int STATE_STOPPED = 0; - /** The Task manager is started and active */ - private static final int STATE_STARTED = 1; - /** The Task manager is paused temporarily */ - private static final int STATE_PAUSED = 2; - /** The Task manager is started, but a shutdown has been requested */ - private static final int STATE_SHUTTING_DOWN = 3; - /** The Task manager has encountered an error */ - private static final int STATE_FAILURE = 4; - - /** The name of the service managed by this instance */ - private String serviceName; - /** The ConnectionFactory MUST refer to an XAConnectionFactory to use JTA */ - private String connFactoryJNDIName; - /** The JNDI name of the Destination Queue or Topic */ - private String destinationJNDIName; - /** JNDI location for the JTA UserTransaction */ - private String userTransactionJNDIName = "java:comp/UserTransaction"; - /** The type of destination - P2P or PubSub (or JMS 1.1 API generic?) */ - private int destinationType = JMSConstants.GENERIC; - /** An optional message selector */ - private String messageSelector = null; - - /** Should tasks run without transactions, using transacted Sessions (i.e. local), or JTA */ - private int transactionality = BaseConstants.TRANSACTION_NONE; - /** Should created Sessions be transactional ? - should be false when using JTA */ - private boolean sessionTransacted = true; - /** Session acknowledgement mode when transacted Sessions (i.e. local transactions) are used */ - private int sessionAckMode = Session.AUTO_ACKNOWLEDGE; - - /** Is the subscription durable ? */ - private boolean subscriptionDurable = false; - /** The name of the durable subscriber for this client */ - private String durableSubscriberName = null; - /** In PubSub mode, should I receive messages sent by me / my connection ? */ - private boolean pubSubNoLocal = false; - /** Number of concurrent consumers - for PubSub, this should be 1 to prevent multiple receipt */ - private int concurrentConsumers = 1; - /** Maximum number of consumers to create - see @concurrentConsumers */ - private int maxConcurrentConsumers = 1; - /** The number of idle (i.e. message-less) attempts to be tried before suicide, to scale down */ - private int idleTaskExecutionLimit = 10; - /** The maximum number of successful message receipts for a task - to limit thread life span */ - private int maxMessagesPerTask = -1; // default is unlimited - /** The default receive timeout - a negative value means wait forever, zero dont wait at all */ - private int receiveTimeout = 1000; - /** JMS Resource cache level - Connection, Session, Consumer. Auto will select safe default */ - private int cacheLevel = JMSConstants.CACHE_AUTO; - /** Should we cache the UserTransaction handle from JNDI - true for almost all app servers */ - private boolean cacheUserTransaction = true; - /** Shared UserTransactionHandle */ - private UserTransaction sharedUserTransaction = null; - /** Should this service use JMS 1.1 ? (when false, defaults to 1.0.2b) */ - private boolean jmsSpec11 = true; - - /** Initial duration to attempt re-connection to JMS provider after failure */ - private int initialReconnectDuration = 10000; - /** Progression factory for geometric series that calculates re-connection times */ - private double reconnectionProgressionFactor = 2.0; // default to [bounded] exponential - /** Upper limit on reconnection attempt duration */ - private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour - - /** The JNDI context properties and other general properties */ - private Hashtable<String,String> jmsProperties = new Hashtable<String, String>(); - /** The JNDI Context acuired */ - private Context context = null; - /** The ConnectionFactory to be used */ - private ConnectionFactory conFactory = null; - /** The JMS Destination */ - private Destination destination = null; - - /** The list of active tasks thats managed by this instance */ - private final List<MessageListenerTask> pollingTasks = - Collections.synchronizedList(new ArrayList<MessageListenerTask>()); - /** The per-service JMS message receiver to be invoked after receipt of messages */ - private JMSMessageReceiver jmsMessageReceiver = null; - - /** State of this Task Manager */ - private volatile int serviceTaskManagerState = STATE_STOPPED; - /** Number of invoker tasks active */ - private volatile int activeTaskCount = 0; - /** The shared thread pool from the Listener */ - private WorkerPool workerPool = null; - - /** The JMS Connection shared between multiple polling tasks - when enabled (reccomended) */ - private Connection sharedConnection = null; - - /** - * Start or re-start the Task Manager by shutting down any existing worker tasks and - * re-creating them. However, if this is STM is PAUSED, a start request is ignored. - * This applies for any connection failures during paused state as well, which then will - * not try to auto recover - */ - public synchronized void start() { - - if (serviceTaskManagerState == STATE_PAUSED) { - log.info("Attempt to re-start paused TaskManager is ignored. Please use resume instead"); - return; - } - - // if any tasks are running, stop whats running now - if (!pollingTasks.isEmpty()) { - stop(); - } - - if (cacheLevel == JMSConstants.CACHE_AUTO) { - cacheLevel = - transactionality == BaseConstants.TRANSACTION_NONE ? - JMSConstants.CACHE_CONSUMER : JMSConstants.CACHE_NONE; - } - switch (cacheLevel) { - case JMSConstants.CACHE_NONE: - log.debug("No JMS resources will be cached/shared between poller " + - "worker tasks of service : " + serviceName); - break; - case JMSConstants.CACHE_CONNECTION: - log.debug("Only the JMS Connection will be cached and shared between *all* " + - "poller task invocations"); - break; - case JMSConstants.CACHE_SESSION: - log.debug("The JMS Connection and Session will be cached and shared between " + - "successive poller task invocations"); - break; - case JMSConstants.CACHE_CONSUMER: - log.debug("The JMS Connection, Session and MessageConsumer will be cached and " + - "shared between successive poller task invocations"); - break; - default : { - handleException("Invalid cache level : " + cacheLevel + - " for service : " + serviceName); - } - } - - for (int i=0; i<concurrentConsumers; i++) { - workerPool.execute(new MessageListenerTask()); - } - - serviceTaskManagerState = STATE_STARTED; - log.info("Task manager for service : " + serviceName + " [re-]initialized"); - } - - /** - * Shutdown the tasks and release any shared resources - */ - public synchronized void stop() { - - if (log.isDebugEnabled()) { - log.debug("Stopping ServiceTaskManager for service : " + serviceName); - } - - if (serviceTaskManagerState != STATE_FAILURE) { - serviceTaskManagerState = STATE_SHUTTING_DOWN; - } - - synchronized(pollingTasks) { - for (MessageListenerTask lstTask : pollingTasks) { - lstTask.requestShutdown(); - } - } - - // try to wait a bit for task shutdown - for (int i=0; i<5; i++) { - if (activeTaskCount == 0) { - break; - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) {} - } - - if (sharedConnection != null) { - try { - sharedConnection.stop(); - } catch (JMSException e) { - logError("Error stopping shared Connection", e); - } finally { - sharedConnection = null; - } - } - - if (activeTaskCount > 0) { - log.warn("Unable to shutdown all polling tasks of service : " + serviceName); - } - - if (serviceTaskManagerState != STATE_FAILURE) { - serviceTaskManagerState = STATE_STOPPED; - } - log.info("Task manager for service : " + serviceName + " shutdown"); - } - - /** - * Temporarily suspend receipt and processing of messages. Accomplished by stopping the - * connection / or connections used by the poller tasks - */ - public synchronized void pause() { - for (MessageListenerTask lstTask : pollingTasks) { - lstTask.pause(); - } - if (sharedConnection != null) { - try { - sharedConnection.stop(); - } catch (JMSException e) { - logError("Error pausing shared Connection", e); - } - } - } - - /** - * Resume receipt and processing of messages of paused tasks - */ - public synchronized void resume() { - for (MessageListenerTask lstTask : pollingTasks) { - lstTask.resume(); - } - if (sharedConnection != null) { - try { - sharedConnection.start(); - } catch (JMSException e) { - logError("Error resuming shared Connection", e); - } - } - } - - /** - * Start a new MessageListenerTask if we are still active, the threshold is not reached, and w - * e do not have any idle tasks - i.e. scale up listening - */ - private void scheduleNewTaskIfAppropriate() { - if (serviceTaskManagerState == STATE_STARTED && - pollingTasks.size() < getMaxConcurrentConsumers() && getIdleTaskCount() == 0) { - workerPool.execute(new MessageListenerTask()); - } - } - - /** - * Get the number of MessageListenerTasks that are currently idle - * @return idle task count - */ - private int getIdleTaskCount() { - int count = 0; - for (MessageListenerTask lstTask : pollingTasks) { - if (lstTask.isTaskIdle()) { - count++; - } - } - return count; - } - - /** - * Get the number of MessageListenerTasks that are currently connected to the JMS provider - * @return connected task count - */ - private int getConnectedTaskCount() { - int count = 0; - for (MessageListenerTask lstTask : pollingTasks) { - if (lstTask.isConnected()) { - count++; - } - } - return count; - } - - /** - * The actual threads/tasks that perform message polling - */ - private class MessageListenerTask implements Runnable, ExceptionListener { - - /** The Connection used by the polling task */ - private Connection connection = null; - /** The Sesson used by the polling task */ - private Session session = null; - /** The MessageConsumer used by the polling task */ - private MessageConsumer consumer = null; - /** State of the worker polling task */ - private volatile int workerState = STATE_STOPPED; - /** The number of idle (i.e. without fetching a message) polls for this task */ - private int idleExecutionCount = 0; - /** Is this task idle right now? */ - private volatile boolean idle = false; - /** Is this task connected to the JMS provider successfully? */ - private boolean connected = false; - - /** As soon as we create a new polling task, add it to the STM for control later */ - MessageListenerTask() { - synchronized(pollingTasks) { - pollingTasks.add(this); - } - } - - /** - * Pause this polling worker task - */ - public void pause() { - if (isActive()) { - if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - connection.stop(); - } catch (JMSException e) { - log.warn("Error pausing Message Listener task for service : " + serviceName); - } - } - workerState = STATE_PAUSED; - } - } - - /** - * Resume this polling task - */ - public void resume() { - if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - connection.start(); - } catch (JMSException e) { - log.warn("Error resuming Message Listener task for service : " + serviceName); - } - } - workerState = STATE_STARTED; - } - - /** - * Execute the polling worker task - */ - public void run() { - workerState = STATE_STARTED; - activeTaskCount++; - int messageCount = 0; - - if (log.isDebugEnabled()) { - log.debug("New poll task starting : thread id = " + Thread.currentThread().getId()); - } - - try { - while (isActive() && - (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) && - (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) { - - UserTransaction ut = null; - try { - if (transactionality == BaseConstants.TRANSACTION_JTA) { - ut = getUserTransaction(); - ut.begin(); - } - } catch (NotSupportedException e) { - handleException("Listener Task is already associated with a transaction", e); - } catch (SystemException e) { - handleException("Error starting a JTA transaction", e); - } - - // Get a message by polling, or receive null - Message message = receiveMessage(); - - if (log.isTraceEnabled()) { - if (message != null) { - try { - log.trace("<<<<<<< READ message with Message ID : " + - message.getJMSMessageID() + " from : " + destination + - " by Thread ID : " + Thread.currentThread().getId()); - } catch (JMSException ignore) {} - } else { - log.trace("No message received by Thread ID : " + - Thread.currentThread().getId() + " for destination : " + destination); - } - } - - if (message != null) { - idle = false; - idleExecutionCount = 0; - messageCount++; - // I will be busy now while processing this message, so start another if needed - scheduleNewTaskIfAppropriate(); - handleMessage(message, ut); - - } else { - idle = true; - idleExecutionCount++; - } - } - - } finally { - workerState = STATE_STOPPED; - activeTaskCount--; - synchronized(pollingTasks) { - pollingTasks.remove(this); - } - } - - if (log.isTraceEnabled()) { - log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() + - " is stopping after processing : " + messageCount + " messages :: " + - " isActive : " + isActive() + " maxMessagesPerTask : " + - getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() + - " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " + - getIdleTaskExecutionLimit()); - } else if (log.isDebugEnabled()) { - log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() + - " is stopping after processing : " + messageCount + " messages"); - } - - closeConsumer(true); - closeSession(true); - closeConnection(); - - // My time is up, so if I am going away, create another - scheduleNewTaskIfAppropriate(); - } - - /** - * Poll for and return a message if available - * - * @return a message read, or null - */ - private Message receiveMessage() { - - // get a new connection, session and consumer to prevent a conflict. - // If idle, it means we can re-use what we already have - if (consumer == null) { - connection = getConnection(); - session = getSession(); - consumer = getMessageConsumer(); - if (log.isDebugEnabled()) { - log.debug("Preparing a Connection, Session and Consumer to read messages"); - } - } - - if (log.isDebugEnabled()) { - log.debug("Waiting for a message for service : " + serviceName + " - duration : " - + (getReceiveTimeout() < 0 ? "unlimited" : (getReceiveTimeout() + "ms"))); - } - - try { - if (getReceiveTimeout() < 0) { - return consumer.receive(); - } else { - return consumer.receive(getReceiveTimeout()); - } - } catch (IllegalStateException ignore) { - // probably the consumer (shared) was closed.. which is still ok.. as we didn't read - } catch (JMSException e) { - logError("Error receiving message for service : " + serviceName, e); - } - return null; - } - - /** - * Invoke ultimate message handler/listener and ack message and/or - * commit/rollback transactions - * @param message the JMS message received - * @param ut the UserTransaction used to receive this message, or null - */ - private void handleMessage(Message message, UserTransaction ut) { - - String messageId = null; - try { - messageId = message.getJMSMessageID(); - } catch (JMSException ignore) {} - - boolean commitOrAck = true; - try { - commitOrAck = jmsMessageReceiver.onMessage(message, ut); - - } finally { - - // if client acknowledgement is selected, and processing requested ACK - if (commitOrAck && getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) { - try { - message.acknowledge(); - if (log.isDebugEnabled()) { - log.debug("Message : " + messageId + " acknowledged"); - } - } catch (JMSException e) { - logError("Error acknowledging message : " + messageId, e); - } - } - - // close the consumer - closeConsumer(false); - - // if session was transacted, commit it or rollback - try { - if (session.getTransacted()) { - if (commitOrAck) { - session.commit(); - if (log.isDebugEnabled()) { - log.debug("Session for message : " + messageId + " committed"); - } - } else { - session.rollback(); - if (log.isDebugEnabled()) { - log.debug("Session for message : " + messageId + " rolled back"); - } - } - } - } catch (JMSException e) { - logError("Error " + (commitOrAck ? "committing" : "rolling back") + - " local session txn for message : " + messageId, e); - } - - // if a JTA transaction was being used, commit it or rollback - try { - if (ut != null) { - if (commitOrAck) { - ut.commit(); - if (log.isDebugEnabled()) { - log.debug("JTA txn for message : " + messageId + " committed"); - } - } else { - ut.rollback(); - if (log.isDebugEnabled()) { - log.debug("JTA txn for message : " + messageId + " rolled back"); - } - } - } - } catch (Exception e) { - logError("Error " + (commitOrAck ? "committing" : "rolling back") + - " JTA txn for message : " + messageId + " from the session", e); - } - - closeSession(false); - closeConnection(); - } - } - - /** Handle JMS Connection exceptions by re-initializing. A single connection failure could - * cause re-initialization of multiple MessageListenerTasks / Connections - */ - public void onException(JMSException j) { - - if (!isSTMActive()) { - requestShutdown(); - return; - } - - log.warn("JMS Connection failure : " + j.getMessage()); - setConnected(false); - - if (cacheLevel < JMSConstants.CACHE_CONNECTION) { - // failed Connection was not shared, thus no need to restart the whole STM - requestShutdown(); - return; - } - - // if we failed while active, update state to show failure - setServiceTaskManagerState(STATE_FAILURE); - log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks"); - - int r = 1; - long retryDuration = initialReconnectDuration; - - do { - try { - log.info("Reconnection attempt : " + r + " for service : " + serviceName); - start(); - } catch (Exception ignore) {} - - boolean connected = false; - for (int i=0; i<5; i++) { - if (getConnectedTaskCount() == concurrentConsumers) { - connected = true; - break; - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) {} - } - - if (!connected) { - log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName + - " failed. Next retry in " + (retryDuration/1000) + "seconds"); - retryDuration = (long) (retryDuration * reconnectionProgressionFactor); - if (retryDuration > maxReconnectDuration) { - retryDuration = maxReconnectDuration; - } - - try { - Thread.sleep(retryDuration); - } catch (InterruptedException ignore) {} - } - - } while (!isSTMActive() || getConnectedTaskCount() < concurrentConsumers); - } - - protected void requestShutdown() { - workerState = STATE_SHUTTING_DOWN; - } - - private boolean isActive() { - return workerState == STATE_STARTED; - } - - protected boolean isTaskIdle() { - return idle; - } - - public boolean isConnected() { - return connected; - } - - public void setConnected(boolean connected) { - this.connected = connected; - } - - /** - * Get a Connection that could/should be used by this task - depends on the cache level to reuse - * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection - */ - private Connection getConnection() { - if (cacheLevel < JMSConstants.CACHE_CONNECTION) { - // Connection is not shared - if (connection == null) { - connection = createConnection(); - } - } else { - if (sharedConnection != null) { - connection = sharedConnection; - } else { - synchronized(this) { - if (sharedConnection == null) { - sharedConnection = createConnection(); - } - connection = sharedConnection; - } - } - } - setConnected(true); - return connection; - } - - /** - * Get a Session that could/should be used by this task - depends on the cache level to reuse - * @param connection the connection (could be the shared connection) to use to create a Session - * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session - * created using the Connection passed, or a new/shared connection - */ - private Session getSession() { - if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) { - session = createSession(); - } - return session; - } - - /** - * Get a MessageConsumer that chould/should be used by this task - depends on the cache - * level to reuse - * @param connection option Connection to be used - * @param session optional Session to be used - * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new - * MessageConsumer possibly using the Connection and Session passed in - */ - private MessageConsumer getMessageConsumer() { - if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) { - consumer = createConsumer(); - } - return consumer; - } - - /** - * Close the given Connection, hiding exceptions if any which are logged - * @param connection the Connection to be closed - */ - private void closeConnection() { - if (connection != null && - cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS connection for service : " + serviceName); - } - connection.close(); - } catch (JMSException e) { - logError("Error closing JMS connection", e); - } finally { - connection = null; - } - } - } - - /** - * Close the given Session, hiding exceptions if any which are logged - * @param session the Session to be closed - */ - private void closeSession(boolean forced) { - if (session != null && - (cacheLevel < JMSConstants.CACHE_SESSION || forced)) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS session for service : " + serviceName); - } - session.close(); - } catch (JMSException e) { - logError("Error closing JMS session", e); - } finally { - session = null; - } - } - } - - /** - * Close the given Consumer, hiding exceptions if any which are logged - * @param consumer the Consumer to be closed - */ - private void closeConsumer(boolean forced) { - if (consumer != null && - (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS consumer for service : " + serviceName); - } - consumer.close(); - } catch (JMSException e) { - logError("Error closing JMS consumer", e); - } finally { - consumer = null; - } - } - } - - /** - * Create a new Connection for this STM, using JNDI properties and credentials provided - * @return a new Connection for this STM, using JNDI properties and credentials provided - */ - private Connection createConnection() { - - try { - conFactory = JMSUtils.lookup( - getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName()); - log.debug("Connected to the JMS connection factory : " + getConnFactoryJNDIName()); - } catch (NamingException e) { - handleException("Error looking up connection factory : " + getConnFactoryJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - - Connection connection = null; - try { - connection = JMSUtils.createConnection( - conFactory, - jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME), - jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD), - isJmsSpec11(), isQueue()); - - connection.setExceptionListener(this); - connection.start(); - log.debug("JMS Connection for service : " + serviceName + " created and started"); - - } catch (JMSException e) { - handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - return connection; - } - - /** - * Create a new Session for this STM - * @param connection the Connection to be used - * @return a new Session created using the Connection passed in - */ - private Session createSession() { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS Session for service : " + serviceName); - } - return JMSUtils.createSession( - connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue()); - - } catch (JMSException e) { - handleException("Error creating JMS session for service : " + serviceName, e); - } - return null; - } - - /** - * Create a new MessageConsumer for this STM - * @param session the Session to be used - * @return a new MessageConsumer created using the Session passed in - */ - private MessageConsumer createConsumer() { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS MessageConsumer for service : " + serviceName); - } - - return JMSUtils.createConsumer( - session, getDestination(session), isQueue(), - (isSubscriptionDurable() && getDurableSubscriberName() == null ? - getDurableSubscriberName() : serviceName), - getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11()); - - } catch (JMSException e) { - handleException("Error creating JMS consumer for service : " + serviceName,e); - } - return null; - } - } - - // -------------- mundane private methods ---------------- - /** - * Get the InitialContext for lookup using the JNDI parameters applicable to the service - * @return the InitialContext to be used - * @throws NamingException - */ - private Context getInitialContext() throws NamingException { - if (context == null) { - context = new InitialContext(jmsProperties); - } - return context; - } - - /** - * Return the JMS Destination for the JNDI name of the Destination from the InitialContext - * @return the JMS Destination to which this STM listens for messages - */ - private Destination getDestination(Session session) { - if (destination == null) { - try { - context = getInitialContext(); - destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName()); - if (log.isDebugEnabled()) { - log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() + - " found for service " + serviceName); - } - } catch (NamingException e) { - try { - switch (destinationType) { - case JMSConstants.QUEUE: { - destination = session.createQueue(getDestinationJNDIName()); - break; - } - case JMSConstants.TOPIC: { - destination = session.createTopic(getDestinationJNDIName()); - break; - } - default: { - handleException("Error looking up JMS destination : " + - getDestinationJNDIName() + " using JNDI properties : " + - jmsProperties, e); - } - } - } catch (JMSException j) { - handleException("Error looking up and creating JMS destination : " + - getDestinationJNDIName() + " using JNDI properties : " + jmsProperties, e); - } - } - } - return destination; - } - - /** - * The UserTransaction to be used, looked up from the JNDI - * @return The UserTransaction to be used, looked up from the JNDI - */ - private UserTransaction getUserTransaction() { - if (!cacheUserTransaction) { - if (log.isDebugEnabled()) { - log.debug("Acquiring a new UserTransaction for service : " + serviceName); - } - - try { - context = getInitialContext(); - return - JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); - } catch (NamingException e) { - handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - } - - if (sharedUserTransaction == null) { - try { - context = getInitialContext(); - sharedUserTransaction = - JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); - if (log.isDebugEnabled()) { - log.debug("Acquired shared UserTransaction for service : " + serviceName); - } - } catch (NamingException e) { - handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - } - return sharedUserTransaction; - } - - // -------------------- trivial methods --------------------- - private boolean isSTMActive() { - return serviceTaskManagerState == STATE_STARTED; - } - - /** - * Is this STM bound to a Queue, Topic or a JMS 1.1 Generic Destination? - * @return TRUE for a Queue, FALSE for a Topic and NULL for a Generic Destination - */ - private Boolean isQueue() { - if (destinationType == JMSConstants.GENERIC) { - return null; - } else { - return destinationType == JMSConstants.QUEUE; - } - } - - private void logError(String msg, Exception e) { - log.error(msg, e); - } - - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } - - private void handleException(String msg) { - log.error(msg); - throw new AxisJMSException(msg); - } - - // -------------- getters and setters ------------------ - public String getServiceName() { - return serviceName; - } - - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - - public String getConnFactoryJNDIName() { - return connFactoryJNDIName; - } - - public void setConnFactoryJNDIName(String connFactoryJNDIName) { - this.connFactoryJNDIName = connFactoryJNDIName; - } - - public String getDestinationJNDIName() { - return destinationJNDIName; - } - - public void setDestinationJNDIName(String destinationJNDIName) { - this.destinationJNDIName = destinationJNDIName; - } - - public int getDestinationType() { - return destinationType; - } - - public void setDestinationType(int destinationType) { - this.destinationType = destinationType; - } - - public String getMessageSelector() { - return messageSelector; - } - - public void setMessageSelector(String messageSelector) { - this.messageSelector = messageSelector; - } - - public int getTransactionality() { - return transactionality; - } - - public void setTransactionality(int transactionality) { - this.transactionality = transactionality; - sessionTransacted = (transactionality == BaseConstants.TRANSACTION_LOCAL); - } - - public boolean isSessionTransacted() { - return sessionTransacted; - } - - public void setSessionTransacted(Boolean sessionTransacted) { - if (sessionTransacted != null) { - this.sessionTransacted = sessionTransacted; - // sesstionTransacted means local transactions are used, however !sessionTransacted does - // not mean that JTA is used - if (sessionTransacted) { - transactionality = BaseConstants.TRANSACTION_LOCAL; - } - } - } - - public int getSessionAckMode() { - return sessionAckMode; - } - - public void setSessionAckMode(int sessionAckMode) { - this.sessionAckMode = sessionAckMode; - } - - public boolean isSubscriptionDurable() { - return subscriptionDurable; - } - - public void setSubscriptionDurable(Boolean subscriptionDurable) { - if (subscriptionDurable != null) { - this.subscriptionDurable = subscriptionDurable; - } - } - - public String getDurableSubscriberName() { - return durableSubscriberName; - } - - public void setDurableSubscriberName(String durableSubscriberName) { - this.durableSubscriberName = durableSubscriberName; - } - - public boolean isPubSubNoLocal() { - return pubSubNoLocal; - } - - public void setPubSubNoLocal(Boolean pubSubNoLocal) { - if (pubSubNoLocal != null) { - this.pubSubNoLocal = pubSubNoLocal; - } - } - - public int getConcurrentConsumers() { - return concurrentConsumers; - } - - public void setConcurrentConsumers(int concurrentConsumers) { - this.concurrentConsumers = concurrentConsumers; - } - - public int getMaxConcurrentConsumers() { - return maxConcurrentConsumers; - } - - public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { - this.maxConcurrentConsumers = maxConcurrentConsumers; - } - - public int getIdleTaskExecutionLimit() { - return idleTaskExecutionLimit; - } - - public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { - this.idleTaskExecutionLimit = idleTaskExecutionLimit; - } - - public int getReceiveTimeout() { - return receiveTimeout; - } - - public void setReceiveTimeout(int receiveTimeout) { - this.receiveTimeout = receiveTimeout; - } - - public int getCacheLevel() { - return cacheLevel; - } - - public void setCacheLevel(int cacheLevel) { - this.cacheLevel = cacheLevel; - } - - public int getInitialReconnectDuration() { - return initialReconnectDuration; - } - - public void setInitialReconnectDuration(int initialReconnectDuration) { - this.initialReconnectDuration = initialReconnectDuration; - } - - public double getReconnectionProgressionFactor() { - return reconnectionProgressionFactor; - } - - public void setReconnectionProgressionFactor(double reconnectionProgressionFactor) { - this.reconnectionProgressionFactor = reconnectionProgressionFactor; - } - - public long getMaxReconnectDuration() { - return maxReconnectDuration; - } - - public void setMaxReconnectDuration(long maxReconnectDuration) { - this.maxReconnectDuration = maxReconnectDuration; - } - - public int getMaxMessagesPerTask() { - return maxMessagesPerTask; - } - - public void setMaxMessagesPerTask(int maxMessagesPerTask) { - this.maxMessagesPerTask = maxMessagesPerTask; - } - - public String getUserTransactionJNDIName() { - return userTransactionJNDIName; - } - - public void setUserTransactionJNDIName(String userTransactionJNDIName) { - if (userTransactionJNDIName != null) { - this.userTransactionJNDIName = userTransactionJNDIName; - } - } - - public boolean isCacheUserTransaction() { - return cacheUserTransaction; - } - - public void setCacheUserTransaction(Boolean cacheUserTransaction) { - if (cacheUserTransaction != null) { - this.cacheUserTransaction = cacheUserTransaction; - } - } - - public boolean isJmsSpec11() { - return jmsSpec11; - } - - public void setJmsSpec11(boolean jmsSpec11) { - this.jmsSpec11 = jmsSpec11; - } - - public Hashtable<String, String> getJmsProperties() { - return jmsProperties; - } - - public void addJmsProperties(Map<String, String> jmsProperties) { - this.jmsProperties.putAll(jmsProperties); - } - - public void removeJmsProperties(String key) { - this.jmsProperties.remove(key); - } - - public Context getContext() { - return context; - } - - public ConnectionFactory getConnectionFactory() { - return conFactory; - } - - public List<MessageListenerTask> getPollingTasks() { - return pollingTasks; - } - - public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) { - this.jmsMessageReceiver = jmsMessageReceiver; - } - - public void setWorkerPool(WorkerPool workerPool) { - this.workerPool = workerPool; - } - - public int getActiveTaskCount() { - return activeTaskCount; - } - - public void setServiceTaskManagerState(int serviceTaskManagerState) { - this.serviceTaskManagerState = serviceTaskManagerState; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java deleted file mode 100644 index 2b415df64d..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java +++ /dev/null @@ -1,49 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -/** - * Class encapsulating the content type information for a given message. - */ -public class ContentTypeInfo { - private final String propertyName; - private final String contentType; - - public ContentTypeInfo(String propertyName, String contentType) { - this.propertyName = propertyName; - this.contentType = contentType; - } - - /** - * Get the name of the message property from which the content type - * has been extracted. - * - * @return the property name or null if the content type was not determined - * by extracting it from a message property - */ - public String getPropertyName() { - return propertyName; - } - - /** - * Get the content type of the message. - * - * @return The content type of the message. The return value is never null. - */ - public String getContentType() { - return contentType; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java deleted file mode 100644 index 0dba93356f..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java +++ /dev/null @@ -1,43 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import javax.jms.JMSException; -import javax.jms.Message; - -/** - * Interface implemented by content type rules. - */ -public interface ContentTypeRule { - /** - * Attempt to determine the content type of the given JMS message. - * - * @param message the message - * @return If the rule matches, the return value encapsulates the content type of the - * message and the message property name from which is was extracted - * (if applicable). If the rule doesn't match, the method returns null. - * @throws JMSException - */ - ContentTypeInfo getContentType(Message message) throws JMSException; - - /** - * Get the name of the message property used to extract the content type from, - * if applicable. - * - * @return the property name or null if not applicable - */ - String getExpectedContentTypeProperty(); -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java deleted file mode 100644 index a9fd25ef1b..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java +++ /dev/null @@ -1,74 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import java.util.Iterator; - -import javax.jms.BytesMessage; -import javax.jms.TextMessage; - -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; -import org.apache.axis2.description.Parameter; - -/** - * Utility class to create content type rules and rule sets from XML. - */ -public class ContentTypeRuleFactory { - private ContentTypeRuleFactory() {} - - public static ContentTypeRule parse(OMElement element) throws AxisFault { - String name = element.getLocalName(); - String value = element.getText(); - if (name.equals("jmsProperty")) { - return new PropertyRule(value); - } else if (name.equals("textMessage")) { - return new MessageTypeRule(TextMessage.class, value); - } else if (name.equals("bytesMessage")) { - return new MessageTypeRule(BytesMessage.class, value); - } else if (name.equals("default")) { - return new DefaultRule(value); - } else { - throw new AxisFault("Unknown content rule type '" + name + "'"); - } - } - - public static ContentTypeRuleSet parse(Parameter param) throws AxisFault { - ContentTypeRuleSet ruleSet = new ContentTypeRuleSet(); - Object value = param.getValue(); - if (value instanceof OMElement) { - OMElement element = (OMElement)value; - - // DescriptionBuilder#processParameters actually sets the parameter element - // itself as the value. We need to support this case. - // TODO: seems like a bug in Axis2 and is inconsistent with Synapse's way of parsing parameter in proxy definitions - if (element == param.getParameterElement()) { - element = element.getFirstElement(); - } - - if (element.getLocalName().equals("rules")) { - for (Iterator it = element.getChildElements(); it.hasNext(); ) { - ruleSet.addRule(parse((OMElement)it.next())); - } - } else { - throw new AxisFault("Expected <rules> element"); - } - } else { - ruleSet.addRule(new DefaultRule((String)value)); - } - return ruleSet; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java deleted file mode 100644 index 90383a42f8..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java +++ /dev/null @@ -1,64 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.JMSException; -import javax.jms.Message; - -/** - * A set of content type rules. - */ -public class ContentTypeRuleSet { - private final List<ContentTypeRule> rules = new ArrayList<ContentTypeRule>(); - private String defaultContentTypeProperty; - - /** - * Add a content type rule to this set. - * - * @param rule the rule to add - */ - public void addRule(ContentTypeRule rule) { - rules.add(rule); - if (defaultContentTypeProperty == null) { - defaultContentTypeProperty = rule.getExpectedContentTypeProperty(); - } - } - - /** - * Determine the content type of the given message. - * This method will try the registered rules in turn until the first rule matches. - * - * @param message the message - * @return the content type information for the message or null if none of the rules matches - * @throws JMSException - */ - public ContentTypeInfo getContentTypeInfo(Message message) throws JMSException { - for (ContentTypeRule rule : rules) { - ContentTypeInfo contentTypeInfo = rule.getContentType(message); - if (contentTypeInfo != null) { - return contentTypeInfo; - } - } - return null; - } - - public String getDefaultContentTypeProperty() { - return defaultContentTypeProperty; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java deleted file mode 100644 index a158f6ec74..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java +++ /dev/null @@ -1,37 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import javax.jms.Message; - -/** - * Content type rule that always matches and that returns a fixed (default) content type. - */ -public class DefaultRule implements ContentTypeRule { - private final String contentType; - - public DefaultRule(String contentType) { - this.contentType = contentType; - } - - public ContentTypeInfo getContentType(Message message) { - return new ContentTypeInfo(null, contentType); - } - - public String getExpectedContentTypeProperty() { - return null; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java deleted file mode 100644 index cb25ab93d4..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java +++ /dev/null @@ -1,39 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import javax.jms.Message; - -/** - * Content type rule that matches a given message type and returns a fixed content type. - */ -public class MessageTypeRule implements ContentTypeRule { - private final Class<? extends Message> messageType; - private final String contentType; - - public MessageTypeRule(Class<? extends Message> messageType, String contentType) { - this.messageType = messageType; - this.contentType = contentType; - } - - public ContentTypeInfo getContentType(Message message) { - return messageType.isInstance(message) ? new ContentTypeInfo(null, contentType) : null; - } - - public String getExpectedContentTypeProperty() { - return null; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java deleted file mode 100644 index c8d13ba462..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java +++ /dev/null @@ -1,39 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; - -import javax.jms.JMSException; -import javax.jms.Message; - -/** - * Content type rule that attempts to extract the content type from a message property. - */ -public class PropertyRule implements ContentTypeRule { - private final String propertyName; - - public PropertyRule(String propertyName) { - this.propertyName = propertyName; - } - - public ContentTypeInfo getContentType(Message message) throws JMSException { - String value = message.getStringProperty(propertyName); - return value == null ? null : new ContentTypeInfo(propertyName, value); - } - - public String getExpectedContentTypeProperty() { - return propertyName; - } -} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java deleted file mode 100644 index 750170edd7..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -/** - * Provides classes and interfaces to define content type rules. - * - * Content type rules are used to determine the content type of a - * received message based on JMS properties, message type, etc. - */ -package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
\ No newline at end of file diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html deleted file mode 100644 index b95b49eb69..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html +++ /dev/null @@ -1,356 +0,0 @@ -<html> -<title>JMS Transport Configuration</title> -<body> - -<h2>JMS Listener Configuration (axis2.xml)</h2> - -e.g: - -<pre> - <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"> - <parameter name="myTopicConnectionFactory"> - <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> - <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> - <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter> - <parameter name="transport.jms.ConnectionFactoryType">topic</parameter> - <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter> - </parameter> - - <parameter name="myQueueConnectionFactory"> - <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> - <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> - <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter> - <parameter name="transport.jms.ConnectionFactoryType">queue</parameter> - <parameter name="transport.jms.JMSSpecVersion">1.1</parameter> - </parameter> - - <parameter name="default"> - <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> - <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> - <parameter name="transport.jms.ConnectionFactoryJNDIName">ConnectionFactory</parameter> - </parameter> - </transportReceiver> - - <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"> - <parameter name="myTopicConnectionFactory"> - <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> - <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> - <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter> - <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter> - <parameter name="transport.jms.CacheLevel">producer</parameter> - </parameter> - - <parameter name="myQueueConnectionFactory"> - <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> - <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> - <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter> - <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter> - <parameter name="transport.jms.CacheLevel">producer</parameter> - </parameter> - - <parameter name="default"> - <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> - <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> - <parameter name="transport.jms.ConnectionFactoryJNDIName">ConnectionFactory</parameter> - <parameter name="transport.jms.CacheLevel">connection</parameter> - </parameter> - </transportSender> -</pre> - -<p> - The Transport Listener and Sender both allows the user to configure one or more logical JMS Connection - Factories, which are named definitions as shown above. Any remaining parameters maybe defined at the - service level via the services.xml. The applicable set of parameters for a service would be the - union of the properties from the services.xml and the corresponding logical connection factory. -</p> - -<TABLE WIDTH="100%" BORDER=1 BORDERCOLOR="#000000" CELLPADDING=4 CELLSPACING=0> - <COL WIDTH="10%"> - <COL WIDTH="20%"> - <COL WIDTH="60%"> - <COL WIDTH="5%"> - <COL WIDTH="5%"> - <tr> - <td>Transport level</td> - <td><BR></td> - <td><BR></td> - <td>Listening</td> - <td>Sending</td> - </tr> - <tr> - <td>JNDI</td> - <td>java.naming.factory.initial</td> - <td>The JNDI InitialContext factory class</td> - <td>Required</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>java.naming.provider.url</td> - <td>JNDI Provider URL</td> - <td>Required</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>java.naming.security.principal</td> - <td>Username for JNDI access</td> - <td><BR></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>java.naming.security.credentials</td> - <td>Password for JNDI access</td> - <td><BR></td> - <td><BR></td> - </tr> - <tr> - <td>Transactions</td> - <td>transport.Transactionality</td> - <td>Desired transactionality. One of none / local / jta</td> - <td>Defaults to <B>none</B></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.UserTxnJNDIName</td> - <td>JNDI name to be used to obtain a UserTransaction</td> - <td>Defaults to "java:comp/UserTransaction"</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.CacheUserTxn</td> - <td>Generally its safe and more efficient to cache the - UserTransaction reference from JNDI. One of true/ false</td> - <td>Defaults to <B>true</B></td> - <td><BR></td> - </tr> - - <tr> - <td><BR></td> - <td>transport.jms.SessionTransacted</td> - <td>Should the JMS Session be transacted. One of true/ false</td> - <td>Defaults to <B>true</B> when local transactions are used</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.SessionAcknowledgement</td> - <td>JMS Session acknowledgement mode to be used. One of AUTO_ACKNOWLEDGE | CLIENT_ACKNOWLEDGE | DUPS_OK_ACKNOWLEDGE | SESSION_TRANSACTED</td> - <td>Defaults to <B>AUTO_ACKNOWLEDGE</B></td> - <td><BR></td> - </tr> - - <tr> - <td>Connection</td> - <td>transport.jms.ConnectionFactory</td> - <td>Name of the logical connection factory this service will use</td> - <td>Defaults to "default"</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.ConnectionFactoryJNDIName</td> - <td>The JNDI name of the JMS ConnectionFactory</td> - <td>Required</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.ConnectionFactoryType</td> - <td> Type of ConnectionFactory – queue / topic</td> - <td>Suggested to be specified</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.JMSSpecVersion</td> - <td>JMS API Version One of "1.1" or "1.0.2b"</td> - <td>Defaults to 1.1</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.UserName</td> - <td>The JMS connection username</td> - <td><BR></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.Password</td> - <td>The JMS connection password</td> - <td><BR></td> - <td><BR></td> - </tr> - <tr> - <td>Destinations</td> - <td>transport.jms.Destination</td> - <td>JNDI Name of the Destination </td> - <td>Defaults to Service name</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.DestinationType</td> - <td>Type of Destination – queue / topic</td> - <td>Defaults to a queue</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.DefaultReplyDestination</td> - <td>JNDI Name of the default reply Destination</td> - <td><BR></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.DefaultReplyDestinationType</td> - <td>Type of the reply Destination – queue / topic</td> - <td>Same type as of Destination</td> - <td><BR></td> - </tr> - <tr> - <td>Advanced</td> - <td>transport.jms.MessageSelector</td> - <td>Optional message selector to be applied</td> - <td><BR></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.SubscriptionDurable</td> - <td>Is the subscription durable? (For Topics) – true / false</td> - <td>Defaults to <B>false</B></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.DurableSubscriberName</td> - <td>Name to be used for the durable subscription</td> - <td>Required when subscription is durable</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.PubSubNoLocal</td> - <td>Should messages published by the same connection (for Topics) - be received? – true / false</td> - <td>Defaults to <B>false</B></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.CacheLevel</td> - <td>The JMS resource cache level. One of none / connection / - session / consumer / producer / auto</td> - <td>Defaults to <B>auto</B> </td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.ReceiveTimeout</td> - <td>Time to wait for a JMS message during polling. Negative means - wait forever, while 0 means do not wait at all. Anything else, is - a millisecond value for the poll</td> - <td>Defaults to 1000ms</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.ConcurrentConsumers</td> - <td>Number of concurrent consumer tasks (~threads) to be started to - poll for messages for this service. For Topics, this should be - always 1, to prevent the same message being processed multiple - times</td> - <td>Defaults to <B>1</B></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.MaxConcurrentConsumers</td> - <td>Will dynamically scale the number of concurrent consumer tasks - (~threads) until this value; as the load increases. Should always - be 1 for Topics.</td> - <td>Defaults to <B>1</B></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.IdleTaskLimit</td> - <td>The number of idle (i.e. poll without receipt of a message) - runs per task, before it dies – to recycle resources, and to - allow dynamic scale down.</td> - <td>Defaults to 10</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.MaxMessagesPerTask</td> - <td>The maximum number of successful message receipts to limit per - Task lifetime. </td> - <td>Defaults to <B>–1</B> which implies unlimited messages</td> - <td><BR></td> - </tr> - <tr> - <td>Reconnection</td> - <td>transport.jms.InitialReconnectDuration</td> - <td>Initial reconnection attempt duration</td> - <td>Defaults to 10,000ms</td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.ReconnectProgressFactor</td> - <td>Factor used to compute consecutive reconnection attempt - durations, in a geometric series</td> - <td>Defaults to <B>2 (i.e. exponential)</B></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.MaxReconnectDuration</td> - <td>Maximum limit for a reconnection duration</td> - <td>Defaults to <B>1 hour</B></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>transport.jms.PublishEPR</td> - <td>One or more JMS URL's to be showed as the JMS EPRs on the WSDL - for the service. Allows the specification of a custom EPR, and/or - hiding of internal properties from a public EPR (e.g. - credentials). Add one as LEGACY to retain auto generated EPR, when - adding new EPRs</td> - <td><BR></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td><BR></td> - <td><BR></td> - <td><BR></td> - <td><BR></td> - </tr> - <tr> - <td>Legacy Mode and Payload handling</td> - <td>Wrapper</td> - <td>Binary and Text payload wrapper element to be specified as "{ns}name" where ns refers to a namespace and name the name of the element</td> - <td>Default binary wrapper<ul><li>{http://ws.apache.org/commons/ns/payload}binary</li></ul> - Default text wrapper <ul><li>{http://ws.apache.org/commons/ns/payload}text</li></ul></td> - <td><BR></td> - </tr> - <tr> - <td><BR></td> - <td>Operation</td> - <td>operation name to be specified as "{ns}name" where ns refers to the namespace and name the name of the operation</td> - <td>Defaults to urn:mediate</td> - <td><BR></td> - </tr> -</TABLE> - -</body> -</html>
\ No newline at end of file |