summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java237
1 files changed, 237 insertions, 0 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
new file mode 100644
index 0000000000..ebd67e53e1
--- /dev/null
+++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSMessageReceiver.java
@@ -0,0 +1,237 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tuscany.sca.binding.ws.axis2.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.transaction.UserTransaction;
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.binding.ws.axis2.jms.ctype.ContentTypeInfo;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
+import org.apache.tuscany.sca.binding.ws.axis2.transport.base.MetricsCollector;
+
+/**
+ * This is the JMS message receiver which is invoked when a message is received. This processes
+ * the message through the engine
+ */
+public class JMSMessageReceiver {
+
+ private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);
+
+ /** The JMSListener */
+ private JMSListener jmsListener = null;
+ /** A reference to the JMS Connection Factory */
+ private JMSConnectionFactory jmsConnectionFactory = null;
+ /** The JMS metrics collector */
+ private MetricsCollector metrics = null;
+ /** The endpoint this message receiver is bound to */
+ final JMSEndpoint endpoint;
+
+ /**
+ * Create a new JMSMessage receiver
+ *
+ * @param jmsListener the JMS transport Listener
+ * @param jmsConFac the JMS connection factory we are associated with
+ * @param workerPool the worker thread pool to be used
+ * @param cfgCtx the axis ConfigurationContext
+ * @param serviceName the name of the Axis service
+ * @param endpoint the JMSEndpoint definition to be used
+ */
+ JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) {
+ this.jmsListener = jmsListener;
+ this.jmsConnectionFactory = jmsConFac;
+ this.endpoint = endpoint;
+ this.metrics = jmsListener.getMetricsCollector();
+ }
+
+ /**
+ * Process a new message received
+ *
+ * @param message the JMS message received
+ * @param ut UserTransaction which was used to receive the message
+ * @return true if caller should commit
+ */
+ public boolean onMessage(Message message, UserTransaction ut) {
+
+ try {
+ if (log.isDebugEnabled()) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Received new JMS message for service :").append(endpoint.getServiceName());
+ sb.append("\nDestination : ").append(message.getJMSDestination());
+ sb.append("\nMessage ID : ").append(message.getJMSMessageID());
+ sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID());
+ sb.append("\nReplyTo : ").append(message.getJMSReplyTo());
+ sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered());
+ sb.append("\nPriority : ").append(message.getJMSPriority());
+ sb.append("\nExpiration : ").append(message.getJMSExpiration());
+ sb.append("\nTimestamp : ").append(message.getJMSTimestamp());
+ sb.append("\nMessage Type : ").append(message.getJMSType());
+ sb.append("\nPersistent ? : ").append(
+ DeliveryMode.PERSISTENT == message.getJMSDeliveryMode());
+
+ log.debug(sb.toString());
+ if (log.isTraceEnabled() && message instanceof TextMessage) {
+ log.trace("\nMessage : " + ((TextMessage) message).getText());
+ }
+ }
+ } catch (JMSException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Error reading JMS message headers for debug logging", e);
+ }
+ }
+
+ // update transport level metrics
+ try {
+ metrics.incrementBytesReceived(JMSUtils.getMessageSize(message));
+ } catch (JMSException e) {
+ log.warn("Error reading JMS message size to update transport metrics", e);
+ }
+
+ // has this message already expired? expiration time == 0 means never expires
+ try {
+ long expiryTime = message.getJMSExpiration();
+ if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
+ if (log.isDebugEnabled()) {
+ log.debug("Discard expired message with ID : " + message.getJMSMessageID());
+ }
+ return true;
+ }
+ } catch (JMSException ignore) {}
+
+
+ boolean successful = false;
+ try {
+ successful = processThoughEngine(message, ut);
+
+ } catch (JMSException e) {
+ log.error("JMS Exception encountered while processing", e);
+ } catch (AxisFault e) {
+ log.error("Axis fault processing message", e);
+ } catch (Exception e) {
+ log.error("Unknown error processing message", e);
+
+ } finally {
+ if (successful) {
+ metrics.incrementMessagesReceived();
+ } else {
+ metrics.incrementFaultsReceiving();
+ }
+ }
+
+ return successful;
+ }
+
+ /**
+ * Process the new message through Axis2
+ *
+ * @param message the JMS message
+ * @param ut the UserTransaction used for receipt
+ * @return true if the caller should commit
+ * @throws JMSException, on JMS exceptions
+ * @throws AxisFault on Axis2 errors
+ */
+ private boolean processThoughEngine(Message message, UserTransaction ut)
+ throws JMSException, AxisFault {
+
+ MessageContext msgContext = jmsListener.createMessageContext();
+
+ // set the JMS Message ID as the Message ID of the MessageContext
+ try {
+ msgContext.setMessageID(message.getJMSMessageID());
+ msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
+ } catch (JMSException ignore) {}
+
+ String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION);
+
+ AxisService service = endpoint.getService();
+ msgContext.setAxisService(service);
+
+ // find the operation for the message, or default to one
+ Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
+ QName operationQName = (
+ operationParam != null ?
+ BaseUtils.getQNameFromString(operationParam.getValue()) :
+ BaseConstants.DEFAULT_OPERATION);
+
+ AxisOperation operation = service.getOperation(operationQName);
+ if (operation != null) {
+ msgContext.setAxisOperation(operation);
+ msgContext.setSoapAction("urn:" + operation.getName().getLocalPart());
+ }
+
+ ContentTypeInfo contentTypeInfo =
+ endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
+ if (contentTypeInfo == null) {
+ throw new AxisFault("Unable to determine content type for message " +
+ msgContext.getMessageID());
+ }
+
+ // set the message property OUT_TRANSPORT_INFO
+ // the reply is assumed to be over the JMSReplyTo destination, using
+ // the same incoming connection factory, if a JMSReplyTo is available
+ Destination replyTo = message.getJMSReplyTo();
+ if (replyTo == null) {
+ // does the service specify a default reply destination ?
+ Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION);
+ if (param != null && param.getValue() != null) {
+ replyTo = jmsConnectionFactory.getDestination((String) param.getValue());
+ }
+
+ }
+ if (replyTo != null) {
+ msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
+ new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
+ contentTypeInfo.getPropertyName()));
+ }
+
+ JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
+ if (ut != null) {
+ msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut);
+ }
+
+ try {
+ jmsListener.handleIncomingMessage(
+ msgContext,
+ JMSUtils.getTransportHeaders(message),
+ soapAction,
+ contentTypeInfo.getContentType());
+
+ } finally {
+
+ Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY);
+ if (o != null) {
+ if ((o instanceof Boolean && ((Boolean) o)) ||
+ (o instanceof String && Boolean.valueOf((String) o))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+}