/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.tuscany.sca.binding.ws.axis2.jms; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.StringTokenizer; import javax.jms.JMSException; import javax.naming.Context; import javax.naming.NamingException; import org.apache.axiom.om.OMElement; import org.apache.axis2.AxisFault; import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.SessionContext; import org.apache.axis2.description.AxisModule; import org.apache.axis2.description.AxisService; import org.apache.axis2.description.AxisServiceGroup; import org.apache.axis2.description.Parameter; import org.apache.axis2.description.ParameterIncludeImpl; import org.apache.axis2.description.TransportInDescription; import org.apache.axis2.engine.AxisConfiguration; import org.apache.axis2.engine.AxisEvent; import org.apache.axis2.engine.AxisObserver; import org.apache.axis2.transport.TransportListener; import org.apache.axis2.transport.jms.JMSConstants; import org.apache.axis2.transport.jms.JMSUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tuscany.sca.work.WorkScheduler; import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; /** * The JMS Transport listener implementation. A JMS Listner will hold one or * more JMS connection factories, which would be created at initialization * time. This implementation does not support the creation of connection * factories at runtime. This JMS Listener registers with Axis to be notified * of service deployment/undeployment/start and stop, and enables or disables * listening for messages on the destinations as appropriate. *

* A Service could state the JMS connection factory name and the destination * name for use as Parameters in its services.xml as shown in the example * below. If the connection name was not specified, it will use the connection * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a * factory is defined in the Axis2.xml. If the destination name is not specified * it will default to a JMS queue by the name of the service. If the destination * should be a Topic, it should be created on the JMS implementation, and * specified in the services.xml of the service. *

* * myTopicConnectionFactory * * dynamicTopics/something.TestTopic */ public class JMSListener implements TransportListener { private static final Log log = LogFactory.getLog(JMSListener.class); /** * The maximum number of threads used for the worker thread pool */ private static final int WORKERS_MAX_THREADS = 100; /** * The keep alive time of an idle worker thread */ private static final long WORKER_KEEP_ALIVE = 60L; /** * The worker thread timeout time unit */ private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; /** * A Map containing the connection factories managed by this, keyed by name */ private Map connectionFactories = new HashMap(); /** * A Map of service name to the JMS EPR addresses */ private Map serviceNameToEprMap = new HashMap(); /** * The Axis2 Configuration context */ private ConfigurationContext configCtx = null; private ExecutorService workerPool; private WorkScheduler workScheduler; public JMSListener(WorkScheduler workScheduler) { this.workScheduler = workScheduler; } /** * This is the TransportListener initialization method invoked by Axis2 * * @param axisConf the Axis configuration context * @param transprtIn the TransportIn description */ public void init(ConfigurationContext axisConf, TransportInDescription transprtIn) { // save reference to the configuration context this.configCtx = axisConf; // initialize the defined connection factories initializeConnectionFactories(transprtIn); // if no connection factories are defined, we cannot listen if (connectionFactories.isEmpty()) { log.warn("No JMS connection factories are defined." + "Will not listen for any JMS messages"); return; } // iterate through deployed services and validate connection factory // names, and mark services as faulty where appropriate. Iterator services = axisConf.getAxisConfiguration().getServices().values().iterator(); while (services.hasNext()) { AxisService service = (AxisService) services.next(); if (JMSUtils.isJMSService(service)) { processService(service); } } // register to receive updates on services for lifetime management axisConf.getAxisConfiguration().addObservers(new JMSAxisObserver()); log.info("JMS Transport Receiver (Listener) initialized..."); } /** * Prepare to listen for JMS messages on behalf of this service * * @param service */ private void processService(AxisService service) { JMSConnectionFactory cf = getConnectionFactory(service); if (cf == null) { String msg = "Service " + service.getName() + " does not specify" + "a JMS connection factory or refers to an invalid factory. " + "This service is being marked as faulty and will not be " + "available over the JMS transport"; log.warn(msg); JMSUtils.markServiceAsFaulty( service.getName(), msg, service.getAxisConfiguration()); return; } String destination = JMSUtils.getDestination(service); // compute service EPR and keep for later use serviceNameToEprMap.put(service.getName(), getEPR(cf, destination)); // add the specified or implicit destination of this service // to its connection factory cf.addDestination(destination, service.getName()); } /** * Return the connection factory name for this service. If this service * refers to an invalid factory or defaults to a non-existent default * factory, this returns null * * @param service the AxisService * @return the JMSConnectionFactory to be used, or null if reference is invalid */ private JMSConnectionFactory getConnectionFactory(AxisService service) { Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM); // validate connection factory name (specified or default) if (conFacParam != null) { String conFac = (String) conFacParam.getValue(); if (connectionFactories.containsKey(conFac)) { return (JMSConnectionFactory) connectionFactories.get(conFac); } else { return null; } } else if (connectionFactories.containsKey(JMSConstants.DEFAULT_CONFAC_NAME)) { return (JMSConnectionFactory) connectionFactories. get(JMSConstants.DEFAULT_CONFAC_NAME); } else { return null; } } /** * Initialize the defined connection factories, parsing the TransportIn * descriptions * * @param transprtIn The Axis2 Transport in for the JMS */ private void initializeConnectionFactories(TransportInDescription transprtIn) { // iterate through all defined connection factories Iterator conFacIter = transprtIn.getParameters().iterator(); while (conFacIter.hasNext()) { Parameter param = (Parameter) conFacIter.next(); JMSConnectionFactory jmsConFactory = new JMSConnectionFactory(param.getName(), workScheduler); ParameterIncludeImpl pi = new ParameterIncludeImpl(); try { pi.deserializeParameters((OMElement) param.getValue()); } catch (AxisFault axisFault) { handleException("Error reading Parameters for JMS connection " + "factory" + jmsConFactory.getName(), axisFault); } // read connection facotry properties Iterator params = pi.getParameters().iterator(); while (params.hasNext()) { Parameter p = (Parameter) params.next(); if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) { jmsConFactory.addProperty( Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue()); } else if (Context.PROVIDER_URL.equals(p.getName())) { jmsConFactory.addProperty( Context.PROVIDER_URL, (String) p.getValue()); } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) { jmsConFactory.addProperty( Context.SECURITY_PRINCIPAL, (String) p.getValue()); } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) { jmsConFactory.addProperty( Context.SECURITY_CREDENTIALS, (String) p.getValue()); } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) { jmsConFactory.setJndiName((String) p.getValue()); } else if (JMSConstants.CONFAC_JNDI_NAME_USER.equals(p.getName())) { jmsConFactory.setJndiUser((String) p.getValue()); } else if (JMSConstants.CONFAC_JNDI_NAME_PASS.equals(p.getName())) { jmsConFactory.setJndiPass((String) p.getValue()); } else if (JMSConstants.DEST_PARAM.equals(p.getName())) { StringTokenizer st = new StringTokenizer((String) p.getValue(), " ,"); while (st.hasMoreTokens()) { jmsConFactory.addDestination(st.nextToken(), null); } } } // connect to the actual connection factory try { jmsConFactory.connect(); connectionFactories.put(jmsConFactory.getName(), jmsConFactory); } catch (NamingException e) { handleException("Error connecting to JMS connection factory : " + jmsConFactory.getJndiName(), e); } } } /** * Get the EPR for the given JMS connection factory and destination * the form of the URL is * jms:/?[=&]* * * @param cf the Axis2 JMS connection factory * @param destination the JNDI name of the destination * @return the EPR as a String */ private static String getEPR(JMSConnectionFactory cf, String destination) { StringBuffer sb = new StringBuffer(); sb.append(JMSConstants.JMS_PREFIX).append(destination); sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM). append("=").append(cf.getJndiName()); Iterator props = cf.getProperties().keySet().iterator(); while (props.hasNext()) { String key = (String) props.next(); String value = (String) cf.getProperties().get(key); sb.append("&").append(key).append("=").append(value); } return sb.toString(); } /** * Start this JMS Listener (Transport Listener) * * @throws AxisFault */ public void start() throws AxisFault { // create thread pool of workers workerPool = new ThreadPoolExecutor( 1, WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT, new LinkedBlockingQueue(), new org.apache.axis2.util.threadpool.DefaultThreadFactory( new ThreadGroup("JMS Worker thread group"), "JMSWorker")); Iterator iter = connectionFactories.values().iterator(); while (iter.hasNext()) { JMSConnectionFactory conFac = (JMSConnectionFactory) iter.next(); JMSMessageReceiver msgRcvr = new JMSMessageReceiver(conFac, workerPool, configCtx); try { conFac.listen(msgRcvr); } catch (JMSException e) { handleException("Error starting connection factory : " + conFac.getName(), e); } } } /** * Stop this transport listener and shutdown all of the connection factories */ public void stop() { Iterator iter = connectionFactories.values().iterator(); while (iter.hasNext()) { ((JMSConnectionFactory) iter.next()).stop(); } if (workerPool != null) { workerPool.shutdown(); } } /** * Returns EPRs for the given service and IP. (Picks up precomputed EPR) * * @param serviceName service name * @param ip ignored * @return the EPR for the service * @throws AxisFault not used */ public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { //Strip out the operation name if (serviceName.indexOf('/') != -1) { serviceName = serviceName.substring(0, serviceName.indexOf('/')); } String endpointName = (String) serviceNameToEprMap.get(serviceName); if (endpointName == null){ if (serviceName.indexOf(".") != -1){ serviceName = serviceName.substring(0, serviceName.indexOf(".")); endpointName = (String) serviceNameToEprMap.get(serviceName); } } return new EndpointReference[]{new EndpointReference(endpointName)}; } /** * Returns the EPR for the given service and IP. (Picks up precomputed EPR) * * @param serviceName service name * @param ip ignored * @return the EPR for the service * @throws AxisFault not used */ public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault { return getEPRsForService(serviceName, ip)[0]; } /** * Starts listening for messages on this service * * @param service the AxisService just deployed */ private void startListeningForService(AxisService service) { processService(service); JMSConnectionFactory cf = getConnectionFactory(service); if (cf == null) { String msg = "Service " + service.getName() + " does not specify" + "a JMS connection factory or refers to an invalid factory." + "This service is being marked as faulty and will not be " + "available over the JMS transport"; log.warn(msg); JMSUtils.markServiceAsFaulty( service.getName(), msg, service.getAxisConfiguration()); return; } String destination = JMSUtils.getDestination(service); try { cf.listenOnDestination(destination); log.info("Started listening on destination : " + destination + " for service " + service.getName()); } catch (JMSException e) { handleException( "Could not listen on JMS for service " + service.getName(), e); JMSUtils.markServiceAsFaulty( service.getName(), e.getMessage(), service.getAxisConfiguration()); } } /** * Stops listening for messages for the service undeployed * * @param service the AxisService just undeployed */ private void stopListeningForService(AxisService service) { JMSConnectionFactory cf = getConnectionFactory(service); if (cf == null) { String msg = "Service " + service.getName() + " does not specify" + "a JMS connection factory or refers to an invalid factory." + "This service is being marked as faulty and will not be " + "available over the JMS transport"; log.warn(msg); JMSUtils.markServiceAsFaulty( service.getName(), msg, service.getAxisConfiguration()); return; } // remove from the serviceNameToEprMap serviceNameToEprMap.remove(service.getName()); String destination = JMSUtils.getDestination(service); try { cf.removeDestination(destination); } catch (JMSException e) { handleException( "Error while terminating listening on JMS destination : " + destination, e); } } private void handleException(String msg, Exception e) { log.error(msg, e); throw new AxisJMSException(msg, e); } /** * An AxisObserver which will start listening for newly deployed services, * and stop listening when services are undeployed. */ class JMSAxisObserver implements AxisObserver { // The initilization code will go here public void init(AxisConfiguration axisConfig) { } public void serviceUpdate(AxisEvent event, AxisService service) { if (JMSUtils.isJMSService(service)) { switch (event.getEventType()) { case AxisEvent.SERVICE_DEPLOY : startListeningForService(service); break; case AxisEvent.SERVICE_REMOVE : stopListeningForService(service); break; case AxisEvent.SERVICE_START : startListeningForService(service); break; case AxisEvent.SERVICE_STOP : stopListeningForService(service); break; } } } public void moduleUpdate(AxisEvent event, AxisModule module) { } //-------------------------------------------------------- public void addParameter(Parameter param) throws AxisFault { } public void removeParameter(Parameter param) throws AxisFault { } public void deserializeParameters(OMElement parameterElement) throws AxisFault { } public Parameter getParameter(String name) { return null; } public ArrayList getParameters() { return null; } public boolean isParameterLocked(String parameterName) { return false; } public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup serviceGroup) { } } public ConfigurationContext getConfigurationContext() { return this.configCtx; } public SessionContext getSessionContext(MessageContext messageContext) { return null; } public void destroy() { this.configCtx = null; } }