summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java237
1 files changed, 0 insertions, 237 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
deleted file mode 100644
index ebd67e53e1..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-import javax.transaction.UserTransaction;
-import javax.xml.namespace.QName;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.Parameter;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeInfo;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector;
-
-/**
- * This is the JMS message receiver which is invoked when a message is received. This processes
- * the message through the engine
- */
-public class JMSMessageReceiver {
-
- private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);
-
- /** The JMSListener */
- private JMSListener jmsListener = null;
- /** A reference to the JMS Connection Factory */
- private JMSConnectionFactory jmsConnectionFactory = null;
- /** The JMS metrics collector */
- private MetricsCollector metrics = null;
- /** The endpoint this message receiver is bound to */
- final JMSEndpoint endpoint;
-
- /**
- * Create a new JMSMessage receiver
- *
- * @param jmsListener the JMS transport Listener
- * @param jmsConFac the JMS connection factory we are associated with
- * @param workerPool the worker thread pool to be used
- * @param cfgCtx the axis ConfigurationContext
- * @param serviceName the name of the Axis service
- * @param endpoint the JMSEndpoint definition to be used
- */
- JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) {
- this.jmsListener = jmsListener;
- this.jmsConnectionFactory = jmsConFac;
- this.endpoint = endpoint;
- this.metrics = jmsListener.getMetricsCollector();
- }
-
- /**
- * Process a new message received
- *
- * @param message the JMS message received
- * @param ut UserTransaction which was used to receive the message
- * @return true if caller should commit
- */
- public boolean onMessage(Message message, UserTransaction ut) {
-
- try {
- if (log.isDebugEnabled()) {
- StringBuffer sb = new StringBuffer();
- sb.append("Received new JMS message for service :").append(endpoint.getServiceName());
- sb.append("\nDestination : ").append(message.getJMSDestination());
- sb.append("\nMessage ID : ").append(message.getJMSMessageID());
- sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID());
- sb.append("\nReplyTo : ").append(message.getJMSReplyTo());
- sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered());
- sb.append("\nPriority : ").append(message.getJMSPriority());
- sb.append("\nExpiration : ").append(message.getJMSExpiration());
- sb.append("\nTimestamp : ").append(message.getJMSTimestamp());
- sb.append("\nMessage Type : ").append(message.getJMSType());
- sb.append("\nPersistent ? : ").append(
- DeliveryMode.PERSISTENT == message.getJMSDeliveryMode());
-
- log.debug(sb.toString());
- if (log.isTraceEnabled() && message instanceof TextMessage) {
- log.trace("\nMessage : " + ((TextMessage) message).getText());
- }
- }
- } catch (JMSException e) {
- if (log.isDebugEnabled()) {
- log.debug("Error reading JMS message headers for debug logging", e);
- }
- }
-
- // update transport level metrics
- try {
- metrics.incrementBytesReceived(JMSUtils.getMessageSize(message));
- } catch (JMSException e) {
- log.warn("Error reading JMS message size to update transport metrics", e);
- }
-
- // has this message already expired? expiration time == 0 means never expires
- try {
- long expiryTime = message.getJMSExpiration();
- if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
- if (log.isDebugEnabled()) {
- log.debug("Discard expired message with ID : " + message.getJMSMessageID());
- }
- return true;
- }
- } catch (JMSException ignore) {}
-
-
- boolean successful = false;
- try {
- successful = processThoughEngine(message, ut);
-
- } catch (JMSException e) {
- log.error("JMS Exception encountered while processing", e);
- } catch (AxisFault e) {
- log.error("Axis fault processing message", e);
- } catch (Exception e) {
- log.error("Unknown error processing message", e);
-
- } finally {
- if (successful) {
- metrics.incrementMessagesReceived();
- } else {
- metrics.incrementFaultsReceiving();
- }
- }
-
- return successful;
- }
-
- /**
- * Process the new message through Axis2
- *
- * @param message the JMS message
- * @param ut the UserTransaction used for receipt
- * @return true if the caller should commit
- * @throws JMSException, on JMS exceptions
- * @throws AxisFault on Axis2 errors
- */
- private boolean processThoughEngine(Message message, UserTransaction ut)
- throws JMSException, AxisFault {
-
- MessageContext msgContext = jmsListener.createMessageContext();
-
- // set the JMS Message ID as the Message ID of the MessageContext
- try {
- msgContext.setMessageID(message.getJMSMessageID());
- msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
- } catch (JMSException ignore) {}
-
- String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION);
-
- AxisService service = endpoint.getService();
- msgContext.setAxisService(service);
-
- // find the operation for the message, or default to one
- Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
- QName operationQName = (
- operationParam != null ?
- BaseUtils.getQNameFromString(operationParam.getValue()) :
- BaseConstants.DEFAULT_OPERATION);
-
- AxisOperation operation = service.getOperation(operationQName);
- if (operation != null) {
- msgContext.setAxisOperation(operation);
- msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
- }
-
- ContentTypeInfo contentTypeInfo =
- endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
- if (contentTypeInfo == null) {
- throw new AxisFault("Unable to determine content type for message " +
- msgContext.getMessageID());
- }
-
- // set the message property OUT_TRANSPORT_INFO
- // the reply is assumed to be over the JMSReplyTo destination, using
- // the same incoming connection factory, if a JMSReplyTo is available
- Destination replyTo = message.getJMSReplyTo();
- if (replyTo == null) {
- // does the service specify a default reply destination ?
- Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION);
- if (param != null && param.getValue() != null) {
- replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
- }
-
- }
- if (replyTo != null) {
- msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
- new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
- contentTypeInfo.getPropertyName()));
- }
-
- JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
- if (ut != null) {
- msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut);
- }
-
- try {
- jmsListener.handleIncomingMessage(
- msgContext,
- JMSUtils.getTransportHeaders(message),
- soapAction,
- contentTypeInfo.getContentType());
-
- } finally {
-
- Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY);
- if (o != null) {
- if ((o instanceof Boolean && ((Boolean) o)) ||
- (o instanceof String && Boolean.valueOf((String) o))) {
- return false;
- }
- }
- return true;
- }
- }
-}