summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
diff options
context:
space:
mode:
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java294
1 files changed, 0 insertions, 294 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
deleted file mode 100644
index 8c9f66dfbf..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSListener.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.BytesMessage;
-import javax.jms.TextMessage;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.TransportInDescription;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleFactory;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeRuleSet;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.MessageTypeRule;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.PropertyRule;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportListener;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorListener;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSource;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.event.TransportErrorSourceSupport;
-
-/**
- * The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances
- * for each service requesting exposure over JMS, and stops these if they are undeployed / stopped.
- * <p>
- * A service indicates a JMS Connection factory definition by name, which would be defined in the
- * JMSListner on the axis2.xml, and this provides a way to reuse common configuration between
- * services, as well as to optimize resources utilized
- * <p>
- * If the connection factory name was not specified, it will default to the one named "default"
- * {@see JMSConstants.DEFAULT_CONFAC_NAME}
- * <p>
- * If a destination JNDI name is not specified, a service will expect to use a Queue with the same
- * JNDI name as of the service. Additional Parameters allows one to bind to a Topic or specify
- * many more detailed control options. See package documentation for more details
- * <p>
- * All Destinations / JMS Administered objects used MUST be pre-created or already available
- */
-public class JMSListener extends AbstractTransportListener implements ManagementSupport,
- TransportErrorSource {
-
- public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
-
- /** The JMSConnectionFactoryManager which centralizes the management of defined factories */
- private JMSConnectionFactoryManager connFacManager;
- /** A Map of service name to the JMS endpoints */
- private Map<String,JMSEndpoint> serviceNameToEndpointMap = new HashMap<String,JMSEndpoint>();
- /** A Map of service name to its ServiceTaskManager instances */
- private Map<String, ServiceTaskManager> serviceNameToSTMMap =
- new HashMap<String, ServiceTaskManager>();
- private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this);
-
- /**
- * TransportListener initialization
- *
- * @param cfgCtx the Axis configuration context
- * @param trpInDesc the TransportIn description
- */
- public void init(ConfigurationContext cfgCtx,
- TransportInDescription trpInDesc) throws AxisFault {
-
- super.init(cfgCtx, trpInDesc);
- connFacManager = new JMSConnectionFactoryManager(trpInDesc);
- log.info("JMS Transport Receiver/Listener initialized...");
- }
-
- /**
- * Returns EPRs for the given service over the JMS transport
- *
- * @param serviceName service name
- * @return the JMS EPRs for the service
- */
- public EndpointReference[] getEPRsForService(String serviceName) {
- //Strip out the operation name
- if (serviceName.indexOf('/') != -1) {
- serviceName = serviceName.substring(0, serviceName.indexOf('/'));
- }
- // strip out the endpoint name if present
- if (serviceName.indexOf('.') != -1) {
- serviceName = serviceName.substring(0, serviceName.indexOf('.'));
- }
- JMSEndpoint endpoint = serviceNameToEndpointMap.get(serviceName);
- if (endpoint != null) {
- return endpoint.getEndpointReferences();
- } else {
- return null;
- }
- }
-
- /**
- * Listen for JMS messages on behalf of the given service
- *
- * @param service the Axis service for which to listen for messages
- */
- protected void startListeningForService(AxisService service) throws AxisFault {
- JMSConnectionFactory cf = getConnectionFactory(service);
- if (cf == null) {
- throw new AxisFault("The service doesn't specify a JMS connection factory or refers " +
- "to an invalid factory.");
- }
-
- JMSEndpoint endpoint = new JMSEndpoint();
- endpoint.setService(service);
- endpoint.setCf(cf);
-
- Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION);
- if (destParam != null) {
- endpoint.setJndiDestinationName((String)destParam.getValue());
- } else {
- // Assume that the JNDI destination name is the same as the service name
- endpoint.setJndiDestinationName(service.getName());
- }
-
- Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE);
- if (destTypeParam != null) {
- String paramValue = (String) destTypeParam.getValue();
- if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
- JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) {
- endpoint.setDestinationType(paramValue);
- } else {
- throw new AxisFault("Invalid destinaton type value " + paramValue);
- }
- } else {
- log.debug("JMS destination type not given. default queue");
- endpoint.setDestinationType(JMSConstants.DESTINATION_TYPE_QUEUE);
- }
-
- Parameter contentTypeParam = service.getParameter(JMSConstants.CONTENT_TYPE_PARAM);
- if (contentTypeParam == null) {
- ContentTypeRuleSet contentTypeRuleSet = new ContentTypeRuleSet();
- contentTypeRuleSet.addRule(new PropertyRule(BaseConstants.CONTENT_TYPE));
- contentTypeRuleSet.addRule(new MessageTypeRule(BytesMessage.class, "application/octet-stream"));
- contentTypeRuleSet.addRule(new MessageTypeRule(TextMessage.class, "text/plain"));
- endpoint.setContentTypeRuleSet(contentTypeRuleSet);
- } else {
- endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam));
- }
-
- endpoint.computeEPRs(); // compute service EPR and keep for later use
- serviceNameToEndpointMap.put(service.getName(), endpoint);
-
- ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool);
- stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint));
- stm.start();
- serviceNameToSTMMap.put(service.getName(), stm);
-
- for (int i=0; i<3; i++) {
- if (stm.getActiveTaskCount() > 0) {
- log.info("Started to listen on destination : " + stm.getDestinationJNDIName() +
- " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
- " for service " + stm.getServiceName());
- return;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {}
- }
-
- log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() +
- " of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
- " for service " + stm.getServiceName() + " have not yet started after 3 seconds ..");
- }
-
- /**
- * Stops listening for messages for the service thats undeployed or stopped
- *
- * @param service the service that was undeployed or stopped
- */
- protected void stopListeningForService(AxisService service) {
-
- ServiceTaskManager stm = serviceNameToSTMMap.get(service.getName());
- if (stm != null) {
- if (log.isDebugEnabled()) {
- log.debug("Stopping listening on destination : " + stm.getDestinationJNDIName() +
- " for service : " + stm.getServiceName());
- }
-
- stm.stop();
-
- serviceNameToSTMMap.remove(service.getName());
- serviceNameToEndpointMap.remove(service.getName());
- log.info("Stopped listening for JMS messages to service : " + service.getName());
-
- } else {
- log.error("Unable to stop service : " + service.getName() +
- " - unable to find its ServiceTaskManager");
- }
- }
- /**
- * Return the connection factory name for this service. If this service
- * refers to an invalid factory or defaults to a non-existent default
- * factory, this returns null
- *
- * @param service the AxisService
- * @return the JMSConnectionFactory to be used, or null if reference is invalid
- */
- public JMSConnectionFactory getConnectionFactory(AxisService service) {
-
- Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC);
- // validate connection factory name (specified or default)
- if (conFacParam != null) {
- return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue());
- } else {
- return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME);
- }
- }
-
- // -- jmx/management methods--
- /**
- * Pause the listener - Stop accepting/processing new messages, but continues processing existing
- * messages until they complete. This helps bring an instance into a maintenence mode
- * @throws AxisFault on error
- */
- public void pause() throws AxisFault {
- if (state != BaseConstants.STARTED) return;
- try {
- for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
- stm.pause();
- }
- state = BaseConstants.PAUSED;
- log.info("Listener paused");
- } catch (AxisJMSException e) {
- log.error("At least one service could not be paused", e);
- }
- }
-
- /**
- * Resume the lister - Brings the lister into active mode back from a paused state
- * @throws AxisFault on error
- */
- public void resume() throws AxisFault {
- if (state != BaseConstants.PAUSED) return;
- try {
- for (ServiceTaskManager stm : serviceNameToSTMMap.values()) {
- stm.resume();
- }
- state = BaseConstants.STARTED;
- log.info("Listener resumed");
- } catch (AxisJMSException e) {
- log.error("At least one service could not be resumed", e);
- }
- }
-
- /**
- * Stop processing new messages, and wait the specified maximum time for in-flight
- * requests to complete before a controlled shutdown for maintenence
- *
- * @param millis a number of milliseconds to wait until pending requests are allowed to complete
- * @throws AxisFault on error
- */
- public void maintenenceShutdown(long millis) throws AxisFault {
- if (state != BaseConstants.STARTED) return;
- try {
- long start = System.currentTimeMillis();
- stop();
- state = BaseConstants.STOPPED;
- log.info("Listener shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s");
- } catch (Exception e) {
- handleException("Error shutting down the listener for maintenence", e);
- }
- }
-
- public void addErrorListener(TransportErrorListener listener) {
- tess.addErrorListener(listener);
- }
-
- public void removeErrorListener(TransportErrorListener listener) {
- tess.removeErrorListener(listener);
- }
-
- void error(AxisService service, Throwable ex) {
- tess.error(service, ex);
- }
-}