summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java
diff options
context:
space:
mode:
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java550
1 files changed, 550 insertions, 0 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java
new file mode 100644
index 0000000000..440e09dc84
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java
@@ -0,0 +1,550 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.axiom.om.util.UUIDGenerator;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.SessionContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.axis2.util.MessageContextBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPoolFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceFilter;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceTracker;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceTrackerListener;
+
+public abstract class AbstractTransportListener implements TransportListener {
+
+ /** the reference to the actual commons logger to be used for log messages */
+ protected Log log = null;
+
+ /** the axis2 configuration context */
+ protected ConfigurationContext cfgCtx = null;
+
+ /** transport in description */
+ private TransportInDescription transportIn = null;
+ /** transport out description */
+ private TransportOutDescription transportOut = null;
+ /** state of the listener */
+ protected int state = BaseConstants.STOPPED;
+ /** is this transport non-blocking? */
+ protected boolean isNonBlocking = false;
+ /**
+ * Service tracker used to invoke {@link #internalStartListeningForService(AxisService)}
+ * and {@link #internalStopListeningForService(AxisService)}. */
+ private AxisServiceTracker serviceTracker;
+
+ /** the thread pool to execute actual poll invocations */
+ protected WorkerPool workerPool = null;
+ /** use the thread pool available in the axis2 configuration context */
+ protected boolean useAxis2ThreadPool = false;
+ /** JMX support */
+ private TransportMBeanSupport mbeanSupport;
+ /** Metrics collector for this transport */
+ protected MetricsCollector metrics = new MetricsCollector();
+
+ /**
+ * A constructor that makes subclasses pick up the correct logger
+ */
+ protected AbstractTransportListener() {
+ log = LogFactory.getLog(this.getClass());
+ }
+
+ /**
+ * Initialize the generic transport. Sets up the transport and the thread pool to be used
+ * for message processing. Also creates an AxisObserver that gets notified of service
+ * life cycle events for the transport to act on
+ * @param cfgCtx the axis configuration context
+ * @param transportIn the transport-in description
+ * @throws AxisFault on error
+ */
+ public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn)
+ throws AxisFault {
+
+ this.cfgCtx = cfgCtx;
+ this.transportIn = transportIn;
+ this.transportOut = cfgCtx.getAxisConfiguration().getTransportOut(getTransportName());
+
+ if (useAxis2ThreadPool) {
+ //this.workerPool = cfgCtx.getThreadPool(); not yet implemented
+ throw new AxisFault("Unsupported thread pool for task execution - Axis2 thread pool");
+ } else {
+ this.workerPool = WorkerPoolFactory.getWorkerPool(
+ 10, 20, 5, -1, getTransportName() + "Server Worker thread group", getTransportName() + "-Worker");
+ }
+
+ // register to receive updates on services for lifetime management
+ serviceTracker = new AxisServiceTracker(
+ cfgCtx.getAxisConfiguration(),
+ new AxisServiceFilter() {
+ public boolean matches(AxisService service) {
+ return !service.getName().startsWith("__") // these are "private" services
+ && BaseUtils.isUsingTransport(service, getTransportName());
+ }
+ },
+ new AxisServiceTrackerListener() {
+ public void serviceAdded(AxisService service) {
+ internalStartListeningForService(service);
+ }
+
+ public void serviceRemoved(AxisService service) {
+ internalStopListeningForService(service);
+ }
+ });
+
+ // register with JMX
+ mbeanSupport = new TransportMBeanSupport(this, getTransportName());
+ mbeanSupport.register();
+ }
+
+ public void destroy() {
+ try {
+ if (state == BaseConstants.STARTED) {
+ try {
+ stop();
+ } catch (AxisFault ignore) {
+ log.warn("Error stopping the transport : " + getTransportName());
+ }
+ }
+ } finally {
+ state = BaseConstants.STOPPED;
+ mbeanSupport.unregister();
+ }
+ try {
+ workerPool.shutdown(10000);
+ } catch (InterruptedException ex) {
+ log.warn("Thread interrupted while waiting for worker pool to shut down");
+ }
+ }
+
+ public void stop() throws AxisFault {
+ if (state == BaseConstants.STARTED) {
+ state = BaseConstants.STOPPED;
+ // cancel receipt of service lifecycle events
+ log.info(getTransportName().toUpperCase() + " Listener Shutdown");
+ serviceTracker.stop();
+ }
+ }
+
+ public void start() throws AxisFault {
+ if (state != BaseConstants.STARTED) {
+ state = BaseConstants.STARTED;
+ // register to receive updates on services for lifetime management
+ // cfgCtx.getAxisConfiguration().addObservers(axisObserver);
+ log.info(getTransportName().toUpperCase() + " Listener started");
+ // iterate through deployed services and start
+ serviceTracker.start();
+ }
+ }
+
+ public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+ return getEPRsForService(serviceName);
+ }
+
+ protected EndpointReference[] getEPRsForService(String serviceName) {
+ return null;
+ }
+
+ public void disableTransportForService(AxisService service) {
+
+ log.warn("Disabling the " + getTransportName() + " transport for the service "
+ + service.getName() + ", because it is not configured properly for the service");
+
+ if (service.isEnableAllTransports()) {
+ ArrayList<String> exposedTransports = new ArrayList<String>();
+ for(Object obj: cfgCtx.getAxisConfiguration().getTransportsIn().values()) {
+ String transportName = ((TransportInDescription) obj).getName();
+ if (!transportName.equals(getTransportName())) {
+ exposedTransports.add(transportName);
+ }
+ }
+ service.setEnableAllTransports(false);
+ service.setExposedTransports(exposedTransports);
+ } else {
+ service.removeExposedTransport(getTransportName());
+ }
+ }
+
+ void internalStartListeningForService(AxisService service) {
+ String serviceName = service.getName();
+ try {
+ startListeningForService(service);
+ } catch (AxisFault ex) {
+ String transportName = getTransportName().toUpperCase();
+ String msg = "Unable to configure the service " + serviceName + " for the " +
+ transportName + " transport: " + ex.getMessage() + ". " +
+ "This service is being marked as faulty and will not be available over the " +
+ transportName + " transport.";
+ // Only log the message at level WARN and log the full stack trace at level DEBUG.
+ // TODO: We should have a way to distinguish a missing configuration
+ // from an error. This may be addressed when implementing the enhancement
+ // described in point 3 of http://markmail.org/message/umhenrurlrekk5jh
+ log.warn(msg);
+ log.debug("Disabling service " + serviceName + " for the " + transportName +
+ "transport", ex);
+ BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration());
+ disableTransportForService(service);
+ return;
+ } catch (Throwable ex) {
+ String msg = "Unexpected error when configuring service " + serviceName +
+ " for the " + getTransportName().toUpperCase() + " transport. It will be" +
+ " disabled for this transport and marked as faulty.";
+ log.error(msg, ex);
+ BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration());
+ disableTransportForService(service);
+ return;
+ }
+ registerMBean(new TransportListenerEndpointView(this, serviceName),
+ getEndpointMBeanName(serviceName));
+ }
+
+ void internalStopListeningForService(AxisService service) {
+ unregisterMBean(getEndpointMBeanName(service.getName()));
+ stopListeningForService(service);
+ }
+
+ protected abstract void startListeningForService(AxisService service) throws AxisFault;
+
+ protected abstract void stopListeningForService(AxisService service);
+
+ /**
+ * This is a deprecated method in Axis2 and this default implementation returns the first
+ * result from the getEPRsForService() method
+ */
+ public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault {
+ return getEPRsForService(serviceName, ip)[0];
+ }
+
+ public SessionContext getSessionContext(MessageContext messageContext) {
+ return null;
+ }
+
+ /**
+ * Create a new axis MessageContext for an incoming message through this transport
+ * @return the newly created message context
+ */
+ public MessageContext createMessageContext() {
+ MessageContext msgCtx = new MessageContext();
+ msgCtx.setConfigurationContext(cfgCtx);
+
+ msgCtx.setIncomingTransportName(getTransportName());
+ msgCtx.setTransportOut(transportOut);
+ msgCtx.setTransportIn(transportIn);
+ msgCtx.setServerSide(true);
+ msgCtx.setMessageID(UUIDGenerator.getUUID());
+
+ // There is a discrepency in what I thought, Axis2 spawns a nes threads to
+ // send a message is this is TRUE - and I want it to be the other way
+ msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.valueOf(!isNonBlocking));
+
+ // are these relevant?
+ //msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID());
+ // this is required to support Sandesha 2
+ //msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
+ // new HttpCoreRequestResponseTransport(msgContext));
+
+ return msgCtx;
+ }
+
+ /**
+ * Process a new incoming message through the axis engine
+ * @param msgCtx the axis MessageContext
+ * @param trpHeaders the map containing transport level message headers
+ * @param soapAction the optional soap action or null
+ * @param contentType the optional content-type for the message
+ */
+ public void handleIncomingMessage(
+ MessageContext msgCtx, Map trpHeaders,
+ String soapAction, String contentType) throws AxisFault {
+
+ // set the soapaction if one is available via a transport header
+ if (soapAction != null) {
+ msgCtx.setSoapAction(soapAction);
+ }
+
+ // set the transport headers to the message context
+ msgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, trpHeaders);
+
+ // send the message context through the axis engine
+ try {
+ // check if an Axis2 callback has been registered for this message
+ Map callBackMap = (Map) msgCtx.getConfigurationContext().
+ getProperty(BaseConstants.CALLBACK_TABLE);
+ // FIXME: transport headers are protocol specific; the correlation ID should be
+ // passed as argument to this method
+ Object replyToMessageID = trpHeaders.get(BaseConstants.HEADER_IN_REPLY_TO);
+ // if there is a callback registerd with this replyto ID then this has to
+ // be handled as a synchronous incoming message, via the
+ if (replyToMessageID != null && callBackMap != null &&
+ callBackMap.get(replyToMessageID) != null) {
+
+ SynchronousCallback synchronousCallback =
+ (SynchronousCallback) callBackMap.get(replyToMessageID);
+ synchronousCallback.setInMessageContext(msgCtx);
+ callBackMap.remove(replyToMessageID);
+ } else {
+ AxisEngine.receive(msgCtx);
+ }
+
+ } catch (AxisFault e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Error receiving message", e);
+ }
+ if (msgCtx.isServerSide()) {
+ AxisEngine.sendFault(MessageContextBuilder.createFaultMessageContext(msgCtx, e));
+ }
+ }
+ }
+
+ protected void handleException(String msg, Exception e) throws AxisFault {
+ log.error(msg, e);
+ throw new AxisFault(msg, e);
+ }
+
+ protected void logException(String msg, Exception e) {
+ log.error(msg, e);
+ }
+
+ public String getTransportName() {
+ return transportIn.getName();
+ }
+
+ public MetricsCollector getMetricsCollector() {
+ return metrics;
+ }
+
+ // -- jmx/management methods--
+ /**
+ * Pause the listener - Stop accepting/processing new messages, but continues processing existing
+ * messages until they complete. This helps bring an instance into a maintenence mode
+ * @throws AxisFault on error
+ */
+ public void pause() throws AxisFault {}
+ /**
+ * Resume the lister - Brings the lister into active mode back from a paused state
+ * @throws AxisFault on error
+ */
+ public void resume() throws AxisFault {}
+
+ /**
+ * Stop processing new messages, and wait the specified maximum time for in-flight
+ * requests to complete before a controlled shutdown for maintenence
+ *
+ * @param millis a number of milliseconds to wait until pending requests are allowed to complete
+ * @throws AxisFault on error
+ */
+ public void maintenenceShutdown(long millis) throws AxisFault {}
+
+ /**
+ * Returns the number of active threads processing messages
+ * @return number of active threads processing messages
+ */
+ public int getActiveThreadCount() {
+ return workerPool.getActiveCount();
+ }
+
+ /**
+ * Return the number of requests queued in the thread pool
+ * @return queue size
+ */
+ public int getQueueSize() {
+ return workerPool.getQueueSize();
+ }
+
+ public long getMessagesReceived() {
+ if (metrics != null) {
+ return metrics.getMessagesReceived();
+ }
+ return -1;
+ }
+
+ public long getFaultsReceiving() {
+ if (metrics != null) {
+ return metrics.getFaultsReceiving();
+ }
+ return -1;
+ }
+
+ public long getBytesReceived() {
+ if (metrics != null) {
+ return metrics.getBytesReceived();
+ }
+ return -1;
+ }
+
+ public long getMessagesSent() {
+ if (metrics != null) {
+ return metrics.getMessagesSent();
+ }
+ return -1;
+ }
+
+ public long getFaultsSending() {
+ if (metrics != null) {
+ return metrics.getFaultsSending();
+ }
+ return -1;
+ }
+
+ public long getBytesSent() {
+ if (metrics != null) {
+ return metrics.getBytesSent();
+ }
+ return -1;
+ }
+
+ public long getTimeoutsReceiving() {
+ if (metrics != null) {
+ return metrics.getTimeoutsReceiving();
+ }
+ return -1;
+ }
+
+ public long getTimeoutsSending() {
+ if (metrics != null) {
+ return metrics.getTimeoutsSending();
+ }
+ return -1;
+ }
+
+ public long getMinSizeReceived() {
+ if (metrics != null) {
+ return metrics.getMinSizeReceived();
+ }
+ return -1;
+ }
+
+ public long getMaxSizeReceived() {
+ if (metrics != null) {
+ return metrics.getMaxSizeReceived();
+ }
+ return -1;
+ }
+
+ public double getAvgSizeReceived() {
+ if (metrics != null) {
+ return metrics.getAvgSizeReceived();
+ }
+ return -1;
+ }
+
+ public long getMinSizeSent() {
+ if (metrics != null) {
+ return metrics.getMinSizeSent();
+ }
+ return -1;
+ }
+
+ public long getMaxSizeSent() {
+ if (metrics != null) {
+ return metrics.getMaxSizeSent();
+ }
+ return -1;
+ }
+
+ public double getAvgSizeSent() {
+ if (metrics != null) {
+ return metrics.getAvgSizeSent();
+ }
+ return -1;
+ }
+
+ public Map getResponseCodeTable() {
+ if (metrics != null) {
+ return metrics.getResponseCodeTable();
+ }
+ return null;
+ }
+
+ public void resetStatistics() {
+ if (metrics != null) {
+ metrics.reset();
+ }
+ }
+
+ public long getLastResetTime() {
+ if (metrics != null) {
+ return metrics.getLastResetTime();
+ }
+ return -1;
+ }
+
+ public long getMetricsWindow() {
+ if (metrics != null) {
+ return System.currentTimeMillis() - metrics.getLastResetTime();
+ }
+ return -1;
+ }
+
+ private String getEndpointMBeanName(String serviceName) {
+ return mbeanSupport.getMBeanName() + ",Group=Services,Service=" + serviceName;
+ }
+
+ /**
+ * Utility method to allow transports to register MBeans
+ * @param mbeanInstance bean instance
+ * @param objectName name
+ */
+ private void registerMBean(Object mbeanInstance, String objectName) {
+ try {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName name = new ObjectName(objectName);
+ Set set = mbs.queryNames(name, null);
+ if (set != null && set.isEmpty()) {
+ mbs.registerMBean(mbeanInstance, name);
+ } else {
+ mbs.unregisterMBean(name);
+ mbs.registerMBean(mbeanInstance, name);
+ }
+ } catch (Exception e) {
+ log.warn("Error registering a MBean with objectname ' " + objectName +
+ " ' for JMX management", e);
+ }
+ }
+
+ private void unregisterMBean(String objectName) {
+ try {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName objName = new ObjectName(objectName);
+ if (mbs.isRegistered(objName)) {
+ mbs.unregisterMBean(objName);
+ }
+ } catch (Exception e) {
+ log.warn("Error un-registering a MBean with objectname ' " + objectName +
+ " ' for JMX management", e);
+ }
+ }
+}