diff options
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms')
25 files changed, 5885 insertions, 0 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java new file mode 100644 index 0000000000..ec53a2a1ca --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java @@ -0,0 +1,31 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +public class AxisJMSException extends RuntimeException { + + AxisJMSException() { + super(); + } + + AxisJMSException(String msg) { + super(msg); + } + + AxisJMSException(String msg, Exception e) { + super(msg, e); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java new file mode 100644 index 0000000000..5228efa154 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageDataSource.java @@ -0,0 +1,72 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; + +/** + * Data source implementation wrapping a JMS {@link BytesMessage}. + * <p> + * Note that two input streams created by the same instance of this + * class can not be used at the same time. + */ +public class BytesMessageDataSource implements SizeAwareDataSource { + private final BytesMessage message; + private final String contentType; + + public BytesMessageDataSource(BytesMessage message, String contentType) { + this.message = message; + this.contentType = contentType; + } + + public BytesMessageDataSource(BytesMessage message) { + this(message, "application/octet-stream"); + } + + public long getSize() { + try { + return message.getBodyLength(); + } catch (JMSException ex) { + throw new RuntimeException(ex); + } + } + + public String getContentType() { + return contentType; + } + + public InputStream getInputStream() throws IOException { + try { + message.reset(); + } catch (JMSException ex) { + throw new JMSExceptionWrapper(ex); + } + return new BytesMessageInputStream(message); + } + + public String getName() { + return null; + } + + public OutputStream getOutputStream() throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java new file mode 100644 index 0000000000..9080641572 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageInputStream.java @@ -0,0 +1,75 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.io.InputStream; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; + +/** + * Input stream that reads data from a JMS {@link BytesMessage}. + * Note that since the current position in the message is managed by + * the underlying {@link BytesMessage} object, it is not possible to + * use several instances of this class operating on a single + * {@link BytesMessage} at the same time. + */ +public class BytesMessageInputStream extends InputStream { + private final BytesMessage message; + + public BytesMessageInputStream(BytesMessage message) { + this.message = message; + } + + @Override + public int read() throws JMSExceptionWrapper { + try { + return message.readByte() & 0xFF; + } catch (MessageEOFException ex) { + return -1; + } catch (JMSException ex) { + throw new JMSExceptionWrapper(ex); + } + } + + @Override + public int read(byte[] b, int off, int len) throws JMSExceptionWrapper { + if (off == 0) { + try { + return message.readBytes(b, len); + } catch (JMSException ex) { + throw new JMSExceptionWrapper(ex); + } + } else { + byte[] b2 = new byte[len]; + int c = read(b2); + if (c > 0) { + System.arraycopy(b2, 0, b, off, c); + } + return c; + } + } + + @Override + public int read(byte[] b) throws JMSExceptionWrapper { + try { + return message.readBytes(b); + } catch (JMSException ex) { + throw new JMSExceptionWrapper(ex); + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java new file mode 100644 index 0000000000..4508d68280 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/BytesMessageOutputStream.java @@ -0,0 +1,56 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.io.OutputStream; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; + +public class BytesMessageOutputStream extends OutputStream { + private final BytesMessage message; + + public BytesMessageOutputStream(BytesMessage message) { + this.message = message; + } + + @Override + public void write(int b) throws JMSExceptionWrapper { + try { + message.writeByte((byte)b); + } catch (JMSException ex) { + throw new JMSExceptionWrapper(ex); + } + } + + @Override + public void write(byte[] b, int off, int len) throws JMSExceptionWrapper { + try { + message.writeBytes(b, off, len); + } catch (JMSException ex) { + new JMSExceptionWrapper(ex); + } + } + + @Override + public void write(byte[] b) throws JMSExceptionWrapper { + try { + message.writeBytes(b); + } catch (JMSException ex) { + throw new JMSExceptionWrapper(ex); + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java new file mode 100644 index 0000000000..d5d164ce76 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java @@ -0,0 +1,393 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.Hashtable; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.axiom.om.OMElement; +import org.apache.axis2.AxisFault; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterIncludeImpl; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Encapsulate a JMS Connection factory definition within an Axis2.xml + * + * JMS Connection Factory definitions, allows JNDI properties as well as other service + * level parameters to be defined, and re-used by each service that binds to it + * + * When used for sending messages out, the JMSConnectionFactory'ies are able to cache + * a Connection, Session or Producer + */ +public class JMSConnectionFactory { + + private static final Log log = LogFactory.getLog(JMSConnectionFactory.class); + + /** The name used for the connection factory definition within Axis2 */ + private String name = null; + /** The list of parameters from the axis2.xml definition */ + private Hashtable<String, String> parameters = new Hashtable<String, String>(); + + /** The cached InitialContext reference */ + private Context context = null; + /** The JMS ConnectionFactory this definition refers to */ + private ConnectionFactory conFactory = null; + /** The shared JMS Connection for this JMS connection factory */ + private Connection sharedConnection = null; + /** The shared JMS Session for this JMS connection factory */ + private Session sharedSession = null; + /** The shared JMS MessageProducer for this JMS connection factory */ + private MessageProducer sharedProducer = null; + /** The Shared Destination */ + private Destination sharedDestination = null; + /** The shared JMS connection for this JMS connection factory */ + private int cacheLevel = JMSConstants.CACHE_CONNECTION; + + /** + * Digest a JMS CF definition from an axis2.xml 'Parameter' and construct + * @param parameter the axis2.xml 'Parameter' that defined the JMS CF + */ + public JMSConnectionFactory(Parameter parameter) { + + this.name = parameter.getName(); + ParameterIncludeImpl pi = new ParameterIncludeImpl(); + + try { + pi.deserializeParameters((OMElement) parameter.getValue()); + } catch (AxisFault axisFault) { + handleException("Error reading parameters for JMS connection factory" + name, axisFault); + } + + for (Object o : pi.getParameters()) { + Parameter p = (Parameter) o; + parameters.put(p.getName(), (String) p.getValue()); + } + + digestCacheLevel(); + try { + context = new InitialContext(parameters); + conFactory = JMSUtils.lookup(context, ConnectionFactory.class, + parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME)); + if (parameters.get(JMSConstants.PARAM_DESTINATION) != null) { + sharedDestination = JMSUtils.lookup(context, Destination.class, + parameters.get(JMSConstants.PARAM_DESTINATION)); + } + log.info("JMS ConnectionFactory : " + name + " initialized"); + + } catch (NamingException e) { + throw new AxisJMSException("Cannot acquire JNDI context, JMS Connection factory : " + + parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME) + " or default destination : " + + parameters.get(JMSConstants.PARAM_DESTINATION) + + " for JMS CF : " + name + " using : " + parameters); + } + } + + /** + * Digest, the cache value iff specified + */ + private void digestCacheLevel() { + + String key = JMSConstants.PARAM_CACHE_LEVEL; + String val = parameters.get(key); + + if ("none".equalsIgnoreCase(val)) { + this.cacheLevel = JMSConstants.CACHE_NONE; + } else if ("connection".equalsIgnoreCase(val)) { + this.cacheLevel = JMSConstants.CACHE_CONNECTION; + } else if ("session".equals(val)){ + this.cacheLevel = JMSConstants.CACHE_SESSION; + } else if ("producer".equals(val)) { + this.cacheLevel = JMSConstants.CACHE_PRODUCER; + } else if (val != null) { + throw new AxisJMSException("Invalid cache level : " + val + " for JMS CF : " + name); + } + } + + /** + * Return the name assigned to this JMS CF definition + * @return name of the JMS CF + */ + public String getName() { + return name; + } + + /** + * The list of properties (including JNDI and non-JNDI) + * @return properties defined on the JMS CF + */ + public Hashtable<String, String> getParameters() { + return parameters; + } + + /** + * Get cached InitialContext + * @return cache InitialContext + */ + public Context getContext() { + return context; + } + + /** + * Cache level applicable for this JMS CF + * @return applicable cache level + */ + public int getCacheLevel() { + return cacheLevel; + } + + /** + * Get the shared Destination - if defined + * @return + */ + public Destination getSharedDestination() { + return sharedDestination; + } + + /** + * Lookup a Destination using this JMS CF definitions and JNDI name + * @param name JNDI name of the Destionation + * @return JMS Destination for the given JNDI name or null + */ + public Destination getDestination(String name) { + try { + return JMSUtils.lookup(context, Destination.class, name); + } catch (NamingException e) { + handleException("Unknown JMS Destination : " + name + " using : " + parameters, e); + } + return null; + } + + /** + * Get the reply Destination from the PARAM_REPLY_DESTINATION parameter + * @return reply destination defined in the JMS CF + */ + public String getReplyToDestination() { + return parameters.get(JMSConstants.PARAM_REPLY_DESTINATION); + } + + private void handleException(String msg, Exception e) { + log.error(msg, e); + throw new AxisJMSException(msg, e); + } + + /** + * Should the JMS 1.1 API be used? - defaults to yes + * @return true, if JMS 1.1 api should be used + */ + public boolean isJmsSpec11() { + return parameters.get(JMSConstants.PARAM_JMS_SPEC_VER) == null || + "1.1".equals(parameters.get(JMSConstants.PARAM_JMS_SPEC_VER)); + } + + /** + * Return the type of the JMS CF Destination + * @return TRUE if a Queue, FALSE for a Topic and NULL for a JMS 1.1 Generic Destination + */ + public Boolean isQueue() { + if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) == null && + parameters.get(JMSConstants.PARAM_DEST_TYPE) == null) { + return null; + } + + if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) != null) { + if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { + return true; + } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { + return false; + } else { + throw new AxisJMSException("Invalid " + JMSConstants.PARAM_CONFAC_TYPE + " : " + + parameters.get(JMSConstants.PARAM_CONFAC_TYPE) + " for JMS CF : " + name); + } + } else { + if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { + return true; + } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { + return false; + } else { + throw new AxisJMSException("Invalid " + JMSConstants.PARAM_DEST_TYPE + " : " + + parameters.get(JMSConstants.PARAM_DEST_TYPE) + " for JMS CF : " + name); + } + } + } + + /** + * Is a session transaction requested from users of this JMS CF? + * @return session transaction required by the clients of this? + */ + private boolean isSessionTransacted() { + return parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED) == null || + Boolean.valueOf(parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED)); + } + + /** + * Create a new Connection + * @return a new Connection + */ + private Connection createConnection() { + + Connection connection = null; + try { + connection = JMSUtils.createConnection( + conFactory, + parameters.get(JMSConstants.PARAM_JMS_USERNAME), + parameters.get(JMSConstants.PARAM_JMS_PASSWORD), + isJmsSpec11(), isQueue()); + + if (log.isDebugEnabled()) { + log.debug("New JMS Connection from JMS CF : " + name + " created"); + } + + } catch (JMSException e) { + handleException("Error acquiring a Connection from the JMS CF : " + name + + " using properties : " + parameters, e); + } + return connection; + } + + /** + * Create a new Session + * @param connection Connection to use + * @return A new Session + */ + private Session createSession(Connection connection) { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS Session from JMS CF : " + name); + } + return JMSUtils.createSession( + connection, isSessionTransacted(), Session.AUTO_ACKNOWLEDGE, isJmsSpec11(), isQueue()); + + } catch (JMSException e) { + handleException("Error creating JMS session from JMS CF : " + name, e); + } + return null; + } + + /** + * Create a new MessageProducer + * @param session Session to be used + * @param destination Destination to be used + * @return a new MessageProducer + */ + private MessageProducer createProducer(Session session, Destination destination) { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS MessageProducer from JMS CF : " + name); + } + + return JMSUtils.createProducer( + session, destination, isQueue(), isJmsSpec11()); + + } catch (JMSException e) { + handleException("Error creating JMS producer from JMS CF : " + name,e); + } + return null; + } + + /** + * Get a new Connection or shared Connection from this JMS CF + * @return new or shared Connection from this JMS CF + */ + public Connection getConnection() { + if (cacheLevel > JMSConstants.CACHE_NONE) { + return getSharedConnection(); + } else { + return createConnection(); + } + } + + /** + * Get a new Session or shared Session from this JMS CF + * @param connection the Connection to be used + * @return new or shared Session from this JMS CF + */ + public Session getSession(Connection connection) { + if (cacheLevel > JMSConstants.CACHE_CONNECTION) { + return getSharedSession(); + } else { + return createSession((connection == null ? getConnection() : connection)); + } + } + + /** + * Get a new MessageProducer or shared MessageProducer from this JMS CF + * @param connection the Connection to be used + * @param session the Session to be used + * @param destination the Destination to bind MessageProducer to + * @return new or shared MessageProducer from this JMS CF + */ + public MessageProducer getMessageProducer( + Connection connection, Session session, Destination destination) { + if (cacheLevel > JMSConstants.CACHE_SESSION) { + return getSharedProducer(); + } else { + return createProducer((session == null ? getSession(connection) : session), destination); + } + } + + /** + * Get a new Connection or shared Connection from this JMS CF + * @return new or shared Connection from this JMS CF + */ + private Connection getSharedConnection() { + if (sharedConnection == null) { + sharedConnection = createConnection(); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS Connection for JMS CF : " + name); + } + } + return sharedConnection; + } + + /** + * Get a shared Session from this JMS CF + * @return shared Session from this JMS CF + */ + private Session getSharedSession() { + if (sharedSession == null) { + sharedSession = createSession(getSharedConnection()); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS Session for JMS CF : " + name); + } + } + return sharedSession; + } + + /** + * Get a shared MessageProducer from this JMS CF + * @return shared MessageProducer from this JMS CF + */ + private MessageProducer getSharedProducer() { + if (sharedProducer == null) { + sharedProducer = createProducer(getSharedSession(), sharedDestination); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS MessageConsumer for JMS CF : " + name); + } + } + return sharedProducer; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java new file mode 100644 index 0000000000..fb16500efc --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactoryManager.java @@ -0,0 +1,122 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.HashMap; +import java.util.Map; + +import javax.naming.Context; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterInclude; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Class managing a set of {@link JMSConnectionFactory} objects. + */ +public class JMSConnectionFactoryManager { + + private static final Log log = LogFactory.getLog(JMSConnectionFactoryManager.class); + + /** A Map containing the JMS connection factories managed by this, keyed by name */ + private final Map<String,JMSConnectionFactory> connectionFactories = + new HashMap<String,JMSConnectionFactory>(); + + /** + * Construct a Connection factory manager for the JMS transport sender or receiver + * @param trpInDesc + */ + public JMSConnectionFactoryManager(ParameterInclude trpInDesc) { + loadConnectionFactoryDefinitions(trpInDesc); + } + + /** + * Create JMSConnectionFactory instances for the definitions in the transport configuration, + * and add these into our collection of connectionFactories map keyed by name + * + * @param trpDesc the transport description for JMS + */ + private void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) { + + for (Object o : trpDesc.getParameters()) { + Parameter p = (Parameter)o; + try { + JMSConnectionFactory jmsConFactory = new JMSConnectionFactory(p); + connectionFactories.put(jmsConFactory.getName(), jmsConFactory); + } catch (AxisJMSException e) { + log.error("Error setting up connection factory : " + p.getName(), e); + } + } + } + + /** + * Get the JMS connection factory with the given name. + * + * @param name the name of the JMS connection factory + * @return the JMS connection factory or null if no connection factory with + * the given name exists + */ + public JMSConnectionFactory getJMSConnectionFactory(String name) { + return connectionFactories.get(name); + } + + /** + * Get the JMS connection factory that matches the given properties, i.e. referring to + * the same underlying connection factory. Used by the JMSSender to determine if already + * available resources should be used for outgoing messages + * + * @param props a Map of connection factory JNDI properties and name + * @return the JMS connection factory or null if no connection factory compatible + * with the given properties exists + */ + public JMSConnectionFactory getJMSConnectionFactory(Map<String,String> props) { + for (JMSConnectionFactory cf : connectionFactories.values()) { + Map<String,String> cfProperties = cf.getParameters(); + + if (equals(props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME), + cfProperties.get(JMSConstants.PARAM_CONFAC_JNDI_NAME)) + && + equals(props.get(Context.INITIAL_CONTEXT_FACTORY), + cfProperties.get(Context.INITIAL_CONTEXT_FACTORY)) + && + equals(props.get(Context.PROVIDER_URL), + cfProperties.get(Context.PROVIDER_URL)) + && + equals(props.get(Context.SECURITY_PRINCIPAL), + cfProperties.get(Context.SECURITY_PRINCIPAL)) + && + equals(props.get(Context.SECURITY_CREDENTIALS), + cfProperties.get(Context.SECURITY_CREDENTIALS))) { + return cf; + } + } + return null; + } + + /** + * Compare two values preventing NPEs + */ + private static boolean equals(Object s1, Object s2) { + return s1 == s2 || s1 != null && s1.equals(s2); + } + + protected void handleException(String msg, Exception e) throws AxisFault { + log.error(msg, e); + throw new AxisFault(msg, e); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java new file mode 100644 index 0000000000..6a11201625 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConstants.java @@ -0,0 +1,273 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import org.apache.axis2.client.Options; + +public class JMSConstants { + + /** + * The prefix indicating an Axis JMS URL + */ + public static final String JMS_PREFIX = "jms:/"; + + //------------------------------------ defaults / constants ------------------------------------ + /** + * The local (Axis2) JMS connection factory name of the default connection + * factory to be used, if a service does not explicitly state the connection + * factory it should be using by a Parameter named JMSConstants.CONFAC_PARAM + */ + public static final String DEFAULT_CONFAC_NAME = "default"; + /** + * The default JMS time out waiting for a reply - also see {@link JMS_WAIT_REPLY} + */ + public static final long DEFAULT_JMS_TIMEOUT = Options.DEFAULT_TIMEOUT_MILLISECONDS; + /** + * Value indicating a Queue used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE} + */ + public static final String DESTINATION_TYPE_QUEUE = "queue"; + /** + * Value indicating a Topic used for {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE} + */ + public static final String DESTINATION_TYPE_TOPIC = "topic"; + /** + * Value indicating a JMS 1.1 Generic Destination used by {@link DEST_PARAM_TYPE}, {@link REPLY_PARAM_TYPE} + */ + public static final String DESTINATION_TYPE_GENERIC = "generic"; + + /** Do not cache any JMS resources between tasks (when sending) or JMS CF's (when sending) */ + public static final int CACHE_NONE = 0; + /** Cache only the JMS connection between tasks (when receiving), or JMS CF's (when sending)*/ + public static final int CACHE_CONNECTION = 1; + /** Cache only the JMS connection and Session between tasks (receiving), or JMS CF's (sending) */ + public static final int CACHE_SESSION = 2; + /** Cache the JMS connection, Session and Consumer between tasks when receiving*/ + public static final int CACHE_CONSUMER = 3; + /** Cache the JMS connection, Session and Producer within a JMSConnectionFactory when sending */ + public static final int CACHE_PRODUCER = 4; + /** automatic choice of an appropriate caching level (depending on the transaction strategy) */ + public static final int CACHE_AUTO = 5; + + /** A JMS 1.1 Generic Destination type or ConnectionFactory */ + public static final int GENERIC = 0; + /** A Queue Destination type or ConnectionFactory */ + public static final int QUEUE = 1; + /** A Topic Destination type or ConnectionFactory */ + public static final int TOPIC = 2; + + /** + * The EPR parameter name indicating the name of the message level property that indicated the content type. + */ + public static final String CONTENT_TYPE_PROPERTY_PARAM = "transport.jms.ContentTypeProperty"; + + //---------------------------------- services.xml parameters ----------------------------------- + /** + * The Service level Parameter name indicating the JMS destination for requests of a service + */ + public static final String PARAM_DESTINATION = "transport.jms.Destination"; + /** + * The Service level Parameter name indicating the destination type for requests. + * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC} + */ + public static final String PARAM_DEST_TYPE = "transport.jms.DestinationType"; + /** + * The Service level Parameter name indicating the [default] response destination of a service + */ + public static final String PARAM_REPLY_DESTINATION = "transport.jms.ReplyDestination"; + /** + * The Service level Parameter name indicating the response destination type + * also see {@link DESTINATION_TYPE_QUEUE}, {@link DESTINATION_TYPE_TOPIC} + */ + public static final String PARAM_REPLY_DEST_TYPE = "transport.jms.ReplyDestinationType"; + /** + * The Parameter name of an Axis2 service, indicating the JMS connection + * factory which should be used to listen for messages for it. This is + * the local (Axis2) name of the connection factory and not the JNDI name + */ + public static final String PARAM_JMS_CONFAC = "transport.jms.ConnectionFactory"; + /** + * Connection factory type if using JMS 1.0, either DESTINATION_TYPE_QUEUE or DESTINATION_TYPE_TOPIC + */ + public static final String PARAM_CONFAC_TYPE = "transport.jms.ConnectionFactoryType"; + /** + * The Parameter name indicating the JMS connection factory JNDI name + */ + public static final String PARAM_CONFAC_JNDI_NAME = "transport.jms.ConnectionFactoryJNDIName"; + /** + * The Parameter indicating the expected content type for messages received by the service. + */ + public static final String CONTENT_TYPE_PARAM = "transport.jms.ContentType"; + /** + * The Parameter indicating a final EPR as a String, to be published on the WSDL of a service + * Could occur more than once, and could provide additional connection properties or a subset + * of the properties auto computed. Also could replace IP addresses with hostnames, and expose + * public credentials clients. If a user specified this parameter, the auto generated EPR will + * not be exposed - unless an instance of this parameter is added with the string "legacy" + * This parameter could be used to expose EPR's conforming to the proposed SOAP/JMS spec + * until such time full support is implemented for it. + */ + public static final String PARAM_PUBLISH_EPR = "transport.jms.PublishEPR"; + /** The parameter indicating the JMS API specification to be used - if this is "1.1" the JMS + * 1.1 API would be used, else the JMS 1.0.2B + */ + public static final String PARAM_JMS_SPEC_VER = "transport.jms.JMSSpecVersion"; + + /** + * The Parameter indicating whether the JMS Session should be transacted for the service + * Specified as a "true" or "false" + */ + public static final String PARAM_SESSION_TRANSACTED = "transport.jms.SessionTransacted"; + /** + * The Parameter indicating the Session acknowledgement for the service. Must be one of the + * following Strings, or the appropriate Integer used by the JMS API + * "AUTO_ACKNOWLEDGE", "CLIENT_ACKNOWLEDGE", "DUPS_OK_ACKNOWLEDGE" or "SESSION_TRANSACTED" + */ + public static final String PARAM_SESSION_ACK = "transport.jms.SessionAcknowledgement"; + /** A message selector to be used when messages are sought for this service */ + public static final String PARAM_MSG_SELECTOR = "transport.jms.MessageSelector"; + /** Is the Subscription durable ? - "true" or "false" See {@link PARAM_DURABLE_SUB_NAME} */ + public static final String PARAM_SUB_DURABLE = "transport.jms.SubscriptionDurable"; + /** The name for the durable subscription See {@link PARAM_SUB_DURABLE}*/ + public static final String PARAM_DURABLE_SUB_NAME = "transport.jms.DurableSubscriberName"; + /** + * JMS Resource cachable level to be used for the service One of the following: + * {@link CACHE_NONE}, {@link CACHE_CONNECTION}, {@link CACHE_SESSION}, {@link CACHE_PRODUCER}, + * {@link CACHE_CONSUMER}, or {@link CACHE_AUTO} - to let the transport decide + */ + public static final String PARAM_CACHE_LEVEL = "transport.jms.CacheLevel"; + /** Should a pub-sub connection receive messages published by itself? */ + public static final String PARAM_PUBSUB_NO_LOCAL = "transport.jms.PubSubNoLocal"; + /** + * The number of milliseconds to wait for a message on a consumer.receive() call + * negative number - wait forever + * 0 - do not wait at all + * positive number - indicates the number of milliseconds to wait + */ + public static final String PARAM_RCV_TIMEOUT = "transport.jms.ReceiveTimeout"; + /** + *The number of concurrent consumers to be created to poll for messages for this service + * For Topics, this should be ONE, to prevent receipt of multiple copies of the same message + */ + public static final String PARAM_CONCURRENT_CONSUMERS = "transport.jms.ConcurrentConsumers"; + /** + * The maximum number of concurrent consumers for the service - See {@link PARAM_CONCURRENT_CONSUMERS} + */ + public static final String PARAM_MAX_CONSUMERS = "transport.jms.MaxConcurrentConsumers"; + /** + * The number of idle (i.e. message-less) polling attempts before a worker task commits suicide, + * to scale down resources, as load decreases + */ + public static final String PARAM_IDLE_TASK_LIMIT = "transport.jms.IdleTaskLimit"; + /** + * The maximum number of messages a polling worker task should process, before suicide - to + * prevent many longer running threads - default is unlimited (i.e. a worker task will live forever) + */ + public static final String PARAM_MAX_MSGS_PER_TASK = "transport.jms.MaxMessagesPerTask"; + /** + * Number of milliseconds before the first reconnection attempt is tried, on detection of an + * error. Subsequent retries follow a geometric series, where the + * duration = previous duration * factor + * This is further limited by the {@link PARAM_RECON_MAX_DURATION} to be meaningful + */ + public static final String PARAM_RECON_INIT_DURATION = "transport.jms.InitialReconnectDuration"; + /** @see PARAM_RECON_INIT_DURATION */ + public static final String PARAM_RECON_FACTOR = "transport.jms.ReconnectProgressFactor"; + /** @see PARAM_RECON_INIT_DURATION */ + public static final String PARAM_RECON_MAX_DURATION = "transport.jms.MaxReconnectDuration"; + + /** The username to use when obtaining a JMS Connection */ + public static final String PARAM_JMS_USERNAME = "transport.jms.UserName"; + /** The password to use when obtaining a JMS Connection */ + public static final String PARAM_JMS_PASSWORD = "transport.jms.Password"; + + //-------------- message context / transport header properties and client options -------------- + /** + * A MessageContext property or client Option indicating the JMS message type + */ + public static final String JMS_MESSAGE_TYPE = "JMS_MESSAGE_TYPE"; + /** + * The message type indicating a BytesMessage. See {@link JMS_MESSAGE_TYPE} + */ + public static final String JMS_BYTE_MESSAGE = "JMS_BYTE_MESSAGE"; + /** + * The message type indicating a TextMessage. See {@link JMS_MESSAGE_TYPE} + */ + public static final String JMS_TEXT_MESSAGE = "JMS_TEXT_MESSAGE"; + /** + * A MessageContext property or client Option indicating the time to wait for a response JMS message + */ + public static final String JMS_WAIT_REPLY = "JMS_WAIT_REPLY"; + /** + * A MessageContext property or client Option indicating the JMS correlation id + */ + public static final String JMS_COORELATION_ID = "JMS_COORELATION_ID"; + /** + * A MessageContext property or client Option indicating the JMS message id + */ + public static final String JMS_MESSAGE_ID = "JMS_MESSAGE_ID"; + /** + * A MessageContext property or client Option indicating the JMS delivery mode as an Integer or String + * Value 1 - javax.jms.DeliveryMode.NON_PERSISTENT + * Value 2 - javax.jms.DeliveryMode.PERSISTENT + */ + public static final String JMS_DELIVERY_MODE = "JMS_DELIVERY_MODE"; + /** + * A MessageContext property or client Option indicating the JMS destination to use on a Send + */ + public static final String JMS_DESTINATION = "JMS_DESTINATION"; + /** + * A MessageContext property or client Option indicating the JMS message expiration - a Long value + * specified as a String + */ + public static final String JMS_EXPIRATION = "JMS_EXPIRATION"; + /** + * A MessageContext property indicating if the message is a redelivery (Boolean as a String) + */ + public static final String JMS_REDELIVERED = "JMS_REDELIVERED"; + /** + * A MessageContext property or client Option indicating the JMS replyTo Destination + */ + public static final String JMS_REPLY_TO = "JMS_REPLY_TO"; + /** + * A MessageContext property or client Option indicating the JMS replyTo Destination type + * See {@link DESTINATION_TYPE_QUEUE} and {@link DESTINATION_TYPE_TOPIC} + */ + public static final String JMS_REPLY_TO_TYPE = "JMS_REPLY_TO_TYPE"; + /** + * A MessageContext property or client Option indicating the JMS timestamp (Long specified as String) + */ + public static final String JMS_TIMESTAMP = "JMS_TIMESTAMP"; + /** + * A MessageContext property indicating the JMS type String returned by {@link javax.jms.Message.getJMSType()} + */ + public static final String JMS_TYPE = "JMS_TYPE"; + /** + * A MessageContext property or client Option indicating the JMS priority + */ + public static final String JMS_PRIORITY = "JMS_PRIORITY"; + /** + * A MessageContext property or client Option indicating the JMS time to live for message sent + */ + public static final String JMS_TIME_TO_LIVE = "JMS_TIME_TO_LIVE"; + + /** The prefix that denotes JMSX properties */ + public static final String JMSX_PREFIX = "JMSX"; + /** The JMSXGroupID property */ + public static final String JMSX_GROUP_ID = "JMSXGroupID"; + /** The JMSXGroupSeq property */ + public static final String JMSX_GROUP_SEQ = "JMSXGroupSeq"; + +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java new file mode 100644 index 0000000000..c465b1d989 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSEndpoint.java @@ -0,0 +1,111 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet; + +/** + * Class that links an Axis2 service to a JMS destination. Additionally, it contains + * all the required information to process incoming JMS messages and to inject them + * into Axis2. + */ +public class JMSEndpoint { + private JMSConnectionFactory cf; + private AxisService service; + private String jndiDestinationName; + private int destinationType = JMSConstants.GENERIC; + private Set<EndpointReference> endpointReferences = new HashSet<EndpointReference>(); + private ContentTypeRuleSet contentTypeRuleSet; + + public AxisService getService() { + return service; + } + + public void setService(AxisService service) { + this.service = service; + } + + public String getServiceName() { + return service.getName(); + } + + public String getJndiDestinationName() { + return jndiDestinationName; + } + + public void setJndiDestinationName(String destinationJNDIName) { + this.jndiDestinationName = destinationJNDIName; + } + + public void setDestinationType(String destinationType) { + if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) { + this.destinationType = JMSConstants.TOPIC; + } else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) { + this.destinationType = JMSConstants.QUEUE; + } else { + this.destinationType = JMSConstants.GENERIC; + } + } + + public EndpointReference[] getEndpointReferences() { + return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]); + } + + public void computeEPRs() { + List<EndpointReference> eprs = new ArrayList<EndpointReference>(); + for (Object o : getService().getParameters()) { + Parameter p = (Parameter) o; + if (JMSConstants.PARAM_PUBLISH_EPR.equals(p.getName()) && p.getValue() instanceof String) { + if ("legacy".equalsIgnoreCase((String) p.getValue())) { + // if "legacy" specified, compute and replace it + endpointReferences.add( + new EndpointReference(JMSUtils.getEPR(cf, destinationType, this))); + } else { + endpointReferences.add(new EndpointReference((String) p.getValue())); + } + } + } + + if (eprs.isEmpty()) { + // if nothing specified, compute and return legacy EPR + endpointReferences.add(new EndpointReference(JMSUtils.getEPR(cf, destinationType, this))); + } + } + + public ContentTypeRuleSet getContentTypeRuleSet() { + return contentTypeRuleSet; + } + + public void setContentTypeRuleSet(ContentTypeRuleSet contentTypeRuleSet) { + this.contentTypeRuleSet = contentTypeRuleSet; + } + + public JMSConnectionFactory getCf() { + return cf; + } + + public void setCf(JMSConnectionFactory cf) { + this.cf = cf; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java new file mode 100644 index 0000000000..ceeec4a6a3 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSExceptionWrapper.java @@ -0,0 +1,28 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.io.IOException; + +import javax.jms.JMSException; + +public class JMSExceptionWrapper extends IOException { + private static final long serialVersionUID = 852441109009079511L; + + public JMSExceptionWrapper(JMSException ex) { + initCause(ex); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java new file mode 100644 index 0000000000..8c9f66dfbf --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java @@ -0,0 +1,294 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.BytesMessage; +import javax.jms.TextMessage; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.TransportInDescription; +import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleFactory; +import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet; +import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.MessageTypeRule; +import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.PropertyRule; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportListener; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorListener; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSource; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSourceSupport; + +/** + * The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances + * for each service requesting exposure over JMS, and stops these if they are undeployed / stopped. + * <p> + * A service indicates a JMS Connection factory definition by name, which would be defined in the + * JMSListner on the axis2.xml, and this provides a way to reuse common configuration between + * services, as well as to optimize resources utilized + * <p> + * If the connection factory name was not specified, it will default to the one named "default" + * {@see JMSConstants.DEFAULT_CONFAC_NAME} + * <p> + * If a destination JNDI name is not specified, a service will expect to use a Queue with the same + * JNDI name as of the service. Additional Parameters allows one to bind to a Topic or specify + * many more detailed control options. See package documentation for more details + * <p> + * All Destinations / JMS Administered objects used MUST be pre-created or already available + */ +public class JMSListener extends AbstractTransportListener implements ManagementSupport, + TransportErrorSource { + + public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; + + /** The JMSConnectionFactoryManager which centralizes the management of defined factories */ + private JMSConnectionFactoryManager connFacManager; + /** A Map of service name to the JMS endpoints */ + private Map<String,JMSEndpoint> serviceNameToEndpointMap = new HashMap<String,JMSEndpoint>(); + /** A Map of service name to its ServiceTaskManager instances */ + private Map<String, ServiceTaskManager> serviceNameToSTMMap = + new HashMap<String, ServiceTaskManager>(); + private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this); + + /** + * TransportListener initialization + * + * @param cfgCtx the Axis configuration context + * @param trpInDesc the TransportIn description + */ + public void init(ConfigurationContext cfgCtx, + TransportInDescription trpInDesc) throws AxisFault { + + super.init(cfgCtx, trpInDesc); + connFacManager = new JMSConnectionFactoryManager(trpInDesc); + log.info("JMS Transport Receiver/Listener initialized..."); + } + + /** + * Returns EPRs for the given service over the JMS transport + * + * @param serviceName service name + * @return the JMS EPRs for the service + */ + public EndpointReference[] getEPRsForService(String serviceName) { + //Strip out the operation name + if (serviceName.indexOf('/') != -1) { + serviceName = serviceName.substring(0, serviceName.indexOf('/')); + } + // strip out the endpoint name if present + if (serviceName.indexOf('.') != -1) { + serviceName = serviceName.substring(0, serviceName.indexOf('.')); + } + JMSEndpoint endpoint = serviceNameToEndpointMap.get(serviceName); + if (endpoint != null) { + return endpoint.getEndpointReferences(); + } else { + return null; + } + } + + /** + * Listen for JMS messages on behalf of the given service + * + * @param service the Axis service for which to listen for messages + */ + protected void startListeningForService(AxisService service) throws AxisFault { + JMSConnectionFactory cf = getConnectionFactory(service); + if (cf == null) { + throw new AxisFault("The service doesn't specify a JMS connection factory or refers " + + "to an invalid factory."); + } + + JMSEndpoint endpoint = new JMSEndpoint(); + endpoint.setService(service); + endpoint.setCf(cf); + + Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION); + if (destParam != null) { + endpoint.setJndiDestinationName((String)destParam.getValue()); + } else { + // Assume that the JNDI destination name is the same as the service name + endpoint.setJndiDestinationName(service.getName()); + } + + Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE); + if (destTypeParam != null) { + String paramValue = (String) destTypeParam.getValue(); + if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || + JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) { + endpoint.setDestinationType(paramValue); + } else { + throw new AxisFault("Invalid destinaton type value " + paramValue); + } + } else { + log.debug("JMS destination type not given. default queue"); + endpoint.setDestinationType(JMSConstants.DESTINATION_TYPE_QUEUE); + } + + Parameter contentTypeParam = service.getParameter(JMSConstants.CONTENT_TYPE_PARAM); + if (contentTypeParam == null) { + ContentTypeRuleSet contentTypeRuleSet = new ContentTypeRuleSet(); + contentTypeRuleSet.addRule(new PropertyRule(BaseConstants.CONTENT_TYPE)); + contentTypeRuleSet.addRule(new MessageTypeRule(BytesMessage.class, "application/octet-stream")); + contentTypeRuleSet.addRule(new MessageTypeRule(TextMessage.class, "text/plain")); + endpoint.setContentTypeRuleSet(contentTypeRuleSet); + } else { + endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam)); + } + + endpoint.computeEPRs(); // compute service EPR and keep for later use + serviceNameToEndpointMap.put(service.getName(), endpoint); + + ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool); + stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint)); + stm.start(); + serviceNameToSTMMap.put(service.getName(), stm); + + for (int i=0; i<3; i++) { + if (stm.getActiveTaskCount() > 0) { + log.info("Started to listen on destination : " + stm.getDestinationJNDIName() + + " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) + + " for service " + stm.getServiceName()); + return; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) {} + } + + log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() + + " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) + + " for service " + stm.getServiceName() + " have not yet started after 3 seconds .."); + } + + /** + * Stops listening for messages for the service thats undeployed or stopped + * + * @param service the service that was undeployed or stopped + */ + protected void stopListeningForService(AxisService service) { + + ServiceTaskManager stm = serviceNameToSTMMap.get(service.getName()); + if (stm != null) { + if (log.isDebugEnabled()) { + log.debug("Stopping listening on destination : " + stm.getDestinationJNDIName() + + " for service : " + stm.getServiceName()); + } + + stm.stop(); + + serviceNameToSTMMap.remove(service.getName()); + serviceNameToEndpointMap.remove(service.getName()); + log.info("Stopped listening for JMS messages to service : " + service.getName()); + + } else { + log.error("Unable to stop service : " + service.getName() + + " - unable to find its ServiceTaskManager"); + } + } + /** + * Return the connection factory name for this service. If this service + * refers to an invalid factory or defaults to a non-existent default + * factory, this returns null + * + * @param service the AxisService + * @return the JMSConnectionFactory to be used, or null if reference is invalid + */ + public JMSConnectionFactory getConnectionFactory(AxisService service) { + + Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC); + // validate connection factory name (specified or default) + if (conFacParam != null) { + return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue()); + } else { + return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME); + } + } + + // -- jmx/management methods-- + /** + * Pause the listener - Stop accepting/processing new messages, but continues processing existing + * messages until they complete. This helps bring an instance into a maintenence mode + * @throws AxisFault on error + */ + public void pause() throws AxisFault { + if (state != BaseConstants.STARTED) return; + try { + for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { + stm.pause(); + } + state = BaseConstants.PAUSED; + log.info("Listener paused"); + } catch (AxisJMSException e) { + log.error("At least one service could not be paused", e); + } + } + + /** + * Resume the lister - Brings the lister into active mode back from a paused state + * @throws AxisFault on error + */ + public void resume() throws AxisFault { + if (state != BaseConstants.PAUSED) return; + try { + for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { + stm.resume(); + } + state = BaseConstants.STARTED; + log.info("Listener resumed"); + } catch (AxisJMSException e) { + log.error("At least one service could not be resumed", e); + } + } + + /** + * Stop processing new messages, and wait the specified maximum time for in-flight + * requests to complete before a controlled shutdown for maintenence + * + * @param millis a number of milliseconds to wait until pending requests are allowed to complete + * @throws AxisFault on error + */ + public void maintenenceShutdown(long millis) throws AxisFault { + if (state != BaseConstants.STARTED) return; + try { + long start = System.currentTimeMillis(); + stop(); + state = BaseConstants.STOPPED; + log.info("Listener shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s"); + } catch (Exception e) { + handleException("Error shutting down the listener for maintenence", e); + } + } + + public void addErrorListener(TransportErrorListener listener) { + tess.addErrorListener(listener); + } + + public void removeErrorListener(TransportErrorListener listener) { + tess.removeErrorListener(listener); + } + + void error(AxisService service, Throwable ex) { + tess.error(service, ex); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java new file mode 100644 index 0000000000..ebd67e53e1 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java @@ -0,0 +1,237 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; +import javax.transaction.UserTransaction; +import javax.xml.namespace.QName; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.AxisOperation; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeInfo; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector; + +/** + * This is the JMS message receiver which is invoked when a message is received. This processes + * the message through the engine + */ +public class JMSMessageReceiver { + + private static final Log log = LogFactory.getLog(JMSMessageReceiver.class); + + /** The JMSListener */ + private JMSListener jmsListener = null; + /** A reference to the JMS Connection Factory */ + private JMSConnectionFactory jmsConnectionFactory = null; + /** The JMS metrics collector */ + private MetricsCollector metrics = null; + /** The endpoint this message receiver is bound to */ + final JMSEndpoint endpoint; + + /** + * Create a new JMSMessage receiver + * + * @param jmsListener the JMS transport Listener + * @param jmsConFac the JMS connection factory we are associated with + * @param workerPool the worker thread pool to be used + * @param cfgCtx the axis ConfigurationContext + * @param serviceName the name of the Axis service + * @param endpoint the JMSEndpoint definition to be used + */ + JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) { + this.jmsListener = jmsListener; + this.jmsConnectionFactory = jmsConFac; + this.endpoint = endpoint; + this.metrics = jmsListener.getMetricsCollector(); + } + + /** + * Process a new message received + * + * @param message the JMS message received + * @param ut UserTransaction which was used to receive the message + * @return true if caller should commit + */ + public boolean onMessage(Message message, UserTransaction ut) { + + try { + if (log.isDebugEnabled()) { + StringBuffer sb = new StringBuffer(); + sb.append("Received new JMS message for service :").append(endpoint.getServiceName()); + sb.append("\nDestination : ").append(message.getJMSDestination()); + sb.append("\nMessage ID : ").append(message.getJMSMessageID()); + sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID()); + sb.append("\nReplyTo : ").append(message.getJMSReplyTo()); + sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered()); + sb.append("\nPriority : ").append(message.getJMSPriority()); + sb.append("\nExpiration : ").append(message.getJMSExpiration()); + sb.append("\nTimestamp : ").append(message.getJMSTimestamp()); + sb.append("\nMessage Type : ").append(message.getJMSType()); + sb.append("\nPersistent ? : ").append( + DeliveryMode.PERSISTENT == message.getJMSDeliveryMode()); + + log.debug(sb.toString()); + if (log.isTraceEnabled() && message instanceof TextMessage) { + log.trace("\nMessage : " + ((TextMessage) message).getText()); + } + } + } catch (JMSException e) { + if (log.isDebugEnabled()) { + log.debug("Error reading JMS message headers for debug logging", e); + } + } + + // update transport level metrics + try { + metrics.incrementBytesReceived(JMSUtils.getMessageSize(message)); + } catch (JMSException e) { + log.warn("Error reading JMS message size to update transport metrics", e); + } + + // has this message already expired? expiration time == 0 means never expires + try { + long expiryTime = message.getJMSExpiration(); + if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) { + if (log.isDebugEnabled()) { + log.debug("Discard expired message with ID : " + message.getJMSMessageID()); + } + return true; + } + } catch (JMSException ignore) {} + + + boolean successful = false; + try { + successful = processThoughEngine(message, ut); + + } catch (JMSException e) { + log.error("JMS Exception encountered while processing", e); + } catch (AxisFault e) { + log.error("Axis fault processing message", e); + } catch (Exception e) { + log.error("Unknown error processing message", e); + + } finally { + if (successful) { + metrics.incrementMessagesReceived(); + } else { + metrics.incrementFaultsReceiving(); + } + } + + return successful; + } + + /** + * Process the new message through Axis2 + * + * @param message the JMS message + * @param ut the UserTransaction used for receipt + * @return true if the caller should commit + * @throws JMSException, on JMS exceptions + * @throws AxisFault on Axis2 errors + */ + private boolean processThoughEngine(Message message, UserTransaction ut) + throws JMSException, AxisFault { + + MessageContext msgContext = jmsListener.createMessageContext(); + + // set the JMS Message ID as the Message ID of the MessageContext + try { + msgContext.setMessageID(message.getJMSMessageID()); + msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID()); + } catch (JMSException ignore) {} + + String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION); + + AxisService service = endpoint.getService(); + msgContext.setAxisService(service); + + // find the operation for the message, or default to one + Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); + QName operationQName = ( + operationParam != null ? + BaseUtils.getQNameFromString(operationParam.getValue()) : + BaseConstants.DEFAULT_OPERATION); + + AxisOperation operation = service.getOperation(operationQName); + if (operation != null) { + msgContext.setAxisOperation(operation); + msgContext.setSoapAction("urn:" + operation.getName().getLocalPart()); + } + + ContentTypeInfo contentTypeInfo = + endpoint.getContentTypeRuleSet().getContentTypeInfo(message); + if (contentTypeInfo == null) { + throw new AxisFault("Unable to determine content type for message " + + msgContext.getMessageID()); + } + + // set the message property OUT_TRANSPORT_INFO + // the reply is assumed to be over the JMSReplyTo destination, using + // the same incoming connection factory, if a JMSReplyTo is available + Destination replyTo = message.getJMSReplyTo(); + if (replyTo == null) { + // does the service specify a default reply destination ? + Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION); + if (param != null && param.getValue() != null) { + replyTo = jmsConnectionFactory.getDestination((String) param.getValue()); + } + + } + if (replyTo != null) { + msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, + new JMSOutTransportInfo(jmsConnectionFactory, replyTo, + contentTypeInfo.getPropertyName())); + } + + JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType()); + if (ut != null) { + msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut); + } + + try { + jmsListener.handleIncomingMessage( + msgContext, + JMSUtils.getTransportHeaders(message), + soapAction, + contentTypeInfo.getContentType()); + + } finally { + + Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY); + if (o != null) { + if ((o instanceof Boolean && ((Boolean) o)) || + (o instanceof String && Boolean.valueOf((String) o))) { + return false; + } + } + return true; + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java new file mode 100644 index 0000000000..01fdee77dd --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageSender.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.QueueSender; +import javax.jms.Session; +import javax.jms.TopicPublisher; +import javax.transaction.UserTransaction; + +import org.apache.axis2.context.MessageContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; + +/** + * Performs the actual sending of a JMS message, and the subsequent committing of a JTA transaction + * (if requested) or the local session transaction, if used. An instance of this class is unique + * to a single message send out operation and will not be shared. + */ +public class JMSMessageSender { + + private static final Log log = LogFactory.getLog(JMSMessageSender.class); + + /** The Connection to be used to send out */ + private Connection connection = null; + /** The Session to be used to send out */ + private Session session = null; + /** The MessageProducer used */ + private MessageProducer producer = null; + /** Target Destination */ + private Destination destination = null; + /** The level of cachability for resources */ + private int cacheLevel = JMSConstants.CACHE_CONNECTION; + /** Should this sender use JMS 1.1 ? (if false, defaults to 1.0.2b) */ + private boolean jmsSpec11 = true; + /** Are we sending to a Queue ? */ + private Boolean isQueue = null; + + /** + * This is a low-end method to support the one-time sends using JMS 1.0.2b + * @param connection the JMS Connection + * @param session JMS Session + * @param producer the MessageProducer + * @param destination the JMS Destination + * @param cacheLevel cacheLevel - None | Connection | Session | Producer + * @param jmsSpec11 true if the JMS 1.1 API should be used + * @param isQueue posting to a Queue? + */ + public JMSMessageSender(Connection connection, Session session, MessageProducer producer, + Destination destination, int cacheLevel, boolean jmsSpec11, Boolean isQueue) { + + this.connection = connection; + this.session = session; + this.producer = producer; + this.destination = destination; + this.cacheLevel = cacheLevel; + this.jmsSpec11 = jmsSpec11; + this.isQueue = isQueue; + } + + /** + * Create a JMSSender using a JMSConnectionFactory and target EPR + * + * @param jmsConnectionFactory the JMSConnectionFactory + * @param targetAddress target EPR + */ + public JMSMessageSender(JMSConnectionFactory jmsConnectionFactory, String targetAddress) { + + if (jmsConnectionFactory != null) { + this.cacheLevel = jmsConnectionFactory.getCacheLevel(); + this.jmsSpec11 = jmsConnectionFactory.isJmsSpec11(); + this.connection = jmsConnectionFactory.getConnection(); + this.session = jmsConnectionFactory.getSession(connection); + this.destination = + jmsConnectionFactory.getSharedDestination() == null ? + jmsConnectionFactory.getDestination(JMSUtils.getDestination(targetAddress)) : + jmsConnectionFactory.getSharedDestination(); + this.producer = jmsConnectionFactory.getMessageProducer(connection, session, destination); + + } else { + JMSOutTransportInfo jmsOut = new JMSOutTransportInfo(targetAddress); + jmsOut.loadConnectionFactoryFromProperies(); + } + } + + /** + * Perform actual send of JMS message to the Destination selected + * + * @param message the JMS message + * @param msgCtx the Axis2 MessageContext + */ + public void send(Message message, MessageContext msgCtx) { + + Boolean jtaCommit = getBooleanProperty(msgCtx, BaseConstants.JTA_COMMIT_AFTER_SEND); + Boolean rollbackOnly = getBooleanProperty(msgCtx, BaseConstants.SET_ROLLBACK_ONLY); + Boolean persistent = getBooleanProperty(msgCtx, JMSConstants.JMS_DELIVERY_MODE); + Integer priority = getIntegerProperty(msgCtx, JMSConstants.JMS_PRIORITY); + Integer timeToLive = getIntegerProperty(msgCtx, JMSConstants.JMS_TIME_TO_LIVE); + + // Do not commit, if message is marked for rollback + if (rollbackOnly != null && rollbackOnly) { + jtaCommit = Boolean.FALSE; + } + + if (persistent != null) { + try { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } catch (JMSException e) { + handleException("Error setting JMS Producer for PERSISTENT delivery", e); + } + } + if (priority != null) { + try { + producer.setPriority(priority); + } catch (JMSException e) { + handleException("Error setting JMS Producer priority to : " + priority, e); + } + } + if (timeToLive != null) { + try { + producer.setTimeToLive(timeToLive); + } catch (JMSException e) { + handleException("Error setting JMS Producer TTL to : " + timeToLive, e); + } + } + + boolean sendingSuccessful = false; + // perform actual message sending + try { + if (jmsSpec11 || isQueue == null) { + producer.send(message); + + } else { + if (isQueue) { + ((QueueSender) producer).send(message); + + } else { + ((TopicPublisher) producer).publish(message); + } + } + + // set the actual MessageID to the message context for use by any others down the line + String msgId = null; + try { + msgId = message.getJMSMessageID(); + if (msgId != null) { + msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId); + } + } catch (JMSException ignore) {} + + sendingSuccessful = true; + + if (log.isDebugEnabled()) { + log.debug("Sent Message Context ID : " + msgCtx.getMessageID() + + " with JMS Message ID : " + msgId + + " to destination : " + producer.getDestination()); + } + + } catch (JMSException e) { + log.error("Error sending message with MessageContext ID : " + + msgCtx.getMessageID() + " to destination : " + destination, e); + + } finally { + + if (jtaCommit != null) { + + UserTransaction ut = (UserTransaction) msgCtx.getProperty(BaseConstants.USER_TRANSACTION); + if (ut != null) { + + try { + if (sendingSuccessful && jtaCommit) { + ut.commit(); + } else { + ut.rollback(); + } + msgCtx.removeProperty(BaseConstants.USER_TRANSACTION); + + if (log.isDebugEnabled()) { + log.debug((sendingSuccessful ? "Committed" : "Rolled back") + + " JTA Transaction"); + } + + } catch (Exception e) { + handleException("Error committing/rolling back JTA transaction after " + + "sending of message with MessageContext ID : " + msgCtx.getMessageID() + + " to destination : " + destination, e); + } + } + + } else { + try { + if (session.getTransacted()) { + if (sendingSuccessful && (rollbackOnly == null || !rollbackOnly)) { + session.commit(); + } else { + session.rollback(); + } + } + + if (log.isDebugEnabled()) { + log.debug((sendingSuccessful ? "Committed" : "Rolled back") + + " local (JMS Session) Transaction"); + } + + } catch (JMSException e) { + handleException("Error committing/rolling back local (i.e. session) " + + "transaction after sending of message with MessageContext ID : " + + msgCtx.getMessageID() + " to destination : " + destination, e); + } + } + } + } + + /** + * Close non-shared producer, session and connection if any + */ + public void close() { + if (producer != null && cacheLevel < JMSConstants.CACHE_PRODUCER) { + try { + producer.close(); + } catch (JMSException e) { + log.error("Error closing JMS MessageProducer after send", e); + } finally { + producer = null; + } + } + + if (session != null && cacheLevel < JMSConstants.CACHE_SESSION) { + try { + session.close(); + } catch (JMSException e) { + log.error("Error closing JMS Session after send", e); + } finally { + session = null; + } + } + + if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { + try { + connection.close(); + } catch (JMSException e) { + log.error("Error closing JMS Connection after send", e); + } finally { + connection = null; + } + } + } + + private void handleException(String message, Exception e) { + log.error(message, e); + throw new AxisJMSException(message, e); + } + + private Boolean getBooleanProperty(MessageContext msgCtx, String name) { + Object o = msgCtx.getProperty(name); + if (o != null) { + if (o instanceof Boolean) { + return (Boolean) o; + } else if (o instanceof String) { + return Boolean.valueOf((String) o); + } + } + return null; + } + + private Integer getIntegerProperty(MessageContext msgCtx, String name) { + Object o = msgCtx.getProperty(name); + if (o != null) { + if (o instanceof Integer) { + return (Integer) o; + } else if (o instanceof String) { + return Integer.parseInt((String) o); + } + } + return null; + } + + public void setConnection(Connection connection) { + this.connection = connection; + } + + public void setSession(Session session) { + this.session = session; + } + + public void setProducer(MessageProducer producer) { + this.producer = producer; + } + + public void setCacheLevel(int cacheLevel) { + this.cacheLevel = cacheLevel; + } + + public int getCacheLevel() { + return cacheLevel; + } + + public Connection getConnection() { + return connection; + } + + public MessageProducer getProducer() { + return producer; + } + + public Session getSession() { + return session; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java new file mode 100644 index 0000000000..9e029b33e1 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java @@ -0,0 +1,306 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.Hashtable; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NameNotFoundException; +import javax.naming.NamingException; + +import org.apache.axis2.transport.OutTransportInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; + +/** + * The JMS OutTransportInfo is a holder of information to send an outgoing message + * (e.g. a Response) to a JMS destination. Thus at a minimum a reference to a + * ConnectionFactory and a Destination are held + */ +public class JMSOutTransportInfo implements OutTransportInfo { + + private static final Log log = LogFactory.getLog(JMSOutTransportInfo.class); + + /** The naming context */ + private Context context; + /** + * this is a reference to the underlying JMS ConnectionFactory when sending messages + * through connection factories not defined at the TransportSender level + */ + private ConnectionFactory connectionFactory = null; + /** + * this is a reference to a JMS Connection Factory instance, which has a reference + * to the underlying actual connection factory, an open connection to the JMS provider + * and optionally a session already available for use + */ + private JMSConnectionFactory jmsConnectionFactory = null; + /** the Destination queue or topic for the outgoing message */ + private Destination destination = null; + /** the Destination queue or topic for the outgoing message + * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC + */ + private String destinationType = JMSConstants.DESTINATION_TYPE_GENERIC; + /** the Reply Destination queue or topic for the outgoing message */ + private Destination replyDestination = null; + /** the Reply Destination name */ + private String replyDestinationName = null; + /** the Reply Destination queue or topic for the outgoing message + * i.e. JMSConstants.DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC or DESTINATION_TYPE_GENERIC + */ + private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC; + /** the EPR properties when the out-transport info is generated from a target EPR */ + private Hashtable<String,String> properties = null; + /** the target EPR string where applicable */ + private String targetEPR = null; + /** the message property name that stores the content type of the outgoing message */ + private String contentTypeProperty; + + /** + * Creates an instance using the given JMS connection factory and destination + * + * @param jmsConnectionFactory the JMS connection factory + * @param dest the destination + * @param contentTypeProperty + */ + JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest, + String contentTypeProperty) { + this.jmsConnectionFactory = jmsConnectionFactory; + this.destination = dest; + destinationType = dest instanceof Topic ? JMSConstants.DESTINATION_TYPE_TOPIC + : JMSConstants.DESTINATION_TYPE_QUEUE; + this.contentTypeProperty = contentTypeProperty; + } + + /** + * Creates and instance using the given URL + * + * @param targetEPR the target EPR + */ + JMSOutTransportInfo(String targetEPR) { + + this.targetEPR = targetEPR; + if (!targetEPR.startsWith(JMSConstants.JMS_PREFIX)) { + handleException("Invalid prefix for a JMS EPR : " + targetEPR); + + } else { + properties = BaseUtils.getEPRProperties(targetEPR); + String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE); + if (destinationType != null) { + setDestinationType(destinationType); + } + + String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE); + if (replyDestinationType != null) { + setReplyDestinationType(replyDestinationType); + } + + replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); + contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); + try { + context = new InitialContext(properties); + } catch (NamingException e) { + handleException("Could not get an initial context using " + properties, e); + } + + destination = getDestination(context, targetEPR); + replyDestination = getReplyDestination(context, targetEPR); + } + } + + /** + * Provides a lazy load when created with a target EPR. This method performs actual + * lookup for the connection factory and destination + */ + public void loadConnectionFactoryFromProperies() { + if (properties != null) { + connectionFactory = getConnectionFactory(context, properties); + } + } + + /** + * Get the referenced ConnectionFactory using the properties from the context + * + * @param context the context to use for lookup + * @param props the properties which contains the JNDI name of the factory + * @return the connection factory + */ + private ConnectionFactory getConnectionFactory(Context context, Hashtable<String,String> props) { + try { + + String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME); + if (conFacJndiName != null) { + return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName); + } else { + handleException("Connection Factory JNDI name cannot be determined"); + } + } catch (NamingException e) { + handleException("Failed to look up connection factory from JNDI", e); + } + return null; + } + + /** + * Get the JMS destination specified by the given URL from the context + * + * @param context the Context to lookup + * @param url URL + * @return the JMS destination, or null if it does not exist + */ + private Destination getDestination(Context context, String url) { + String destinationName = JMSUtils.getDestination(url); + try { + return JMSUtils.lookup(context, Destination.class, destinationName); + } catch (NameNotFoundException e) { + try { + return JMSUtils.lookup(context, Destination.class, + (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType) ? + "dynamicTopics/" : "dynamicQueues/") + destinationName); + } catch (NamingException x) { + handleException("Cannot locate destination : " + destinationName + " using " + url); + } + } catch (NamingException e) { + handleException("Cannot locate destination : " + destinationName + " using " + url, e); + } + return null; + } + + /** + * Get the JMS reply destination specified by the given URL from the context + * + * @param context the Context to lookup + * @param url URL + * @return the JMS destination, or null if it does not exist + */ + private Destination getReplyDestination(Context context, String url) { + String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); + if(replyDestinationName == null) { + return null; + } + + try { + return JMSUtils.lookup(context, Destination.class, replyDestinationName); + } catch (NameNotFoundException e) { + if (log.isDebugEnabled()) { + log.debug("Cannot locate destination : " + replyDestinationName + " using " + url); + } + } catch (NamingException e) { + handleException("Cannot locate destination : " + replyDestinationName + " using " + url, e); + } + + return null; + } + + /** + * Look up for the given destination + * @param replyDest the JNDI name to lookup Destination required + * @return Destination for the JNDI name passed + */ + public Destination getReplyDestination(String replyDest) { + try { + return JMSUtils.lookup(jmsConnectionFactory.getContext(), Destination.class, + replyDest); + } catch (NameNotFoundException e) { + if (log.isDebugEnabled()) { + log.debug("Cannot locate reply destination : " + replyDest, e); + } + } catch (NamingException e) { + handleException("Cannot locate reply destination : " + replyDest, e); + } + return null; + } + + + private void handleException(String s) { + log.error(s); + throw new AxisJMSException(s); + } + + private void handleException(String s, Exception e) { + log.error(s, e); + throw new AxisJMSException(s, e); + } + + public Destination getDestination() { + return destination; + } + + public ConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public JMSConnectionFactory getJmsConnectionFactory() { + return jmsConnectionFactory; + } + + public void setContentType(String contentType) { + // this is a useless Axis2 method imposed by the OutTransportInfo interface :( + } + + public Hashtable<String,String> getProperties() { + return properties; + } + + public String getTargetEPR() { + return targetEPR; + } + + public String getDestinationType() { + return destinationType; + } + + public void setDestinationType(String destinationType) { + if (destinationType != null) { + this.destinationType = destinationType; + } + } + + public Destination getReplyDestination() { + return replyDestination; + } + + public void setReplyDestination(Destination replyDestination) { + this.replyDestination = replyDestination; + } + + public String getReplyDestinationType() { + return replyDestinationType; + } + + public void setReplyDestinationType(String replyDestinationType) { + this.replyDestinationType = replyDestinationType; + } + + public String getReplyDestinationName() { + return replyDestinationName; + } + + public void setReplyDestinationName(String replyDestinationName) { + this.replyDestinationName = replyDestinationName; + } + + public String getContentTypeProperty() { + return contentTypeProperty; + } + + public void setContentTypeProperty(String contentTypeProperty) { + this.contentTypeProperty = contentTypeProperty; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java new file mode 100644 index 0000000000..a5f77dc4c9 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java @@ -0,0 +1,499 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.StringWriter; +import java.nio.charset.UnsupportedCharsetException; +import java.util.Map; + +import javax.activation.DataHandler; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMNode; +import org.apache.axiom.om.OMOutputFormat; +import org.apache.axiom.om.OMText; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.transport.MessageFormatter; +import org.apache.axis2.transport.OutTransportInfo; +import org.apache.axis2.transport.TransportUtils; +import org.apache.axis2.transport.http.HTTPConstants; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportSender; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams.WriterOutputStream; + +/** + * The TransportSender for JMS + */ +public class JMSSender extends AbstractTransportSender implements ManagementSupport { + + public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; + + /** The JMS connection factory manager to be used when sending messages out */ + private JMSConnectionFactoryManager connFacManager; + + /** + * Initialize the transport sender by reading pre-defined connection factories for + * outgoing messages. + * + * @param cfgCtx the configuration context + * @param transportOut the transport sender definition from axis2.xml + * @throws AxisFault on error + */ + public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { + super.init(cfgCtx, transportOut); + connFacManager = new JMSConnectionFactoryManager(transportOut); + log.info("JMS Transport Sender initialized..."); + } + + /** + * Get corresponding JMS connection factory defined within the transport sender for the + * transport-out information - usually constructed from a targetEPR + * + * @param trpInfo the transport-out information + * @return the corresponding JMS connection factory, if any + */ + private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { + Map<String,String> props = trpInfo.getProperties(); + if (trpInfo.getProperties() != null) { + String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC); + if (jmsConnectionFactoryName != null) { + return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName); + } else { + return connFacManager.getJMSConnectionFactory(props); + } + } else { + return null; + } + } + + /** + * Performs the actual sending of the JMS message + */ + public void sendMessage(MessageContext msgCtx, String targetAddress, + OutTransportInfo outTransportInfo) throws AxisFault { + + JMSConnectionFactory jmsConnectionFactory = null; + JMSOutTransportInfo jmsOut = null; + JMSMessageSender messageSender = null; + + if (targetAddress != null) { + + jmsOut = new JMSOutTransportInfo(targetAddress); + // do we have a definition for a connection factory to use for this address? + jmsConnectionFactory = getJMSConnectionFactory(jmsOut); + + if (jmsConnectionFactory != null) { + messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress); + + } else { + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } + + } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { + + jmsOut = (JMSOutTransportInfo) outTransportInfo; + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } + + // The message property to be used to send the content type is determined by + // the out transport info, i.e. either from the EPR if we are sending a request, + // or, if we are sending a response, from the configuration of the service that + // received the request). The property name can be overridden by a message + // context property. + String contentTypeProperty = + (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); + if (contentTypeProperty == null) { + contentTypeProperty = jmsOut.getContentTypeProperty(); + } + + // need to synchronize as Sessions are not thread safe + synchronized (messageSender.getSession()) { + try { + sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); + } finally { + messageSender.close(); + } + } + } + + /** + * Perform actual sending of the JMS message + */ + private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender, + String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory, + JMSOutTransportInfo jmsOut) throws AxisFault { + + // convert the axis message context into a JMS Message that we can send over JMS + Message message = null; + String correlationId = null; + try { + message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty); + } catch (JMSException e) { + handleException("Error creating a JMS message from the message context", e); + } + + // should we wait for a synchronous response on this same thread? + boolean waitForResponse = waitForSynchronousResponse(msgCtx); + Destination replyDestination = jmsOut.getReplyDestination(); + + // if this is a synchronous out-in, prepare to listen on the response destination + if (waitForResponse) { + + String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO); + if (replyDestName == null && jmsConnectionFactory != null) { + replyDestName = jmsConnectionFactory.getReplyToDestination(); + } + + if (replyDestName != null) { + if (jmsConnectionFactory != null) { + replyDestination = jmsConnectionFactory.getDestination(replyDestName); + } else { + replyDestination = jmsOut.getReplyDestination(replyDestName); + } + } + replyDestination = JMSUtils.setReplyDestination( + replyDestination, messageSender.getSession(), message); + } + + try { + messageSender.send(message, msgCtx); + metrics.incrementMessagesSent(msgCtx); + + } catch (AxisJMSException e) { + metrics.incrementFaultsSending(); + handleException("Error sending JMS message", e); + } + + try { + metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message)); + } catch (JMSException e) { + log.warn("Error reading JMS message size to update transport metrics", e); + } + + // if we are expecting a synchronous response back for the message sent out + if (waitForResponse) { + // TODO ******************************************************************************** + // TODO **** replace with asynchronous polling via a poller task to process this ******* + // information would be given. Then it should poll (until timeout) the + // requested destination for the response message and inject it from a + // asynchronous worker thread + try { + messageSender.getConnection().start(); // multiple calls are safely ignored + } catch (JMSException ignore) {} + + try { + correlationId = message.getJMSMessageID(); + } catch(JMSException ignore) {} + + // We assume here that the response uses the same message property to + // specify the content type of the message. + waitForResponseAndProcess(messageSender.getSession(), replyDestination, + msgCtx, correlationId, contentTypeProperty); + // TODO ******************************************************************************** + } + } + + /** + * Create a Consumer for the reply destination and wait for the response JMS message + * synchronously. If a message arrives within the specified time interval, process it + * through Axis2 + * @param session the session to use to listen for the response + * @param replyDestination the JMS reply Destination + * @param msgCtx the outgoing message for which we are expecting the response + * @param contentTypeProperty the message property used to determine the content type + * of the response message + * @throws AxisFault on error + */ + private void waitForResponseAndProcess(Session session, Destination replyDestination, + MessageContext msgCtx, String correlationId, + String contentTypeProperty) throws AxisFault { + + try { + MessageConsumer consumer; + consumer = JMSUtils.createConsumer(session, replyDestination, + "JMSCorrelationID = '" + correlationId + "'"); + + // how long are we willing to wait for the sync response + long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; + String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY); + if (waitReply != null) { + timeout = Long.valueOf(waitReply).longValue(); + } + + if (log.isDebugEnabled()) { + log.debug("Waiting for a maximum of " + timeout + + "ms for a response message to destination : " + replyDestination + + " with JMS correlation ID : " + correlationId); + } + + Message reply = consumer.receive(timeout); + + if (reply != null) { + + // update transport level metrics + metrics.incrementMessagesReceived(); + try { + metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply)); + } catch (JMSException e) { + log.warn("Error reading JMS message size to update transport metrics", e); + } + + try { + processSyncResponse(msgCtx, reply, contentTypeProperty); + metrics.incrementMessagesReceived(); + } catch (AxisFault e) { + metrics.incrementFaultsReceiving(); + throw e; + } + + } else { + log.warn("Did not receive a JMS response within " + + timeout + " ms to destination : " + replyDestination + + " with JMS correlation ID : " + correlationId); + metrics.incrementTimeoutsReceiving(); + } + + } catch (JMSException e) { + metrics.incrementFaultsReceiving(); + handleException("Error creating a consumer, or receiving a synchronous reply " + + "for outgoing MessageContext ID : " + msgCtx.getMessageID() + + " and reply Destination : " + replyDestination, e); + } + } + + /** + * Create a JMS Message from the given MessageContext and using the given + * session + * + * @param msgContext the MessageContext + * @param session the JMS session + * @param contentTypeProperty the message property to be used to store the + * content type + * @return a JMS message from the context and session + * @throws JMSException on exception + * @throws AxisFault on exception + */ + private Message createJMSMessage(MessageContext msgContext, Session session, + String contentTypeProperty) throws JMSException, AxisFault { + + Message message = null; + String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE); + + // check the first element of the SOAP body, do we have content wrapped using the + // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or + // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages + // for JMS but just get the payload in its native format + String jmsPayloadType = guessMessageType(msgContext); + + if (jmsPayloadType == null) { + + OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); + MessageFormatter messageFormatter = null; + try { + messageFormatter = TransportUtils.getMessageFormatter(msgContext); + } catch (AxisFault axisFault) { + throw new JMSException("Unable to get the message formatter to use"); + } + + String contentType = messageFormatter.getContentType( + msgContext, format, msgContext.getSoapAction()); + + boolean useBytesMessage = + msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) || + contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1; + + OutputStream out; + StringWriter sw; + if (useBytesMessage) { + BytesMessage bytesMsg = session.createBytesMessage(); + sw = null; + out = new BytesMessageOutputStream(bytesMsg); + message = bytesMsg; + } else { + sw = new StringWriter(); + try { + out = new WriterOutputStream(sw, format.getCharSetEncoding()); + } catch (UnsupportedCharsetException ex) { + handleException("Unsupported encoding " + format.getCharSetEncoding(), ex); + return null; + } + } + + try { + messageFormatter.writeTo(msgContext, format, out, true); + out.close(); + } catch (IOException e) { + handleException("IO Error while creating BytesMessage", e); + } + + if (!useBytesMessage) { + TextMessage txtMsg = session.createTextMessage(); + txtMsg.setText(sw.toString()); + message = txtMsg; + } + + if (contentTypeProperty != null) { + message.setStringProperty(contentTypeProperty, contentType); + } + + } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) { + message = session.createBytesMessage(); + BytesMessage bytesMsg = (BytesMessage) message; + OMElement wrapper = msgContext.getEnvelope().getBody(). + getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER); + OMNode omNode = wrapper.getFirstOMChild(); + if (omNode != null && omNode instanceof OMText) { + Object dh = ((OMText) omNode).getDataHandler(); + if (dh != null && dh instanceof DataHandler) { + try { + ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg)); + } catch (IOException e) { + handleException("Error serializing binary content of element : " + + BaseConstants.DEFAULT_BINARY_WRAPPER, e); + } + } + } + + } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) { + message = session.createTextMessage(); + TextMessage txtMsg = (TextMessage) message; + txtMsg.setText(msgContext.getEnvelope().getBody(). + getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText()); + } + + // set the JMS correlation ID if specified + String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID); + if (correlationId == null && msgContext.getRelatesTo() != null) { + correlationId = msgContext.getRelatesTo().getValue(); + } + + if (correlationId != null) { + message.setJMSCorrelationID(correlationId); + } + + if (msgContext.isServerSide()) { + // set SOAP Action as a property on the JMS message + setProperty(message, msgContext, BaseConstants.SOAPACTION); + } else { + String action = msgContext.getOptions().getAction(); + if (action != null) { + message.setStringProperty(BaseConstants.SOAPACTION, action); + } + } + + JMSUtils.setTransportHeaders(msgContext, message); + return message; + } + + /** + * Guess the message type to use for JMS looking at the message contexts' envelope + * @param msgContext the message context + * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null + */ + private String guessMessageType(MessageContext msgContext) { + OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement(); + if (firstChild != null) { + if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) { + return JMSConstants.JMS_BYTE_MESSAGE; + } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) { + return JMSConstants.JMS_TEXT_MESSAGE; + } + } + return null; + } + + /** + * Creates an Axis MessageContext for the received JMS message and + * sets up the transports and various properties + * + * @param outMsgCtx the outgoing message for which we are expecting the response + * @param message the JMS response message received + * @param contentTypeProperty the message property used to determine the content type + * of the response message + * @throws AxisFault on error + */ + private void processSyncResponse(MessageContext outMsgCtx, Message message, + String contentTypeProperty) throws AxisFault { + + MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx); + + // load any transport headers from received message + JMSUtils.loadTransportHeaders(message, responseMsgCtx); + + // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response + // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This + // question is still under debate and due to the timelines, I am commiting this + // workaround as Axis2 1.2 is about to be released and Synapse 1.0 + responseMsgCtx.setServerSide(false); + + String contentType = + contentTypeProperty == null ? null + : JMSUtils.getProperty(message, contentTypeProperty); + + try { + JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType); + } catch (JMSException ex) { + throw AxisFault.makeFault(ex); + } +// responseMsgCtx.setServerSide(true); + + handleIncomingMessage( + responseMsgCtx, + JMSUtils.getTransportHeaders(message), + JMSUtils.getProperty(message, BaseConstants.SOAPACTION), + contentType + ); + } + + private void setProperty(Message message, MessageContext msgCtx, String key) { + + String value = getProperty(msgCtx, key); + if (value != null) { + try { + message.setStringProperty(key, value); + } catch (JMSException e) { + log.warn("Couldn't set message property : " + key + " = " + value, e); + } + } + } + + private String getProperty(MessageContext mc, String key) { + return (String) mc.getProperty(key); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java new file mode 100644 index 0000000000..63faa0b852 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java @@ -0,0 +1,1115 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.lang.reflect.Method; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.jms.TopicSession; +import javax.mail.internet.ContentType; +import javax.mail.internet.ParseException; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.Reference; + +import org.apache.axiom.om.OMElement; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.builder.Builder; +import org.apache.axis2.builder.BuilderUtil; +import org.apache.axis2.builder.SOAPBuilder; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.transport.TransportUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.binding.ws.axis2.format.DataSourceMessageBuilder; +import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilder; +import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilderAdapter; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; + +/** + * Miscallaneous methods used for the JMS transport + */ +public class JMSUtils extends BaseUtils { + + private static final Log log = LogFactory.getLog(JMSUtils.class); + private static final Class[] NOARGS = new Class[] {}; + private static final Object[] NOPARMS = new Object[] {}; + + /** + * Should this service be enabled over the JMS transport? + * + * @param service the Axis service + * @return true if JMS should be enabled + */ + public static boolean isJMSService(AxisService service) { + if (service.isEnableAllTransports()) { + return true; + + } else { + List transports = service.getExposedTransports(); + for (Object transport : transports) { + if (JMSListener.TRANSPORT_NAME.equals(transport)) { + return true; + } + } + } + return false; + } + + /** + * Get the EPR for the given JMS connection factory and destination + * the form of the URL is + * jms:/<destination>?[<key>=<value>&]* + * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS + * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are filtered + * + * @param cf the Axis2 JMS connection factory + * @param destinationType the type of destination + * @param endpoint JMSEndpoint + * @return the EPR as a String + */ + static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) { + StringBuffer sb = new StringBuffer(); + + sb.append( + JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName()); + sb.append("?"). + append(JMSConstants.PARAM_DEST_TYPE).append("=").append( + destinationType == JMSConstants.TOPIC ? + JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE); + + if (endpoint.getContentTypeRuleSet() != null) { + String contentTypeProperty = + endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty(); + if (contentTypeProperty != null) { + sb.append("&"); + sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); + sb.append("="); + sb.append(contentTypeProperty); + } + } + + for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) { + if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) && + !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) && + !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) && + !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) { + sb.append("&").append( + entry.getKey()).append("=").append(entry.getValue()); + } + } + return sb.toString(); + } + + /** + * Get a String property from the JMS message + * + * @param message JMS message + * @param property property name + * @return property value + */ + public static String getProperty(Message message, String property) { + try { + return message.getStringProperty(property); + } catch (JMSException e) { + return null; + } + } + + /** + * Return the destination name from the given URL + * + * @param url the URL + * @return the destination name + */ + public static String getDestination(String url) { + String tempUrl = url.substring(JMSConstants.JMS_PREFIX.length()); + int propPos = tempUrl.indexOf("?"); + + if (propPos == -1) { + return tempUrl; + } else { + return tempUrl.substring(0, propPos); + } + } + + /** + * Set the SOAPEnvelope to the Axis2 MessageContext, from the JMS Message passed in + * @param message the JMS message read + * @param msgContext the Axis2 MessageContext to be populated + * @param contentType content type for the message + * @throws AxisFault + * @throws JMSException + */ + public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType) + throws AxisFault, JMSException { + + if (contentType == null) { + if (message instanceof TextMessage) { + contentType = "text/plain"; + } else { + contentType = "application/octet-stream"; + } + if (log.isDebugEnabled()) { + log.debug("No content type specified; assuming " + contentType); + } + } + + int index = contentType.indexOf(';'); + String type = index > 0 ? contentType.substring(0, index) : contentType; + Builder builder = BuilderUtil.getBuilderFromSelector(type, msgContext); + if (builder == null) { + if (log.isDebugEnabled()) { + log.debug("No message builder found for type '" + type + "'. Falling back to SOAP."); + } + builder = new SOAPBuilder(); + } + + OMElement documentElement; + if (message instanceof BytesMessage) { + // Extract the charset encoding from the content type and + // set the CHARACTER_SET_ENCODING property as e.g. SOAPBuilder relies on this. + String charSetEnc = null; + try { + if (contentType != null) { + charSetEnc = new ContentType(contentType).getParameter("charset"); + } + } catch (ParseException ex) { + // ignore + } + msgContext.setProperty(Constants.Configuration.CHARACTER_SET_ENCODING, charSetEnc); + + if (builder instanceof DataSourceMessageBuilder) { + documentElement = ((DataSourceMessageBuilder)builder).processDocument( + new BytesMessageDataSource((BytesMessage)message), contentType, + msgContext); + } else { + documentElement = builder.processDocument( + new BytesMessageInputStream((BytesMessage)message), contentType, + msgContext); + } + } else if (message instanceof TextMessage) { + TextMessageBuilder textMessageBuilder; + if (builder instanceof TextMessageBuilder) { + textMessageBuilder = (TextMessageBuilder)builder; + } else { + textMessageBuilder = new TextMessageBuilderAdapter(builder); + } + String content = ((TextMessage)message).getText(); + documentElement = textMessageBuilder.processDocument(content, contentType, msgContext); + } else { + handleException("Unsupported JMS message type " + message.getClass().getName()); + return; // Make compiler happy + } + msgContext.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement)); + } + + /** + * Set the JMS ReplyTo for the message + * + * @param replyDestination the JMS Destination where the reply is expected + * @param session the session to use to create a temp Queue if a response is expected + * but a Destination has not been specified + * @param message the JMS message where the final Destinatio would be set as the JMS ReplyTo + * @return the JMS ReplyTo Destination for the message + */ + public static Destination setReplyDestination(Destination replyDestination, Session session, + Message message) { + + if (replyDestination == null) { + try { + // create temporary queue to receive the reply + replyDestination = createTemporaryDestination(session); + } catch (JMSException e) { + handleException("Error creating temporary queue for response"); + } + } + + try { + message.setJMSReplyTo(replyDestination); + } catch (JMSException e) { + log.warn("Error setting JMS ReplyTo destination to : " + replyDestination, e); + } + + if (log.isDebugEnabled()) { + try { + assert replyDestination != null; + log.debug("Expecting a response to JMS Destination : " + + (replyDestination instanceof Queue ? + ((Queue) replyDestination).getQueueName() : + ((Topic) replyDestination).getTopicName())); + } catch (JMSException ignore) {} + } + return replyDestination; + } + + /** + * Set transport headers from the axis message context, into the JMS message + * + * @param msgContext the axis message context + * @param message the JMS Message + * @throws JMSException on exception + */ + public static void setTransportHeaders(MessageContext msgContext, Message message) + throws JMSException { + + Map headerMap = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS); + + if (headerMap == null) { + return; + } + + for (Object headerName : headerMap.keySet()) { + + String name = (String) headerName; + + if (name.startsWith(JMSConstants.JMSX_PREFIX) && + !(name.equals(JMSConstants.JMSX_GROUP_ID) || name.equals(JMSConstants.JMSX_GROUP_SEQ))) { + continue; + } + + if (JMSConstants.JMS_COORELATION_ID.equals(name)) { + message.setJMSCorrelationID( + (String) headerMap.get(JMSConstants.JMS_COORELATION_ID)); + } else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) { + Object o = headerMap.get(JMSConstants.JMS_DELIVERY_MODE); + if (o instanceof Integer) { + message.setJMSDeliveryMode((Integer) o); + } else if (o instanceof String) { + try { + message.setJMSDeliveryMode(Integer.parseInt((String) o)); + } catch (NumberFormatException nfe) { + log.warn("Invalid delivery mode ignored : " + o, nfe); + } + } else { + log.warn("Invalid delivery mode ignored : " + o); + } + + } else if (JMSConstants.JMS_EXPIRATION.equals(name)) { + message.setJMSExpiration( + Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION))); + } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) { + message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID)); + } else if (JMSConstants.JMS_PRIORITY.equals(name)) { + message.setJMSPriority( + Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY))); + } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) { + message.setJMSTimestamp( + Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP))); + } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) { + message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE)); + + } else { + Object value = headerMap.get(name); + if (value instanceof String) { + message.setStringProperty(name, (String) value); + } else if (value instanceof Boolean) { + message.setBooleanProperty(name, (Boolean) value); + } else if (value instanceof Integer) { + message.setIntProperty(name, (Integer) value); + } else if (value instanceof Long) { + message.setLongProperty(name, (Long) value); + } else if (value instanceof Double) { + message.setDoubleProperty(name, (Double) value); + } else if (value instanceof Float) { + message.setFloatProperty(name, (Float) value); + } + } + } + } + + /** + * Read the transport headers from the JMS Message and set them to the axis2 message context + * + * @param message the JMS Message received + * @param responseMsgCtx the axis message context + * @throws AxisFault on error + */ + public static void loadTransportHeaders(Message message, MessageContext responseMsgCtx) + throws AxisFault { + responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, getTransportHeaders(message)); + } + + /** + * Extract transport level headers for JMS from the given message into a Map + * + * @param message the JMS message + * @return a Map of the transport headers + */ + public static Map<String, Object> getTransportHeaders(Message message) { + // create a Map to hold transport headers + Map<String, Object> map = new HashMap<String, Object>(); + + // correlation ID + try { + if (message.getJMSCorrelationID() != null) { + map.put(JMSConstants.JMS_COORELATION_ID, message.getJMSCorrelationID()); + } + } catch (JMSException ignore) {} + + // set the delivery mode as persistent or not + try { + map.put(JMSConstants.JMS_DELIVERY_MODE, Integer.toString(message.getJMSDeliveryMode())); + } catch (JMSException ignore) {} + + // destination name + try { + if (message.getJMSDestination() != null) { + Destination dest = message.getJMSDestination(); + map.put(JMSConstants.JMS_DESTINATION, + dest instanceof Queue ? + ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); + } + } catch (JMSException ignore) {} + + // expiration + try { + map.put(JMSConstants.JMS_EXPIRATION, Long.toString(message.getJMSExpiration())); + } catch (JMSException ignore) {} + + // if a JMS message ID is found + try { + if (message.getJMSMessageID() != null) { + map.put(JMSConstants.JMS_MESSAGE_ID, message.getJMSMessageID()); + } + } catch (JMSException ignore) {} + + // priority + try { + map.put(JMSConstants.JMS_PRIORITY, Long.toString(message.getJMSPriority())); + } catch (JMSException ignore) {} + + // redelivered + try { + map.put(JMSConstants.JMS_REDELIVERED, Boolean.toString(message.getJMSRedelivered())); + } catch (JMSException ignore) {} + + // replyto destination name + try { + if (message.getJMSReplyTo() != null) { + Destination dest = message.getJMSReplyTo(); + map.put(JMSConstants.JMS_REPLY_TO, + dest instanceof Queue ? + ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); + } + } catch (JMSException ignore) {} + + // priority + try { + map.put(JMSConstants.JMS_TIMESTAMP, Long.toString(message.getJMSTimestamp())); + } catch (JMSException ignore) {} + + // message type + try { + if (message.getJMSType() != null) { + map.put(JMSConstants.JMS_TYPE, message.getJMSType()); + } + } catch (JMSException ignore) {} + + // any other transport properties / headers + Enumeration e = null; + try { + e = message.getPropertyNames(); + } catch (JMSException ignore) {} + + if (e != null) { + while (e.hasMoreElements()) { + String headerName = (String) e.nextElement(); + try { + map.put(headerName, message.getStringProperty(headerName)); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, message.getBooleanProperty(headerName)); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, message.getIntProperty(headerName)); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, message.getLongProperty(headerName)); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, message.getDoubleProperty(headerName)); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, message.getFloatProperty(headerName)); + } catch (JMSException ignore) {} + } + } + + return map; + } + + + /** + * Create a MessageConsumer for the given Destination + * @param session JMS Session to use + * @param dest Destination for which the Consumer is to be created + * @param messageSelector the message selector to be used if any + * @return a MessageConsumer for the specified Destination + * @throws JMSException + */ + public static MessageConsumer createConsumer(Session session, Destination dest, String messageSelector) + throws JMSException { + + if (dest instanceof Queue) { + return ((QueueSession) session).createReceiver((Queue) dest, messageSelector); + } else { + return ((TopicSession) session).createSubscriber((Topic) dest, messageSelector, false); + } + } + + /** + * Create a temp queue or topic for synchronous receipt of responses, when a reply destination + * is not specified + * @param session the JMS Session to use + * @return a temporary Queue or Topic, depending on the session + * @throws JMSException + */ + public static Destination createTemporaryDestination(Session session) throws JMSException { + + if (session instanceof QueueSession) { + return session.createTemporaryQueue(); + } else { + return session.createTemporaryTopic(); + } + } + + /** + * Return the body length in bytes for a bytes message + * @param bMsg the JMS BytesMessage + * @return length of body in bytes + */ + public static long getBodyLength(BytesMessage bMsg) { + try { + Method mtd = bMsg.getClass().getMethod("getBodyLength", NOARGS); + if (mtd != null) { + return (Long) mtd.invoke(bMsg, NOPARMS); + } + } catch (Exception e) { + // JMS 1.0 + if (log.isDebugEnabled()) { + log.debug("Error trying to determine JMS BytesMessage body length", e); + } + } + + // if JMS 1.0 + long length = 0; + try { + byte[] buffer = new byte[2048]; + bMsg.reset(); + for (int bytesRead = bMsg.readBytes(buffer); bytesRead != -1; + bytesRead = bMsg.readBytes(buffer)) { + length += bytesRead; + } + } catch (JMSException ignore) {} + return length; + } + + /** + * Get the length of the message in bytes + * @param message + * @return message size (or approximation) in bytes + * @throws JMSException + */ + public static long getMessageSize(Message message) throws JMSException { + if (message instanceof BytesMessage) { + return JMSUtils.getBodyLength((BytesMessage) message); + } else if (message instanceof TextMessage) { + // TODO: Converting the whole message to a byte array is too much overhead just to determine the message size. + // Anyway, the result is not accurate since we don't know what encoding the JMS provider uses. + return ((TextMessage) message).getText().getBytes().length; + } else { + log.warn("Can't determine size of JMS message; unsupported message type : " + + message.getClass().getName()); + return 0; + } + } + + public static <T> T lookup(Context context, Class<T> clazz, String name) + throws NamingException { + + Object object = context.lookup(name); + try { + return clazz.cast(object); + } catch (ClassCastException ex) { + // Instead of a ClassCastException, throw an exception with some + // more information. + if (object instanceof Reference) { + Reference ref = (Reference)object; + handleException("JNDI failed to de-reference Reference with name " + + name + "; is the factory " + ref.getFactoryClassName() + + " in your classpath?"); + return null; + } else { + handleException("JNDI lookup of name " + name + " returned a " + + object.getClass().getName() + " while a " + clazz + " was expected"); + return null; + } + } + } + + /** + * Create a ServiceTaskManager for the service passed in and its corresponding JMSConnectionFactory + * @param jcf + * @param service + * @param workerPool + * @return + */ + public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf, + AxisService service, WorkerPool workerPool) { + + String name = service.getName(); + Map<String, String> svc = getServiceStringParameters(service.getParameters()); + Map<String, String> cf = jcf.getParameters(); + + ServiceTaskManager stm = new ServiceTaskManager(); + + stm.setServiceName(name); + stm.addJmsProperties(cf); + stm.addJmsProperties(svc); + + stm.setConnFactoryJNDIName( + getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf)); + String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf); + if (destName == null) { + destName = service.getName(); + } + stm.setDestinationJNDIName(destName); + stm.setDestinationType(getDestinationType(svc, cf)); + + stm.setJmsSpec11( + getJMSSpecVersion(svc, cf)); + stm.setTransactionality( + getTransactionality(svc, cf)); + stm.setCacheUserTransaction( + getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf)); + stm.setUserTransactionJNDIName( + getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf)); + stm.setSessionTransacted( + getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf)); + stm.setSessionAckMode( + getSessionAck(svc, cf)); + stm.setMessageSelector( + getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf)); + stm.setSubscriptionDurable( + getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf)); + stm.setDurableSubscriberName( + getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf)); + + stm.setCacheLevel( + getCacheLevel(svc, cf)); + stm.setPubSubNoLocal( + getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf)); + + Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf); + if (value != null) { + stm.setReceiveTimeout(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf); + if (value != null) { + stm.setConcurrentConsumers(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf); + if (value != null) { + stm.setMaxConcurrentConsumers(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf); + if (value != null) { + stm.setIdleTaskExecutionLimit(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf); + if (value != null) { + stm.setMaxMessagesPerTask(value); + } + + value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf); + if (value != null) { + stm.setInitialReconnectDuration(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf); + if (value != null) { + stm.setMaxReconnectDuration(value); + } + Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf); + if (dValue != null) { + stm.setReconnectionProgressionFactor(dValue); + } + + stm.setWorkerPool(workerPool); + + // remove processed properties from property bag + stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME); + stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION); + stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER); + stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY); + stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN); + stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME); + stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED); + stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR); + stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE); + stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME); + stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL); + stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL); + stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT); + stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS); + stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS); + stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT); + stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK); + stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION); + stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION); + stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR); + + return stm; + } + + private static Map<String, String> getServiceStringParameters(List list) { + + Map<String, String> map = new HashMap<String, String>(); + for (Object o : list) { + Parameter p = (Parameter) o; + if (p.getValue() instanceof String) { + map.put(p.getName(), (String) p.getValue()); + } + } + return map; + } + + private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + if (value == null) { + throw new AxisJMSException("Service/connection factory property : " + key); + } + return value; + } + + private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + return value; + } + + private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + if (value == null) { + return null; + } else { + return Boolean.valueOf(value); + } + } + + private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + if (value != null) { + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new AxisJMSException("Invalid value : " + value + " for " + key); + } + } + return null; + } + + private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + if (value != null) { + try { + return Double.parseDouble(value); + } catch (NumberFormatException e) { + throw new AxisJMSException("Invalid value : " + value + " for " + key); + } + } + return null; + } + + private static int getTransactionality(Map svcMap, Map cfMap) { + + String key = BaseConstants.PARAM_TRANSACTIONALITY; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if (val == null) { + return BaseConstants.TRANSACTION_NONE; + + } else { + if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) { + return BaseConstants.TRANSACTION_JTA; + } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) { + return BaseConstants.TRANSACTION_LOCAL; + } else { + throw new AxisJMSException("Invalid option : " + val + " for parameter : " + + BaseConstants.STR_TRANSACTION_JTA); + } + } + } + + private static int getDestinationType(Map svcMap, Map cfMap) { + + String key = JMSConstants.PARAM_DEST_TYPE; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) { + return JMSConstants.TOPIC; + } + return JMSConstants.QUEUE; + } + + private static int getSessionAck(Map svcMap, Map cfMap) { + + String key = JMSConstants.PARAM_SESSION_ACK; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) { + return Session.AUTO_ACKNOWLEDGE; + } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) { + return Session.CLIENT_ACKNOWLEDGE; + } else if ("DUPS_OK_ACKNOWLEDGE".equals(val)){ + return Session.DUPS_OK_ACKNOWLEDGE; + } else if ("SESSION_TRANSACTED".equals(val)) { + return 0; //Session.SESSION_TRANSACTED; + } else { + try { + return Integer.parseInt(val); + } catch (NumberFormatException ignore) { + throw new AxisJMSException("Invalid session acknowledgement mode : " + val); + } + } + } + + private static int getCacheLevel(Map svcMap, Map cfMap) { + + String key = JMSConstants.PARAM_CACHE_LEVEL; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if ("none".equalsIgnoreCase(val)) { + return JMSConstants.CACHE_NONE; + } else if ("connection".equalsIgnoreCase(val)) { + return JMSConstants.CACHE_CONNECTION; + } else if ("session".equals(val)){ + return JMSConstants.CACHE_SESSION; + } else if ("consumer".equals(val)) { + return JMSConstants.CACHE_CONSUMER; + } else if (val != null) { + throw new AxisJMSException("Invalid cache level : " + val); + } + return JMSConstants.CACHE_AUTO; + } + + private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) { + + String key = JMSConstants.PARAM_JMS_SPEC_VER; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if (val == null || "1.1".equals(val)) { + return true; + } else { + return false; + } + } + + /** + * This is a JMS spec independent method to create a Connection. Please be cautious when + * making any changes + * + * @param conFac the ConnectionFactory to use + * @param user optional user name + * @param pass optional password + * @param jmsSpec11 should we use JMS 1.1 API ? + * @param isQueue is this to deal with a Queue? + * @return a JMS Connection as requested + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static Connection createConnection(ConnectionFactory conFac, + String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException { + + Connection connection = null; + if (log.isDebugEnabled()) { + log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") + + "Connection using credentials : (" + user + "/" + pass + ")"); + } + + if (jmsSpec11 || isQueue == null) { + if (user != null && pass != null) { + connection = conFac.createConnection(user, pass); + } else { + connection = conFac.createConnection(); + } + + } else { + QueueConnectionFactory qConFac = null; + TopicConnectionFactory tConFac = null; + if (isQueue) { + tConFac = (TopicConnectionFactory) conFac; + } else { + qConFac = (QueueConnectionFactory) conFac; + } + + if (user != null && pass != null) { + if (qConFac != null) { + connection = qConFac.createQueueConnection(user, pass); + } else if (tConFac != null) { + connection = tConFac.createTopicConnection(user, pass); + } + } else { + if (qConFac != null) { + connection = qConFac.createQueueConnection(); + } else if (tConFac != null) { + connection = tConFac.createTopicConnection(); + } + } + } + return connection; + } + + /** + * This is a JMS spec independent method to create a Session. Please be cautious when + * making any changes + * + * @param connection the JMS Connection + * @param transacted should the session be transacted? + * @param ackMode the ACK mode for the session + * @param jmsSpec11 should we use the JMS 1.1 API? + * @param isQueue is this Session to deal with a Queue? + * @return a Session created for the given information + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static Session createSession(Connection connection, boolean transacted, int ackMode, + boolean jmsSpec11, Boolean isQueue) throws JMSException { + + if (jmsSpec11 || isQueue == null) { + return connection.createSession(transacted, ackMode); + + } else { + if (isQueue) { + return ((QueueConnection) connection).createQueueSession(transacted, ackMode); + } else { + return ((TopicConnection) connection).createTopicSession(transacted, ackMode); + } + } + } + + /** + * This is a JMS spec independent method to create a MessageConsumer. Please be cautious when + * making any changes + * + * @param session JMS session + * @param destination the Destination + * @param isQueue is the Destination a queue? + * @param subscriberName optional client name to use for a durable subscription to a topic + * @param messageSelector optional message selector + * @param pubSubNoLocal should we receive messages sent by us during pub-sub? + * @param isDurable is this a durable topic subscription? + * @param jmsSpec11 should we use JMS 1.1 API ? + * @return a MessageConsumer to receive messages + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static MessageConsumer createConsumer( + Session session, Destination destination, Boolean isQueue, + String subscriberName, String messageSelector, boolean pubSubNoLocal, + boolean isDurable, boolean jmsSpec11) throws JMSException { + + if (jmsSpec11 || isQueue == null) { + if (isDurable) { + return session.createDurableSubscriber( + (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); + } else { + return session.createConsumer(destination, messageSelector, pubSubNoLocal); + } + } else { + if (isQueue) { + return ((QueueSession) session).createReceiver((Queue) destination, messageSelector); + } else { + if (isDurable) { + return ((TopicSession) session).createDurableSubscriber( + (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); + } else { + return ((TopicSession) session).createSubscriber( + (Topic) destination, messageSelector, pubSubNoLocal); + } + } + } + } + + /** + * This is a JMS spec independent method to create a MessageProducer. Please be cautious when + * making any changes + * + * @param session JMS session + * @param destination the Destination + * @param isQueue is the Destination a queue? + * @param jmsSpec11 should we use JMS 1.1 API ? + * @return a MessageProducer to send messages to the given Destination + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static MessageProducer createProducer( + Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException { + + if (jmsSpec11 || isQueue == null) { + return session.createProducer(destination); + } else { + if (isQueue) { + return ((QueueSession) session).createSender((Queue) destination); + } else { + return ((TopicSession) session).createPublisher((Topic) destination); + } + } + } + + /** + * Create a one time MessageProducer for the given JMS OutTransport information + * For simplicity and best compatibility, this method uses only JMS 1.0.2b API. + * Please be cautious when making any changes + * + * @param jmsOut the JMS OutTransport information (contains all properties) + * @return a JMSSender based on one-time use resources + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut) + throws JMSException { + + // digest the targetAddress and locate CF from the EPR + jmsOut.loadConnectionFactoryFromProperies(); + + // create a one time connection and session to be used + Hashtable<String,String> jmsProps = jmsOut.getProperties(); + String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null; + String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null; + + QueueConnectionFactory qConFac = null; + TopicConnectionFactory tConFac = null; + + int destType = -1; + if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) { + destType = JMSConstants.QUEUE; + qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory(); + + } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) { + destType = JMSConstants.TOPIC; + tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory(); + } + + Connection connection = null; + if (user != null && pass != null) { + if (qConFac != null) { + connection = qConFac.createQueueConnection(user, pass); + } else if (tConFac != null) { + connection = tConFac.createTopicConnection(user, pass); + } + } else { + if (qConFac != null) { + connection = qConFac.createQueueConnection(); + } else if (tConFac != null) { + connection = tConFac.createTopicConnection(); + } + } + + if (connection == null && jmsOut.getJmsConnectionFactory() != null) { + connection = jmsOut.getJmsConnectionFactory().getConnection(); + } + + Session session = null; + MessageProducer producer = null; + Destination destination = jmsOut.getDestination(); + + if (destType == JMSConstants.QUEUE) { + session = ((QueueConnection) connection). + createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + producer = ((QueueSession) session).createSender((Queue) destination); + } else { + session = ((TopicConnection) connection). + createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + producer = ((TopicSession) session).createPublisher((Topic) destination); + } + + return new JMSMessageSender(connection, session, producer, + destination, (jmsOut.getJmsConnectionFactory() == null ? + JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false, + destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE); + } + + /** + * Return a String representation of the destination type + * @param destType the destination type indicator int + * @return a descriptive String + */ + public static String getDestinationTypeAsString(int destType) { + if (destType == JMSConstants.QUEUE) { + return "Queue"; + } else if (destType == JMSConstants.TOPIC) { + return "Topic"; + } else { + return "Generic"; + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java new file mode 100644 index 0000000000..28c8da2a8d --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java @@ -0,0 +1,1217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import javax.transaction.NotSupportedException; +import javax.transaction.SystemException; +import javax.transaction.UserTransaction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; + +/** + * Each service will have one ServiceTaskManager instance that will create, manage and also destroy + * idle tasks created for it, for message receipt. This will also allow individual tasks to cache + * the Connection, Session or Consumer as necessary, considering the transactionality required and + * user preference. + * + * This also acts as the ExceptionListener for all JMS connections made on behalf of the service. + * Since the ExceptionListener is notified by a JMS provider on a "serious" error, we simply try + * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh + * for the service, by discarding all connections. + */ +public class ServiceTaskManager { + + /** The logger */ + private static final Log log = LogFactory.getLog(ServiceTaskManager.class); + + /** The Task manager is stopped or has not started */ + private static final int STATE_STOPPED = 0; + /** The Task manager is started and active */ + private static final int STATE_STARTED = 1; + /** The Task manager is paused temporarily */ + private static final int STATE_PAUSED = 2; + /** The Task manager is started, but a shutdown has been requested */ + private static final int STATE_SHUTTING_DOWN = 3; + /** The Task manager has encountered an error */ + private static final int STATE_FAILURE = 4; + + /** The name of the service managed by this instance */ + private String serviceName; + /** The ConnectionFactory MUST refer to an XAConnectionFactory to use JTA */ + private String connFactoryJNDIName; + /** The JNDI name of the Destination Queue or Topic */ + private String destinationJNDIName; + /** JNDI location for the JTA UserTransaction */ + private String userTransactionJNDIName = "java:comp/UserTransaction"; + /** The type of destination - P2P or PubSub (or JMS 1.1 API generic?) */ + private int destinationType = JMSConstants.GENERIC; + /** An optional message selector */ + private String messageSelector = null; + + /** Should tasks run without transactions, using transacted Sessions (i.e. local), or JTA */ + private int transactionality = BaseConstants.TRANSACTION_NONE; + /** Should created Sessions be transactional ? - should be false when using JTA */ + private boolean sessionTransacted = true; + /** Session acknowledgement mode when transacted Sessions (i.e. local transactions) are used */ + private int sessionAckMode = Session.AUTO_ACKNOWLEDGE; + + /** Is the subscription durable ? */ + private boolean subscriptionDurable = false; + /** The name of the durable subscriber for this client */ + private String durableSubscriberName = null; + /** In PubSub mode, should I receive messages sent by me / my connection ? */ + private boolean pubSubNoLocal = false; + /** Number of concurrent consumers - for PubSub, this should be 1 to prevent multiple receipt */ + private int concurrentConsumers = 1; + /** Maximum number of consumers to create - see @concurrentConsumers */ + private int maxConcurrentConsumers = 1; + /** The number of idle (i.e. message-less) attempts to be tried before suicide, to scale down */ + private int idleTaskExecutionLimit = 10; + /** The maximum number of successful message receipts for a task - to limit thread life span */ + private int maxMessagesPerTask = -1; // default is unlimited + /** The default receive timeout - a negative value means wait forever, zero dont wait at all */ + private int receiveTimeout = 1000; + /** JMS Resource cache level - Connection, Session, Consumer. Auto will select safe default */ + private int cacheLevel = JMSConstants.CACHE_AUTO; + /** Should we cache the UserTransaction handle from JNDI - true for almost all app servers */ + private boolean cacheUserTransaction = true; + /** Shared UserTransactionHandle */ + private UserTransaction sharedUserTransaction = null; + /** Should this service use JMS 1.1 ? (when false, defaults to 1.0.2b) */ + private boolean jmsSpec11 = true; + + /** Initial duration to attempt re-connection to JMS provider after failure */ + private int initialReconnectDuration = 10000; + /** Progression factory for geometric series that calculates re-connection times */ + private double reconnectionProgressionFactor = 2.0; // default to [bounded] exponential + /** Upper limit on reconnection attempt duration */ + private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour + + /** The JNDI context properties and other general properties */ + private Hashtable<String,String> jmsProperties = new Hashtable<String, String>(); + /** The JNDI Context acuired */ + private Context context = null; + /** The ConnectionFactory to be used */ + private ConnectionFactory conFactory = null; + /** The JMS Destination */ + private Destination destination = null; + + /** The list of active tasks thats managed by this instance */ + private final List<MessageListenerTask> pollingTasks = + Collections.synchronizedList(new ArrayList<MessageListenerTask>()); + /** The per-service JMS message receiver to be invoked after receipt of messages */ + private JMSMessageReceiver jmsMessageReceiver = null; + + /** State of this Task Manager */ + private volatile int serviceTaskManagerState = STATE_STOPPED; + /** Number of invoker tasks active */ + private volatile int activeTaskCount = 0; + /** The shared thread pool from the Listener */ + private WorkerPool workerPool = null; + + /** The JMS Connection shared between multiple polling tasks - when enabled (reccomended) */ + private Connection sharedConnection = null; + + /** + * Start or re-start the Task Manager by shutting down any existing worker tasks and + * re-creating them. However, if this is STM is PAUSED, a start request is ignored. + * This applies for any connection failures during paused state as well, which then will + * not try to auto recover + */ + public synchronized void start() { + + if (serviceTaskManagerState == STATE_PAUSED) { + log.info("Attempt to re-start paused TaskManager is ignored. Please use resume instead"); + return; + } + + // if any tasks are running, stop whats running now + if (!pollingTasks.isEmpty()) { + stop(); + } + + if (cacheLevel == JMSConstants.CACHE_AUTO) { + cacheLevel = + transactionality == BaseConstants.TRANSACTION_NONE ? + JMSConstants.CACHE_CONSUMER : JMSConstants.CACHE_NONE; + } + switch (cacheLevel) { + case JMSConstants.CACHE_NONE: + log.debug("No JMS resources will be cached/shared between poller " + + "worker tasks of service : " + serviceName); + break; + case JMSConstants.CACHE_CONNECTION: + log.debug("Only the JMS Connection will be cached and shared between *all* " + + "poller task invocations"); + break; + case JMSConstants.CACHE_SESSION: + log.debug("The JMS Connection and Session will be cached and shared between " + + "successive poller task invocations"); + break; + case JMSConstants.CACHE_CONSUMER: + log.debug("The JMS Connection, Session and MessageConsumer will be cached and " + + "shared between successive poller task invocations"); + break; + default : { + handleException("Invalid cache level : " + cacheLevel + + " for service : " + serviceName); + } + } + + for (int i=0; i<concurrentConsumers; i++) { + workerPool.execute(new MessageListenerTask()); + } + + serviceTaskManagerState = STATE_STARTED; + log.info("Task manager for service : " + serviceName + " [re-]initialized"); + } + + /** + * Shutdown the tasks and release any shared resources + */ + public synchronized void stop() { + + if (log.isDebugEnabled()) { + log.debug("Stopping ServiceTaskManager for service : " + serviceName); + } + + if (serviceTaskManagerState != STATE_FAILURE) { + serviceTaskManagerState = STATE_SHUTTING_DOWN; + } + + synchronized(pollingTasks) { + for (MessageListenerTask lstTask : pollingTasks) { + lstTask.requestShutdown(); + } + } + + // try to wait a bit for task shutdown + for (int i=0; i<5; i++) { + if (activeTaskCount == 0) { + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) {} + } + + if (sharedConnection != null) { + try { + sharedConnection.stop(); + } catch (JMSException e) { + logError("Error stopping shared Connection", e); + } finally { + sharedConnection = null; + } + } + + if (activeTaskCount > 0) { + log.warn("Unable to shutdown all polling tasks of service : " + serviceName); + } + + if (serviceTaskManagerState != STATE_FAILURE) { + serviceTaskManagerState = STATE_STOPPED; + } + log.info("Task manager for service : " + serviceName + " shutdown"); + } + + /** + * Temporarily suspend receipt and processing of messages. Accomplished by stopping the + * connection / or connections used by the poller tasks + */ + public synchronized void pause() { + for (MessageListenerTask lstTask : pollingTasks) { + lstTask.pause(); + } + if (sharedConnection != null) { + try { + sharedConnection.stop(); + } catch (JMSException e) { + logError("Error pausing shared Connection", e); + } + } + } + + /** + * Resume receipt and processing of messages of paused tasks + */ + public synchronized void resume() { + for (MessageListenerTask lstTask : pollingTasks) { + lstTask.resume(); + } + if (sharedConnection != null) { + try { + sharedConnection.start(); + } catch (JMSException e) { + logError("Error resuming shared Connection", e); + } + } + } + + /** + * Start a new MessageListenerTask if we are still active, the threshold is not reached, and w + * e do not have any idle tasks - i.e. scale up listening + */ + private void scheduleNewTaskIfAppropriate() { + if (serviceTaskManagerState == STATE_STARTED && + pollingTasks.size() < getMaxConcurrentConsumers() && getIdleTaskCount() == 0) { + workerPool.execute(new MessageListenerTask()); + } + } + + /** + * Get the number of MessageListenerTasks that are currently idle + * @return idle task count + */ + private int getIdleTaskCount() { + int count = 0; + for (MessageListenerTask lstTask : pollingTasks) { + if (lstTask.isTaskIdle()) { + count++; + } + } + return count; + } + + /** + * Get the number of MessageListenerTasks that are currently connected to the JMS provider + * @return connected task count + */ + private int getConnectedTaskCount() { + int count = 0; + for (MessageListenerTask lstTask : pollingTasks) { + if (lstTask.isConnected()) { + count++; + } + } + return count; + } + + /** + * The actual threads/tasks that perform message polling + */ + private class MessageListenerTask implements Runnable, ExceptionListener { + + /** The Connection used by the polling task */ + private Connection connection = null; + /** The Sesson used by the polling task */ + private Session session = null; + /** The MessageConsumer used by the polling task */ + private MessageConsumer consumer = null; + /** State of the worker polling task */ + private volatile int workerState = STATE_STOPPED; + /** The number of idle (i.e. without fetching a message) polls for this task */ + private int idleExecutionCount = 0; + /** Is this task idle right now? */ + private volatile boolean idle = false; + /** Is this task connected to the JMS provider successfully? */ + private boolean connected = false; + + /** As soon as we create a new polling task, add it to the STM for control later */ + MessageListenerTask() { + synchronized(pollingTasks) { + pollingTasks.add(this); + } + } + + /** + * Pause this polling worker task + */ + public void pause() { + if (isActive()) { + if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { + try { + connection.stop(); + } catch (JMSException e) { + log.warn("Error pausing Message Listener task for service : " + serviceName); + } + } + workerState = STATE_PAUSED; + } + } + + /** + * Resume this polling task + */ + public void resume() { + if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { + try { + connection.start(); + } catch (JMSException e) { + log.warn("Error resuming Message Listener task for service : " + serviceName); + } + } + workerState = STATE_STARTED; + } + + /** + * Execute the polling worker task + */ + public void run() { + workerState = STATE_STARTED; + activeTaskCount++; + int messageCount = 0; + + if (log.isDebugEnabled()) { + log.debug("New poll task starting : thread id = " + Thread.currentThread().getId()); + } + + try { + while (isActive() && + (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) && + (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) { + + UserTransaction ut = null; + try { + if (transactionality == BaseConstants.TRANSACTION_JTA) { + ut = getUserTransaction(); + ut.begin(); + } + } catch (NotSupportedException e) { + handleException("Listener Task is already associated with a transaction", e); + } catch (SystemException e) { + handleException("Error starting a JTA transaction", e); + } + + // Get a message by polling, or receive null + Message message = receiveMessage(); + + if (log.isTraceEnabled()) { + if (message != null) { + try { + log.trace("<<<<<<< READ message with Message ID : " + + message.getJMSMessageID() + " from : " + destination + + " by Thread ID : " + Thread.currentThread().getId()); + } catch (JMSException ignore) {} + } else { + log.trace("No message received by Thread ID : " + + Thread.currentThread().getId() + " for destination : " + destination); + } + } + + if (message != null) { + idle = false; + idleExecutionCount = 0; + messageCount++; + // I will be busy now while processing this message, so start another if needed + scheduleNewTaskIfAppropriate(); + handleMessage(message, ut); + + } else { + idle = true; + idleExecutionCount++; + } + } + + } finally { + workerState = STATE_STOPPED; + activeTaskCount--; + synchronized(pollingTasks) { + pollingTasks.remove(this); + } + } + + if (log.isTraceEnabled()) { + log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() + + " is stopping after processing : " + messageCount + " messages :: " + + " isActive : " + isActive() + " maxMessagesPerTask : " + + getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() + + " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " + + getIdleTaskExecutionLimit()); + } else if (log.isDebugEnabled()) { + log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() + + " is stopping after processing : " + messageCount + " messages"); + } + + closeConsumer(true); + closeSession(true); + closeConnection(); + + // My time is up, so if I am going away, create another + scheduleNewTaskIfAppropriate(); + } + + /** + * Poll for and return a message if available + * + * @return a message read, or null + */ + private Message receiveMessage() { + + // get a new connection, session and consumer to prevent a conflict. + // If idle, it means we can re-use what we already have + if (consumer == null) { + connection = getConnection(); + session = getSession(); + consumer = getMessageConsumer(); + if (log.isDebugEnabled()) { + log.debug("Preparing a Connection, Session and Consumer to read messages"); + } + } + + if (log.isDebugEnabled()) { + log.debug("Waiting for a message for service : " + serviceName + " - duration : " + + (getReceiveTimeout() < 0 ? "unlimited" : (getReceiveTimeout() + "ms"))); + } + + try { + if (getReceiveTimeout() < 0) { + return consumer.receive(); + } else { + return consumer.receive(getReceiveTimeout()); + } + } catch (IllegalStateException ignore) { + // probably the consumer (shared) was closed.. which is still ok.. as we didn't read + } catch (JMSException e) { + logError("Error receiving message for service : " + serviceName, e); + } + return null; + } + + /** + * Invoke ultimate message handler/listener and ack message and/or + * commit/rollback transactions + * @param message the JMS message received + * @param ut the UserTransaction used to receive this message, or null + */ + private void handleMessage(Message message, UserTransaction ut) { + + String messageId = null; + try { + messageId = message.getJMSMessageID(); + } catch (JMSException ignore) {} + + boolean commitOrAck = true; + try { + commitOrAck = jmsMessageReceiver.onMessage(message, ut); + + } finally { + + // if client acknowledgement is selected, and processing requested ACK + if (commitOrAck && getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) { + try { + message.acknowledge(); + if (log.isDebugEnabled()) { + log.debug("Message : " + messageId + " acknowledged"); + } + } catch (JMSException e) { + logError("Error acknowledging message : " + messageId, e); + } + } + + // close the consumer + closeConsumer(false); + + // if session was transacted, commit it or rollback + try { + if (session.getTransacted()) { + if (commitOrAck) { + session.commit(); + if (log.isDebugEnabled()) { + log.debug("Session for message : " + messageId + " committed"); + } + } else { + session.rollback(); + if (log.isDebugEnabled()) { + log.debug("Session for message : " + messageId + " rolled back"); + } + } + } + } catch (JMSException e) { + logError("Error " + (commitOrAck ? "committing" : "rolling back") + + " local session txn for message : " + messageId, e); + } + + // if a JTA transaction was being used, commit it or rollback + try { + if (ut != null) { + if (commitOrAck) { + ut.commit(); + if (log.isDebugEnabled()) { + log.debug("JTA txn for message : " + messageId + " committed"); + } + } else { + ut.rollback(); + if (log.isDebugEnabled()) { + log.debug("JTA txn for message : " + messageId + " rolled back"); + } + } + } + } catch (Exception e) { + logError("Error " + (commitOrAck ? "committing" : "rolling back") + + " JTA txn for message : " + messageId + " from the session", e); + } + + closeSession(false); + closeConnection(); + } + } + + /** Handle JMS Connection exceptions by re-initializing. A single connection failure could + * cause re-initialization of multiple MessageListenerTasks / Connections + */ + public void onException(JMSException j) { + + if (!isSTMActive()) { + requestShutdown(); + return; + } + + log.warn("JMS Connection failure : " + j.getMessage()); + setConnected(false); + + if (cacheLevel < JMSConstants.CACHE_CONNECTION) { + // failed Connection was not shared, thus no need to restart the whole STM + requestShutdown(); + return; + } + + // if we failed while active, update state to show failure + setServiceTaskManagerState(STATE_FAILURE); + log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks"); + + int r = 1; + long retryDuration = initialReconnectDuration; + + do { + try { + log.info("Reconnection attempt : " + r + " for service : " + serviceName); + start(); + } catch (Exception ignore) {} + + boolean connected = false; + for (int i=0; i<5; i++) { + if (getConnectedTaskCount() == concurrentConsumers) { + connected = true; + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) {} + } + + if (!connected) { + log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName + + " failed. Next retry in " + (retryDuration/1000) + "seconds"); + retryDuration = (long) (retryDuration * reconnectionProgressionFactor); + if (retryDuration > maxReconnectDuration) { + retryDuration = maxReconnectDuration; + } + + try { + Thread.sleep(retryDuration); + } catch (InterruptedException ignore) {} + } + + } while (!isSTMActive() || getConnectedTaskCount() < concurrentConsumers); + } + + protected void requestShutdown() { + workerState = STATE_SHUTTING_DOWN; + } + + private boolean isActive() { + return workerState == STATE_STARTED; + } + + protected boolean isTaskIdle() { + return idle; + } + + public boolean isConnected() { + return connected; + } + + public void setConnected(boolean connected) { + this.connected = connected; + } + + /** + * Get a Connection that could/should be used by this task - depends on the cache level to reuse + * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection + */ + private Connection getConnection() { + if (cacheLevel < JMSConstants.CACHE_CONNECTION) { + // Connection is not shared + if (connection == null) { + connection = createConnection(); + } + } else { + if (sharedConnection != null) { + connection = sharedConnection; + } else { + synchronized(this) { + if (sharedConnection == null) { + sharedConnection = createConnection(); + } + connection = sharedConnection; + } + } + } + setConnected(true); + return connection; + } + + /** + * Get a Session that could/should be used by this task - depends on the cache level to reuse + * @param connection the connection (could be the shared connection) to use to create a Session + * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session + * created using the Connection passed, or a new/shared connection + */ + private Session getSession() { + if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) { + session = createSession(); + } + return session; + } + + /** + * Get a MessageConsumer that chould/should be used by this task - depends on the cache + * level to reuse + * @param connection option Connection to be used + * @param session optional Session to be used + * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new + * MessageConsumer possibly using the Connection and Session passed in + */ + private MessageConsumer getMessageConsumer() { + if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) { + consumer = createConsumer(); + } + return consumer; + } + + /** + * Close the given Connection, hiding exceptions if any which are logged + * @param connection the Connection to be closed + */ + private void closeConnection() { + if (connection != null && + cacheLevel < JMSConstants.CACHE_CONNECTION) { + try { + if (log.isDebugEnabled()) { + log.debug("Closing non-shared JMS connection for service : " + serviceName); + } + connection.close(); + } catch (JMSException e) { + logError("Error closing JMS connection", e); + } finally { + connection = null; + } + } + } + + /** + * Close the given Session, hiding exceptions if any which are logged + * @param session the Session to be closed + */ + private void closeSession(boolean forced) { + if (session != null && + (cacheLevel < JMSConstants.CACHE_SESSION || forced)) { + try { + if (log.isDebugEnabled()) { + log.debug("Closing non-shared JMS session for service : " + serviceName); + } + session.close(); + } catch (JMSException e) { + logError("Error closing JMS session", e); + } finally { + session = null; + } + } + } + + /** + * Close the given Consumer, hiding exceptions if any which are logged + * @param consumer the Consumer to be closed + */ + private void closeConsumer(boolean forced) { + if (consumer != null && + (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) { + try { + if (log.isDebugEnabled()) { + log.debug("Closing non-shared JMS consumer for service : " + serviceName); + } + consumer.close(); + } catch (JMSException e) { + logError("Error closing JMS consumer", e); + } finally { + consumer = null; + } + } + } + + /** + * Create a new Connection for this STM, using JNDI properties and credentials provided + * @return a new Connection for this STM, using JNDI properties and credentials provided + */ + private Connection createConnection() { + + try { + conFactory = JMSUtils.lookup( + getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName()); + log.debug("Connected to the JMS connection factory : " + getConnFactoryJNDIName()); + } catch (NamingException e) { + handleException("Error looking up connection factory : " + getConnFactoryJNDIName() + + " using JNDI properties : " + jmsProperties, e); + } + + Connection connection = null; + try { + connection = JMSUtils.createConnection( + conFactory, + jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME), + jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD), + isJmsSpec11(), isQueue()); + + connection.setExceptionListener(this); + connection.start(); + log.debug("JMS Connection for service : " + serviceName + " created and started"); + + } catch (JMSException e) { + handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() + + " using JNDI properties : " + jmsProperties, e); + } + return connection; + } + + /** + * Create a new Session for this STM + * @param connection the Connection to be used + * @return a new Session created using the Connection passed in + */ + private Session createSession() { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS Session for service : " + serviceName); + } + return JMSUtils.createSession( + connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue()); + + } catch (JMSException e) { + handleException("Error creating JMS session for service : " + serviceName, e); + } + return null; + } + + /** + * Create a new MessageConsumer for this STM + * @param session the Session to be used + * @return a new MessageConsumer created using the Session passed in + */ + private MessageConsumer createConsumer() { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS MessageConsumer for service : " + serviceName); + } + + return JMSUtils.createConsumer( + session, getDestination(session), isQueue(), + (isSubscriptionDurable() && getDurableSubscriberName() == null ? + getDurableSubscriberName() : serviceName), + getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11()); + + } catch (JMSException e) { + handleException("Error creating JMS consumer for service : " + serviceName,e); + } + return null; + } + } + + // -------------- mundane private methods ---------------- + /** + * Get the InitialContext for lookup using the JNDI parameters applicable to the service + * @return the InitialContext to be used + * @throws NamingException + */ + private Context getInitialContext() throws NamingException { + if (context == null) { + context = new InitialContext(jmsProperties); + } + return context; + } + + /** + * Return the JMS Destination for the JNDI name of the Destination from the InitialContext + * @return the JMS Destination to which this STM listens for messages + */ + private Destination getDestination(Session session) { + if (destination == null) { + try { + context = getInitialContext(); + destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName()); + if (log.isDebugEnabled()) { + log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() + + " found for service " + serviceName); + } + } catch (NamingException e) { + try { + switch (destinationType) { + case JMSConstants.QUEUE: { + destination = session.createQueue(getDestinationJNDIName()); + break; + } + case JMSConstants.TOPIC: { + destination = session.createTopic(getDestinationJNDIName()); + break; + } + default: { + handleException("Error looking up JMS destination : " + + getDestinationJNDIName() + " using JNDI properties : " + + jmsProperties, e); + } + } + } catch (JMSException j) { + handleException("Error looking up and creating JMS destination : " + + getDestinationJNDIName() + " using JNDI properties : " + jmsProperties, e); + } + } + } + return destination; + } + + /** + * The UserTransaction to be used, looked up from the JNDI + * @return The UserTransaction to be used, looked up from the JNDI + */ + private UserTransaction getUserTransaction() { + if (!cacheUserTransaction) { + if (log.isDebugEnabled()) { + log.debug("Acquiring a new UserTransaction for service : " + serviceName); + } + + try { + context = getInitialContext(); + return + JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); + } catch (NamingException e) { + handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + + " using JNDI properties : " + jmsProperties, e); + } + } + + if (sharedUserTransaction == null) { + try { + context = getInitialContext(); + sharedUserTransaction = + JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); + if (log.isDebugEnabled()) { + log.debug("Acquired shared UserTransaction for service : " + serviceName); + } + } catch (NamingException e) { + handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + + " using JNDI properties : " + jmsProperties, e); + } + } + return sharedUserTransaction; + } + + // -------------------- trivial methods --------------------- + private boolean isSTMActive() { + return serviceTaskManagerState == STATE_STARTED; + } + + /** + * Is this STM bound to a Queue, Topic or a JMS 1.1 Generic Destination? + * @return TRUE for a Queue, FALSE for a Topic and NULL for a Generic Destination + */ + private Boolean isQueue() { + if (destinationType == JMSConstants.GENERIC) { + return null; + } else { + return destinationType == JMSConstants.QUEUE; + } + } + + private void logError(String msg, Exception e) { + log.error(msg, e); + } + + private void handleException(String msg, Exception e) { + log.error(msg, e); + throw new AxisJMSException(msg, e); + } + + private void handleException(String msg) { + log.error(msg); + throw new AxisJMSException(msg); + } + + // -------------- getters and setters ------------------ + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getConnFactoryJNDIName() { + return connFactoryJNDIName; + } + + public void setConnFactoryJNDIName(String connFactoryJNDIName) { + this.connFactoryJNDIName = connFactoryJNDIName; + } + + public String getDestinationJNDIName() { + return destinationJNDIName; + } + + public void setDestinationJNDIName(String destinationJNDIName) { + this.destinationJNDIName = destinationJNDIName; + } + + public int getDestinationType() { + return destinationType; + } + + public void setDestinationType(int destinationType) { + this.destinationType = destinationType; + } + + public String getMessageSelector() { + return messageSelector; + } + + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + public int getTransactionality() { + return transactionality; + } + + public void setTransactionality(int transactionality) { + this.transactionality = transactionality; + sessionTransacted = (transactionality == BaseConstants.TRANSACTION_LOCAL); + } + + public boolean isSessionTransacted() { + return sessionTransacted; + } + + public void setSessionTransacted(Boolean sessionTransacted) { + if (sessionTransacted != null) { + this.sessionTransacted = sessionTransacted; + // sesstionTransacted means local transactions are used, however !sessionTransacted does + // not mean that JTA is used + if (sessionTransacted) { + transactionality = BaseConstants.TRANSACTION_LOCAL; + } + } + } + + public int getSessionAckMode() { + return sessionAckMode; + } + + public void setSessionAckMode(int sessionAckMode) { + this.sessionAckMode = sessionAckMode; + } + + public boolean isSubscriptionDurable() { + return subscriptionDurable; + } + + public void setSubscriptionDurable(Boolean subscriptionDurable) { + if (subscriptionDurable != null) { + this.subscriptionDurable = subscriptionDurable; + } + } + + public String getDurableSubscriberName() { + return durableSubscriberName; + } + + public void setDurableSubscriberName(String durableSubscriberName) { + this.durableSubscriberName = durableSubscriberName; + } + + public boolean isPubSubNoLocal() { + return pubSubNoLocal; + } + + public void setPubSubNoLocal(Boolean pubSubNoLocal) { + if (pubSubNoLocal != null) { + this.pubSubNoLocal = pubSubNoLocal; + } + } + + public int getConcurrentConsumers() { + return concurrentConsumers; + } + + public void setConcurrentConsumers(int concurrentConsumers) { + this.concurrentConsumers = concurrentConsumers; + } + + public int getMaxConcurrentConsumers() { + return maxConcurrentConsumers; + } + + public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { + this.maxConcurrentConsumers = maxConcurrentConsumers; + } + + public int getIdleTaskExecutionLimit() { + return idleTaskExecutionLimit; + } + + public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { + this.idleTaskExecutionLimit = idleTaskExecutionLimit; + } + + public int getReceiveTimeout() { + return receiveTimeout; + } + + public void setReceiveTimeout(int receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + public int getCacheLevel() { + return cacheLevel; + } + + public void setCacheLevel(int cacheLevel) { + this.cacheLevel = cacheLevel; + } + + public int getInitialReconnectDuration() { + return initialReconnectDuration; + } + + public void setInitialReconnectDuration(int initialReconnectDuration) { + this.initialReconnectDuration = initialReconnectDuration; + } + + public double getReconnectionProgressionFactor() { + return reconnectionProgressionFactor; + } + + public void setReconnectionProgressionFactor(double reconnectionProgressionFactor) { + this.reconnectionProgressionFactor = reconnectionProgressionFactor; + } + + public long getMaxReconnectDuration() { + return maxReconnectDuration; + } + + public void setMaxReconnectDuration(long maxReconnectDuration) { + this.maxReconnectDuration = maxReconnectDuration; + } + + public int getMaxMessagesPerTask() { + return maxMessagesPerTask; + } + + public void setMaxMessagesPerTask(int maxMessagesPerTask) { + this.maxMessagesPerTask = maxMessagesPerTask; + } + + public String getUserTransactionJNDIName() { + return userTransactionJNDIName; + } + + public void setUserTransactionJNDIName(String userTransactionJNDIName) { + if (userTransactionJNDIName != null) { + this.userTransactionJNDIName = userTransactionJNDIName; + } + } + + public boolean isCacheUserTransaction() { + return cacheUserTransaction; + } + + public void setCacheUserTransaction(Boolean cacheUserTransaction) { + if (cacheUserTransaction != null) { + this.cacheUserTransaction = cacheUserTransaction; + } + } + + public boolean isJmsSpec11() { + return jmsSpec11; + } + + public void setJmsSpec11(boolean jmsSpec11) { + this.jmsSpec11 = jmsSpec11; + } + + public Hashtable<String, String> getJmsProperties() { + return jmsProperties; + } + + public void addJmsProperties(Map<String, String> jmsProperties) { + this.jmsProperties.putAll(jmsProperties); + } + + public void removeJmsProperties(String key) { + this.jmsProperties.remove(key); + } + + public Context getContext() { + return context; + } + + public ConnectionFactory getConnectionFactory() { + return conFactory; + } + + public List<MessageListenerTask> getPollingTasks() { + return pollingTasks; + } + + public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) { + this.jmsMessageReceiver = jmsMessageReceiver; + } + + public void setWorkerPool(WorkerPool workerPool) { + this.workerPool = workerPool; + } + + public int getActiveTaskCount() { + return activeTaskCount; + } + + public void setServiceTaskManagerState(int serviceTaskManagerState) { + this.serviceTaskManagerState = serviceTaskManagerState; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java new file mode 100644 index 0000000000..2b415df64d --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeInfo.java @@ -0,0 +1,49 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; + +/** + * Class encapsulating the content type information for a given message. + */ +public class ContentTypeInfo { + private final String propertyName; + private final String contentType; + + public ContentTypeInfo(String propertyName, String contentType) { + this.propertyName = propertyName; + this.contentType = contentType; + } + + /** + * Get the name of the message property from which the content type + * has been extracted. + * + * @return the property name or null if the content type was not determined + * by extracting it from a message property + */ + public String getPropertyName() { + return propertyName; + } + + /** + * Get the content type of the message. + * + * @return The content type of the message. The return value is never null. + */ + public String getContentType() { + return contentType; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java new file mode 100644 index 0000000000..0dba93356f --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRule.java @@ -0,0 +1,43 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; + +import javax.jms.JMSException; +import javax.jms.Message; + +/** + * Interface implemented by content type rules. + */ +public interface ContentTypeRule { + /** + * Attempt to determine the content type of the given JMS message. + * + * @param message the message + * @return If the rule matches, the return value encapsulates the content type of the + * message and the message property name from which is was extracted + * (if applicable). If the rule doesn't match, the method returns null. + * @throws JMSException + */ + ContentTypeInfo getContentType(Message message) throws JMSException; + + /** + * Get the name of the message property used to extract the content type from, + * if applicable. + * + * @return the property name or null if not applicable + */ + String getExpectedContentTypeProperty(); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java new file mode 100644 index 0000000000..a9fd25ef1b --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleFactory.java @@ -0,0 +1,74 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; + +import java.util.Iterator; + +import javax.jms.BytesMessage; +import javax.jms.TextMessage; + +import org.apache.axiom.om.OMElement; +import org.apache.axis2.AxisFault; +import org.apache.axis2.description.Parameter; + +/** + * Utility class to create content type rules and rule sets from XML. + */ +public class ContentTypeRuleFactory { + private ContentTypeRuleFactory() {} + + public static ContentTypeRule parse(OMElement element) throws AxisFault { + String name = element.getLocalName(); + String value = element.getText(); + if (name.equals("jmsProperty")) { + return new PropertyRule(value); + } else if (name.equals("textMessage")) { + return new MessageTypeRule(TextMessage.class, value); + } else if (name.equals("bytesMessage")) { + return new MessageTypeRule(BytesMessage.class, value); + } else if (name.equals("default")) { + return new DefaultRule(value); + } else { + throw new AxisFault("Unknown content rule type '" + name + "'"); + } + } + + public static ContentTypeRuleSet parse(Parameter param) throws AxisFault { + ContentTypeRuleSet ruleSet = new ContentTypeRuleSet(); + Object value = param.getValue(); + if (value instanceof OMElement) { + OMElement element = (OMElement)value; + + // DescriptionBuilder#processParameters actually sets the parameter element + // itself as the value. We need to support this case. + // TODO: seems like a bug in Axis2 and is inconsistent with Synapse's way of parsing parameter in proxy definitions + if (element == param.getParameterElement()) { + element = element.getFirstElement(); + } + + if (element.getLocalName().equals("rules")) { + for (Iterator it = element.getChildElements(); it.hasNext(); ) { + ruleSet.addRule(parse((OMElement)it.next())); + } + } else { + throw new AxisFault("Expected <rules> element"); + } + } else { + ruleSet.addRule(new DefaultRule((String)value)); + } + return ruleSet; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java new file mode 100644 index 0000000000..90383a42f8 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/ContentTypeRuleSet.java @@ -0,0 +1,64 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.JMSException; +import javax.jms.Message; + +/** + * A set of content type rules. + */ +public class ContentTypeRuleSet { + private final List<ContentTypeRule> rules = new ArrayList<ContentTypeRule>(); + private String defaultContentTypeProperty; + + /** + * Add a content type rule to this set. + * + * @param rule the rule to add + */ + public void addRule(ContentTypeRule rule) { + rules.add(rule); + if (defaultContentTypeProperty == null) { + defaultContentTypeProperty = rule.getExpectedContentTypeProperty(); + } + } + + /** + * Determine the content type of the given message. + * This method will try the registered rules in turn until the first rule matches. + * + * @param message the message + * @return the content type information for the message or null if none of the rules matches + * @throws JMSException + */ + public ContentTypeInfo getContentTypeInfo(Message message) throws JMSException { + for (ContentTypeRule rule : rules) { + ContentTypeInfo contentTypeInfo = rule.getContentType(message); + if (contentTypeInfo != null) { + return contentTypeInfo; + } + } + return null; + } + + public String getDefaultContentTypeProperty() { + return defaultContentTypeProperty; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java new file mode 100644 index 0000000000..a158f6ec74 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/DefaultRule.java @@ -0,0 +1,37 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; + +import javax.jms.Message; + +/** + * Content type rule that always matches and that returns a fixed (default) content type. + */ +public class DefaultRule implements ContentTypeRule { + private final String contentType; + + public DefaultRule(String contentType) { + this.contentType = contentType; + } + + public ContentTypeInfo getContentType(Message message) { + return new ContentTypeInfo(null, contentType); + } + + public String getExpectedContentTypeProperty() { + return null; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java new file mode 100644 index 0000000000..cb25ab93d4 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/MessageTypeRule.java @@ -0,0 +1,39 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; + +import javax.jms.Message; + +/** + * Content type rule that matches a given message type and returns a fixed content type. + */ +public class MessageTypeRule implements ContentTypeRule { + private final Class<? extends Message> messageType; + private final String contentType; + + public MessageTypeRule(Class<? extends Message> messageType, String contentType) { + this.messageType = messageType; + this.contentType = contentType; + } + + public ContentTypeInfo getContentType(Message message) { + return messageType.isInstance(message) ? new ContentTypeInfo(null, contentType) : null; + } + + public String getExpectedContentTypeProperty() { + return null; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java new file mode 100644 index 0000000000..c8d13ba462 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/PropertyRule.java @@ -0,0 +1,39 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype; + +import javax.jms.JMSException; +import javax.jms.Message; + +/** + * Content type rule that attempts to extract the content type from a message property. + */ +public class PropertyRule implements ContentTypeRule { + private final String propertyName; + + public PropertyRule(String propertyName) { + this.propertyName = propertyName; + } + + public ContentTypeInfo getContentType(Message message) throws JMSException { + String value = message.getStringProperty(propertyName); + return value == null ? null : new ContentTypeInfo(propertyName, value); + } + + public String getExpectedContentTypeProperty() { + return propertyName; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java new file mode 100644 index 0000000000..750170edd7 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ctype/package-info.java @@ -0,0 +1,23 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +/** + * Provides classes and interfaces to define content type rules. + * + * Content type rules are used to determine the content type of a + * received message based on JMS properties, message type, etc. + */ +package org.apache.tuscany.sca.binding.ws.axis2.jms.ctype;
\ No newline at end of file diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html new file mode 100644 index 0000000000..b95b49eb69 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/package.html @@ -0,0 +1,356 @@ +<html> +<title>JMS Transport Configuration</title> +<body> + +<h2>JMS Listener Configuration (axis2.xml)</h2> + +e.g: + +<pre> + <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"> + <parameter name="myTopicConnectionFactory"> + <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> + <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> + <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter> + <parameter name="transport.jms.ConnectionFactoryType">topic</parameter> + <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter> + </parameter> + + <parameter name="myQueueConnectionFactory"> + <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> + <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> + <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter> + <parameter name="transport.jms.ConnectionFactoryType">queue</parameter> + <parameter name="transport.jms.JMSSpecVersion">1.1</parameter> + </parameter> + + <parameter name="default"> + <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> + <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> + <parameter name="transport.jms.ConnectionFactoryJNDIName">ConnectionFactory</parameter> + </parameter> + </transportReceiver> + + <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"> + <parameter name="myTopicConnectionFactory"> + <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> + <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> + <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter> + <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter> + <parameter name="transport.jms.CacheLevel">producer</parameter> + </parameter> + + <parameter name="myQueueConnectionFactory"> + <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> + <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> + <parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter> + <parameter name="transport.jms.JMSSpecVersion">1.0.2b</parameter> + <parameter name="transport.jms.CacheLevel">producer</parameter> + </parameter> + + <parameter name="default"> + <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> + <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter> + <parameter name="transport.jms.ConnectionFactoryJNDIName">ConnectionFactory</parameter> + <parameter name="transport.jms.CacheLevel">connection</parameter> + </parameter> + </transportSender> +</pre> + +<p> + The Transport Listener and Sender both allows the user to configure one or more logical JMS Connection + Factories, which are named definitions as shown above. Any remaining parameters maybe defined at the + service level via the services.xml. The applicable set of parameters for a service would be the + union of the properties from the services.xml and the corresponding logical connection factory. +</p> + +<TABLE WIDTH="100%" BORDER=1 BORDERCOLOR="#000000" CELLPADDING=4 CELLSPACING=0> + <COL WIDTH="10%"> + <COL WIDTH="20%"> + <COL WIDTH="60%"> + <COL WIDTH="5%"> + <COL WIDTH="5%"> + <tr> + <td>Transport level</td> + <td><BR></td> + <td><BR></td> + <td>Listening</td> + <td>Sending</td> + </tr> + <tr> + <td>JNDI</td> + <td>java.naming.factory.initial</td> + <td>The JNDI InitialContext factory class</td> + <td>Required</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>java.naming.provider.url</td> + <td>JNDI Provider URL</td> + <td>Required</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>java.naming.security.principal</td> + <td>Username for JNDI access</td> + <td><BR></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>java.naming.security.credentials</td> + <td>Password for JNDI access</td> + <td><BR></td> + <td><BR></td> + </tr> + <tr> + <td>Transactions</td> + <td>transport.Transactionality</td> + <td>Desired transactionality. One of none / local / jta</td> + <td>Defaults to <B>none</B></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.UserTxnJNDIName</td> + <td>JNDI name to be used to obtain a UserTransaction</td> + <td>Defaults to "java:comp/UserTransaction"</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.CacheUserTxn</td> + <td>Generally its safe and more efficient to cache the + UserTransaction reference from JNDI. One of true/ false</td> + <td>Defaults to <B>true</B></td> + <td><BR></td> + </tr> + + <tr> + <td><BR></td> + <td>transport.jms.SessionTransacted</td> + <td>Should the JMS Session be transacted. One of true/ false</td> + <td>Defaults to <B>true</B> when local transactions are used</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.SessionAcknowledgement</td> + <td>JMS Session acknowledgement mode to be used. One of AUTO_ACKNOWLEDGE | CLIENT_ACKNOWLEDGE | DUPS_OK_ACKNOWLEDGE | SESSION_TRANSACTED</td> + <td>Defaults to <B>AUTO_ACKNOWLEDGE</B></td> + <td><BR></td> + </tr> + + <tr> + <td>Connection</td> + <td>transport.jms.ConnectionFactory</td> + <td>Name of the logical connection factory this service will use</td> + <td>Defaults to "default"</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.ConnectionFactoryJNDIName</td> + <td>The JNDI name of the JMS ConnectionFactory</td> + <td>Required</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.ConnectionFactoryType</td> + <td> Type of ConnectionFactory – queue / topic</td> + <td>Suggested to be specified</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.JMSSpecVersion</td> + <td>JMS API Version One of "1.1" or "1.0.2b"</td> + <td>Defaults to 1.1</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.UserName</td> + <td>The JMS connection username</td> + <td><BR></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.Password</td> + <td>The JMS connection password</td> + <td><BR></td> + <td><BR></td> + </tr> + <tr> + <td>Destinations</td> + <td>transport.jms.Destination</td> + <td>JNDI Name of the Destination </td> + <td>Defaults to Service name</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.DestinationType</td> + <td>Type of Destination – queue / topic</td> + <td>Defaults to a queue</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.DefaultReplyDestination</td> + <td>JNDI Name of the default reply Destination</td> + <td><BR></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.DefaultReplyDestinationType</td> + <td>Type of the reply Destination – queue / topic</td> + <td>Same type as of Destination</td> + <td><BR></td> + </tr> + <tr> + <td>Advanced</td> + <td>transport.jms.MessageSelector</td> + <td>Optional message selector to be applied</td> + <td><BR></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.SubscriptionDurable</td> + <td>Is the subscription durable? (For Topics) – true / false</td> + <td>Defaults to <B>false</B></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.DurableSubscriberName</td> + <td>Name to be used for the durable subscription</td> + <td>Required when subscription is durable</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.PubSubNoLocal</td> + <td>Should messages published by the same connection (for Topics) + be received? – true / false</td> + <td>Defaults to <B>false</B></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.CacheLevel</td> + <td>The JMS resource cache level. One of none / connection / + session / consumer / producer / auto</td> + <td>Defaults to <B>auto</B> </td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.ReceiveTimeout</td> + <td>Time to wait for a JMS message during polling. Negative means + wait forever, while 0 means do not wait at all. Anything else, is + a millisecond value for the poll</td> + <td>Defaults to 1000ms</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.ConcurrentConsumers</td> + <td>Number of concurrent consumer tasks (~threads) to be started to + poll for messages for this service. For Topics, this should be + always 1, to prevent the same message being processed multiple + times</td> + <td>Defaults to <B>1</B></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.MaxConcurrentConsumers</td> + <td>Will dynamically scale the number of concurrent consumer tasks + (~threads) until this value; as the load increases. Should always + be 1 for Topics.</td> + <td>Defaults to <B>1</B></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.IdleTaskLimit</td> + <td>The number of idle (i.e. poll without receipt of a message) + runs per task, before it dies – to recycle resources, and to + allow dynamic scale down.</td> + <td>Defaults to 10</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.MaxMessagesPerTask</td> + <td>The maximum number of successful message receipts to limit per + Task lifetime. </td> + <td>Defaults to <B>–1</B> which implies unlimited messages</td> + <td><BR></td> + </tr> + <tr> + <td>Reconnection</td> + <td>transport.jms.InitialReconnectDuration</td> + <td>Initial reconnection attempt duration</td> + <td>Defaults to 10,000ms</td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.ReconnectProgressFactor</td> + <td>Factor used to compute consecutive reconnection attempt + durations, in a geometric series</td> + <td>Defaults to <B>2 (i.e. exponential)</B></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.MaxReconnectDuration</td> + <td>Maximum limit for a reconnection duration</td> + <td>Defaults to <B>1 hour</B></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>transport.jms.PublishEPR</td> + <td>One or more JMS URL's to be showed as the JMS EPRs on the WSDL + for the service. Allows the specification of a custom EPR, and/or + hiding of internal properties from a public EPR (e.g. + credentials). Add one as LEGACY to retain auto generated EPR, when + adding new EPRs</td> + <td><BR></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td><BR></td> + <td><BR></td> + <td><BR></td> + <td><BR></td> + </tr> + <tr> + <td>Legacy Mode and Payload handling</td> + <td>Wrapper</td> + <td>Binary and Text payload wrapper element to be specified as "{ns}name" where ns refers to a namespace and name the name of the element</td> + <td>Default binary wrapper<ul><li>{http://ws.apache.org/commons/ns/payload}binary</li></ul> + Default text wrapper <ul><li>{http://ws.apache.org/commons/ns/payload}text</li></ul></td> + <td><BR></td> + </tr> + <tr> + <td><BR></td> + <td>Operation</td> + <td>operation name to be specified as "{ns}name" where ns refers to the namespace and name the name of the operation</td> + <td>Defaults to urn:mediate</td> + <td><BR></td> + </tr> +</TABLE> + +</body> +</html>
\ No newline at end of file |