diff options
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java')
-rw-r--r-- | branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java | 393 |
1 files changed, 393 insertions, 0 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java new file mode 100644 index 0000000000..d5d164ce76 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java @@ -0,0 +1,393 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.Hashtable; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.axiom.om.OMElement; +import org.apache.axis2.AxisFault; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterIncludeImpl; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Encapsulate a JMS Connection factory definition within an Axis2.xml + * + * JMS Connection Factory definitions, allows JNDI properties as well as other service + * level parameters to be defined, and re-used by each service that binds to it + * + * When used for sending messages out, the JMSConnectionFactory'ies are able to cache + * a Connection, Session or Producer + */ +public class JMSConnectionFactory { + + private static final Log log = LogFactory.getLog(JMSConnectionFactory.class); + + /** The name used for the connection factory definition within Axis2 */ + private String name = null; + /** The list of parameters from the axis2.xml definition */ + private Hashtable<String, String> parameters = new Hashtable<String, String>(); + + /** The cached InitialContext reference */ + private Context context = null; + /** The JMS ConnectionFactory this definition refers to */ + private ConnectionFactory conFactory = null; + /** The shared JMS Connection for this JMS connection factory */ + private Connection sharedConnection = null; + /** The shared JMS Session for this JMS connection factory */ + private Session sharedSession = null; + /** The shared JMS MessageProducer for this JMS connection factory */ + private MessageProducer sharedProducer = null; + /** The Shared Destination */ + private Destination sharedDestination = null; + /** The shared JMS connection for this JMS connection factory */ + private int cacheLevel = JMSConstants.CACHE_CONNECTION; + + /** + * Digest a JMS CF definition from an axis2.xml 'Parameter' and construct + * @param parameter the axis2.xml 'Parameter' that defined the JMS CF + */ + public JMSConnectionFactory(Parameter parameter) { + + this.name = parameter.getName(); + ParameterIncludeImpl pi = new ParameterIncludeImpl(); + + try { + pi.deserializeParameters((OMElement) parameter.getValue()); + } catch (AxisFault axisFault) { + handleException("Error reading parameters for JMS connection factory" + name, axisFault); + } + + for (Object o : pi.getParameters()) { + Parameter p = (Parameter) o; + parameters.put(p.getName(), (String) p.getValue()); + } + + digestCacheLevel(); + try { + context = new InitialContext(parameters); + conFactory = JMSUtils.lookup(context, ConnectionFactory.class, + parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME)); + if (parameters.get(JMSConstants.PARAM_DESTINATION) != null) { + sharedDestination = JMSUtils.lookup(context, Destination.class, + parameters.get(JMSConstants.PARAM_DESTINATION)); + } + log.info("JMS ConnectionFactory : " + name + " initialized"); + + } catch (NamingException e) { + throw new AxisJMSException("Cannot acquire JNDI context, JMS Connection factory : " + + parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME) + " or default destination : " + + parameters.get(JMSConstants.PARAM_DESTINATION) + + " for JMS CF : " + name + " using : " + parameters); + } + } + + /** + * Digest, the cache value iff specified + */ + private void digestCacheLevel() { + + String key = JMSConstants.PARAM_CACHE_LEVEL; + String val = parameters.get(key); + + if ("none".equalsIgnoreCase(val)) { + this.cacheLevel = JMSConstants.CACHE_NONE; + } else if ("connection".equalsIgnoreCase(val)) { + this.cacheLevel = JMSConstants.CACHE_CONNECTION; + } else if ("session".equals(val)){ + this.cacheLevel = JMSConstants.CACHE_SESSION; + } else if ("producer".equals(val)) { + this.cacheLevel = JMSConstants.CACHE_PRODUCER; + } else if (val != null) { + throw new AxisJMSException("Invalid cache level : " + val + " for JMS CF : " + name); + } + } + + /** + * Return the name assigned to this JMS CF definition + * @return name of the JMS CF + */ + public String getName() { + return name; + } + + /** + * The list of properties (including JNDI and non-JNDI) + * @return properties defined on the JMS CF + */ + public Hashtable<String, String> getParameters() { + return parameters; + } + + /** + * Get cached InitialContext + * @return cache InitialContext + */ + public Context getContext() { + return context; + } + + /** + * Cache level applicable for this JMS CF + * @return applicable cache level + */ + public int getCacheLevel() { + return cacheLevel; + } + + /** + * Get the shared Destination - if defined + * @return + */ + public Destination getSharedDestination() { + return sharedDestination; + } + + /** + * Lookup a Destination using this JMS CF definitions and JNDI name + * @param name JNDI name of the Destionation + * @return JMS Destination for the given JNDI name or null + */ + public Destination getDestination(String name) { + try { + return JMSUtils.lookup(context, Destination.class, name); + } catch (NamingException e) { + handleException("Unknown JMS Destination : " + name + " using : " + parameters, e); + } + return null; + } + + /** + * Get the reply Destination from the PARAM_REPLY_DESTINATION parameter + * @return reply destination defined in the JMS CF + */ + public String getReplyToDestination() { + return parameters.get(JMSConstants.PARAM_REPLY_DESTINATION); + } + + private void handleException(String msg, Exception e) { + log.error(msg, e); + throw new AxisJMSException(msg, e); + } + + /** + * Should the JMS 1.1 API be used? - defaults to yes + * @return true, if JMS 1.1 api should be used + */ + public boolean isJmsSpec11() { + return parameters.get(JMSConstants.PARAM_JMS_SPEC_VER) == null || + "1.1".equals(parameters.get(JMSConstants.PARAM_JMS_SPEC_VER)); + } + + /** + * Return the type of the JMS CF Destination + * @return TRUE if a Queue, FALSE for a Topic and NULL for a JMS 1.1 Generic Destination + */ + public Boolean isQueue() { + if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) == null && + parameters.get(JMSConstants.PARAM_DEST_TYPE) == null) { + return null; + } + + if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) != null) { + if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { + return true; + } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { + return false; + } else { + throw new AxisJMSException("Invalid " + JMSConstants.PARAM_CONFAC_TYPE + " : " + + parameters.get(JMSConstants.PARAM_CONFAC_TYPE) + " for JMS CF : " + name); + } + } else { + if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { + return true; + } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { + return false; + } else { + throw new AxisJMSException("Invalid " + JMSConstants.PARAM_DEST_TYPE + " : " + + parameters.get(JMSConstants.PARAM_DEST_TYPE) + " for JMS CF : " + name); + } + } + } + + /** + * Is a session transaction requested from users of this JMS CF? + * @return session transaction required by the clients of this? + */ + private boolean isSessionTransacted() { + return parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED) == null || + Boolean.valueOf(parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED)); + } + + /** + * Create a new Connection + * @return a new Connection + */ + private Connection createConnection() { + + Connection connection = null; + try { + connection = JMSUtils.createConnection( + conFactory, + parameters.get(JMSConstants.PARAM_JMS_USERNAME), + parameters.get(JMSConstants.PARAM_JMS_PASSWORD), + isJmsSpec11(), isQueue()); + + if (log.isDebugEnabled()) { + log.debug("New JMS Connection from JMS CF : " + name + " created"); + } + + } catch (JMSException e) { + handleException("Error acquiring a Connection from the JMS CF : " + name + + " using properties : " + parameters, e); + } + return connection; + } + + /** + * Create a new Session + * @param connection Connection to use + * @return A new Session + */ + private Session createSession(Connection connection) { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS Session from JMS CF : " + name); + } + return JMSUtils.createSession( + connection, isSessionTransacted(), Session.AUTO_ACKNOWLEDGE, isJmsSpec11(), isQueue()); + + } catch (JMSException e) { + handleException("Error creating JMS session from JMS CF : " + name, e); + } + return null; + } + + /** + * Create a new MessageProducer + * @param session Session to be used + * @param destination Destination to be used + * @return a new MessageProducer + */ + private MessageProducer createProducer(Session session, Destination destination) { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS MessageProducer from JMS CF : " + name); + } + + return JMSUtils.createProducer( + session, destination, isQueue(), isJmsSpec11()); + + } catch (JMSException e) { + handleException("Error creating JMS producer from JMS CF : " + name,e); + } + return null; + } + + /** + * Get a new Connection or shared Connection from this JMS CF + * @return new or shared Connection from this JMS CF + */ + public Connection getConnection() { + if (cacheLevel > JMSConstants.CACHE_NONE) { + return getSharedConnection(); + } else { + return createConnection(); + } + } + + /** + * Get a new Session or shared Session from this JMS CF + * @param connection the Connection to be used + * @return new or shared Session from this JMS CF + */ + public Session getSession(Connection connection) { + if (cacheLevel > JMSConstants.CACHE_CONNECTION) { + return getSharedSession(); + } else { + return createSession((connection == null ? getConnection() : connection)); + } + } + + /** + * Get a new MessageProducer or shared MessageProducer from this JMS CF + * @param connection the Connection to be used + * @param session the Session to be used + * @param destination the Destination to bind MessageProducer to + * @return new or shared MessageProducer from this JMS CF + */ + public MessageProducer getMessageProducer( + Connection connection, Session session, Destination destination) { + if (cacheLevel > JMSConstants.CACHE_SESSION) { + return getSharedProducer(); + } else { + return createProducer((session == null ? getSession(connection) : session), destination); + } + } + + /** + * Get a new Connection or shared Connection from this JMS CF + * @return new or shared Connection from this JMS CF + */ + private Connection getSharedConnection() { + if (sharedConnection == null) { + sharedConnection = createConnection(); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS Connection for JMS CF : " + name); + } + } + return sharedConnection; + } + + /** + * Get a shared Session from this JMS CF + * @return shared Session from this JMS CF + */ + private Session getSharedSession() { + if (sharedSession == null) { + sharedSession = createSession(getSharedConnection()); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS Session for JMS CF : " + name); + } + } + return sharedSession; + } + + /** + * Get a shared MessageProducer from this JMS CF + * @return shared MessageProducer from this JMS CF + */ + private MessageProducer getSharedProducer() { + if (sharedProducer == null) { + sharedProducer = createProducer(getSharedSession(), sharedDestination); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS MessageConsumer for JMS CF : " + name); + } + } + return sharedProducer; + } +} |