diff options
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java')
-rw-r--r-- | branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java | 550 |
1 files changed, 0 insertions, 550 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java deleted file mode 100644 index 440e09dc84..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java +++ /dev/null @@ -1,550 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.transport.base; - -import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.Map; -import java.util.Set; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.apache.axiom.om.util.UUIDGenerator; -import org.apache.axis2.AxisFault; -import org.apache.axis2.addressing.EndpointReference; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.context.SessionContext; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.TransportInDescription; -import org.apache.axis2.description.TransportOutDescription; -import org.apache.axis2.engine.AxisEngine; -import org.apache.axis2.transport.TransportListener; -import org.apache.axis2.util.MessageContextBuilder; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPoolFactory; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceFilter; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceTracker; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceTrackerListener; - -public abstract class AbstractTransportListener implements TransportListener { - - /** the reference to the actual commons logger to be used for log messages */ - protected Log log = null; - - /** the axis2 configuration context */ - protected ConfigurationContext cfgCtx = null; - - /** transport in description */ - private TransportInDescription transportIn = null; - /** transport out description */ - private TransportOutDescription transportOut = null; - /** state of the listener */ - protected int state = BaseConstants.STOPPED; - /** is this transport non-blocking? */ - protected boolean isNonBlocking = false; - /** - * Service tracker used to invoke {@link #internalStartListeningForService(AxisService)} - * and {@link #internalStopListeningForService(AxisService)}. */ - private AxisServiceTracker serviceTracker; - - /** the thread pool to execute actual poll invocations */ - protected WorkerPool workerPool = null; - /** use the thread pool available in the axis2 configuration context */ - protected boolean useAxis2ThreadPool = false; - /** JMX support */ - private TransportMBeanSupport mbeanSupport; - /** Metrics collector for this transport */ - protected MetricsCollector metrics = new MetricsCollector(); - - /** - * A constructor that makes subclasses pick up the correct logger - */ - protected AbstractTransportListener() { - log = LogFactory.getLog(this.getClass()); - } - - /** - * Initialize the generic transport. Sets up the transport and the thread pool to be used - * for message processing. Also creates an AxisObserver that gets notified of service - * life cycle events for the transport to act on - * @param cfgCtx the axis configuration context - * @param transportIn the transport-in description - * @throws AxisFault on error - */ - public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn) - throws AxisFault { - - this.cfgCtx = cfgCtx; - this.transportIn = transportIn; - this.transportOut = cfgCtx.getAxisConfiguration().getTransportOut(getTransportName()); - - if (useAxis2ThreadPool) { - //this.workerPool = cfgCtx.getThreadPool(); not yet implemented - throw new AxisFault("Unsupported thread pool for task execution - Axis2 thread pool"); - } else { - this.workerPool = WorkerPoolFactory.getWorkerPool( - 10, 20, 5, -1, getTransportName() + "Server Worker thread group", getTransportName() + "-Worker"); - } - - // register to receive updates on services for lifetime management - serviceTracker = new AxisServiceTracker( - cfgCtx.getAxisConfiguration(), - new AxisServiceFilter() { - public boolean matches(AxisService service) { - return !service.getName().startsWith("__") // these are "private" services - && BaseUtils.isUsingTransport(service, getTransportName()); - } - }, - new AxisServiceTrackerListener() { - public void serviceAdded(AxisService service) { - internalStartListeningForService(service); - } - - public void serviceRemoved(AxisService service) { - internalStopListeningForService(service); - } - }); - - // register with JMX - mbeanSupport = new TransportMBeanSupport(this, getTransportName()); - mbeanSupport.register(); - } - - public void destroy() { - try { - if (state == BaseConstants.STARTED) { - try { - stop(); - } catch (AxisFault ignore) { - log.warn("Error stopping the transport : " + getTransportName()); - } - } - } finally { - state = BaseConstants.STOPPED; - mbeanSupport.unregister(); - } - try { - workerPool.shutdown(10000); - } catch (InterruptedException ex) { - log.warn("Thread interrupted while waiting for worker pool to shut down"); - } - } - - public void stop() throws AxisFault { - if (state == BaseConstants.STARTED) { - state = BaseConstants.STOPPED; - // cancel receipt of service lifecycle events - log.info(getTransportName().toUpperCase() + " Listener Shutdown"); - serviceTracker.stop(); - } - } - - public void start() throws AxisFault { - if (state != BaseConstants.STARTED) { - state = BaseConstants.STARTED; - // register to receive updates on services for lifetime management - // cfgCtx.getAxisConfiguration().addObservers(axisObserver); - log.info(getTransportName().toUpperCase() + " Listener started"); - // iterate through deployed services and start - serviceTracker.start(); - } - } - - public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { - return getEPRsForService(serviceName); - } - - protected EndpointReference[] getEPRsForService(String serviceName) { - return null; - } - - public void disableTransportForService(AxisService service) { - - log.warn("Disabling the " + getTransportName() + " transport for the service " - + service.getName() + ", because it is not configured properly for the service"); - - if (service.isEnableAllTransports()) { - ArrayList<String> exposedTransports = new ArrayList<String>(); - for(Object obj: cfgCtx.getAxisConfiguration().getTransportsIn().values()) { - String transportName = ((TransportInDescription) obj).getName(); - if (!transportName.equals(getTransportName())) { - exposedTransports.add(transportName); - } - } - service.setEnableAllTransports(false); - service.setExposedTransports(exposedTransports); - } else { - service.removeExposedTransport(getTransportName()); - } - } - - void internalStartListeningForService(AxisService service) { - String serviceName = service.getName(); - try { - startListeningForService(service); - } catch (AxisFault ex) { - String transportName = getTransportName().toUpperCase(); - String msg = "Unable to configure the service " + serviceName + " for the " + - transportName + " transport: " + ex.getMessage() + ". " + - "This service is being marked as faulty and will not be available over the " + - transportName + " transport."; - // Only log the message at level WARN and log the full stack trace at level DEBUG. - // TODO: We should have a way to distinguish a missing configuration - // from an error. This may be addressed when implementing the enhancement - // described in point 3 of http://markmail.org/message/umhenrurlrekk5jh - log.warn(msg); - log.debug("Disabling service " + serviceName + " for the " + transportName + - "transport", ex); - BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration()); - disableTransportForService(service); - return; - } catch (Throwable ex) { - String msg = "Unexpected error when configuring service " + serviceName + - " for the " + getTransportName().toUpperCase() + " transport. It will be" + - " disabled for this transport and marked as faulty."; - log.error(msg, ex); - BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration()); - disableTransportForService(service); - return; - } - registerMBean(new TransportListenerEndpointView(this, serviceName), - getEndpointMBeanName(serviceName)); - } - - void internalStopListeningForService(AxisService service) { - unregisterMBean(getEndpointMBeanName(service.getName())); - stopListeningForService(service); - } - - protected abstract void startListeningForService(AxisService service) throws AxisFault; - - protected abstract void stopListeningForService(AxisService service); - - /** - * This is a deprecated method in Axis2 and this default implementation returns the first - * result from the getEPRsForService() method - */ - public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault { - return getEPRsForService(serviceName, ip)[0]; - } - - public SessionContext getSessionContext(MessageContext messageContext) { - return null; - } - - /** - * Create a new axis MessageContext for an incoming message through this transport - * @return the newly created message context - */ - public MessageContext createMessageContext() { - MessageContext msgCtx = new MessageContext(); - msgCtx.setConfigurationContext(cfgCtx); - - msgCtx.setIncomingTransportName(getTransportName()); - msgCtx.setTransportOut(transportOut); - msgCtx.setTransportIn(transportIn); - msgCtx.setServerSide(true); - msgCtx.setMessageID(UUIDGenerator.getUUID()); - - // There is a discrepency in what I thought, Axis2 spawns a nes threads to - // send a message is this is TRUE - and I want it to be the other way - msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.valueOf(!isNonBlocking)); - - // are these relevant? - //msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID()); - // this is required to support Sandesha 2 - //msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, - // new HttpCoreRequestResponseTransport(msgContext)); - - return msgCtx; - } - - /** - * Process a new incoming message through the axis engine - * @param msgCtx the axis MessageContext - * @param trpHeaders the map containing transport level message headers - * @param soapAction the optional soap action or null - * @param contentType the optional content-type for the message - */ - public void handleIncomingMessage( - MessageContext msgCtx, Map trpHeaders, - String soapAction, String contentType) throws AxisFault { - - // set the soapaction if one is available via a transport header - if (soapAction != null) { - msgCtx.setSoapAction(soapAction); - } - - // set the transport headers to the message context - msgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, trpHeaders); - - // send the message context through the axis engine - try { - // check if an Axis2 callback has been registered for this message - Map callBackMap = (Map) msgCtx.getConfigurationContext(). - getProperty(BaseConstants.CALLBACK_TABLE); - // FIXME: transport headers are protocol specific; the correlation ID should be - // passed as argument to this method - Object replyToMessageID = trpHeaders.get(BaseConstants.HEADER_IN_REPLY_TO); - // if there is a callback registerd with this replyto ID then this has to - // be handled as a synchronous incoming message, via the - if (replyToMessageID != null && callBackMap != null && - callBackMap.get(replyToMessageID) != null) { - - SynchronousCallback synchronousCallback = - (SynchronousCallback) callBackMap.get(replyToMessageID); - synchronousCallback.setInMessageContext(msgCtx); - callBackMap.remove(replyToMessageID); - } else { - AxisEngine.receive(msgCtx); - } - - } catch (AxisFault e) { - if (log.isDebugEnabled()) { - log.debug("Error receiving message", e); - } - if (msgCtx.isServerSide()) { - AxisEngine.sendFault(MessageContextBuilder.createFaultMessageContext(msgCtx, e)); - } - } - } - - protected void handleException(String msg, Exception e) throws AxisFault { - log.error(msg, e); - throw new AxisFault(msg, e); - } - - protected void logException(String msg, Exception e) { - log.error(msg, e); - } - - public String getTransportName() { - return transportIn.getName(); - } - - public MetricsCollector getMetricsCollector() { - return metrics; - } - - // -- jmx/management methods-- - /** - * Pause the listener - Stop accepting/processing new messages, but continues processing existing - * messages until they complete. This helps bring an instance into a maintenence mode - * @throws AxisFault on error - */ - public void pause() throws AxisFault {} - /** - * Resume the lister - Brings the lister into active mode back from a paused state - * @throws AxisFault on error - */ - public void resume() throws AxisFault {} - - /** - * Stop processing new messages, and wait the specified maximum time for in-flight - * requests to complete before a controlled shutdown for maintenence - * - * @param millis a number of milliseconds to wait until pending requests are allowed to complete - * @throws AxisFault on error - */ - public void maintenenceShutdown(long millis) throws AxisFault {} - - /** - * Returns the number of active threads processing messages - * @return number of active threads processing messages - */ - public int getActiveThreadCount() { - return workerPool.getActiveCount(); - } - - /** - * Return the number of requests queued in the thread pool - * @return queue size - */ - public int getQueueSize() { - return workerPool.getQueueSize(); - } - - public long getMessagesReceived() { - if (metrics != null) { - return metrics.getMessagesReceived(); - } - return -1; - } - - public long getFaultsReceiving() { - if (metrics != null) { - return metrics.getFaultsReceiving(); - } - return -1; - } - - public long getBytesReceived() { - if (metrics != null) { - return metrics.getBytesReceived(); - } - return -1; - } - - public long getMessagesSent() { - if (metrics != null) { - return metrics.getMessagesSent(); - } - return -1; - } - - public long getFaultsSending() { - if (metrics != null) { - return metrics.getFaultsSending(); - } - return -1; - } - - public long getBytesSent() { - if (metrics != null) { - return metrics.getBytesSent(); - } - return -1; - } - - public long getTimeoutsReceiving() { - if (metrics != null) { - return metrics.getTimeoutsReceiving(); - } - return -1; - } - - public long getTimeoutsSending() { - if (metrics != null) { - return metrics.getTimeoutsSending(); - } - return -1; - } - - public long getMinSizeReceived() { - if (metrics != null) { - return metrics.getMinSizeReceived(); - } - return -1; - } - - public long getMaxSizeReceived() { - if (metrics != null) { - return metrics.getMaxSizeReceived(); - } - return -1; - } - - public double getAvgSizeReceived() { - if (metrics != null) { - return metrics.getAvgSizeReceived(); - } - return -1; - } - - public long getMinSizeSent() { - if (metrics != null) { - return metrics.getMinSizeSent(); - } - return -1; - } - - public long getMaxSizeSent() { - if (metrics != null) { - return metrics.getMaxSizeSent(); - } - return -1; - } - - public double getAvgSizeSent() { - if (metrics != null) { - return metrics.getAvgSizeSent(); - } - return -1; - } - - public Map getResponseCodeTable() { - if (metrics != null) { - return metrics.getResponseCodeTable(); - } - return null; - } - - public void resetStatistics() { - if (metrics != null) { - metrics.reset(); - } - } - - public long getLastResetTime() { - if (metrics != null) { - return metrics.getLastResetTime(); - } - return -1; - } - - public long getMetricsWindow() { - if (metrics != null) { - return System.currentTimeMillis() - metrics.getLastResetTime(); - } - return -1; - } - - private String getEndpointMBeanName(String serviceName) { - return mbeanSupport.getMBeanName() + ",Group=Services,Service=" + serviceName; - } - - /** - * Utility method to allow transports to register MBeans - * @param mbeanInstance bean instance - * @param objectName name - */ - private void registerMBean(Object mbeanInstance, String objectName) { - try { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - ObjectName name = new ObjectName(objectName); - Set set = mbs.queryNames(name, null); - if (set != null && set.isEmpty()) { - mbs.registerMBean(mbeanInstance, name); - } else { - mbs.unregisterMBean(name); - mbs.registerMBean(mbeanInstance, name); - } - } catch (Exception e) { - log.warn("Error registering a MBean with objectname ' " + objectName + - " ' for JMX management", e); - } - } - - private void unregisterMBean(String objectName) { - try { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - ObjectName objName = new ObjectName(objectName); - if (mbs.isRegistered(objName)) { - mbs.unregisterMBean(objName); - } - } catch (Exception e) { - log.warn("Error un-registering a MBean with objectname ' " + objectName + - " ' for JMX management", e); - } - } -} |