diff options
Diffstat (limited to 'branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java')
-rw-r--r-- | branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java | 1115 |
1 files changed, 0 insertions, 1115 deletions
diff --git a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java b/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java deleted file mode 100644 index 63faa0b852..0000000000 --- a/branches/sca-java-1.x/modules/binding-ws-axis2-jms/src/main/java/org/apache/tuscany/sca/binding/ws/axis2/jms/JMSUtils.java +++ /dev/null @@ -1,1115 +0,0 @@ -/* -* Copyright 2004,2005 The Apache Software Foundation. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.tuscany.sca.binding.ws.axis2.jms; - -import java.lang.reflect.Method; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.jms.TopicSession; -import javax.mail.internet.ContentType; -import javax.mail.internet.ParseException; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.Reference; - -import org.apache.axiom.om.OMElement; -import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.builder.Builder; -import org.apache.axis2.builder.BuilderUtil; -import org.apache.axis2.builder.SOAPBuilder; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.transport.TransportUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tuscany.sca.binding.ws.axis2.format.DataSourceMessageBuilder; -import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilder; -import org.apache.tuscany.sca.binding.ws.axis2.format.TextMessageBuilderAdapter; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseConstants; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.BaseUtils; -import org.apache.tuscany.sca.binding.ws.axis2.transport.base.threads.WorkerPool; - -/** - * Miscallaneous methods used for the JMS transport - */ -public class JMSUtils extends BaseUtils { - - private static final Log log = LogFactory.getLog(JMSUtils.class); - private static final Class[] NOARGS = new Class[] {}; - private static final Object[] NOPARMS = new Object[] {}; - - /** - * Should this service be enabled over the JMS transport? - * - * @param service the Axis service - * @return true if JMS should be enabled - */ - public static boolean isJMSService(AxisService service) { - if (service.isEnableAllTransports()) { - return true; - - } else { - List transports = service.getExposedTransports(); - for (Object transport : transports) { - if (JMSListener.TRANSPORT_NAME.equals(transport)) { - return true; - } - } - } - return false; - } - - /** - * Get the EPR for the given JMS connection factory and destination - * the form of the URL is - * jms:/<destination>?[<key>=<value>&]* - * Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS - * JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are filtered - * - * @param cf the Axis2 JMS connection factory - * @param destinationType the type of destination - * @param endpoint JMSEndpoint - * @return the EPR as a String - */ - static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) { - StringBuffer sb = new StringBuffer(); - - sb.append( - JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName()); - sb.append("?"). - append(JMSConstants.PARAM_DEST_TYPE).append("=").append( - destinationType == JMSConstants.TOPIC ? - JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE); - - if (endpoint.getContentTypeRuleSet() != null) { - String contentTypeProperty = - endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty(); - if (contentTypeProperty != null) { - sb.append("&"); - sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - sb.append("="); - sb.append(contentTypeProperty); - } - } - - for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) { - if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) && - !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) && - !JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) && - !JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) { - sb.append("&").append( - entry.getKey()).append("=").append(entry.getValue()); - } - } - return sb.toString(); - } - - /** - * Get a String property from the JMS message - * - * @param message JMS message - * @param property property name - * @return property value - */ - public static String getProperty(Message message, String property) { - try { - return message.getStringProperty(property); - } catch (JMSException e) { - return null; - } - } - - /** - * Return the destination name from the given URL - * - * @param url the URL - * @return the destination name - */ - public static String getDestination(String url) { - String tempUrl = url.substring(JMSConstants.JMS_PREFIX.length()); - int propPos = tempUrl.indexOf("?"); - - if (propPos == -1) { - return tempUrl; - } else { - return tempUrl.substring(0, propPos); - } - } - - /** - * Set the SOAPEnvelope to the Axis2 MessageContext, from the JMS Message passed in - * @param message the JMS message read - * @param msgContext the Axis2 MessageContext to be populated - * @param contentType content type for the message - * @throws AxisFault - * @throws JMSException - */ - public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType) - throws AxisFault, JMSException { - - if (contentType == null) { - if (message instanceof TextMessage) { - contentType = "text/plain"; - } else { - contentType = "application/octet-stream"; - } - if (log.isDebugEnabled()) { - log.debug("No content type specified; assuming " + contentType); - } - } - - int index = contentType.indexOf(';'); - String type = index > 0 ? contentType.substring(0, index) : contentType; - Builder builder = BuilderUtil.getBuilderFromSelector(type, msgContext); - if (builder == null) { - if (log.isDebugEnabled()) { - log.debug("No message builder found for type '" + type + "'. Falling back to SOAP."); - } - builder = new SOAPBuilder(); - } - - OMElement documentElement; - if (message instanceof BytesMessage) { - // Extract the charset encoding from the content type and - // set the CHARACTER_SET_ENCODING property as e.g. SOAPBuilder relies on this. - String charSetEnc = null; - try { - if (contentType != null) { - charSetEnc = new ContentType(contentType).getParameter("charset"); - } - } catch (ParseException ex) { - // ignore - } - msgContext.setProperty(Constants.Configuration.CHARACTER_SET_ENCODING, charSetEnc); - - if (builder instanceof DataSourceMessageBuilder) { - documentElement = ((DataSourceMessageBuilder)builder).processDocument( - new BytesMessageDataSource((BytesMessage)message), contentType, - msgContext); - } else { - documentElement = builder.processDocument( - new BytesMessageInputStream((BytesMessage)message), contentType, - msgContext); - } - } else if (message instanceof TextMessage) { - TextMessageBuilder textMessageBuilder; - if (builder instanceof TextMessageBuilder) { - textMessageBuilder = (TextMessageBuilder)builder; - } else { - textMessageBuilder = new TextMessageBuilderAdapter(builder); - } - String content = ((TextMessage)message).getText(); - documentElement = textMessageBuilder.processDocument(content, contentType, msgContext); - } else { - handleException("Unsupported JMS message type " + message.getClass().getName()); - return; // Make compiler happy - } - msgContext.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement)); - } - - /** - * Set the JMS ReplyTo for the message - * - * @param replyDestination the JMS Destination where the reply is expected - * @param session the session to use to create a temp Queue if a response is expected - * but a Destination has not been specified - * @param message the JMS message where the final Destinatio would be set as the JMS ReplyTo - * @return the JMS ReplyTo Destination for the message - */ - public static Destination setReplyDestination(Destination replyDestination, Session session, - Message message) { - - if (replyDestination == null) { - try { - // create temporary queue to receive the reply - replyDestination = createTemporaryDestination(session); - } catch (JMSException e) { - handleException("Error creating temporary queue for response"); - } - } - - try { - message.setJMSReplyTo(replyDestination); - } catch (JMSException e) { - log.warn("Error setting JMS ReplyTo destination to : " + replyDestination, e); - } - - if (log.isDebugEnabled()) { - try { - assert replyDestination != null; - log.debug("Expecting a response to JMS Destination : " + - (replyDestination instanceof Queue ? - ((Queue) replyDestination).getQueueName() : - ((Topic) replyDestination).getTopicName())); - } catch (JMSException ignore) {} - } - return replyDestination; - } - - /** - * Set transport headers from the axis message context, into the JMS message - * - * @param msgContext the axis message context - * @param message the JMS Message - * @throws JMSException on exception - */ - public static void setTransportHeaders(MessageContext msgContext, Message message) - throws JMSException { - - Map headerMap = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS); - - if (headerMap == null) { - return; - } - - for (Object headerName : headerMap.keySet()) { - - String name = (String) headerName; - - if (name.startsWith(JMSConstants.JMSX_PREFIX) && - !(name.equals(JMSConstants.JMSX_GROUP_ID) || name.equals(JMSConstants.JMSX_GROUP_SEQ))) { - continue; - } - - if (JMSConstants.JMS_COORELATION_ID.equals(name)) { - message.setJMSCorrelationID( - (String) headerMap.get(JMSConstants.JMS_COORELATION_ID)); - } else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) { - Object o = headerMap.get(JMSConstants.JMS_DELIVERY_MODE); - if (o instanceof Integer) { - message.setJMSDeliveryMode((Integer) o); - } else if (o instanceof String) { - try { - message.setJMSDeliveryMode(Integer.parseInt((String) o)); - } catch (NumberFormatException nfe) { - log.warn("Invalid delivery mode ignored : " + o, nfe); - } - } else { - log.warn("Invalid delivery mode ignored : " + o); - } - - } else if (JMSConstants.JMS_EXPIRATION.equals(name)) { - message.setJMSExpiration( - Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION))); - } else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) { - message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID)); - } else if (JMSConstants.JMS_PRIORITY.equals(name)) { - message.setJMSPriority( - Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY))); - } else if (JMSConstants.JMS_TIMESTAMP.equals(name)) { - message.setJMSTimestamp( - Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP))); - } else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) { - message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE)); - - } else { - Object value = headerMap.get(name); - if (value instanceof String) { - message.setStringProperty(name, (String) value); - } else if (value instanceof Boolean) { - message.setBooleanProperty(name, (Boolean) value); - } else if (value instanceof Integer) { - message.setIntProperty(name, (Integer) value); - } else if (value instanceof Long) { - message.setLongProperty(name, (Long) value); - } else if (value instanceof Double) { - message.setDoubleProperty(name, (Double) value); - } else if (value instanceof Float) { - message.setFloatProperty(name, (Float) value); - } - } - } - } - - /** - * Read the transport headers from the JMS Message and set them to the axis2 message context - * - * @param message the JMS Message received - * @param responseMsgCtx the axis message context - * @throws AxisFault on error - */ - public static void loadTransportHeaders(Message message, MessageContext responseMsgCtx) - throws AxisFault { - responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, getTransportHeaders(message)); - } - - /** - * Extract transport level headers for JMS from the given message into a Map - * - * @param message the JMS message - * @return a Map of the transport headers - */ - public static Map<String, Object> getTransportHeaders(Message message) { - // create a Map to hold transport headers - Map<String, Object> map = new HashMap<String, Object>(); - - // correlation ID - try { - if (message.getJMSCorrelationID() != null) { - map.put(JMSConstants.JMS_COORELATION_ID, message.getJMSCorrelationID()); - } - } catch (JMSException ignore) {} - - // set the delivery mode as persistent or not - try { - map.put(JMSConstants.JMS_DELIVERY_MODE, Integer.toString(message.getJMSDeliveryMode())); - } catch (JMSException ignore) {} - - // destination name - try { - if (message.getJMSDestination() != null) { - Destination dest = message.getJMSDestination(); - map.put(JMSConstants.JMS_DESTINATION, - dest instanceof Queue ? - ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); - } - } catch (JMSException ignore) {} - - // expiration - try { - map.put(JMSConstants.JMS_EXPIRATION, Long.toString(message.getJMSExpiration())); - } catch (JMSException ignore) {} - - // if a JMS message ID is found - try { - if (message.getJMSMessageID() != null) { - map.put(JMSConstants.JMS_MESSAGE_ID, message.getJMSMessageID()); - } - } catch (JMSException ignore) {} - - // priority - try { - map.put(JMSConstants.JMS_PRIORITY, Long.toString(message.getJMSPriority())); - } catch (JMSException ignore) {} - - // redelivered - try { - map.put(JMSConstants.JMS_REDELIVERED, Boolean.toString(message.getJMSRedelivered())); - } catch (JMSException ignore) {} - - // replyto destination name - try { - if (message.getJMSReplyTo() != null) { - Destination dest = message.getJMSReplyTo(); - map.put(JMSConstants.JMS_REPLY_TO, - dest instanceof Queue ? - ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); - } - } catch (JMSException ignore) {} - - // priority - try { - map.put(JMSConstants.JMS_TIMESTAMP, Long.toString(message.getJMSTimestamp())); - } catch (JMSException ignore) {} - - // message type - try { - if (message.getJMSType() != null) { - map.put(JMSConstants.JMS_TYPE, message.getJMSType()); - } - } catch (JMSException ignore) {} - - // any other transport properties / headers - Enumeration e = null; - try { - e = message.getPropertyNames(); - } catch (JMSException ignore) {} - - if (e != null) { - while (e.hasMoreElements()) { - String headerName = (String) e.nextElement(); - try { - map.put(headerName, message.getStringProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getBooleanProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getIntProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getLongProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getDoubleProperty(headerName)); - continue; - } catch (JMSException ignore) {} - try { - map.put(headerName, message.getFloatProperty(headerName)); - } catch (JMSException ignore) {} - } - } - - return map; - } - - - /** - * Create a MessageConsumer for the given Destination - * @param session JMS Session to use - * @param dest Destination for which the Consumer is to be created - * @param messageSelector the message selector to be used if any - * @return a MessageConsumer for the specified Destination - * @throws JMSException - */ - public static MessageConsumer createConsumer(Session session, Destination dest, String messageSelector) - throws JMSException { - - if (dest instanceof Queue) { - return ((QueueSession) session).createReceiver((Queue) dest, messageSelector); - } else { - return ((TopicSession) session).createSubscriber((Topic) dest, messageSelector, false); - } - } - - /** - * Create a temp queue or topic for synchronous receipt of responses, when a reply destination - * is not specified - * @param session the JMS Session to use - * @return a temporary Queue or Topic, depending on the session - * @throws JMSException - */ - public static Destination createTemporaryDestination(Session session) throws JMSException { - - if (session instanceof QueueSession) { - return session.createTemporaryQueue(); - } else { - return session.createTemporaryTopic(); - } - } - - /** - * Return the body length in bytes for a bytes message - * @param bMsg the JMS BytesMessage - * @return length of body in bytes - */ - public static long getBodyLength(BytesMessage bMsg) { - try { - Method mtd = bMsg.getClass().getMethod("getBodyLength", NOARGS); - if (mtd != null) { - return (Long) mtd.invoke(bMsg, NOPARMS); - } - } catch (Exception e) { - // JMS 1.0 - if (log.isDebugEnabled()) { - log.debug("Error trying to determine JMS BytesMessage body length", e); - } - } - - // if JMS 1.0 - long length = 0; - try { - byte[] buffer = new byte[2048]; - bMsg.reset(); - for (int bytesRead = bMsg.readBytes(buffer); bytesRead != -1; - bytesRead = bMsg.readBytes(buffer)) { - length += bytesRead; - } - } catch (JMSException ignore) {} - return length; - } - - /** - * Get the length of the message in bytes - * @param message - * @return message size (or approximation) in bytes - * @throws JMSException - */ - public static long getMessageSize(Message message) throws JMSException { - if (message instanceof BytesMessage) { - return JMSUtils.getBodyLength((BytesMessage) message); - } else if (message instanceof TextMessage) { - // TODO: Converting the whole message to a byte array is too much overhead just to determine the message size. - // Anyway, the result is not accurate since we don't know what encoding the JMS provider uses. - return ((TextMessage) message).getText().getBytes().length; - } else { - log.warn("Can't determine size of JMS message; unsupported message type : " - + message.getClass().getName()); - return 0; - } - } - - public static <T> T lookup(Context context, Class<T> clazz, String name) - throws NamingException { - - Object object = context.lookup(name); - try { - return clazz.cast(object); - } catch (ClassCastException ex) { - // Instead of a ClassCastException, throw an exception with some - // more information. - if (object instanceof Reference) { - Reference ref = (Reference)object; - handleException("JNDI failed to de-reference Reference with name " + - name + "; is the factory " + ref.getFactoryClassName() + - " in your classpath?"); - return null; - } else { - handleException("JNDI lookup of name " + name + " returned a " + - object.getClass().getName() + " while a " + clazz + " was expected"); - return null; - } - } - } - - /** - * Create a ServiceTaskManager for the service passed in and its corresponding JMSConnectionFactory - * @param jcf - * @param service - * @param workerPool - * @return - */ - public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf, - AxisService service, WorkerPool workerPool) { - - String name = service.getName(); - Map<String, String> svc = getServiceStringParameters(service.getParameters()); - Map<String, String> cf = jcf.getParameters(); - - ServiceTaskManager stm = new ServiceTaskManager(); - - stm.setServiceName(name); - stm.addJmsProperties(cf); - stm.addJmsProperties(svc); - - stm.setConnFactoryJNDIName( - getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf)); - String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf); - if (destName == null) { - destName = service.getName(); - } - stm.setDestinationJNDIName(destName); - stm.setDestinationType(getDestinationType(svc, cf)); - - stm.setJmsSpec11( - getJMSSpecVersion(svc, cf)); - stm.setTransactionality( - getTransactionality(svc, cf)); - stm.setCacheUserTransaction( - getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf)); - stm.setUserTransactionJNDIName( - getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf)); - stm.setSessionTransacted( - getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf)); - stm.setSessionAckMode( - getSessionAck(svc, cf)); - stm.setMessageSelector( - getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf)); - stm.setSubscriptionDurable( - getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf)); - stm.setDurableSubscriberName( - getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf)); - - stm.setCacheLevel( - getCacheLevel(svc, cf)); - stm.setPubSubNoLocal( - getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf)); - - Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf); - if (value != null) { - stm.setReceiveTimeout(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf); - if (value != null) { - stm.setConcurrentConsumers(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf); - if (value != null) { - stm.setMaxConcurrentConsumers(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf); - if (value != null) { - stm.setIdleTaskExecutionLimit(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf); - if (value != null) { - stm.setMaxMessagesPerTask(value); - } - - value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf); - if (value != null) { - stm.setInitialReconnectDuration(value); - } - value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf); - if (value != null) { - stm.setMaxReconnectDuration(value); - } - Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf); - if (dValue != null) { - stm.setReconnectionProgressionFactor(dValue); - } - - stm.setWorkerPool(workerPool); - - // remove processed properties from property bag - stm.removeJmsProperties(JMSConstants.PARAM_CONFAC_JNDI_NAME); - stm.removeJmsProperties(JMSConstants.PARAM_DESTINATION); - stm.removeJmsProperties(JMSConstants.PARAM_JMS_SPEC_VER); - stm.removeJmsProperties(BaseConstants.PARAM_TRANSACTIONALITY); - stm.removeJmsProperties(BaseConstants.PARAM_CACHE_USER_TXN); - stm.removeJmsProperties(BaseConstants.PARAM_USER_TXN_JNDI_NAME); - stm.removeJmsProperties(JMSConstants.PARAM_SESSION_TRANSACTED); - stm.removeJmsProperties(JMSConstants.PARAM_MSG_SELECTOR); - stm.removeJmsProperties(JMSConstants.PARAM_SUB_DURABLE); - stm.removeJmsProperties(JMSConstants.PARAM_DURABLE_SUB_NAME); - stm.removeJmsProperties(JMSConstants.PARAM_CACHE_LEVEL); - stm.removeJmsProperties(JMSConstants.PARAM_PUBSUB_NO_LOCAL); - stm.removeJmsProperties(JMSConstants.PARAM_RCV_TIMEOUT); - stm.removeJmsProperties(JMSConstants.PARAM_CONCURRENT_CONSUMERS); - stm.removeJmsProperties(JMSConstants.PARAM_MAX_CONSUMERS); - stm.removeJmsProperties(JMSConstants.PARAM_IDLE_TASK_LIMIT); - stm.removeJmsProperties(JMSConstants.PARAM_MAX_MSGS_PER_TASK); - stm.removeJmsProperties(JMSConstants.PARAM_RECON_INIT_DURATION); - stm.removeJmsProperties(JMSConstants.PARAM_RECON_MAX_DURATION); - stm.removeJmsProperties(JMSConstants.PARAM_RECON_FACTOR); - - return stm; - } - - private static Map<String, String> getServiceStringParameters(List list) { - - Map<String, String> map = new HashMap<String, String>(); - for (Object o : list) { - Parameter p = (Parameter) o; - if (p.getValue() instanceof String) { - map.put(p.getName(), (String) p.getValue()); - } - } - return map; - } - - private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value == null) { - throw new AxisJMSException("Service/connection factory property : " + key); - } - return value; - } - - private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - return value; - } - - private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value == null) { - return null; - } else { - return Boolean.valueOf(value); - } - } - - private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value != null) { - try { - return Integer.parseInt(value); - } catch (NumberFormatException e) { - throw new AxisJMSException("Invalid value : " + value + " for " + key); - } - } - return null; - } - - private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) { - String value = (String) svcMap.get(key); - if (value == null) { - value = (String) cfMap.get(key); - } - if (value != null) { - try { - return Double.parseDouble(value); - } catch (NumberFormatException e) { - throw new AxisJMSException("Invalid value : " + value + " for " + key); - } - } - return null; - } - - private static int getTransactionality(Map svcMap, Map cfMap) { - - String key = BaseConstants.PARAM_TRANSACTIONALITY; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (val == null) { - return BaseConstants.TRANSACTION_NONE; - - } else { - if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) { - return BaseConstants.TRANSACTION_JTA; - } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) { - return BaseConstants.TRANSACTION_LOCAL; - } else { - throw new AxisJMSException("Invalid option : " + val + " for parameter : " + - BaseConstants.STR_TRANSACTION_JTA); - } - } - } - - private static int getDestinationType(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_DEST_TYPE; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) { - return JMSConstants.TOPIC; - } - return JMSConstants.QUEUE; - } - - private static int getSessionAck(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_SESSION_ACK; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) { - return Session.AUTO_ACKNOWLEDGE; - } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) { - return Session.CLIENT_ACKNOWLEDGE; - } else if ("DUPS_OK_ACKNOWLEDGE".equals(val)){ - return Session.DUPS_OK_ACKNOWLEDGE; - } else if ("SESSION_TRANSACTED".equals(val)) { - return 0; //Session.SESSION_TRANSACTED; - } else { - try { - return Integer.parseInt(val); - } catch (NumberFormatException ignore) { - throw new AxisJMSException("Invalid session acknowledgement mode : " + val); - } - } - } - - private static int getCacheLevel(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_CACHE_LEVEL; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if ("none".equalsIgnoreCase(val)) { - return JMSConstants.CACHE_NONE; - } else if ("connection".equalsIgnoreCase(val)) { - return JMSConstants.CACHE_CONNECTION; - } else if ("session".equals(val)){ - return JMSConstants.CACHE_SESSION; - } else if ("consumer".equals(val)) { - return JMSConstants.CACHE_CONSUMER; - } else if (val != null) { - throw new AxisJMSException("Invalid cache level : " + val); - } - return JMSConstants.CACHE_AUTO; - } - - private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) { - - String key = JMSConstants.PARAM_JMS_SPEC_VER; - String val = (String) svcMap.get(key); - if (val == null) { - val = (String) cfMap.get(key); - } - - if (val == null || "1.1".equals(val)) { - return true; - } else { - return false; - } - } - - /** - * This is a JMS spec independent method to create a Connection. Please be cautious when - * making any changes - * - * @param conFac the ConnectionFactory to use - * @param user optional user name - * @param pass optional password - * @param jmsSpec11 should we use JMS 1.1 API ? - * @param isQueue is this to deal with a Queue? - * @return a JMS Connection as requested - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static Connection createConnection(ConnectionFactory conFac, - String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException { - - Connection connection = null; - if (log.isDebugEnabled()) { - log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") + - "Connection using credentials : (" + user + "/" + pass + ")"); - } - - if (jmsSpec11 || isQueue == null) { - if (user != null && pass != null) { - connection = conFac.createConnection(user, pass); - } else { - connection = conFac.createConnection(); - } - - } else { - QueueConnectionFactory qConFac = null; - TopicConnectionFactory tConFac = null; - if (isQueue) { - tConFac = (TopicConnectionFactory) conFac; - } else { - qConFac = (QueueConnectionFactory) conFac; - } - - if (user != null && pass != null) { - if (qConFac != null) { - connection = qConFac.createQueueConnection(user, pass); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(user, pass); - } - } else { - if (qConFac != null) { - connection = qConFac.createQueueConnection(); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(); - } - } - } - return connection; - } - - /** - * This is a JMS spec independent method to create a Session. Please be cautious when - * making any changes - * - * @param connection the JMS Connection - * @param transacted should the session be transacted? - * @param ackMode the ACK mode for the session - * @param jmsSpec11 should we use the JMS 1.1 API? - * @param isQueue is this Session to deal with a Queue? - * @return a Session created for the given information - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static Session createSession(Connection connection, boolean transacted, int ackMode, - boolean jmsSpec11, Boolean isQueue) throws JMSException { - - if (jmsSpec11 || isQueue == null) { - return connection.createSession(transacted, ackMode); - - } else { - if (isQueue) { - return ((QueueConnection) connection).createQueueSession(transacted, ackMode); - } else { - return ((TopicConnection) connection).createTopicSession(transacted, ackMode); - } - } - } - - /** - * This is a JMS spec independent method to create a MessageConsumer. Please be cautious when - * making any changes - * - * @param session JMS session - * @param destination the Destination - * @param isQueue is the Destination a queue? - * @param subscriberName optional client name to use for a durable subscription to a topic - * @param messageSelector optional message selector - * @param pubSubNoLocal should we receive messages sent by us during pub-sub? - * @param isDurable is this a durable topic subscription? - * @param jmsSpec11 should we use JMS 1.1 API ? - * @return a MessageConsumer to receive messages - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static MessageConsumer createConsumer( - Session session, Destination destination, Boolean isQueue, - String subscriberName, String messageSelector, boolean pubSubNoLocal, - boolean isDurable, boolean jmsSpec11) throws JMSException { - - if (jmsSpec11 || isQueue == null) { - if (isDurable) { - return session.createDurableSubscriber( - (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); - } else { - return session.createConsumer(destination, messageSelector, pubSubNoLocal); - } - } else { - if (isQueue) { - return ((QueueSession) session).createReceiver((Queue) destination, messageSelector); - } else { - if (isDurable) { - return ((TopicSession) session).createDurableSubscriber( - (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); - } else { - return ((TopicSession) session).createSubscriber( - (Topic) destination, messageSelector, pubSubNoLocal); - } - } - } - } - - /** - * This is a JMS spec independent method to create a MessageProducer. Please be cautious when - * making any changes - * - * @param session JMS session - * @param destination the Destination - * @param isQueue is the Destination a queue? - * @param jmsSpec11 should we use JMS 1.1 API ? - * @return a MessageProducer to send messages to the given Destination - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static MessageProducer createProducer( - Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException { - - if (jmsSpec11 || isQueue == null) { - return session.createProducer(destination); - } else { - if (isQueue) { - return ((QueueSession) session).createSender((Queue) destination); - } else { - return ((TopicSession) session).createPublisher((Topic) destination); - } - } - } - - /** - * Create a one time MessageProducer for the given JMS OutTransport information - * For simplicity and best compatibility, this method uses only JMS 1.0.2b API. - * Please be cautious when making any changes - * - * @param jmsOut the JMS OutTransport information (contains all properties) - * @return a JMSSender based on one-time use resources - * @throws JMSException on errors, to be handled and logged by the caller - */ - public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut) - throws JMSException { - - // digest the targetAddress and locate CF from the EPR - jmsOut.loadConnectionFactoryFromProperies(); - - // create a one time connection and session to be used - Hashtable<String,String> jmsProps = jmsOut.getProperties(); - String user = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_USERNAME) : null; - String pass = jmsProps != null ? jmsProps.get(JMSConstants.PARAM_JMS_PASSWORD) : null; - - QueueConnectionFactory qConFac = null; - TopicConnectionFactory tConFac = null; - - int destType = -1; - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) { - destType = JMSConstants.QUEUE; - qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory(); - - } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) { - destType = JMSConstants.TOPIC; - tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory(); - } - - Connection connection = null; - if (user != null && pass != null) { - if (qConFac != null) { - connection = qConFac.createQueueConnection(user, pass); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(user, pass); - } - } else { - if (qConFac != null) { - connection = qConFac.createQueueConnection(); - } else if (tConFac != null) { - connection = tConFac.createTopicConnection(); - } - } - - if (connection == null && jmsOut.getJmsConnectionFactory() != null) { - connection = jmsOut.getJmsConnectionFactory().getConnection(); - } - - Session session = null; - MessageProducer producer = null; - Destination destination = jmsOut.getDestination(); - - if (destType == JMSConstants.QUEUE) { - session = ((QueueConnection) connection). - createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - producer = ((QueueSession) session).createSender((Queue) destination); - } else { - session = ((TopicConnection) connection). - createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - producer = ((TopicSession) session).createPublisher((Topic) destination); - } - - return new JMSMessageSender(connection, session, producer, - destination, (jmsOut.getJmsConnectionFactory() == null ? - JMSConstants.CACHE_NONE : jmsOut.getJmsConnectionFactory().getCacheLevel()), false, - destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE); - } - - /** - * Return a String representation of the destination type - * @param destType the destination type indicator int - * @return a descriptive String - */ - public static String getDestinationTypeAsString(int destType) { - if (destType == JMSConstants.QUEUE) { - return "Queue"; - } else if (destType == JMSConstants.TOPIC) { - return "Topic"; - } else { - return "Generic"; - } - } -} |