summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java
diff options
context:
space:
mode:
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java267
1 files changed, 0 insertions, 267 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java
deleted file mode 100644
index 0ee9d92443..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.transport.base;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.ParameterInclude;
-import org.apache.axis2.description.TransportInDescription;
-
-public abstract class AbstractPollingTransportListener<T extends AbstractPollTableEntry>
- extends AbstractTransportListener {
-
- /** The main timer. */
- private Timer timer;
- /** Keep the list of endpoints and poll durations */
- private final List<T> pollTable = new ArrayList<T>();
-
- @Override
- public void init(ConfigurationContext cfgCtx,
- TransportInDescription transportIn) throws AxisFault {
-
- timer = new Timer("PollTimer");
- super.init(cfgCtx, transportIn);
- T entry = createPollTableEntry(transportIn);
- if (entry != null) {
- entry.setPollInterval(getPollInterval(transportIn));
- schedulePoll(entry);
- pollTable.add(entry);
- }
- }
-
- @Override
- public void destroy() {
- // Explicitly cancel all polls not predispatched to services. All other polls will
- // be canceled by stopListeningForService. Pay attention to the fact the cancelPoll
- // modifies pollTable.
- List<T> entriesToCancel = new ArrayList<T>();
- for (T entry : pollTable) {
- if (entry.getService() == null) {
- entriesToCancel.add(entry);
- }
- }
- for (T entry : entriesToCancel) {
- cancelPoll(entry);
- }
-
- super.destroy();
- timer.cancel();
- timer = null;
- }
-
- /**
- * Schedule a repeated poll at the specified interval for a given service.
- * The method will schedule a single-shot timer task with executes a work
- * task on the worker pool. At the end of this work task, a new timer task
- * is scheduled for the next poll (except if the polling for the service
- * has been canceled). This effectively schedules the poll repeatedly
- * with fixed delay.
- * @param entry the poll table entry with the configuration for the service
- * @param pollInterval the interval between successive polls in milliseconds
- */
- void schedulePoll(final T entry) {
- final long pollInterval = entry.getPollInterval();
- TimerTask timerTask = new TimerTask() {
- @Override
- public void run() {
- workerPool.execute(new Runnable() {
- public void run() {
- if (state == BaseConstants.PAUSED) {
- if (log.isDebugEnabled()) {
- log.debug("Transport " + getTransportName() +
- " poll trigger : Transport is currently paused..");
- }
- } else {
- poll(entry);
- }
- }
- });
- }
- };
- entry.timerTask = timerTask;
- if (entry.isConcurrentPollingAllowed()) {
- timer.scheduleAtFixedRate(timerTask, pollInterval, pollInterval);
- } else {
- timer.schedule(timerTask, pollInterval);
- }
- }
-
- private void cancelPoll(T entry) {
- synchronized (entry) {
- entry.timerTask.cancel();
- entry.canceled = true;
- }
- pollTable.remove(entry);
- }
-
- protected abstract void poll(T entry);
-
- protected void onPollCompletion(T entry) {
- if (!entry.isConcurrentPollingAllowed()) {
- synchronized (entry) {
- if (!entry.canceled) {
- schedulePoll(entry);
- }
- }
- }
- }
-
- /**
- * method to log a failure to the log file and to update the last poll status and time
- * @param msg text for the log message
- * @param e optional exception encountered or null
- * @param entry the PollTableEntry
- */
- protected void processFailure(String msg, Exception e, T entry) {
- if (e == null) {
- log.error(msg);
- } else {
- log.error(msg, e);
- }
- long now = System.currentTimeMillis();
- entry.setLastPollState(AbstractPollTableEntry.FAILED);
- entry.setLastPollTime(now);
- entry.setNextPollTime(now + entry.getPollInterval());
- onPollCompletion(entry);
- }
-
- private long getPollInterval(ParameterInclude params) {
- Parameter param = params.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
- long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
- if (param != null && param.getValue() instanceof String) {
- String s = (String)param.getValue();
- int multiplier;
- if (s.endsWith("ms")) {
- s = s.substring(0, s.length()-2);
- multiplier = 1;
- } else {
- multiplier = 1000;
- }
- try {
- pollInterval = Integer.parseInt(s) * multiplier;
- } catch (NumberFormatException e) {
- log.error("Invalid poll interval : " + param.getValue() + ", default to : "
- + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + "sec", e);
- }
- }
- return pollInterval;
- }
-
- @Override
- protected void startListeningForService(AxisService service) throws AxisFault {
- T entry = createPollTableEntry(service);
- if (entry == null) {
- throw new AxisFault("The service has no configuration for the transport");
- }
- entry.setService(service);
- entry.setPollInterval(getPollInterval(service));
- schedulePoll(entry);
- pollTable.add(entry);
- }
-
- /**
- * Create a poll table entry based on the provided parameters.
- * If no relevant parameters are found, the implementation should
- * return null. An exception should only be thrown if there is an
- * error or inconsistency in the parameters.
- *
- * @param params The source of the parameters to construct the
- * poll table entry. If the parameters were defined on
- * a service, this will be an {@link AxisService}
- * instance.
- * @return
- */
- protected abstract T createPollTableEntry(ParameterInclude params) throws AxisFault;
-
- /**
- * Get the EPR for the given service
- *
- * @param serviceName service name
- * @param ip ignored
- * @return the EPR for the service
- * @throws AxisFault not used
- */
- public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
- for (T entry : pollTable) {
- AxisService service = entry.getService();
- if (service != null) {
- String candidateName = service.getName();
- if (candidateName.equals(serviceName) ||
- serviceName.startsWith(candidateName + ".")) {
- return new EndpointReference[]{ entry.getEndpointReference() };
- }
- }
- }
- return null;
- }
-
- @Override
- protected void stopListeningForService(AxisService service) {
- for (T entry : pollTable) {
- if (service == entry.getService()) {
- cancelPoll(entry);
- break;
- }
- }
- }
-
- // -- jmx/management methods--
- /**
- * Pause the listener - Stop accepting/processing new messages, but continues processing existing
- * messages until they complete. This helps bring an instance into a maintenence mode
- * @throws org.apache.axis2.AxisFault on error
- */
- public void pause() throws AxisFault {
- if (state != BaseConstants.STARTED) return;
- state = BaseConstants.PAUSED;
- log.info("Listener paused");
- }
-
- /**
- * Resume the lister - Brings the lister into active mode back from a paused state
- * @throws AxisFault on error
- */
- public void resume() throws AxisFault {
- if (state != BaseConstants.PAUSED) return;
- state = BaseConstants.STARTED;
- log.info("Listener resumed");
- }
-
- /**
- * Stop processing new messages, and wait the specified maximum time for in-flight
- * requests to complete before a controlled shutdown for maintenence
- *
- * @param millis a number of milliseconds to wait until pending requests are allowed to complete
- * @throws AxisFault on error
- */
- public void maintenenceShutdown(long millis) throws AxisFault {
- if (state != BaseConstants.STARTED) return;
- stop();
- state = BaseConstants.STOPPED;
- log.info("Listener shutdown");
- }
-}