From fdd5b43d3c139cf2cbd1655d2efbfaf9032a5b5e Mon Sep 17 00:00:00 2001 From: lresende Date: Wed, 11 Nov 2009 23:14:18 +0000 Subject: Moving 1.x branches git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@835145 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/binding/ws/axis2/jms/JMSListener.java | 527 --------------------- 1 file changed, 527 deletions(-) delete mode 100644 branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java (limited to 'branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java') diff --git a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java deleted file mode 100644 index 08190fb57c..0000000000 --- a/branches/sca-java-1.5.1/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java +++ /dev/null @@ -1,527 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.StringTokenizer; - -import javax.jms.JMSException; -import javax.naming.Context; -import javax.naming.NamingException; - -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; -import org.apache.axis2.addressing.EndpointReference; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.context.SessionContext; -import org.apache.axis2.description.AxisModule; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.AxisServiceGroup; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.ParameterIncludeImpl; -import org.apache.axis2.description.TransportInDescription; -import org.apache.axis2.engine.AxisConfiguration; -import org.apache.axis2.engine.AxisEvent; -import org.apache.axis2.engine.AxisObserver; -import org.apache.axis2.transport.TransportListener; -import org.apache.axis2.transport.jms.JMSConstants; -import org.apache.axis2.transport.jms.JMSUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.work.WorkScheduler; - -import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; -import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; -import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; -import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; - -/** - * The JMS Transport listener implementation. A JMS Listner will hold one or - * more JMS connection factories, which would be created at initialization - * time. This implementation does not support the creation of connection - * factories at runtime. This JMS Listener registers with Axis to be notified - * of service deployment/undeployment/start and stop, and enables or disables - * listening for messages on the destinations as appropriate. - *

- * A Service could state the JMS connection factory name and the destination - * name for use as Parameters in its services.xml as shown in the example - * below. If the connection name was not specified, it will use the connection - * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a - * factory is defined in the Axis2.xml. If the destination name is not specified - * it will default to a JMS queue by the name of the service. If the destination - * should be a Topic, it should be created on the JMS implementation, and - * specified in the services.xml of the service. - *

- * - * myTopicConnectionFactory - * - * dynamicTopics/something.TestTopic - */ -public class JMSListener implements TransportListener { - - private static final Log log = LogFactory.getLog(JMSListener.class); - - /** - * The maximum number of threads used for the worker thread pool - */ - private static final int WORKERS_MAX_THREADS = 100; - /** - * The keep alive time of an idle worker thread - */ - private static final long WORKER_KEEP_ALIVE = 60L; - /** - * The worker thread timeout time unit - */ - private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; - - /** - * A Map containing the connection factories managed by this, keyed by name - */ - private Map connectionFactories = new HashMap(); - /** - * A Map of service name to the JMS EPR addresses - */ - private Map serviceNameToEprMap = new HashMap(); - /** - * The Axis2 Configuration context - */ - private ConfigurationContext configCtx = null; - - private ExecutorService workerPool; - - private WorkScheduler workScheduler; - - public JMSListener(WorkScheduler workScheduler) { - this.workScheduler = workScheduler; - } - - /** - * This is the TransportListener initialization method invoked by Axis2 - * - * @param axisConf the Axis configuration context - * @param transprtIn the TransportIn description - */ - public void init(ConfigurationContext axisConf, - TransportInDescription transprtIn) { - - // save reference to the configuration context - this.configCtx = axisConf; - - // initialize the defined connection factories - initializeConnectionFactories(transprtIn); - - // if no connection factories are defined, we cannot listen - if (connectionFactories.isEmpty()) { - log.warn("No JMS connection factories are defined." + - "Will not listen for any JMS messages"); - return; - } - - // iterate through deployed services and validate connection factory - // names, and mark services as faulty where appropriate. - Iterator services = - axisConf.getAxisConfiguration().getServices().values().iterator(); - - while (services.hasNext()) { - AxisService service = (AxisService) services.next(); - if (JMSUtils.isJMSService(service)) { - processService(service); - } - } - - // register to receive updates on services for lifetime management - axisConf.getAxisConfiguration().addObservers(new JMSAxisObserver()); - - log.info("JMS Transport Receiver (Listener) initialized..."); - } - - - /** - * Prepare to listen for JMS messages on behalf of this service - * - * @param service - */ - private void processService(AxisService service) { - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf == null) { - String msg = "Service " + service.getName() + " does not specify" + - "a JMS connection factory or refers to an invalid factory. " + - "This service is being marked as faulty and will not be " + - "available over the JMS transport"; - log.warn(msg); - JMSUtils.markServiceAsFaulty( - service.getName(), msg, service.getAxisConfiguration()); - return; - } - - String destination = JMSUtils.getDestination(service); - - // compute service EPR and keep for later use - serviceNameToEprMap.put(service.getName(), getEPR(cf, destination)); - - // add the specified or implicit destination of this service - // to its connection factory - cf.addDestination(destination, service.getName()); - } - - /** - * Return the connection factory name for this service. If this service - * refers to an invalid factory or defaults to a non-existent default - * factory, this returns null - * - * @param service the AxisService - * @return the JMSConnectionFactory to be used, or null if reference is invalid - */ - private JMSConnectionFactory getConnectionFactory(AxisService service) { - Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM); - - // validate connection factory name (specified or default) - if (conFacParam != null) { - String conFac = (String) conFacParam.getValue(); - if (connectionFactories.containsKey(conFac)) { - return (JMSConnectionFactory) connectionFactories.get(conFac); - } else { - return null; - } - - } else if (connectionFactories.containsKey(JMSConstants.DEFAULT_CONFAC_NAME)) { - return (JMSConnectionFactory) connectionFactories. - get(JMSConstants.DEFAULT_CONFAC_NAME); - - } else { - return null; - } - } - - /** - * Initialize the defined connection factories, parsing the TransportIn - * descriptions - * - * @param transprtIn The Axis2 Transport in for the JMS - */ - private void initializeConnectionFactories(TransportInDescription transprtIn) { - // iterate through all defined connection factories - Iterator conFacIter = transprtIn.getParameters().iterator(); - - while (conFacIter.hasNext()) { - - Parameter param = (Parameter) conFacIter.next(); - JMSConnectionFactory jmsConFactory = - new JMSConnectionFactory(param.getName(), workScheduler); - - ParameterIncludeImpl pi = new ParameterIncludeImpl(); - try { - pi.deserializeParameters((OMElement) param.getValue()); - } catch (AxisFault axisFault) { - handleException("Error reading Parameters for JMS connection " + - "factory" + jmsConFactory.getName(), axisFault); - } - - // read connection facotry properties - Iterator params = pi.getParameters().iterator(); - - while (params.hasNext()) { - Parameter p = (Parameter) params.next(); - - if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) { - jmsConFactory.addProperty( - Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue()); - } else if (Context.PROVIDER_URL.equals(p.getName())) { - jmsConFactory.addProperty( - Context.PROVIDER_URL, (String) p.getValue()); - } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) { - jmsConFactory.addProperty( - Context.SECURITY_PRINCIPAL, (String) p.getValue()); - } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) { - jmsConFactory.addProperty( - Context.SECURITY_CREDENTIALS, (String) p.getValue()); - } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) { - jmsConFactory.setJndiName((String) p.getValue()); - } else if (JMSConstants.CONFAC_JNDI_NAME_USER.equals(p.getName())) { - jmsConFactory.setJndiUser((String) p.getValue()); - } else if (JMSConstants.CONFAC_JNDI_NAME_PASS.equals(p.getName())) { - jmsConFactory.setJndiPass((String) p.getValue()); - } else if (JMSConstants.DEST_PARAM.equals(p.getName())) { - StringTokenizer st = - new StringTokenizer((String) p.getValue(), " ,"); - while (st.hasMoreTokens()) { - jmsConFactory.addDestination(st.nextToken(), null); - } - } - } - - // connect to the actual connection factory - try { - jmsConFactory.connect(); - connectionFactories.put(jmsConFactory.getName(), jmsConFactory); - } catch (NamingException e) { - handleException("Error connecting to JMS connection factory : " + - jmsConFactory.getJndiName(), e); - } - } - } - - /** - * Get the EPR for the given JMS connection factory and destination - * the form of the URL is - * jms:/?[=&]* - * - * @param cf the Axis2 JMS connection factory - * @param destination the JNDI name of the destination - * @return the EPR as a String - */ - private static String getEPR(JMSConnectionFactory cf, String destination) { - StringBuffer sb = new StringBuffer(); - sb.append(JMSConstants.JMS_PREFIX).append(destination); - sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM). - append("=").append(cf.getJndiName()); - Iterator props = cf.getProperties().keySet().iterator(); - while (props.hasNext()) { - String key = (String) props.next(); - String value = (String) cf.getProperties().get(key); - sb.append("&").append(key).append("=").append(value); - } - return sb.toString(); - } - - /** - * Start this JMS Listener (Transport Listener) - * - * @throws AxisFault - */ - public void start() throws AxisFault { - // create thread pool of workers - workerPool = new ThreadPoolExecutor( - 1, - WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT, - new LinkedBlockingQueue(), - new org.apache.axis2.util.threadpool.DefaultThreadFactory( - new ThreadGroup("JMS Worker thread group"), - "JMSWorker")); - - Iterator iter = connectionFactories.values().iterator(); - while (iter.hasNext()) { - JMSConnectionFactory conFac = (JMSConnectionFactory) iter.next(); - JMSMessageReceiver msgRcvr = - new JMSMessageReceiver(conFac, workerPool, configCtx); - - try { - conFac.listen(msgRcvr); - } catch (JMSException e) { - handleException("Error starting connection factory : " + - conFac.getName(), e); - } - } - } - - /** - * Stop this transport listener and shutdown all of the connection factories - */ - public void stop() { - Iterator iter = connectionFactories.values().iterator(); - while (iter.hasNext()) { - ((JMSConnectionFactory) iter.next()).stop(); - } - if (workerPool != null) { - workerPool.shutdown(); - } - } - - /** - * Returns EPRs for the given service and IP. (Picks up precomputed EPR) - * - * @param serviceName service name - * @param ip ignored - * @return the EPR for the service - * @throws AxisFault not used - */ - public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { - //Strip out the operation name - if (serviceName.indexOf('/') != -1) { - serviceName = serviceName.substring(0, serviceName.indexOf('/')); - } - - String endpointName = (String) serviceNameToEprMap.get(serviceName); - if (endpointName == null){ - if (serviceName.indexOf(".") != -1){ - serviceName = serviceName.substring(0, serviceName.indexOf(".")); - endpointName = (String) serviceNameToEprMap.get(serviceName); - } - } - return new EndpointReference[]{new EndpointReference(endpointName)}; - } - - /** - * Returns the EPR for the given service and IP. (Picks up precomputed EPR) - * - * @param serviceName service name - * @param ip ignored - * @return the EPR for the service - * @throws AxisFault not used - */ - public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault { - return getEPRsForService(serviceName, ip)[0]; - } - - /** - * Starts listening for messages on this service - * - * @param service the AxisService just deployed - */ - private void startListeningForService(AxisService service) { - processService(service); - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf == null) { - String msg = "Service " + service.getName() + " does not specify" + - "a JMS connection factory or refers to an invalid factory." + - "This service is being marked as faulty and will not be " + - "available over the JMS transport"; - log.warn(msg); - JMSUtils.markServiceAsFaulty( - service.getName(), msg, service.getAxisConfiguration()); - return; - } - - String destination = JMSUtils.getDestination(service); - try { - cf.listenOnDestination(destination); - log.info("Started listening on destination : " + destination + - " for service " + service.getName()); - - } catch (JMSException e) { - handleException( - "Could not listen on JMS for service " + service.getName(), e); - JMSUtils.markServiceAsFaulty( - service.getName(), e.getMessage(), service.getAxisConfiguration()); - } - } - - /** - * Stops listening for messages for the service undeployed - * - * @param service the AxisService just undeployed - */ - private void stopListeningForService(AxisService service) { - - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf == null) { - String msg = "Service " + service.getName() + " does not specify" + - "a JMS connection factory or refers to an invalid factory." + - "This service is being marked as faulty and will not be " + - "available over the JMS transport"; - log.warn(msg); - JMSUtils.markServiceAsFaulty( - service.getName(), msg, service.getAxisConfiguration()); - return; - } - - // remove from the serviceNameToEprMap - serviceNameToEprMap.remove(service.getName()); - - String destination = JMSUtils.getDestination(service); - try { - cf.removeDestination(destination); - } catch (JMSException e) { - handleException( - "Error while terminating listening on JMS destination : " + destination, e); - } - } - - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } - - /** - * An AxisObserver which will start listening for newly deployed services, - * and stop listening when services are undeployed. - */ - class JMSAxisObserver implements AxisObserver { - - // The initilization code will go here - public void init(AxisConfiguration axisConfig) { - } - - public void serviceUpdate(AxisEvent event, AxisService service) { - - if (JMSUtils.isJMSService(service)) { - switch (event.getEventType()) { - case AxisEvent.SERVICE_DEPLOY : - startListeningForService(service); - break; - case AxisEvent.SERVICE_REMOVE : - stopListeningForService(service); - break; - case AxisEvent.SERVICE_START : - startListeningForService(service); - break; - case AxisEvent.SERVICE_STOP : - stopListeningForService(service); - break; - } - } - } - - public void moduleUpdate(AxisEvent event, AxisModule module) { - } - - //-------------------------------------------------------- - public void addParameter(Parameter param) throws AxisFault { - } - - public void removeParameter(Parameter param) throws AxisFault { - } - - public void deserializeParameters(OMElement parameterElement) throws AxisFault { - } - - public Parameter getParameter(String name) { - return null; - } - - public ArrayList getParameters() { - return null; - } - - public boolean isParameterLocked(String parameterName) { - return false; - } - - public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup serviceGroup) { - } - } - - public ConfigurationContext getConfigurationContext() { - return this.configCtx; - } - - - public SessionContext getSessionContext(MessageContext messageContext) { - return null; - } - - public void destroy() { - this.configCtx = null; - } -} -- cgit v1.2.3