From a3cbf8e5ffabac239cd965d8c0f9c680a83246f7 Mon Sep 17 00:00:00 2001 From: antelder Date: Mon, 11 May 2009 07:45:29 +0000 Subject: Add a new soap/jms transport module copied from the Apache WS Commons transports but with the code backported to work with Axis2 1.4.1 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@773489 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/binding/ws/axis2/jms/JMSSender.java | 499 +++++++++++++++++++++ 1 file changed, 499 insertions(+) create mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java') diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java new file mode 100644 index 0000000000..a5f77dc4c9 --- /dev/null +++ b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java @@ -0,0 +1,499 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tuscany.sca.binding.ws.axis2.jms; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.StringWriter; +import java.nio.charset.UnsupportedCharsetException; +import java.util.Map; + +import javax.activation.DataHandler; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMNode; +import org.apache.axiom.om.OMOutputFormat; +import org.apache.axiom.om.OMText; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.transport.MessageFormatter; +import org.apache.axis2.transport.OutTransportInfo; +import org.apache.axis2.transport.TransportUtils; +import org.apache.axis2.transport.http.HTTPConstants; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportSender; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport; +import org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams.WriterOutputStream; + +/** + * The TransportSender for JMS + */ +public class JMSSender extends AbstractTransportSender implements ManagementSupport { + + public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; + + /** The JMS connection factory manager to be used when sending messages out */ + private JMSConnectionFactoryManager connFacManager; + + /** + * Initialize the transport sender by reading pre-defined connection factories for + * outgoing messages. + * + * @param cfgCtx the configuration context + * @param transportOut the transport sender definition from axis2.xml + * @throws AxisFault on error + */ + public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { + super.init(cfgCtx, transportOut); + connFacManager = new JMSConnectionFactoryManager(transportOut); + log.info("JMS Transport Sender initialized..."); + } + + /** + * Get corresponding JMS connection factory defined within the transport sender for the + * transport-out information - usually constructed from a targetEPR + * + * @param trpInfo the transport-out information + * @return the corresponding JMS connection factory, if any + */ + private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { + Map props = trpInfo.getProperties(); + if (trpInfo.getProperties() != null) { + String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC); + if (jmsConnectionFactoryName != null) { + return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName); + } else { + return connFacManager.getJMSConnectionFactory(props); + } + } else { + return null; + } + } + + /** + * Performs the actual sending of the JMS message + */ + public void sendMessage(MessageContext msgCtx, String targetAddress, + OutTransportInfo outTransportInfo) throws AxisFault { + + JMSConnectionFactory jmsConnectionFactory = null; + JMSOutTransportInfo jmsOut = null; + JMSMessageSender messageSender = null; + + if (targetAddress != null) { + + jmsOut = new JMSOutTransportInfo(targetAddress); + // do we have a definition for a connection factory to use for this address? + jmsConnectionFactory = getJMSConnectionFactory(jmsOut); + + if (jmsConnectionFactory != null) { + messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress); + + } else { + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } + + } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { + + jmsOut = (JMSOutTransportInfo) outTransportInfo; + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } + + // The message property to be used to send the content type is determined by + // the out transport info, i.e. either from the EPR if we are sending a request, + // or, if we are sending a response, from the configuration of the service that + // received the request). The property name can be overridden by a message + // context property. + String contentTypeProperty = + (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); + if (contentTypeProperty == null) { + contentTypeProperty = jmsOut.getContentTypeProperty(); + } + + // need to synchronize as Sessions are not thread safe + synchronized (messageSender.getSession()) { + try { + sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); + } finally { + messageSender.close(); + } + } + } + + /** + * Perform actual sending of the JMS message + */ + private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender, + String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory, + JMSOutTransportInfo jmsOut) throws AxisFault { + + // convert the axis message context into a JMS Message that we can send over JMS + Message message = null; + String correlationId = null; + try { + message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty); + } catch (JMSException e) { + handleException("Error creating a JMS message from the message context", e); + } + + // should we wait for a synchronous response on this same thread? + boolean waitForResponse = waitForSynchronousResponse(msgCtx); + Destination replyDestination = jmsOut.getReplyDestination(); + + // if this is a synchronous out-in, prepare to listen on the response destination + if (waitForResponse) { + + String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO); + if (replyDestName == null && jmsConnectionFactory != null) { + replyDestName = jmsConnectionFactory.getReplyToDestination(); + } + + if (replyDestName != null) { + if (jmsConnectionFactory != null) { + replyDestination = jmsConnectionFactory.getDestination(replyDestName); + } else { + replyDestination = jmsOut.getReplyDestination(replyDestName); + } + } + replyDestination = JMSUtils.setReplyDestination( + replyDestination, messageSender.getSession(), message); + } + + try { + messageSender.send(message, msgCtx); + metrics.incrementMessagesSent(msgCtx); + + } catch (AxisJMSException e) { + metrics.incrementFaultsSending(); + handleException("Error sending JMS message", e); + } + + try { + metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message)); + } catch (JMSException e) { + log.warn("Error reading JMS message size to update transport metrics", e); + } + + // if we are expecting a synchronous response back for the message sent out + if (waitForResponse) { + // TODO ******************************************************************************** + // TODO **** replace with asynchronous polling via a poller task to process this ******* + // information would be given. Then it should poll (until timeout) the + // requested destination for the response message and inject it from a + // asynchronous worker thread + try { + messageSender.getConnection().start(); // multiple calls are safely ignored + } catch (JMSException ignore) {} + + try { + correlationId = message.getJMSMessageID(); + } catch(JMSException ignore) {} + + // We assume here that the response uses the same message property to + // specify the content type of the message. + waitForResponseAndProcess(messageSender.getSession(), replyDestination, + msgCtx, correlationId, contentTypeProperty); + // TODO ******************************************************************************** + } + } + + /** + * Create a Consumer for the reply destination and wait for the response JMS message + * synchronously. If a message arrives within the specified time interval, process it + * through Axis2 + * @param session the session to use to listen for the response + * @param replyDestination the JMS reply Destination + * @param msgCtx the outgoing message for which we are expecting the response + * @param contentTypeProperty the message property used to determine the content type + * of the response message + * @throws AxisFault on error + */ + private void waitForResponseAndProcess(Session session, Destination replyDestination, + MessageContext msgCtx, String correlationId, + String contentTypeProperty) throws AxisFault { + + try { + MessageConsumer consumer; + consumer = JMSUtils.createConsumer(session, replyDestination, + "JMSCorrelationID = '" + correlationId + "'"); + + // how long are we willing to wait for the sync response + long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; + String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY); + if (waitReply != null) { + timeout = Long.valueOf(waitReply).longValue(); + } + + if (log.isDebugEnabled()) { + log.debug("Waiting for a maximum of " + timeout + + "ms for a response message to destination : " + replyDestination + + " with JMS correlation ID : " + correlationId); + } + + Message reply = consumer.receive(timeout); + + if (reply != null) { + + // update transport level metrics + metrics.incrementMessagesReceived(); + try { + metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply)); + } catch (JMSException e) { + log.warn("Error reading JMS message size to update transport metrics", e); + } + + try { + processSyncResponse(msgCtx, reply, contentTypeProperty); + metrics.incrementMessagesReceived(); + } catch (AxisFault e) { + metrics.incrementFaultsReceiving(); + throw e; + } + + } else { + log.warn("Did not receive a JMS response within " + + timeout + " ms to destination : " + replyDestination + + " with JMS correlation ID : " + correlationId); + metrics.incrementTimeoutsReceiving(); + } + + } catch (JMSException e) { + metrics.incrementFaultsReceiving(); + handleException("Error creating a consumer, or receiving a synchronous reply " + + "for outgoing MessageContext ID : " + msgCtx.getMessageID() + + " and reply Destination : " + replyDestination, e); + } + } + + /** + * Create a JMS Message from the given MessageContext and using the given + * session + * + * @param msgContext the MessageContext + * @param session the JMS session + * @param contentTypeProperty the message property to be used to store the + * content type + * @return a JMS message from the context and session + * @throws JMSException on exception + * @throws AxisFault on exception + */ + private Message createJMSMessage(MessageContext msgContext, Session session, + String contentTypeProperty) throws JMSException, AxisFault { + + Message message = null; + String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE); + + // check the first element of the SOAP body, do we have content wrapped using the + // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or + // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages + // for JMS but just get the payload in its native format + String jmsPayloadType = guessMessageType(msgContext); + + if (jmsPayloadType == null) { + + OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); + MessageFormatter messageFormatter = null; + try { + messageFormatter = TransportUtils.getMessageFormatter(msgContext); + } catch (AxisFault axisFault) { + throw new JMSException("Unable to get the message formatter to use"); + } + + String contentType = messageFormatter.getContentType( + msgContext, format, msgContext.getSoapAction()); + + boolean useBytesMessage = + msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) || + contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1; + + OutputStream out; + StringWriter sw; + if (useBytesMessage) { + BytesMessage bytesMsg = session.createBytesMessage(); + sw = null; + out = new BytesMessageOutputStream(bytesMsg); + message = bytesMsg; + } else { + sw = new StringWriter(); + try { + out = new WriterOutputStream(sw, format.getCharSetEncoding()); + } catch (UnsupportedCharsetException ex) { + handleException("Unsupported encoding " + format.getCharSetEncoding(), ex); + return null; + } + } + + try { + messageFormatter.writeTo(msgContext, format, out, true); + out.close(); + } catch (IOException e) { + handleException("IO Error while creating BytesMessage", e); + } + + if (!useBytesMessage) { + TextMessage txtMsg = session.createTextMessage(); + txtMsg.setText(sw.toString()); + message = txtMsg; + } + + if (contentTypeProperty != null) { + message.setStringProperty(contentTypeProperty, contentType); + } + + } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) { + message = session.createBytesMessage(); + BytesMessage bytesMsg = (BytesMessage) message; + OMElement wrapper = msgContext.getEnvelope().getBody(). + getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER); + OMNode omNode = wrapper.getFirstOMChild(); + if (omNode != null && omNode instanceof OMText) { + Object dh = ((OMText) omNode).getDataHandler(); + if (dh != null && dh instanceof DataHandler) { + try { + ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg)); + } catch (IOException e) { + handleException("Error serializing binary content of element : " + + BaseConstants.DEFAULT_BINARY_WRAPPER, e); + } + } + } + + } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) { + message = session.createTextMessage(); + TextMessage txtMsg = (TextMessage) message; + txtMsg.setText(msgContext.getEnvelope().getBody(). + getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText()); + } + + // set the JMS correlation ID if specified + String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID); + if (correlationId == null && msgContext.getRelatesTo() != null) { + correlationId = msgContext.getRelatesTo().getValue(); + } + + if (correlationId != null) { + message.setJMSCorrelationID(correlationId); + } + + if (msgContext.isServerSide()) { + // set SOAP Action as a property on the JMS message + setProperty(message, msgContext, BaseConstants.SOAPACTION); + } else { + String action = msgContext.getOptions().getAction(); + if (action != null) { + message.setStringProperty(BaseConstants.SOAPACTION, action); + } + } + + JMSUtils.setTransportHeaders(msgContext, message); + return message; + } + + /** + * Guess the message type to use for JMS looking at the message contexts' envelope + * @param msgContext the message context + * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null + */ + private String guessMessageType(MessageContext msgContext) { + OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement(); + if (firstChild != null) { + if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) { + return JMSConstants.JMS_BYTE_MESSAGE; + } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) { + return JMSConstants.JMS_TEXT_MESSAGE; + } + } + return null; + } + + /** + * Creates an Axis MessageContext for the received JMS message and + * sets up the transports and various properties + * + * @param outMsgCtx the outgoing message for which we are expecting the response + * @param message the JMS response message received + * @param contentTypeProperty the message property used to determine the content type + * of the response message + * @throws AxisFault on error + */ + private void processSyncResponse(MessageContext outMsgCtx, Message message, + String contentTypeProperty) throws AxisFault { + + MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx); + + // load any transport headers from received message + JMSUtils.loadTransportHeaders(message, responseMsgCtx); + + // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response + // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This + // question is still under debate and due to the timelines, I am commiting this + // workaround as Axis2 1.2 is about to be released and Synapse 1.0 + responseMsgCtx.setServerSide(false); + + String contentType = + contentTypeProperty == null ? null + : JMSUtils.getProperty(message, contentTypeProperty); + + try { + JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType); + } catch (JMSException ex) { + throw AxisFault.makeFault(ex); + } +// responseMsgCtx.setServerSide(true); + + handleIncomingMessage( + responseMsgCtx, + JMSUtils.getTransportHeaders(message), + JMSUtils.getProperty(message, BaseConstants.SOAPACTION), + contentType + ); + } + + private void setProperty(Message message, MessageContext msgCtx, String key) { + + String value = getProperty(msgCtx, key); + if (value != null) { + try { + message.setStringProperty(key, value); + } catch (JMSException e) { + log.warn("Couldn't set message property : " + key + " = " + value, e); + } + } + } + + private String getProperty(MessageContext mc, String key) { + return (String) mc.getProperty(key); + } +} -- cgit v1.2.3