From bee80d90df6104edff19cbb6a318fac3d64296aa Mon Sep 17 00:00:00 2001 From: antelder Date: Sat, 16 May 2009 08:53:06 +0000 Subject: Create 1.5 release branch from current 1.x trunk git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@775437 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/binding/ws/axis2/jms/AxisJMSException.java | 35 ++ .../binding/ws/axis2/jms/JMSConnectionFactory.java | 605 +++++++++++++++++++++ .../sca/binding/ws/axis2/jms/JMSListener.java | 527 ++++++++++++++++++ .../binding/ws/axis2/jms/JMSMessageReceiver.java | 270 +++++++++ .../binding/ws/axis2/jms/JMSOutTransportInfo.java | 220 ++++++++ .../sca/binding/ws/axis2/jms/JMSSender.java | 389 +++++++++++++ .../apache/tuscany/sca/binding/ws/axis2/jms/README | 14 + 7 files changed, 2060 insertions(+) create mode 100644 branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java create mode 100644 branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java create mode 100644 branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java create mode 100644 branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java create mode 100644 branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java create mode 100644 branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java create mode 100644 branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/README (limited to 'branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms') diff --git a/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java new file mode 100644 index 0000000000..09a1960ce4 --- /dev/null +++ b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +public class AxisJMSException extends RuntimeException { + + AxisJMSException() { + super(); + } + + AxisJMSException(String msg) { + super(msg); + } + + AxisJMSException(String msg, Exception e) { + super(msg, e); + } +} diff --git a/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java new file mode 100644 index 0000000000..a96ec0b1c4 --- /dev/null +++ b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java @@ -0,0 +1,605 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NameNotFoundException; +import javax.naming.NamingException; + +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.transport.jms.JMSConstants; +import org.apache.axis2.transport.jms.JMSUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.work.WorkScheduler; + +/** + * Encapsulate a JMS Connection factory definition within an Axis2.xml + *

+ * More than one JMS connection factory could be defined within an Axis2 XML + * specifying the JMSListener as the transportReceiver. + *

+ * These connection factories are created at the initialization of the + * transportReceiver, and any service interested in using any of these could + * specify the name of the factory and the destination through Parameters named + * JMSConstants.CONFAC_PARAM and JMSConstants.DEST_PARAM as shown below. + *

+ * myQueueConnectionFactory + * TestQueue + *

+ * If a connection factory is defined by a parameter named + * JMSConstants.DEFAULT_CONFAC_NAME in the Axis2 XML, services which does not + * explicitly specify a connection factory will be defaulted to it - if it is + * defined in the Axis2 configuration. + *

+ * e.g. + * + * + * org.apache.activemq.jndi.ActiveMQInitialContextFactory + * tcp://localhost:61616 + * TopicConnectionFactory + * myTopicOne, myTopicTwo + * + * + * org.apache.activemq.jndi.ActiveMQInitialContextFactory + * tcp://localhost:61616 + * QueueConnectionFactory + * myQueueOne, myQueueTwo + * + * + * org.apache.activemq.jndi.ActiveMQInitialContextFactory + * tcp://localhost:61616 + * ConnectionFactory + * myDestinationOne, myDestinationTwo + * + * + */ +public class JMSConnectionFactory { + + private static final Log log = LogFactory.getLog(JMSConnectionFactory.class); + + /** + * The name used for the connection factory definition within Axis2 + */ + private String name = null; + /** + * The JNDI name of the actual connection factory + */ + private String jndiName = null; + /** + * The JNDI name of the actual connection factory username + */ + private String jndiUser = null; + /** + * The JNDI name of the actual connection factory password + */ + private String jndiPass = null; + /** + * Map of destination JNDI names to service names + */ + private Map serviceJNDINameMapping = null; + /** + * Map of destinations to service names + */ + private Map serviceDestinationMapping = null; + /** + * The JMS Sessions listening for messages + */ + private Map jmsSessions = null; + /** + * Properties of the connection factory + */ + private Hashtable properties = null; + /** + * The JNDI Context used + */ + private Context context = null; + /** + * The actual ConnectionFactory instance held within + */ + private ConnectionFactory conFactory = null; + /** + * The JMS Connection is opened. + */ + private Connection connection = null; + /** + * The JMS Message receiver for this connection factory + */ + private JMSMessageReceiver msgRcvr = null; + /** + * The actual password for the connection factory after retrieval from JNDI. + * If this is not supplied, the OS username will be used by default + */ + private String user = null; + /** + * The actual password for the connection factory after retrieval from JNDI. + * If this is not supplied, the OS credentials will be used by default + */ + private String pass = null; + + private WorkScheduler workScheduler; + private boolean consumerRunning; + + /** + * Create a JMSConnectionFactory for the given Axis2 name and JNDI name + * + * @param name the local Axis2 name of the connection factory + * @param jndiName the JNDI name of the actual connection factory used + */ + JMSConnectionFactory(String name, String jndiName, WorkScheduler workScheduler) { + this.name = name; + this.jndiName = jndiName; + serviceJNDINameMapping = new HashMap(); + serviceDestinationMapping = new HashMap(); + properties = new Hashtable(); + jmsSessions = new HashMap(); + this.workScheduler = workScheduler; + } + + /** + * Create a JMSConnectionFactory for the given Axis2 name + * + * @param name the local Axis2 name of the connection factory + */ + JMSConnectionFactory(String name, WorkScheduler workScheduler) { + this(name, null, workScheduler); + } + + /** + * Connect to the actual JMS connection factory specified by the JNDI name + * + * @throws NamingException if the connection factory cannot be found + */ + public void connect() throws NamingException { + if (context == null) { + createInitialContext(); + } + conFactory = (ConnectionFactory) context.lookup(jndiName); + + if (jndiUser != null) + user = (String ) context.lookup(jndiUser); + + if (jndiPass != null) + pass = (String ) context.lookup(jndiPass); + + log.debug("Connected to the actual connection factory : " + jndiName); + } + + /** + * Creates the initial context using the set properties + * + * @throws NamingException + */ + private void createInitialContext() throws NamingException { + context = new InitialContext(properties); + } + + /** + * Set the JNDI connection factory name + * + * @param jndiName + */ + public void setJndiName(String jndiName) { + this.jndiName = jndiName; + } + + /** + * Get the JNDI name of the actual factory username + * + * @return the jndi name of the actual connection factory username + */ + public void setJndiUser(String jndiUser) { + this.jndiUser = jndiUser; + } + + /** + * Get the JNDI name of the actual factory password + * + * @return the jndi name of the actual connection factory password + */ + public void setJndiPass(String jndiPass) { + this.jndiPass = jndiPass; + } + + /** + * Add a listen destination on this connection factory on behalf of the given service + * + * @param destinationJndi destination JNDI name + * @param serviceName the service to which it belongs + */ + public void addDestination(String destinationJndi, String serviceName) { + + serviceJNDINameMapping.put(destinationJndi, serviceName); + String destinationName = getDestinationName(destinationJndi); + + if (destinationName == null) { + log.warn("JMS Destination with JNDI name : " + destinationJndi + " does not exist"); + + Connection con = null; + try { + if ((jndiUser == null) || (jndiPass == null)){ + // Use the OS username and credentials + con = conFactory.createConnection(); + } else{ + // use an explicit username and password + con = conFactory.createConnection(user, pass); + } + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(destinationJndi); + destinationName = queue.getQueueName(); + log.warn("JMS Destination with JNDI name : " + destinationJndi + " created"); + + } catch (JMSException e) { + log.error("Unable to create a Destination with JNDI name : " + destinationJndi, e); + // mark service as faulty + JMSUtils.markServiceAsFaulty( + (String) serviceJNDINameMapping.get(destinationJndi), + "Error looking up JMS destination : " + destinationJndi, + msgRcvr.getAxisConf().getAxisConfiguration()); + + } finally { + if (con != null) { + try { + con.close(); + } catch (JMSException ignore) {} + } + } + } + + serviceDestinationMapping.put(destinationName, serviceName); + log.info("Mapping JNDI name : " + destinationJndi + " and JMS Destination name : " + + destinationName + " against service : " + serviceName); + } + + /** + * Remove listen destination on this connection factory + * + * @param destinationJndi the JMS destination to be removed + * @throws if an error occurs trying to stop listening for messages before removal + */ + public void removeDestination(String destinationJndi) throws JMSException { + // find and save provider specific Destination name before we delete + String providerSpecificDestination = getDestinationName(destinationJndi); + stoplistenOnDestination(destinationJndi); + serviceJNDINameMapping.remove(destinationJndi); + if (providerSpecificDestination != null) { + serviceDestinationMapping.remove(providerSpecificDestination); + } + } + + /** + * Add a property to the connection factory + * + * @param key + * @param value + */ + public void addProperty(String key, String value) { + properties.put(key, value); + } + + /** + * Return the name of the connection factory + * + * @return the Axis2 name of this factory + */ + public String getName() { + return name; + } + + /** + * Get the JNDI name of the actual factory + * + * @return the jndi name of the actual connection factory + */ + public String getJndiName() { + return jndiName; + } + + /** + * Get the JNDI name of the actual factory username + * + * @return the jndi name of the actual connection factory username + */ + public String getJndiUser() { + return jndiUser; + } + + /** + * Get the JNDI name of the actual factory password + * + * @return the jndi name of the actual connection factory password + */ + public String getJndiPass() { + return jndiPass; + } + + + /** + * This is the real password for the connection factory after the JNDI lookup + * + * @return the real password for the connection factory after the JNDI lookup + */ + public String getPass() { + return pass; + } + + /** + * This is the real username for the connection factory after the JNDI lookup + * + * @return the eal username for the connection factory after the JNDI lookup + */ + public String getUser() { + return user; + } + + /** + * Get the actual underlying connection factory + * + * @return actual connection factory + */ + public ConnectionFactory getConFactory() { + return conFactory; + } + + /** + * Get the list of destinations (JNDI) associated with this connection factory + * + * @return destinations to service maping + */ + public Map getDestinations() { + return serviceJNDINameMapping; + } + + /** + * Get the connection factory properties + * + * @return properties + */ + public Hashtable getProperties() { + return properties; + } + + /** + * Begin listening for messages on the list of destinations associated + * with this connection factory. (Called during Axis2 initialization of + * the Transport receivers) + * + * @param msgRcvr the message receiver which will process received messages + * @throws JMSException on exceptions + */ + public void listen(JMSMessageReceiver msgRcvr) throws JMSException { + + // save a reference to the message receiver + this.msgRcvr = msgRcvr; + + log.debug("Connection factory : " + name + " initializing..."); + + if (conFactory == null || context == null) { + handleException( + "Connection factory must be 'connected' before listening"); + } else { + try { + if ((jndiUser == null) || (jndiPass == null)){ + // User the OS username and credentials + connection = conFactory.createConnection(); + } else{ + // use an explicit username and password + connection = conFactory.createConnection(user, pass); + } + } catch (JMSException e) { + handleException("Error creating a JMS connection using the " + + "factory : " + jndiName, e); + } + } + + Iterator iter = serviceJNDINameMapping.keySet().iterator(); + while (iter.hasNext()) { + String destinationJndi = (String) iter.next(); + listenOnDestination(destinationJndi); + } + + // start the connection + if (!consumerRunning) { + connection.start(); + } + log.info("Connection factory : " + name + " initialized..."); + } + + /** + * Listen on the given destination from this connection factory. Used to + * start listening on a destination associated with a newly deployed service + * + * @param destinationJndi the JMS destination to listen on + * @throws JMSException on exception + */ + public void listenOnDestination(String destinationJndi) throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = null; + try { + Object o = context.lookup(destinationJndi); + destination = (Destination) o; + + } catch (NameNotFoundException e) { + log.warn("Cannot find destination : " + destinationJndi + + " Creating a Queue with this name"); + destination = session.createQueue(destinationJndi); + + } catch (NamingException e) { + log.warn("Error looking up destination : " + destinationJndi, e); + // mark service as faulty + JMSUtils.markServiceAsFaulty( + (String) serviceJNDINameMapping.get(destinationJndi), + "Error looking up JMS destination : " + destinationJndi, + this.msgRcvr.getAxisConf().getAxisConfiguration()); + } + + MessageConsumer consumer = session.createConsumer(destination); +// consumer.setMessageListener(this.msgRcvr); replace with new Tuscany method: + registerMessageReceiver(consumer, this.msgRcvr); + jmsSessions.put(destinationJndi, session); + } + + private void registerMessageReceiver(final MessageConsumer consumer, final JMSMessageReceiver messageReceiver) throws JMSException { + + try { + + consumer.setMessageListener(messageReceiver); + + } catch (javax.jms.JMSException e) { + + // setMessageListener not allowed in JEE container so use Tuscany threads + + connection.start(); + consumerRunning = true; + + workScheduler.scheduleWork(new Runnable() { + + public void run() { + try { + while (consumerRunning) { + final Message msg = consumer.receive(); + if (msg != null) { + workScheduler.scheduleWork(new Runnable() { + public void run() { + try { + messageReceiver.onMessage(msg); + } catch (Exception e) { + log.error("Exception on message receiver thread", e); + } + } + }); + } + } + } catch (Exception e) { + log.error("Exception on consumer receive thread", e); + } + } + }); + } + } + + /** + * Stop listening on the given destination - for undeployment of services + * + * @param destinationJndi the JNDI name of the JMS destination + * @throws JMSException on exception + */ + private void stoplistenOnDestination(String destinationJndi) throws JMSException { + ((Session) jmsSessions.get(destinationJndi)).close(); + } + + /** + * Return the service name using this destination + * + * @param destination the destination name + * @return the service which uses the given destination, or null + */ + public String getServiceNameForDestination(String destination) { + + return (String) serviceJNDINameMapping.get(destination); + } + + /** + * Close all connections, sessions etc.. and stop this connection factory + */ + public void stop() { + try { + consumerRunning = false; + connection.close(); + } catch (JMSException e) { + log.warn("Error shutting down connection factory : " + name, e); + } + } + + /** + * Return the provider specific Destination name if any for the destination with the given + * JNDI name + * @param destinationJndi the JNDI name of the destination + * @return the provider specific Destination name or null if cannot be found + */ + public String getDestinationName(String destinationJndi) { + try { + Destination destination = (Destination) context.lookup(destinationJndi); + if (destination != null && destination instanceof Queue) { + return ((Queue) destination).getQueueName(); + } else if (destination != null && destination instanceof Topic) { + return ((Topic) destination).getTopicName(); + } + } catch (JMSException e) { + log.warn("Error reading provider specific JMS destination name for destination " + + "with JNDI name : " + destinationJndi, e); + } catch (NamingException e) { + log.warn("Error looking up destination with JNDI name : " + destinationJndi + + " to map its corresponding provider specific Destination name"); + } + return null; + } + + /** + * Return the EPR for the JMS Destination with the given JNDI name and using + * this connection factory + * @param destination the JNDI name of the JMS Destionation + * @return the EPR + */ + public EndpointReference getEPRForDestination(String destination) { + + StringBuffer sb = new StringBuffer(); + sb.append(JMSConstants.JMS_PREFIX).append(destination); + sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM). + append("=").append(getJndiName()); + Iterator props = getProperties().keySet().iterator(); + while (props.hasNext()) { + String key = (String) props.next(); + String value = (String) getProperties().get(key); + sb.append("&").append(key).append("=").append(value); + } + + return new EndpointReference(sb.toString()); + } + + public String getServiceByDestination(String destinationName) { + return (String) serviceDestinationMapping.get(destinationName); + } + + private void handleException(String msg) throws AxisJMSException { + log.error(msg); + throw new AxisJMSException(msg); + } + + private void handleException(String msg, Exception e) throws AxisJMSException { + log.error(msg, e); + throw new AxisJMSException(msg, e); + } +} diff --git a/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java new file mode 100644 index 0000000000..08190fb57c --- /dev/null +++ b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java @@ -0,0 +1,527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.StringTokenizer; + +import javax.jms.JMSException; +import javax.naming.Context; +import javax.naming.NamingException; + +import org.apache.axiom.om.OMElement; +import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.context.SessionContext; +import org.apache.axis2.description.AxisModule; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.AxisServiceGroup; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterIncludeImpl; +import org.apache.axis2.description.TransportInDescription; +import org.apache.axis2.engine.AxisConfiguration; +import org.apache.axis2.engine.AxisEvent; +import org.apache.axis2.engine.AxisObserver; +import org.apache.axis2.transport.TransportListener; +import org.apache.axis2.transport.jms.JMSConstants; +import org.apache.axis2.transport.jms.JMSUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.work.WorkScheduler; + +import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; +import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; +import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; + +/** + * The JMS Transport listener implementation. A JMS Listner will hold one or + * more JMS connection factories, which would be created at initialization + * time. This implementation does not support the creation of connection + * factories at runtime. This JMS Listener registers with Axis to be notified + * of service deployment/undeployment/start and stop, and enables or disables + * listening for messages on the destinations as appropriate. + *

+ * A Service could state the JMS connection factory name and the destination + * name for use as Parameters in its services.xml as shown in the example + * below. If the connection name was not specified, it will use the connection + * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a + * factory is defined in the Axis2.xml. If the destination name is not specified + * it will default to a JMS queue by the name of the service. If the destination + * should be a Topic, it should be created on the JMS implementation, and + * specified in the services.xml of the service. + *

+ * + * myTopicConnectionFactory + * + * dynamicTopics/something.TestTopic + */ +public class JMSListener implements TransportListener { + + private static final Log log = LogFactory.getLog(JMSListener.class); + + /** + * The maximum number of threads used for the worker thread pool + */ + private static final int WORKERS_MAX_THREADS = 100; + /** + * The keep alive time of an idle worker thread + */ + private static final long WORKER_KEEP_ALIVE = 60L; + /** + * The worker thread timeout time unit + */ + private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; + + /** + * A Map containing the connection factories managed by this, keyed by name + */ + private Map connectionFactories = new HashMap(); + /** + * A Map of service name to the JMS EPR addresses + */ + private Map serviceNameToEprMap = new HashMap(); + /** + * The Axis2 Configuration context + */ + private ConfigurationContext configCtx = null; + + private ExecutorService workerPool; + + private WorkScheduler workScheduler; + + public JMSListener(WorkScheduler workScheduler) { + this.workScheduler = workScheduler; + } + + /** + * This is the TransportListener initialization method invoked by Axis2 + * + * @param axisConf the Axis configuration context + * @param transprtIn the TransportIn description + */ + public void init(ConfigurationContext axisConf, + TransportInDescription transprtIn) { + + // save reference to the configuration context + this.configCtx = axisConf; + + // initialize the defined connection factories + initializeConnectionFactories(transprtIn); + + // if no connection factories are defined, we cannot listen + if (connectionFactories.isEmpty()) { + log.warn("No JMS connection factories are defined." + + "Will not listen for any JMS messages"); + return; + } + + // iterate through deployed services and validate connection factory + // names, and mark services as faulty where appropriate. + Iterator services = + axisConf.getAxisConfiguration().getServices().values().iterator(); + + while (services.hasNext()) { + AxisService service = (AxisService) services.next(); + if (JMSUtils.isJMSService(service)) { + processService(service); + } + } + + // register to receive updates on services for lifetime management + axisConf.getAxisConfiguration().addObservers(new JMSAxisObserver()); + + log.info("JMS Transport Receiver (Listener) initialized..."); + } + + + /** + * Prepare to listen for JMS messages on behalf of this service + * + * @param service + */ + private void processService(AxisService service) { + JMSConnectionFactory cf = getConnectionFactory(service); + if (cf == null) { + String msg = "Service " + service.getName() + " does not specify" + + "a JMS connection factory or refers to an invalid factory. " + + "This service is being marked as faulty and will not be " + + "available over the JMS transport"; + log.warn(msg); + JMSUtils.markServiceAsFaulty( + service.getName(), msg, service.getAxisConfiguration()); + return; + } + + String destination = JMSUtils.getDestination(service); + + // compute service EPR and keep for later use + serviceNameToEprMap.put(service.getName(), getEPR(cf, destination)); + + // add the specified or implicit destination of this service + // to its connection factory + cf.addDestination(destination, service.getName()); + } + + /** + * Return the connection factory name for this service. If this service + * refers to an invalid factory or defaults to a non-existent default + * factory, this returns null + * + * @param service the AxisService + * @return the JMSConnectionFactory to be used, or null if reference is invalid + */ + private JMSConnectionFactory getConnectionFactory(AxisService service) { + Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM); + + // validate connection factory name (specified or default) + if (conFacParam != null) { + String conFac = (String) conFacParam.getValue(); + if (connectionFactories.containsKey(conFac)) { + return (JMSConnectionFactory) connectionFactories.get(conFac); + } else { + return null; + } + + } else if (connectionFactories.containsKey(JMSConstants.DEFAULT_CONFAC_NAME)) { + return (JMSConnectionFactory) connectionFactories. + get(JMSConstants.DEFAULT_CONFAC_NAME); + + } else { + return null; + } + } + + /** + * Initialize the defined connection factories, parsing the TransportIn + * descriptions + * + * @param transprtIn The Axis2 Transport in for the JMS + */ + private void initializeConnectionFactories(TransportInDescription transprtIn) { + // iterate through all defined connection factories + Iterator conFacIter = transprtIn.getParameters().iterator(); + + while (conFacIter.hasNext()) { + + Parameter param = (Parameter) conFacIter.next(); + JMSConnectionFactory jmsConFactory = + new JMSConnectionFactory(param.getName(), workScheduler); + + ParameterIncludeImpl pi = new ParameterIncludeImpl(); + try { + pi.deserializeParameters((OMElement) param.getValue()); + } catch (AxisFault axisFault) { + handleException("Error reading Parameters for JMS connection " + + "factory" + jmsConFactory.getName(), axisFault); + } + + // read connection facotry properties + Iterator params = pi.getParameters().iterator(); + + while (params.hasNext()) { + Parameter p = (Parameter) params.next(); + + if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) { + jmsConFactory.addProperty( + Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue()); + } else if (Context.PROVIDER_URL.equals(p.getName())) { + jmsConFactory.addProperty( + Context.PROVIDER_URL, (String) p.getValue()); + } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) { + jmsConFactory.addProperty( + Context.SECURITY_PRINCIPAL, (String) p.getValue()); + } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) { + jmsConFactory.addProperty( + Context.SECURITY_CREDENTIALS, (String) p.getValue()); + } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) { + jmsConFactory.setJndiName((String) p.getValue()); + } else if (JMSConstants.CONFAC_JNDI_NAME_USER.equals(p.getName())) { + jmsConFactory.setJndiUser((String) p.getValue()); + } else if (JMSConstants.CONFAC_JNDI_NAME_PASS.equals(p.getName())) { + jmsConFactory.setJndiPass((String) p.getValue()); + } else if (JMSConstants.DEST_PARAM.equals(p.getName())) { + StringTokenizer st = + new StringTokenizer((String) p.getValue(), " ,"); + while (st.hasMoreTokens()) { + jmsConFactory.addDestination(st.nextToken(), null); + } + } + } + + // connect to the actual connection factory + try { + jmsConFactory.connect(); + connectionFactories.put(jmsConFactory.getName(), jmsConFactory); + } catch (NamingException e) { + handleException("Error connecting to JMS connection factory : " + + jmsConFactory.getJndiName(), e); + } + } + } + + /** + * Get the EPR for the given JMS connection factory and destination + * the form of the URL is + * jms:/?[=&]* + * + * @param cf the Axis2 JMS connection factory + * @param destination the JNDI name of the destination + * @return the EPR as a String + */ + private static String getEPR(JMSConnectionFactory cf, String destination) { + StringBuffer sb = new StringBuffer(); + sb.append(JMSConstants.JMS_PREFIX).append(destination); + sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM). + append("=").append(cf.getJndiName()); + Iterator props = cf.getProperties().keySet().iterator(); + while (props.hasNext()) { + String key = (String) props.next(); + String value = (String) cf.getProperties().get(key); + sb.append("&").append(key).append("=").append(value); + } + return sb.toString(); + } + + /** + * Start this JMS Listener (Transport Listener) + * + * @throws AxisFault + */ + public void start() throws AxisFault { + // create thread pool of workers + workerPool = new ThreadPoolExecutor( + 1, + WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT, + new LinkedBlockingQueue(), + new org.apache.axis2.util.threadpool.DefaultThreadFactory( + new ThreadGroup("JMS Worker thread group"), + "JMSWorker")); + + Iterator iter = connectionFactories.values().iterator(); + while (iter.hasNext()) { + JMSConnectionFactory conFac = (JMSConnectionFactory) iter.next(); + JMSMessageReceiver msgRcvr = + new JMSMessageReceiver(conFac, workerPool, configCtx); + + try { + conFac.listen(msgRcvr); + } catch (JMSException e) { + handleException("Error starting connection factory : " + + conFac.getName(), e); + } + } + } + + /** + * Stop this transport listener and shutdown all of the connection factories + */ + public void stop() { + Iterator iter = connectionFactories.values().iterator(); + while (iter.hasNext()) { + ((JMSConnectionFactory) iter.next()).stop(); + } + if (workerPool != null) { + workerPool.shutdown(); + } + } + + /** + * Returns EPRs for the given service and IP. (Picks up precomputed EPR) + * + * @param serviceName service name + * @param ip ignored + * @return the EPR for the service + * @throws AxisFault not used + */ + public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { + //Strip out the operation name + if (serviceName.indexOf('/') != -1) { + serviceName = serviceName.substring(0, serviceName.indexOf('/')); + } + + String endpointName = (String) serviceNameToEprMap.get(serviceName); + if (endpointName == null){ + if (serviceName.indexOf(".") != -1){ + serviceName = serviceName.substring(0, serviceName.indexOf(".")); + endpointName = (String) serviceNameToEprMap.get(serviceName); + } + } + return new EndpointReference[]{new EndpointReference(endpointName)}; + } + + /** + * Returns the EPR for the given service and IP. (Picks up precomputed EPR) + * + * @param serviceName service name + * @param ip ignored + * @return the EPR for the service + * @throws AxisFault not used + */ + public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault { + return getEPRsForService(serviceName, ip)[0]; + } + + /** + * Starts listening for messages on this service + * + * @param service the AxisService just deployed + */ + private void startListeningForService(AxisService service) { + processService(service); + JMSConnectionFactory cf = getConnectionFactory(service); + if (cf == null) { + String msg = "Service " + service.getName() + " does not specify" + + "a JMS connection factory or refers to an invalid factory." + + "This service is being marked as faulty and will not be " + + "available over the JMS transport"; + log.warn(msg); + JMSUtils.markServiceAsFaulty( + service.getName(), msg, service.getAxisConfiguration()); + return; + } + + String destination = JMSUtils.getDestination(service); + try { + cf.listenOnDestination(destination); + log.info("Started listening on destination : " + destination + + " for service " + service.getName()); + + } catch (JMSException e) { + handleException( + "Could not listen on JMS for service " + service.getName(), e); + JMSUtils.markServiceAsFaulty( + service.getName(), e.getMessage(), service.getAxisConfiguration()); + } + } + + /** + * Stops listening for messages for the service undeployed + * + * @param service the AxisService just undeployed + */ + private void stopListeningForService(AxisService service) { + + JMSConnectionFactory cf = getConnectionFactory(service); + if (cf == null) { + String msg = "Service " + service.getName() + " does not specify" + + "a JMS connection factory or refers to an invalid factory." + + "This service is being marked as faulty and will not be " + + "available over the JMS transport"; + log.warn(msg); + JMSUtils.markServiceAsFaulty( + service.getName(), msg, service.getAxisConfiguration()); + return; + } + + // remove from the serviceNameToEprMap + serviceNameToEprMap.remove(service.getName()); + + String destination = JMSUtils.getDestination(service); + try { + cf.removeDestination(destination); + } catch (JMSException e) { + handleException( + "Error while terminating listening on JMS destination : " + destination, e); + } + } + + private void handleException(String msg, Exception e) { + log.error(msg, e); + throw new AxisJMSException(msg, e); + } + + /** + * An AxisObserver which will start listening for newly deployed services, + * and stop listening when services are undeployed. + */ + class JMSAxisObserver implements AxisObserver { + + // The initilization code will go here + public void init(AxisConfiguration axisConfig) { + } + + public void serviceUpdate(AxisEvent event, AxisService service) { + + if (JMSUtils.isJMSService(service)) { + switch (event.getEventType()) { + case AxisEvent.SERVICE_DEPLOY : + startListeningForService(service); + break; + case AxisEvent.SERVICE_REMOVE : + stopListeningForService(service); + break; + case AxisEvent.SERVICE_START : + startListeningForService(service); + break; + case AxisEvent.SERVICE_STOP : + stopListeningForService(service); + break; + } + } + } + + public void moduleUpdate(AxisEvent event, AxisModule module) { + } + + //-------------------------------------------------------- + public void addParameter(Parameter param) throws AxisFault { + } + + public void removeParameter(Parameter param) throws AxisFault { + } + + public void deserializeParameters(OMElement parameterElement) throws AxisFault { + } + + public Parameter getParameter(String name) { + return null; + } + + public ArrayList getParameters() { + return null; + } + + public boolean isParameterLocked(String parameterName) { + return false; + } + + public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup serviceGroup) { + } + } + + public ConfigurationContext getConfigurationContext() { + return this.configCtx; + } + + + public SessionContext getSessionContext(MessageContext messageContext) { + return null; + } + + public void destroy() { + this.configCtx = null; + } +} diff --git a/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java new file mode 100644 index 0000000000..e9e9f04ab2 --- /dev/null +++ b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.io.InputStream; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.naming.Context; +import javax.xml.stream.XMLStreamException; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.addressing.RelatesTo; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.engine.AxisEngine; +import org.apache.axis2.transport.jms.JMSConstants; +import org.apache.axis2.transport.jms.JMSUtils; +import org.apache.axis2.util.MessageContextBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; + +/** + * This is the actual receiver which listens for and accepts JMS messages, and + * hands them over to be processed by a worker thread. An instance of this + * class is created for each JMSConnectionFactory, but all instances may and + * will share the same worker thread pool. + */ +public class JMSMessageReceiver implements MessageListener { + + private static final Log log = LogFactory.getLog(JMSMessageReceiver.class); + + /** + * The thread pool of workers + */ + private Executor workerPool = null; + /** + * The Axis configuration context + */ + private ConfigurationContext axisConf = null; + /** + * A reference to the JMS Connection Factory + */ + private JMSConnectionFactory jmsConFac = null; + + /** + * Create a new JMSMessage receiver + * + * @param jmsConFac the JMS connection factory associated with + * @param workerPool the worker thead pool to be used + * @param axisConf the Axis2 configuration + */ + JMSMessageReceiver(JMSConnectionFactory jmsConFac, + Executor workerPool, ConfigurationContext axisConf) { + this.jmsConFac = jmsConFac; + this.workerPool = workerPool; + this.axisConf = axisConf; + } + + /** + * Return the Axis configuration + * + * @return the Axis configuration + */ + public ConfigurationContext getAxisConf() { + return axisConf; + } + + /** + * Set the worker thread pool + * + * @param workerPool the worker thead pool + */ + public void setWorkerPool(Executor workerPool) { + this.workerPool = workerPool; + } + + /** + * The entry point on the recepit of each JMS message + * + * @param message the JMS message received + */ + public void onMessage(Message message) { + // directly create a new worker and delegate processing + try { + if (log.isDebugEnabled()) { + StringBuffer sb = new StringBuffer(); + sb.append("Received JMS message to destination : " + message.getJMSDestination()); + sb.append("\nMessage ID : " + message.getJMSMessageID()); + sb.append("\nCorrelation ID : " + message.getJMSCorrelationID()); + sb.append("\nReplyTo ID : " + message.getJMSReplyTo()); + log.debug(sb.toString()); + } + } catch (JMSException e) { + e.printStackTrace(); + } + workerPool.execute(new Worker(message)); + } + + /** + * Creates an Axis MessageContext for the received JMS message and + * sets up the transports and various properties + * + * @param message the JMS message + * @return the Axis MessageContext + */ + private MessageContext createMessageContext(Message message) { + + InputStream in = JMSUtils.getInputStream(message); + + try { + MessageContext msgContext = axisConf.createMessageContext(); + + // get destination and create correct EPR + Destination dest = message.getJMSDestination(); + String destinationName = null; + if (dest instanceof Queue) { + destinationName = ((Queue) dest).getQueueName(); + } else if (dest instanceof Topic) { + destinationName = ((Topic) dest).getTopicName(); + } + + String serviceName = jmsConFac.getServiceByDestination(destinationName); + + // hack to get around the crazy Active MQ dynamic queue and topic issues + if (serviceName == null) { + String provider = (String) jmsConFac.getProperties().get( + Context.INITIAL_CONTEXT_FACTORY); + if (provider.indexOf("activemq") != -1) { + serviceName = jmsConFac.getServiceNameForDestination( + ((dest instanceof Queue ? + JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE : + JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC) + destinationName)); + } + } + + + if (serviceName != null) { + // set to bypass dispatching and handover directly to this service + msgContext.setAxisService( + axisConf.getAxisConfiguration().getService(serviceName)); + } + + msgContext.setIncomingTransportName(Constants.TRANSPORT_JMS); + msgContext.setTransportIn( + axisConf.getAxisConfiguration().getTransportIn(Constants.TRANSPORT_JMS)); + + msgContext.setTransportOut( + axisConf.getAxisConfiguration().getTransportOut(Constants.TRANSPORT_JMS)); + // the reply is assumed to be on the JMSReplyTo destination, using + // the same incoming connection factory + + + JMSOutTransportInfo jmsOutTransportInfo; + + if ((jmsConFac.getJndiUser() == null) || (jmsConFac.getJndiPass() == null)) + jmsOutTransportInfo= new JMSOutTransportInfo(jmsConFac.getConFactory(), message.getJMSReplyTo()); + else + jmsOutTransportInfo= new JMSOutTransportInfo(jmsConFac.getConFactory(), jmsConFac.getUser(), jmsConFac.getPass(), message.getJMSReplyTo()); + + msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, jmsOutTransportInfo); + + msgContext.setServerSide(true); + msgContext.setMessageID(message.getJMSMessageID()); + + Destination replyTo = message.getJMSReplyTo(); + String jndiDestinationName = null; + if (replyTo == null) { + Parameter param = msgContext.getAxisService().getParameter(JMSConstants.REPLY_PARAM); + if (param != null && param.getValue() != null) { + jndiDestinationName = (String) param.getValue(); + } + } + + if (jndiDestinationName != null) { + msgContext.setReplyTo(jmsConFac.getEPRForDestination(jndiDestinationName)); + } + + String soapAction = JMSUtils.getProperty(message, JMSConstants.SOAPACTION); + if (soapAction != null) { + msgContext.setSoapAction(soapAction); + } + + msgContext.setEnvelope( + JMSUtils.getSOAPEnvelope(message, msgContext, in)); + + // set correlation id + String correlationId = message.getJMSCorrelationID(); + if (correlationId != null && correlationId.length() > 0) { + msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, correlationId); + msgContext.setRelationships( + new RelatesTo[] { new RelatesTo(correlationId) }); + } + + return msgContext; + + } catch (JMSException e) { + handleException("JMS Exception reading the destination name", e); + } catch (AxisFault e) { + handleException("Axis fault creating the MessageContext", e); + } catch (XMLStreamException e) { + handleException("Error reading the SOAP envelope", e); + } + return null; + } + + private void handleException(String msg, Exception e) { + log.error(msg, e); + throw new AxisJMSException(msg, e); + } + + /** + * The actual Runnable Worker implementation which will process the + * received JMS messages in the worker thread pool + */ + class Worker implements Runnable { + + private Message message = null; + + Worker(Message message) { + this.message = message; + } + + public void run() { + MessageContext msgCtx = createMessageContext(message); + + AxisEngine engine = new AxisEngine(msgCtx.getConfigurationContext()); + try { + log.debug("Delegating JMS message for processing to the Axis engine"); + try { + engine.receive(msgCtx); + } catch (AxisFault e) { + log.debug("Exception occured when receiving the SOAP message", e); + if (msgCtx.isServerSide()) { + MessageContext faultContext = MessageContextBuilder.createFaultMessageContext(msgCtx, e); + engine.sendFault(faultContext); + } + } + } catch (AxisFault af) { + log.error("JMS Worker [" + Thread.currentThread().getName() + + "] Encountered an Axis Fault : " + af.getMessage(), af); + } + } + } +} diff --git a/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java new file mode 100644 index 0000000000..5fa6542eec --- /dev/null +++ b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.util.Hashtable; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NameNotFoundException; +import javax.naming.NamingException; + +import org.apache.axis2.transport.OutTransportInfo; +import org.apache.axis2.transport.jms.JMSConstants; +import org.apache.axis2.transport.jms.JMSUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * The JMS OutTransportInfo + */ +public class JMSOutTransportInfo implements OutTransportInfo { + + private static final Log log = LogFactory.getLog(JMSOutTransportInfo.class); + + private ConnectionFactory connectionFactory = null; + private String connectionFactoryUser = null; + private String connectionFactoryPassword = null; + private Destination destination = null; + + private String contentType = null; + + /** + * Creates an instance using the given connection factory and destination + * + * @param connectionFactory the connection factory + * @param dest the destination + */ + JMSOutTransportInfo(ConnectionFactory connectionFactory, Destination dest) { + this.connectionFactory = connectionFactory; + this.destination = dest; + } + + /** + * Creates an instance using the given connection factory and destination + * + * @param connectionFactory the connection factory + * @param dest the destination + */ + JMSOutTransportInfo(ConnectionFactory connectionFactory, String connectionFactoryUser, String connectionFactoryPassword, Destination dest) { + this.connectionFactory = connectionFactory; + this.connectionFactoryUser = connectionFactoryUser; + this.connectionFactoryPassword = connectionFactoryPassword; + this.destination = dest; + } + + /** + * Creates and instance using the given URL + * + * @param url the URL + */ + JMSOutTransportInfo(String url) { + if (!url.startsWith(JMSConstants.JMS_PREFIX)) { + handleException("Invalid JMS URL : " + url + + " Must begin with the prefix " + JMSConstants.JMS_PREFIX); + } else { + Context context = null; + Hashtable props = JMSUtils.getProperties(url); + try { + context = new InitialContext(props); + } catch (NamingException e) { + handleException("Could not get the initial context", e); + } + + connectionFactory = getConnectionFactory(context, props); + connectionFactoryUser = getConnectionFactoryUser(context, props); + connectionFactoryPassword = getConnectionFactoryPass(context, props); + destination = getDestination(context, url); + } + } + + /** + * Get the referenced ConnectionFactory using the properties from the context + * + * @param context the context to use for lookup + * @param props the properties which contains the JNDI name of the factory + * @return the connection factory + */ + private ConnectionFactory getConnectionFactory(Context context, Hashtable props) { + try { + + String conFacJndiName = (String) props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM); + if (conFacJndiName != null) { + return (ConnectionFactory) context.lookup(conFacJndiName); + } else { + throw new NamingException( + "JMS Connection Factory JNDI name cannot be determined from url"); + } + } catch (NamingException e) { + handleException("Cannot get JMS Connection factory with props : " + props, e); + } + return null; + } + + /** + * Get the referenced ConnectionFactory Username (if supplied) using the properties from the context + * + * @param context the context to use for lookup + * @param props the properties which contains the JNDI name of the factory username + * @return the connection factory username (or null if one is not in the JNDI tree) + */ + private String getConnectionFactoryUser(Context context, Hashtable props) { + try { + + String conFacJndiUser = (String) props.get(JMSConstants.CONFAC_JNDI_NAME_USER); + if (conFacJndiUser != null) { + return (String) context.lookup(conFacJndiUser); + } else { + return null; + } + } catch (NamingException e) { + handleException("Cannot get JMS Connection factory username with props : " + props, e); + } + return null; + } + + /** + * Get the referenced ConnectionFactory Password (if supplied) using the properties from the context + * + * @param context the context to use for lookup + * @param props the properties which contains the JNDI name of the factory password + * @return the connection factory password (or null if one is not in the JNDI tree) + */ + private String getConnectionFactoryPass(Context context, Hashtable props) { + try { + + String conFacJndiPass = (String) props.get(JMSConstants.CONFAC_JNDI_NAME_PASS); + if (conFacJndiPass != null) { + return (String) context.lookup(conFacJndiPass); + } else { + return null; + } + } catch (NamingException e) { + handleException("Cannot get JMS Connection factory password with props : " + props, e); + } + return null; + } + + /** + * Get the JMS destination specified by the given URL from the context + * + * @param context the Context to lookup + * @param url URL + * @return the JMS destination, or null if it does not exist + */ + private Destination getDestination(Context context, String url) { + String destinationName = JMSUtils.getDestination(url); + try { + return (Destination) context.lookup(destinationName); + + } catch (NameNotFoundException e) { + log.warn("Cannot get or lookup JMS destination : " + destinationName + + " from url : " + url + " : " + e.getMessage()); + + } catch (NamingException e) { + handleException("Cannot get JMS destination : " + destinationName + + " from url : " + url, e); + } + return null; + } + + + private void handleException(String s) { + log.error(s); + throw new AxisJMSException(s); + } + + private void handleException(String s, Exception e) { + log.error(s, e); + throw new AxisJMSException(s, e); + } + + public Destination getDestination() { + return destination; + } + + public ConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public String getConnectionFactoryPassword() { + return connectionFactoryPassword; + } + + public String getConnectionFactoryUser() { + return connectionFactoryUser; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } +} diff --git a/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java new file mode 100644 index 0000000000..7caa045015 --- /dev/null +++ b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Hashtable; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NameNotFoundException; +import javax.naming.NamingException; +import javax.xml.stream.XMLStreamException; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMOutputFormat; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.description.WSDL2Constants; +import org.apache.axis2.handlers.AbstractHandler; +import org.apache.axis2.java.security.AccessController; +import org.apache.axis2.transport.TransportSender; +import org.apache.axis2.transport.http.HTTPTransportUtils; +import org.apache.axis2.transport.http.SOAPMessageFormatter; +import org.apache.axis2.transport.jms.JMSConstants; +import org.apache.axis2.transport.jms.JMSUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * The TransportSender for JMS + */ +public class JMSSender extends AbstractHandler implements TransportSender { + + private static final Log log = LogFactory.getLog(JMSSender.class); + + /** + * Performs the actual sending of the JMS message + * + * @param msgContext the message context to be sent + * @throws AxisFault on exception + */ + public InvocationResponse invoke(MessageContext msgContext) throws AxisFault { + + log.debug("JMSSender invoke()"); + + /* Added due to possible bug in Axis2, MTOM enablement is based on msgContext.isDoingMTOM + * However msgContext.isDoingMTOM will always return false unless set programmatically. + * HTTP sets this boolean programmatically by looking up whether enableMTOM has been set + * in axis2.xml or as an option on the client. + */ + msgContext.setDoingMTOM(HTTPTransportUtils.doWriteMTOM(msgContext)); + + JMSOutTransportInfo transportInfo = null; + String targetAddress = null; + + // is there a transport url? which may be different from the WS-A To.. + targetAddress = (String) msgContext.getProperty( + Constants.Configuration.TRANSPORT_URL); + + if (targetAddress != null) { + transportInfo = new JMSOutTransportInfo(targetAddress); + } else if (targetAddress == null && msgContext.getTo() != null && + !msgContext.getTo().hasAnonymousAddress()) { + targetAddress = msgContext.getTo().getAddress(); + + if (!msgContext.getTo().hasNoneAddress()) { + transportInfo = new JMSOutTransportInfo(targetAddress); + } else { + //Don't send the message. + return InvocationResponse.CONTINUE; + } + } else if (msgContext.isServerSide()) { + // get the jms ReplyTo + transportInfo = (JMSOutTransportInfo) + msgContext.getProperty(Constants.OUT_TRANSPORT_INFO); + } + + // get the ConnectionFactory to be used for the send + ConnectionFactory connectionFac = transportInfo.getConnectionFactory(); + + Connection con = null; + try { + String user = transportInfo.getConnectionFactoryUser(); + String password = transportInfo.getConnectionFactoryPassword(); + + if ((user == null) || (password == null)){ + // Use the OS username and credentials + con = connectionFac.createConnection(); + } else{ + // use an explicit username and password + con = connectionFac.createConnection(user, password); + } + + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Message message = createJMSMessage(msgContext, session); + + // get the JMS destination for the message being sent + Destination dest = transportInfo.getDestination(); + + if (dest == null) { + if (targetAddress != null) { + + // if it does not exist, create it + String name = JMSUtils.getDestination(targetAddress); + if (log.isDebugEnabled()) { + log.debug("Creating JMS Destination : " + name); + } + + try { + dest = session.createQueue(name); + } catch (JMSException e) { + handleException("Error creating destination Queue : " + name, e); + } + } else { + handleException("Cannot send reply to unknown JMS Destination"); + } + } + + MessageProducer producer = session.createProducer(dest); + Destination replyDest = null; + + boolean waitForResponse = + msgContext.getOperationContext() != null && + WSDL2Constants.MEP_URI_OUT_IN.equals( + msgContext.getOperationContext().getAxisOperation().getMessageExchangePattern()); + + if (waitForResponse) { + String replyToJNDIName = (String) msgContext.getProperty(JMSConstants.REPLY_PARAM); + if (replyToJNDIName != null && replyToJNDIName.length() > 0) { + Context context = null; + final Hashtable props = JMSUtils.getProperties(targetAddress); + try { + try { + context = (Context) AccessController.doPrivileged( + new PrivilegedExceptionAction() { + public Object run() throws NamingException{ + return new InitialContext(props); + } + } + ) + ; + } catch (PrivilegedActionException e) { + throw (NamingException) e.getException(); + } + } catch (NamingException e) { + handleException("Could not get the initial context", e); + } + + try { + replyDest = (Destination) context.lookup(replyToJNDIName); + + } catch (NameNotFoundException e) { + log.warn("Cannot get or lookup JMS response destination : " + + replyToJNDIName + " : " + e.getMessage() + + ". Attempting to create a Queue named : " + replyToJNDIName); + replyDest = session.createQueue(replyToJNDIName); + + } catch (NamingException e) { + handleException("Cannot get JMS response destination : " + + replyToJNDIName + " : ", e); + } + + } else { + try { + // create temporary queue to receive reply + replyDest = session.createTemporaryQueue(); + } catch (JMSException e) { + handleException("Error creating temporary queue for response"); + } + } + message.setJMSReplyTo(replyDest); + if (log.isDebugEnabled()) { + log.debug("Expecting a response to JMS Destination : " + + (replyDest instanceof Queue ? + ((Queue) replyDest).getQueueName() : ((Topic) replyDest).getTopicName())); + } + } + + try { + log.debug("[" + (msgContext.isServerSide() ? "Server" : "Client") + + "]Sending message to destination : " + dest); + producer.send(message); + producer.close(); + + } catch (JMSException e) { + handleException("Error sending JMS message to destination : " + + dest.toString(), e); + } + + if (waitForResponse) { + try { + // wait for reply + MessageConsumer consumer = session.createConsumer(replyDest); + + long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; + Long waitReply = (Long) msgContext.getProperty(JMSConstants.JMS_WAIT_REPLY); + if (waitReply != null) { + timeout = waitReply.longValue(); + } + + log.debug("Waiting for a maximum of " + timeout + + "ms for a response message to destination : " + replyDest); + con.start(); + Message reply = consumer.receive(timeout); + + if (reply != null) { + msgContext.setProperty(MessageContext.TRANSPORT_IN, + JMSUtils.getInputStream(reply)); + } else { + log.warn("Did not receive a JMS response within " + + timeout + " ms to destination : " + dest); + } + + } catch (JMSException e) { + handleException("Error reading response from temporary " + + "queue : " + replyDest, e); + } + } + } catch (JMSException e) { + handleException("Error preparing to send message to destination", e); + + } finally { + if (con != null) { + try { + con.close(); // closes all sessions, producers, temp Q's etc + } catch (JMSException e) { + } // ignore + } + } + return InvocationResponse.CONTINUE; + } + + public void cleanup(MessageContext msgContext) throws AxisFault { + // do nothing + } + + public void init(ConfigurationContext confContext, + TransportOutDescription transportOut) throws AxisFault { + // do nothing + } + + public void stop() { + // do nothing + } + + /** + * Create a JMS Message from the given MessageContext and using the given + * session + * + * @param msgContext the MessageContext + * @param session the JMS session + * @return a JMS message from the context and session + * @throws JMSException on exception + */ + private Message createJMSMessage(MessageContext msgContext, Session session) + throws JMSException { + + Message message = null; + String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE); + + OMElement msgElement = msgContext.getEnvelope(); + if (msgContext.isDoingREST()) { + msgElement = msgContext.getEnvelope().getBody().getFirstElement(); + } + + if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType)) { + + message = session.createBytesMessage(); + BytesMessage bytesMsg = (BytesMessage) message; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OMOutputFormat format = new OMOutputFormat(); + + /* Added due to possible bug in Axis2, OMOutputFormat's boolean isSOAP11 defaults to true. + * This means that if left untouched all JMS byte messages must be SOAP 1.1 + * We set the boolean here based on the messageContexts value, which is assertained from + * the soap namespace used. This is what HTTP does also. + */ + format.setSOAP11(msgContext.isSOAP11()); + format.setCharSetEncoding( + getProperty(msgContext, Constants.Configuration.CHARACTER_SET_ENCODING)); + format.setDoOptimize(msgContext.isDoingMTOM()); + try { + msgElement.serializeAndConsume(baos, format); + baos.flush(); + } catch (XMLStreamException e) { + handleException("XML serialization error creating BytesMessage", e); + } catch (IOException e) { + handleException("IO Error while creating BytesMessage", e); + } + bytesMsg.writeBytes(baos.toByteArray()); + + /* Added due to possible bug in Axis2, the content type is never set for a JMS byte message. This + * goes unnoticed when MTOM is not used, as the server can handle the message. However once MTOM + * is used a contentType of multipart/related is required. + */ + bytesMsg.setStringProperty(JMSConstants.CONTENT_TYPE, + new SOAPMessageFormatter().getContentType(msgContext, format, null)); + } else { + message = session.createTextMessage(); // default + TextMessage txtMsg = (TextMessage) message; + txtMsg.setText(msgElement.toString()); + } + + // set the JMS correlation ID if specified + String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID); + if (correlationId == null && msgContext.getRelatesTo() != null) { + correlationId = msgContext.getRelatesTo().getValue(); + } + + if (correlationId != null) { + message.setJMSCorrelationID(correlationId); + } + + if (msgContext.isServerSide()) { + // set SOAP Action and context type as properties on the JMS message + setProperty(message, msgContext, JMSConstants.SOAPACTION); + setProperty(message, msgContext, JMSConstants.CONTENT_TYPE); + } else { + String action = msgContext.getOptions().getAction(); + if (action != null) { + message.setStringProperty(JMSConstants.SOAPACTION, action); + } + } + + return message; + } + + private void setProperty(Message message, MessageContext msgCtx, String key) { + + String value = getProperty(msgCtx, key); + if (value != null) { + try { + message.setStringProperty(key, value); + } catch (JMSException e) { + log.warn("Couldn't set message property : " + key + " = " + value, e); + } + } + } + + private String getProperty(MessageContext mc, String key) { + return (String) mc.getProperty(key); + } + + private static void handleException(String s) { + log.error(s); + throw new AxisJMSException(s); + } + + private static void handleException(String s, Exception e) { + log.error(s, e); + throw new AxisJMSException(s, e); + } + +} diff --git a/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/README b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/README new file mode 100644 index 0000000000..5df1751298 --- /dev/null +++ b/branches/sca-java-1.5/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/README @@ -0,0 +1,14 @@ +The classes in this package are a copy of the same classes from the +Axis2 package org.apache.axis2.transport.jms in the Axis2 1.4.1 release. + +The only change is in the listenOnDestination method in JMSConnectionFactory +to use Tuscany threads instead of the setMessageListener call approach when +running in a JEE container where setMessageListener is prohibited. There are +several classes copied in this Tuscany package as many of the constructors +and methods are not public so we can't just subclass to fix the problem. + +In Axis2 1.5 and the new separately released JMS transport will fix this +problem so when we move up to that in Tuscany we can get rid of this package. + + + -- cgit v1.2.3