summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base
diff options
context:
space:
mode:
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollTableEntry.java100
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java267
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java550
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportSender.java419
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseConstants.java135
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseTransportException.java35
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseUtils.java229
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ManagementSupport.java51
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MessageLevelMetricsCollector.java49
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MetricsCollector.java315
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ParamUtils.java107
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/SynchronousCallback.java109
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointView.java61
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointViewMBean.java23
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportMBeanSupport.java115
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportView.java264
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportViewMBean.java54
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/AbstractDatagramTransportListener.java126
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcher.java27
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcherCallback.java23
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramEndpoint.java72
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/ProcessPacketTask.java68
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/Utils.java63
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/package-info.java30
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportError.java46
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorListener.java24
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSource.java25
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSourceSupport.java51
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/ReaderInputStream.java229
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/WriterOutputStream.java257
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeThreadFactory.java53
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeWorkerPool.java79
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPool.java49
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPoolFactory.java34
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceFilter.java36
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTracker.java245
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTrackerListener.java41
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/package-info.java26
38 files changed, 4487 insertions, 0 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollTableEntry.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollTableEntry.java
new file mode 100644
index 0000000000..fa64f08a25
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollTableEntry.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.util.TimerTask;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.description.AxisService;
+
+public abstract class AbstractPollTableEntry {
+ // status of last scan
+ public static final int SUCCSESSFUL = 0;
+ public static final int WITH_ERRORS = 1;
+ public static final int FAILED = 2;
+ public static final int NONE = 3;
+
+ /** Axis2 service */
+ private AxisService service;
+ /** next poll time */
+ private long nextPollTime;
+ /** last poll performed at */
+ private long lastPollTime;
+ /** duration in ms between successive polls */
+ private long pollInterval;
+ /** state of the last poll */
+ private int lastPollState;
+ /** can polling occur in parallel? */
+ private boolean concurrentPollingAllowed = false;
+ /** The timer task that will trigger the next poll */
+ TimerTask timerTask;
+ /** Flag indicating whether polling has been canceled. */
+ boolean canceled;
+
+ public AxisService getService() {
+ return service;
+ }
+
+ void setService(AxisService service) {
+ this.service = service;
+ }
+
+ public abstract EndpointReference getEndpointReference();
+
+ public long getNextPollTime() {
+ return nextPollTime;
+ }
+
+ public void setNextPollTime(long nextPollTime) {
+ this.nextPollTime = nextPollTime;
+ }
+
+ public long getLastPollTime() {
+ return lastPollTime;
+ }
+
+ public void setLastPollTime(long lastPollTime) {
+ this.lastPollTime = lastPollTime;
+ }
+
+ public long getPollInterval() {
+ return pollInterval;
+ }
+
+ public void setPollInterval(long pollInterval) {
+ this.pollInterval = pollInterval;
+ }
+
+ public int getLastPollState() {
+ return lastPollState;
+ }
+
+ public void setLastPollState(int lastPollState) {
+ this.lastPollState = lastPollState;
+ }
+
+ public boolean isConcurrentPollingAllowed() {
+ return concurrentPollingAllowed;
+ }
+
+ public void setConcurrentPollingAllowed(boolean concurrentPollingAllowed) {
+ this.concurrentPollingAllowed = concurrentPollingAllowed;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java
new file mode 100644
index 0000000000..0ee9d92443
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java
@@ -0,0 +1,267 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterInclude;
+import org.apache.axis2.description.TransportInDescription;
+
+public abstract class AbstractPollingTransportListener<T extends AbstractPollTableEntry>
+ extends AbstractTransportListener {
+
+ /** The main timer. */
+ private Timer timer;
+ /** Keep the list of endpoints and poll durations */
+ private final List<T> pollTable = new ArrayList<T>();
+
+ @Override
+ public void init(ConfigurationContext cfgCtx,
+ TransportInDescription transportIn) throws AxisFault {
+
+ timer = new Timer("PollTimer");
+ super.init(cfgCtx, transportIn);
+ T entry = createPollTableEntry(transportIn);
+ if (entry != null) {
+ entry.setPollInterval(getPollInterval(transportIn));
+ schedulePoll(entry);
+ pollTable.add(entry);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ // Explicitly cancel all polls not predispatched to services. All other polls will
+ // be canceled by stopListeningForService. Pay attention to the fact the cancelPoll
+ // modifies pollTable.
+ List<T> entriesToCancel = new ArrayList<T>();
+ for (T entry : pollTable) {
+ if (entry.getService() == null) {
+ entriesToCancel.add(entry);
+ }
+ }
+ for (T entry : entriesToCancel) {
+ cancelPoll(entry);
+ }
+
+ super.destroy();
+ timer.cancel();
+ timer = null;
+ }
+
+ /**
+ * Schedule a repeated poll at the specified interval for a given service.
+ * The method will schedule a single-shot timer task with executes a work
+ * task on the worker pool. At the end of this work task, a new timer task
+ * is scheduled for the next poll (except if the polling for the service
+ * has been canceled). This effectively schedules the poll repeatedly
+ * with fixed delay.
+ * @param entry the poll table entry with the configuration for the service
+ * @param pollInterval the interval between successive polls in milliseconds
+ */
+ void schedulePoll(final T entry) {
+ final long pollInterval = entry.getPollInterval();
+ TimerTask timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ workerPool.execute(new Runnable() {
+ public void run() {
+ if (state == BaseConstants.PAUSED) {
+ if (log.isDebugEnabled()) {
+ log.debug("Transport " + getTransportName() +
+ " poll trigger : Transport is currently paused..");
+ }
+ } else {
+ poll(entry);
+ }
+ }
+ });
+ }
+ };
+ entry.timerTask = timerTask;
+ if (entry.isConcurrentPollingAllowed()) {
+ timer.scheduleAtFixedRate(timerTask, pollInterval, pollInterval);
+ } else {
+ timer.schedule(timerTask, pollInterval);
+ }
+ }
+
+ private void cancelPoll(T entry) {
+ synchronized (entry) {
+ entry.timerTask.cancel();
+ entry.canceled = true;
+ }
+ pollTable.remove(entry);
+ }
+
+ protected abstract void poll(T entry);
+
+ protected void onPollCompletion(T entry) {
+ if (!entry.isConcurrentPollingAllowed()) {
+ synchronized (entry) {
+ if (!entry.canceled) {
+ schedulePoll(entry);
+ }
+ }
+ }
+ }
+
+ /**
+ * method to log a failure to the log file and to update the last poll status and time
+ * @param msg text for the log message
+ * @param e optional exception encountered or null
+ * @param entry the PollTableEntry
+ */
+ protected void processFailure(String msg, Exception e, T entry) {
+ if (e == null) {
+ log.error(msg);
+ } else {
+ log.error(msg, e);
+ }
+ long now = System.currentTimeMillis();
+ entry.setLastPollState(AbstractPollTableEntry.FAILED);
+ entry.setLastPollTime(now);
+ entry.setNextPollTime(now + entry.getPollInterval());
+ onPollCompletion(entry);
+ }
+
+ private long getPollInterval(ParameterInclude params) {
+ Parameter param = params.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
+ long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
+ if (param != null && param.getValue() instanceof String) {
+ String s = (String)param.getValue();
+ int multiplier;
+ if (s.endsWith("ms")) {
+ s = s.substring(0, s.length()-2);
+ multiplier = 1;
+ } else {
+ multiplier = 1000;
+ }
+ try {
+ pollInterval = Integer.parseInt(s) * multiplier;
+ } catch (NumberFormatException e) {
+ log.error("Invalid poll interval : " + param.getValue() + ", default to : "
+ + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + "sec", e);
+ }
+ }
+ return pollInterval;
+ }
+
+ @Override
+ protected void startListeningForService(AxisService service) throws AxisFault {
+ T entry = createPollTableEntry(service);
+ if (entry == null) {
+ throw new AxisFault("The service has no configuration for the transport");
+ }
+ entry.setService(service);
+ entry.setPollInterval(getPollInterval(service));
+ schedulePoll(entry);
+ pollTable.add(entry);
+ }
+
+ /**
+ * Create a poll table entry based on the provided parameters.
+ * If no relevant parameters are found, the implementation should
+ * return null. An exception should only be thrown if there is an
+ * error or inconsistency in the parameters.
+ *
+ * @param params The source of the parameters to construct the
+ * poll table entry. If the parameters were defined on
+ * a service, this will be an {@link AxisService}
+ * instance.
+ * @return
+ */
+ protected abstract T createPollTableEntry(ParameterInclude params) throws AxisFault;
+
+ /**
+ * Get the EPR for the given service
+ *
+ * @param serviceName service name
+ * @param ip ignored
+ * @return the EPR for the service
+ * @throws AxisFault not used
+ */
+ public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+ for (T entry : pollTable) {
+ AxisService service = entry.getService();
+ if (service != null) {
+ String candidateName = service.getName();
+ if (candidateName.equals(serviceName) ||
+ serviceName.startsWith(candidateName + ".")) {
+ return new EndpointReference[]{ entry.getEndpointReference() };
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ protected void stopListeningForService(AxisService service) {
+ for (T entry : pollTable) {
+ if (service == entry.getService()) {
+ cancelPoll(entry);
+ break;
+ }
+ }
+ }
+
+ // -- jmx/management methods--
+ /**
+ * Pause the listener - Stop accepting/processing new messages, but continues processing existing
+ * messages until they complete. This helps bring an instance into a maintenence mode
+ * @throws org.apache.axis2.AxisFault on error
+ */
+ public void pause() throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
+ state = BaseConstants.PAUSED;
+ log.info("Listener paused");
+ }
+
+ /**
+ * Resume the lister - Brings the lister into active mode back from a paused state
+ * @throws AxisFault on error
+ */
+ public void resume() throws AxisFault {
+ if (state != BaseConstants.PAUSED) return;
+ state = BaseConstants.STARTED;
+ log.info("Listener resumed");
+ }
+
+ /**
+ * Stop processing new messages, and wait the specified maximum time for in-flight
+ * requests to complete before a controlled shutdown for maintenence
+ *
+ * @param millis a number of milliseconds to wait until pending requests are allowed to complete
+ * @throws AxisFault on error
+ */
+ public void maintenenceShutdown(long millis) throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
+ stop();
+ state = BaseConstants.STOPPED;
+ log.info("Listener shutdown");
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java
new file mode 100644
index 0000000000..440e09dc84
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java
@@ -0,0 +1,550 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.axiom.om.util.UUIDGenerator;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.SessionContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.axis2.util.MessageContextBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPoolFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceFilter;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceTracker;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceTrackerListener;
+
+public abstract class AbstractTransportListener implements TransportListener {
+
+ /** the reference to the actual commons logger to be used for log messages */
+ protected Log log = null;
+
+ /** the axis2 configuration context */
+ protected ConfigurationContext cfgCtx = null;
+
+ /** transport in description */
+ private TransportInDescription transportIn = null;
+ /** transport out description */
+ private TransportOutDescription transportOut = null;
+ /** state of the listener */
+ protected int state = BaseConstants.STOPPED;
+ /** is this transport non-blocking? */
+ protected boolean isNonBlocking = false;
+ /**
+ * Service tracker used to invoke {@link #internalStartListeningForService(AxisService)}
+ * and {@link #internalStopListeningForService(AxisService)}. */
+ private AxisServiceTracker serviceTracker;
+
+ /** the thread pool to execute actual poll invocations */
+ protected WorkerPool workerPool = null;
+ /** use the thread pool available in the axis2 configuration context */
+ protected boolean useAxis2ThreadPool = false;
+ /** JMX support */
+ private TransportMBeanSupport mbeanSupport;
+ /** Metrics collector for this transport */
+ protected MetricsCollector metrics = new MetricsCollector();
+
+ /**
+ * A constructor that makes subclasses pick up the correct logger
+ */
+ protected AbstractTransportListener() {
+ log = LogFactory.getLog(this.getClass());
+ }
+
+ /**
+ * Initialize the generic transport. Sets up the transport and the thread pool to be used
+ * for message processing. Also creates an AxisObserver that gets notified of service
+ * life cycle events for the transport to act on
+ * @param cfgCtx the axis configuration context
+ * @param transportIn the transport-in description
+ * @throws AxisFault on error
+ */
+ public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn)
+ throws AxisFault {
+
+ this.cfgCtx = cfgCtx;
+ this.transportIn = transportIn;
+ this.transportOut = cfgCtx.getAxisConfiguration().getTransportOut(getTransportName());
+
+ if (useAxis2ThreadPool) {
+ //this.workerPool = cfgCtx.getThreadPool(); not yet implemented
+ throw new AxisFault("Unsupported thread pool for task execution - Axis2 thread pool");
+ } else {
+ this.workerPool = WorkerPoolFactory.getWorkerPool(
+ 10, 20, 5, -1, getTransportName() + "Server Worker thread group", getTransportName() + "-Worker");
+ }
+
+ // register to receive updates on services for lifetime management
+ serviceTracker = new AxisServiceTracker(
+ cfgCtx.getAxisConfiguration(),
+ new AxisServiceFilter() {
+ public boolean matches(AxisService service) {
+ return !service.getName().startsWith("__") // these are "private" services
+ && BaseUtils.isUsingTransport(service, getTransportName());
+ }
+ },
+ new AxisServiceTrackerListener() {
+ public void serviceAdded(AxisService service) {
+ internalStartListeningForService(service);
+ }
+
+ public void serviceRemoved(AxisService service) {
+ internalStopListeningForService(service);
+ }
+ });
+
+ // register with JMX
+ mbeanSupport = new TransportMBeanSupport(this, getTransportName());
+ mbeanSupport.register();
+ }
+
+ public void destroy() {
+ try {
+ if (state == BaseConstants.STARTED) {
+ try {
+ stop();
+ } catch (AxisFault ignore) {
+ log.warn("Error stopping the transport : " + getTransportName());
+ }
+ }
+ } finally {
+ state = BaseConstants.STOPPED;
+ mbeanSupport.unregister();
+ }
+ try {
+ workerPool.shutdown(10000);
+ } catch (InterruptedException ex) {
+ log.warn("Thread interrupted while waiting for worker pool to shut down");
+ }
+ }
+
+ public void stop() throws AxisFault {
+ if (state == BaseConstants.STARTED) {
+ state = BaseConstants.STOPPED;
+ // cancel receipt of service lifecycle events
+ log.info(getTransportName().toUpperCase() + " Listener Shutdown");
+ serviceTracker.stop();
+ }
+ }
+
+ public void start() throws AxisFault {
+ if (state != BaseConstants.STARTED) {
+ state = BaseConstants.STARTED;
+ // register to receive updates on services for lifetime management
+ // cfgCtx.getAxisConfiguration().addObservers(axisObserver);
+ log.info(getTransportName().toUpperCase() + " Listener started");
+ // iterate through deployed services and start
+ serviceTracker.start();
+ }
+ }
+
+ public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+ return getEPRsForService(serviceName);
+ }
+
+ protected EndpointReference[] getEPRsForService(String serviceName) {
+ return null;
+ }
+
+ public void disableTransportForService(AxisService service) {
+
+ log.warn("Disabling the " + getTransportName() + " transport for the service "
+ + service.getName() + ", because it is not configured properly for the service");
+
+ if (service.isEnableAllTransports()) {
+ ArrayList<String> exposedTransports = new ArrayList<String>();
+ for(Object obj: cfgCtx.getAxisConfiguration().getTransportsIn().values()) {
+ String transportName = ((TransportInDescription) obj).getName();
+ if (!transportName.equals(getTransportName())) {
+ exposedTransports.add(transportName);
+ }
+ }
+ service.setEnableAllTransports(false);
+ service.setExposedTransports(exposedTransports);
+ } else {
+ service.removeExposedTransport(getTransportName());
+ }
+ }
+
+ void internalStartListeningForService(AxisService service) {
+ String serviceName = service.getName();
+ try {
+ startListeningForService(service);
+ } catch (AxisFault ex) {
+ String transportName = getTransportName().toUpperCase();
+ String msg = "Unable to configure the service " + serviceName + " for the " +
+ transportName + " transport: " + ex.getMessage() + ". " +
+ "This service is being marked as faulty and will not be available over the " +
+ transportName + " transport.";
+ // Only log the message at level WARN and log the full stack trace at level DEBUG.
+ // TODO: We should have a way to distinguish a missing configuration
+ // from an error. This may be addressed when implementing the enhancement
+ // described in point 3 of http://markmail.org/message/umhenrurlrekk5jh
+ log.warn(msg);
+ log.debug("Disabling service " + serviceName + " for the " + transportName +
+ "transport", ex);
+ BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration());
+ disableTransportForService(service);
+ return;
+ } catch (Throwable ex) {
+ String msg = "Unexpected error when configuring service " + serviceName +
+ " for the " + getTransportName().toUpperCase() + " transport. It will be" +
+ " disabled for this transport and marked as faulty.";
+ log.error(msg, ex);
+ BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration());
+ disableTransportForService(service);
+ return;
+ }
+ registerMBean(new TransportListenerEndpointView(this, serviceName),
+ getEndpointMBeanName(serviceName));
+ }
+
+ void internalStopListeningForService(AxisService service) {
+ unregisterMBean(getEndpointMBeanName(service.getName()));
+ stopListeningForService(service);
+ }
+
+ protected abstract void startListeningForService(AxisService service) throws AxisFault;
+
+ protected abstract void stopListeningForService(AxisService service);
+
+ /**
+ * This is a deprecated method in Axis2 and this default implementation returns the first
+ * result from the getEPRsForService() method
+ */
+ public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault {
+ return getEPRsForService(serviceName, ip)[0];
+ }
+
+ public SessionContext getSessionContext(MessageContext messageContext) {
+ return null;
+ }
+
+ /**
+ * Create a new axis MessageContext for an incoming message through this transport
+ * @return the newly created message context
+ */
+ public MessageContext createMessageContext() {
+ MessageContext msgCtx = new MessageContext();
+ msgCtx.setConfigurationContext(cfgCtx);
+
+ msgCtx.setIncomingTransportName(getTransportName());
+ msgCtx.setTransportOut(transportOut);
+ msgCtx.setTransportIn(transportIn);
+ msgCtx.setServerSide(true);
+ msgCtx.setMessageID(UUIDGenerator.getUUID());
+
+ // There is a discrepency in what I thought, Axis2 spawns a nes threads to
+ // send a message is this is TRUE - and I want it to be the other way
+ msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.valueOf(!isNonBlocking));
+
+ // are these relevant?
+ //msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID());
+ // this is required to support Sandesha 2
+ //msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
+ // new HttpCoreRequestResponseTransport(msgContext));
+
+ return msgCtx;
+ }
+
+ /**
+ * Process a new incoming message through the axis engine
+ * @param msgCtx the axis MessageContext
+ * @param trpHeaders the map containing transport level message headers
+ * @param soapAction the optional soap action or null
+ * @param contentType the optional content-type for the message
+ */
+ public void handleIncomingMessage(
+ MessageContext msgCtx, Map trpHeaders,
+ String soapAction, String contentType) throws AxisFault {
+
+ // set the soapaction if one is available via a transport header
+ if (soapAction != null) {
+ msgCtx.setSoapAction(soapAction);
+ }
+
+ // set the transport headers to the message context
+ msgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, trpHeaders);
+
+ // send the message context through the axis engine
+ try {
+ // check if an Axis2 callback has been registered for this message
+ Map callBackMap = (Map) msgCtx.getConfigurationContext().
+ getProperty(BaseConstants.CALLBACK_TABLE);
+ // FIXME: transport headers are protocol specific; the correlation ID should be
+ // passed as argument to this method
+ Object replyToMessageID = trpHeaders.get(BaseConstants.HEADER_IN_REPLY_TO);
+ // if there is a callback registerd with this replyto ID then this has to
+ // be handled as a synchronous incoming message, via the
+ if (replyToMessageID != null && callBackMap != null &&
+ callBackMap.get(replyToMessageID) != null) {
+
+ SynchronousCallback synchronousCallback =
+ (SynchronousCallback) callBackMap.get(replyToMessageID);
+ synchronousCallback.setInMessageContext(msgCtx);
+ callBackMap.remove(replyToMessageID);
+ } else {
+ AxisEngine.receive(msgCtx);
+ }
+
+ } catch (AxisFault e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Error receiving message", e);
+ }
+ if (msgCtx.isServerSide()) {
+ AxisEngine.sendFault(MessageContextBuilder.createFaultMessageContext(msgCtx, e));
+ }
+ }
+ }
+
+ protected void handleException(String msg, Exception e) throws AxisFault {
+ log.error(msg, e);
+ throw new AxisFault(msg, e);
+ }
+
+ protected void logException(String msg, Exception e) {
+ log.error(msg, e);
+ }
+
+ public String getTransportName() {
+ return transportIn.getName();
+ }
+
+ public MetricsCollector getMetricsCollector() {
+ return metrics;
+ }
+
+ // -- jmx/management methods--
+ /**
+ * Pause the listener - Stop accepting/processing new messages, but continues processing existing
+ * messages until they complete. This helps bring an instance into a maintenence mode
+ * @throws AxisFault on error
+ */
+ public void pause() throws AxisFault {}
+ /**
+ * Resume the lister - Brings the lister into active mode back from a paused state
+ * @throws AxisFault on error
+ */
+ public void resume() throws AxisFault {}
+
+ /**
+ * Stop processing new messages, and wait the specified maximum time for in-flight
+ * requests to complete before a controlled shutdown for maintenence
+ *
+ * @param millis a number of milliseconds to wait until pending requests are allowed to complete
+ * @throws AxisFault on error
+ */
+ public void maintenenceShutdown(long millis) throws AxisFault {}
+
+ /**
+ * Returns the number of active threads processing messages
+ * @return number of active threads processing messages
+ */
+ public int getActiveThreadCount() {
+ return workerPool.getActiveCount();
+ }
+
+ /**
+ * Return the number of requests queued in the thread pool
+ * @return queue size
+ */
+ public int getQueueSize() {
+ return workerPool.getQueueSize();
+ }
+
+ public long getMessagesReceived() {
+ if (metrics != null) {
+ return metrics.getMessagesReceived();
+ }
+ return -1;
+ }
+
+ public long getFaultsReceiving() {
+ if (metrics != null) {
+ return metrics.getFaultsReceiving();
+ }
+ return -1;
+ }
+
+ public long getBytesReceived() {
+ if (metrics != null) {
+ return metrics.getBytesReceived();
+ }
+ return -1;
+ }
+
+ public long getMessagesSent() {
+ if (metrics != null) {
+ return metrics.getMessagesSent();
+ }
+ return -1;
+ }
+
+ public long getFaultsSending() {
+ if (metrics != null) {
+ return metrics.getFaultsSending();
+ }
+ return -1;
+ }
+
+ public long getBytesSent() {
+ if (metrics != null) {
+ return metrics.getBytesSent();
+ }
+ return -1;
+ }
+
+ public long getTimeoutsReceiving() {
+ if (metrics != null) {
+ return metrics.getTimeoutsReceiving();
+ }
+ return -1;
+ }
+
+ public long getTimeoutsSending() {
+ if (metrics != null) {
+ return metrics.getTimeoutsSending();
+ }
+ return -1;
+ }
+
+ public long getMinSizeReceived() {
+ if (metrics != null) {
+ return metrics.getMinSizeReceived();
+ }
+ return -1;
+ }
+
+ public long getMaxSizeReceived() {
+ if (metrics != null) {
+ return metrics.getMaxSizeReceived();
+ }
+ return -1;
+ }
+
+ public double getAvgSizeReceived() {
+ if (metrics != null) {
+ return metrics.getAvgSizeReceived();
+ }
+ return -1;
+ }
+
+ public long getMinSizeSent() {
+ if (metrics != null) {
+ return metrics.getMinSizeSent();
+ }
+ return -1;
+ }
+
+ public long getMaxSizeSent() {
+ if (metrics != null) {
+ return metrics.getMaxSizeSent();
+ }
+ return -1;
+ }
+
+ public double getAvgSizeSent() {
+ if (metrics != null) {
+ return metrics.getAvgSizeSent();
+ }
+ return -1;
+ }
+
+ public Map getResponseCodeTable() {
+ if (metrics != null) {
+ return metrics.getResponseCodeTable();
+ }
+ return null;
+ }
+
+ public void resetStatistics() {
+ if (metrics != null) {
+ metrics.reset();
+ }
+ }
+
+ public long getLastResetTime() {
+ if (metrics != null) {
+ return metrics.getLastResetTime();
+ }
+ return -1;
+ }
+
+ public long getMetricsWindow() {
+ if (metrics != null) {
+ return System.currentTimeMillis() - metrics.getLastResetTime();
+ }
+ return -1;
+ }
+
+ private String getEndpointMBeanName(String serviceName) {
+ return mbeanSupport.getMBeanName() + ",Group=Services,Service=" + serviceName;
+ }
+
+ /**
+ * Utility method to allow transports to register MBeans
+ * @param mbeanInstance bean instance
+ * @param objectName name
+ */
+ private void registerMBean(Object mbeanInstance, String objectName) {
+ try {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName name = new ObjectName(objectName);
+ Set set = mbs.queryNames(name, null);
+ if (set != null && set.isEmpty()) {
+ mbs.registerMBean(mbeanInstance, name);
+ } else {
+ mbs.unregisterMBean(name);
+ mbs.registerMBean(mbeanInstance, name);
+ }
+ } catch (Exception e) {
+ log.warn("Error registering a MBean with objectname ' " + objectName +
+ " ' for JMX management", e);
+ }
+ }
+
+ private void unregisterMBean(String objectName) {
+ try {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName objName = new ObjectName(objectName);
+ if (mbs.isRegistered(objName)) {
+ mbs.unregisterMBean(objName);
+ }
+ } catch (Exception e) {
+ log.warn("Error un-registering a MBean with objectname ' " + objectName +
+ " ' for JMX management", e);
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportSender.java
new file mode 100644
index 0000000000..c2c84c3eb2
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportSender.java
@@ -0,0 +1,419 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.axiom.om.util.UUIDGenerator;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.description.WSDL2Constants;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.transport.OutTransportInfo;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.axis2.util.MessageContextBuilder;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public abstract class AbstractTransportSender extends AbstractHandler implements TransportSender {
+
+ /** the reference to the actual commons logger to be used for log messages */
+ protected Log log = null;
+
+ /** the axis2 configuration context */
+ protected ConfigurationContext cfgCtx = null;
+ /** transport in description */
+ private TransportInDescription transportIn = null;
+ /** transport out description */
+ private TransportOutDescription transportOut = null;
+ /** JMX support */
+ private TransportMBeanSupport mbeanSupport;
+ /** Metrics collector for the sender */
+ protected MetricsCollector metrics = new MetricsCollector();
+ /** state of the listener */
+ private int state = BaseConstants.STOPPED;
+
+ /**
+ * A constructor that makes subclasses pick up the correct logger
+ */
+ protected AbstractTransportSender() {
+ log = LogFactory.getLog(this.getClass());
+ }
+
+ /**
+ * Initialize the generic transport sender.
+ *
+ * @param cfgCtx the axis configuration context
+ * @param transportOut the transport-out description
+ * @throws AxisFault on error
+ */
+ public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut)
+ throws AxisFault {
+ this.cfgCtx = cfgCtx;
+ this.transportOut = transportOut;
+ this.transportIn = cfgCtx.getAxisConfiguration().getTransportIn(getTransportName());
+ this.state = BaseConstants.STARTED;
+
+ // register with JMX
+ mbeanSupport = new TransportMBeanSupport(this, getTransportName());
+ mbeanSupport.register();
+ log.info(getTransportName().toUpperCase() + " Sender started");
+ }
+
+ public void stop() {
+ if (state != BaseConstants.STARTED) return;
+ state = BaseConstants.STOPPED;
+ mbeanSupport.unregister();
+ log.info(getTransportName().toUpperCase() + " Sender Shutdown");
+ }
+
+ public void cleanup(MessageContext msgContext) throws AxisFault {}
+
+ public abstract void sendMessage(MessageContext msgCtx, String targetEPR,
+ OutTransportInfo outTransportInfo) throws AxisFault;
+
+ public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
+
+ // is there a transport url which may be different to the WS-A To but has higher precedence
+ String targetAddress = (String) msgContext.getProperty(
+ Constants.Configuration.TRANSPORT_URL);
+
+ if (targetAddress != null) {
+ sendMessage(msgContext, targetAddress, null);
+ } else if (msgContext.getTo() != null && !msgContext.getTo().hasAnonymousAddress()) {
+ targetAddress = msgContext.getTo().getAddress();
+
+ if (!msgContext.getTo().hasNoneAddress()) {
+ sendMessage(msgContext, targetAddress, null);
+ } else {
+ //Don't send the message.
+ return InvocationResponse.CONTINUE;
+ }
+ } else if (msgContext.isServerSide()) {
+ // get the out transport info for server side when target EPR is unknown
+ sendMessage(msgContext, null,
+ (OutTransportInfo) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO));
+ }
+
+ return InvocationResponse.CONTINUE;
+ }
+
+ /**
+ * Process a new incoming message (Response) through the axis engine
+ * @param msgCtx the axis MessageContext
+ * @param trpHeaders the map containing transport level message headers
+ * @param soapAction the optional soap action or null
+ * @param contentType the optional content-type for the message
+ */
+ public void handleIncomingMessage(
+ MessageContext msgCtx, Map trpHeaders,
+ String soapAction, String contentType) {
+
+
+ // set the soapaction if one is available via a transport header
+ if (soapAction != null) {
+ msgCtx.setSoapAction(soapAction);
+ }
+
+ // set the transport headers to the message context
+ msgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, trpHeaders);
+
+ // send the message context through the axis engine
+ try {
+ try {
+ AxisEngine.receive(msgCtx);
+ } catch (AxisFault e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Error receiving message", e);
+ }
+ if (msgCtx.isServerSide()) {
+ AxisEngine.sendFault(MessageContextBuilder.createFaultMessageContext(msgCtx, e));
+ }
+ }
+ } catch (AxisFault axisFault) {
+ logException("Error processing response message", axisFault);
+ }
+ }
+
+ /**
+ * Create a new axis MessageContext for an incoming response message
+ * through this transport, for the given outgoing message
+ *
+ * @param outMsgCtx the outgoing message
+ * @return the newly created message context
+ */
+ public MessageContext createResponseMessageContext(MessageContext outMsgCtx) {
+
+ MessageContext responseMsgCtx = null;
+ try {
+ responseMsgCtx = outMsgCtx.getOperationContext().
+ getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN);
+ } catch (AxisFault af) {
+ log.error("Error getting IN message context from the operation context", af);
+ }
+
+ if (responseMsgCtx == null) {
+ responseMsgCtx = new MessageContext();
+ responseMsgCtx.setOperationContext(outMsgCtx.getOperationContext());
+ }
+
+ responseMsgCtx.setIncomingTransportName(getTransportName());
+ responseMsgCtx.setTransportOut(transportOut);
+ responseMsgCtx.setTransportIn(transportIn);
+
+ responseMsgCtx.setMessageID(UUIDGenerator.getUUID());
+
+ responseMsgCtx.setDoingREST(outMsgCtx.isDoingREST());
+ responseMsgCtx.setProperty(
+ MessageContext.TRANSPORT_IN, outMsgCtx.getProperty(MessageContext.TRANSPORT_IN));
+ responseMsgCtx.setAxisMessage(outMsgCtx.getOperationContext().getAxisOperation().
+ getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
+ responseMsgCtx.setTo(null);
+ //msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isNonBlocking);
+
+
+ // are these relevant?
+ //msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID());
+ // this is required to support Sandesha 2
+ //msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
+ // new HttpCoreRequestResponseTransport(msgContext));
+
+ return responseMsgCtx;
+ }
+
+ /**
+ * Should the transport sender wait for a synchronous response to be received?
+ * @param msgCtx the outgoing message context
+ * @return true if a sync response is expected
+ */
+ protected boolean waitForSynchronousResponse(MessageContext msgCtx) {
+ return
+ msgCtx.getOperationContext() != null &&
+ WSDL2Constants.MEP_URI_OUT_IN.equals(
+ msgCtx.getOperationContext().getAxisOperation().getMessageExchangePattern());
+ }
+
+ public String getTransportName() {
+ return transportOut.getName();
+ }
+
+ protected void handleException(String msg, Exception e) throws AxisFault {
+ log.error(msg, e);
+ throw new AxisFault(msg, e);
+ }
+
+ protected void handleException(String msg) throws AxisFault {
+ log.error(msg);
+ throw new AxisFault(msg);
+ }
+
+ protected void logException(String msg, Exception e) {
+ log.error(msg, e);
+ }
+
+ //--- jmx/management methods ---
+ public void pause() throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
+ state = BaseConstants.PAUSED;
+ log.info("Sender paused");
+ }
+
+ public void resume() throws AxisFault {
+ if (state != BaseConstants.PAUSED) return;
+ state = BaseConstants.STARTED;
+ log.info("Sender resumed");
+ }
+
+ public void maintenenceShutdown(long millis) throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
+ long start = System.currentTimeMillis();
+ stop();
+ state = BaseConstants.STOPPED;
+ log.info("Sender shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s");
+ }
+
+ /**
+ * Returns the number of active threads processing messages
+ * @return number of active threads processing messages
+ */
+ public int getActiveThreadCount() {
+ return 0;
+ }
+
+ /**
+ * Return the number of requests queued in the thread pool
+ * @return queue size
+ */
+ public int getQueueSize() {
+ return 0;
+ }
+
+ // -- jmx/management methods--
+ public long getMessagesReceived() {
+ if (metrics != null) {
+ return metrics.getMessagesReceived();
+ }
+ return -1;
+ }
+
+ public long getFaultsReceiving() {
+ if (metrics != null) {
+ return metrics.getFaultsReceiving();
+ }
+ return -1;
+ }
+
+ public long getBytesReceived() {
+ if (metrics != null) {
+ return metrics.getBytesReceived();
+ }
+ return -1;
+ }
+
+ public long getMessagesSent() {
+ if (metrics != null) {
+ return metrics.getMessagesSent();
+ }
+ return -1;
+ }
+
+ public long getFaultsSending() {
+ if (metrics != null) {
+ return metrics.getFaultsSending();
+ }
+ return -1;
+ }
+
+ public long getBytesSent() {
+ if (metrics != null) {
+ return metrics.getBytesSent();
+ }
+ return -1;
+ }
+
+ public long getTimeoutsReceiving() {
+ if (metrics != null) {
+ return metrics.getTimeoutsReceiving();
+ }
+ return -1;
+ }
+
+ public long getTimeoutsSending() {
+ if (metrics != null) {
+ return metrics.getTimeoutsSending();
+ }
+ return -1;
+ }
+
+ public long getMinSizeReceived() {
+ if (metrics != null) {
+ return metrics.getMinSizeReceived();
+ }
+ return -1;
+ }
+
+ public long getMaxSizeReceived() {
+ if (metrics != null) {
+ return metrics.getMaxSizeReceived();
+ }
+ return -1;
+ }
+
+ public double getAvgSizeReceived() {
+ if (metrics != null) {
+ return metrics.getAvgSizeReceived();
+ }
+ return -1;
+ }
+
+ public long getMinSizeSent() {
+ if (metrics != null) {
+ return metrics.getMinSizeSent();
+ }
+ return -1;
+ }
+
+ public long getMaxSizeSent() {
+ if (metrics != null) {
+ return metrics.getMaxSizeSent();
+ }
+ return -1;
+ }
+
+ public double getAvgSizeSent() {
+ if (metrics != null) {
+ return metrics.getAvgSizeSent();
+ }
+ return -1;
+ }
+
+ public Map getResponseCodeTable() {
+ if (metrics != null) {
+ return metrics.getResponseCodeTable();
+ }
+ return null;
+ }
+
+ public void resetStatistics() {
+ if (metrics != null) {
+ metrics.reset();
+ }
+ }
+
+ public long getLastResetTime() {
+ if (metrics != null) {
+ return metrics.getLastResetTime();
+ }
+ return -1;
+ }
+
+ public long getMetricsWindow() {
+ if (metrics != null) {
+ return System.currentTimeMillis() - metrics.getLastResetTime();
+ }
+ return -1;
+ }
+
+ private void registerMBean(MBeanServer mbs, Object mbeanInstance, String objectName) {
+ try {
+ ObjectName name = new ObjectName(objectName);
+ Set set = mbs.queryNames(name, null);
+ if (set != null && set.isEmpty()) {
+ mbs.registerMBean(mbeanInstance, name);
+ } else {
+ mbs.unregisterMBean(name);
+ mbs.registerMBean(mbeanInstance, name);
+ }
+ } catch (Exception e) {
+ log.warn("Error registering a MBean with objectname ' " + objectName +
+ " ' for JMX management", e);
+ }
+ }
+
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseConstants.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseConstants.java
new file mode 100644
index 0000000000..5edf76b898
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseConstants.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+
+import javax.xml.namespace.QName;
+
+public class BaseConstants {
+ // -- status of a transport --
+ public final static int STOPPED = 0;
+ public final static int STARTED = 1;
+ public final static int PAUSED = 2;
+
+ /**
+ * A message property specifying the SOAP Action
+ */
+ public static final String SOAPACTION = "SOAPAction";
+ /**
+ * A message property specifying the content type
+ */
+ public static final String CONTENT_TYPE = "Content-Type";
+ /**
+ * A message context property indicating "TRUE", if a transport or the message builder
+ * has information that the current message is a fault (e.g. SOAP faults, non-HTTP 2xx, etc)
+ */
+ public static final String FAULT_MESSAGE = "FAULT_MESSAGE";
+ /**
+ * content type identifier for multipart / MTOM messages
+ */
+ public static final String MULTIPART_RELATED = "multipart/related";
+ /**
+ * character set marker to identify charset from a Content-Type string
+ */
+ public static final String CHARSET_PARAM = "; charset=";
+ /**
+ * The property specifying an optional message level metrics collector
+ */
+ public static final String METRICS_COLLECTOR = "METRICS_COLLECTOR";
+
+ //------------------------------------ defaults ------------------------------------
+ /**
+ * The default operation name to be used for non SOAP/XML messages
+ * if the operation cannot be determined
+ */
+ public static final QName DEFAULT_OPERATION = new QName("urn:mediate");
+ /**
+ * The name of the element which wraps binary content into a SOAP envelope
+ */
+
+ // This has to match org.apache.synapse.util.PayloadHelper
+ // at some future point this can be merged into Axiom as a common base
+ public final static String AXIOMPAYLOADNS = "http://ws.apache.org/commons/ns/payload";
+
+
+ public static final QName DEFAULT_BINARY_WRAPPER =
+ new QName(AXIOMPAYLOADNS, "binary");
+ /**
+ * The name of the element which wraps plain text content into a SOAP envelope
+ */
+ public static final QName DEFAULT_TEXT_WRAPPER =
+ new QName(AXIOMPAYLOADNS, "text");
+
+ //-------------------------- services.xml parameters --------------------------------
+ /**
+ * The Parameter name indicating the operation to dispatch non SOAP/XML messages
+ */
+ public static final String OPERATION_PARAM = "Operation";
+ /**
+ * The Parameter name indicating the wrapper element for non SOAP/XML messages
+ */
+ public static final String WRAPPER_PARAM = "Wrapper";
+ /**
+ * the parameter in the services.xml that specifies the poll interval for a service
+ */
+ public static final String TRANSPORT_POLL_INTERVAL = "transport.PollInterval";
+ /**
+ * Could polling take place in parallel, i.e. starting at fixed intervals?
+ */
+ public static final String TRANSPORT_POLL_IN_PARALLEL = "transport.ConcurrentPollingAllowed";
+ /**
+ * The default poll interval in milliseconds.
+ */
+ public static final int DEFAULT_POLL_INTERVAL = 5 * 60 * 1000; // 5 mins by default
+
+ public static final String CALLBACK_TABLE = "callbackTable";
+ public static final String HEADER_IN_REPLY_TO = "In-Reply-To";
+
+ // this is an property required by axis2
+ // FIXME: where is this required in Axis2?
+ public final static String MAIL_CONTENT_TYPE = "mail.contenttype";
+
+ /** Service transaction level - non-transactional */
+ public static final int TRANSACTION_NONE = 0;
+ /** Service transaction level - use non-JTA (i.e. local) transactions */
+ public static final int TRANSACTION_LOCAL = 1;
+ /** Service transaction level - use JTA transactions */
+ public static final int TRANSACTION_JTA = 2;
+ /** Service transaction level - non-transactional */
+ public static final String STR_TRANSACTION_NONE = "none";
+ /** Service transaction level - use non-JTA (i.e. local) transactions */
+ public static final String STR_TRANSACTION_LOCAL = "local";
+ /** Service transaction level - use JTA transactions */
+ public static final String STR_TRANSACTION_JTA = "jta";
+
+ /** The Parameter name indicating the transactionality of a service */
+ public static final String PARAM_TRANSACTIONALITY = "transport.Transactionality";
+ /** Parameter name indicating the JNDI name to get a UserTransaction from JNDI */
+ public static final String PARAM_USER_TXN_JNDI_NAME = "transport.UserTxnJNDIName";
+ /** Parameter that indicates if a UserTransaction reference could be cached - default yes */
+ public static final String PARAM_CACHE_USER_TXN = "transport.CacheUserTxn";
+
+ /** The UserTransaction associated with this message */
+ public static final String USER_TRANSACTION = "UserTransaction";
+ /** A message level property indicating a request to rollback the transaction associated with the message */
+ public static final String SET_ROLLBACK_ONLY = "SET_ROLLBACK_ONLY";
+ /** A message level property indicating a commit is required after the next immidiate send over a transport */
+ public static final String JTA_COMMIT_AFTER_SEND = "JTA_COMMIT_AFTER_SEND";
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseTransportException.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseTransportException.java
new file mode 100644
index 0000000000..b4d4243ad2
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseTransportException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+public class BaseTransportException extends RuntimeException {
+
+ BaseTransportException() {
+ super();
+ }
+
+ public BaseTransportException(String msg) {
+ super(msg);
+ }
+
+ public BaseTransportException(String msg, Exception e) {
+ super(msg, e);
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseUtils.java
new file mode 100644
index 0000000000..008d1b5f28
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseUtils.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMOutputFormat;
+import org.apache.axiom.om.impl.builder.StAXBuilder;
+import org.apache.axiom.om.util.StAXUtils;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.impl.builder.StAXSOAPModelBuilder;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.transport.MessageFormatter;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.format.BinaryFormatter;
+import org.apache.tuscany.sca.binding.ws.axis2.format.PlainTextFormatter;
+
+public class BaseUtils {
+
+ private static final Log log = LogFactory.getLog(BaseUtils.class);
+
+ /**
+ * Return a QName from the String passed in of the form {ns}element
+ * @param obj a QName or a String containing a QName in {ns}element form
+ * @return a corresponding QName object
+ */
+ public static QName getQNameFromString(Object obj) {
+ String value;
+ if (obj instanceof QName) {
+ return (QName) obj;
+ } else {
+ value = obj.toString();
+ }
+ int open = value.indexOf('{');
+ int close = value.indexOf('}');
+ if (close > open && open > -1 && value.length() > close) {
+ return new QName(value.substring(open+1, close-open), value.substring(close+1));
+ } else {
+ return new QName(value);
+ }
+ }
+
+ /**
+ * Marks the given service as faulty with the given comment
+ *
+ * @param serviceName service name
+ * @param msg comment for being faulty
+ * @param axisCfg configuration context
+ */
+ public static void markServiceAsFaulty(String serviceName, String msg,
+ AxisConfiguration axisCfg) {
+ if (serviceName != null) {
+ try {
+ AxisService service = axisCfg.getService(serviceName);
+ axisCfg.getFaultyServices().put(service.getName(), msg);
+
+ } catch (AxisFault axisFault) {
+ log.warn("Error marking service : " + serviceName + " as faulty", axisFault);
+ }
+ }
+ }
+
+ /**
+ * Create a SOAP envelope using SOAP 1.1 or 1.2 depending on the namespace
+ * @param in InputStream for the payload
+ * @param namespace the SOAP namespace
+ * @return the SOAP envelope for the correct version
+ * @throws javax.xml.stream.XMLStreamException on error
+ */
+ public static SOAPEnvelope getEnvelope(InputStream in, String namespace) throws XMLStreamException {
+
+ try {
+ in.reset();
+ } catch (IOException ignore) {}
+ XMLStreamReader xmlreader =
+ StAXUtils.createXMLStreamReader(in, MessageContext.DEFAULT_CHAR_SET_ENCODING);
+ StAXBuilder builder = new StAXSOAPModelBuilder(xmlreader, namespace);
+ return (SOAPEnvelope) builder.getDocumentElement();
+ }
+
+ /**
+ * Get the OMOutput format for the given message
+ * @param msgContext the axis message context
+ * @return the OMOutput format to be used
+ */
+ public static OMOutputFormat getOMOutputFormat(MessageContext msgContext) {
+
+ OMOutputFormat format = new OMOutputFormat();
+ msgContext.setDoingMTOM(TransportUtils.doWriteMTOM(msgContext));
+ msgContext.setDoingSwA(TransportUtils.doWriteSwA(msgContext));
+ msgContext.setDoingREST(TransportUtils.isDoingREST(msgContext));
+ format.setSOAP11(msgContext.isSOAP11());
+ format.setDoOptimize(msgContext.isDoingMTOM());
+ format.setDoingSWA(msgContext.isDoingSwA());
+
+ format.setCharSetEncoding(TransportUtils.getCharSetEncoding(msgContext));
+ Object mimeBoundaryProperty = msgContext.getProperty(Constants.Configuration.MIME_BOUNDARY);
+ if (mimeBoundaryProperty != null) {
+ format.setMimeBoundary((String) mimeBoundaryProperty);
+ }
+ return format;
+ }
+
+ /**
+ * Get the MessageFormatter for the given message.
+ * @param msgContext the axis message context
+ * @return the MessageFormatter to be used
+ */
+ public static MessageFormatter getMessageFormatter(MessageContext msgContext) {
+ // check the first element of the SOAP body, do we have content wrapped using the
+ // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or
+ // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, select the appropriate
+ // message formatter directly ...
+ OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement();
+ if (firstChild != null) {
+ if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) {
+ return new BinaryFormatter();
+ } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) {
+ return new PlainTextFormatter();
+ }
+ }
+
+ // ... otherwise, let Axis choose the right message formatter:
+ try {
+ return TransportUtils.getMessageFormatter(msgContext);
+ } catch (AxisFault axisFault) {
+ throw new BaseTransportException("Unable to get the message formatter to use");
+ }
+ }
+
+ protected static void handleException(String s) {
+ log.error(s);
+ throw new BaseTransportException(s);
+ }
+
+ protected static void handleException(String s, Exception e) {
+ log.error(s, e);
+ throw new BaseTransportException(s, e);
+ }
+
+ /**
+ * Utility method to check if a string is null or empty
+ * @param str the string to check
+ * @return true if the string is null or empty
+ */
+ public static boolean isBlank(String str) {
+ if (str == null || str.length() == 0) {
+ return true;
+ }
+ for (int i = 0; i < str.length(); i++) {
+ if (!Character.isWhitespace(str.charAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean isUsingTransport(AxisService service, String transportName) {
+ boolean process = service.isEnableAllTransports();
+ if (process) {
+ return true;
+
+ } else {
+ List transports = service.getExposedTransports();
+ for (Object transport : transports) {
+ if (transportName.equals(transport)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Extract the properties from an endpoint reference.
+ *
+ * @param url an endpoint reference
+ * @return the extracted properties
+ */
+ public static Hashtable<String,String> getEPRProperties(String url) {
+ Hashtable<String,String> h = new Hashtable<String,String>();
+ int propPos = url.indexOf("?");
+ if (propPos != -1) {
+ StringTokenizer st = new StringTokenizer(url.substring(propPos + 1), "&");
+ while (st.hasMoreTokens()) {
+ String token = st.nextToken();
+ int sep = token.indexOf("=");
+ if (sep != -1) {
+ h.put(token.substring(0, sep), token.substring(sep + 1));
+ } else {
+ // ignore, what else can we do?
+ }
+ }
+ }
+ return h;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ManagementSupport.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ManagementSupport.java
new file mode 100644
index 0000000000..8b20b4a0b3
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ManagementSupport.java
@@ -0,0 +1,51 @@
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.util.Map;
+
+import org.apache.axis2.AxisFault;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+public interface ManagementSupport {
+ public void pause() throws AxisFault;
+ public void resume() throws AxisFault;
+ void maintenenceShutdown(long millis) throws AxisFault;
+ public int getActiveThreadCount();
+ public int getQueueSize();
+
+ public long getMessagesReceived();
+ public long getFaultsReceiving();
+ public long getTimeoutsReceiving();
+ public long getMessagesSent();
+ public long getFaultsSending();
+ public long getTimeoutsSending();
+ public long getBytesReceived();
+ public long getBytesSent();
+ public long getMinSizeReceived();
+ public long getMaxSizeReceived();
+ public double getAvgSizeReceived();
+ public long getMinSizeSent();
+ public long getMaxSizeSent();
+ public double getAvgSizeSent();
+ public Map getResponseCodeTable();
+
+ public void resetStatistics();
+ public long getLastResetTime();
+ public long getMetricsWindow();
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MessageLevelMetricsCollector.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MessageLevelMetricsCollector.java
new file mode 100644
index 0000000000..59dc9e9118
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MessageLevelMetricsCollector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+public interface MessageLevelMetricsCollector {
+
+ public void incrementMessagesReceived();
+
+ public void incrementFaultsReceiving(int errorCode);
+
+ public void incrementTimeoutsReceiving();
+
+ public void incrementBytesReceived(long size);
+
+ public void incrementMessagesSent();
+
+ public void incrementFaultsSending(int errorCode);
+
+ public void incrementTimeoutsSending();
+
+ public void incrementBytesSent(long size);
+
+ public void notifyReceivedMessageSize(long size);
+
+ public void notifySentMessageSize(long size);
+
+ public void reportReceivingFault(int errorCode);
+
+ public void reportSendingFault(int errorCode);
+
+ public void reportResponseCode(int respCode);
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MetricsCollector.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MetricsCollector.java
new file mode 100644
index 0000000000..45dcde944c
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MetricsCollector.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.axis2.context.MessageContext;
+
+/**
+ * Collects metrics related to a transport that has metrics support enabled
+ */
+public class MetricsCollector {
+
+ public static final int LEVEL_NONE = 0;
+ public static final int LEVEL_TRANSPORT = 1;
+ public static final int LEVEL_FULL = 2;
+ private static final Long ONE = (long) 1;
+
+ /** By default, full metrics collection is enabled */
+ private int level = LEVEL_FULL;
+
+ private long messagesReceived;
+ private long faultsReceiving;
+ private long timeoutsReceiving;
+ private long bytesReceived;
+ private long minSizeReceived;
+ private long maxSizeReceived;
+ private double avgSizeReceived;
+
+ private long messagesSent;
+ private long faultsSending;
+ private long timeoutsSending;
+ private long bytesSent;
+ private long minSizeSent;
+ private long maxSizeSent;
+ private double avgSizeSent;
+
+ private final Map<Integer, Long> responseCodeTable =
+ Collections.synchronizedMap(new HashMap<Integer, Long>());
+
+ private long lastResetTime = System.currentTimeMillis();
+
+ public void reset() {
+ messagesReceived = 0;
+ faultsReceiving = 0;
+ timeoutsReceiving = 0;
+ bytesReceived = 0;
+ minSizeReceived = 0;
+ maxSizeReceived = 0;
+ avgSizeReceived = 0;
+
+ messagesSent = 0;
+ faultsSending = 0;
+ timeoutsSending = 0;
+ bytesSent = 0;
+ minSizeSent = 0;
+ maxSizeSent = 0;
+ avgSizeSent = 0;
+
+ responseCodeTable.clear();
+ lastResetTime = System.currentTimeMillis();
+ }
+
+ public int getLevel() {
+ return level;
+ }
+
+ public void setLevel(int level) {
+ this.level = level;
+ }
+
+ public long getLastResetTime() {
+ return lastResetTime;
+ }
+
+ public long getMessagesReceived() {
+ return messagesReceived;
+ }
+
+ public long getFaultsReceiving() {
+ return faultsReceiving;
+ }
+
+ public long getTimeoutsReceiving() {
+ return timeoutsReceiving;
+ }
+
+ public long getBytesReceived() {
+ return bytesReceived;
+ }
+
+ public long getMessagesSent() {
+ return messagesSent;
+ }
+
+ public long getFaultsSending() {
+ return faultsSending;
+ }
+
+ public long getTimeoutsSending() {
+ return timeoutsSending;
+ }
+
+ public long getBytesSent() {
+ return bytesSent;
+ }
+
+ public long getMinSizeReceived() {
+ return minSizeReceived;
+ }
+
+ public long getMaxSizeReceived() {
+ return maxSizeReceived;
+ }
+
+ public long getMinSizeSent() {
+ return minSizeSent;
+ }
+
+ public long getMaxSizeSent() {
+ return maxSizeSent;
+ }
+
+ public double getAvgSizeReceived() {
+ return avgSizeReceived;
+ }
+
+ public double getAvgSizeSent() {
+ return avgSizeSent;
+ }
+
+ public Map<Integer, Long> getResponseCodeTable() {
+ return responseCodeTable;
+ }
+
+ public synchronized void incrementMessagesReceived() {
+ messagesReceived++;
+ }
+
+ public synchronized void incrementFaultsReceiving() {
+ faultsReceiving++;
+ }
+
+ public synchronized void incrementTimeoutsReceiving() {
+ timeoutsReceiving++;
+ }
+
+ public synchronized void incrementBytesReceived(long size) {
+ bytesReceived += size;
+ }
+
+ public synchronized void incrementMessagesSent() {
+ messagesSent++;
+ }
+
+ public synchronized void incrementFaultsSending() {
+ faultsSending++;
+ }
+
+ public synchronized void incrementTimeoutsSending() {
+ timeoutsSending++;
+ }
+
+ public synchronized void incrementBytesSent(long size) {
+ bytesSent += size;
+ }
+
+ public synchronized void notifyReceivedMessageSize(long size) {
+ if (minSizeReceived == 0 || size < minSizeReceived) {
+ minSizeReceived = size;
+ }
+ if (size > maxSizeReceived) {
+ maxSizeReceived = size;
+ }
+ avgSizeReceived = (avgSizeReceived == 0 ? size : (avgSizeReceived + size) / 2);
+ }
+
+ public synchronized void notifySentMessageSize(long size) {
+ if (minSizeSent == 0 || size < minSizeSent) {
+ minSizeSent = size;
+ }
+ if (size > maxSizeSent) {
+ maxSizeSent = size;
+ }
+ avgSizeSent = (avgSizeSent == 0 ? size : (avgSizeSent + size) / 2);
+ }
+
+ public void reportResponseCode(int respCode) {
+ synchronized(responseCodeTable) {
+ Object o = responseCodeTable.get(respCode);
+ if (o == null) {
+ responseCodeTable.put(respCode, ONE);
+ } else {
+ responseCodeTable.put(respCode, (Long) o + 1);
+ }
+ }
+ }
+
+ // --- enhanced methods ---
+ private MessageLevelMetricsCollector getMsgLevelMetrics(MessageContext mc) {
+ if (mc != null && level == LEVEL_FULL) {
+ return (MessageLevelMetricsCollector) mc.getProperty(BaseConstants.METRICS_COLLECTOR);
+ }
+ return null;
+ }
+
+ public void incrementMessagesReceived(MessageContext mc) {
+ incrementMessagesReceived();
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.incrementMessagesReceived();
+ }
+ }
+
+ public void incrementFaultsReceiving(int errorCode, MessageContext mc) {
+ incrementFaultsReceiving();
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.incrementFaultsReceiving(errorCode);
+ }
+ }
+
+ public void incrementTimeoutsReceiving(MessageContext mc) {
+ incrementTimeoutsReceiving();
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.incrementTimeoutsReceiving();
+ }
+ }
+
+ public void incrementBytesReceived(MessageContext mc, long size) {
+ incrementBytesReceived(size);
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.incrementBytesReceived(size);
+ }
+ }
+
+ public void incrementMessagesSent(MessageContext mc) {
+ incrementMessagesSent();
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.incrementMessagesSent();
+ }
+ }
+
+ public void incrementFaultsSending(int errorCode, MessageContext mc) {
+ incrementFaultsSending();
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.incrementFaultsSending(errorCode);
+ }
+ }
+
+ public void incrementTimeoutsSending(MessageContext mc) {
+ incrementTimeoutsSending();
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.incrementTimeoutsSending();
+ }
+ }
+
+ public void incrementBytesSent(MessageContext mc, long size) {
+ incrementBytesSent(size);
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.incrementBytesSent(size);
+ }
+ }
+
+ public void notifyReceivedMessageSize(MessageContext mc, long size) {
+ notifyReceivedMessageSize(size);
+
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.notifyReceivedMessageSize(size);
+ }
+ }
+
+ public void notifySentMessageSize(MessageContext mc, long size) {
+ notifySentMessageSize(size);
+
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.notifySentMessageSize(size);
+ }
+ }
+
+ public void reportResponseCode(MessageContext mc, int respCode) {
+ reportResponseCode(respCode);
+
+ MessageLevelMetricsCollector m = getMsgLevelMetrics(mc);
+ if (m != null) {
+ m.reportResponseCode(respCode);
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ParamUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ParamUtils.java
new file mode 100644
index 0000000000..7c77d553fc
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ParamUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterInclude;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.util.JavaUtils;
+
+/**
+ * Utility class with methods to manipulate service or transport parameters.
+ */
+public class ParamUtils {
+ private ParamUtils() {}
+
+ public static String getRequiredParam(ParameterInclude paramInclude, String paramName) throws AxisFault {
+ Parameter param = paramInclude.getParameter(paramName);
+ if (param != null && param.getValue() != null && param.getValue() instanceof String) {
+ return (String) param.getValue();
+ } else {
+ throw new AxisFault("Cannot find parameter '" + paramName + "' for "
+ + getDescriptionFor(paramInclude));
+ }
+ }
+
+ public static String getOptionalParam(ParameterInclude paramInclude, String paramName) throws AxisFault {
+ Parameter param = paramInclude.getParameter(paramName);
+ if (param != null && param.getValue() != null && param.getValue() instanceof String) {
+ return (String) param.getValue();
+ } else {
+ return null;
+ }
+ }
+
+ public static Integer getOptionalParamInt(ParameterInclude paramInclude, String paramName) throws AxisFault {
+ Parameter param = paramInclude.getParameter(paramName);
+ if (param == null || param.getValue() == null) {
+ return null;
+ } else {
+ Object paramValue = param.getValue();
+ if (paramValue instanceof Integer) {
+ return (Integer)paramValue;
+ } else if (paramValue instanceof String) {
+ try {
+ return Integer.valueOf((String)paramValue);
+ } catch (NumberFormatException ex) {
+ throw new AxisFault("Invalid value '" + paramValue + "' for parameter '" + paramName +
+ "' for " + getDescriptionFor(paramInclude));
+ }
+ } else {
+ throw new AxisFault("Invalid type for parameter '" + paramName + "' for " +
+ getDescriptionFor(paramInclude));
+ }
+ }
+ }
+
+ public static int getOptionalParamInt(ParameterInclude paramInclude, String paramName, int defaultValue) throws AxisFault {
+ Integer value = getOptionalParamInt(paramInclude, paramName);
+ return value == null ? defaultValue : value.intValue();
+ }
+
+ public static boolean getOptionalParamBoolean(ParameterInclude paramInclude, String paramName, boolean defaultValue) throws AxisFault {
+ Parameter param = paramInclude.getParameter(paramName);
+ return param == null ? defaultValue : JavaUtils.isTrueExplicitly(param.getValue(), defaultValue);
+ }
+
+ public static int getRequiredParamInt(ParameterInclude paramInclude, String paramName) throws AxisFault {
+ Integer value = getOptionalParamInt(paramInclude, paramName);
+ if (value == null) {
+ throw new AxisFault("Cannot find parameter '" + paramName +
+ "' for " + getDescriptionFor(paramInclude));
+ } else {
+ return value.intValue();
+ }
+ }
+
+ private static String getDescriptionFor(ParameterInclude paramInclude) {
+ if (paramInclude instanceof AxisService) {
+ return "service '" + ((AxisService)paramInclude).getName() + "'";
+ } else if (paramInclude instanceof TransportInDescription) {
+ return "transport receiver '" + ((TransportInDescription)paramInclude).getName() + "'";
+ } else if (paramInclude instanceof TransportOutDescription) {
+ return "transport sender '" + ((TransportOutDescription)paramInclude).getName() + "'";
+ } else {
+ return paramInclude.getClass().getName();
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/SynchronousCallback.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/SynchronousCallback.java
new file mode 100644
index 0000000000..1016e88a82
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/SynchronousCallback.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisMessage;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.wsdl.WSDLConstants;
+
+
+public class SynchronousCallback {
+
+ private MessageContext outMessageContext;
+ private MessageContext inMessageContext;
+
+ private boolean isComplete;
+
+ public SynchronousCallback(MessageContext outMessageContext) {
+ this.outMessageContext = outMessageContext;
+ this.isComplete = false;
+ }
+
+ public synchronized void setInMessageContext(MessageContext inMessageContext) throws AxisFault {
+
+ // if some other thread has access and complete then return without doing any thing.
+ // thread should have activate by the first message.
+ if (!isComplete) {
+ // this code is invoked only if the code use with axis2 at the client side
+ // when axis2 client receive messages it waits in the sending thread until the response comes.
+ // so this thread only notify the waiting thread and hence we need to build the message here.
+ inMessageContext.getEnvelope().build();
+ OperationContext operationContext = outMessageContext.getOperationContext();
+ MessageContext msgCtx =
+ operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+
+ if (msgCtx == null) {
+ // try to see whether there is a piggy back message context
+ if (outMessageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) != null) {
+
+ msgCtx = (MessageContext) outMessageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE);
+ msgCtx.setTransportIn(inMessageContext.getTransportIn());
+ msgCtx.setTransportOut(inMessageContext.getTransportOut());
+ msgCtx.setServerSide(false);
+ msgCtx.setProperty(BaseConstants.MAIL_CONTENT_TYPE,
+ inMessageContext.getProperty(BaseConstants.MAIL_CONTENT_TYPE));
+ // FIXME: this class must not be transport dependent since it is used by AbstractTransportListener
+ msgCtx.setIncomingTransportName(org.apache.axis2.Constants.TRANSPORT_MAIL);
+ msgCtx.setEnvelope(inMessageContext.getEnvelope());
+
+ } else {
+ inMessageContext.setOperationContext(operationContext);
+ inMessageContext.setServiceContext(outMessageContext.getServiceContext());
+ if (!operationContext.isComplete()) {
+ operationContext.addMessageContext(inMessageContext);
+ }
+ AxisOperation axisOp = operationContext.getAxisOperation();
+ AxisMessage inMessage = axisOp.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+ inMessageContext.setAxisMessage(inMessage);
+ inMessageContext.setServerSide(false);
+ }
+
+ } else {
+ msgCtx.setOperationContext(operationContext);
+ msgCtx.setServiceContext(outMessageContext.getServiceContext());
+ AxisOperation axisOp = operationContext.getAxisOperation();
+ AxisMessage inMessage = axisOp.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+ msgCtx.setAxisMessage(inMessage);
+ msgCtx.setTransportIn(inMessageContext.getTransportIn());
+ msgCtx.setTransportOut(inMessageContext.getTransportOut());
+ msgCtx.setServerSide(false);
+ msgCtx.setProperty(BaseConstants.MAIL_CONTENT_TYPE,
+ inMessageContext.getProperty(BaseConstants.MAIL_CONTENT_TYPE));
+ // FIXME: this class must not be transport dependent since it is used by AbstractTransportListener
+ msgCtx.setIncomingTransportName(org.apache.axis2.Constants.TRANSPORT_MAIL);
+ msgCtx.setEnvelope(inMessageContext.getEnvelope());
+
+ }
+ this.inMessageContext = inMessageContext;
+ isComplete = true;
+ this.notifyAll();
+ }
+
+ }
+
+
+ public boolean isComplete() {
+ return isComplete;
+ }
+
+ public void setComplete(boolean complete) {
+ isComplete = complete;
+ }
+
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointView.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointView.java
new file mode 100644
index 0000000000..8919aea5e7
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointView.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+
+public class TransportListenerEndpointView implements TransportListenerEndpointViewMBean {
+ private final AbstractTransportListener listener;
+ private final String serviceName;
+
+ public TransportListenerEndpointView(AbstractTransportListener listener, String serviceName) {
+ this.listener = listener;
+ this.serviceName = serviceName;
+ }
+
+ public String[] getAddresses() {
+ String hostname;
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ }
+ catch (UnknownHostException ex) {
+ hostname = "localhost";
+ }
+ EndpointReference[] epr;
+ try {
+ epr = listener.getEPRsForService(serviceName, hostname);
+ }
+ catch (AxisFault ex) {
+ return null;
+ }
+ if (epr == null) {
+ return null;
+ } else {
+ String[] result = new String[epr.length];
+ for (int i=0; i<epr.length; i++) {
+ result[i] = epr[i].getAddress();
+ }
+ return result;
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointViewMBean.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointViewMBean.java
new file mode 100644
index 0000000000..71b9618977
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointViewMBean.java
@@ -0,0 +1,23 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+public interface TransportListenerEndpointViewMBean {
+ String[] getAddresses();
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportMBeanSupport.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportMBeanSupport.java
new file mode 100644
index 0000000000..bb23c5f96d
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportMBeanSupport.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.axis2.transport.TransportListener;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Support class to register MBeans for transport listeners and senders.
+ * This class can be used by {@link TransportListener} and {@link TransportSender} classes
+ * to register the {@link TransportView} management bean. It takes care of registering
+ * the bean under a consistent name and makes sure that a JMX related error doesn't stop
+ * the transport from working: a failure to register the MBean will cause JMX support
+ * to be disabled.
+ */
+public class TransportMBeanSupport {
+ private static final Log log = LogFactory.getLog(TransportMBeanSupport.class);
+
+ private boolean enabled = true;
+ private boolean registered;
+ private MBeanServer mbs;
+ private ObjectName mbeanName;
+ private TransportView mbeanInstance;
+
+ private TransportMBeanSupport(String connectorName, TransportView mbeanInstance) {
+ try {
+ mbs = ManagementFactory.getPlatformMBeanServer();
+ } catch (SecurityException ex) {
+ log.warn("Unable to get the platform MBean server; JMX support disabled", ex);
+ enabled = false;
+ return;
+ }
+ String jmxAgentName = System.getProperty("jmx.agent.name");
+ if (jmxAgentName == null || "".equals(jmxAgentName)) {
+ jmxAgentName = "org.apache.axis2";
+ }
+ String mbeanNameString = jmxAgentName + ":Type=Transport,ConnectorName=" + connectorName;
+ try {
+ mbeanName = ObjectName.getInstance(mbeanNameString);
+ } catch (MalformedObjectNameException ex) {
+ log.warn("Unable to create object name '" + mbeanNameString
+ + "'; JMX support disabled", ex);
+ enabled = false;
+ }
+ this.mbeanInstance = mbeanInstance;
+ }
+
+ public TransportMBeanSupport(TransportListener listener, String name) {
+ this(name + "-listener", new TransportView(listener, null));
+ }
+
+ public TransportMBeanSupport(TransportSender sender, String name) {
+ this(name + "-sender", new TransportView(null, sender));
+ }
+
+ public ObjectName getMBeanName() {
+ return mbeanName;
+ }
+
+ /**
+ * Register the {@link TransportView} MBean.
+ */
+ public void register() {
+ if (enabled && !registered) {
+ try {
+ mbs.registerMBean(mbeanInstance, mbeanName);
+ registered = true;
+ } catch (Exception e) {
+ log.warn("Error registering a MBean with objectname ' " + mbeanName +
+ " ' for JMX management", e);
+ enabled = false;
+ }
+ }
+ }
+
+ /**
+ * Unregister the {@link TransportView} MBean.
+ */
+ public void unregister() {
+ if (enabled && registered) {
+ try {
+ mbs.unregisterMBean(mbeanName);
+ registered = false;
+ } catch (Exception e) {
+ log.warn("Error un-registering a MBean with objectname ' " + mbeanName +
+ " ' for JMX management", e);
+ }
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportView.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportView.java
new file mode 100644
index 0000000000..0724110ed3
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportView.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.util.Map;
+
+import org.apache.axis2.transport.TransportListener;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TransportView implements TransportViewMBean {
+
+ private static final Log log = LogFactory.getLog(TransportView.class);
+
+ public static final int STOPPED = 0;
+ public static final int RUNNING = 1;
+ public static final int PAUSED = 2;
+ public static final int SHUTTING_DOWN = 3;
+
+ private TransportListener listener = null;
+ private TransportSender sender = null;
+
+ public TransportView(TransportListener listener, TransportSender sender) {
+ this.listener = listener;
+ this.sender = sender;
+ }
+
+ // JMX Attributes
+ public long getMessagesReceived() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getMessagesReceived();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getMessagesReceived();
+ }
+ return -1;
+ }
+
+ public long getFaultsReceiving() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getFaultsReceiving();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getFaultsReceiving();
+ }
+ return -1;
+ }
+
+ public long getTimeoutsReceiving() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getTimeoutsReceiving();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getTimeoutsReceiving();
+ }
+ return -1;
+ }
+
+ public long getTimeoutsSending() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getTimeoutsSending();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getTimeoutsSending();
+ }
+ return -1;
+ }
+
+ public long getBytesReceived() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getBytesReceived();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getBytesReceived();
+ }
+ return -1;
+ }
+
+ public long getMessagesSent() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getMessagesSent();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getMessagesSent();
+ }
+ return -1;
+ }
+
+ public long getFaultsSending() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getFaultsSending();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getFaultsSending();
+ }
+ return -1;
+ }
+
+ public long getBytesSent() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getBytesSent();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getBytesSent();
+ }
+ return -1;
+ }
+
+ public long getMinSizeReceived() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getMinSizeReceived();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getMinSizeReceived();
+ }
+ return -1;
+ }
+
+ public long getMaxSizeReceived() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getMaxSizeReceived();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getMaxSizeReceived();
+ }
+ return -1;
+ }
+
+ public double getAvgSizeReceived() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getAvgSizeReceived();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getAvgSizeReceived();
+ }
+ return -1;
+ }
+
+ public long getMinSizeSent() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getMinSizeSent();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getMinSizeSent();
+ }
+ return -1;
+ }
+
+ public long getMaxSizeSent() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getMaxSizeSent();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getMaxSizeSent();
+ }
+ return -1;
+ }
+
+ public double getAvgSizeSent() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getAvgSizeSent();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getAvgSizeSent();
+ }
+ return -1;
+ }
+
+ public Map getResponseCodeTable() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getResponseCodeTable();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getResponseCodeTable();
+ }
+ return null;
+ }
+
+ public int getActiveThreadCount() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getActiveThreadCount();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getActiveThreadCount();
+ }
+ return -1;
+ }
+
+ public int getQueueSize() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getQueueSize();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getQueueSize();
+ }
+ return -1;
+ }
+
+ // JMX Operations
+ public void start() throws Exception{
+ if (listener != null) {
+ listener.start();
+ }
+ }
+
+ public void stop() throws Exception {
+ if (listener != null) {
+ listener.stop();
+ } else if (sender != null) {
+ sender.stop();
+ }
+ }
+
+ public void pause() throws Exception {
+ if (listener instanceof ManagementSupport) {
+ ((ManagementSupport) listener).pause();
+ } else if (sender instanceof ManagementSupport) {
+ ((ManagementSupport) sender).pause();
+ }
+ }
+
+ public void resume() throws Exception {
+ if (listener instanceof ManagementSupport) {
+ ((ManagementSupport) listener).resume();
+ } else if (sender instanceof ManagementSupport) {
+ ((ManagementSupport) sender).resume();
+ }
+ }
+
+ public void maintenenceShutdown(long seconds) throws Exception {
+ if (listener instanceof ManagementSupport) {
+ ((ManagementSupport) listener).maintenenceShutdown(seconds * 1000);
+ } else if (sender instanceof ManagementSupport) {
+ ((ManagementSupport) sender).maintenenceShutdown(seconds * 1000);
+ }
+ }
+
+ public void resetStatistics() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ ((ManagementSupport) listener).resetStatistics();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ ((ManagementSupport) sender).resetStatistics();
+ }
+ }
+
+ public long getLastResetTime() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getLastResetTime();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getLastResetTime();
+ }
+ return -1;
+ }
+
+ public long getMetricsWindow() {
+ if (listener != null && listener instanceof ManagementSupport) {
+ return System.currentTimeMillis() - ((ManagementSupport) listener).getLastResetTime();
+ } else if (sender != null && sender instanceof ManagementSupport) {
+ return System.currentTimeMillis() - ((ManagementSupport) sender).getLastResetTime();
+ }
+ return -1;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportViewMBean.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportViewMBean.java
new file mode 100644
index 0000000000..2aa86c1e8e
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportViewMBean.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+import java.util.Map;
+
+public interface TransportViewMBean {
+
+ // JMX Attributes
+ public long getMessagesReceived();
+ public long getFaultsReceiving();
+ public long getTimeoutsReceiving();
+ public long getMessagesSent();
+ public long getFaultsSending();
+ public long getTimeoutsSending();
+ public long getBytesReceived();
+ public long getBytesSent();
+ public long getMinSizeReceived();
+ public long getMaxSizeReceived();
+ public double getAvgSizeReceived();
+ public long getMinSizeSent();
+ public long getMaxSizeSent();
+ public double getAvgSizeSent();
+ public int getActiveThreadCount();
+ public int getQueueSize();
+ public Map getResponseCodeTable();
+
+ // JMX Operations
+ public void start() throws Exception;
+ public void stop() throws Exception;
+ public void pause() throws Exception;
+ public void resume() throws Exception;
+ public void maintenenceShutdown(long seconds) throws Exception;
+
+ public void resetStatistics();
+ public long getLastResetTime();
+ public long getMetricsWindow();
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/AbstractDatagramTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
new file mode 100644
index 0000000000..92cca71a79
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/AbstractDatagramTransportListener.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportListener;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ParamUtils;
+
+public abstract class AbstractDatagramTransportListener<E extends DatagramEndpoint>
+ extends AbstractTransportListener {
+
+ private final Map<String,E> endpoints = new HashMap<String,E>();
+ private DatagramDispatcher<E> dispatcher;
+ private String defaultIp;
+
+ @Override
+ public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn)
+ throws AxisFault {
+
+ super.init(cfgCtx, transportIn);
+ DatagramDispatcherCallback callback = new DatagramDispatcherCallback() {
+ public void receive(DatagramEndpoint endpoint, byte[] data, int length) {
+ workerPool.execute(new ProcessPacketTask(endpoint, data, length));
+ }
+ };
+ try {
+ dispatcher = createDispatcher(callback);
+ } catch (IOException ex) {
+ throw new AxisFault("Unable to create selector", ex);
+ }
+ try {
+ defaultIp = org.apache.axis2.util.Utils.getIpAddress(cfgCtx.getAxisConfiguration());
+ } catch (SocketException ex) {
+ throw new AxisFault("Unable to determine the host's IP address", ex);
+ }
+ }
+
+ @Override
+ protected void startListeningForService(AxisService service) throws AxisFault {
+ E endpoint = createEndpoint(service);
+ endpoint.setListener(this);
+ endpoint.setService(service);
+ endpoint.setContentType(ParamUtils.getRequiredParam(
+ service, "transport." + getTransportName() + ".contentType"));
+ endpoint.setMetrics(metrics);
+
+ try {
+ dispatcher.addEndpoint(endpoint);
+ } catch (IOException ex) {
+ throw new AxisFault("Unable to listen on endpoint "
+ + endpoint.getEndpointReference(defaultIp), ex);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Started listening on endpoint " + endpoint.getEndpointReference(defaultIp)
+ + " [contentType=" + endpoint.getContentType()
+ + "; service=" + service.getName() + "]");
+ }
+ endpoints.put(service.getName(), endpoint);
+ }
+
+ @Override
+ protected void stopListeningForService(AxisService service) {
+ try {
+ dispatcher.removeEndpoint(endpoints.get(service.getName()));
+ } catch (IOException ex) {
+ log.error("I/O exception while stopping listener for service " + service.getName(), ex);
+ }
+ endpoints.remove(service.getName());
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ try {
+ dispatcher.stop();
+ } catch (IOException ex) {
+ log.error("Failed to stop dispatcher", ex);
+ }
+ }
+
+ public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+
+ // strip out the endpoint name if present
+ if (serviceName.indexOf('.') != -1) {
+ serviceName = serviceName.substring(0, serviceName.indexOf('.'));
+ }
+
+ E endpoint = endpoints.get(serviceName);
+ if (endpoint == null) {
+ return null;
+ } else {
+ return new EndpointReference[] {
+ endpoint.getEndpointReference(ip == null ? defaultIp : ip) };
+ }
+ }
+
+ protected abstract DatagramDispatcher<E> createDispatcher(DatagramDispatcherCallback callback)
+ throws IOException;
+
+ protected abstract E createEndpoint(AxisService service) throws AxisFault;
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcher.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcher.java
new file mode 100644
index 0000000000..e19aec4483
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcher.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram;
+
+import java.io.IOException;
+
+public interface DatagramDispatcher<E> {
+ void addEndpoint(E endpoint) throws IOException;
+ void removeEndpoint(E endpoint) throws IOException;
+ void stop() throws IOException;
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcherCallback.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcherCallback.java
new file mode 100644
index 0000000000..691fe2fef9
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcherCallback.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram;
+
+public interface DatagramDispatcherCallback {
+ void receive(DatagramEndpoint endpoint, byte[] data, int length);
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramEndpoint.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramEndpoint.java
new file mode 100644
index 0000000000..95eebbca06
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramEndpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.description.AxisService;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector;
+
+/**
+ * Endpoint description.
+ * This class is used by the transport to store information
+ * about an endpoint, e.g. the Axis service it is bound to.
+ * Transports extend this abstract class to store additional
+ * transport specific information, such as the port number
+ * the transport listens on.
+ */
+public abstract class DatagramEndpoint {
+ private AbstractDatagramTransportListener listener;
+ private String contentType;
+ private AxisService service;
+ private MetricsCollector metrics;
+
+ public AbstractDatagramTransportListener getListener() {
+ return listener;
+ }
+
+ public void setListener(AbstractDatagramTransportListener listener) {
+ this.listener = listener;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public AxisService getService() {
+ return service;
+ }
+
+ public void setService(AxisService service) {
+ this.service = service;
+ }
+
+ public MetricsCollector getMetrics() {
+ return metrics;
+ }
+
+ public void setMetrics(MetricsCollector metrics) {
+ this.metrics = metrics;
+ }
+
+ public abstract EndpointReference getEndpointReference(String ip);
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/ProcessPacketTask.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/ProcessPacketTask.java
new file mode 100644
index 0000000000..bb102317b8
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/ProcessPacketTask.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector;
+
+/**
+ * Task encapsulating the processing of a datagram.
+ * Instances of this class will be dispatched to worker threads for
+ * execution.
+ */
+public class ProcessPacketTask implements Runnable {
+ private static final Log log = LogFactory.getLog(ProcessPacketTask.class);
+
+ private final DatagramEndpoint endpoint;
+ private final byte[] data;
+ private final int length;
+
+ public ProcessPacketTask(DatagramEndpoint endpoint, byte[] data, int length) {
+ this.endpoint = endpoint;
+ this.data = data;
+ this.length = length;
+ }
+
+ public void run() {
+ MetricsCollector metrics = endpoint.getMetrics();
+ try {
+ InputStream inputStream = new ByteArrayInputStream(data, 0, length);
+ MessageContext msgContext = endpoint.getListener().createMessageContext();
+ msgContext.setAxisService(endpoint.getService());
+ SOAPEnvelope envelope = TransportUtils.createSOAPMessage(msgContext, inputStream, endpoint.getContentType());
+ msgContext.setEnvelope(envelope);
+ AxisEngine.receive(msgContext);
+ metrics.incrementMessagesReceived();
+ metrics.incrementBytesReceived(length);
+ } catch (Exception ex) {
+ metrics.incrementFaultsReceiving();
+ StringBuilder buffer = new StringBuilder("Error during processing of datagram:\n");
+ Utils.hexDump(buffer, data, length);
+ log.error(buffer.toString(), ex);
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/Utils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/Utils.java
new file mode 100644
index 0000000000..41230bbcdc
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/Utils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram;
+
+/**
+ * Utility class with methods used by datagram transports.
+ */
+public class Utils {
+ private Utils() {}
+
+ public static void hexDump(StringBuilder buffer, byte[] data, int length) {
+ for (int start = 0; start < length; start += 16) {
+ for (int i=0; i<16; i++) {
+ int index = start+i;
+ if (index < length) {
+ String hex = Integer.toHexString(data[start+i] & 0xFF);
+ if (hex.length() < 2) {
+ buffer.append('0');
+ }
+ buffer.append(hex);
+ } else {
+ buffer.append(" ");
+ }
+ buffer.append(' ');
+ if (i == 7) {
+ buffer.append(' ');
+ }
+ }
+ buffer.append(" |");
+ for (int i=0; i<16; i++) {
+ int index = start+i;
+ if (index < length) {
+ int b = data[index] & 0xFF;
+ if (32 <= b && b < 128) {
+ buffer.append((char)b);
+ } else {
+ buffer.append('.');
+ }
+ } else {
+ buffer.append(' ');
+ }
+ }
+ buffer.append('|');
+ buffer.append('\n');
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/package-info.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/package-info.java
new file mode 100644
index 0000000000..a5765e7e55
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Base classes for datagram transports.
+ * <p>
+ * A datagram type transport is a transport that entirely reads a message
+ * into memory before starting to process it: in contrast to transports like HTTP,
+ * it doesn't support streaming. This approach can be chosen either because
+ * of the characteristics of the underlying protocol (such as in the case of UDP)
+ * or because streaming a message would unnecessarily delay the processing of the
+ * next available message (as in the case of a UNIX pipe).
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram;
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportError.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportError.java
new file mode 100644
index 0000000000..5de216163f
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportError.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.event;
+
+import org.apache.axis2.description.AxisService;
+
+public class TransportError {
+ private final Object source;
+ private final AxisService service;
+ private final Throwable exception;
+
+ public TransportError(Object source, AxisService service, Throwable exception) {
+ this.source = source;
+ this.service = service;
+ this.exception = exception;
+ }
+
+ public Object getSource() {
+ return source;
+ }
+
+ public AxisService getService() {
+ return service;
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorListener.java
new file mode 100644
index 0000000000..40d46dba56
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.event;
+
+public interface TransportErrorListener {
+ void error(TransportError error);
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSource.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSource.java
new file mode 100644
index 0000000000..141ca20099
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSource.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.event;
+
+public interface TransportErrorSource {
+ void addErrorListener(TransportErrorListener listener);
+ void removeErrorListener(TransportErrorListener listener);
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSourceSupport.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSourceSupport.java
new file mode 100644
index 0000000000..274b7b7b90
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSourceSupport.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.event;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.axis2.description.AxisService;
+
+public class TransportErrorSourceSupport implements TransportErrorSource {
+ private final Object source;
+ private final List<TransportErrorListener> listeners = new LinkedList<TransportErrorListener>();
+
+ public TransportErrorSourceSupport(Object source) {
+ this.source = source;
+ }
+
+ public synchronized void addErrorListener(TransportErrorListener listener) {
+ listeners.add(listener);
+ }
+
+ public synchronized void removeErrorListener(TransportErrorListener listener) {
+ listeners.remove(listener);
+ }
+
+ public synchronized void error(AxisService service, Throwable ex) {
+ if (!listeners.isEmpty()) {
+ TransportError error = new TransportError(source, service, ex);
+ for (TransportErrorListener listener : listeners) {
+ listener.error(error);
+ }
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/ReaderInputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/ReaderInputStream.java
new file mode 100644
index 0000000000..5047e1a499
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/ReaderInputStream.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+
+/**
+ * {@link InputStream} implementation that reads a character stream from a {@link Reader}
+ * and transforms it to a byte stream using a specified charset encoding. The stream
+ * is transformed using a {@link CharsetEncoder} object, guaranteeing that all charset
+ * encodings supported by the JRE are handled correctly. In particular for charsets such as
+ * UTF-16, the implementation ensures that one and only one byte order marker
+ * is produced.
+ * <p>
+ * Since in general it is not possible to predict the number of characters to be read from the
+ * {@link Reader} to satisfy a read request on the {@link ReaderInputStream}, all reads from
+ * the {@link Reader} are buffered. There is therefore no well defined correlation
+ * between the current position of the {@link Reader} and that of the {@link ReaderInputStream}.
+ * This also implies that in general there is no need to wrap the underlying {@link Reader}
+ * in a {@link java.io.BufferedReader}.
+ * <p>
+ * {@link ReaderInputStream} implements the inverse transformation of {@link java.io.InputStreamReader};
+ * in the following example, reading from <tt>in2</tt> would return the same byte
+ * sequence as reading from <tt>in</tt> (provided that the initial byte sequence is legal
+ * with respect to the charset encoding):
+ * <pre>
+ * InputStream in = ...
+ * Charset cs = ...
+ * InputStreamReader reader = new InputStreamReader(in, cs);
+ * ReaderInputStream in2 = new ReaderInputStream(reader, cs);</pre>
+ * {@link ReaderInputStream} implements the same transformation as {@link java.io.OutputStreamWriter},
+ * except that the control flow is reversed: both classes transform a character stream
+ * into a byte stream, but {@link java.io.OutputStreamWriter} pushes data to the underlying stream,
+ * while {@link ReaderInputStream} pulls it from the underlying stream.
+ * <p>
+ * Note that while there are use cases where there is no alternative to using
+ * this class, very often the need to use this class is an indication of a flaw
+ * in the design of the code. This class is typically used in situations where an existing
+ * API only accepts an {@link InputStream}, but where the most natural way to produce the data
+ * is as a character stream, i.e. by providing a {@link Reader} instance. An example of a situation
+ * where this problem may appear is when implementing the {@link javax.activation.DataSource}
+ * interface from the Java Activation Framework.
+ * <p>
+ * Given the fact that the {@link Reader} class doesn't provide any way to predict whether the next
+ * read operation will block or not, it is not possible to provide a meaningful
+ * implementation of the {@link InputStream#available()} method. A call to this method
+ * will always return 0. Also, this class doesn't support {@link InputStream#mark(int)}.
+ * <p>
+ * Instances of {@link ReaderInputStream} are not thread safe.
+ */
+// NOTE: Remove this class once Commons IO 2.0 is available (see IO-158)
+public class ReaderInputStream extends InputStream {
+ private static final int DEFAULT_BUFFER_SIZE = 1024;
+
+ private final Reader reader;
+ private final CharsetEncoder encoder;
+
+ /**
+ * CharBuffer used as input for the decoder. It should be reasonably
+ * large as we read data from the underlying Reader into this buffer.
+ */
+ private final CharBuffer encoderIn;
+
+ /**
+ * ByteBuffer used as output for the decoder. This buffer can be small
+ * as it is only used to transfer data from the decoder to the
+ * buffer provided by the caller.
+ */
+ private final ByteBuffer encoderOut = ByteBuffer.allocate(128);
+
+ private CoderResult lastCoderResult;
+ private boolean endOfInput;
+
+ /**
+ * Construct a new {@link ReaderInputStream}.
+ *
+ * @param reader the target {@link Reader}
+ * @param charset the charset encoding
+ * @param bufferSize the size of the input buffer in number of characters
+ */
+ public ReaderInputStream(Reader reader, Charset charset, int bufferSize) {
+ this.reader = reader;
+ encoder = charset.newEncoder();
+ encoderIn = CharBuffer.allocate(bufferSize);
+ encoderIn.flip();
+ }
+
+ /**
+ * Construct a new {@link ReaderInputStream} with a default input buffer size of
+ * 1024 characters.
+ *
+ * @param reader the target {@link Reader}
+ * @param charset the charset encoding
+ */
+ public ReaderInputStream(Reader reader, Charset charset) {
+ this(reader, charset, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Construct a new {@link ReaderInputStream}.
+ *
+ * @param reader the target {@link Reader}
+ * @param charsetName the name of the charset encoding
+ * @param bufferSize the size of the input buffer in number of characters
+ */
+ public ReaderInputStream(Reader reader, String charsetName, int bufferSize) {
+ this(reader, Charset.forName(charsetName), bufferSize);
+ }
+
+ /**
+ * Construct a new {@link ReaderInputStream} with a default input buffer size of
+ * 1024 characters.
+ *
+ * @param reader the target {@link Reader}
+ * @param charsetName the name of the charset encoding
+ */
+ public ReaderInputStream(Reader reader, String charsetName) {
+ this(reader, charsetName, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Construct a new {@link ReaderInputStream} that uses the default character encoding
+ * with a default input buffer size of 1024 characters.
+ *
+ * @param reader the target {@link Reader}
+ */
+ public ReaderInputStream(Reader reader) {
+ this(reader, Charset.defaultCharset());
+ }
+
+ /**
+ * Read the specified number of bytes into an array.
+ *
+ * @param b the byte array to read into
+ * @param off the offset to start reading bytes into
+ * @param len the number of bytes to read
+ * @return the number of bytes read or <code>-1</code>
+ * if the end of the stream has been reached
+ */
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int read = 0;
+ while (len > 0) {
+ if (encoderOut.position() > 0) {
+ encoderOut.flip();
+ int c = Math.min(encoderOut.remaining(), len);
+ encoderOut.get(b, off, c);
+ off += c;
+ len -= c;
+ read += c;
+ encoderOut.compact();
+ } else {
+ if (!endOfInput && (lastCoderResult == null || lastCoderResult.isUnderflow())) {
+ encoderIn.compact();
+ int position = encoderIn.position();
+ // We don't use Reader#read(CharBuffer) here because it is more efficient
+ // to write directly to the underlying char array (the default implementation
+ // copies data to a temporary char array).
+ int c = reader.read(encoderIn.array(), position, encoderIn.remaining());
+ if (c == -1) {
+ endOfInput = true;
+ } else {
+ encoderIn.position(position+c);
+ }
+ encoderIn.flip();
+ }
+ lastCoderResult = encoder.encode(encoderIn, encoderOut, endOfInput);
+ if (endOfInput && encoderOut.position() == 0) {
+ break;
+ }
+ }
+ }
+ return read == 0 && endOfInput ? -1 : read;
+ }
+
+ /**
+ * Read the specified number of bytes into an array.
+ *
+ * @param b the byte array to read into
+ * @return the number of bytes read or <code>-1</code>
+ * if the end of the stream has been reached
+ */
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ /**
+ * Read a single byte.
+ *
+ * @return either the byte read or <code>-1</code> if the end of the stream
+ * has been reached
+ */
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ return read(b) == -1 ? -1 : b[0] & 0xFF;
+ }
+
+ /**
+ * Close the stream. This method will cause the underlying {@link Reader}
+ * to be closed.
+ */
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/WriterOutputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/WriterOutputStream.java
new file mode 100644
index 0000000000..3b6370da63
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/WriterOutputStream.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+
+/**
+ * {@link OutputStream} implementation that transforms a byte stream to a
+ * character stream using a specified charset encoding and writes the resulting
+ * stream to a {@link Writer}. The stream is transformed using a
+ * {@link CharsetDecoder} object, guaranteeing that all charset
+ * encodings supported by the JRE are handled correctly.
+ * <p>
+ * The output of the {@link CharsetDecoder} is buffered using a fixed size buffer.
+ * This implies that the data is written to the underlying {@link Writer} in chunks
+ * that are no larger than the size of this buffer. By default, the buffer is
+ * flushed only when it overflows or when {@link #flush()} or {@link #close()}
+ * is called. In general there is therefore no need to wrap the underlying {@link Writer}
+ * in a {@link java.io.BufferedWriter}. {@link WriterOutputStream} can also
+ * be instructed to flush the buffer after each write operation. In this case, all
+ * available data is written immediately to the underlying {@link Writer}, implying that
+ * the current position of the {@link Writer} is correlated to the current position
+ * of the {@link WriterOutputStream}.
+ * <p>
+ * {@link WriterOutputStream} implements the inverse transformation of {@link java.io.OutputStreamWriter};
+ * in the following example, writing to <tt>out2</tt> would have the same result as writing to
+ * <tt>out</tt> directly (provided that the byte sequence is legal with respect to the
+ * charset encoding):
+ * <pre>
+ * OutputStream out = ...
+ * Charset cs = ...
+ * OutputStreamWriter writer = new OutputStreamWriter(out, cs);
+ * WriterOutputStream out2 = new WriterOutputStream(writer, cs);</pre>
+ * {@link WriterOutputStream} implements the same transformation as {@link java.io.InputStreamReader},
+ * except that the control flow is reversed: both classes transform a byte stream
+ * into a character stream, but {@link java.io.InputStreamReader} pulls data from the underlying stream,
+ * while {@link WriterOutputStream} pushes it to the underlying stream.
+ * <p>
+ * Note that while there are use cases where there is no alternative to using
+ * this class, very often the need to use this class is an indication of a flaw
+ * in the design of the code. This class is typically used in situations where an existing
+ * API only accepts an {@link OutputStream} object, but where the stream is known to represent
+ * character data that must be decoded for further use.
+ * <p>
+ * Instances of {@link WriterOutputStream} are not thread safe.
+ */
+//NOTE: Remove this class once Commons IO 2.0 is available (see IO-158)
+public class WriterOutputStream extends OutputStream {
+ private static final int DEFAULT_BUFFER_SIZE = 1024;
+
+ private final Writer writer;
+ private final CharsetDecoder decoder;
+ private final boolean writeImmediately;
+
+ /**
+ * ByteBuffer used as input for the decoder. This buffer can be small
+ * as it is used only to transfer the received data to the
+ * decoder.
+ */
+ private final ByteBuffer decoderIn = ByteBuffer.allocate(128);
+
+ /**
+ * CharBuffer used as output for the decoder. It should be
+ * somewhat larger as we write from this buffer to the
+ * underlying Writer.
+ */
+ private final CharBuffer decoderOut;
+
+ /**
+ * Constructs a new {@link WriterOutputStream}.
+ *
+ * @param writer the target {@link Writer}
+ * @param charset the charset encoding
+ * @param bufferSize the size of the output buffer in number of characters
+ * @param writeImmediately If <tt>true</tt> the output buffer will be flushed after each
+ * write operation, i.e. all available data will be written to the
+ * underlying {@link Writer} immediately. If <tt>false</tt>, the
+ * output buffer will only be flushed when it overflows or when
+ * {@link #flush()} or {@link #close()} is called.
+ */
+ public WriterOutputStream(Writer writer, Charset charset, int bufferSize, boolean writeImmediately) {
+ this.writer = writer;
+ decoder = charset.newDecoder();
+ decoder.onMalformedInput(CodingErrorAction.REPLACE);
+ decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+ decoder.replaceWith("?");
+ this.writeImmediately = writeImmediately;
+ decoderOut = CharBuffer.allocate(bufferSize);
+ }
+
+ /**
+ * Constructs a new {@link WriterOutputStream} with a default output buffer size of
+ * 1024 characters. The output buffer will only be flushed when it overflows or when
+ * {@link #flush()} or {@link #close()} is called.
+ *
+ * @param writer the target {@link Writer}
+ * @param charset the charset encoding
+ */
+ public WriterOutputStream(Writer writer, Charset charset) {
+ this(writer, charset, DEFAULT_BUFFER_SIZE, false);
+ }
+
+ /**
+ * Constructs a new {@link WriterOutputStream}.
+ *
+ * @param writer the target {@link Writer}
+ * @param charsetName the name of the charset encoding
+ * @param bufferSize the size of the output buffer in number of characters
+ * @param writeImmediately If <tt>true</tt> the output buffer will be flushed after each
+ * write operation, i.e. all available data will be written to the
+ * underlying {@link Writer} immediately. If <tt>false</tt>, the
+ * output buffer will only be flushed when it overflows or when
+ * {@link #flush()} or {@link #close()} is called.
+ */
+ public WriterOutputStream(Writer writer, String charsetName, int bufferSize, boolean writeImmediately) {
+ this(writer, Charset.forName(charsetName), bufferSize, writeImmediately);
+ }
+
+ /**
+ * Constructs a new {@link WriterOutputStream} with a default output buffer size of
+ * 1024 characters. The output buffer will only be flushed when it overflows or when
+ * {@link #flush()} or {@link #close()} is called.
+ *
+ * @param writer the target {@link Writer}
+ * @param charsetName the name of the charset encoding
+ */
+ public WriterOutputStream(Writer writer, String charsetName) {
+ this(writer, charsetName, DEFAULT_BUFFER_SIZE, false);
+ }
+
+ /**
+ * Constructs a new {@link WriterOutputStream} that uses the default character encoding
+ * and with a default output buffer size of 1024 characters. The output buffer will only
+ * be flushed when it overflows or when {@link #flush()} or {@link #close()} is called.
+ *
+ * @param writer the target {@link Writer}
+ */
+ public WriterOutputStream(Writer writer) {
+ this(writer, Charset.defaultCharset(), DEFAULT_BUFFER_SIZE, false);
+ }
+
+ /**
+ * Write bytes from the specified byte array to the stream.
+ *
+ * @param b the byte array containing the bytes to write
+ * @param off the start offset in the byte array
+ * @param len the number of bytes to write
+ */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ while (len > 0) {
+ int c = Math.min(len, decoderIn.remaining());
+ decoderIn.put(b, off, c);
+ processInput(false);
+ len -= c;
+ off += c;
+ }
+ if (writeImmediately) {
+ flushOutput();
+ }
+ }
+
+ /**
+ * Write bytes from the specified byte array to the stream.
+ *
+ * @param b the byte array containing the bytes to write
+ */
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ /**
+ * Write a single byte to the stream.
+ *
+ * @param b the byte to write
+ */
+ @Override
+ public void write(int b) throws IOException {
+ write(new byte[] { (byte)b }, 0, 1);
+ }
+
+ /**
+ * Flush the stream. Any remaining content accumulated in the output buffer
+ * will be written to the underlying {@link Writer}. After that
+ * {@link Writer#flush()} will be called.
+ */
+ @Override
+ public void flush() throws IOException {
+ flushOutput();
+ writer.flush();
+ }
+
+ /**
+ * Close the stream. Any remaining content accumulated in the output buffer
+ * will be written to the underlying {@link Writer}. After that
+ * {@link Writer#close()} will be called.
+ */
+ @Override
+ public void close() throws IOException {
+ processInput(true);
+ flushOutput();
+ writer.close();
+ }
+
+ private void processInput(boolean endOfInput) throws IOException {
+ // Prepare decoderIn for reading
+ decoderIn.flip();
+ CoderResult coderResult;
+ while (true) {
+ coderResult = decoder.decode(decoderIn, decoderOut, endOfInput);
+ if (coderResult.isOverflow()) {
+ flushOutput();
+ } else if (coderResult.isUnderflow()) {
+ break;
+ } else {
+ // The decoder is configured to replace malformed input and unmappable characters,
+ // so we should not get here.
+ throw new IOException("Unexpected coder result");
+ }
+ }
+ // Discard the bytes that have been read
+ decoderIn.compact();
+ }
+
+ private void flushOutput() throws IOException {
+ if (decoderOut.position() > 0) {
+ writer.write(decoderOut.array(), 0, decoderOut.position());
+ decoderOut.rewind();
+ }
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeThreadFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeThreadFactory.java
new file mode 100644
index 0000000000..290ca08471
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeThreadFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This is a simple ThreadFactory implementation using java.util.concurrent
+ * Creates threads with the given name prefix
+ */
+public class NativeThreadFactory implements
+ ThreadFactory {
+
+ final ThreadGroup group;
+ final AtomicInteger count;
+ final String namePrefix;
+
+ public NativeThreadFactory(final ThreadGroup group, final String namePrefix) {
+ super();
+ this.count = new AtomicInteger(1);
+ this.group = group;
+ this.namePrefix = namePrefix;
+ }
+
+ public Thread newThread(final Runnable runnable) {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(this.namePrefix);
+ buffer.append('-');
+ buffer.append(this.count.getAndIncrement());
+ Thread t = new Thread(group, runnable, buffer.toString(), 0);
+ t.setDaemon(false);
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+} \ No newline at end of file
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeWorkerPool.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeWorkerPool.java
new file mode 100644
index 0000000000..9401daee8e
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeWorkerPool.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Worker pool implementation based on java.util.concurrent in JDK 1.5 or later.
+ */
+public class NativeWorkerPool implements WorkerPool {
+
+ static final Log log = LogFactory.getLog(NativeWorkerPool.class);
+
+ private final ThreadPoolExecutor executor;
+ private final LinkedBlockingQueue<Runnable> blockingQueue;
+
+ public NativeWorkerPool(int core, int max, int keepAlive,
+ int queueLength, String threadGroupName, String threadGroupId) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Using native util.concurrent package..");
+ }
+ blockingQueue =
+ (queueLength == -1 ? new LinkedBlockingQueue<Runnable>()
+ : new LinkedBlockingQueue<Runnable>(queueLength));
+ executor = new ThreadPoolExecutor(
+ core, max, keepAlive,
+ TimeUnit.SECONDS,
+ blockingQueue,
+ new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId));
+ }
+
+ public void execute(final Runnable task) {
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ task.run();
+ } catch (Throwable t) {
+ log.error("Uncaught exception", t);
+ }
+ }
+ });
+ }
+
+ public int getActiveCount() {
+ return executor.getActiveCount();
+ }
+
+ public int getQueueSize() {
+ return blockingQueue.size();
+ }
+
+ public void shutdown(int timeout) throws InterruptedException {
+ executor.shutdown();
+ executor.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPool.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPool.java
new file mode 100644
index 0000000000..61745545ba
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPool.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads;
+
+public interface WorkerPool {
+ /**
+ * Asynchronously execute the given task using one of the threads of the worker pool.
+ * The task is expected to terminate gracefully, i.e. {@link Runnable#run()} should not
+ * throw an exception. Any uncaught exceptions should be logged by the worker pool
+ * implementation.
+ *
+ * @param task the task to execute
+ */
+ public void execute(Runnable task);
+
+ public int getActiveCount();
+ public int getQueueSize();
+
+ /**
+ * Destroy the worker pool. The pool will immediately stop
+ * accepting new tasks. All previously submitted tasks will
+ * be executed. The method blocks until all tasks have
+ * completed execution, or the timeout occurs, or the current
+ * thread is interrupted, whichever happens first.
+ *
+ * @param timeout the timeout value in milliseconds
+ * @throws InterruptedException if the current thread was
+ * interrupted while waiting for pending tasks to
+ * finish execution
+ */
+ public void shutdown(int timeout) throws InterruptedException;
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPoolFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPoolFactory.java
new file mode 100644
index 0000000000..f93c8580cd
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPoolFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads;
+
+/**
+ * Worker pool factory.
+ * For the moment this always creates {@link NativeWorkerPool} instances since
+ * we assume that we are running on Java 1.5 or above.
+ */
+public class WorkerPoolFactory {
+
+ public static WorkerPool getWorkerPool(int core, int max, int keepAlive,
+ int queueLength, String threadGroupName, String threadGroupId) {
+ return new NativeWorkerPool(
+ core, max, keepAlive, queueLength, threadGroupName, threadGroupId);
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceFilter.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceFilter.java
new file mode 100644
index 0000000000..8d49a2a437
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceFilter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker;
+
+import org.apache.axis2.description.AxisService;
+
+/**
+ * Filter for {@link AxisService} instances. This interface is used by
+ * {@link AxisServiceTracker}.
+ */
+public interface AxisServiceFilter {
+ /**
+ * Examine whether a given service matches the filter criteria.
+ *
+ * @param service the service to examine
+ * @return <code>true</code> if the service matches the filter criteria
+ */
+ boolean matches(AxisService service);
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTracker.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTracker.java
new file mode 100644
index 0000000000..fca85ee71e
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTracker.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.engine.AxisEvent;
+import org.apache.axis2.engine.AxisObserver;
+
+/**
+ * <p>Tracks services deployed in a given {@link AxisConfiguration}.
+ * The tracker is configured with references to three objects:</p>
+ * <ol>
+ * <li>An {@link AxisConfiguration} to watch.</li>
+ * <li>An {@link AxisServiceFilter} restricting the services to track.</li>
+ * <li>An {@link AxisServiceTrackerListener} receiving tracking events.</li>
+ * </ol>
+ * <p>An instance of this class maintains an up-to-date list of services
+ * satisfying all of the following criteria:</p>
+ * <ol>
+ * <li>The service is deployed in the given {@link AxisConfiguration}.</li>
+ * <li>The service is started, i.e. {@link AxisService#isActive()} returns true.</li>
+ * <li>The service matches the criteria specified by the given
+ * {@link AxisServiceFilter} instance.</li>
+ * </ol>
+ * <p>Whenever a service appears on the list, the tracker will call
+ * {@link AxisServiceTrackerListener#serviceAdded(AxisService)}. When a service disappears, it
+ * will call {@link AxisServiceTrackerListener#serviceRemoved(AxisService)}.</p>
+ * <p>When the tracker is created, it is initially in the stopped state. In this state no
+ * events will be sent to the listener. It can be started using {@link #start()} and stopped again
+ * using {@link #stop()}. The tracker list is defined to be empty when the tracker is in the
+ * stopped state. This implies that a call to {@link #start()} will generate
+ * {@link AxisServiceTrackerListener#serviceAdded(AxisService)} events for all services that meet
+ * the above criteria at that point in time. In the same way, {@link #stop()} will generate
+ * {@link AxisServiceTrackerListener#serviceRemoved(AxisService)} events for the current entries
+ * in the list.</p>
+ * <p>As a corollary the tracker guarantees that during a complete lifecycle (start-stop),
+ * there will be exactly one {@link AxisServiceTrackerListener#serviceRemoved(AxisService)} event
+ * for every {@link AxisServiceTrackerListener#serviceAdded(AxisService)} event and vice-versa.
+ * This property is important when the tracker is used to allocate resources for a dynamic set
+ * of services.</p>
+ *
+ * <h2>Limitations</h2>
+ *
+ * <p>The tracker is not able to detect property changes on services. E.g. if a service initially
+ * matches the filter criteria, but later changes so that it doesn't match the criteria any more,
+ * the tracker will not be able to detect this and the service will not be removed from the tracker
+ * list.</p>
+ */
+public class AxisServiceTracker {
+ private final AxisObserver observer = new AxisObserver() {
+ public void init(AxisConfiguration axisConfig) {}
+
+ public void serviceUpdate(AxisEvent event, final AxisService service) {
+ switch (event.getEventType()) {
+ case AxisEvent.SERVICE_DEPLOY:
+ case AxisEvent.SERVICE_START:
+ if (filter.matches(service)) {
+ boolean pending;
+ synchronized (lock) {
+ if (pending = (pendingActions != null)) {
+ pendingActions.add(new Runnable() {
+ public void run() {
+ serviceAdded(service);
+ }
+ });
+ }
+ }
+ if (!pending) {
+ serviceAdded(service);
+ }
+ }
+ break;
+ case AxisEvent.SERVICE_REMOVE:
+ case AxisEvent.SERVICE_STOP:
+ // Don't check filter here because the properties of the service may have
+ // changed in the meantime.
+ boolean pending;
+ synchronized (lock) {
+ if (pending = (pendingActions != null)) {
+ pendingActions.add(new Runnable() {
+ public void run() {
+ serviceRemoved(service);
+ }
+ });
+ }
+ }
+ if (!pending) {
+ serviceRemoved(service);
+ }
+ }
+ }
+
+ public void moduleUpdate(AxisEvent event, AxisModule module) {}
+ public void addParameter(Parameter param) throws AxisFault {}
+ public void removeParameter(Parameter param) throws AxisFault {}
+ public void deserializeParameters(OMElement parameterElement) throws AxisFault {}
+ public Parameter getParameter(String name) { return null; }
+ public ArrayList<Parameter> getParameters() { return null; }
+ public boolean isParameterLocked(String parameterName) { return false; }
+ public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup serviceGroup) {}
+ };
+
+ private final AxisConfiguration config;
+ final AxisServiceFilter filter;
+ private final AxisServiceTrackerListener listener;
+
+ /**
+ * Object used to synchronize access to {@link #pendingActions} and {@link #services}.
+ */
+ final Object lock = new Object();
+
+ /**
+ * Queue for notifications received by the {@link AxisObserver} during startup of the tracker.
+ * We need this because the events may already be reflected in the list of services returned
+ * by {@link AxisConfiguration#getServices()} (getting the list of currently deployed services
+ * and adding the observer can't be done atomically). It also allows us to make sure that
+ * events are sent to the listener in the right order, e.g. when a service is being removed
+ * during startup of the tracker.
+ */
+ Queue<Runnable> pendingActions;
+
+ /**
+ * The current list of services. <code>null</code> if the tracker is stopped.
+ */
+ private Set<AxisService> services;
+
+ public AxisServiceTracker(AxisConfiguration config, AxisServiceFilter filter,
+ AxisServiceTrackerListener listener) {
+ this.config = config;
+ this.filter = filter;
+ this.listener = listener;
+ }
+
+ /**
+ * Check whether the tracker is started.
+ *
+ * @return <code>true</code> if the tracker is started
+ */
+ public boolean isStarted() {
+ return services != null;
+ }
+
+ /**
+ * Start the tracker.
+ *
+ * @throws IllegalStateException if the tracker has already been started
+ */
+ public void start() {
+ if (services != null) {
+ throw new IllegalStateException();
+ }
+ synchronized (lock) {
+ pendingActions = new LinkedList<Runnable>();
+ config.addObservers(observer);
+ services = new HashSet<AxisService>();
+ }
+ for (Object o : config.getServices().values()) {
+ AxisService service = (AxisService)o;
+ if (service.isActive() && filter.matches(service)) {
+ serviceAdded(service);
+ }
+ }
+ while (true) {
+ Runnable action;
+ synchronized (lock) {
+ action = pendingActions.poll();
+ if (action == null) {
+ pendingActions = null;
+ break;
+ }
+ }
+ action.run();
+ }
+ }
+
+ void serviceAdded(AxisService service) {
+ // callListener may be false because the observer got an event for a service that
+ // was already in the initial list of services retrieved by AxisConfiguration#getServices.
+ boolean callListener;
+ synchronized (lock) {
+ callListener = services.add(service);
+ }
+ if (callListener) {
+ listener.serviceAdded(service);
+ }
+ }
+
+ void serviceRemoved(AxisService service) {
+ // callListener may be false because the observer invokes this method without applying the
+ // filter.
+ boolean callListener;
+ synchronized (lock) {
+ callListener = services.remove(service);
+ }
+ if (callListener) {
+ listener.serviceRemoved(service);
+ }
+ }
+
+ /**
+ * Stop the tracker.
+ *
+ * @throws IllegalStateException if the tracker is not started
+ */
+ public void stop() {
+ if (services == null) {
+ throw new IllegalStateException();
+ }
+ // TODO: This is very bad, but AxisConfiguration has no removeObserver method!
+ config.getObserversList().remove(observer);
+ for (AxisService service : services) {
+ listener.serviceRemoved(service);
+ }
+ services = null;
+ }
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTrackerListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTrackerListener.java
new file mode 100644
index 0000000000..ccca293732
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTrackerListener.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker;
+
+import org.apache.axis2.description.AxisService;
+
+/**
+ * Listener for events generated by an {@link AxisServiceTracker}.
+ */
+public interface AxisServiceTrackerListener {
+ /**
+ * Inform the listener that a service has been added to tracker list.
+ *
+ * @param service the service that has been added to the tracker list
+ */
+ void serviceAdded(AxisService service);
+
+ /**
+ * Inform the listener that a service has been removed from the tracker list.
+ *
+ * @param service the service that has been removed from the tracker list
+ */
+ void serviceRemoved(AxisService service);
+}
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/package-info.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/package-info.java
new file mode 100644
index 0000000000..eefadbbc1e
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Contains utility classes to track a dynamic set of services deployed in an
+ * Axis configuration.
+ *
+ * @see org.apache.axis2.transport.base.tracker.AxisServiceTracker
+ */
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker; \ No newline at end of file