diff options
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base')
38 files changed, 4487 insertions, 0 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollTableEntry.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollTableEntry.java new file mode 100644 index 0000000000..fa64f08a25 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollTableEntry.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.util.TimerTask; + +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.description.AxisService; + +public abstract class AbstractPollTableEntry { + // status of last scan + public static final int SUCCSESSFUL = 0; + public static final int WITH_ERRORS = 1; + public static final int FAILED = 2; + public static final int NONE = 3; + + /** Axis2 service */ + private AxisService service; + /** next poll time */ + private long nextPollTime; + /** last poll performed at */ + private long lastPollTime; + /** duration in ms between successive polls */ + private long pollInterval; + /** state of the last poll */ + private int lastPollState; + /** can polling occur in parallel? */ + private boolean concurrentPollingAllowed = false; + /** The timer task that will trigger the next poll */ + TimerTask timerTask; + /** Flag indicating whether polling has been canceled. */ + boolean canceled; + + public AxisService getService() { + return service; + } + + void setService(AxisService service) { + this.service = service; + } + + public abstract EndpointReference getEndpointReference(); + + public long getNextPollTime() { + return nextPollTime; + } + + public void setNextPollTime(long nextPollTime) { + this.nextPollTime = nextPollTime; + } + + public long getLastPollTime() { + return lastPollTime; + } + + public void setLastPollTime(long lastPollTime) { + this.lastPollTime = lastPollTime; + } + + public long getPollInterval() { + return pollInterval; + } + + public void setPollInterval(long pollInterval) { + this.pollInterval = pollInterval; + } + + public int getLastPollState() { + return lastPollState; + } + + public void setLastPollState(int lastPollState) { + this.lastPollState = lastPollState; + } + + public boolean isConcurrentPollingAllowed() { + return concurrentPollingAllowed; + } + + public void setConcurrentPollingAllowed(boolean concurrentPollingAllowed) { + this.concurrentPollingAllowed = concurrentPollingAllowed; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java new file mode 100644 index 0000000000..0ee9d92443 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractPollingTransportListener.java @@ -0,0 +1,267 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.util.ArrayList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterInclude; +import org.apache.axis2.description.TransportInDescription; + +public abstract class AbstractPollingTransportListener<T extends AbstractPollTableEntry> + extends AbstractTransportListener { + + /** The main timer. */ + private Timer timer; + /** Keep the list of endpoints and poll durations */ + private final List<T> pollTable = new ArrayList<T>(); + + @Override + public void init(ConfigurationContext cfgCtx, + TransportInDescription transportIn) throws AxisFault { + + timer = new Timer("PollTimer"); + super.init(cfgCtx, transportIn); + T entry = createPollTableEntry(transportIn); + if (entry != null) { + entry.setPollInterval(getPollInterval(transportIn)); + schedulePoll(entry); + pollTable.add(entry); + } + } + + @Override + public void destroy() { + // Explicitly cancel all polls not predispatched to services. All other polls will + // be canceled by stopListeningForService. Pay attention to the fact the cancelPoll + // modifies pollTable. + List<T> entriesToCancel = new ArrayList<T>(); + for (T entry : pollTable) { + if (entry.getService() == null) { + entriesToCancel.add(entry); + } + } + for (T entry : entriesToCancel) { + cancelPoll(entry); + } + + super.destroy(); + timer.cancel(); + timer = null; + } + + /** + * Schedule a repeated poll at the specified interval for a given service. + * The method will schedule a single-shot timer task with executes a work + * task on the worker pool. At the end of this work task, a new timer task + * is scheduled for the next poll (except if the polling for the service + * has been canceled). This effectively schedules the poll repeatedly + * with fixed delay. + * @param entry the poll table entry with the configuration for the service + * @param pollInterval the interval between successive polls in milliseconds + */ + void schedulePoll(final T entry) { + final long pollInterval = entry.getPollInterval(); + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + workerPool.execute(new Runnable() { + public void run() { + if (state == BaseConstants.PAUSED) { + if (log.isDebugEnabled()) { + log.debug("Transport " + getTransportName() + + " poll trigger : Transport is currently paused.."); + } + } else { + poll(entry); + } + } + }); + } + }; + entry.timerTask = timerTask; + if (entry.isConcurrentPollingAllowed()) { + timer.scheduleAtFixedRate(timerTask, pollInterval, pollInterval); + } else { + timer.schedule(timerTask, pollInterval); + } + } + + private void cancelPoll(T entry) { + synchronized (entry) { + entry.timerTask.cancel(); + entry.canceled = true; + } + pollTable.remove(entry); + } + + protected abstract void poll(T entry); + + protected void onPollCompletion(T entry) { + if (!entry.isConcurrentPollingAllowed()) { + synchronized (entry) { + if (!entry.canceled) { + schedulePoll(entry); + } + } + } + } + + /** + * method to log a failure to the log file and to update the last poll status and time + * @param msg text for the log message + * @param e optional exception encountered or null + * @param entry the PollTableEntry + */ + protected void processFailure(String msg, Exception e, T entry) { + if (e == null) { + log.error(msg); + } else { + log.error(msg, e); + } + long now = System.currentTimeMillis(); + entry.setLastPollState(AbstractPollTableEntry.FAILED); + entry.setLastPollTime(now); + entry.setNextPollTime(now + entry.getPollInterval()); + onPollCompletion(entry); + } + + private long getPollInterval(ParameterInclude params) { + Parameter param = params.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL); + long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL; + if (param != null && param.getValue() instanceof String) { + String s = (String)param.getValue(); + int multiplier; + if (s.endsWith("ms")) { + s = s.substring(0, s.length()-2); + multiplier = 1; + } else { + multiplier = 1000; + } + try { + pollInterval = Integer.parseInt(s) * multiplier; + } catch (NumberFormatException e) { + log.error("Invalid poll interval : " + param.getValue() + ", default to : " + + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + "sec", e); + } + } + return pollInterval; + } + + @Override + protected void startListeningForService(AxisService service) throws AxisFault { + T entry = createPollTableEntry(service); + if (entry == null) { + throw new AxisFault("The service has no configuration for the transport"); + } + entry.setService(service); + entry.setPollInterval(getPollInterval(service)); + schedulePoll(entry); + pollTable.add(entry); + } + + /** + * Create a poll table entry based on the provided parameters. + * If no relevant parameters are found, the implementation should + * return null. An exception should only be thrown if there is an + * error or inconsistency in the parameters. + * + * @param params The source of the parameters to construct the + * poll table entry. If the parameters were defined on + * a service, this will be an {@link AxisService} + * instance. + * @return + */ + protected abstract T createPollTableEntry(ParameterInclude params) throws AxisFault; + + /** + * Get the EPR for the given service + * + * @param serviceName service name + * @param ip ignored + * @return the EPR for the service + * @throws AxisFault not used + */ + public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { + for (T entry : pollTable) { + AxisService service = entry.getService(); + if (service != null) { + String candidateName = service.getName(); + if (candidateName.equals(serviceName) || + serviceName.startsWith(candidateName + ".")) { + return new EndpointReference[]{ entry.getEndpointReference() }; + } + } + } + return null; + } + + @Override + protected void stopListeningForService(AxisService service) { + for (T entry : pollTable) { + if (service == entry.getService()) { + cancelPoll(entry); + break; + } + } + } + + // -- jmx/management methods-- + /** + * Pause the listener - Stop accepting/processing new messages, but continues processing existing + * messages until they complete. This helps bring an instance into a maintenence mode + * @throws org.apache.axis2.AxisFault on error + */ + public void pause() throws AxisFault { + if (state != BaseConstants.STARTED) return; + state = BaseConstants.PAUSED; + log.info("Listener paused"); + } + + /** + * Resume the lister - Brings the lister into active mode back from a paused state + * @throws AxisFault on error + */ + public void resume() throws AxisFault { + if (state != BaseConstants.PAUSED) return; + state = BaseConstants.STARTED; + log.info("Listener resumed"); + } + + /** + * Stop processing new messages, and wait the specified maximum time for in-flight + * requests to complete before a controlled shutdown for maintenence + * + * @param millis a number of milliseconds to wait until pending requests are allowed to complete + * @throws AxisFault on error + */ + public void maintenenceShutdown(long millis) throws AxisFault { + if (state != BaseConstants.STARTED) return; + stop(); + state = BaseConstants.STOPPED; + log.info("Listener shutdown"); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java new file mode 100644 index 0000000000..440e09dc84 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportListener.java @@ -0,0 +1,550 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.axiom.om.util.UUIDGenerator; +import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.context.SessionContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.TransportInDescription; +import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.engine.AxisEngine; +import org.apache.axis2.transport.TransportListener; +import org.apache.axis2.util.MessageContextBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPoolFactory; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceFilter; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceTracker; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker.AxisServiceTrackerListener; + +public abstract class AbstractTransportListener implements TransportListener { + + /** the reference to the actual commons logger to be used for log messages */ + protected Log log = null; + + /** the axis2 configuration context */ + protected ConfigurationContext cfgCtx = null; + + /** transport in description */ + private TransportInDescription transportIn = null; + /** transport out description */ + private TransportOutDescription transportOut = null; + /** state of the listener */ + protected int state = BaseConstants.STOPPED; + /** is this transport non-blocking? */ + protected boolean isNonBlocking = false; + /** + * Service tracker used to invoke {@link #internalStartListeningForService(AxisService)} + * and {@link #internalStopListeningForService(AxisService)}. */ + private AxisServiceTracker serviceTracker; + + /** the thread pool to execute actual poll invocations */ + protected WorkerPool workerPool = null; + /** use the thread pool available in the axis2 configuration context */ + protected boolean useAxis2ThreadPool = false; + /** JMX support */ + private TransportMBeanSupport mbeanSupport; + /** Metrics collector for this transport */ + protected MetricsCollector metrics = new MetricsCollector(); + + /** + * A constructor that makes subclasses pick up the correct logger + */ + protected AbstractTransportListener() { + log = LogFactory.getLog(this.getClass()); + } + + /** + * Initialize the generic transport. Sets up the transport and the thread pool to be used + * for message processing. Also creates an AxisObserver that gets notified of service + * life cycle events for the transport to act on + * @param cfgCtx the axis configuration context + * @param transportIn the transport-in description + * @throws AxisFault on error + */ + public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn) + throws AxisFault { + + this.cfgCtx = cfgCtx; + this.transportIn = transportIn; + this.transportOut = cfgCtx.getAxisConfiguration().getTransportOut(getTransportName()); + + if (useAxis2ThreadPool) { + //this.workerPool = cfgCtx.getThreadPool(); not yet implemented + throw new AxisFault("Unsupported thread pool for task execution - Axis2 thread pool"); + } else { + this.workerPool = WorkerPoolFactory.getWorkerPool( + 10, 20, 5, -1, getTransportName() + "Server Worker thread group", getTransportName() + "-Worker"); + } + + // register to receive updates on services for lifetime management + serviceTracker = new AxisServiceTracker( + cfgCtx.getAxisConfiguration(), + new AxisServiceFilter() { + public boolean matches(AxisService service) { + return !service.getName().startsWith("__") // these are "private" services + && BaseUtils.isUsingTransport(service, getTransportName()); + } + }, + new AxisServiceTrackerListener() { + public void serviceAdded(AxisService service) { + internalStartListeningForService(service); + } + + public void serviceRemoved(AxisService service) { + internalStopListeningForService(service); + } + }); + + // register with JMX + mbeanSupport = new TransportMBeanSupport(this, getTransportName()); + mbeanSupport.register(); + } + + public void destroy() { + try { + if (state == BaseConstants.STARTED) { + try { + stop(); + } catch (AxisFault ignore) { + log.warn("Error stopping the transport : " + getTransportName()); + } + } + } finally { + state = BaseConstants.STOPPED; + mbeanSupport.unregister(); + } + try { + workerPool.shutdown(10000); + } catch (InterruptedException ex) { + log.warn("Thread interrupted while waiting for worker pool to shut down"); + } + } + + public void stop() throws AxisFault { + if (state == BaseConstants.STARTED) { + state = BaseConstants.STOPPED; + // cancel receipt of service lifecycle events + log.info(getTransportName().toUpperCase() + " Listener Shutdown"); + serviceTracker.stop(); + } + } + + public void start() throws AxisFault { + if (state != BaseConstants.STARTED) { + state = BaseConstants.STARTED; + // register to receive updates on services for lifetime management + // cfgCtx.getAxisConfiguration().addObservers(axisObserver); + log.info(getTransportName().toUpperCase() + " Listener started"); + // iterate through deployed services and start + serviceTracker.start(); + } + } + + public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { + return getEPRsForService(serviceName); + } + + protected EndpointReference[] getEPRsForService(String serviceName) { + return null; + } + + public void disableTransportForService(AxisService service) { + + log.warn("Disabling the " + getTransportName() + " transport for the service " + + service.getName() + ", because it is not configured properly for the service"); + + if (service.isEnableAllTransports()) { + ArrayList<String> exposedTransports = new ArrayList<String>(); + for(Object obj: cfgCtx.getAxisConfiguration().getTransportsIn().values()) { + String transportName = ((TransportInDescription) obj).getName(); + if (!transportName.equals(getTransportName())) { + exposedTransports.add(transportName); + } + } + service.setEnableAllTransports(false); + service.setExposedTransports(exposedTransports); + } else { + service.removeExposedTransport(getTransportName()); + } + } + + void internalStartListeningForService(AxisService service) { + String serviceName = service.getName(); + try { + startListeningForService(service); + } catch (AxisFault ex) { + String transportName = getTransportName().toUpperCase(); + String msg = "Unable to configure the service " + serviceName + " for the " + + transportName + " transport: " + ex.getMessage() + ". " + + "This service is being marked as faulty and will not be available over the " + + transportName + " transport."; + // Only log the message at level WARN and log the full stack trace at level DEBUG. + // TODO: We should have a way to distinguish a missing configuration + // from an error. This may be addressed when implementing the enhancement + // described in point 3 of http://markmail.org/message/umhenrurlrekk5jh + log.warn(msg); + log.debug("Disabling service " + serviceName + " for the " + transportName + + "transport", ex); + BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration()); + disableTransportForService(service); + return; + } catch (Throwable ex) { + String msg = "Unexpected error when configuring service " + serviceName + + " for the " + getTransportName().toUpperCase() + " transport. It will be" + + " disabled for this transport and marked as faulty."; + log.error(msg, ex); + BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration()); + disableTransportForService(service); + return; + } + registerMBean(new TransportListenerEndpointView(this, serviceName), + getEndpointMBeanName(serviceName)); + } + + void internalStopListeningForService(AxisService service) { + unregisterMBean(getEndpointMBeanName(service.getName())); + stopListeningForService(service); + } + + protected abstract void startListeningForService(AxisService service) throws AxisFault; + + protected abstract void stopListeningForService(AxisService service); + + /** + * This is a deprecated method in Axis2 and this default implementation returns the first + * result from the getEPRsForService() method + */ + public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault { + return getEPRsForService(serviceName, ip)[0]; + } + + public SessionContext getSessionContext(MessageContext messageContext) { + return null; + } + + /** + * Create a new axis MessageContext for an incoming message through this transport + * @return the newly created message context + */ + public MessageContext createMessageContext() { + MessageContext msgCtx = new MessageContext(); + msgCtx.setConfigurationContext(cfgCtx); + + msgCtx.setIncomingTransportName(getTransportName()); + msgCtx.setTransportOut(transportOut); + msgCtx.setTransportIn(transportIn); + msgCtx.setServerSide(true); + msgCtx.setMessageID(UUIDGenerator.getUUID()); + + // There is a discrepency in what I thought, Axis2 spawns a nes threads to + // send a message is this is TRUE - and I want it to be the other way + msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.valueOf(!isNonBlocking)); + + // are these relevant? + //msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID()); + // this is required to support Sandesha 2 + //msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, + // new HttpCoreRequestResponseTransport(msgContext)); + + return msgCtx; + } + + /** + * Process a new incoming message through the axis engine + * @param msgCtx the axis MessageContext + * @param trpHeaders the map containing transport level message headers + * @param soapAction the optional soap action or null + * @param contentType the optional content-type for the message + */ + public void handleIncomingMessage( + MessageContext msgCtx, Map trpHeaders, + String soapAction, String contentType) throws AxisFault { + + // set the soapaction if one is available via a transport header + if (soapAction != null) { + msgCtx.setSoapAction(soapAction); + } + + // set the transport headers to the message context + msgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, trpHeaders); + + // send the message context through the axis engine + try { + // check if an Axis2 callback has been registered for this message + Map callBackMap = (Map) msgCtx.getConfigurationContext(). + getProperty(BaseConstants.CALLBACK_TABLE); + // FIXME: transport headers are protocol specific; the correlation ID should be + // passed as argument to this method + Object replyToMessageID = trpHeaders.get(BaseConstants.HEADER_IN_REPLY_TO); + // if there is a callback registerd with this replyto ID then this has to + // be handled as a synchronous incoming message, via the + if (replyToMessageID != null && callBackMap != null && + callBackMap.get(replyToMessageID) != null) { + + SynchronousCallback synchronousCallback = + (SynchronousCallback) callBackMap.get(replyToMessageID); + synchronousCallback.setInMessageContext(msgCtx); + callBackMap.remove(replyToMessageID); + } else { + AxisEngine.receive(msgCtx); + } + + } catch (AxisFault e) { + if (log.isDebugEnabled()) { + log.debug("Error receiving message", e); + } + if (msgCtx.isServerSide()) { + AxisEngine.sendFault(MessageContextBuilder.createFaultMessageContext(msgCtx, e)); + } + } + } + + protected void handleException(String msg, Exception e) throws AxisFault { + log.error(msg, e); + throw new AxisFault(msg, e); + } + + protected void logException(String msg, Exception e) { + log.error(msg, e); + } + + public String getTransportName() { + return transportIn.getName(); + } + + public MetricsCollector getMetricsCollector() { + return metrics; + } + + // -- jmx/management methods-- + /** + * Pause the listener - Stop accepting/processing new messages, but continues processing existing + * messages until they complete. This helps bring an instance into a maintenence mode + * @throws AxisFault on error + */ + public void pause() throws AxisFault {} + /** + * Resume the lister - Brings the lister into active mode back from a paused state + * @throws AxisFault on error + */ + public void resume() throws AxisFault {} + + /** + * Stop processing new messages, and wait the specified maximum time for in-flight + * requests to complete before a controlled shutdown for maintenence + * + * @param millis a number of milliseconds to wait until pending requests are allowed to complete + * @throws AxisFault on error + */ + public void maintenenceShutdown(long millis) throws AxisFault {} + + /** + * Returns the number of active threads processing messages + * @return number of active threads processing messages + */ + public int getActiveThreadCount() { + return workerPool.getActiveCount(); + } + + /** + * Return the number of requests queued in the thread pool + * @return queue size + */ + public int getQueueSize() { + return workerPool.getQueueSize(); + } + + public long getMessagesReceived() { + if (metrics != null) { + return metrics.getMessagesReceived(); + } + return -1; + } + + public long getFaultsReceiving() { + if (metrics != null) { + return metrics.getFaultsReceiving(); + } + return -1; + } + + public long getBytesReceived() { + if (metrics != null) { + return metrics.getBytesReceived(); + } + return -1; + } + + public long getMessagesSent() { + if (metrics != null) { + return metrics.getMessagesSent(); + } + return -1; + } + + public long getFaultsSending() { + if (metrics != null) { + return metrics.getFaultsSending(); + } + return -1; + } + + public long getBytesSent() { + if (metrics != null) { + return metrics.getBytesSent(); + } + return -1; + } + + public long getTimeoutsReceiving() { + if (metrics != null) { + return metrics.getTimeoutsReceiving(); + } + return -1; + } + + public long getTimeoutsSending() { + if (metrics != null) { + return metrics.getTimeoutsSending(); + } + return -1; + } + + public long getMinSizeReceived() { + if (metrics != null) { + return metrics.getMinSizeReceived(); + } + return -1; + } + + public long getMaxSizeReceived() { + if (metrics != null) { + return metrics.getMaxSizeReceived(); + } + return -1; + } + + public double getAvgSizeReceived() { + if (metrics != null) { + return metrics.getAvgSizeReceived(); + } + return -1; + } + + public long getMinSizeSent() { + if (metrics != null) { + return metrics.getMinSizeSent(); + } + return -1; + } + + public long getMaxSizeSent() { + if (metrics != null) { + return metrics.getMaxSizeSent(); + } + return -1; + } + + public double getAvgSizeSent() { + if (metrics != null) { + return metrics.getAvgSizeSent(); + } + return -1; + } + + public Map getResponseCodeTable() { + if (metrics != null) { + return metrics.getResponseCodeTable(); + } + return null; + } + + public void resetStatistics() { + if (metrics != null) { + metrics.reset(); + } + } + + public long getLastResetTime() { + if (metrics != null) { + return metrics.getLastResetTime(); + } + return -1; + } + + public long getMetricsWindow() { + if (metrics != null) { + return System.currentTimeMillis() - metrics.getLastResetTime(); + } + return -1; + } + + private String getEndpointMBeanName(String serviceName) { + return mbeanSupport.getMBeanName() + ",Group=Services,Service=" + serviceName; + } + + /** + * Utility method to allow transports to register MBeans + * @param mbeanInstance bean instance + * @param objectName name + */ + private void registerMBean(Object mbeanInstance, String objectName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName name = new ObjectName(objectName); + Set set = mbs.queryNames(name, null); + if (set != null && set.isEmpty()) { + mbs.registerMBean(mbeanInstance, name); + } else { + mbs.unregisterMBean(name); + mbs.registerMBean(mbeanInstance, name); + } + } catch (Exception e) { + log.warn("Error registering a MBean with objectname ' " + objectName + + " ' for JMX management", e); + } + } + + private void unregisterMBean(String objectName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName objName = new ObjectName(objectName); + if (mbs.isRegistered(objName)) { + mbs.unregisterMBean(objName); + } + } catch (Exception e) { + log.warn("Error un-registering a MBean with objectname ' " + objectName + + " ' for JMX management", e); + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportSender.java new file mode 100644 index 0000000000..c2c84c3eb2 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/AbstractTransportSender.java @@ -0,0 +1,419 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.util.Map; +import java.util.Set; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.axiom.om.util.UUIDGenerator; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.TransportInDescription; +import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.description.WSDL2Constants; +import org.apache.axis2.engine.AxisEngine; +import org.apache.axis2.handlers.AbstractHandler; +import org.apache.axis2.transport.OutTransportInfo; +import org.apache.axis2.transport.TransportSender; +import org.apache.axis2.util.MessageContextBuilder; +import org.apache.axis2.wsdl.WSDLConstants; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public abstract class AbstractTransportSender extends AbstractHandler implements TransportSender { + + /** the reference to the actual commons logger to be used for log messages */ + protected Log log = null; + + /** the axis2 configuration context */ + protected ConfigurationContext cfgCtx = null; + /** transport in description */ + private TransportInDescription transportIn = null; + /** transport out description */ + private TransportOutDescription transportOut = null; + /** JMX support */ + private TransportMBeanSupport mbeanSupport; + /** Metrics collector for the sender */ + protected MetricsCollector metrics = new MetricsCollector(); + /** state of the listener */ + private int state = BaseConstants.STOPPED; + + /** + * A constructor that makes subclasses pick up the correct logger + */ + protected AbstractTransportSender() { + log = LogFactory.getLog(this.getClass()); + } + + /** + * Initialize the generic transport sender. + * + * @param cfgCtx the axis configuration context + * @param transportOut the transport-out description + * @throws AxisFault on error + */ + public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) + throws AxisFault { + this.cfgCtx = cfgCtx; + this.transportOut = transportOut; + this.transportIn = cfgCtx.getAxisConfiguration().getTransportIn(getTransportName()); + this.state = BaseConstants.STARTED; + + // register with JMX + mbeanSupport = new TransportMBeanSupport(this, getTransportName()); + mbeanSupport.register(); + log.info(getTransportName().toUpperCase() + " Sender started"); + } + + public void stop() { + if (state != BaseConstants.STARTED) return; + state = BaseConstants.STOPPED; + mbeanSupport.unregister(); + log.info(getTransportName().toUpperCase() + " Sender Shutdown"); + } + + public void cleanup(MessageContext msgContext) throws AxisFault {} + + public abstract void sendMessage(MessageContext msgCtx, String targetEPR, + OutTransportInfo outTransportInfo) throws AxisFault; + + public InvocationResponse invoke(MessageContext msgContext) throws AxisFault { + + // is there a transport url which may be different to the WS-A To but has higher precedence + String targetAddress = (String) msgContext.getProperty( + Constants.Configuration.TRANSPORT_URL); + + if (targetAddress != null) { + sendMessage(msgContext, targetAddress, null); + } else if (msgContext.getTo() != null && !msgContext.getTo().hasAnonymousAddress()) { + targetAddress = msgContext.getTo().getAddress(); + + if (!msgContext.getTo().hasNoneAddress()) { + sendMessage(msgContext, targetAddress, null); + } else { + //Don't send the message. + return InvocationResponse.CONTINUE; + } + } else if (msgContext.isServerSide()) { + // get the out transport info for server side when target EPR is unknown + sendMessage(msgContext, null, + (OutTransportInfo) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO)); + } + + return InvocationResponse.CONTINUE; + } + + /** + * Process a new incoming message (Response) through the axis engine + * @param msgCtx the axis MessageContext + * @param trpHeaders the map containing transport level message headers + * @param soapAction the optional soap action or null + * @param contentType the optional content-type for the message + */ + public void handleIncomingMessage( + MessageContext msgCtx, Map trpHeaders, + String soapAction, String contentType) { + + + // set the soapaction if one is available via a transport header + if (soapAction != null) { + msgCtx.setSoapAction(soapAction); + } + + // set the transport headers to the message context + msgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, trpHeaders); + + // send the message context through the axis engine + try { + try { + AxisEngine.receive(msgCtx); + } catch (AxisFault e) { + if (log.isDebugEnabled()) { + log.debug("Error receiving message", e); + } + if (msgCtx.isServerSide()) { + AxisEngine.sendFault(MessageContextBuilder.createFaultMessageContext(msgCtx, e)); + } + } + } catch (AxisFault axisFault) { + logException("Error processing response message", axisFault); + } + } + + /** + * Create a new axis MessageContext for an incoming response message + * through this transport, for the given outgoing message + * + * @param outMsgCtx the outgoing message + * @return the newly created message context + */ + public MessageContext createResponseMessageContext(MessageContext outMsgCtx) { + + MessageContext responseMsgCtx = null; + try { + responseMsgCtx = outMsgCtx.getOperationContext(). + getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN); + } catch (AxisFault af) { + log.error("Error getting IN message context from the operation context", af); + } + + if (responseMsgCtx == null) { + responseMsgCtx = new MessageContext(); + responseMsgCtx.setOperationContext(outMsgCtx.getOperationContext()); + } + + responseMsgCtx.setIncomingTransportName(getTransportName()); + responseMsgCtx.setTransportOut(transportOut); + responseMsgCtx.setTransportIn(transportIn); + + responseMsgCtx.setMessageID(UUIDGenerator.getUUID()); + + responseMsgCtx.setDoingREST(outMsgCtx.isDoingREST()); + responseMsgCtx.setProperty( + MessageContext.TRANSPORT_IN, outMsgCtx.getProperty(MessageContext.TRANSPORT_IN)); + responseMsgCtx.setAxisMessage(outMsgCtx.getOperationContext().getAxisOperation(). + getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE)); + responseMsgCtx.setTo(null); + //msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isNonBlocking); + + + // are these relevant? + //msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID()); + // this is required to support Sandesha 2 + //msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, + // new HttpCoreRequestResponseTransport(msgContext)); + + return responseMsgCtx; + } + + /** + * Should the transport sender wait for a synchronous response to be received? + * @param msgCtx the outgoing message context + * @return true if a sync response is expected + */ + protected boolean waitForSynchronousResponse(MessageContext msgCtx) { + return + msgCtx.getOperationContext() != null && + WSDL2Constants.MEP_URI_OUT_IN.equals( + msgCtx.getOperationContext().getAxisOperation().getMessageExchangePattern()); + } + + public String getTransportName() { + return transportOut.getName(); + } + + protected void handleException(String msg, Exception e) throws AxisFault { + log.error(msg, e); + throw new AxisFault(msg, e); + } + + protected void handleException(String msg) throws AxisFault { + log.error(msg); + throw new AxisFault(msg); + } + + protected void logException(String msg, Exception e) { + log.error(msg, e); + } + + //--- jmx/management methods --- + public void pause() throws AxisFault { + if (state != BaseConstants.STARTED) return; + state = BaseConstants.PAUSED; + log.info("Sender paused"); + } + + public void resume() throws AxisFault { + if (state != BaseConstants.PAUSED) return; + state = BaseConstants.STARTED; + log.info("Sender resumed"); + } + + public void maintenenceShutdown(long millis) throws AxisFault { + if (state != BaseConstants.STARTED) return; + long start = System.currentTimeMillis(); + stop(); + state = BaseConstants.STOPPED; + log.info("Sender shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s"); + } + + /** + * Returns the number of active threads processing messages + * @return number of active threads processing messages + */ + public int getActiveThreadCount() { + return 0; + } + + /** + * Return the number of requests queued in the thread pool + * @return queue size + */ + public int getQueueSize() { + return 0; + } + + // -- jmx/management methods-- + public long getMessagesReceived() { + if (metrics != null) { + return metrics.getMessagesReceived(); + } + return -1; + } + + public long getFaultsReceiving() { + if (metrics != null) { + return metrics.getFaultsReceiving(); + } + return -1; + } + + public long getBytesReceived() { + if (metrics != null) { + return metrics.getBytesReceived(); + } + return -1; + } + + public long getMessagesSent() { + if (metrics != null) { + return metrics.getMessagesSent(); + } + return -1; + } + + public long getFaultsSending() { + if (metrics != null) { + return metrics.getFaultsSending(); + } + return -1; + } + + public long getBytesSent() { + if (metrics != null) { + return metrics.getBytesSent(); + } + return -1; + } + + public long getTimeoutsReceiving() { + if (metrics != null) { + return metrics.getTimeoutsReceiving(); + } + return -1; + } + + public long getTimeoutsSending() { + if (metrics != null) { + return metrics.getTimeoutsSending(); + } + return -1; + } + + public long getMinSizeReceived() { + if (metrics != null) { + return metrics.getMinSizeReceived(); + } + return -1; + } + + public long getMaxSizeReceived() { + if (metrics != null) { + return metrics.getMaxSizeReceived(); + } + return -1; + } + + public double getAvgSizeReceived() { + if (metrics != null) { + return metrics.getAvgSizeReceived(); + } + return -1; + } + + public long getMinSizeSent() { + if (metrics != null) { + return metrics.getMinSizeSent(); + } + return -1; + } + + public long getMaxSizeSent() { + if (metrics != null) { + return metrics.getMaxSizeSent(); + } + return -1; + } + + public double getAvgSizeSent() { + if (metrics != null) { + return metrics.getAvgSizeSent(); + } + return -1; + } + + public Map getResponseCodeTable() { + if (metrics != null) { + return metrics.getResponseCodeTable(); + } + return null; + } + + public void resetStatistics() { + if (metrics != null) { + metrics.reset(); + } + } + + public long getLastResetTime() { + if (metrics != null) { + return metrics.getLastResetTime(); + } + return -1; + } + + public long getMetricsWindow() { + if (metrics != null) { + return System.currentTimeMillis() - metrics.getLastResetTime(); + } + return -1; + } + + private void registerMBean(MBeanServer mbs, Object mbeanInstance, String objectName) { + try { + ObjectName name = new ObjectName(objectName); + Set set = mbs.queryNames(name, null); + if (set != null && set.isEmpty()) { + mbs.registerMBean(mbeanInstance, name); + } else { + mbs.unregisterMBean(name); + mbs.registerMBean(mbeanInstance, name); + } + } catch (Exception e) { + log.warn("Error registering a MBean with objectname ' " + objectName + + " ' for JMX management", e); + } + } + +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseConstants.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseConstants.java new file mode 100644 index 0000000000..5edf76b898 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseConstants.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + + +import javax.xml.namespace.QName; + +public class BaseConstants { + // -- status of a transport -- + public final static int STOPPED = 0; + public final static int STARTED = 1; + public final static int PAUSED = 2; + + /** + * A message property specifying the SOAP Action + */ + public static final String SOAPACTION = "SOAPAction"; + /** + * A message property specifying the content type + */ + public static final String CONTENT_TYPE = "Content-Type"; + /** + * A message context property indicating "TRUE", if a transport or the message builder + * has information that the current message is a fault (e.g. SOAP faults, non-HTTP 2xx, etc) + */ + public static final String FAULT_MESSAGE = "FAULT_MESSAGE"; + /** + * content type identifier for multipart / MTOM messages + */ + public static final String MULTIPART_RELATED = "multipart/related"; + /** + * character set marker to identify charset from a Content-Type string + */ + public static final String CHARSET_PARAM = "; charset="; + /** + * The property specifying an optional message level metrics collector + */ + public static final String METRICS_COLLECTOR = "METRICS_COLLECTOR"; + + //------------------------------------ defaults ------------------------------------ + /** + * The default operation name to be used for non SOAP/XML messages + * if the operation cannot be determined + */ + public static final QName DEFAULT_OPERATION = new QName("urn:mediate"); + /** + * The name of the element which wraps binary content into a SOAP envelope + */ + + // This has to match org.apache.synapse.util.PayloadHelper + // at some future point this can be merged into Axiom as a common base + public final static String AXIOMPAYLOADNS = "http://ws.apache.org/commons/ns/payload"; + + + public static final QName DEFAULT_BINARY_WRAPPER = + new QName(AXIOMPAYLOADNS, "binary"); + /** + * The name of the element which wraps plain text content into a SOAP envelope + */ + public static final QName DEFAULT_TEXT_WRAPPER = + new QName(AXIOMPAYLOADNS, "text"); + + //-------------------------- services.xml parameters -------------------------------- + /** + * The Parameter name indicating the operation to dispatch non SOAP/XML messages + */ + public static final String OPERATION_PARAM = "Operation"; + /** + * The Parameter name indicating the wrapper element for non SOAP/XML messages + */ + public static final String WRAPPER_PARAM = "Wrapper"; + /** + * the parameter in the services.xml that specifies the poll interval for a service + */ + public static final String TRANSPORT_POLL_INTERVAL = "transport.PollInterval"; + /** + * Could polling take place in parallel, i.e. starting at fixed intervals? + */ + public static final String TRANSPORT_POLL_IN_PARALLEL = "transport.ConcurrentPollingAllowed"; + /** + * The default poll interval in milliseconds. + */ + public static final int DEFAULT_POLL_INTERVAL = 5 * 60 * 1000; // 5 mins by default + + public static final String CALLBACK_TABLE = "callbackTable"; + public static final String HEADER_IN_REPLY_TO = "In-Reply-To"; + + // this is an property required by axis2 + // FIXME: where is this required in Axis2? + public final static String MAIL_CONTENT_TYPE = "mail.contenttype"; + + /** Service transaction level - non-transactional */ + public static final int TRANSACTION_NONE = 0; + /** Service transaction level - use non-JTA (i.e. local) transactions */ + public static final int TRANSACTION_LOCAL = 1; + /** Service transaction level - use JTA transactions */ + public static final int TRANSACTION_JTA = 2; + /** Service transaction level - non-transactional */ + public static final String STR_TRANSACTION_NONE = "none"; + /** Service transaction level - use non-JTA (i.e. local) transactions */ + public static final String STR_TRANSACTION_LOCAL = "local"; + /** Service transaction level - use JTA transactions */ + public static final String STR_TRANSACTION_JTA = "jta"; + + /** The Parameter name indicating the transactionality of a service */ + public static final String PARAM_TRANSACTIONALITY = "transport.Transactionality"; + /** Parameter name indicating the JNDI name to get a UserTransaction from JNDI */ + public static final String PARAM_USER_TXN_JNDI_NAME = "transport.UserTxnJNDIName"; + /** Parameter that indicates if a UserTransaction reference could be cached - default yes */ + public static final String PARAM_CACHE_USER_TXN = "transport.CacheUserTxn"; + + /** The UserTransaction associated with this message */ + public static final String USER_TRANSACTION = "UserTransaction"; + /** A message level property indicating a request to rollback the transaction associated with the message */ + public static final String SET_ROLLBACK_ONLY = "SET_ROLLBACK_ONLY"; + /** A message level property indicating a commit is required after the next immidiate send over a transport */ + public static final String JTA_COMMIT_AFTER_SEND = "JTA_COMMIT_AFTER_SEND"; +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseTransportException.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseTransportException.java new file mode 100644 index 0000000000..b4d4243ad2 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseTransportException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +public class BaseTransportException extends RuntimeException { + + BaseTransportException() { + super(); + } + + public BaseTransportException(String msg) { + super(msg); + } + + public BaseTransportException(String msg, Exception e) { + super(msg, e); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseUtils.java new file mode 100644 index 0000000000..008d1b5f28 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/BaseUtils.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Hashtable; +import java.util.List; +import java.util.StringTokenizer; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMOutputFormat; +import org.apache.axiom.om.impl.builder.StAXBuilder; +import org.apache.axiom.om.util.StAXUtils; +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axiom.soap.impl.builder.StAXSOAPModelBuilder; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.engine.AxisConfiguration; +import org.apache.axis2.transport.MessageFormatter; +import org.apache.axis2.transport.TransportUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.binding.ws.axis2.format.BinaryFormatter; +import org.apache.tuscany.sca.binding.ws.axis2.format.PlainTextFormatter; + +public class BaseUtils { + + private static final Log log = LogFactory.getLog(BaseUtils.class); + + /** + * Return a QName from the String passed in of the form {ns}element + * @param obj a QName or a String containing a QName in {ns}element form + * @return a corresponding QName object + */ + public static QName getQNameFromString(Object obj) { + String value; + if (obj instanceof QName) { + return (QName) obj; + } else { + value = obj.toString(); + } + int open = value.indexOf('{'); + int close = value.indexOf('}'); + if (close > open && open > -1 && value.length() > close) { + return new QName(value.substring(open+1, close-open), value.substring(close+1)); + } else { + return new QName(value); + } + } + + /** + * Marks the given service as faulty with the given comment + * + * @param serviceName service name + * @param msg comment for being faulty + * @param axisCfg configuration context + */ + public static void markServiceAsFaulty(String serviceName, String msg, + AxisConfiguration axisCfg) { + if (serviceName != null) { + try { + AxisService service = axisCfg.getService(serviceName); + axisCfg.getFaultyServices().put(service.getName(), msg); + + } catch (AxisFault axisFault) { + log.warn("Error marking service : " + serviceName + " as faulty", axisFault); + } + } + } + + /** + * Create a SOAP envelope using SOAP 1.1 or 1.2 depending on the namespace + * @param in InputStream for the payload + * @param namespace the SOAP namespace + * @return the SOAP envelope for the correct version + * @throws javax.xml.stream.XMLStreamException on error + */ + public static SOAPEnvelope getEnvelope(InputStream in, String namespace) throws XMLStreamException { + + try { + in.reset(); + } catch (IOException ignore) {} + XMLStreamReader xmlreader = + StAXUtils.createXMLStreamReader(in, MessageContext.DEFAULT_CHAR_SET_ENCODING); + StAXBuilder builder = new StAXSOAPModelBuilder(xmlreader, namespace); + return (SOAPEnvelope) builder.getDocumentElement(); + } + + /** + * Get the OMOutput format for the given message + * @param msgContext the axis message context + * @return the OMOutput format to be used + */ + public static OMOutputFormat getOMOutputFormat(MessageContext msgContext) { + + OMOutputFormat format = new OMOutputFormat(); + msgContext.setDoingMTOM(TransportUtils.doWriteMTOM(msgContext)); + msgContext.setDoingSwA(TransportUtils.doWriteSwA(msgContext)); + msgContext.setDoingREST(TransportUtils.isDoingREST(msgContext)); + format.setSOAP11(msgContext.isSOAP11()); + format.setDoOptimize(msgContext.isDoingMTOM()); + format.setDoingSWA(msgContext.isDoingSwA()); + + format.setCharSetEncoding(TransportUtils.getCharSetEncoding(msgContext)); + Object mimeBoundaryProperty = msgContext.getProperty(Constants.Configuration.MIME_BOUNDARY); + if (mimeBoundaryProperty != null) { + format.setMimeBoundary((String) mimeBoundaryProperty); + } + return format; + } + + /** + * Get the MessageFormatter for the given message. + * @param msgContext the axis message context + * @return the MessageFormatter to be used + */ + public static MessageFormatter getMessageFormatter(MessageContext msgContext) { + // check the first element of the SOAP body, do we have content wrapped using the + // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or + // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, select the appropriate + // message formatter directly ... + OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement(); + if (firstChild != null) { + if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) { + return new BinaryFormatter(); + } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) { + return new PlainTextFormatter(); + } + } + + // ... otherwise, let Axis choose the right message formatter: + try { + return TransportUtils.getMessageFormatter(msgContext); + } catch (AxisFault axisFault) { + throw new BaseTransportException("Unable to get the message formatter to use"); + } + } + + protected static void handleException(String s) { + log.error(s); + throw new BaseTransportException(s); + } + + protected static void handleException(String s, Exception e) { + log.error(s, e); + throw new BaseTransportException(s, e); + } + + /** + * Utility method to check if a string is null or empty + * @param str the string to check + * @return true if the string is null or empty + */ + public static boolean isBlank(String str) { + if (str == null || str.length() == 0) { + return true; + } + for (int i = 0; i < str.length(); i++) { + if (!Character.isWhitespace(str.charAt(i))) { + return false; + } + } + return true; + } + + public static boolean isUsingTransport(AxisService service, String transportName) { + boolean process = service.isEnableAllTransports(); + if (process) { + return true; + + } else { + List transports = service.getExposedTransports(); + for (Object transport : transports) { + if (transportName.equals(transport)) { + return true; + } + } + } + return false; + } + + /** + * Extract the properties from an endpoint reference. + * + * @param url an endpoint reference + * @return the extracted properties + */ + public static Hashtable<String,String> getEPRProperties(String url) { + Hashtable<String,String> h = new Hashtable<String,String>(); + int propPos = url.indexOf("?"); + if (propPos != -1) { + StringTokenizer st = new StringTokenizer(url.substring(propPos + 1), "&"); + while (st.hasMoreTokens()) { + String token = st.nextToken(); + int sep = token.indexOf("="); + if (sep != -1) { + h.put(token.substring(0, sep), token.substring(sep + 1)); + } else { + // ignore, what else can we do? + } + } + } + return h; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ManagementSupport.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ManagementSupport.java new file mode 100644 index 0000000000..8b20b4a0b3 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ManagementSupport.java @@ -0,0 +1,51 @@ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.util.Map; + +import org.apache.axis2.AxisFault; + +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +public interface ManagementSupport { + public void pause() throws AxisFault; + public void resume() throws AxisFault; + void maintenenceShutdown(long millis) throws AxisFault; + public int getActiveThreadCount(); + public int getQueueSize(); + + public long getMessagesReceived(); + public long getFaultsReceiving(); + public long getTimeoutsReceiving(); + public long getMessagesSent(); + public long getFaultsSending(); + public long getTimeoutsSending(); + public long getBytesReceived(); + public long getBytesSent(); + public long getMinSizeReceived(); + public long getMaxSizeReceived(); + public double getAvgSizeReceived(); + public long getMinSizeSent(); + public long getMaxSizeSent(); + public double getAvgSizeSent(); + public Map getResponseCodeTable(); + + public void resetStatistics(); + public long getLastResetTime(); + public long getMetricsWindow(); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MessageLevelMetricsCollector.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MessageLevelMetricsCollector.java new file mode 100644 index 0000000000..59dc9e9118 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MessageLevelMetricsCollector.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +public interface MessageLevelMetricsCollector { + + public void incrementMessagesReceived(); + + public void incrementFaultsReceiving(int errorCode); + + public void incrementTimeoutsReceiving(); + + public void incrementBytesReceived(long size); + + public void incrementMessagesSent(); + + public void incrementFaultsSending(int errorCode); + + public void incrementTimeoutsSending(); + + public void incrementBytesSent(long size); + + public void notifyReceivedMessageSize(long size); + + public void notifySentMessageSize(long size); + + public void reportReceivingFault(int errorCode); + + public void reportSendingFault(int errorCode); + + public void reportResponseCode(int respCode); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MetricsCollector.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MetricsCollector.java new file mode 100644 index 0000000000..45dcde944c --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/MetricsCollector.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.axis2.context.MessageContext; + +/** + * Collects metrics related to a transport that has metrics support enabled + */ +public class MetricsCollector { + + public static final int LEVEL_NONE = 0; + public static final int LEVEL_TRANSPORT = 1; + public static final int LEVEL_FULL = 2; + private static final Long ONE = (long) 1; + + /** By default, full metrics collection is enabled */ + private int level = LEVEL_FULL; + + private long messagesReceived; + private long faultsReceiving; + private long timeoutsReceiving; + private long bytesReceived; + private long minSizeReceived; + private long maxSizeReceived; + private double avgSizeReceived; + + private long messagesSent; + private long faultsSending; + private long timeoutsSending; + private long bytesSent; + private long minSizeSent; + private long maxSizeSent; + private double avgSizeSent; + + private final Map<Integer, Long> responseCodeTable = + Collections.synchronizedMap(new HashMap<Integer, Long>()); + + private long lastResetTime = System.currentTimeMillis(); + + public void reset() { + messagesReceived = 0; + faultsReceiving = 0; + timeoutsReceiving = 0; + bytesReceived = 0; + minSizeReceived = 0; + maxSizeReceived = 0; + avgSizeReceived = 0; + + messagesSent = 0; + faultsSending = 0; + timeoutsSending = 0; + bytesSent = 0; + minSizeSent = 0; + maxSizeSent = 0; + avgSizeSent = 0; + + responseCodeTable.clear(); + lastResetTime = System.currentTimeMillis(); + } + + public int getLevel() { + return level; + } + + public void setLevel(int level) { + this.level = level; + } + + public long getLastResetTime() { + return lastResetTime; + } + + public long getMessagesReceived() { + return messagesReceived; + } + + public long getFaultsReceiving() { + return faultsReceiving; + } + + public long getTimeoutsReceiving() { + return timeoutsReceiving; + } + + public long getBytesReceived() { + return bytesReceived; + } + + public long getMessagesSent() { + return messagesSent; + } + + public long getFaultsSending() { + return faultsSending; + } + + public long getTimeoutsSending() { + return timeoutsSending; + } + + public long getBytesSent() { + return bytesSent; + } + + public long getMinSizeReceived() { + return minSizeReceived; + } + + public long getMaxSizeReceived() { + return maxSizeReceived; + } + + public long getMinSizeSent() { + return minSizeSent; + } + + public long getMaxSizeSent() { + return maxSizeSent; + } + + public double getAvgSizeReceived() { + return avgSizeReceived; + } + + public double getAvgSizeSent() { + return avgSizeSent; + } + + public Map<Integer, Long> getResponseCodeTable() { + return responseCodeTable; + } + + public synchronized void incrementMessagesReceived() { + messagesReceived++; + } + + public synchronized void incrementFaultsReceiving() { + faultsReceiving++; + } + + public synchronized void incrementTimeoutsReceiving() { + timeoutsReceiving++; + } + + public synchronized void incrementBytesReceived(long size) { + bytesReceived += size; + } + + public synchronized void incrementMessagesSent() { + messagesSent++; + } + + public synchronized void incrementFaultsSending() { + faultsSending++; + } + + public synchronized void incrementTimeoutsSending() { + timeoutsSending++; + } + + public synchronized void incrementBytesSent(long size) { + bytesSent += size; + } + + public synchronized void notifyReceivedMessageSize(long size) { + if (minSizeReceived == 0 || size < minSizeReceived) { + minSizeReceived = size; + } + if (size > maxSizeReceived) { + maxSizeReceived = size; + } + avgSizeReceived = (avgSizeReceived == 0 ? size : (avgSizeReceived + size) / 2); + } + + public synchronized void notifySentMessageSize(long size) { + if (minSizeSent == 0 || size < minSizeSent) { + minSizeSent = size; + } + if (size > maxSizeSent) { + maxSizeSent = size; + } + avgSizeSent = (avgSizeSent == 0 ? size : (avgSizeSent + size) / 2); + } + + public void reportResponseCode(int respCode) { + synchronized(responseCodeTable) { + Object o = responseCodeTable.get(respCode); + if (o == null) { + responseCodeTable.put(respCode, ONE); + } else { + responseCodeTable.put(respCode, (Long) o + 1); + } + } + } + + // --- enhanced methods --- + private MessageLevelMetricsCollector getMsgLevelMetrics(MessageContext mc) { + if (mc != null && level == LEVEL_FULL) { + return (MessageLevelMetricsCollector) mc.getProperty(BaseConstants.METRICS_COLLECTOR); + } + return null; + } + + public void incrementMessagesReceived(MessageContext mc) { + incrementMessagesReceived(); + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.incrementMessagesReceived(); + } + } + + public void incrementFaultsReceiving(int errorCode, MessageContext mc) { + incrementFaultsReceiving(); + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.incrementFaultsReceiving(errorCode); + } + } + + public void incrementTimeoutsReceiving(MessageContext mc) { + incrementTimeoutsReceiving(); + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.incrementTimeoutsReceiving(); + } + } + + public void incrementBytesReceived(MessageContext mc, long size) { + incrementBytesReceived(size); + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.incrementBytesReceived(size); + } + } + + public void incrementMessagesSent(MessageContext mc) { + incrementMessagesSent(); + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.incrementMessagesSent(); + } + } + + public void incrementFaultsSending(int errorCode, MessageContext mc) { + incrementFaultsSending(); + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.incrementFaultsSending(errorCode); + } + } + + public void incrementTimeoutsSending(MessageContext mc) { + incrementTimeoutsSending(); + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.incrementTimeoutsSending(); + } + } + + public void incrementBytesSent(MessageContext mc, long size) { + incrementBytesSent(size); + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.incrementBytesSent(size); + } + } + + public void notifyReceivedMessageSize(MessageContext mc, long size) { + notifyReceivedMessageSize(size); + + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.notifyReceivedMessageSize(size); + } + } + + public void notifySentMessageSize(MessageContext mc, long size) { + notifySentMessageSize(size); + + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.notifySentMessageSize(size); + } + } + + public void reportResponseCode(MessageContext mc, int respCode) { + reportResponseCode(respCode); + + MessageLevelMetricsCollector m = getMsgLevelMetrics(mc); + if (m != null) { + m.reportResponseCode(respCode); + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ParamUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ParamUtils.java new file mode 100644 index 0000000000..7c77d553fc --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/ParamUtils.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterInclude; +import org.apache.axis2.description.TransportInDescription; +import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.util.JavaUtils; + +/** + * Utility class with methods to manipulate service or transport parameters. + */ +public class ParamUtils { + private ParamUtils() {} + + public static String getRequiredParam(ParameterInclude paramInclude, String paramName) throws AxisFault { + Parameter param = paramInclude.getParameter(paramName); + if (param != null && param.getValue() != null && param.getValue() instanceof String) { + return (String) param.getValue(); + } else { + throw new AxisFault("Cannot find parameter '" + paramName + "' for " + + getDescriptionFor(paramInclude)); + } + } + + public static String getOptionalParam(ParameterInclude paramInclude, String paramName) throws AxisFault { + Parameter param = paramInclude.getParameter(paramName); + if (param != null && param.getValue() != null && param.getValue() instanceof String) { + return (String) param.getValue(); + } else { + return null; + } + } + + public static Integer getOptionalParamInt(ParameterInclude paramInclude, String paramName) throws AxisFault { + Parameter param = paramInclude.getParameter(paramName); + if (param == null || param.getValue() == null) { + return null; + } else { + Object paramValue = param.getValue(); + if (paramValue instanceof Integer) { + return (Integer)paramValue; + } else if (paramValue instanceof String) { + try { + return Integer.valueOf((String)paramValue); + } catch (NumberFormatException ex) { + throw new AxisFault("Invalid value '" + paramValue + "' for parameter '" + paramName + + "' for " + getDescriptionFor(paramInclude)); + } + } else { + throw new AxisFault("Invalid type for parameter '" + paramName + "' for " + + getDescriptionFor(paramInclude)); + } + } + } + + public static int getOptionalParamInt(ParameterInclude paramInclude, String paramName, int defaultValue) throws AxisFault { + Integer value = getOptionalParamInt(paramInclude, paramName); + return value == null ? defaultValue : value.intValue(); + } + + public static boolean getOptionalParamBoolean(ParameterInclude paramInclude, String paramName, boolean defaultValue) throws AxisFault { + Parameter param = paramInclude.getParameter(paramName); + return param == null ? defaultValue : JavaUtils.isTrueExplicitly(param.getValue(), defaultValue); + } + + public static int getRequiredParamInt(ParameterInclude paramInclude, String paramName) throws AxisFault { + Integer value = getOptionalParamInt(paramInclude, paramName); + if (value == null) { + throw new AxisFault("Cannot find parameter '" + paramName + + "' for " + getDescriptionFor(paramInclude)); + } else { + return value.intValue(); + } + } + + private static String getDescriptionFor(ParameterInclude paramInclude) { + if (paramInclude instanceof AxisService) { + return "service '" + ((AxisService)paramInclude).getName() + "'"; + } else if (paramInclude instanceof TransportInDescription) { + return "transport receiver '" + ((TransportInDescription)paramInclude).getName() + "'"; + } else if (paramInclude instanceof TransportOutDescription) { + return "transport sender '" + ((TransportOutDescription)paramInclude).getName() + "'"; + } else { + return paramInclude.getClass().getName(); + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/SynchronousCallback.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/SynchronousCallback.java new file mode 100644 index 0000000000..1016e88a82 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/SynchronousCallback.java @@ -0,0 +1,109 @@ +/* + * Copyright 2004,2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.context.OperationContext; +import org.apache.axis2.description.AxisMessage; +import org.apache.axis2.description.AxisOperation; +import org.apache.axis2.wsdl.WSDLConstants; + + +public class SynchronousCallback { + + private MessageContext outMessageContext; + private MessageContext inMessageContext; + + private boolean isComplete; + + public SynchronousCallback(MessageContext outMessageContext) { + this.outMessageContext = outMessageContext; + this.isComplete = false; + } + + public synchronized void setInMessageContext(MessageContext inMessageContext) throws AxisFault { + + // if some other thread has access and complete then return without doing any thing. + // thread should have activate by the first message. + if (!isComplete) { + // this code is invoked only if the code use with axis2 at the client side + // when axis2 client receive messages it waits in the sending thread until the response comes. + // so this thread only notify the waiting thread and hence we need to build the message here. + inMessageContext.getEnvelope().build(); + OperationContext operationContext = outMessageContext.getOperationContext(); + MessageContext msgCtx = + operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE); + + if (msgCtx == null) { + // try to see whether there is a piggy back message context + if (outMessageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) != null) { + + msgCtx = (MessageContext) outMessageContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE); + msgCtx.setTransportIn(inMessageContext.getTransportIn()); + msgCtx.setTransportOut(inMessageContext.getTransportOut()); + msgCtx.setServerSide(false); + msgCtx.setProperty(BaseConstants.MAIL_CONTENT_TYPE, + inMessageContext.getProperty(BaseConstants.MAIL_CONTENT_TYPE)); + // FIXME: this class must not be transport dependent since it is used by AbstractTransportListener + msgCtx.setIncomingTransportName(org.apache.axis2.Constants.TRANSPORT_MAIL); + msgCtx.setEnvelope(inMessageContext.getEnvelope()); + + } else { + inMessageContext.setOperationContext(operationContext); + inMessageContext.setServiceContext(outMessageContext.getServiceContext()); + if (!operationContext.isComplete()) { + operationContext.addMessageContext(inMessageContext); + } + AxisOperation axisOp = operationContext.getAxisOperation(); + AxisMessage inMessage = axisOp.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE); + inMessageContext.setAxisMessage(inMessage); + inMessageContext.setServerSide(false); + } + + } else { + msgCtx.setOperationContext(operationContext); + msgCtx.setServiceContext(outMessageContext.getServiceContext()); + AxisOperation axisOp = operationContext.getAxisOperation(); + AxisMessage inMessage = axisOp.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE); + msgCtx.setAxisMessage(inMessage); + msgCtx.setTransportIn(inMessageContext.getTransportIn()); + msgCtx.setTransportOut(inMessageContext.getTransportOut()); + msgCtx.setServerSide(false); + msgCtx.setProperty(BaseConstants.MAIL_CONTENT_TYPE, + inMessageContext.getProperty(BaseConstants.MAIL_CONTENT_TYPE)); + // FIXME: this class must not be transport dependent since it is used by AbstractTransportListener + msgCtx.setIncomingTransportName(org.apache.axis2.Constants.TRANSPORT_MAIL); + msgCtx.setEnvelope(inMessageContext.getEnvelope()); + + } + this.inMessageContext = inMessageContext; + isComplete = true; + this.notifyAll(); + } + + } + + + public boolean isComplete() { + return isComplete; + } + + public void setComplete(boolean complete) { + isComplete = complete; + } + +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointView.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointView.java new file mode 100644 index 0000000000..8919aea5e7 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointView.java @@ -0,0 +1,61 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; + +public class TransportListenerEndpointView implements TransportListenerEndpointViewMBean { + private final AbstractTransportListener listener; + private final String serviceName; + + public TransportListenerEndpointView(AbstractTransportListener listener, String serviceName) { + this.listener = listener; + this.serviceName = serviceName; + } + + public String[] getAddresses() { + String hostname; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } + catch (UnknownHostException ex) { + hostname = "localhost"; + } + EndpointReference[] epr; + try { + epr = listener.getEPRsForService(serviceName, hostname); + } + catch (AxisFault ex) { + return null; + } + if (epr == null) { + return null; + } else { + String[] result = new String[epr.length]; + for (int i=0; i<epr.length; i++) { + result[i] = epr[i].getAddress(); + } + return result; + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointViewMBean.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointViewMBean.java new file mode 100644 index 0000000000..71b9618977 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportListenerEndpointViewMBean.java @@ -0,0 +1,23 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +public interface TransportListenerEndpointViewMBean { + String[] getAddresses(); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportMBeanSupport.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportMBeanSupport.java new file mode 100644 index 0000000000..bb23c5f96d --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportMBeanSupport.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.lang.management.ManagementFactory; + +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.axis2.transport.TransportListener; +import org.apache.axis2.transport.TransportSender; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Support class to register MBeans for transport listeners and senders. + * This class can be used by {@link TransportListener} and {@link TransportSender} classes + * to register the {@link TransportView} management bean. It takes care of registering + * the bean under a consistent name and makes sure that a JMX related error doesn't stop + * the transport from working: a failure to register the MBean will cause JMX support + * to be disabled. + */ +public class TransportMBeanSupport { + private static final Log log = LogFactory.getLog(TransportMBeanSupport.class); + + private boolean enabled = true; + private boolean registered; + private MBeanServer mbs; + private ObjectName mbeanName; + private TransportView mbeanInstance; + + private TransportMBeanSupport(String connectorName, TransportView mbeanInstance) { + try { + mbs = ManagementFactory.getPlatformMBeanServer(); + } catch (SecurityException ex) { + log.warn("Unable to get the platform MBean server; JMX support disabled", ex); + enabled = false; + return; + } + String jmxAgentName = System.getProperty("jmx.agent.name"); + if (jmxAgentName == null || "".equals(jmxAgentName)) { + jmxAgentName = "org.apache.axis2"; + } + String mbeanNameString = jmxAgentName + ":Type=Transport,ConnectorName=" + connectorName; + try { + mbeanName = ObjectName.getInstance(mbeanNameString); + } catch (MalformedObjectNameException ex) { + log.warn("Unable to create object name '" + mbeanNameString + + "'; JMX support disabled", ex); + enabled = false; + } + this.mbeanInstance = mbeanInstance; + } + + public TransportMBeanSupport(TransportListener listener, String name) { + this(name + "-listener", new TransportView(listener, null)); + } + + public TransportMBeanSupport(TransportSender sender, String name) { + this(name + "-sender", new TransportView(null, sender)); + } + + public ObjectName getMBeanName() { + return mbeanName; + } + + /** + * Register the {@link TransportView} MBean. + */ + public void register() { + if (enabled && !registered) { + try { + mbs.registerMBean(mbeanInstance, mbeanName); + registered = true; + } catch (Exception e) { + log.warn("Error registering a MBean with objectname ' " + mbeanName + + " ' for JMX management", e); + enabled = false; + } + } + } + + /** + * Unregister the {@link TransportView} MBean. + */ + public void unregister() { + if (enabled && registered) { + try { + mbs.unregisterMBean(mbeanName); + registered = false; + } catch (Exception e) { + log.warn("Error un-registering a MBean with objectname ' " + mbeanName + + " ' for JMX management", e); + } + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportView.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportView.java new file mode 100644 index 0000000000..0724110ed3 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportView.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; + +import java.util.Map; + +import org.apache.axis2.transport.TransportListener; +import org.apache.axis2.transport.TransportSender; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class TransportView implements TransportViewMBean { + + private static final Log log = LogFactory.getLog(TransportView.class); + + public static final int STOPPED = 0; + public static final int RUNNING = 1; + public static final int PAUSED = 2; + public static final int SHUTTING_DOWN = 3; + + private TransportListener listener = null; + private TransportSender sender = null; + + public TransportView(TransportListener listener, TransportSender sender) { + this.listener = listener; + this.sender = sender; + } + + // JMX Attributes + public long getMessagesReceived() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getMessagesReceived(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getMessagesReceived(); + } + return -1; + } + + public long getFaultsReceiving() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getFaultsReceiving(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getFaultsReceiving(); + } + return -1; + } + + public long getTimeoutsReceiving() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getTimeoutsReceiving(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getTimeoutsReceiving(); + } + return -1; + } + + public long getTimeoutsSending() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getTimeoutsSending(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getTimeoutsSending(); + } + return -1; + } + + public long getBytesReceived() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getBytesReceived(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getBytesReceived(); + } + return -1; + } + + public long getMessagesSent() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getMessagesSent(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getMessagesSent(); + } + return -1; + } + + public long getFaultsSending() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getFaultsSending(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getFaultsSending(); + } + return -1; + } + + public long getBytesSent() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getBytesSent(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getBytesSent(); + } + return -1; + } + + public long getMinSizeReceived() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getMinSizeReceived(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getMinSizeReceived(); + } + return -1; + } + + public long getMaxSizeReceived() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getMaxSizeReceived(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getMaxSizeReceived(); + } + return -1; + } + + public double getAvgSizeReceived() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getAvgSizeReceived(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getAvgSizeReceived(); + } + return -1; + } + + public long getMinSizeSent() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getMinSizeSent(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getMinSizeSent(); + } + return -1; + } + + public long getMaxSizeSent() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getMaxSizeSent(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getMaxSizeSent(); + } + return -1; + } + + public double getAvgSizeSent() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getAvgSizeSent(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getAvgSizeSent(); + } + return -1; + } + + public Map getResponseCodeTable() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getResponseCodeTable(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getResponseCodeTable(); + } + return null; + } + + public int getActiveThreadCount() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getActiveThreadCount(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getActiveThreadCount(); + } + return -1; + } + + public int getQueueSize() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getQueueSize(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getQueueSize(); + } + return -1; + } + + // JMX Operations + public void start() throws Exception{ + if (listener != null) { + listener.start(); + } + } + + public void stop() throws Exception { + if (listener != null) { + listener.stop(); + } else if (sender != null) { + sender.stop(); + } + } + + public void pause() throws Exception { + if (listener instanceof ManagementSupport) { + ((ManagementSupport) listener).pause(); + } else if (sender instanceof ManagementSupport) { + ((ManagementSupport) sender).pause(); + } + } + + public void resume() throws Exception { + if (listener instanceof ManagementSupport) { + ((ManagementSupport) listener).resume(); + } else if (sender instanceof ManagementSupport) { + ((ManagementSupport) sender).resume(); + } + } + + public void maintenenceShutdown(long seconds) throws Exception { + if (listener instanceof ManagementSupport) { + ((ManagementSupport) listener).maintenenceShutdown(seconds * 1000); + } else if (sender instanceof ManagementSupport) { + ((ManagementSupport) sender).maintenenceShutdown(seconds * 1000); + } + } + + public void resetStatistics() { + if (listener != null && listener instanceof ManagementSupport) { + ((ManagementSupport) listener).resetStatistics(); + } else if (sender != null && sender instanceof ManagementSupport) { + ((ManagementSupport) sender).resetStatistics(); + } + } + + public long getLastResetTime() { + if (listener != null && listener instanceof ManagementSupport) { + return ((ManagementSupport) listener).getLastResetTime(); + } else if (sender != null && sender instanceof ManagementSupport) { + return ((ManagementSupport) sender).getLastResetTime(); + } + return -1; + } + + public long getMetricsWindow() { + if (listener != null && listener instanceof ManagementSupport) { + return System.currentTimeMillis() - ((ManagementSupport) listener).getLastResetTime(); + } else if (sender != null && sender instanceof ManagementSupport) { + return System.currentTimeMillis() - ((ManagementSupport) sender).getLastResetTime(); + } + return -1; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportViewMBean.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportViewMBean.java new file mode 100644 index 0000000000..2aa86c1e8e --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/TransportViewMBean.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base; +import java.util.Map; + +public interface TransportViewMBean { + + // JMX Attributes + public long getMessagesReceived(); + public long getFaultsReceiving(); + public long getTimeoutsReceiving(); + public long getMessagesSent(); + public long getFaultsSending(); + public long getTimeoutsSending(); + public long getBytesReceived(); + public long getBytesSent(); + public long getMinSizeReceived(); + public long getMaxSizeReceived(); + public double getAvgSizeReceived(); + public long getMinSizeSent(); + public long getMaxSizeSent(); + public double getAvgSizeSent(); + public int getActiveThreadCount(); + public int getQueueSize(); + public Map getResponseCodeTable(); + + // JMX Operations + public void start() throws Exception; + public void stop() throws Exception; + public void pause() throws Exception; + public void resume() throws Exception; + public void maintenenceShutdown(long seconds) throws Exception; + + public void resetStatistics(); + public long getLastResetTime(); + public long getMetricsWindow(); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/AbstractDatagramTransportListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/AbstractDatagramTransportListener.java new file mode 100644 index 0000000000..92cca71a79 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/AbstractDatagramTransportListener.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram; + +import java.io.IOException; +import java.net.SocketException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.TransportInDescription; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportListener; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ParamUtils; + +public abstract class AbstractDatagramTransportListener<E extends DatagramEndpoint> + extends AbstractTransportListener { + + private final Map<String,E> endpoints = new HashMap<String,E>(); + private DatagramDispatcher<E> dispatcher; + private String defaultIp; + + @Override + public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn) + throws AxisFault { + + super.init(cfgCtx, transportIn); + DatagramDispatcherCallback callback = new DatagramDispatcherCallback() { + public void receive(DatagramEndpoint endpoint, byte[] data, int length) { + workerPool.execute(new ProcessPacketTask(endpoint, data, length)); + } + }; + try { + dispatcher = createDispatcher(callback); + } catch (IOException ex) { + throw new AxisFault("Unable to create selector", ex); + } + try { + defaultIp = org.apache.axis2.util.Utils.getIpAddress(cfgCtx.getAxisConfiguration()); + } catch (SocketException ex) { + throw new AxisFault("Unable to determine the host's IP address", ex); + } + } + + @Override + protected void startListeningForService(AxisService service) throws AxisFault { + E endpoint = createEndpoint(service); + endpoint.setListener(this); + endpoint.setService(service); + endpoint.setContentType(ParamUtils.getRequiredParam( + service, "transport." + getTransportName() + ".contentType")); + endpoint.setMetrics(metrics); + + try { + dispatcher.addEndpoint(endpoint); + } catch (IOException ex) { + throw new AxisFault("Unable to listen on endpoint " + + endpoint.getEndpointReference(defaultIp), ex); + } + if (log.isDebugEnabled()) { + log.debug("Started listening on endpoint " + endpoint.getEndpointReference(defaultIp) + + " [contentType=" + endpoint.getContentType() + + "; service=" + service.getName() + "]"); + } + endpoints.put(service.getName(), endpoint); + } + + @Override + protected void stopListeningForService(AxisService service) { + try { + dispatcher.removeEndpoint(endpoints.get(service.getName())); + } catch (IOException ex) { + log.error("I/O exception while stopping listener for service " + service.getName(), ex); + } + endpoints.remove(service.getName()); + } + + @Override + public void destroy() { + super.destroy(); + try { + dispatcher.stop(); + } catch (IOException ex) { + log.error("Failed to stop dispatcher", ex); + } + } + + public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { + + // strip out the endpoint name if present + if (serviceName.indexOf('.') != -1) { + serviceName = serviceName.substring(0, serviceName.indexOf('.')); + } + + E endpoint = endpoints.get(serviceName); + if (endpoint == null) { + return null; + } else { + return new EndpointReference[] { + endpoint.getEndpointReference(ip == null ? defaultIp : ip) }; + } + } + + protected abstract DatagramDispatcher<E> createDispatcher(DatagramDispatcherCallback callback) + throws IOException; + + protected abstract E createEndpoint(AxisService service) throws AxisFault; +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcher.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcher.java new file mode 100644 index 0000000000..e19aec4483 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcher.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram; + +import java.io.IOException; + +public interface DatagramDispatcher<E> { + void addEndpoint(E endpoint) throws IOException; + void removeEndpoint(E endpoint) throws IOException; + void stop() throws IOException; +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcherCallback.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcherCallback.java new file mode 100644 index 0000000000..691fe2fef9 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramDispatcherCallback.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram; + +public interface DatagramDispatcherCallback { + void receive(DatagramEndpoint endpoint, byte[] data, int length); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramEndpoint.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramEndpoint.java new file mode 100644 index 0000000000..95eebbca06 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/DatagramEndpoint.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram; + +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.description.AxisService; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector; + +/** + * Endpoint description. + * This class is used by the transport to store information + * about an endpoint, e.g. the Axis service it is bound to. + * Transports extend this abstract class to store additional + * transport specific information, such as the port number + * the transport listens on. + */ +public abstract class DatagramEndpoint { + private AbstractDatagramTransportListener listener; + private String contentType; + private AxisService service; + private MetricsCollector metrics; + + public AbstractDatagramTransportListener getListener() { + return listener; + } + + public void setListener(AbstractDatagramTransportListener listener) { + this.listener = listener; + } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public AxisService getService() { + return service; + } + + public void setService(AxisService service) { + this.service = service; + } + + public MetricsCollector getMetrics() { + return metrics; + } + + public void setMetrics(MetricsCollector metrics) { + this.metrics = metrics; + } + + public abstract EndpointReference getEndpointReference(String ip); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/ProcessPacketTask.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/ProcessPacketTask.java new file mode 100644 index 0000000000..bb102317b8 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/ProcessPacketTask.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; + +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.engine.AxisEngine; +import org.apache.axis2.transport.TransportUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector; + +/** + * Task encapsulating the processing of a datagram. + * Instances of this class will be dispatched to worker threads for + * execution. + */ +public class ProcessPacketTask implements Runnable { + private static final Log log = LogFactory.getLog(ProcessPacketTask.class); + + private final DatagramEndpoint endpoint; + private final byte[] data; + private final int length; + + public ProcessPacketTask(DatagramEndpoint endpoint, byte[] data, int length) { + this.endpoint = endpoint; + this.data = data; + this.length = length; + } + + public void run() { + MetricsCollector metrics = endpoint.getMetrics(); + try { + InputStream inputStream = new ByteArrayInputStream(data, 0, length); + MessageContext msgContext = endpoint.getListener().createMessageContext(); + msgContext.setAxisService(endpoint.getService()); + SOAPEnvelope envelope = TransportUtils.createSOAPMessage(msgContext, inputStream, endpoint.getContentType()); + msgContext.setEnvelope(envelope); + AxisEngine.receive(msgContext); + metrics.incrementMessagesReceived(); + metrics.incrementBytesReceived(length); + } catch (Exception ex) { + metrics.incrementFaultsReceiving(); + StringBuilder buffer = new StringBuilder("Error during processing of datagram:\n"); + Utils.hexDump(buffer, data, length); + log.error(buffer.toString(), ex); + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/Utils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/Utils.java new file mode 100644 index 0000000000..41230bbcdc --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/Utils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram; + +/** + * Utility class with methods used by datagram transports. + */ +public class Utils { + private Utils() {} + + public static void hexDump(StringBuilder buffer, byte[] data, int length) { + for (int start = 0; start < length; start += 16) { + for (int i=0; i<16; i++) { + int index = start+i; + if (index < length) { + String hex = Integer.toHexString(data[start+i] & 0xFF); + if (hex.length() < 2) { + buffer.append('0'); + } + buffer.append(hex); + } else { + buffer.append(" "); + } + buffer.append(' '); + if (i == 7) { + buffer.append(' '); + } + } + buffer.append(" |"); + for (int i=0; i<16; i++) { + int index = start+i; + if (index < length) { + int b = data[index] & 0xFF; + if (32 <= b && b < 128) { + buffer.append((char)b); + } else { + buffer.append('.'); + } + } else { + buffer.append(' '); + } + } + buffer.append('|'); + buffer.append('\n'); + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/package-info.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/package-info.java new file mode 100644 index 0000000000..a5765e7e55 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/datagram/package-info.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Base classes for datagram transports. + * <p> + * A datagram type transport is a transport that entirely reads a message + * into memory before starting to process it: in contrast to transports like HTTP, + * it doesn't support streaming. This approach can be chosen either because + * of the characteristics of the underlying protocol (such as in the case of UDP) + * or because streaming a message would unnecessarily delay the processing of the + * next available message (as in the case of a UNIX pipe). + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.datagram; diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportError.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportError.java new file mode 100644 index 0000000000..5de216163f --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportError.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.event; + +import org.apache.axis2.description.AxisService; + +public class TransportError { + private final Object source; + private final AxisService service; + private final Throwable exception; + + public TransportError(Object source, AxisService service, Throwable exception) { + this.source = source; + this.service = service; + this.exception = exception; + } + + public Object getSource() { + return source; + } + + public AxisService getService() { + return service; + } + + public Throwable getException() { + return exception; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorListener.java new file mode 100644 index 0000000000..40d46dba56 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.event; + +public interface TransportErrorListener { + void error(TransportError error); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSource.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSource.java new file mode 100644 index 0000000000..141ca20099 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSource.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.event; + +public interface TransportErrorSource { + void addErrorListener(TransportErrorListener listener); + void removeErrorListener(TransportErrorListener listener); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSourceSupport.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSourceSupport.java new file mode 100644 index 0000000000..274b7b7b90 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/event/TransportErrorSourceSupport.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.event; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.axis2.description.AxisService; + +public class TransportErrorSourceSupport implements TransportErrorSource { + private final Object source; + private final List<TransportErrorListener> listeners = new LinkedList<TransportErrorListener>(); + + public TransportErrorSourceSupport(Object source) { + this.source = source; + } + + public synchronized void addErrorListener(TransportErrorListener listener) { + listeners.add(listener); + } + + public synchronized void removeErrorListener(TransportErrorListener listener) { + listeners.remove(listener); + } + + public synchronized void error(AxisService service, Throwable ex) { + if (!listeners.isEmpty()) { + TransportError error = new TransportError(source, service, ex); + for (TransportErrorListener listener : listeners) { + listener.error(error); + } + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/ReaderInputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/ReaderInputStream.java new file mode 100644 index 0000000000..5047e1a499 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/ReaderInputStream.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; + +/** + * {@link InputStream} implementation that reads a character stream from a {@link Reader} + * and transforms it to a byte stream using a specified charset encoding. The stream + * is transformed using a {@link CharsetEncoder} object, guaranteeing that all charset + * encodings supported by the JRE are handled correctly. In particular for charsets such as + * UTF-16, the implementation ensures that one and only one byte order marker + * is produced. + * <p> + * Since in general it is not possible to predict the number of characters to be read from the + * {@link Reader} to satisfy a read request on the {@link ReaderInputStream}, all reads from + * the {@link Reader} are buffered. There is therefore no well defined correlation + * between the current position of the {@link Reader} and that of the {@link ReaderInputStream}. + * This also implies that in general there is no need to wrap the underlying {@link Reader} + * in a {@link java.io.BufferedReader}. + * <p> + * {@link ReaderInputStream} implements the inverse transformation of {@link java.io.InputStreamReader}; + * in the following example, reading from <tt>in2</tt> would return the same byte + * sequence as reading from <tt>in</tt> (provided that the initial byte sequence is legal + * with respect to the charset encoding): + * <pre> + * InputStream in = ... + * Charset cs = ... + * InputStreamReader reader = new InputStreamReader(in, cs); + * ReaderInputStream in2 = new ReaderInputStream(reader, cs);</pre> + * {@link ReaderInputStream} implements the same transformation as {@link java.io.OutputStreamWriter}, + * except that the control flow is reversed: both classes transform a character stream + * into a byte stream, but {@link java.io.OutputStreamWriter} pushes data to the underlying stream, + * while {@link ReaderInputStream} pulls it from the underlying stream. + * <p> + * Note that while there are use cases where there is no alternative to using + * this class, very often the need to use this class is an indication of a flaw + * in the design of the code. This class is typically used in situations where an existing + * API only accepts an {@link InputStream}, but where the most natural way to produce the data + * is as a character stream, i.e. by providing a {@link Reader} instance. An example of a situation + * where this problem may appear is when implementing the {@link javax.activation.DataSource} + * interface from the Java Activation Framework. + * <p> + * Given the fact that the {@link Reader} class doesn't provide any way to predict whether the next + * read operation will block or not, it is not possible to provide a meaningful + * implementation of the {@link InputStream#available()} method. A call to this method + * will always return 0. Also, this class doesn't support {@link InputStream#mark(int)}. + * <p> + * Instances of {@link ReaderInputStream} are not thread safe. + */ +// NOTE: Remove this class once Commons IO 2.0 is available (see IO-158) +public class ReaderInputStream extends InputStream { + private static final int DEFAULT_BUFFER_SIZE = 1024; + + private final Reader reader; + private final CharsetEncoder encoder; + + /** + * CharBuffer used as input for the decoder. It should be reasonably + * large as we read data from the underlying Reader into this buffer. + */ + private final CharBuffer encoderIn; + + /** + * ByteBuffer used as output for the decoder. This buffer can be small + * as it is only used to transfer data from the decoder to the + * buffer provided by the caller. + */ + private final ByteBuffer encoderOut = ByteBuffer.allocate(128); + + private CoderResult lastCoderResult; + private boolean endOfInput; + + /** + * Construct a new {@link ReaderInputStream}. + * + * @param reader the target {@link Reader} + * @param charset the charset encoding + * @param bufferSize the size of the input buffer in number of characters + */ + public ReaderInputStream(Reader reader, Charset charset, int bufferSize) { + this.reader = reader; + encoder = charset.newEncoder(); + encoderIn = CharBuffer.allocate(bufferSize); + encoderIn.flip(); + } + + /** + * Construct a new {@link ReaderInputStream} with a default input buffer size of + * 1024 characters. + * + * @param reader the target {@link Reader} + * @param charset the charset encoding + */ + public ReaderInputStream(Reader reader, Charset charset) { + this(reader, charset, DEFAULT_BUFFER_SIZE); + } + + /** + * Construct a new {@link ReaderInputStream}. + * + * @param reader the target {@link Reader} + * @param charsetName the name of the charset encoding + * @param bufferSize the size of the input buffer in number of characters + */ + public ReaderInputStream(Reader reader, String charsetName, int bufferSize) { + this(reader, Charset.forName(charsetName), bufferSize); + } + + /** + * Construct a new {@link ReaderInputStream} with a default input buffer size of + * 1024 characters. + * + * @param reader the target {@link Reader} + * @param charsetName the name of the charset encoding + */ + public ReaderInputStream(Reader reader, String charsetName) { + this(reader, charsetName, DEFAULT_BUFFER_SIZE); + } + + /** + * Construct a new {@link ReaderInputStream} that uses the default character encoding + * with a default input buffer size of 1024 characters. + * + * @param reader the target {@link Reader} + */ + public ReaderInputStream(Reader reader) { + this(reader, Charset.defaultCharset()); + } + + /** + * Read the specified number of bytes into an array. + * + * @param b the byte array to read into + * @param off the offset to start reading bytes into + * @param len the number of bytes to read + * @return the number of bytes read or <code>-1</code> + * if the end of the stream has been reached + */ + @Override + public int read(byte[] b, int off, int len) throws IOException { + int read = 0; + while (len > 0) { + if (encoderOut.position() > 0) { + encoderOut.flip(); + int c = Math.min(encoderOut.remaining(), len); + encoderOut.get(b, off, c); + off += c; + len -= c; + read += c; + encoderOut.compact(); + } else { + if (!endOfInput && (lastCoderResult == null || lastCoderResult.isUnderflow())) { + encoderIn.compact(); + int position = encoderIn.position(); + // We don't use Reader#read(CharBuffer) here because it is more efficient + // to write directly to the underlying char array (the default implementation + // copies data to a temporary char array). + int c = reader.read(encoderIn.array(), position, encoderIn.remaining()); + if (c == -1) { + endOfInput = true; + } else { + encoderIn.position(position+c); + } + encoderIn.flip(); + } + lastCoderResult = encoder.encode(encoderIn, encoderOut, endOfInput); + if (endOfInput && encoderOut.position() == 0) { + break; + } + } + } + return read == 0 && endOfInput ? -1 : read; + } + + /** + * Read the specified number of bytes into an array. + * + * @param b the byte array to read into + * @return the number of bytes read or <code>-1</code> + * if the end of the stream has been reached + */ + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + /** + * Read a single byte. + * + * @return either the byte read or <code>-1</code> if the end of the stream + * has been reached + */ + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + return read(b) == -1 ? -1 : b[0] & 0xFF; + } + + /** + * Close the stream. This method will cause the underlying {@link Reader} + * to be closed. + */ + @Override + public void close() throws IOException { + reader.close(); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/WriterOutputStream.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/WriterOutputStream.java new file mode 100644 index 0000000000..3b6370da63 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/streams/WriterOutputStream.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.Writer; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; + +/** + * {@link OutputStream} implementation that transforms a byte stream to a + * character stream using a specified charset encoding and writes the resulting + * stream to a {@link Writer}. The stream is transformed using a + * {@link CharsetDecoder} object, guaranteeing that all charset + * encodings supported by the JRE are handled correctly. + * <p> + * The output of the {@link CharsetDecoder} is buffered using a fixed size buffer. + * This implies that the data is written to the underlying {@link Writer} in chunks + * that are no larger than the size of this buffer. By default, the buffer is + * flushed only when it overflows or when {@link #flush()} or {@link #close()} + * is called. In general there is therefore no need to wrap the underlying {@link Writer} + * in a {@link java.io.BufferedWriter}. {@link WriterOutputStream} can also + * be instructed to flush the buffer after each write operation. In this case, all + * available data is written immediately to the underlying {@link Writer}, implying that + * the current position of the {@link Writer} is correlated to the current position + * of the {@link WriterOutputStream}. + * <p> + * {@link WriterOutputStream} implements the inverse transformation of {@link java.io.OutputStreamWriter}; + * in the following example, writing to <tt>out2</tt> would have the same result as writing to + * <tt>out</tt> directly (provided that the byte sequence is legal with respect to the + * charset encoding): + * <pre> + * OutputStream out = ... + * Charset cs = ... + * OutputStreamWriter writer = new OutputStreamWriter(out, cs); + * WriterOutputStream out2 = new WriterOutputStream(writer, cs);</pre> + * {@link WriterOutputStream} implements the same transformation as {@link java.io.InputStreamReader}, + * except that the control flow is reversed: both classes transform a byte stream + * into a character stream, but {@link java.io.InputStreamReader} pulls data from the underlying stream, + * while {@link WriterOutputStream} pushes it to the underlying stream. + * <p> + * Note that while there are use cases where there is no alternative to using + * this class, very often the need to use this class is an indication of a flaw + * in the design of the code. This class is typically used in situations where an existing + * API only accepts an {@link OutputStream} object, but where the stream is known to represent + * character data that must be decoded for further use. + * <p> + * Instances of {@link WriterOutputStream} are not thread safe. + */ +//NOTE: Remove this class once Commons IO 2.0 is available (see IO-158) +public class WriterOutputStream extends OutputStream { + private static final int DEFAULT_BUFFER_SIZE = 1024; + + private final Writer writer; + private final CharsetDecoder decoder; + private final boolean writeImmediately; + + /** + * ByteBuffer used as input for the decoder. This buffer can be small + * as it is used only to transfer the received data to the + * decoder. + */ + private final ByteBuffer decoderIn = ByteBuffer.allocate(128); + + /** + * CharBuffer used as output for the decoder. It should be + * somewhat larger as we write from this buffer to the + * underlying Writer. + */ + private final CharBuffer decoderOut; + + /** + * Constructs a new {@link WriterOutputStream}. + * + * @param writer the target {@link Writer} + * @param charset the charset encoding + * @param bufferSize the size of the output buffer in number of characters + * @param writeImmediately If <tt>true</tt> the output buffer will be flushed after each + * write operation, i.e. all available data will be written to the + * underlying {@link Writer} immediately. If <tt>false</tt>, the + * output buffer will only be flushed when it overflows or when + * {@link #flush()} or {@link #close()} is called. + */ + public WriterOutputStream(Writer writer, Charset charset, int bufferSize, boolean writeImmediately) { + this.writer = writer; + decoder = charset.newDecoder(); + decoder.onMalformedInput(CodingErrorAction.REPLACE); + decoder.onUnmappableCharacter(CodingErrorAction.REPLACE); + decoder.replaceWith("?"); + this.writeImmediately = writeImmediately; + decoderOut = CharBuffer.allocate(bufferSize); + } + + /** + * Constructs a new {@link WriterOutputStream} with a default output buffer size of + * 1024 characters. The output buffer will only be flushed when it overflows or when + * {@link #flush()} or {@link #close()} is called. + * + * @param writer the target {@link Writer} + * @param charset the charset encoding + */ + public WriterOutputStream(Writer writer, Charset charset) { + this(writer, charset, DEFAULT_BUFFER_SIZE, false); + } + + /** + * Constructs a new {@link WriterOutputStream}. + * + * @param writer the target {@link Writer} + * @param charsetName the name of the charset encoding + * @param bufferSize the size of the output buffer in number of characters + * @param writeImmediately If <tt>true</tt> the output buffer will be flushed after each + * write operation, i.e. all available data will be written to the + * underlying {@link Writer} immediately. If <tt>false</tt>, the + * output buffer will only be flushed when it overflows or when + * {@link #flush()} or {@link #close()} is called. + */ + public WriterOutputStream(Writer writer, String charsetName, int bufferSize, boolean writeImmediately) { + this(writer, Charset.forName(charsetName), bufferSize, writeImmediately); + } + + /** + * Constructs a new {@link WriterOutputStream} with a default output buffer size of + * 1024 characters. The output buffer will only be flushed when it overflows or when + * {@link #flush()} or {@link #close()} is called. + * + * @param writer the target {@link Writer} + * @param charsetName the name of the charset encoding + */ + public WriterOutputStream(Writer writer, String charsetName) { + this(writer, charsetName, DEFAULT_BUFFER_SIZE, false); + } + + /** + * Constructs a new {@link WriterOutputStream} that uses the default character encoding + * and with a default output buffer size of 1024 characters. The output buffer will only + * be flushed when it overflows or when {@link #flush()} or {@link #close()} is called. + * + * @param writer the target {@link Writer} + */ + public WriterOutputStream(Writer writer) { + this(writer, Charset.defaultCharset(), DEFAULT_BUFFER_SIZE, false); + } + + /** + * Write bytes from the specified byte array to the stream. + * + * @param b the byte array containing the bytes to write + * @param off the start offset in the byte array + * @param len the number of bytes to write + */ + @Override + public void write(byte[] b, int off, int len) throws IOException { + while (len > 0) { + int c = Math.min(len, decoderIn.remaining()); + decoderIn.put(b, off, c); + processInput(false); + len -= c; + off += c; + } + if (writeImmediately) { + flushOutput(); + } + } + + /** + * Write bytes from the specified byte array to the stream. + * + * @param b the byte array containing the bytes to write + */ + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + /** + * Write a single byte to the stream. + * + * @param b the byte to write + */ + @Override + public void write(int b) throws IOException { + write(new byte[] { (byte)b }, 0, 1); + } + + /** + * Flush the stream. Any remaining content accumulated in the output buffer + * will be written to the underlying {@link Writer}. After that + * {@link Writer#flush()} will be called. + */ + @Override + public void flush() throws IOException { + flushOutput(); + writer.flush(); + } + + /** + * Close the stream. Any remaining content accumulated in the output buffer + * will be written to the underlying {@link Writer}. After that + * {@link Writer#close()} will be called. + */ + @Override + public void close() throws IOException { + processInput(true); + flushOutput(); + writer.close(); + } + + private void processInput(boolean endOfInput) throws IOException { + // Prepare decoderIn for reading + decoderIn.flip(); + CoderResult coderResult; + while (true) { + coderResult = decoder.decode(decoderIn, decoderOut, endOfInput); + if (coderResult.isOverflow()) { + flushOutput(); + } else if (coderResult.isUnderflow()) { + break; + } else { + // The decoder is configured to replace malformed input and unmappable characters, + // so we should not get here. + throw new IOException("Unexpected coder result"); + } + } + // Discard the bytes that have been read + decoderIn.compact(); + } + + private void flushOutput() throws IOException { + if (decoderOut.position() > 0) { + writer.write(decoderOut.array(), 0, decoderOut.position()); + decoderOut.rewind(); + } + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeThreadFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeThreadFactory.java new file mode 100644 index 0000000000..290ca08471 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeThreadFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This is a simple ThreadFactory implementation using java.util.concurrent + * Creates threads with the given name prefix + */ +public class NativeThreadFactory implements + ThreadFactory { + + final ThreadGroup group; + final AtomicInteger count; + final String namePrefix; + + public NativeThreadFactory(final ThreadGroup group, final String namePrefix) { + super(); + this.count = new AtomicInteger(1); + this.group = group; + this.namePrefix = namePrefix; + } + + public Thread newThread(final Runnable runnable) { + StringBuffer buffer = new StringBuffer(); + buffer.append(this.namePrefix); + buffer.append('-'); + buffer.append(this.count.getAndIncrement()); + Thread t = new Thread(group, runnable, buffer.toString(), 0); + t.setDaemon(false); + t.setPriority(Thread.NORM_PRIORITY); + return t; + } +}
\ No newline at end of file diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeWorkerPool.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeWorkerPool.java new file mode 100644 index 0000000000..9401daee8e --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/NativeWorkerPool.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Worker pool implementation based on java.util.concurrent in JDK 1.5 or later. + */ +public class NativeWorkerPool implements WorkerPool { + + static final Log log = LogFactory.getLog(NativeWorkerPool.class); + + private final ThreadPoolExecutor executor; + private final LinkedBlockingQueue<Runnable> blockingQueue; + + public NativeWorkerPool(int core, int max, int keepAlive, + int queueLength, String threadGroupName, String threadGroupId) { + + if (log.isDebugEnabled()) { + log.debug("Using native util.concurrent package.."); + } + blockingQueue = + (queueLength == -1 ? new LinkedBlockingQueue<Runnable>() + : new LinkedBlockingQueue<Runnable>(queueLength)); + executor = new ThreadPoolExecutor( + core, max, keepAlive, + TimeUnit.SECONDS, + blockingQueue, + new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId)); + } + + public void execute(final Runnable task) { + executor.execute(new Runnable() { + public void run() { + try { + task.run(); + } catch (Throwable t) { + log.error("Uncaught exception", t); + } + } + }); + } + + public int getActiveCount() { + return executor.getActiveCount(); + } + + public int getQueueSize() { + return blockingQueue.size(); + } + + public void shutdown(int timeout) throws InterruptedException { + executor.shutdown(); + executor.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPool.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPool.java new file mode 100644 index 0000000000..61745545ba --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPool.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads; + +public interface WorkerPool { + /** + * Asynchronously execute the given task using one of the threads of the worker pool. + * The task is expected to terminate gracefully, i.e. {@link Runnable#run()} should not + * throw an exception. Any uncaught exceptions should be logged by the worker pool + * implementation. + * + * @param task the task to execute + */ + public void execute(Runnable task); + + public int getActiveCount(); + public int getQueueSize(); + + /** + * Destroy the worker pool. The pool will immediately stop + * accepting new tasks. All previously submitted tasks will + * be executed. The method blocks until all tasks have + * completed execution, or the timeout occurs, or the current + * thread is interrupted, whichever happens first. + * + * @param timeout the timeout value in milliseconds + * @throws InterruptedException if the current thread was + * interrupted while waiting for pending tasks to + * finish execution + */ + public void shutdown(int timeout) throws InterruptedException; +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPoolFactory.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPoolFactory.java new file mode 100644 index 0000000000..f93c8580cd --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/threads/WorkerPoolFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads; + +/** + * Worker pool factory. + * For the moment this always creates {@link NativeWorkerPool} instances since + * we assume that we are running on Java 1.5 or above. + */ +public class WorkerPoolFactory { + + public static WorkerPool getWorkerPool(int core, int max, int keepAlive, + int queueLength, String threadGroupName, String threadGroupId) { + return new NativeWorkerPool( + core, max, keepAlive, queueLength, threadGroupName, threadGroupId); + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceFilter.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceFilter.java new file mode 100644 index 0000000000..8d49a2a437 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceFilter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker; + +import org.apache.axis2.description.AxisService; + +/** + * Filter for {@link AxisService} instances. This interface is used by + * {@link AxisServiceTracker}. + */ +public interface AxisServiceFilter { + /** + * Examine whether a given service matches the filter criteria. + * + * @param service the service to examine + * @return <code>true</code> if the service matches the filter criteria + */ + boolean matches(AxisService service); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTracker.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTracker.java new file mode 100644 index 0000000000..fca85ee71e --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTracker.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; + +import org.apache.axiom.om.OMElement; +import org.apache.axis2.AxisFault; +import org.apache.axis2.description.AxisModule; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.AxisServiceGroup; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.engine.AxisConfiguration; +import org.apache.axis2.engine.AxisEvent; +import org.apache.axis2.engine.AxisObserver; + +/** + * <p>Tracks services deployed in a given {@link AxisConfiguration}. + * The tracker is configured with references to three objects:</p> + * <ol> + * <li>An {@link AxisConfiguration} to watch.</li> + * <li>An {@link AxisServiceFilter} restricting the services to track.</li> + * <li>An {@link AxisServiceTrackerListener} receiving tracking events.</li> + * </ol> + * <p>An instance of this class maintains an up-to-date list of services + * satisfying all of the following criteria:</p> + * <ol> + * <li>The service is deployed in the given {@link AxisConfiguration}.</li> + * <li>The service is started, i.e. {@link AxisService#isActive()} returns true.</li> + * <li>The service matches the criteria specified by the given + * {@link AxisServiceFilter} instance.</li> + * </ol> + * <p>Whenever a service appears on the list, the tracker will call + * {@link AxisServiceTrackerListener#serviceAdded(AxisService)}. When a service disappears, it + * will call {@link AxisServiceTrackerListener#serviceRemoved(AxisService)}.</p> + * <p>When the tracker is created, it is initially in the stopped state. In this state no + * events will be sent to the listener. It can be started using {@link #start()} and stopped again + * using {@link #stop()}. The tracker list is defined to be empty when the tracker is in the + * stopped state. This implies that a call to {@link #start()} will generate + * {@link AxisServiceTrackerListener#serviceAdded(AxisService)} events for all services that meet + * the above criteria at that point in time. In the same way, {@link #stop()} will generate + * {@link AxisServiceTrackerListener#serviceRemoved(AxisService)} events for the current entries + * in the list.</p> + * <p>As a corollary the tracker guarantees that during a complete lifecycle (start-stop), + * there will be exactly one {@link AxisServiceTrackerListener#serviceRemoved(AxisService)} event + * for every {@link AxisServiceTrackerListener#serviceAdded(AxisService)} event and vice-versa. + * This property is important when the tracker is used to allocate resources for a dynamic set + * of services.</p> + * + * <h2>Limitations</h2> + * + * <p>The tracker is not able to detect property changes on services. E.g. if a service initially + * matches the filter criteria, but later changes so that it doesn't match the criteria any more, + * the tracker will not be able to detect this and the service will not be removed from the tracker + * list.</p> + */ +public class AxisServiceTracker { + private final AxisObserver observer = new AxisObserver() { + public void init(AxisConfiguration axisConfig) {} + + public void serviceUpdate(AxisEvent event, final AxisService service) { + switch (event.getEventType()) { + case AxisEvent.SERVICE_DEPLOY: + case AxisEvent.SERVICE_START: + if (filter.matches(service)) { + boolean pending; + synchronized (lock) { + if (pending = (pendingActions != null)) { + pendingActions.add(new Runnable() { + public void run() { + serviceAdded(service); + } + }); + } + } + if (!pending) { + serviceAdded(service); + } + } + break; + case AxisEvent.SERVICE_REMOVE: + case AxisEvent.SERVICE_STOP: + // Don't check filter here because the properties of the service may have + // changed in the meantime. + boolean pending; + synchronized (lock) { + if (pending = (pendingActions != null)) { + pendingActions.add(new Runnable() { + public void run() { + serviceRemoved(service); + } + }); + } + } + if (!pending) { + serviceRemoved(service); + } + } + } + + public void moduleUpdate(AxisEvent event, AxisModule module) {} + public void addParameter(Parameter param) throws AxisFault {} + public void removeParameter(Parameter param) throws AxisFault {} + public void deserializeParameters(OMElement parameterElement) throws AxisFault {} + public Parameter getParameter(String name) { return null; } + public ArrayList<Parameter> getParameters() { return null; } + public boolean isParameterLocked(String parameterName) { return false; } + public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup serviceGroup) {} + }; + + private final AxisConfiguration config; + final AxisServiceFilter filter; + private final AxisServiceTrackerListener listener; + + /** + * Object used to synchronize access to {@link #pendingActions} and {@link #services}. + */ + final Object lock = new Object(); + + /** + * Queue for notifications received by the {@link AxisObserver} during startup of the tracker. + * We need this because the events may already be reflected in the list of services returned + * by {@link AxisConfiguration#getServices()} (getting the list of currently deployed services + * and adding the observer can't be done atomically). It also allows us to make sure that + * events are sent to the listener in the right order, e.g. when a service is being removed + * during startup of the tracker. + */ + Queue<Runnable> pendingActions; + + /** + * The current list of services. <code>null</code> if the tracker is stopped. + */ + private Set<AxisService> services; + + public AxisServiceTracker(AxisConfiguration config, AxisServiceFilter filter, + AxisServiceTrackerListener listener) { + this.config = config; + this.filter = filter; + this.listener = listener; + } + + /** + * Check whether the tracker is started. + * + * @return <code>true</code> if the tracker is started + */ + public boolean isStarted() { + return services != null; + } + + /** + * Start the tracker. + * + * @throws IllegalStateException if the tracker has already been started + */ + public void start() { + if (services != null) { + throw new IllegalStateException(); + } + synchronized (lock) { + pendingActions = new LinkedList<Runnable>(); + config.addObservers(observer); + services = new HashSet<AxisService>(); + } + for (Object o : config.getServices().values()) { + AxisService service = (AxisService)o; + if (service.isActive() && filter.matches(service)) { + serviceAdded(service); + } + } + while (true) { + Runnable action; + synchronized (lock) { + action = pendingActions.poll(); + if (action == null) { + pendingActions = null; + break; + } + } + action.run(); + } + } + + void serviceAdded(AxisService service) { + // callListener may be false because the observer got an event for a service that + // was already in the initial list of services retrieved by AxisConfiguration#getServices. + boolean callListener; + synchronized (lock) { + callListener = services.add(service); + } + if (callListener) { + listener.serviceAdded(service); + } + } + + void serviceRemoved(AxisService service) { + // callListener may be false because the observer invokes this method without applying the + // filter. + boolean callListener; + synchronized (lock) { + callListener = services.remove(service); + } + if (callListener) { + listener.serviceRemoved(service); + } + } + + /** + * Stop the tracker. + * + * @throws IllegalStateException if the tracker is not started + */ + public void stop() { + if (services == null) { + throw new IllegalStateException(); + } + // TODO: This is very bad, but AxisConfiguration has no removeObserver method! + config.getObserversList().remove(observer); + for (AxisService service : services) { + listener.serviceRemoved(service); + } + services = null; + } +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTrackerListener.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTrackerListener.java new file mode 100644 index 0000000000..ccca293732 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/AxisServiceTrackerListener.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker; + +import org.apache.axis2.description.AxisService; + +/** + * Listener for events generated by an {@link AxisServiceTracker}. + */ +public interface AxisServiceTrackerListener { + /** + * Inform the listener that a service has been added to tracker list. + * + * @param service the service that has been added to the tracker list + */ + void serviceAdded(AxisService service); + + /** + * Inform the listener that a service has been removed from the tracker list. + * + * @param service the service that has been removed from the tracker list + */ + void serviceRemoved(AxisService service); +} diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/package-info.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/package-info.java new file mode 100644 index 0000000000..eefadbbc1e --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/transport/base/tracker/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Contains utility classes to track a dynamic set of services deployed in an + * Axis configuration. + * + * @see org.apache.axis2.transport.base.tracker.AxisServiceTracker + */ +package org.apache.tuscany.sca.binding.ws.axis2.transport.base.tracker;
\ No newline at end of file |