From 9145d1479e838918317bc9d4c5e25fe537e5f6de Mon Sep 17 00:00:00 2001 From: antelder Date: Wed, 13 May 2009 12:39:53 +0000 Subject: Abandon trying to use the new Axis2 JMS transport for now as its proving too messy tryingto backport it to the 1.4.1 release. Now trying a new approach which modifies the JMS transport from Axis2 1.4.1 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@774293 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/binding/ws/axis2/jms/JMSSender.java | 499 --------------------- 1 file changed, 499 deletions(-) delete mode 100644 branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java') diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java deleted file mode 100644 index a5f77dc4c9..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java +++ /dev/null @@ -1,499 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.StringWriter; -import java.nio.charset.UnsupportedCharsetException; -import java.util.Map; - -import javax.activation.DataHandler; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.axiom.om.OMElement; -import org.apache.axiom.om.OMNode; -import org.apache.axiom.om.OMOutputFormat; -import org.apache.axiom.om.OMText; -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.TransportOutDescription; -import org.apache.axis2.transport.MessageFormatter; -import org.apache.axis2.transport.OutTransportInfo; -import org.apache.axis2.transport.TransportUtils; -import org.apache.axis2.transport.http.HTTPConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.AbstractTransportSender; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.ManagementSupport; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.streams.WriterOutputStream; - -/** - * The TransportSender for JMS - */ -public class JMSSender extends AbstractTransportSender implements ManagementSupport { - - public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; - - /** The JMS connection factory manager to be used when sending messages out */ - private JMSConnectionFactoryManager connFacManager; - - /** - * Initialize the transport sender by reading pre-defined connection factories for - * outgoing messages. - * - * @param cfgCtx the configuration context - * @param transportOut the transport sender definition from axis2.xml - * @throws AxisFault on error - */ - public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { - super.init(cfgCtx, transportOut); - connFacManager = new JMSConnectionFactoryManager(transportOut); - log.info("JMS Transport Sender initialized..."); - } - - /** - * Get corresponding JMS connection factory defined within the transport sender for the - * transport-out information - usually constructed from a targetEPR - * - * @param trpInfo the transport-out information - * @return the corresponding JMS connection factory, if any - */ - private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { - Map props = trpInfo.getProperties(); - if (trpInfo.getProperties() != null) { - String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC); - if (jmsConnectionFactoryName != null) { - return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName); - } else { - return connFacManager.getJMSConnectionFactory(props); - } - } else { - return null; - } - } - - /** - * Performs the actual sending of the JMS message - */ - public void sendMessage(MessageContext msgCtx, String targetAddress, - OutTransportInfo outTransportInfo) throws AxisFault { - - JMSConnectionFactory jmsConnectionFactory = null; - JMSOutTransportInfo jmsOut = null; - JMSMessageSender messageSender = null; - - if (targetAddress != null) { - - jmsOut = new JMSOutTransportInfo(targetAddress); - // do we have a definition for a connection factory to use for this address? - jmsConnectionFactory = getJMSConnectionFactory(jmsOut); - - if (jmsConnectionFactory != null) { - messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress); - - } else { - try { - messageSender = JMSUtils.createJMSSender(jmsOut); - } catch (JMSException e) { - handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); - } - } - - } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { - - jmsOut = (JMSOutTransportInfo) outTransportInfo; - try { - messageSender = JMSUtils.createJMSSender(jmsOut); - } catch (JMSException e) { - handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); - } - } - - // The message property to be used to send the content type is determined by - // the out transport info, i.e. either from the EPR if we are sending a request, - // or, if we are sending a response, from the configuration of the service that - // received the request). The property name can be overridden by a message - // context property. - String contentTypeProperty = - (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - if (contentTypeProperty == null) { - contentTypeProperty = jmsOut.getContentTypeProperty(); - } - - // need to synchronize as Sessions are not thread safe - synchronized (messageSender.getSession()) { - try { - sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); - } finally { - messageSender.close(); - } - } - } - - /** - * Perform actual sending of the JMS message - */ - private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender, - String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory, - JMSOutTransportInfo jmsOut) throws AxisFault { - - // convert the axis message context into a JMS Message that we can send over JMS - Message message = null; - String correlationId = null; - try { - message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty); - } catch (JMSException e) { - handleException("Error creating a JMS message from the message context", e); - } - - // should we wait for a synchronous response on this same thread? - boolean waitForResponse = waitForSynchronousResponse(msgCtx); - Destination replyDestination = jmsOut.getReplyDestination(); - - // if this is a synchronous out-in, prepare to listen on the response destination - if (waitForResponse) { - - String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO); - if (replyDestName == null && jmsConnectionFactory != null) { - replyDestName = jmsConnectionFactory.getReplyToDestination(); - } - - if (replyDestName != null) { - if (jmsConnectionFactory != null) { - replyDestination = jmsConnectionFactory.getDestination(replyDestName); - } else { - replyDestination = jmsOut.getReplyDestination(replyDestName); - } - } - replyDestination = JMSUtils.setReplyDestination( - replyDestination, messageSender.getSession(), message); - } - - try { - messageSender.send(message, msgCtx); - metrics.incrementMessagesSent(msgCtx); - - } catch (AxisJMSException e) { - metrics.incrementFaultsSending(); - handleException("Error sending JMS message", e); - } - - try { - metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - - // if we are expecting a synchronous response back for the message sent out - if (waitForResponse) { - // TODO ******************************************************************************** - // TODO **** replace with asynchronous polling via a poller task to process this ******* - // information would be given. Then it should poll (until timeout) the - // requested destination for the response message and inject it from a - // asynchronous worker thread - try { - messageSender.getConnection().start(); // multiple calls are safely ignored - } catch (JMSException ignore) {} - - try { - correlationId = message.getJMSMessageID(); - } catch(JMSException ignore) {} - - // We assume here that the response uses the same message property to - // specify the content type of the message. - waitForResponseAndProcess(messageSender.getSession(), replyDestination, - msgCtx, correlationId, contentTypeProperty); - // TODO ******************************************************************************** - } - } - - /** - * Create a Consumer for the reply destination and wait for the response JMS message - * synchronously. If a message arrives within the specified time interval, process it - * through Axis2 - * @param session the session to use to listen for the response - * @param replyDestination the JMS reply Destination - * @param msgCtx the outgoing message for which we are expecting the response - * @param contentTypeProperty the message property used to determine the content type - * of the response message - * @throws AxisFault on error - */ - private void waitForResponseAndProcess(Session session, Destination replyDestination, - MessageContext msgCtx, String correlationId, - String contentTypeProperty) throws AxisFault { - - try { - MessageConsumer consumer; - consumer = JMSUtils.createConsumer(session, replyDestination, - "JMSCorrelationID = '" + correlationId + "'"); - - // how long are we willing to wait for the sync response - long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; - String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY); - if (waitReply != null) { - timeout = Long.valueOf(waitReply).longValue(); - } - - if (log.isDebugEnabled()) { - log.debug("Waiting for a maximum of " + timeout + - "ms for a response message to destination : " + replyDestination + - " with JMS correlation ID : " + correlationId); - } - - Message reply = consumer.receive(timeout); - - if (reply != null) { - - // update transport level metrics - metrics.incrementMessagesReceived(); - try { - metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - - try { - processSyncResponse(msgCtx, reply, contentTypeProperty); - metrics.incrementMessagesReceived(); - } catch (AxisFault e) { - metrics.incrementFaultsReceiving(); - throw e; - } - - } else { - log.warn("Did not receive a JMS response within " + - timeout + " ms to destination : " + replyDestination + - " with JMS correlation ID : " + correlationId); - metrics.incrementTimeoutsReceiving(); - } - - } catch (JMSException e) { - metrics.incrementFaultsReceiving(); - handleException("Error creating a consumer, or receiving a synchronous reply " + - "for outgoing MessageContext ID : " + msgCtx.getMessageID() + - " and reply Destination : " + replyDestination, e); - } - } - - /** - * Create a JMS Message from the given MessageContext and using the given - * session - * - * @param msgContext the MessageContext - * @param session the JMS session - * @param contentTypeProperty the message property to be used to store the - * content type - * @return a JMS message from the context and session - * @throws JMSException on exception - * @throws AxisFault on exception - */ - private Message createJMSMessage(MessageContext msgContext, Session session, - String contentTypeProperty) throws JMSException, AxisFault { - - Message message = null; - String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE); - - // check the first element of the SOAP body, do we have content wrapped using the - // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or - // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages - // for JMS but just get the payload in its native format - String jmsPayloadType = guessMessageType(msgContext); - - if (jmsPayloadType == null) { - - OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); - MessageFormatter messageFormatter = null; - try { - messageFormatter = TransportUtils.getMessageFormatter(msgContext); - } catch (AxisFault axisFault) { - throw new JMSException("Unable to get the message formatter to use"); - } - - String contentType = messageFormatter.getContentType( - msgContext, format, msgContext.getSoapAction()); - - boolean useBytesMessage = - msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) || - contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1; - - OutputStream out; - StringWriter sw; - if (useBytesMessage) { - BytesMessage bytesMsg = session.createBytesMessage(); - sw = null; - out = new BytesMessageOutputStream(bytesMsg); - message = bytesMsg; - } else { - sw = new StringWriter(); - try { - out = new WriterOutputStream(sw, format.getCharSetEncoding()); - } catch (UnsupportedCharsetException ex) { - handleException("Unsupported encoding " + format.getCharSetEncoding(), ex); - return null; - } - } - - try { - messageFormatter.writeTo(msgContext, format, out, true); - out.close(); - } catch (IOException e) { - handleException("IO Error while creating BytesMessage", e); - } - - if (!useBytesMessage) { - TextMessage txtMsg = session.createTextMessage(); - txtMsg.setText(sw.toString()); - message = txtMsg; - } - - if (contentTypeProperty != null) { - message.setStringProperty(contentTypeProperty, contentType); - } - - } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) { - message = session.createBytesMessage(); - BytesMessage bytesMsg = (BytesMessage) message; - OMElement wrapper = msgContext.getEnvelope().getBody(). - getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER); - OMNode omNode = wrapper.getFirstOMChild(); - if (omNode != null && omNode instanceof OMText) { - Object dh = ((OMText) omNode).getDataHandler(); - if (dh != null && dh instanceof DataHandler) { - try { - ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg)); - } catch (IOException e) { - handleException("Error serializing binary content of element : " + - BaseConstants.DEFAULT_BINARY_WRAPPER, e); - } - } - } - - } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) { - message = session.createTextMessage(); - TextMessage txtMsg = (TextMessage) message; - txtMsg.setText(msgContext.getEnvelope().getBody(). - getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText()); - } - - // set the JMS correlation ID if specified - String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID); - if (correlationId == null && msgContext.getRelatesTo() != null) { - correlationId = msgContext.getRelatesTo().getValue(); - } - - if (correlationId != null) { - message.setJMSCorrelationID(correlationId); - } - - if (msgContext.isServerSide()) { - // set SOAP Action as a property on the JMS message - setProperty(message, msgContext, BaseConstants.SOAPACTION); - } else { - String action = msgContext.getOptions().getAction(); - if (action != null) { - message.setStringProperty(BaseConstants.SOAPACTION, action); - } - } - - JMSUtils.setTransportHeaders(msgContext, message); - return message; - } - - /** - * Guess the message type to use for JMS looking at the message contexts' envelope - * @param msgContext the message context - * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null - */ - private String guessMessageType(MessageContext msgContext) { - OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement(); - if (firstChild != null) { - if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) { - return JMSConstants.JMS_BYTE_MESSAGE; - } else if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) { - return JMSConstants.JMS_TEXT_MESSAGE; - } - } - return null; - } - - /** - * Creates an Axis MessageContext for the received JMS message and - * sets up the transports and various properties - * - * @param outMsgCtx the outgoing message for which we are expecting the response - * @param message the JMS response message received - * @param contentTypeProperty the message property used to determine the content type - * of the response message - * @throws AxisFault on error - */ - private void processSyncResponse(MessageContext outMsgCtx, Message message, - String contentTypeProperty) throws AxisFault { - - MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx); - - // load any transport headers from received message - JMSUtils.loadTransportHeaders(message, responseMsgCtx); - - // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response - // of content type "text/xml" is thought to be REST if !MC.isServerSide(). This - // question is still under debate and due to the timelines, I am commiting this - // workaround as Axis2 1.2 is about to be released and Synapse 1.0 - responseMsgCtx.setServerSide(false); - - String contentType = - contentTypeProperty == null ? null - : JMSUtils.getProperty(message, contentTypeProperty); - - try { - JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType); - } catch (JMSException ex) { - throw AxisFault.makeFault(ex); - } -// responseMsgCtx.setServerSide(true); - - handleIncomingMessage( - responseMsgCtx, - JMSUtils.getTransportHeaders(message), - JMSUtils.getProperty(message, BaseConstants.SOAPACTION), - contentType - ); - } - - private void setProperty(Message message, MessageContext msgCtx, String key) { - - String value = getProperty(msgCtx, key); - if (value != null) { - try { - message.setStringProperty(key, value); - } catch (JMSException e) { - log.warn("Couldn't set message property : " + key + " = " + value, e); - } - } - } - - private String getProperty(MessageContext mc, String key) { - return (String) mc.getProperty(key); - } -} -- cgit v1.2.3