summaryrefslogtreecommitdiffstats
path: root/sca-java-1.x/tags/1.5.1-RC2/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
diff options
context:
space:
mode:
Diffstat (limited to 'sca-java-1.x/tags/1.5.1-RC2/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java')
-rw-r--r--sca-java-1.x/tags/1.5.1-RC2/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java389
1 files changed, 0 insertions, 389 deletions
diff --git a/sca-java-1.x/tags/1.5.1-RC2/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java b/sca-java-1.x/tags/1.5.1-RC2/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
deleted file mode 100644
index 7caa045015..0000000000
--- a/sca-java-1.x/tags/1.5.1-RC2/modules/binding-ws-axis2/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSSender.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.binding.ws.axis2.jms;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Hashtable;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
-import javax.naming.NamingException;
-import javax.xml.stream.XMLStreamException;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMOutputFormat;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.TransportOutDescription;
-import org.apache.axis2.description.WSDL2Constants;
-import org.apache.axis2.handlers.AbstractHandler;
-import org.apache.axis2.java.security.AccessController;
-import org.apache.axis2.transport.TransportSender;
-import org.apache.axis2.transport.http.HTTPTransportUtils;
-import org.apache.axis2.transport.http.SOAPMessageFormatter;
-import org.apache.axis2.transport.jms.JMSConstants;
-import org.apache.axis2.transport.jms.JMSUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * The TransportSender for JMS
- */
-public class JMSSender extends AbstractHandler implements TransportSender {
-
- private static final Log log = LogFactory.getLog(JMSSender.class);
-
- /**
- * Performs the actual sending of the JMS message
- *
- * @param msgContext the message context to be sent
- * @throws AxisFault on exception
- */
- public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
-
- log.debug("JMSSender invoke()");
-
- /* Added due to possible bug in Axis2, MTOM enablement is based on msgContext.isDoingMTOM
- * However msgContext.isDoingMTOM will always return false unless set programmatically.
- * HTTP sets this boolean programmatically by looking up whether enableMTOM has been set
- * in axis2.xml or as an option on the client.
- */
- msgContext.setDoingMTOM(HTTPTransportUtils.doWriteMTOM(msgContext));
-
- JMSOutTransportInfo transportInfo = null;
- String targetAddress = null;
-
- // is there a transport url? which may be different from the WS-A To..
- targetAddress = (String) msgContext.getProperty(
- Constants.Configuration.TRANSPORT_URL);
-
- if (targetAddress != null) {
- transportInfo = new JMSOutTransportInfo(targetAddress);
- } else if (targetAddress == null && msgContext.getTo() != null &&
- !msgContext.getTo().hasAnonymousAddress()) {
- targetAddress = msgContext.getTo().getAddress();
-
- if (!msgContext.getTo().hasNoneAddress()) {
- transportInfo = new JMSOutTransportInfo(targetAddress);
- } else {
- //Don't send the message.
- return InvocationResponse.CONTINUE;
- }
- } else if (msgContext.isServerSide()) {
- // get the jms ReplyTo
- transportInfo = (JMSOutTransportInfo)
- msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
- }
-
- // get the ConnectionFactory to be used for the send
- ConnectionFactory connectionFac = transportInfo.getConnectionFactory();
-
- Connection con = null;
- try {
- String user = transportInfo.getConnectionFactoryUser();
- String password = transportInfo.getConnectionFactoryPassword();
-
- if ((user == null) || (password == null)){
- // Use the OS username and credentials
- con = connectionFac.createConnection();
- } else{
- // use an explicit username and password
- con = connectionFac.createConnection(user, password);
- }
-
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Message message = createJMSMessage(msgContext, session);
-
- // get the JMS destination for the message being sent
- Destination dest = transportInfo.getDestination();
-
- if (dest == null) {
- if (targetAddress != null) {
-
- // if it does not exist, create it
- String name = JMSUtils.getDestination(targetAddress);
- if (log.isDebugEnabled()) {
- log.debug("Creating JMS Destination : " + name);
- }
-
- try {
- dest = session.createQueue(name);
- } catch (JMSException e) {
- handleException("Error creating destination Queue : " + name, e);
- }
- } else {
- handleException("Cannot send reply to unknown JMS Destination");
- }
- }
-
- MessageProducer producer = session.createProducer(dest);
- Destination replyDest = null;
-
- boolean waitForResponse =
- msgContext.getOperationContext() != null &&
- WSDL2Constants.MEP_URI_OUT_IN.equals(
- msgContext.getOperationContext().getAxisOperation().getMessageExchangePattern());
-
- if (waitForResponse) {
- String replyToJNDIName = (String) msgContext.getProperty(JMSConstants.REPLY_PARAM);
- if (replyToJNDIName != null && replyToJNDIName.length() > 0) {
- Context context = null;
- final Hashtable props = JMSUtils.getProperties(targetAddress);
- try {
- try {
- context = (Context) AccessController.doPrivileged(
- new PrivilegedExceptionAction() {
- public Object run() throws NamingException{
- return new InitialContext(props);
- }
- }
- )
- ;
- } catch (PrivilegedActionException e) {
- throw (NamingException) e.getException();
- }
- } catch (NamingException e) {
- handleException("Could not get the initial context", e);
- }
-
- try {
- replyDest = (Destination) context.lookup(replyToJNDIName);
-
- } catch (NameNotFoundException e) {
- log.warn("Cannot get or lookup JMS response destination : " +
- replyToJNDIName + " : " + e.getMessage() +
- ". Attempting to create a Queue named : " + replyToJNDIName);
- replyDest = session.createQueue(replyToJNDIName);
-
- } catch (NamingException e) {
- handleException("Cannot get JMS response destination : " +
- replyToJNDIName + " : ", e);
- }
-
- } else {
- try {
- // create temporary queue to receive reply
- replyDest = session.createTemporaryQueue();
- } catch (JMSException e) {
- handleException("Error creating temporary queue for response");
- }
- }
- message.setJMSReplyTo(replyDest);
- if (log.isDebugEnabled()) {
- log.debug("Expecting a response to JMS Destination : " +
- (replyDest instanceof Queue ?
- ((Queue) replyDest).getQueueName() : ((Topic) replyDest).getTopicName()));
- }
- }
-
- try {
- log.debug("[" + (msgContext.isServerSide() ? "Server" : "Client") +
- "]Sending message to destination : " + dest);
- producer.send(message);
- producer.close();
-
- } catch (JMSException e) {
- handleException("Error sending JMS message to destination : " +
- dest.toString(), e);
- }
-
- if (waitForResponse) {
- try {
- // wait for reply
- MessageConsumer consumer = session.createConsumer(replyDest);
-
- long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
- Long waitReply = (Long) msgContext.getProperty(JMSConstants.JMS_WAIT_REPLY);
- if (waitReply != null) {
- timeout = waitReply.longValue();
- }
-
- log.debug("Waiting for a maximum of " + timeout +
- "ms for a response message to destination : " + replyDest);
- con.start();
- Message reply = consumer.receive(timeout);
-
- if (reply != null) {
- msgContext.setProperty(MessageContext.TRANSPORT_IN,
- JMSUtils.getInputStream(reply));
- } else {
- log.warn("Did not receive a JMS response within " +
- timeout + " ms to destination : " + dest);
- }
-
- } catch (JMSException e) {
- handleException("Error reading response from temporary " +
- "queue : " + replyDest, e);
- }
- }
- } catch (JMSException e) {
- handleException("Error preparing to send message to destination", e);
-
- } finally {
- if (con != null) {
- try {
- con.close(); // closes all sessions, producers, temp Q's etc
- } catch (JMSException e) {
- } // ignore
- }
- }
- return InvocationResponse.CONTINUE;
- }
-
- public void cleanup(MessageContext msgContext) throws AxisFault {
- // do nothing
- }
-
- public void init(ConfigurationContext confContext,
- TransportOutDescription transportOut) throws AxisFault {
- // do nothing
- }
-
- public void stop() {
- // do nothing
- }
-
- /**
- * Create a JMS Message from the given MessageContext and using the given
- * session
- *
- * @param msgContext the MessageContext
- * @param session the JMS session
- * @return a JMS message from the context and session
- * @throws JMSException on exception
- */
- private Message createJMSMessage(MessageContext msgContext, Session session)
- throws JMSException {
-
- Message message = null;
- String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE);
-
- OMElement msgElement = msgContext.getEnvelope();
- if (msgContext.isDoingREST()) {
- msgElement = msgContext.getEnvelope().getBody().getFirstElement();
- }
-
- if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType)) {
-
- message = session.createBytesMessage();
- BytesMessage bytesMsg = (BytesMessage) message;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- OMOutputFormat format = new OMOutputFormat();
-
- /* Added due to possible bug in Axis2, OMOutputFormat's boolean isSOAP11 defaults to true.
- * This means that if left untouched all JMS byte messages must be SOAP 1.1
- * We set the boolean here based on the messageContexts value, which is assertained from
- * the soap namespace used. This is what HTTP does also.
- */
- format.setSOAP11(msgContext.isSOAP11());
- format.setCharSetEncoding(
- getProperty(msgContext, Constants.Configuration.CHARACTER_SET_ENCODING));
- format.setDoOptimize(msgContext.isDoingMTOM());
- try {
- msgElement.serializeAndConsume(baos, format);
- baos.flush();
- } catch (XMLStreamException e) {
- handleException("XML serialization error creating BytesMessage", e);
- } catch (IOException e) {
- handleException("IO Error while creating BytesMessage", e);
- }
- bytesMsg.writeBytes(baos.toByteArray());
-
- /* Added due to possible bug in Axis2, the content type is never set for a JMS byte message. This
- * goes unnoticed when MTOM is not used, as the server can handle the message. However once MTOM
- * is used a contentType of multipart/related is required.
- */
- bytesMsg.setStringProperty(JMSConstants.CONTENT_TYPE,
- new SOAPMessageFormatter().getContentType(msgContext, format, null));
- } else {
- message = session.createTextMessage(); // default
- TextMessage txtMsg = (TextMessage) message;
- txtMsg.setText(msgElement.toString());
- }
-
- // set the JMS correlation ID if specified
- String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID);
- if (correlationId == null && msgContext.getRelatesTo() != null) {
- correlationId = msgContext.getRelatesTo().getValue();
- }
-
- if (correlationId != null) {
- message.setJMSCorrelationID(correlationId);
- }
-
- if (msgContext.isServerSide()) {
- // set SOAP Action and context type as properties on the JMS message
- setProperty(message, msgContext, JMSConstants.SOAPACTION);
- setProperty(message, msgContext, JMSConstants.CONTENT_TYPE);
- } else {
- String action = msgContext.getOptions().getAction();
- if (action != null) {
- message.setStringProperty(JMSConstants.SOAPACTION, action);
- }
- }
-
- return message;
- }
-
- private void setProperty(Message message, MessageContext msgCtx, String key) {
-
- String value = getProperty(msgCtx, key);
- if (value != null) {
- try {
- message.setStringProperty(key, value);
- } catch (JMSException e) {
- log.warn("Couldn't set message property : " + key + " = " + value, e);
- }
- }
- }
-
- private String getProperty(MessageContext mc, String key) {
- return (String) mc.getProperty(key);
- }
-
- private static void handleException(String s) {
- log.error(s);
- throw new AxisJMSException(s);
- }
-
- private static void handleException(String s, Exception e) {
- log.error(s, e);
- throw new AxisJMSException(s, e);
- }
-
-}