summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java
diff options
context:
space:
mode:
authorantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2009-05-11 07:45:29 +0000
committerantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2009-05-11 07:45:29 +0000
commita3cbf8e5ffabac239cd965d8c0f9c680a83246f7 (patch)
tree03eede7de9657506784538ce0e3786a808e4ab22 /branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java
parenta7a97f2875dc162750736b9611e3e8bc8c13f145 (diff)
Add a new soap/jms transport module copied from the Apache WS Commons transports but with the code backported to work with Axis2 1.4.1
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@773489 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java267
1 files changed, 267 insertions, 0 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java
new file mode 100644
index 0000000000..0ee9d92443
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java
@@ -0,0 +1,267 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.ParameterInclude;
+import org.apache.axis2.description.TransportInDescription;
+
+public abstract class AbstractPollingTransportListener<T extends AbstractPollTableEntry>
+ extends AbstractTransportListener {
+
+ /** The main timer. */
+ private Timer timer;
+ /** Keep the list of endpoints and poll durations */
+ private final List<T> pollTable = new ArrayList<T>();
+
+ @Override
+ public void init(ConfigurationContext cfgCtx,
+ TransportInDescription transportIn) throws AxisFault {
+
+ timer = new Timer("PollTimer");
+ super.init(cfgCtx, transportIn);
+ T entry = createPollTableEntry(transportIn);
+ if (entry != null) {
+ entry.setPollInterval(getPollInterval(transportIn));
+ schedulePoll(entry);
+ pollTable.add(entry);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ // Explicitly cancel all polls not predispatched to services. All other polls will
+ // be canceled by stopListeningForService. Pay attention to the fact the cancelPoll
+ // modifies pollTable.
+ List<T> entriesToCancel = new ArrayList<T>();
+ for (T entry : pollTable) {
+ if (entry.getService() == null) {
+ entriesToCancel.add(entry);
+ }
+ }
+ for (T entry : entriesToCancel) {
+ cancelPoll(entry);
+ }
+
+ super.destroy();
+ timer.cancel();
+ timer = null;
+ }
+
+ /**
+ * Schedule a repeated poll at the specified interval for a given service.
+ * The method will schedule a single-shot timer task with executes a work
+ * task on the worker pool. At the end of this work task, a new timer task
+ * is scheduled for the next poll (except if the polling for the service
+ * has been canceled). This effectively schedules the poll repeatedly
+ * with fixed delay.
+ * @param entry the poll table entry with the configuration for the service
+ * @param pollInterval the interval between successive polls in milliseconds
+ */
+ void schedulePoll(final T entry) {
+ final long pollInterval = entry.getPollInterval();
+ TimerTask timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ workerPool.execute(new Runnable() {
+ public void run() {
+ if (state == BaseConstants.PAUSED) {
+ if (log.isDebugEnabled()) {
+ log.debug("Transport " + getTransportName() +
+ " poll trigger : Transport is currently paused..");
+ }
+ } else {
+ poll(entry);
+ }
+ }
+ });
+ }
+ };
+ entry.timerTask = timerTask;
+ if (entry.isConcurrentPollingAllowed()) {
+ timer.scheduleAtFixedRate(timerTask, pollInterval, pollInterval);
+ } else {
+ timer.schedule(timerTask, pollInterval);
+ }
+ }
+
+ private void cancelPoll(T entry) {
+ synchronized (entry) {
+ entry.timerTask.cancel();
+ entry.canceled = true;
+ }
+ pollTable.remove(entry);
+ }
+
+ protected abstract void poll(T entry);
+
+ protected void onPollCompletion(T entry) {
+ if (!entry.isConcurrentPollingAllowed()) {
+ synchronized (entry) {
+ if (!entry.canceled) {
+ schedulePoll(entry);
+ }
+ }
+ }
+ }
+
+ /**
+ * method to log a failure to the log file and to update the last poll status and time
+ * @param msg text for the log message
+ * @param e optional exception encountered or null
+ * @param entry the PollTableEntry
+ */
+ protected void processFailure(String msg, Exception e, T entry) {
+ if (e == null) {
+ log.error(msg);
+ } else {
+ log.error(msg, e);
+ }
+ long now = System.currentTimeMillis();
+ entry.setLastPollState(AbstractPollTableEntry.FAILED);
+ entry.setLastPollTime(now);
+ entry.setNextPollTime(now + entry.getPollInterval());
+ onPollCompletion(entry);
+ }
+
+ private long getPollInterval(ParameterInclude params) {
+ Parameter param = params.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
+ long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
+ if (param != null && param.getValue() instanceof String) {
+ String s = (String)param.getValue();
+ int multiplier;
+ if (s.endsWith("ms")) {
+ s = s.substring(0, s.length()-2);
+ multiplier = 1;
+ } else {
+ multiplier = 1000;
+ }
+ try {
+ pollInterval = Integer.parseInt(s) * multiplier;
+ } catch (NumberFormatException e) {
+ log.error("Invalid poll interval : " + param.getValue() + ", default to : "
+ + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + "sec", e);
+ }
+ }
+ return pollInterval;
+ }
+
+ @Override
+ protected void startListeningForService(AxisService service) throws AxisFault {
+ T entry = createPollTableEntry(service);
+ if (entry == null) {
+ throw new AxisFault("The service has no configuration for the transport");
+ }
+ entry.setService(service);
+ entry.setPollInterval(getPollInterval(service));
+ schedulePoll(entry);
+ pollTable.add(entry);
+ }
+
+ /**
+ * Create a poll table entry based on the provided parameters.
+ * If no relevant parameters are found, the implementation should
+ * return null. An exception should only be thrown if there is an
+ * error or inconsistency in the parameters.
+ *
+ * @param params The source of the parameters to construct the
+ * poll table entry. If the parameters were defined on
+ * a service, this will be an {@link AxisService}
+ * instance.
+ * @return
+ */
+ protected abstract T createPollTableEntry(ParameterInclude params) throws AxisFault;
+
+ /**
+ * Get the EPR for the given service
+ *
+ * @param serviceName service name
+ * @param ip ignored
+ * @return the EPR for the service
+ * @throws AxisFault not used
+ */
+ public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
+ for (T entry : pollTable) {
+ AxisService service = entry.getService();
+ if (service != null) {
+ String candidateName = service.getName();
+ if (candidateName.equals(serviceName) ||
+ serviceName.startsWith(candidateName + ".")) {
+ return new EndpointReference[]{ entry.getEndpointReference() };
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ protected void stopListeningForService(AxisService service) {
+ for (T entry : pollTable) {
+ if (service == entry.getService()) {
+ cancelPoll(entry);
+ break;
+ }
+ }
+ }
+
+ // -- jmx/management methods--
+ /**
+ * Pause the listener - Stop accepting/processing new messages, but continues processing existing
+ * messages until they complete. This helps bring an instance into a maintenence mode
+ * @throws org.apache.axis2.AxisFault on error
+ */
+ public void pause() throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
+ state = BaseConstants.PAUSED;
+ log.info("Listener paused");
+ }
+
+ /**
+ * Resume the lister - Brings the lister into active mode back from a paused state
+ * @throws AxisFault on error
+ */
+ public void resume() throws AxisFault {
+ if (state != BaseConstants.PAUSED) return;
+ state = BaseConstants.STARTED;
+ log.info("Listener resumed");
+ }
+
+ /**
+ * Stop processing new messages, and wait the specified maximum time for in-flight
+ * requests to complete before a controlled shutdown for maintenence
+ *
+ * @param millis a number of milliseconds to wait until pending requests are allowed to complete
+ * @throws AxisFault on error
+ */
+ public void maintenenceShutdown(long millis) throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
+ stop();
+ state = BaseConstants.STOPPED;
+ log.info("Listener shutdown");
+ }
+}