diff options
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java')
-rw-r--r-- | branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java | 1217 |
1 files changed, 0 insertions, 1217 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java deleted file mode 100644 index 28c8da2a8d..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/ServiceTaskManager.java +++ /dev/null @@ -1,1217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import javax.transaction.NotSupportedException; -import javax.transaction.SystemException; -import javax.transaction.UserTransaction; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; - -/** - * Each service will have one ServiceTaskManager instance that will create, manage and also destroy - * idle tasks created for it, for message receipt. This will also allow individual tasks to cache - * the Connection, Session or Consumer as necessary, considering the transactionality required and - * user preference. - * - * This also acts as the ExceptionListener for all JMS connections made on behalf of the service. - * Since the ExceptionListener is notified by a JMS provider on a "serious" error, we simply try - * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh - * for the service, by discarding all connections. - */ -public class ServiceTaskManager { - - /** The logger */ - private static final Log log = LogFactory.getLog(ServiceTaskManager.class); - - /** The Task manager is stopped or has not started */ - private static final int STATE_STOPPED = 0; - /** The Task manager is started and active */ - private static final int STATE_STARTED = 1; - /** The Task manager is paused temporarily */ - private static final int STATE_PAUSED = 2; - /** The Task manager is started, but a shutdown has been requested */ - private static final int STATE_SHUTTING_DOWN = 3; - /** The Task manager has encountered an error */ - private static final int STATE_FAILURE = 4; - - /** The name of the service managed by this instance */ - private String serviceName; - /** The ConnectionFactory MUST refer to an XAConnectionFactory to use JTA */ - private String connFactoryJNDIName; - /** The JNDI name of the Destination Queue or Topic */ - private String destinationJNDIName; - /** JNDI location for the JTA UserTransaction */ - private String userTransactionJNDIName = "java:comp/UserTransaction"; - /** The type of destination - P2P or PubSub (or JMS 1.1 API generic?) */ - private int destinationType = JMSConstants.GENERIC; - /** An optional message selector */ - private String messageSelector = null; - - /** Should tasks run without transactions, using transacted Sessions (i.e. local), or JTA */ - private int transactionality = BaseConstants.TRANSACTION_NONE; - /** Should created Sessions be transactional ? - should be false when using JTA */ - private boolean sessionTransacted = true; - /** Session acknowledgement mode when transacted Sessions (i.e. local transactions) are used */ - private int sessionAckMode = Session.AUTO_ACKNOWLEDGE; - - /** Is the subscription durable ? */ - private boolean subscriptionDurable = false; - /** The name of the durable subscriber for this client */ - private String durableSubscriberName = null; - /** In PubSub mode, should I receive messages sent by me / my connection ? */ - private boolean pubSubNoLocal = false; - /** Number of concurrent consumers - for PubSub, this should be 1 to prevent multiple receipt */ - private int concurrentConsumers = 1; - /** Maximum number of consumers to create - see @concurrentConsumers */ - private int maxConcurrentConsumers = 1; - /** The number of idle (i.e. message-less) attempts to be tried before suicide, to scale down */ - private int idleTaskExecutionLimit = 10; - /** The maximum number of successful message receipts for a task - to limit thread life span */ - private int maxMessagesPerTask = -1; // default is unlimited - /** The default receive timeout - a negative value means wait forever, zero dont wait at all */ - private int receiveTimeout = 1000; - /** JMS Resource cache level - Connection, Session, Consumer. Auto will select safe default */ - private int cacheLevel = JMSConstants.CACHE_AUTO; - /** Should we cache the UserTransaction handle from JNDI - true for almost all app servers */ - private boolean cacheUserTransaction = true; - /** Shared UserTransactionHandle */ - private UserTransaction sharedUserTransaction = null; - /** Should this service use JMS 1.1 ? (when false, defaults to 1.0.2b) */ - private boolean jmsSpec11 = true; - - /** Initial duration to attempt re-connection to JMS provider after failure */ - private int initialReconnectDuration = 10000; - /** Progression factory for geometric series that calculates re-connection times */ - private double reconnectionProgressionFactor = 2.0; // default to [bounded] exponential - /** Upper limit on reconnection attempt duration */ - private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour - - /** The JNDI context properties and other general properties */ - private Hashtable<String,String> jmsProperties = new Hashtable<String, String>(); - /** The JNDI Context acuired */ - private Context context = null; - /** The ConnectionFactory to be used */ - private ConnectionFactory conFactory = null; - /** The JMS Destination */ - private Destination destination = null; - - /** The list of active tasks thats managed by this instance */ - private final List<MessageListenerTask> pollingTasks = - Collections.synchronizedList(new ArrayList<MessageListenerTask>()); - /** The per-service JMS message receiver to be invoked after receipt of messages */ - private JMSMessageReceiver jmsMessageReceiver = null; - - /** State of this Task Manager */ - private volatile int serviceTaskManagerState = STATE_STOPPED; - /** Number of invoker tasks active */ - private volatile int activeTaskCount = 0; - /** The shared thread pool from the Listener */ - private WorkerPool workerPool = null; - - /** The JMS Connection shared between multiple polling tasks - when enabled (reccomended) */ - private Connection sharedConnection = null; - - /** - * Start or re-start the Task Manager by shutting down any existing worker tasks and - * re-creating them. However, if this is STM is PAUSED, a start request is ignored. - * This applies for any connection failures during paused state as well, which then will - * not try to auto recover - */ - public synchronized void start() { - - if (serviceTaskManagerState == STATE_PAUSED) { - log.info("Attempt to re-start paused TaskManager is ignored. Please use resume instead"); - return; - } - - // if any tasks are running, stop whats running now - if (!pollingTasks.isEmpty()) { - stop(); - } - - if (cacheLevel == JMSConstants.CACHE_AUTO) { - cacheLevel = - transactionality == BaseConstants.TRANSACTION_NONE ? - JMSConstants.CACHE_CONSUMER : JMSConstants.CACHE_NONE; - } - switch (cacheLevel) { - case JMSConstants.CACHE_NONE: - log.debug("No JMS resources will be cached/shared between poller " + - "worker tasks of service : " + serviceName); - break; - case JMSConstants.CACHE_CONNECTION: - log.debug("Only the JMS Connection will be cached and shared between *all* " + - "poller task invocations"); - break; - case JMSConstants.CACHE_SESSION: - log.debug("The JMS Connection and Session will be cached and shared between " + - "successive poller task invocations"); - break; - case JMSConstants.CACHE_CONSUMER: - log.debug("The JMS Connection, Session and MessageConsumer will be cached and " + - "shared between successive poller task invocations"); - break; - default : { - handleException("Invalid cache level : " + cacheLevel + - " for service : " + serviceName); - } - } - - for (int i=0; i<concurrentConsumers; i++) { - workerPool.execute(new MessageListenerTask()); - } - - serviceTaskManagerState = STATE_STARTED; - log.info("Task manager for service : " + serviceName + " [re-]initialized"); - } - - /** - * Shutdown the tasks and release any shared resources - */ - public synchronized void stop() { - - if (log.isDebugEnabled()) { - log.debug("Stopping ServiceTaskManager for service : " + serviceName); - } - - if (serviceTaskManagerState != STATE_FAILURE) { - serviceTaskManagerState = STATE_SHUTTING_DOWN; - } - - synchronized(pollingTasks) { - for (MessageListenerTask lstTask : pollingTasks) { - lstTask.requestShutdown(); - } - } - - // try to wait a bit for task shutdown - for (int i=0; i<5; i++) { - if (activeTaskCount == 0) { - break; - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) {} - } - - if (sharedConnection != null) { - try { - sharedConnection.stop(); - } catch (JMSException e) { - logError("Error stopping shared Connection", e); - } finally { - sharedConnection = null; - } - } - - if (activeTaskCount > 0) { - log.warn("Unable to shutdown all polling tasks of service : " + serviceName); - } - - if (serviceTaskManagerState != STATE_FAILURE) { - serviceTaskManagerState = STATE_STOPPED; - } - log.info("Task manager for service : " + serviceName + " shutdown"); - } - - /** - * Temporarily suspend receipt and processing of messages. Accomplished by stopping the - * connection / or connections used by the poller tasks - */ - public synchronized void pause() { - for (MessageListenerTask lstTask : pollingTasks) { - lstTask.pause(); - } - if (sharedConnection != null) { - try { - sharedConnection.stop(); - } catch (JMSException e) { - logError("Error pausing shared Connection", e); - } - } - } - - /** - * Resume receipt and processing of messages of paused tasks - */ - public synchronized void resume() { - for (MessageListenerTask lstTask : pollingTasks) { - lstTask.resume(); - } - if (sharedConnection != null) { - try { - sharedConnection.start(); - } catch (JMSException e) { - logError("Error resuming shared Connection", e); - } - } - } - - /** - * Start a new MessageListenerTask if we are still active, the threshold is not reached, and w - * e do not have any idle tasks - i.e. scale up listening - */ - private void scheduleNewTaskIfAppropriate() { - if (serviceTaskManagerState == STATE_STARTED && - pollingTasks.size() < getMaxConcurrentConsumers() && getIdleTaskCount() == 0) { - workerPool.execute(new MessageListenerTask()); - } - } - - /** - * Get the number of MessageListenerTasks that are currently idle - * @return idle task count - */ - private int getIdleTaskCount() { - int count = 0; - for (MessageListenerTask lstTask : pollingTasks) { - if (lstTask.isTaskIdle()) { - count++; - } - } - return count; - } - - /** - * Get the number of MessageListenerTasks that are currently connected to the JMS provider - * @return connected task count - */ - private int getConnectedTaskCount() { - int count = 0; - for (MessageListenerTask lstTask : pollingTasks) { - if (lstTask.isConnected()) { - count++; - } - } - return count; - } - - /** - * The actual threads/tasks that perform message polling - */ - private class MessageListenerTask implements Runnable, ExceptionListener { - - /** The Connection used by the polling task */ - private Connection connection = null; - /** The Sesson used by the polling task */ - private Session session = null; - /** The MessageConsumer used by the polling task */ - private MessageConsumer consumer = null; - /** State of the worker polling task */ - private volatile int workerState = STATE_STOPPED; - /** The number of idle (i.e. without fetching a message) polls for this task */ - private int idleExecutionCount = 0; - /** Is this task idle right now? */ - private volatile boolean idle = false; - /** Is this task connected to the JMS provider successfully? */ - private boolean connected = false; - - /** As soon as we create a new polling task, add it to the STM for control later */ - MessageListenerTask() { - synchronized(pollingTasks) { - pollingTasks.add(this); - } - } - - /** - * Pause this polling worker task - */ - public void pause() { - if (isActive()) { - if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - connection.stop(); - } catch (JMSException e) { - log.warn("Error pausing Message Listener task for service : " + serviceName); - } - } - workerState = STATE_PAUSED; - } - } - - /** - * Resume this polling task - */ - public void resume() { - if (connection != null && cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - connection.start(); - } catch (JMSException e) { - log.warn("Error resuming Message Listener task for service : " + serviceName); - } - } - workerState = STATE_STARTED; - } - - /** - * Execute the polling worker task - */ - public void run() { - workerState = STATE_STARTED; - activeTaskCount++; - int messageCount = 0; - - if (log.isDebugEnabled()) { - log.debug("New poll task starting : thread id = " + Thread.currentThread().getId()); - } - - try { - while (isActive() && - (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) && - (getConcurrentConsumers() == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) { - - UserTransaction ut = null; - try { - if (transactionality == BaseConstants.TRANSACTION_JTA) { - ut = getUserTransaction(); - ut.begin(); - } - } catch (NotSupportedException e) { - handleException("Listener Task is already associated with a transaction", e); - } catch (SystemException e) { - handleException("Error starting a JTA transaction", e); - } - - // Get a message by polling, or receive null - Message message = receiveMessage(); - - if (log.isTraceEnabled()) { - if (message != null) { - try { - log.trace("<<<<<<< READ message with Message ID : " + - message.getJMSMessageID() + " from : " + destination + - " by Thread ID : " + Thread.currentThread().getId()); - } catch (JMSException ignore) {} - } else { - log.trace("No message received by Thread ID : " + - Thread.currentThread().getId() + " for destination : " + destination); - } - } - - if (message != null) { - idle = false; - idleExecutionCount = 0; - messageCount++; - // I will be busy now while processing this message, so start another if needed - scheduleNewTaskIfAppropriate(); - handleMessage(message, ut); - - } else { - idle = true; - idleExecutionCount++; - } - } - - } finally { - workerState = STATE_STOPPED; - activeTaskCount--; - synchronized(pollingTasks) { - pollingTasks.remove(this); - } - } - - if (log.isTraceEnabled()) { - log.trace("Listener task with Thread ID : " + Thread.currentThread().getId() + - " is stopping after processing : " + messageCount + " messages :: " + - " isActive : " + isActive() + " maxMessagesPerTask : " + - getMaxMessagesPerTask() + " concurrentConsumers : " + getConcurrentConsumers() + - " idleExecutionCount : " + idleExecutionCount + " idleTaskExecutionLimit : " + - getIdleTaskExecutionLimit()); - } else if (log.isDebugEnabled()) { - log.debug("Listener task with Thread ID : " + Thread.currentThread().getId() + - " is stopping after processing : " + messageCount + " messages"); - } - - closeConsumer(true); - closeSession(true); - closeConnection(); - - // My time is up, so if I am going away, create another - scheduleNewTaskIfAppropriate(); - } - - /** - * Poll for and return a message if available - * - * @return a message read, or null - */ - private Message receiveMessage() { - - // get a new connection, session and consumer to prevent a conflict. - // If idle, it means we can re-use what we already have - if (consumer == null) { - connection = getConnection(); - session = getSession(); - consumer = getMessageConsumer(); - if (log.isDebugEnabled()) { - log.debug("Preparing a Connection, Session and Consumer to read messages"); - } - } - - if (log.isDebugEnabled()) { - log.debug("Waiting for a message for service : " + serviceName + " - duration : " - + (getReceiveTimeout() < 0 ? "unlimited" : (getReceiveTimeout() + "ms"))); - } - - try { - if (getReceiveTimeout() < 0) { - return consumer.receive(); - } else { - return consumer.receive(getReceiveTimeout()); - } - } catch (IllegalStateException ignore) { - // probably the consumer (shared) was closed.. which is still ok.. as we didn't read - } catch (JMSException e) { - logError("Error receiving message for service : " + serviceName, e); - } - return null; - } - - /** - * Invoke ultimate message handler/listener and ack message and/or - * commit/rollback transactions - * @param message the JMS message received - * @param ut the UserTransaction used to receive this message, or null - */ - private void handleMessage(Message message, UserTransaction ut) { - - String messageId = null; - try { - messageId = message.getJMSMessageID(); - } catch (JMSException ignore) {} - - boolean commitOrAck = true; - try { - commitOrAck = jmsMessageReceiver.onMessage(message, ut); - - } finally { - - // if client acknowledgement is selected, and processing requested ACK - if (commitOrAck && getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) { - try { - message.acknowledge(); - if (log.isDebugEnabled()) { - log.debug("Message : " + messageId + " acknowledged"); - } - } catch (JMSException e) { - logError("Error acknowledging message : " + messageId, e); - } - } - - // close the consumer - closeConsumer(false); - - // if session was transacted, commit it or rollback - try { - if (session.getTransacted()) { - if (commitOrAck) { - session.commit(); - if (log.isDebugEnabled()) { - log.debug("Session for message : " + messageId + " committed"); - } - } else { - session.rollback(); - if (log.isDebugEnabled()) { - log.debug("Session for message : " + messageId + " rolled back"); - } - } - } - } catch (JMSException e) { - logError("Error " + (commitOrAck ? "committing" : "rolling back") + - " local session txn for message : " + messageId, e); - } - - // if a JTA transaction was being used, commit it or rollback - try { - if (ut != null) { - if (commitOrAck) { - ut.commit(); - if (log.isDebugEnabled()) { - log.debug("JTA txn for message : " + messageId + " committed"); - } - } else { - ut.rollback(); - if (log.isDebugEnabled()) { - log.debug("JTA txn for message : " + messageId + " rolled back"); - } - } - } - } catch (Exception e) { - logError("Error " + (commitOrAck ? "committing" : "rolling back") + - " JTA txn for message : " + messageId + " from the session", e); - } - - closeSession(false); - closeConnection(); - } - } - - /** Handle JMS Connection exceptions by re-initializing. A single connection failure could - * cause re-initialization of multiple MessageListenerTasks / Connections - */ - public void onException(JMSException j) { - - if (!isSTMActive()) { - requestShutdown(); - return; - } - - log.warn("JMS Connection failure : " + j.getMessage()); - setConnected(false); - - if (cacheLevel < JMSConstants.CACHE_CONNECTION) { - // failed Connection was not shared, thus no need to restart the whole STM - requestShutdown(); - return; - } - - // if we failed while active, update state to show failure - setServiceTaskManagerState(STATE_FAILURE); - log.error("JMS Connection failed : " + j.getMessage() + " - shutting down worker tasks"); - - int r = 1; - long retryDuration = initialReconnectDuration; - - do { - try { - log.info("Reconnection attempt : " + r + " for service : " + serviceName); - start(); - } catch (Exception ignore) {} - - boolean connected = false; - for (int i=0; i<5; i++) { - if (getConnectedTaskCount() == concurrentConsumers) { - connected = true; - break; - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) {} - } - - if (!connected) { - log.error("Reconnection attempt : " + (r++) + " for service : " + serviceName + - " failed. Next retry in " + (retryDuration/1000) + "seconds"); - retryDuration = (long) (retryDuration * reconnectionProgressionFactor); - if (retryDuration > maxReconnectDuration) { - retryDuration = maxReconnectDuration; - } - - try { - Thread.sleep(retryDuration); - } catch (InterruptedException ignore) {} - } - - } while (!isSTMActive() || getConnectedTaskCount() < concurrentConsumers); - } - - protected void requestShutdown() { - workerState = STATE_SHUTTING_DOWN; - } - - private boolean isActive() { - return workerState == STATE_STARTED; - } - - protected boolean isTaskIdle() { - return idle; - } - - public boolean isConnected() { - return connected; - } - - public void setConnected(boolean connected) { - this.connected = connected; - } - - /** - * Get a Connection that could/should be used by this task - depends on the cache level to reuse - * @return the shared Connection if cache level is higher than CACHE_NONE, or a new Connection - */ - private Connection getConnection() { - if (cacheLevel < JMSConstants.CACHE_CONNECTION) { - // Connection is not shared - if (connection == null) { - connection = createConnection(); - } - } else { - if (sharedConnection != null) { - connection = sharedConnection; - } else { - synchronized(this) { - if (sharedConnection == null) { - sharedConnection = createConnection(); - } - connection = sharedConnection; - } - } - } - setConnected(true); - return connection; - } - - /** - * Get a Session that could/should be used by this task - depends on the cache level to reuse - * @param connection the connection (could be the shared connection) to use to create a Session - * @return the shared Session if cache level is higher than CACHE_CONNECTION, or a new Session - * created using the Connection passed, or a new/shared connection - */ - private Session getSession() { - if (session == null || cacheLevel < JMSConstants.CACHE_SESSION) { - session = createSession(); - } - return session; - } - - /** - * Get a MessageConsumer that chould/should be used by this task - depends on the cache - * level to reuse - * @param connection option Connection to be used - * @param session optional Session to be used - * @return the shared MessageConsumer if cache level is higher than CACHE_SESSION, or a new - * MessageConsumer possibly using the Connection and Session passed in - */ - private MessageConsumer getMessageConsumer() { - if (consumer == null || cacheLevel < JMSConstants.CACHE_CONSUMER) { - consumer = createConsumer(); - } - return consumer; - } - - /** - * Close the given Connection, hiding exceptions if any which are logged - * @param connection the Connection to be closed - */ - private void closeConnection() { - if (connection != null && - cacheLevel < JMSConstants.CACHE_CONNECTION) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS connection for service : " + serviceName); - } - connection.close(); - } catch (JMSException e) { - logError("Error closing JMS connection", e); - } finally { - connection = null; - } - } - } - - /** - * Close the given Session, hiding exceptions if any which are logged - * @param session the Session to be closed - */ - private void closeSession(boolean forced) { - if (session != null && - (cacheLevel < JMSConstants.CACHE_SESSION || forced)) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS session for service : " + serviceName); - } - session.close(); - } catch (JMSException e) { - logError("Error closing JMS session", e); - } finally { - session = null; - } - } - } - - /** - * Close the given Consumer, hiding exceptions if any which are logged - * @param consumer the Consumer to be closed - */ - private void closeConsumer(boolean forced) { - if (consumer != null && - (cacheLevel < JMSConstants.CACHE_CONSUMER || forced)) { - try { - if (log.isDebugEnabled()) { - log.debug("Closing non-shared JMS consumer for service : " + serviceName); - } - consumer.close(); - } catch (JMSException e) { - logError("Error closing JMS consumer", e); - } finally { - consumer = null; - } - } - } - - /** - * Create a new Connection for this STM, using JNDI properties and credentials provided - * @return a new Connection for this STM, using JNDI properties and credentials provided - */ - private Connection createConnection() { - - try { - conFactory = JMSUtils.lookup( - getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName()); - log.debug("Connected to the JMS connection factory : " + getConnFactoryJNDIName()); - } catch (NamingException e) { - handleException("Error looking up connection factory : " + getConnFactoryJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - - Connection connection = null; - try { - connection = JMSUtils.createConnection( - conFactory, - jmsProperties.get(JMSConstants.PARAM_JMS_USERNAME), - jmsProperties.get(JMSConstants.PARAM_JMS_PASSWORD), - isJmsSpec11(), isQueue()); - - connection.setExceptionListener(this); - connection.start(); - log.debug("JMS Connection for service : " + serviceName + " created and started"); - - } catch (JMSException e) { - handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - return connection; - } - - /** - * Create a new Session for this STM - * @param connection the Connection to be used - * @return a new Session created using the Connection passed in - */ - private Session createSession() { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS Session for service : " + serviceName); - } - return JMSUtils.createSession( - connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue()); - - } catch (JMSException e) { - handleException("Error creating JMS session for service : " + serviceName, e); - } - return null; - } - - /** - * Create a new MessageConsumer for this STM - * @param session the Session to be used - * @return a new MessageConsumer created using the Session passed in - */ - private MessageConsumer createConsumer() { - try { - if (log.isDebugEnabled()) { - log.debug("Creating a new JMS MessageConsumer for service : " + serviceName); - } - - return JMSUtils.createConsumer( - session, getDestination(session), isQueue(), - (isSubscriptionDurable() && getDurableSubscriberName() == null ? - getDurableSubscriberName() : serviceName), - getMessageSelector(), isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11()); - - } catch (JMSException e) { - handleException("Error creating JMS consumer for service : " + serviceName,e); - } - return null; - } - } - - // -------------- mundane private methods ---------------- - /** - * Get the InitialContext for lookup using the JNDI parameters applicable to the service - * @return the InitialContext to be used - * @throws NamingException - */ - private Context getInitialContext() throws NamingException { - if (context == null) { - context = new InitialContext(jmsProperties); - } - return context; - } - - /** - * Return the JMS Destination for the JNDI name of the Destination from the InitialContext - * @return the JMS Destination to which this STM listens for messages - */ - private Destination getDestination(Session session) { - if (destination == null) { - try { - context = getInitialContext(); - destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName()); - if (log.isDebugEnabled()) { - log.debug("JMS Destination with JNDI name : " + getDestinationJNDIName() + - " found for service " + serviceName); - } - } catch (NamingException e) { - try { - switch (destinationType) { - case JMSConstants.QUEUE: { - destination = session.createQueue(getDestinationJNDIName()); - break; - } - case JMSConstants.TOPIC: { - destination = session.createTopic(getDestinationJNDIName()); - break; - } - default: { - handleException("Error looking up JMS destination : " + - getDestinationJNDIName() + " using JNDI properties : " + - jmsProperties, e); - } - } - } catch (JMSException j) { - handleException("Error looking up and creating JMS destination : " + - getDestinationJNDIName() + " using JNDI properties : " + jmsProperties, e); - } - } - } - return destination; - } - - /** - * The UserTransaction to be used, looked up from the JNDI - * @return The UserTransaction to be used, looked up from the JNDI - */ - private UserTransaction getUserTransaction() { - if (!cacheUserTransaction) { - if (log.isDebugEnabled()) { - log.debug("Acquiring a new UserTransaction for service : " + serviceName); - } - - try { - context = getInitialContext(); - return - JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); - } catch (NamingException e) { - handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - } - - if (sharedUserTransaction == null) { - try { - context = getInitialContext(); - sharedUserTransaction = - JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); - if (log.isDebugEnabled()) { - log.debug("Acquired shared UserTransaction for service : " + serviceName); - } - } catch (NamingException e) { - handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + - " using JNDI properties : " + jmsProperties, e); - } - } - return sharedUserTransaction; - } - - // -------------------- trivial methods --------------------- - private boolean isSTMActive() { - return serviceTaskManagerState == STATE_STARTED; - } - - /** - * Is this STM bound to a Queue, Topic or a JMS 1.1 Generic Destination? - * @return TRUE for a Queue, FALSE for a Topic and NULL for a Generic Destination - */ - private Boolean isQueue() { - if (destinationType == JMSConstants.GENERIC) { - return null; - } else { - return destinationType == JMSConstants.QUEUE; - } - } - - private void logError(String msg, Exception e) { - log.error(msg, e); - } - - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } - - private void handleException(String msg) { - log.error(msg); - throw new AxisJMSException(msg); - } - - // -------------- getters and setters ------------------ - public String getServiceName() { - return serviceName; - } - - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - - public String getConnFactoryJNDIName() { - return connFactoryJNDIName; - } - - public void setConnFactoryJNDIName(String connFactoryJNDIName) { - this.connFactoryJNDIName = connFactoryJNDIName; - } - - public String getDestinationJNDIName() { - return destinationJNDIName; - } - - public void setDestinationJNDIName(String destinationJNDIName) { - this.destinationJNDIName = destinationJNDIName; - } - - public int getDestinationType() { - return destinationType; - } - - public void setDestinationType(int destinationType) { - this.destinationType = destinationType; - } - - public String getMessageSelector() { - return messageSelector; - } - - public void setMessageSelector(String messageSelector) { - this.messageSelector = messageSelector; - } - - public int getTransactionality() { - return transactionality; - } - - public void setTransactionality(int transactionality) { - this.transactionality = transactionality; - sessionTransacted = (transactionality == BaseConstants.TRANSACTION_LOCAL); - } - - public boolean isSessionTransacted() { - return sessionTransacted; - } - - public void setSessionTransacted(Boolean sessionTransacted) { - if (sessionTransacted != null) { - this.sessionTransacted = sessionTransacted; - // sesstionTransacted means local transactions are used, however !sessionTransacted does - // not mean that JTA is used - if (sessionTransacted) { - transactionality = BaseConstants.TRANSACTION_LOCAL; - } - } - } - - public int getSessionAckMode() { - return sessionAckMode; - } - - public void setSessionAckMode(int sessionAckMode) { - this.sessionAckMode = sessionAckMode; - } - - public boolean isSubscriptionDurable() { - return subscriptionDurable; - } - - public void setSubscriptionDurable(Boolean subscriptionDurable) { - if (subscriptionDurable != null) { - this.subscriptionDurable = subscriptionDurable; - } - } - - public String getDurableSubscriberName() { - return durableSubscriberName; - } - - public void setDurableSubscriberName(String durableSubscriberName) { - this.durableSubscriberName = durableSubscriberName; - } - - public boolean isPubSubNoLocal() { - return pubSubNoLocal; - } - - public void setPubSubNoLocal(Boolean pubSubNoLocal) { - if (pubSubNoLocal != null) { - this.pubSubNoLocal = pubSubNoLocal; - } - } - - public int getConcurrentConsumers() { - return concurrentConsumers; - } - - public void setConcurrentConsumers(int concurrentConsumers) { - this.concurrentConsumers = concurrentConsumers; - } - - public int getMaxConcurrentConsumers() { - return maxConcurrentConsumers; - } - - public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { - this.maxConcurrentConsumers = maxConcurrentConsumers; - } - - public int getIdleTaskExecutionLimit() { - return idleTaskExecutionLimit; - } - - public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { - this.idleTaskExecutionLimit = idleTaskExecutionLimit; - } - - public int getReceiveTimeout() { - return receiveTimeout; - } - - public void setReceiveTimeout(int receiveTimeout) { - this.receiveTimeout = receiveTimeout; - } - - public int getCacheLevel() { - return cacheLevel; - } - - public void setCacheLevel(int cacheLevel) { - this.cacheLevel = cacheLevel; - } - - public int getInitialReconnectDuration() { - return initialReconnectDuration; - } - - public void setInitialReconnectDuration(int initialReconnectDuration) { - this.initialReconnectDuration = initialReconnectDuration; - } - - public double getReconnectionProgressionFactor() { - return reconnectionProgressionFactor; - } - - public void setReconnectionProgressionFactor(double reconnectionProgressionFactor) { - this.reconnectionProgressionFactor = reconnectionProgressionFactor; - } - - public long getMaxReconnectDuration() { - return maxReconnectDuration; - } - - public void setMaxReconnectDuration(long maxReconnectDuration) { - this.maxReconnectDuration = maxReconnectDuration; - } - - public int getMaxMessagesPerTask() { - return maxMessagesPerTask; - } - - public void setMaxMessagesPerTask(int maxMessagesPerTask) { - this.maxMessagesPerTask = maxMessagesPerTask; - } - - public String getUserTransactionJNDIName() { - return userTransactionJNDIName; - } - - public void setUserTransactionJNDIName(String userTransactionJNDIName) { - if (userTransactionJNDIName != null) { - this.userTransactionJNDIName = userTransactionJNDIName; - } - } - - public boolean isCacheUserTransaction() { - return cacheUserTransaction; - } - - public void setCacheUserTransaction(Boolean cacheUserTransaction) { - if (cacheUserTransaction != null) { - this.cacheUserTransaction = cacheUserTransaction; - } - } - - public boolean isJmsSpec11() { - return jmsSpec11; - } - - public void setJmsSpec11(boolean jmsSpec11) { - this.jmsSpec11 = jmsSpec11; - } - - public Hashtable<String, String> getJmsProperties() { - return jmsProperties; - } - - public void addJmsProperties(Map<String, String> jmsProperties) { - this.jmsProperties.putAll(jmsProperties); - } - - public void removeJmsProperties(String key) { - this.jmsProperties.remove(key); - } - - public Context getContext() { - return context; - } - - public ConnectionFactory getConnectionFactory() { - return conFactory; - } - - public List<MessageListenerTask> getPollingTasks() { - return pollingTasks; - } - - public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) { - this.jmsMessageReceiver = jmsMessageReceiver; - } - - public void setWorkerPool(WorkerPool workerPool) { - this.workerPool = workerPool; - } - - public int getActiveTaskCount() { - return activeTaskCount; - } - - public void setServiceTaskManagerState(int serviceTaskManagerState) { - this.serviceTaskManagerState = serviceTaskManagerState; - } -} |