diff options
author | antelder <antelder@13f79535-47bb-0310-9956-ffa450edef68> | 2009-05-11 07:45:29 +0000 |
---|---|---|
committer | antelder <antelder@13f79535-47bb-0310-9956-ffa450edef68> | 2009-05-11 07:45:29 +0000 |
commit | a3cbf8e5ffabac239cd965d8c0f9c680a83246f7 (patch) | |
tree | 03eede7de9657506784538ce0e3786a808e4ab22 /branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java | |
parent | a7a97f2875dc162750736b9611e3e8bc8c13f145 (diff) |
Add a new soap/jms transport module copied from the Apache WS Commons transports but with the code backported to work with Axis2 1.4.1
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@773489 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java')
-rw-r--r-- | branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java | 267 |
1 files changed, 267 insertions, 0 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java new file mode 100644 index 0000000000..0ee9d92443 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java @@ -0,0 +1,267 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.util.ArrayList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterInclude; +import org.apache.axis2.description.TransportInDescription; + +public abstract class AbstractPollingTransportListener<T extends AbstractPollTableEntry> + extends AbstractTransportListener { + + /** The main timer. */ + private Timer timer; + /** Keep the list of endpoints and poll durations */ + private final List<T> pollTable = new ArrayList<T>(); + + @Override + public void init(ConfigurationContext cfgCtx, + TransportInDescription transportIn) throws AxisFault { + + timer = new Timer("PollTimer"); + super.init(cfgCtx, transportIn); + T entry = createPollTableEntry(transportIn); + if (entry != null) { + entry.setPollInterval(getPollInterval(transportIn)); + schedulePoll(entry); + pollTable.add(entry); + } + } + + @Override + public void destroy() { + // Explicitly cancel all polls not predispatched to services. All other polls will + // be canceled by stopListeningForService. Pay attention to the fact the cancelPoll + // modifies pollTable. + List<T> entriesToCancel = new ArrayList<T>(); + for (T entry : pollTable) { + if (entry.getService() == null) { + entriesToCancel.add(entry); + } + } + for (T entry : entriesToCancel) { + cancelPoll(entry); + } + + super.destroy(); + timer.cancel(); + timer = null; + } + + /** + * Schedule a repeated poll at the specified interval for a given service. + * The method will schedule a single-shot timer task with executes a work + * task on the worker pool. At the end of this work task, a new timer task + * is scheduled for the next poll (except if the polling for the service + * has been canceled). This effectively schedules the poll repeatedly + * with fixed delay. + * @param entry the poll table entry with the configuration for the service + * @param pollInterval the interval between successive polls in milliseconds + */ + void schedulePoll(final T entry) { + final long pollInterval = entry.getPollInterval(); + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + workerPool.execute(new Runnable() { + public void run() { + if (state == BaseConstants.PAUSED) { + if (log.isDebugEnabled()) { + log.debug("Transport " + getTransportName() + + " poll trigger : Transport is currently paused.."); + } + } else { + poll(entry); + } + } + }); + } + }; + entry.timerTask = timerTask; + if (entry.isConcurrentPollingAllowed()) { + timer.scheduleAtFixedRate(timerTask, pollInterval, pollInterval); + } else { + timer.schedule(timerTask, pollInterval); + } + } + + private void cancelPoll(T entry) { + synchronized (entry) { + entry.timerTask.cancel(); + entry.canceled = true; + } + pollTable.remove(entry); + } + + protected abstract void poll(T entry); + + protected void onPollCompletion(T entry) { + if (!entry.isConcurrentPollingAllowed()) { + synchronized (entry) { + if (!entry.canceled) { + schedulePoll(entry); + } + } + } + } + + /** + * method to log a failure to the log file and to update the last poll status and time + * @param msg text for the log message + * @param e optional exception encountered or null + * @param entry the PollTableEntry + */ + protected void processFailure(String msg, Exception e, T entry) { + if (e == null) { + log.error(msg); + } else { + log.error(msg, e); + } + long now = System.currentTimeMillis(); + entry.setLastPollState(AbstractPollTableEntry.FAILED); + entry.setLastPollTime(now); + entry.setNextPollTime(now + entry.getPollInterval()); + onPollCompletion(entry); + } + + private long getPollInterval(ParameterInclude params) { + Parameter param = params.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL); + long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL; + if (param != null && param.getValue() instanceof String) { + String s = (String)param.getValue(); + int multiplier; + if (s.endsWith("ms")) { + s = s.substring(0, s.length()-2); + multiplier = 1; + } else { + multiplier = 1000; + } + try { + pollInterval = Integer.parseInt(s) * multiplier; + } catch (NumberFormatException e) { + log.error("Invalid poll interval : " + param.getValue() + ", default to : " + + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + "sec", e); + } + } + return pollInterval; + } + + @Override + protected void startListeningForService(AxisService service) throws AxisFault { + T entry = createPollTableEntry(service); + if (entry == null) { + throw new AxisFault("The service has no configuration for the transport"); + } + entry.setService(service); + entry.setPollInterval(getPollInterval(service)); + schedulePoll(entry); + pollTable.add(entry); + } + + /** + * Create a poll table entry based on the provided parameters. + * If no relevant parameters are found, the implementation should + * return null. An exception should only be thrown if there is an + * error or inconsistency in the parameters. + * + * @param params The source of the parameters to construct the + * poll table entry. If the parameters were defined on + * a service, this will be an {@link AxisService} + * instance. + * @return + */ + protected abstract T createPollTableEntry(ParameterInclude params) throws AxisFault; + + /** + * Get the EPR for the given service + * + * @param serviceName service name + * @param ip ignored + * @return the EPR for the service + * @throws AxisFault not used + */ + public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { + for (T entry : pollTable) { + AxisService service = entry.getService(); + if (service != null) { + String candidateName = service.getName(); + if (candidateName.equals(serviceName) || + serviceName.startsWith(candidateName + ".")) { + return new EndpointReference[]{ entry.getEndpointReference() }; + } + } + } + return null; + } + + @Override + protected void stopListeningForService(AxisService service) { + for (T entry : pollTable) { + if (service == entry.getService()) { + cancelPoll(entry); + break; + } + } + } + + // -- jmx/management methods-- + /** + * Pause the listener - Stop accepting/processing new messages, but continues processing existing + * messages until they complete. This helps bring an instance into a maintenence mode + * @throws org.apache.axis2.AxisFault on error + */ + public void pause() throws AxisFault { + if (state != BaseConstants.STARTED) return; + state = BaseConstants.PAUSED; + log.info("Listener paused"); + } + + /** + * Resume the lister - Brings the lister into active mode back from a paused state + * @throws AxisFault on error + */ + public void resume() throws AxisFault { + if (state != BaseConstants.PAUSED) return; + state = BaseConstants.STARTED; + log.info("Listener resumed"); + } + + /** + * Stop processing new messages, and wait the specified maximum time for in-flight + * requests to complete before a controlled shutdown for maintenence + * + * @param millis a number of milliseconds to wait until pending requests are allowed to complete + * @throws AxisFault on error + */ + public void maintenenceShutdown(long millis) throws AxisFault { + if (state != BaseConstants.STARTED) return; + stop(); + state = BaseConstants.STOPPED; + log.info("Listener shutdown"); + } +} |