summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
diff options
context:
space:
mode:
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java')
-rw-r--r--branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java499
1 files changed, 0 insertions, 499 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
deleted file mode 100644
index a5f77dc4c9..0000000000
--- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
-* Copyright 2004,2005 The Apache Software Foundation.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.StringWriter;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-
-import javax.activation.DataHandler;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMNode;
-import org.apache.axiom.om.OMOutputFormat;
-import org.apache.axiom.om.OMText;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.TransportOutDescription;
-import org.apache.axis2.transport.MessageFormatter;
-import org.apache.axis2.transport.OutTransportInfo;
-import org.apache.axis2.transport.TransportUtils;
-import org.apache.axis2.transport.http.HTTPConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportSender;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport;
-import org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams.WriterOutputStream;
-
-/**
- * The TransportSender for JMS
- */
-public class JMSSender extends AbstractTransportSender implements ManagementSupport {
-
- public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
-
- /** The JMS connection factory manager to be used when sending messages out */
- private JMSConnectionFactoryManager connFacManager;
-
- /**
- * Initialize the transport sender by reading pre-defined connection factories for
- * outgoing messages.
- *
- * @param cfgCtx the configuration context
- * @param transportOut the transport sender definition from axis2.xml
- * @throws AxisFault on error
- */
- public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
- super.init(cfgCtx, transportOut);
- connFacManager = new JMSConnectionFactoryManager(transportOut);
- log.info("JMS Transport Sender initialized...");
- }
-
- /**
- * Get corresponding JMS connection factory defined within the transport sender for the
- * transport-out information - usually constructed from a targetEPR
- *
- * @param trpInfo the transport-out information
- * @return the corresponding JMS connection factory, if any
- */
- private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) {
- Map<String,String> props = trpInfo.getProperties();
- if (trpInfo.getProperties() != null) {
- String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC);
- if (jmsConnectionFactoryName != null) {
- return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
- } else {
- return connFacManager.getJMSConnectionFactory(props);
- }
- } else {
- return null;
- }
- }
-
- /**
- * Performs the actual sending of the JMS message
- */
- public void sendMessage(MessageContext msgCtx, String targetAddress,
- OutTransportInfo outTransportInfo) throws AxisFault {
-
- JMSConnectionFactory jmsConnectionFactory = null;
- JMSOutTransportInfo jmsOut = null;
- JMSMessageSender messageSender = null;
-
- if (targetAddress != null) {
-
- jmsOut = new JMSOutTransportInfo(targetAddress);
- // do we have a definition for a connection factory to use for this address?
- jmsConnectionFactory = getJMSConnectionFactory(jmsOut);
-
- if (jmsConnectionFactory != null) {
- messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress);
-
- } else {
- try {
- messageSender = JMSUtils.createJMSSender(jmsOut);
- } catch (JMSException e) {
- handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
- }
- }
-
- } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) {
-
- jmsOut = (JMSOutTransportInfo) outTransportInfo;
- try {
- messageSender = JMSUtils.createJMSSender(jmsOut);
- } catch (JMSException e) {
- handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
- }
- }
-
- // The message property to be used to send the content type is determined by
- // the out transport info, i.e. either from the EPR if we are sending a request,
- // or, if we are sending a response, from the configuration of the service that
- // received the request). The property name can be overridden by a message
- // context property.
- String contentTypeProperty =
- (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
- if (contentTypeProperty == null) {
- contentTypeProperty = jmsOut.getContentTypeProperty();
- }
-
- // need to synchronize as Sessions are not thread safe
- synchronized (messageSender.getSession()) {
- try {
- sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
- } finally {
- messageSender.close();
- }
- }
- }
-
- /**
- * Perform actual sending of the JMS message
- */
- private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender,
- String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory,
- JMSOutTransportInfo jmsOut) throws AxisFault {
-
- // convert the axis message context into a JMS Message that we can send over JMS
- Message message = null;
- String correlationId = null;
- try {
- message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty);
- } catch (JMSException e) {
- handleException("Error creating a JMS message from the message context", e);
- }
-
- // should we wait for a synchronous response on this same thread?
- boolean waitForResponse = waitForSynchronousResponse(msgCtx);
- Destination replyDestination = jmsOut.getReplyDestination();
-
- // if this is a synchronous out-in, prepare to listen on the response destination
- if (waitForResponse) {
-
- String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
- if (replyDestName == null && jmsConnectionFactory != null) {
- replyDestName = jmsConnectionFactory.getReplyToDestination();
- }
-
- if (replyDestName != null) {
- if (jmsConnectionFactory != null) {
- replyDestination = jmsConnectionFactory.getDestination(replyDestName);
- } else {
- replyDestination = jmsOut.getReplyDestination(replyDestName);
- }
- }
- replyDestination = JMSUtils.setReplyDestination(
- replyDestination, messageSender.getSession(), message);
- }
-
- try {
- messageSender.send(message, msgCtx);
- metrics.incrementMessagesSent(msgCtx);
-
- } catch (AxisJMSException e) {
- metrics.incrementFaultsSending();
- handleException("Error sending JMS message", e);
- }
-
- try {
- metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message));
- } catch (JMSException e) {
- log.warn("Error reading JMS message size to update transport metrics", e);
- }
-
- // if we are expecting a synchronous response back for the message sent out
- if (waitForResponse) {
- // TODO ********************************************************************************
- // TODO **** replace with asynchronous polling via a poller task to process this *******
- // information would be given. Then it should poll (until timeout) the
- // requested destination for the response message and inject it from a
- // asynchronous worker thread
- try {
- messageSender.getConnection().start(); // multiple calls are safely ignored
- } catch (JMSException ignore) {}
-
- try {
- correlationId = message.getJMSMessageID();
- } catch(JMSException ignore) {}
-
- // We assume here that the response uses the same message property to
- // specify the content type of the message.
- waitForResponseAndProcess(messageSender.getSession(), replyDestination,
- msgCtx, correlationId, contentTypeProperty);
- // TODO ********************************************************************************
- }
- }
-
- /**
- * Create a Consumer for the reply destination and wait for the response JMS message
- * synchronously. If a message arrives within the specified time interval, process it
- * through Axis2
- * @param session the session to use to listen for the response
- * @param replyDestination the JMS reply Destination
- * @param msgCtx the outgoing message for which we are expecting the response
- * @param contentTypeProperty the message property used to determine the content type
- * of the response message
- * @throws AxisFault on error
- */
- private void waitForResponseAndProcess(Session session, Destination replyDestination,
- MessageContext msgCtx, String correlationId,
- String contentTypeProperty) throws AxisFault {
-
- try {
- MessageConsumer consumer;
- consumer = JMSUtils.createConsumer(session, replyDestination,
- "JMSCorrelationID = '" + correlationId + "'");
-
- // how long are we willing to wait for the sync response
- long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
- String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY);
- if (waitReply != null) {
- timeout = Long.valueOf(waitReply).longValue();
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Waiting for a maximum of " + timeout +
- "ms for a response message to destination : " + replyDestination +
- " with JMS correlation ID : " + correlationId);
- }
-
- Message reply = consumer.receive(timeout);
-
- if (reply != null) {
-
- // update transport level metrics
- metrics.incrementMessagesReceived();
- try {
- metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply));
- } catch (JMSException e) {
- log.warn("Error reading JMS message size to update transport metrics", e);
- }
-
- try {
- processSyncResponse(msgCtx, reply, contentTypeProperty);
- metrics.incrementMessagesReceived();
- } catch (AxisFault e) {
- metrics.incrementFaultsReceiving();
- throw e;
- }
-
- } else {
- log.warn("Did not receive a JMS response within " +
- timeout + " ms to destination : " + replyDestination +
- " with JMS correlation ID : " + correlationId);
- metrics.incrementTimeoutsReceiving();
- }
-
- } catch (JMSException e) {
- metrics.incrementFaultsReceiving();
- handleException("Error creating a consumer, or receiving a synchronous reply " +
- "for outgoing MessageContext ID : " + msgCtx.getMessageID() +
- " and reply Destination : " + replyDestination, e);
- }
- }
-
- /**
- * Create a JMS Message from the given MessageContext and using the given
- * session
- *
- * @param msgContext the MessageContext
- * @param session the JMS session
- * @param contentTypeProperty the message property to be used to store the
- * content type
- * @return a JMS message from the context and session
- * @throws JMSException on exception
- * @throws AxisFault on exception
- */
- private Message createJMSMessage(MessageContext msgContext, Session session,
- String contentTypeProperty) throws JMSException, AxisFault {
-
- Message message = null;
- String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE);
-
- // check the first element of the SOAP body, do we have content wrapped using the
- // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or
- // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages
- // for JMS but just get the payload in its native format
- String jmsPayloadType = guessMessageType(msgContext);
-
- if (jmsPayloadType == null) {
-
- OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
- MessageFormatter messageFormatter = null;
- try {
- messageFormatter = TransportUtils.getMessageFormatter(msgContext);
- } catch (AxisFault axisFault) {
- throw new JMSException("Unable to get the message formatter to use");
- }
-
- String contentType = messageFormatter.getContentType(
- msgContext, format, msgContext.getSoapAction());
-
- boolean useBytesMessage =
- msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
- contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1;
-
- OutputStream out;
- StringWriter sw;
- if (useBytesMessage) {
- BytesMessage bytesMsg = session.createBytesMessage();
- sw = null;
- out = new BytesMessageOutputStream(bytesMsg);
- message = bytesMsg;
- } else {
- sw = new StringWriter();
- try {
- out = new WriterOutputStream(sw, format.getCharSetEncoding());
- } catch (UnsupportedCharsetException ex) {
- handleException("Unsupported encoding " + format.getCharSetEncoding(), ex);
- return null;
- }
- }
-
- try {
- messageFormatter.writeTo(msgContext, format, out, true);
- out.close();
- } catch (IOException e) {
- handleException("IO Error while creating BytesMessage", e);
- }
-
- if (!useBytesMessage) {
- TextMessage txtMsg = session.createTextMessage();
- txtMsg.setText(sw.toString());
- message = txtMsg;
- }
-
- if (contentTypeProperty != null) {
- message.setStringProperty(contentTypeProperty, contentType);
- }
-
- } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) {
- message = session.createBytesMessage();
- BytesMessage bytesMsg = (BytesMessage) message;
- OMElement wrapper = msgContext.getEnvelope().getBody().
- getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER);
- OMNode omNode = wrapper.getFirstOMChild();
- if (omNode != null && omNode instanceof OMText) {
- Object dh = ((OMText) omNode).getDataHandler();
- if (dh != null && dh instanceof DataHandler) {
- try {
- ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg));
- } catch (IOException e) {
- handleException("Error serializing binary content of element : " +
- BaseConstants.DEFAULT_BINARY_WRAPPER, e);
- }
- }
- }
-
- } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) {
- message = session.createTextMessage();
- TextMessage txtMsg = (TextMessage) message;
- txtMsg.setText(msgContext.getEnvelope().getBody().
- getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText());
- }
-
- // set the JMS correlation ID if specified
- String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID);
- if (correlationId == null && msgContext.getRelatesTo() != null) {
- correlationId = msgContext.getRelatesTo().getValue();
- }
-
- if (correlationId != null) {
- message.setJMSCorrelationID(correlationId);
- }
-
- if (msgContext.isServerSide()) {
- // set SOAP Action as a property on the JMS message
- setProperty(message, msgContext, BaseConstants.SOAPACTION);
- } else {
- String action = msgContext.getOptions().getAction();
- if (action != null) {
- message.setStringProperty(BaseConstants.SOAPACTION, action);
- }
- }
-
- JMSUtils.setTransportHeaders(msgContext, message);
- return message;
- }
-
- /**
- * Guess the message type to use for JMS looking at the message contexts' envelope
- * @param msgContext the message context
- * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null
- */
- private String guessMessageType(MessageContext msgContext) {
- OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement();
- if (firstChild != null) {
- if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) {
- return JMSConstants.JMS_BYTE_MESSAGE;
- } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) {
- return JMSConstants.JMS_TEXT_MESSAGE;
- }
- }
- return null;
- }
-
- /**
- * Creates an Axis MessageContext for the received JMS message and
- * sets up the transports and various properties
- *
- * @param outMsgCtx the outgoing message for which we are expecting the response
- * @param message the JMS response message received
- * @param contentTypeProperty the message property used to determine the content type
- * of the response message
- * @throws AxisFault on error
- */
- private void processSyncResponse(MessageContext outMsgCtx, Message message,
- String contentTypeProperty) throws AxisFault {
-
- MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx);
-
- // load any transport headers from received message
- JMSUtils.loadTransportHeaders(message, responseMsgCtx);
-
- // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response
- // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This
- // question is still under debate and due to the timelines, I am commiting this
- // workaround as Axis2 1.2 is about to be released and Synapse 1.0
- responseMsgCtx.setServerSide(false);
-
- String contentType =
- contentTypeProperty == null ? null
- : JMSUtils.getProperty(message, contentTypeProperty);
-
- try {
- JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType);
- } catch (JMSException ex) {
- throw AxisFault.makeFault(ex);
- }
-// responseMsgCtx.setServerSide(true);
-
- handleIncomingMessage(
- responseMsgCtx,
- JMSUtils.getTransportHeaders(message),
- JMSUtils.getProperty(message, BaseConstants.SOAPACTION),
- contentType
- );
- }
-
- private void setProperty(Message message, MessageContext msgCtx, String key) {
-
- String value = getProperty(msgCtx, key);
- if (value != null) {
- try {
- message.setStringProperty(key, value);
- } catch (JMSException e) {
- log.warn("Couldn't set message property : " + key + " = " + value, e);
- }
- }
- }
-
- private String getProperty(MessageContext mc, String key) {
- return (String) mc.getProperty(key);
- }
-}