From fdd5b43d3c139cf2cbd1655d2efbfaf9032a5b5e Mon Sep 17 00:00:00 2001 From: lresende Date: Wed, 11 Nov 2009 23:14:18 +0000 Subject: Moving 1.x branches git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@835145 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/binding/ws/axis2/jms/AxisJMSException.java | 35 -- .../binding/ws/axis2/jms/JMSConnectionFactory.java | 605 --------------------- .../sca/binding/ws/axis2/jms/JMSListener.java | 527 ------------------ .../binding/ws/axis2/jms/JMSMessageReceiver.java | 270 --------- .../binding/ws/axis2/jms/JMSOutTransportInfo.java | 220 -------- .../sca/binding/ws/axis2/jms/JMSSender.java | 389 ------------- .../apache/tuscany/sca/binding/ws/axis2/jms/README | 14 - 7 files changed, 2060 deletions(-) delete mode 100644 branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java delete mode 100644 branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java delete mode 100644 branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java delete mode 100644 branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java delete mode 100644 branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java delete mode 100644 branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java delete mode 100644 branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/README (limited to 'branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms') diff --git a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java b/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java deleted file mode 100644 index 09a1960ce4..0000000000 --- a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/AxisJMSException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -public class AxisJMSException extends RuntimeException { - - AxisJMSException() { - super(); - } - - AxisJMSException(String msg) { - super(msg); - } - - AxisJMSException(String msg, Exception e) { - super(msg, e); - } -} diff --git a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java b/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java deleted file mode 100644 index a96ec0b1c4..0000000000 --- a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSConnectionFactory.java +++ /dev/null @@ -1,605 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.HashMap; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameNotFoundException; -import javax.naming.NamingException; - -import org.apache.axis2.addressing.EndpointReference; -import org.apache.axis2.transport.jms.JMSConstants; -import org.apache.axis2.transport.jms.JMSUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.work.WorkScheduler; - -/** - * Encapsulate a JMS Connection factory definition within an Axis2.xml - *

- * More than one JMS connection factory could be defined within an Axis2 XML - * specifying the JMSListener as the transportReceiver. - *

- * These connection factories are created at the initialization of the - * transportReceiver, and any service interested in using any of these could - * specify the name of the factory and the destination through Parameters named - * JMSConstants.CONFAC_PARAM and JMSConstants.DEST_PARAM as shown below. - *

- * myQueueConnectionFactory - * TestQueue - *

- * If a connection factory is defined by a parameter named - * JMSConstants.DEFAULT_CONFAC_NAME in the Axis2 XML, services which does not - * explicitly specify a connection factory will be defaulted to it - if it is - * defined in the Axis2 configuration. - *

- * e.g. - * - * - * org.apache.activemq.jndi.ActiveMQInitialContextFactory - * tcp://localhost:61616 - * TopicConnectionFactory - * myTopicOne, myTopicTwo - * - * - * org.apache.activemq.jndi.ActiveMQInitialContextFactory - * tcp://localhost:61616 - * QueueConnectionFactory - * myQueueOne, myQueueTwo - * - * - * org.apache.activemq.jndi.ActiveMQInitialContextFactory - * tcp://localhost:61616 - * ConnectionFactory - * myDestinationOne, myDestinationTwo - * - * - */ -public class JMSConnectionFactory { - - private static final Log log = LogFactory.getLog(JMSConnectionFactory.class); - - /** - * The name used for the connection factory definition within Axis2 - */ - private String name = null; - /** - * The JNDI name of the actual connection factory - */ - private String jndiName = null; - /** - * The JNDI name of the actual connection factory username - */ - private String jndiUser = null; - /** - * The JNDI name of the actual connection factory password - */ - private String jndiPass = null; - /** - * Map of destination JNDI names to service names - */ - private Map serviceJNDINameMapping = null; - /** - * Map of destinations to service names - */ - private Map serviceDestinationMapping = null; - /** - * The JMS Sessions listening for messages - */ - private Map jmsSessions = null; - /** - * Properties of the connection factory - */ - private Hashtable properties = null; - /** - * The JNDI Context used - */ - private Context context = null; - /** - * The actual ConnectionFactory instance held within - */ - private ConnectionFactory conFactory = null; - /** - * The JMS Connection is opened. - */ - private Connection connection = null; - /** - * The JMS Message receiver for this connection factory - */ - private JMSMessageReceiver msgRcvr = null; - /** - * The actual password for the connection factory after retrieval from JNDI. - * If this is not supplied, the OS username will be used by default - */ - private String user = null; - /** - * The actual password for the connection factory after retrieval from JNDI. - * If this is not supplied, the OS credentials will be used by default - */ - private String pass = null; - - private WorkScheduler workScheduler; - private boolean consumerRunning; - - /** - * Create a JMSConnectionFactory for the given Axis2 name and JNDI name - * - * @param name the local Axis2 name of the connection factory - * @param jndiName the JNDI name of the actual connection factory used - */ - JMSConnectionFactory(String name, String jndiName, WorkScheduler workScheduler) { - this.name = name; - this.jndiName = jndiName; - serviceJNDINameMapping = new HashMap(); - serviceDestinationMapping = new HashMap(); - properties = new Hashtable(); - jmsSessions = new HashMap(); - this.workScheduler = workScheduler; - } - - /** - * Create a JMSConnectionFactory for the given Axis2 name - * - * @param name the local Axis2 name of the connection factory - */ - JMSConnectionFactory(String name, WorkScheduler workScheduler) { - this(name, null, workScheduler); - } - - /** - * Connect to the actual JMS connection factory specified by the JNDI name - * - * @throws NamingException if the connection factory cannot be found - */ - public void connect() throws NamingException { - if (context == null) { - createInitialContext(); - } - conFactory = (ConnectionFactory) context.lookup(jndiName); - - if (jndiUser != null) - user = (String ) context.lookup(jndiUser); - - if (jndiPass != null) - pass = (String ) context.lookup(jndiPass); - - log.debug("Connected to the actual connection factory : " + jndiName); - } - - /** - * Creates the initial context using the set properties - * - * @throws NamingException - */ - private void createInitialContext() throws NamingException { - context = new InitialContext(properties); - } - - /** - * Set the JNDI connection factory name - * - * @param jndiName - */ - public void setJndiName(String jndiName) { - this.jndiName = jndiName; - } - - /** - * Get the JNDI name of the actual factory username - * - * @return the jndi name of the actual connection factory username - */ - public void setJndiUser(String jndiUser) { - this.jndiUser = jndiUser; - } - - /** - * Get the JNDI name of the actual factory password - * - * @return the jndi name of the actual connection factory password - */ - public void setJndiPass(String jndiPass) { - this.jndiPass = jndiPass; - } - - /** - * Add a listen destination on this connection factory on behalf of the given service - * - * @param destinationJndi destination JNDI name - * @param serviceName the service to which it belongs - */ - public void addDestination(String destinationJndi, String serviceName) { - - serviceJNDINameMapping.put(destinationJndi, serviceName); - String destinationName = getDestinationName(destinationJndi); - - if (destinationName == null) { - log.warn("JMS Destination with JNDI name : " + destinationJndi + " does not exist"); - - Connection con = null; - try { - if ((jndiUser == null) || (jndiPass == null)){ - // Use the OS username and credentials - con = conFactory.createConnection(); - } else{ - // use an explicit username and password - con = conFactory.createConnection(user, pass); - } - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(destinationJndi); - destinationName = queue.getQueueName(); - log.warn("JMS Destination with JNDI name : " + destinationJndi + " created"); - - } catch (JMSException e) { - log.error("Unable to create a Destination with JNDI name : " + destinationJndi, e); - // mark service as faulty - JMSUtils.markServiceAsFaulty( - (String) serviceJNDINameMapping.get(destinationJndi), - "Error looking up JMS destination : " + destinationJndi, - msgRcvr.getAxisConf().getAxisConfiguration()); - - } finally { - if (con != null) { - try { - con.close(); - } catch (JMSException ignore) {} - } - } - } - - serviceDestinationMapping.put(destinationName, serviceName); - log.info("Mapping JNDI name : " + destinationJndi + " and JMS Destination name : " + - destinationName + " against service : " + serviceName); - } - - /** - * Remove listen destination on this connection factory - * - * @param destinationJndi the JMS destination to be removed - * @throws if an error occurs trying to stop listening for messages before removal - */ - public void removeDestination(String destinationJndi) throws JMSException { - // find and save provider specific Destination name before we delete - String providerSpecificDestination = getDestinationName(destinationJndi); - stoplistenOnDestination(destinationJndi); - serviceJNDINameMapping.remove(destinationJndi); - if (providerSpecificDestination != null) { - serviceDestinationMapping.remove(providerSpecificDestination); - } - } - - /** - * Add a property to the connection factory - * - * @param key - * @param value - */ - public void addProperty(String key, String value) { - properties.put(key, value); - } - - /** - * Return the name of the connection factory - * - * @return the Axis2 name of this factory - */ - public String getName() { - return name; - } - - /** - * Get the JNDI name of the actual factory - * - * @return the jndi name of the actual connection factory - */ - public String getJndiName() { - return jndiName; - } - - /** - * Get the JNDI name of the actual factory username - * - * @return the jndi name of the actual connection factory username - */ - public String getJndiUser() { - return jndiUser; - } - - /** - * Get the JNDI name of the actual factory password - * - * @return the jndi name of the actual connection factory password - */ - public String getJndiPass() { - return jndiPass; - } - - - /** - * This is the real password for the connection factory after the JNDI lookup - * - * @return the real password for the connection factory after the JNDI lookup - */ - public String getPass() { - return pass; - } - - /** - * This is the real username for the connection factory after the JNDI lookup - * - * @return the eal username for the connection factory after the JNDI lookup - */ - public String getUser() { - return user; - } - - /** - * Get the actual underlying connection factory - * - * @return actual connection factory - */ - public ConnectionFactory getConFactory() { - return conFactory; - } - - /** - * Get the list of destinations (JNDI) associated with this connection factory - * - * @return destinations to service maping - */ - public Map getDestinations() { - return serviceJNDINameMapping; - } - - /** - * Get the connection factory properties - * - * @return properties - */ - public Hashtable getProperties() { - return properties; - } - - /** - * Begin listening for messages on the list of destinations associated - * with this connection factory. (Called during Axis2 initialization of - * the Transport receivers) - * - * @param msgRcvr the message receiver which will process received messages - * @throws JMSException on exceptions - */ - public void listen(JMSMessageReceiver msgRcvr) throws JMSException { - - // save a reference to the message receiver - this.msgRcvr = msgRcvr; - - log.debug("Connection factory : " + name + " initializing..."); - - if (conFactory == null || context == null) { - handleException( - "Connection factory must be 'connected' before listening"); - } else { - try { - if ((jndiUser == null) || (jndiPass == null)){ - // User the OS username and credentials - connection = conFactory.createConnection(); - } else{ - // use an explicit username and password - connection = conFactory.createConnection(user, pass); - } - } catch (JMSException e) { - handleException("Error creating a JMS connection using the " + - "factory : " + jndiName, e); - } - } - - Iterator iter = serviceJNDINameMapping.keySet().iterator(); - while (iter.hasNext()) { - String destinationJndi = (String) iter.next(); - listenOnDestination(destinationJndi); - } - - // start the connection - if (!consumerRunning) { - connection.start(); - } - log.info("Connection factory : " + name + " initialized..."); - } - - /** - * Listen on the given destination from this connection factory. Used to - * start listening on a destination associated with a newly deployed service - * - * @param destinationJndi the JMS destination to listen on - * @throws JMSException on exception - */ - public void listenOnDestination(String destinationJndi) throws JMSException { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = null; - try { - Object o = context.lookup(destinationJndi); - destination = (Destination) o; - - } catch (NameNotFoundException e) { - log.warn("Cannot find destination : " + destinationJndi + - " Creating a Queue with this name"); - destination = session.createQueue(destinationJndi); - - } catch (NamingException e) { - log.warn("Error looking up destination : " + destinationJndi, e); - // mark service as faulty - JMSUtils.markServiceAsFaulty( - (String) serviceJNDINameMapping.get(destinationJndi), - "Error looking up JMS destination : " + destinationJndi, - this.msgRcvr.getAxisConf().getAxisConfiguration()); - } - - MessageConsumer consumer = session.createConsumer(destination); -// consumer.setMessageListener(this.msgRcvr); replace with new Tuscany method: - registerMessageReceiver(consumer, this.msgRcvr); - jmsSessions.put(destinationJndi, session); - } - - private void registerMessageReceiver(final MessageConsumer consumer, final JMSMessageReceiver messageReceiver) throws JMSException { - - try { - - consumer.setMessageListener(messageReceiver); - - } catch (javax.jms.JMSException e) { - - // setMessageListener not allowed in JEE container so use Tuscany threads - - connection.start(); - consumerRunning = true; - - workScheduler.scheduleWork(new Runnable() { - - public void run() { - try { - while (consumerRunning) { - final Message msg = consumer.receive(); - if (msg != null) { - workScheduler.scheduleWork(new Runnable() { - public void run() { - try { - messageReceiver.onMessage(msg); - } catch (Exception e) { - log.error("Exception on message receiver thread", e); - } - } - }); - } - } - } catch (Exception e) { - log.error("Exception on consumer receive thread", e); - } - } - }); - } - } - - /** - * Stop listening on the given destination - for undeployment of services - * - * @param destinationJndi the JNDI name of the JMS destination - * @throws JMSException on exception - */ - private void stoplistenOnDestination(String destinationJndi) throws JMSException { - ((Session) jmsSessions.get(destinationJndi)).close(); - } - - /** - * Return the service name using this destination - * - * @param destination the destination name - * @return the service which uses the given destination, or null - */ - public String getServiceNameForDestination(String destination) { - - return (String) serviceJNDINameMapping.get(destination); - } - - /** - * Close all connections, sessions etc.. and stop this connection factory - */ - public void stop() { - try { - consumerRunning = false; - connection.close(); - } catch (JMSException e) { - log.warn("Error shutting down connection factory : " + name, e); - } - } - - /** - * Return the provider specific Destination name if any for the destination with the given - * JNDI name - * @param destinationJndi the JNDI name of the destination - * @return the provider specific Destination name or null if cannot be found - */ - public String getDestinationName(String destinationJndi) { - try { - Destination destination = (Destination) context.lookup(destinationJndi); - if (destination != null && destination instanceof Queue) { - return ((Queue) destination).getQueueName(); - } else if (destination != null && destination instanceof Topic) { - return ((Topic) destination).getTopicName(); - } - } catch (JMSException e) { - log.warn("Error reading provider specific JMS destination name for destination " + - "with JNDI name : " + destinationJndi, e); - } catch (NamingException e) { - log.warn("Error looking up destination with JNDI name : " + destinationJndi + - " to map its corresponding provider specific Destination name"); - } - return null; - } - - /** - * Return the EPR for the JMS Destination with the given JNDI name and using - * this connection factory - * @param destination the JNDI name of the JMS Destionation - * @return the EPR - */ - public EndpointReference getEPRForDestination(String destination) { - - StringBuffer sb = new StringBuffer(); - sb.append(JMSConstants.JMS_PREFIX).append(destination); - sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM). - append("=").append(getJndiName()); - Iterator props = getProperties().keySet().iterator(); - while (props.hasNext()) { - String key = (String) props.next(); - String value = (String) getProperties().get(key); - sb.append("&").append(key).append("=").append(value); - } - - return new EndpointReference(sb.toString()); - } - - public String getServiceByDestination(String destinationName) { - return (String) serviceDestinationMapping.get(destinationName); - } - - private void handleException(String msg) throws AxisJMSException { - log.error(msg); - throw new AxisJMSException(msg); - } - - private void handleException(String msg, Exception e) throws AxisJMSException { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } -} diff --git a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java deleted file mode 100644 index 08190fb57c..0000000000 --- a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java +++ /dev/null @@ -1,527 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.StringTokenizer; - -import javax.jms.JMSException; -import javax.naming.Context; -import javax.naming.NamingException; - -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; -import org.apache.axis2.addressing.EndpointReference; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.context.SessionContext; -import org.apache.axis2.description.AxisModule; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.AxisServiceGroup; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.ParameterIncludeImpl; -import org.apache.axis2.description.TransportInDescription; -import org.apache.axis2.engine.AxisConfiguration; -import org.apache.axis2.engine.AxisEvent; -import org.apache.axis2.engine.AxisObserver; -import org.apache.axis2.transport.TransportListener; -import org.apache.axis2.transport.jms.JMSConstants; -import org.apache.axis2.transport.jms.JMSUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.work.WorkScheduler; - -import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; -import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; -import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; -import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; - -/** - * The JMS Transport listener implementation. A JMS Listner will hold one or - * more JMS connection factories, which would be created at initialization - * time. This implementation does not support the creation of connection - * factories at runtime. This JMS Listener registers with Axis to be notified - * of service deployment/undeployment/start and stop, and enables or disables - * listening for messages on the destinations as appropriate. - *

- * A Service could state the JMS connection factory name and the destination - * name for use as Parameters in its services.xml as shown in the example - * below. If the connection name was not specified, it will use the connection - * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a - * factory is defined in the Axis2.xml. If the destination name is not specified - * it will default to a JMS queue by the name of the service. If the destination - * should be a Topic, it should be created on the JMS implementation, and - * specified in the services.xml of the service. - *

- * - * myTopicConnectionFactory - * - * dynamicTopics/something.TestTopic - */ -public class JMSListener implements TransportListener { - - private static final Log log = LogFactory.getLog(JMSListener.class); - - /** - * The maximum number of threads used for the worker thread pool - */ - private static final int WORKERS_MAX_THREADS = 100; - /** - * The keep alive time of an idle worker thread - */ - private static final long WORKER_KEEP_ALIVE = 60L; - /** - * The worker thread timeout time unit - */ - private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; - - /** - * A Map containing the connection factories managed by this, keyed by name - */ - private Map connectionFactories = new HashMap(); - /** - * A Map of service name to the JMS EPR addresses - */ - private Map serviceNameToEprMap = new HashMap(); - /** - * The Axis2 Configuration context - */ - private ConfigurationContext configCtx = null; - - private ExecutorService workerPool; - - private WorkScheduler workScheduler; - - public JMSListener(WorkScheduler workScheduler) { - this.workScheduler = workScheduler; - } - - /** - * This is the TransportListener initialization method invoked by Axis2 - * - * @param axisConf the Axis configuration context - * @param transprtIn the TransportIn description - */ - public void init(ConfigurationContext axisConf, - TransportInDescription transprtIn) { - - // save reference to the configuration context - this.configCtx = axisConf; - - // initialize the defined connection factories - initializeConnectionFactories(transprtIn); - - // if no connection factories are defined, we cannot listen - if (connectionFactories.isEmpty()) { - log.warn("No JMS connection factories are defined." + - "Will not listen for any JMS messages"); - return; - } - - // iterate through deployed services and validate connection factory - // names, and mark services as faulty where appropriate. - Iterator services = - axisConf.getAxisConfiguration().getServices().values().iterator(); - - while (services.hasNext()) { - AxisService service = (AxisService) services.next(); - if (JMSUtils.isJMSService(service)) { - processService(service); - } - } - - // register to receive updates on services for lifetime management - axisConf.getAxisConfiguration().addObservers(new JMSAxisObserver()); - - log.info("JMS Transport Receiver (Listener) initialized..."); - } - - - /** - * Prepare to listen for JMS messages on behalf of this service - * - * @param service - */ - private void processService(AxisService service) { - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf == null) { - String msg = "Service " + service.getName() + " does not specify" + - "a JMS connection factory or refers to an invalid factory. " + - "This service is being marked as faulty and will not be " + - "available over the JMS transport"; - log.warn(msg); - JMSUtils.markServiceAsFaulty( - service.getName(), msg, service.getAxisConfiguration()); - return; - } - - String destination = JMSUtils.getDestination(service); - - // compute service EPR and keep for later use - serviceNameToEprMap.put(service.getName(), getEPR(cf, destination)); - - // add the specified or implicit destination of this service - // to its connection factory - cf.addDestination(destination, service.getName()); - } - - /** - * Return the connection factory name for this service. If this service - * refers to an invalid factory or defaults to a non-existent default - * factory, this returns null - * - * @param service the AxisService - * @return the JMSConnectionFactory to be used, or null if reference is invalid - */ - private JMSConnectionFactory getConnectionFactory(AxisService service) { - Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM); - - // validate connection factory name (specified or default) - if (conFacParam != null) { - String conFac = (String) conFacParam.getValue(); - if (connectionFactories.containsKey(conFac)) { - return (JMSConnectionFactory) connectionFactories.get(conFac); - } else { - return null; - } - - } else if (connectionFactories.containsKey(JMSConstants.DEFAULT_CONFAC_NAME)) { - return (JMSConnectionFactory) connectionFactories. - get(JMSConstants.DEFAULT_CONFAC_NAME); - - } else { - return null; - } - } - - /** - * Initialize the defined connection factories, parsing the TransportIn - * descriptions - * - * @param transprtIn The Axis2 Transport in for the JMS - */ - private void initializeConnectionFactories(TransportInDescription transprtIn) { - // iterate through all defined connection factories - Iterator conFacIter = transprtIn.getParameters().iterator(); - - while (conFacIter.hasNext()) { - - Parameter param = (Parameter) conFacIter.next(); - JMSConnectionFactory jmsConFactory = - new JMSConnectionFactory(param.getName(), workScheduler); - - ParameterIncludeImpl pi = new ParameterIncludeImpl(); - try { - pi.deserializeParameters((OMElement) param.getValue()); - } catch (AxisFault axisFault) { - handleException("Error reading Parameters for JMS connection " + - "factory" + jmsConFactory.getName(), axisFault); - } - - // read connection facotry properties - Iterator params = pi.getParameters().iterator(); - - while (params.hasNext()) { - Parameter p = (Parameter) params.next(); - - if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) { - jmsConFactory.addProperty( - Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue()); - } else if (Context.PROVIDER_URL.equals(p.getName())) { - jmsConFactory.addProperty( - Context.PROVIDER_URL, (String) p.getValue()); - } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) { - jmsConFactory.addProperty( - Context.SECURITY_PRINCIPAL, (String) p.getValue()); - } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) { - jmsConFactory.addProperty( - Context.SECURITY_CREDENTIALS, (String) p.getValue()); - } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) { - jmsConFactory.setJndiName((String) p.getValue()); - } else if (JMSConstants.CONFAC_JNDI_NAME_USER.equals(p.getName())) { - jmsConFactory.setJndiUser((String) p.getValue()); - } else if (JMSConstants.CONFAC_JNDI_NAME_PASS.equals(p.getName())) { - jmsConFactory.setJndiPass((String) p.getValue()); - } else if (JMSConstants.DEST_PARAM.equals(p.getName())) { - StringTokenizer st = - new StringTokenizer((String) p.getValue(), " ,"); - while (st.hasMoreTokens()) { - jmsConFactory.addDestination(st.nextToken(), null); - } - } - } - - // connect to the actual connection factory - try { - jmsConFactory.connect(); - connectionFactories.put(jmsConFactory.getName(), jmsConFactory); - } catch (NamingException e) { - handleException("Error connecting to JMS connection factory : " + - jmsConFactory.getJndiName(), e); - } - } - } - - /** - * Get the EPR for the given JMS connection factory and destination - * the form of the URL is - * jms:/?[=&]* - * - * @param cf the Axis2 JMS connection factory - * @param destination the JNDI name of the destination - * @return the EPR as a String - */ - private static String getEPR(JMSConnectionFactory cf, String destination) { - StringBuffer sb = new StringBuffer(); - sb.append(JMSConstants.JMS_PREFIX).append(destination); - sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM). - append("=").append(cf.getJndiName()); - Iterator props = cf.getProperties().keySet().iterator(); - while (props.hasNext()) { - String key = (String) props.next(); - String value = (String) cf.getProperties().get(key); - sb.append("&").append(key).append("=").append(value); - } - return sb.toString(); - } - - /** - * Start this JMS Listener (Transport Listener) - * - * @throws AxisFault - */ - public void start() throws AxisFault { - // create thread pool of workers - workerPool = new ThreadPoolExecutor( - 1, - WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT, - new LinkedBlockingQueue(), - new org.apache.axis2.util.threadpool.DefaultThreadFactory( - new ThreadGroup("JMS Worker thread group"), - "JMSWorker")); - - Iterator iter = connectionFactories.values().iterator(); - while (iter.hasNext()) { - JMSConnectionFactory conFac = (JMSConnectionFactory) iter.next(); - JMSMessageReceiver msgRcvr = - new JMSMessageReceiver(conFac, workerPool, configCtx); - - try { - conFac.listen(msgRcvr); - } catch (JMSException e) { - handleException("Error starting connection factory : " + - conFac.getName(), e); - } - } - } - - /** - * Stop this transport listener and shutdown all of the connection factories - */ - public void stop() { - Iterator iter = connectionFactories.values().iterator(); - while (iter.hasNext()) { - ((JMSConnectionFactory) iter.next()).stop(); - } - if (workerPool != null) { - workerPool.shutdown(); - } - } - - /** - * Returns EPRs for the given service and IP. (Picks up precomputed EPR) - * - * @param serviceName service name - * @param ip ignored - * @return the EPR for the service - * @throws AxisFault not used - */ - public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { - //Strip out the operation name - if (serviceName.indexOf('/') != -1) { - serviceName = serviceName.substring(0, serviceName.indexOf('/')); - } - - String endpointName = (String) serviceNameToEprMap.get(serviceName); - if (endpointName == null){ - if (serviceName.indexOf(".") != -1){ - serviceName = serviceName.substring(0, serviceName.indexOf(".")); - endpointName = (String) serviceNameToEprMap.get(serviceName); - } - } - return new EndpointReference[]{new EndpointReference(endpointName)}; - } - - /** - * Returns the EPR for the given service and IP. (Picks up precomputed EPR) - * - * @param serviceName service name - * @param ip ignored - * @return the EPR for the service - * @throws AxisFault not used - */ - public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault { - return getEPRsForService(serviceName, ip)[0]; - } - - /** - * Starts listening for messages on this service - * - * @param service the AxisService just deployed - */ - private void startListeningForService(AxisService service) { - processService(service); - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf == null) { - String msg = "Service " + service.getName() + " does not specify" + - "a JMS connection factory or refers to an invalid factory." + - "This service is being marked as faulty and will not be " + - "available over the JMS transport"; - log.warn(msg); - JMSUtils.markServiceAsFaulty( - service.getName(), msg, service.getAxisConfiguration()); - return; - } - - String destination = JMSUtils.getDestination(service); - try { - cf.listenOnDestination(destination); - log.info("Started listening on destination : " + destination + - " for service " + service.getName()); - - } catch (JMSException e) { - handleException( - "Could not listen on JMS for service " + service.getName(), e); - JMSUtils.markServiceAsFaulty( - service.getName(), e.getMessage(), service.getAxisConfiguration()); - } - } - - /** - * Stops listening for messages for the service undeployed - * - * @param service the AxisService just undeployed - */ - private void stopListeningForService(AxisService service) { - - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf == null) { - String msg = "Service " + service.getName() + " does not specify" + - "a JMS connection factory or refers to an invalid factory." + - "This service is being marked as faulty and will not be " + - "available over the JMS transport"; - log.warn(msg); - JMSUtils.markServiceAsFaulty( - service.getName(), msg, service.getAxisConfiguration()); - return; - } - - // remove from the serviceNameToEprMap - serviceNameToEprMap.remove(service.getName()); - - String destination = JMSUtils.getDestination(service); - try { - cf.removeDestination(destination); - } catch (JMSException e) { - handleException( - "Error while terminating listening on JMS destination : " + destination, e); - } - } - - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } - - /** - * An AxisObserver which will start listening for newly deployed services, - * and stop listening when services are undeployed. - */ - class JMSAxisObserver implements AxisObserver { - - // The initilization code will go here - public void init(AxisConfiguration axisConfig) { - } - - public void serviceUpdate(AxisEvent event, AxisService service) { - - if (JMSUtils.isJMSService(service)) { - switch (event.getEventType()) { - case AxisEvent.SERVICE_DEPLOY : - startListeningForService(service); - break; - case AxisEvent.SERVICE_REMOVE : - stopListeningForService(service); - break; - case AxisEvent.SERVICE_START : - startListeningForService(service); - break; - case AxisEvent.SERVICE_STOP : - stopListeningForService(service); - break; - } - } - } - - public void moduleUpdate(AxisEvent event, AxisModule module) { - } - - //-------------------------------------------------------- - public void addParameter(Parameter param) throws AxisFault { - } - - public void removeParameter(Parameter param) throws AxisFault { - } - - public void deserializeParameters(OMElement parameterElement) throws AxisFault { - } - - public Parameter getParameter(String name) { - return null; - } - - public ArrayList getParameters() { - return null; - } - - public boolean isParameterLocked(String parameterName) { - return false; - } - - public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup serviceGroup) { - } - } - - public ConfigurationContext getConfigurationContext() { - return this.configCtx; - } - - - public SessionContext getSessionContext(MessageContext messageContext) { - return null; - } - - public void destroy() { - this.configCtx = null; - } -} diff --git a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java b/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java deleted file mode 100644 index e9e9f04ab2..0000000000 --- a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.InputStream; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Queue; -import javax.jms.Topic; -import javax.naming.Context; -import javax.xml.stream.XMLStreamException; - -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.addressing.RelatesTo; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.engine.AxisEngine; -import org.apache.axis2.transport.jms.JMSConstants; -import org.apache.axis2.transport.jms.JMSUtils; -import org.apache.axis2.util.MessageContextBuilder; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import edu.emory.mathcs.backport.java.util.concurrent.Executor; - -/** - * This is the actual receiver which listens for and accepts JMS messages, and - * hands them over to be processed by a worker thread. An instance of this - * class is created for each JMSConnectionFactory, but all instances may and - * will share the same worker thread pool. - */ -public class JMSMessageReceiver implements MessageListener { - - private static final Log log = LogFactory.getLog(JMSMessageReceiver.class); - - /** - * The thread pool of workers - */ - private Executor workerPool = null; - /** - * The Axis configuration context - */ - private ConfigurationContext axisConf = null; - /** - * A reference to the JMS Connection Factory - */ - private JMSConnectionFactory jmsConFac = null; - - /** - * Create a new JMSMessage receiver - * - * @param jmsConFac the JMS connection factory associated with - * @param workerPool the worker thead pool to be used - * @param axisConf the Axis2 configuration - */ - JMSMessageReceiver(JMSConnectionFactory jmsConFac, - Executor workerPool, ConfigurationContext axisConf) { - this.jmsConFac = jmsConFac; - this.workerPool = workerPool; - this.axisConf = axisConf; - } - - /** - * Return the Axis configuration - * - * @return the Axis configuration - */ - public ConfigurationContext getAxisConf() { - return axisConf; - } - - /** - * Set the worker thread pool - * - * @param workerPool the worker thead pool - */ - public void setWorkerPool(Executor workerPool) { - this.workerPool = workerPool; - } - - /** - * The entry point on the recepit of each JMS message - * - * @param message the JMS message received - */ - public void onMessage(Message message) { - // directly create a new worker and delegate processing - try { - if (log.isDebugEnabled()) { - StringBuffer sb = new StringBuffer(); - sb.append("Received JMS message to destination : " + message.getJMSDestination()); - sb.append("\nMessage ID : " + message.getJMSMessageID()); - sb.append("\nCorrelation ID : " + message.getJMSCorrelationID()); - sb.append("\nReplyTo ID : " + message.getJMSReplyTo()); - log.debug(sb.toString()); - } - } catch (JMSException e) { - e.printStackTrace(); - } - workerPool.execute(new Worker(message)); - } - - /** - * Creates an Axis MessageContext for the received JMS message and - * sets up the transports and various properties - * - * @param message the JMS message - * @return the Axis MessageContext - */ - private MessageContext createMessageContext(Message message) { - - InputStream in = JMSUtils.getInputStream(message); - - try { - MessageContext msgContext = axisConf.createMessageContext(); - - // get destination and create correct EPR - Destination dest = message.getJMSDestination(); - String destinationName = null; - if (dest instanceof Queue) { - destinationName = ((Queue) dest).getQueueName(); - } else if (dest instanceof Topic) { - destinationName = ((Topic) dest).getTopicName(); - } - - String serviceName = jmsConFac.getServiceByDestination(destinationName); - - // hack to get around the crazy Active MQ dynamic queue and topic issues - if (serviceName == null) { - String provider = (String) jmsConFac.getProperties().get( - Context.INITIAL_CONTEXT_FACTORY); - if (provider.indexOf("activemq") != -1) { - serviceName = jmsConFac.getServiceNameForDestination( - ((dest instanceof Queue ? - JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE : - JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC) + destinationName)); - } - } - - - if (serviceName != null) { - // set to bypass dispatching and handover directly to this service - msgContext.setAxisService( - axisConf.getAxisConfiguration().getService(serviceName)); - } - - msgContext.setIncomingTransportName(Constants.TRANSPORT_JMS); - msgContext.setTransportIn( - axisConf.getAxisConfiguration().getTransportIn(Constants.TRANSPORT_JMS)); - - msgContext.setTransportOut( - axisConf.getAxisConfiguration().getTransportOut(Constants.TRANSPORT_JMS)); - // the reply is assumed to be on the JMSReplyTo destination, using - // the same incoming connection factory - - - JMSOutTransportInfo jmsOutTransportInfo; - - if ((jmsConFac.getJndiUser() == null) || (jmsConFac.getJndiPass() == null)) - jmsOutTransportInfo= new JMSOutTransportInfo(jmsConFac.getConFactory(), message.getJMSReplyTo()); - else - jmsOutTransportInfo= new JMSOutTransportInfo(jmsConFac.getConFactory(), jmsConFac.getUser(), jmsConFac.getPass(), message.getJMSReplyTo()); - - msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, jmsOutTransportInfo); - - msgContext.setServerSide(true); - msgContext.setMessageID(message.getJMSMessageID()); - - Destination replyTo = message.getJMSReplyTo(); - String jndiDestinationName = null; - if (replyTo == null) { - Parameter param = msgContext.getAxisService().getParameter(JMSConstants.REPLY_PARAM); - if (param != null && param.getValue() != null) { - jndiDestinationName = (String) param.getValue(); - } - } - - if (jndiDestinationName != null) { - msgContext.setReplyTo(jmsConFac.getEPRForDestination(jndiDestinationName)); - } - - String soapAction = JMSUtils.getProperty(message, JMSConstants.SOAPACTION); - if (soapAction != null) { - msgContext.setSoapAction(soapAction); - } - - msgContext.setEnvelope( - JMSUtils.getSOAPEnvelope(message, msgContext, in)); - - // set correlation id - String correlationId = message.getJMSCorrelationID(); - if (correlationId != null && correlationId.length() > 0) { - msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, correlationId); - msgContext.setRelationships( - new RelatesTo[] { new RelatesTo(correlationId) }); - } - - return msgContext; - - } catch (JMSException e) { - handleException("JMS Exception reading the destination name", e); - } catch (AxisFault e) { - handleException("Axis fault creating the MessageContext", e); - } catch (XMLStreamException e) { - handleException("Error reading the SOAP envelope", e); - } - return null; - } - - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } - - /** - * The actual Runnable Worker implementation which will process the - * received JMS messages in the worker thread pool - */ - class Worker implements Runnable { - - private Message message = null; - - Worker(Message message) { - this.message = message; - } - - public void run() { - MessageContext msgCtx = createMessageContext(message); - - AxisEngine engine = new AxisEngine(msgCtx.getConfigurationContext()); - try { - log.debug("Delegating JMS message for processing to the Axis engine"); - try { - engine.receive(msgCtx); - } catch (AxisFault e) { - log.debug("Exception occured when receiving the SOAP message", e); - if (msgCtx.isServerSide()) { - MessageContext faultContext = MessageContextBuilder.createFaultMessageContext(msgCtx, e); - engine.sendFault(faultContext); - } - } - } catch (AxisFault af) { - log.error("JMS Worker [" + Thread.currentThread().getName() + - "] Encountered an Axis Fault : " + af.getMessage(), af); - } - } - } -} diff --git a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java b/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java deleted file mode 100644 index 5fa6542eec..0000000000 --- a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSOutTransportInfo.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.Hashtable; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameNotFoundException; -import javax.naming.NamingException; - -import org.apache.axis2.transport.OutTransportInfo; -import org.apache.axis2.transport.jms.JMSConstants; -import org.apache.axis2.transport.jms.JMSUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * The JMS OutTransportInfo - */ -public class JMSOutTransportInfo implements OutTransportInfo { - - private static final Log log = LogFactory.getLog(JMSOutTransportInfo.class); - - private ConnectionFactory connectionFactory = null; - private String connectionFactoryUser = null; - private String connectionFactoryPassword = null; - private Destination destination = null; - - private String contentType = null; - - /** - * Creates an instance using the given connection factory and destination - * - * @param connectionFactory the connection factory - * @param dest the destination - */ - JMSOutTransportInfo(ConnectionFactory connectionFactory, Destination dest) { - this.connectionFactory = connectionFactory; - this.destination = dest; - } - - /** - * Creates an instance using the given connection factory and destination - * - * @param connectionFactory the connection factory - * @param dest the destination - */ - JMSOutTransportInfo(ConnectionFactory connectionFactory, String connectionFactoryUser, String connectionFactoryPassword, Destination dest) { - this.connectionFactory = connectionFactory; - this.connectionFactoryUser = connectionFactoryUser; - this.connectionFactoryPassword = connectionFactoryPassword; - this.destination = dest; - } - - /** - * Creates and instance using the given URL - * - * @param url the URL - */ - JMSOutTransportInfo(String url) { - if (!url.startsWith(JMSConstants.JMS_PREFIX)) { - handleException("Invalid JMS URL : " + url + - " Must begin with the prefix " + JMSConstants.JMS_PREFIX); - } else { - Context context = null; - Hashtable props = JMSUtils.getProperties(url); - try { - context = new InitialContext(props); - } catch (NamingException e) { - handleException("Could not get the initial context", e); - } - - connectionFactory = getConnectionFactory(context, props); - connectionFactoryUser = getConnectionFactoryUser(context, props); - connectionFactoryPassword = getConnectionFactoryPass(context, props); - destination = getDestination(context, url); - } - } - - /** - * Get the referenced ConnectionFactory using the properties from the context - * - * @param context the context to use for lookup - * @param props the properties which contains the JNDI name of the factory - * @return the connection factory - */ - private ConnectionFactory getConnectionFactory(Context context, Hashtable props) { - try { - - String conFacJndiName = (String) props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM); - if (conFacJndiName != null) { - return (ConnectionFactory) context.lookup(conFacJndiName); - } else { - throw new NamingException( - "JMS Connection Factory JNDI name cannot be determined from url"); - } - } catch (NamingException e) { - handleException("Cannot get JMS Connection factory with props : " + props, e); - } - return null; - } - - /** - * Get the referenced ConnectionFactory Username (if supplied) using the properties from the context - * - * @param context the context to use for lookup - * @param props the properties which contains the JNDI name of the factory username - * @return the connection factory username (or null if one is not in the JNDI tree) - */ - private String getConnectionFactoryUser(Context context, Hashtable props) { - try { - - String conFacJndiUser = (String) props.get(JMSConstants.CONFAC_JNDI_NAME_USER); - if (conFacJndiUser != null) { - return (String) context.lookup(conFacJndiUser); - } else { - return null; - } - } catch (NamingException e) { - handleException("Cannot get JMS Connection factory username with props : " + props, e); - } - return null; - } - - /** - * Get the referenced ConnectionFactory Password (if supplied) using the properties from the context - * - * @param context the context to use for lookup - * @param props the properties which contains the JNDI name of the factory password - * @return the connection factory password (or null if one is not in the JNDI tree) - */ - private String getConnectionFactoryPass(Context context, Hashtable props) { - try { - - String conFacJndiPass = (String) props.get(JMSConstants.CONFAC_JNDI_NAME_PASS); - if (conFacJndiPass != null) { - return (String) context.lookup(conFacJndiPass); - } else { - return null; - } - } catch (NamingException e) { - handleException("Cannot get JMS Connection factory password with props : " + props, e); - } - return null; - } - - /** - * Get the JMS destination specified by the given URL from the context - * - * @param context the Context to lookup - * @param url URL - * @return the JMS destination, or null if it does not exist - */ - private Destination getDestination(Context context, String url) { - String destinationName = JMSUtils.getDestination(url); - try { - return (Destination) context.lookup(destinationName); - - } catch (NameNotFoundException e) { - log.warn("Cannot get or lookup JMS destination : " + destinationName + - " from url : " + url + " : " + e.getMessage()); - - } catch (NamingException e) { - handleException("Cannot get JMS destination : " + destinationName + - " from url : " + url, e); - } - return null; - } - - - private void handleException(String s) { - log.error(s); - throw new AxisJMSException(s); - } - - private void handleException(String s, Exception e) { - log.error(s, e); - throw new AxisJMSException(s, e); - } - - public Destination getDestination() { - return destination; - } - - public ConnectionFactory getConnectionFactory() { - return connectionFactory; - } - - public String getConnectionFactoryPassword() { - return connectionFactoryPassword; - } - - public String getConnectionFactoryUser() { - return connectionFactoryUser; - } - - public void setContentType(String contentType) { - this.contentType = contentType; - } -} diff --git a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java deleted file mode 100644 index 7caa045015..0000000000 --- a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java +++ /dev/null @@ -1,389 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.util.Hashtable; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NameNotFoundException; -import javax.naming.NamingException; -import javax.xml.stream.XMLStreamException; - -import org.apache.axiom.om.OMElement; -import org.apache.axiom.om.OMOutputFormat; -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.TransportOutDescription; -import org.apache.axis2.description.WSDL2Constants; -import org.apache.axis2.handlers.AbstractHandler; -import org.apache.axis2.java.security.AccessController; -import org.apache.axis2.transport.TransportSender; -import org.apache.axis2.transport.http.HTTPTransportUtils; -import org.apache.axis2.transport.http.SOAPMessageFormatter; -import org.apache.axis2.transport.jms.JMSConstants; -import org.apache.axis2.transport.jms.JMSUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * The TransportSender for JMS - */ -public class JMSSender extends AbstractHandler implements TransportSender { - - private static final Log log = LogFactory.getLog(JMSSender.class); - - /** - * Performs the actual sending of the JMS message - * - * @param msgContext the message context to be sent - * @throws AxisFault on exception - */ - public InvocationResponse invoke(MessageContext msgContext) throws AxisFault { - - log.debug("JMSSender invoke()"); - - /* Added due to possible bug in Axis2, MTOM enablement is based on msgContext.isDoingMTOM - * However msgContext.isDoingMTOM will always return false unless set programmatically. - * HTTP sets this boolean programmatically by looking up whether enableMTOM has been set - * in axis2.xml or as an option on the client. - */ - msgContext.setDoingMTOM(HTTPTransportUtils.doWriteMTOM(msgContext)); - - JMSOutTransportInfo transportInfo = null; - String targetAddress = null; - - // is there a transport url? which may be different from the WS-A To.. - targetAddress = (String) msgContext.getProperty( - Constants.Configuration.TRANSPORT_URL); - - if (targetAddress != null) { - transportInfo = new JMSOutTransportInfo(targetAddress); - } else if (targetAddress == null && msgContext.getTo() != null && - !msgContext.getTo().hasAnonymousAddress()) { - targetAddress = msgContext.getTo().getAddress(); - - if (!msgContext.getTo().hasNoneAddress()) { - transportInfo = new JMSOutTransportInfo(targetAddress); - } else { - //Don't send the message. - return InvocationResponse.CONTINUE; - } - } else if (msgContext.isServerSide()) { - // get the jms ReplyTo - transportInfo = (JMSOutTransportInfo) - msgContext.getProperty(Constants.OUT_TRANSPORT_INFO); - } - - // get the ConnectionFactory to be used for the send - ConnectionFactory connectionFac = transportInfo.getConnectionFactory(); - - Connection con = null; - try { - String user = transportInfo.getConnectionFactoryUser(); - String password = transportInfo.getConnectionFactoryPassword(); - - if ((user == null) || (password == null)){ - // Use the OS username and credentials - con = connectionFac.createConnection(); - } else{ - // use an explicit username and password - con = connectionFac.createConnection(user, password); - } - - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Message message = createJMSMessage(msgContext, session); - - // get the JMS destination for the message being sent - Destination dest = transportInfo.getDestination(); - - if (dest == null) { - if (targetAddress != null) { - - // if it does not exist, create it - String name = JMSUtils.getDestination(targetAddress); - if (log.isDebugEnabled()) { - log.debug("Creating JMS Destination : " + name); - } - - try { - dest = session.createQueue(name); - } catch (JMSException e) { - handleException("Error creating destination Queue : " + name, e); - } - } else { - handleException("Cannot send reply to unknown JMS Destination"); - } - } - - MessageProducer producer = session.createProducer(dest); - Destination replyDest = null; - - boolean waitForResponse = - msgContext.getOperationContext() != null && - WSDL2Constants.MEP_URI_OUT_IN.equals( - msgContext.getOperationContext().getAxisOperation().getMessageExchangePattern()); - - if (waitForResponse) { - String replyToJNDIName = (String) msgContext.getProperty(JMSConstants.REPLY_PARAM); - if (replyToJNDIName != null && replyToJNDIName.length() > 0) { - Context context = null; - final Hashtable props = JMSUtils.getProperties(targetAddress); - try { - try { - context = (Context) AccessController.doPrivileged( - new PrivilegedExceptionAction() { - public Object run() throws NamingException{ - return new InitialContext(props); - } - } - ) - ; - } catch (PrivilegedActionException e) { - throw (NamingException) e.getException(); - } - } catch (NamingException e) { - handleException("Could not get the initial context", e); - } - - try { - replyDest = (Destination) context.lookup(replyToJNDIName); - - } catch (NameNotFoundException e) { - log.warn("Cannot get or lookup JMS response destination : " + - replyToJNDIName + " : " + e.getMessage() + - ". Attempting to create a Queue named : " + replyToJNDIName); - replyDest = session.createQueue(replyToJNDIName); - - } catch (NamingException e) { - handleException("Cannot get JMS response destination : " + - replyToJNDIName + " : ", e); - } - - } else { - try { - // create temporary queue to receive reply - replyDest = session.createTemporaryQueue(); - } catch (JMSException e) { - handleException("Error creating temporary queue for response"); - } - } - message.setJMSReplyTo(replyDest); - if (log.isDebugEnabled()) { - log.debug("Expecting a response to JMS Destination : " + - (replyDest instanceof Queue ? - ((Queue) replyDest).getQueueName() : ((Topic) replyDest).getTopicName())); - } - } - - try { - log.debug("[" + (msgContext.isServerSide() ? "Server" : "Client") + - "]Sending message to destination : " + dest); - producer.send(message); - producer.close(); - - } catch (JMSException e) { - handleException("Error sending JMS message to destination : " + - dest.toString(), e); - } - - if (waitForResponse) { - try { - // wait for reply - MessageConsumer consumer = session.createConsumer(replyDest); - - long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; - Long waitReply = (Long) msgContext.getProperty(JMSConstants.JMS_WAIT_REPLY); - if (waitReply != null) { - timeout = waitReply.longValue(); - } - - log.debug("Waiting for a maximum of " + timeout + - "ms for a response message to destination : " + replyDest); - con.start(); - Message reply = consumer.receive(timeout); - - if (reply != null) { - msgContext.setProperty(MessageContext.TRANSPORT_IN, - JMSUtils.getInputStream(reply)); - } else { - log.warn("Did not receive a JMS response within " + - timeout + " ms to destination : " + dest); - } - - } catch (JMSException e) { - handleException("Error reading response from temporary " + - "queue : " + replyDest, e); - } - } - } catch (JMSException e) { - handleException("Error preparing to send message to destination", e); - - } finally { - if (con != null) { - try { - con.close(); // closes all sessions, producers, temp Q's etc - } catch (JMSException e) { - } // ignore - } - } - return InvocationResponse.CONTINUE; - } - - public void cleanup(MessageContext msgContext) throws AxisFault { - // do nothing - } - - public void init(ConfigurationContext confContext, - TransportOutDescription transportOut) throws AxisFault { - // do nothing - } - - public void stop() { - // do nothing - } - - /** - * Create a JMS Message from the given MessageContext and using the given - * session - * - * @param msgContext the MessageContext - * @param session the JMS session - * @return a JMS message from the context and session - * @throws JMSException on exception - */ - private Message createJMSMessage(MessageContext msgContext, Session session) - throws JMSException { - - Message message = null; - String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE); - - OMElement msgElement = msgContext.getEnvelope(); - if (msgContext.isDoingREST()) { - msgElement = msgContext.getEnvelope().getBody().getFirstElement(); - } - - if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType)) { - - message = session.createBytesMessage(); - BytesMessage bytesMsg = (BytesMessage) message; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - OMOutputFormat format = new OMOutputFormat(); - - /* Added due to possible bug in Axis2, OMOutputFormat's boolean isSOAP11 defaults to true. - * This means that if left untouched all JMS byte messages must be SOAP 1.1 - * We set the boolean here based on the messageContexts value, which is assertained from - * the soap namespace used. This is what HTTP does also. - */ - format.setSOAP11(msgContext.isSOAP11()); - format.setCharSetEncoding( - getProperty(msgContext, Constants.Configuration.CHARACTER_SET_ENCODING)); - format.setDoOptimize(msgContext.isDoingMTOM()); - try { - msgElement.serializeAndConsume(baos, format); - baos.flush(); - } catch (XMLStreamException e) { - handleException("XML serialization error creating BytesMessage", e); - } catch (IOException e) { - handleException("IO Error while creating BytesMessage", e); - } - bytesMsg.writeBytes(baos.toByteArray()); - - /* Added due to possible bug in Axis2, the content type is never set for a JMS byte message. This - * goes unnoticed when MTOM is not used, as the server can handle the message. However once MTOM - * is used a contentType of multipart/related is required. - */ - bytesMsg.setStringProperty(JMSConstants.CONTENT_TYPE, - new SOAPMessageFormatter().getContentType(msgContext, format, null)); - } else { - message = session.createTextMessage(); // default - TextMessage txtMsg = (TextMessage) message; - txtMsg.setText(msgElement.toString()); - } - - // set the JMS correlation ID if specified - String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID); - if (correlationId == null && msgContext.getRelatesTo() != null) { - correlationId = msgContext.getRelatesTo().getValue(); - } - - if (correlationId != null) { - message.setJMSCorrelationID(correlationId); - } - - if (msgContext.isServerSide()) { - // set SOAP Action and context type as properties on the JMS message - setProperty(message, msgContext, JMSConstants.SOAPACTION); - setProperty(message, msgContext, JMSConstants.CONTENT_TYPE); - } else { - String action = msgContext.getOptions().getAction(); - if (action != null) { - message.setStringProperty(JMSConstants.SOAPACTION, action); - } - } - - return message; - } - - private void setProperty(Message message, MessageContext msgCtx, String key) { - - String value = getProperty(msgCtx, key); - if (value != null) { - try { - message.setStringProperty(key, value); - } catch (JMSException e) { - log.warn("Couldn't set message property : " + key + " = " + value, e); - } - } - } - - private String getProperty(MessageContext mc, String key) { - return (String) mc.getProperty(key); - } - - private static void handleException(String s) { - log.error(s); - throw new AxisJMSException(s); - } - - private static void handleException(String s, Exception e) { - log.error(s, e); - throw new AxisJMSException(s, e); - } - -} diff --git a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/README b/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/README deleted file mode 100644 index 5df1751298..0000000000 --- a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/README +++ /dev/null @@ -1,14 +0,0 @@ -The classes in this package are a copy of the same classes from the -Axis2 package org.apache.axis2.transport.jms in the Axis2 1.4.1 release. - -The only change is in the listenOnDestination method in JMSConnectionFactory -to use Tuscany threads instead of the setMessageListener call approach when -running in a JEE container where setMessageListener is prohibited. There are -several classes copied in this Tuscany package as many of the constructors -and methods are not public so we can't just subclass to fix the problem. - -In Axis2 1.5 and the new separately released JMS transport will fix this -problem so when we move up to that in Tuscany we can get rid of this package. - - - -- cgit v1.2.3