summaryrefslogtreecommitdiffstats
path: root/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider
diff options
context:
space:
mode:
Diffstat (limited to 'tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider')
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/AbstractMessageProcessor.java136
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/BytesMessageProcessor.java124
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/DefaultJMSResourceFactoryExtensionPoint.java30
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/DefaultMessageProcessor.java287
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java70
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java170
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java228
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessor.java55
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessorUtil.java75
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactory.java95
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryExtensionPoint.java28
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryImpl.java282
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/ObjectMessageProcessor.java230
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java270
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/TextMessageProcessor.java79
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/XMLBytesMessageProcessor.java134
-rw-r--r--tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/XMLTextMessageProcessor.java129
17 files changed, 2422 insertions, 0 deletions
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/AbstractMessageProcessor.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/AbstractMessageProcessor.java
new file mode 100644
index 0000000000..e2fc49834e
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/AbstractMessageProcessor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.logging.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+import org.osoa.sca.ServiceRuntimeException;
+
+/**
+ * Base MessageProcessor for the JMSBinding.
+ *
+ * @version $Rev$ $Date$
+ */
+public abstract class AbstractMessageProcessor implements JMSMessageProcessor {
+ private static final Logger logger = Logger.getLogger(AbstractMessageProcessor.class.getName());
+
+ protected String operationPropertyName;
+ protected boolean xmlFormat = true;
+
+ public AbstractMessageProcessor(JMSBinding jmsBinding) {
+ this.operationPropertyName = jmsBinding.getOperationSelectorPropertyName();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.binding.jms.OperationAndDataBinding#getOperationName(javax.jms.Message)
+ */
+ public String getOperationName(Message message) {
+ try {
+
+ return message.getStringProperty(operationPropertyName);
+
+ } catch (JMSException e) {
+ throw new JMSBindingException("Exception retreiving operation name from message", e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.binding.jms.OperationAndDataBinding#setOperationName(javax.jms.Message, java.lang.String)
+ */
+ public void setOperationName(String operationName, Message message) {
+ try {
+
+ message.setStringProperty(operationPropertyName, operationName);
+
+ } catch (JMSException e) {
+ throw new JMSBindingException("Exception setting the operation name on message", e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.binding.jms.OperationAndDataBinding#extractPayload(javax.jms.Session, java.lang.Object)
+ */
+ public Message insertPayloadIntoJMSMessage(Session session, Object o) {
+ return createJMSMessage(session, o);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.binding.jms.OperationAndDataBinding#extractPayload(javax.jms.Message)
+ */
+ public Object extractPayloadFromJMSMessage(Message msg) {
+ try {
+ if (msg.getBooleanProperty(JMSBindingConstants.FAULT_PROPERTY)) {
+ Object exc = ((ObjectMessage)msg).getObject();
+ if (exc instanceof RuntimeException) {
+ throw new ServiceRuntimeException("remote service exception, see nested exception", (Throwable)exc);
+ } else {
+ return new InvocationTargetException((Throwable) exc);
+ }
+ }
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ return extractPayload(msg);
+ }
+
+ public Message createFaultMessage(Session session, Throwable o) {
+ if (session == null) {
+ logger.fine("no response session to create fault message: " + String.valueOf(o));
+ return null;
+ }
+ try {
+
+ ObjectMessage message = session.createObjectMessage();
+ String causeMsg;
+ if (o instanceof RuntimeException) {
+ message.setObject(new RuntimeException(o.getMessage()));
+ } else {
+ // for a checked exception return the checked exception
+ message.setObject(o);
+ }
+ message.setBooleanProperty(JMSBindingConstants.FAULT_PROPERTY, true);
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ protected abstract Object extractPayload(Message msg);
+
+ protected abstract Message createJMSMessage(Session session, Object o);
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/BytesMessageProcessor.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/BytesMessageProcessor.java
new file mode 100644
index 0000000000..5d1990eb90
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/BytesMessageProcessor.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.logging.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+import org.osoa.sca.ServiceRuntimeException;
+
+/**
+ * MessageProcessor for sending/receiving javax.jms.BytesMessage with the JMSBinding.
+ *
+ * @version $Rev$ $Date$
+ */
+public class BytesMessageProcessor extends AbstractMessageProcessor {
+ private static final Logger logger = Logger.getLogger(AbstractMessageProcessor.class.getName());
+
+ public BytesMessageProcessor(JMSBinding jmsBinding) {
+ super(jmsBinding);
+ }
+
+/* TUSCANY-2967 - disable this change while we decide what to do and
+ * return faults as JMSObject messages to be consistent
+ * again with other wire formats
+ @Override
+ public Object extractPayloadFromJMSMessage(Message msg) {
+ byte [] bytes = (byte [])extractPayload(msg);
+
+ try {
+ if (msg.getBooleanProperty(JMSBindingConstants.FAULT_PROPERTY)) {
+ return new InvocationTargetException(new ServiceRuntimeException(new String(bytes)));
+ } else {
+ return bytes;
+ }
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+*/
+
+ @Override
+ protected Object extractPayload(Message msg) {
+ try {
+
+ if (!(msg instanceof BytesMessage)) {
+ throw new IllegalStateException("expecting JMS BytesMessage: " + msg);
+ }
+
+ long noOfBytes = ((BytesMessage)msg).getBodyLength();
+ byte [] bytes = new byte[(int)noOfBytes];
+ ((BytesMessage)msg).readBytes(bytes);
+ ((BytesMessage)msg).reset();
+ return bytes;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+/* TUSCANY-2967 - disable this change while we decide what to do and
+ * return faults as JMSObject messages to be consistent
+ * again with other wire formats
+ @Override
+ public Message createFaultMessage(Session session, Throwable o) {
+ try {
+ Message message = createJMSMessage(session, o.toString().getBytes());
+ message.setBooleanProperty(JMSBindingConstants.FAULT_PROPERTY, true);
+ return message;
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+*/
+
+ @Override
+ protected Message createJMSMessage(Session session, Object o) {
+ if (session == null) {
+ logger.fine("no response session to create message: " + String.valueOf(o));
+ return null;
+ }
+ try {
+
+ // TODO - an experiment. How to enforce a single
+ // byte array parameter
+ BytesMessage message = session.createBytesMessage();
+
+ if (o != null){
+ message.writeBytes((byte[])o);
+ }
+
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/DefaultJMSResourceFactoryExtensionPoint.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/DefaultJMSResourceFactoryExtensionPoint.java
new file mode 100644
index 0000000000..1de85ea773
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/DefaultJMSResourceFactoryExtensionPoint.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+
+public class DefaultJMSResourceFactoryExtensionPoint implements JMSResourceFactoryExtensionPoint {
+
+ public JMSResourceFactory createJMSResourceFactory(JMSBinding binding) {
+ return new JMSResourceFactoryImpl(binding.getConnectionFactoryName(), binding.getResponseConnectionFactoryName(), binding.getInitialContextFactoryName(), binding.getJndiURL());
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/DefaultMessageProcessor.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/DefaultMessageProcessor.java
new file mode 100644
index 0000000000..05f264db78
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/DefaultMessageProcessor.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringReader;
+import java.util.logging.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMNode;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+import org.apache.tuscany.sca.interfacedef.util.FaultException;
+
+/**
+ * MessageProcessor for sending/receiving XML over javax.jms.TextMessage or javax.jms.BytesMessage
+ * with the JMSBinding.
+ * This is very specific to the default wire format and is not tied into the usual hierarchy
+ * of message processors
+ *
+ * @version $Rev$ $Date$
+ */
+public class DefaultMessageProcessor extends AbstractMessageProcessor {
+ private static final Logger logger = Logger.getLogger(DefaultMessageProcessor.class.getName());
+
+ public DefaultMessageProcessor(JMSBinding jmsBinding) {
+ super(jmsBinding);
+ }
+
+ // inherited methods that don't do anything useful
+ @Override
+ protected Message createJMSMessage(Session session, Object o) {
+ // should not be used
+ return null;
+ }
+
+ @Override
+ protected Object extractPayload(Message msg) {
+ // if it's not a text/bytes message or a fault then we don;t know what to do with it
+ return null;
+ }
+
+ // TODO - This makes the assumption that whatever the text/bytes configuration of the
+ // jms binding, unchecked faults will be sent as bytes.
+ @Override
+ public Message createFaultMessage(Session session, Throwable o) {
+ return createFaultJMSBytesMessage(session, o);
+ }
+
+ // handle text messages
+
+ public Object extractPayloadFromJMSTextMessage(Message msg, OMElement wrapper) {
+ if (msg instanceof TextMessage) {
+ try {
+ String xml = ((TextMessage) msg).getText();
+ Object os;
+ if (xml != null) {
+ XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
+ StAXOMBuilder builder = new StAXOMBuilder(reader);
+ os = builder.getDocumentElement();
+ } else {
+ os = null;
+ }
+
+ if (wrapper != null){
+ //don't modify the original wrapper since it will be reused
+ //clone the wrapper
+ OMElement newWrapper = wrapper.cloneOMElement();
+ if (os != null){
+ newWrapper.addChild((OMNode)os);
+ }
+ return newWrapper;
+ }
+
+ return os;
+
+ } catch (XMLStreamException e) {
+ throw new JMSBindingException(e);
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ } else {
+ // handle the non-text fault case
+ return super.extractPayloadFromJMSMessage(msg);
+ }
+ }
+
+ public Message insertPayloadIntoJMSTextMessage(Session session, Object o, boolean unwrap) {
+
+ try {
+
+ TextMessage message = session.createTextMessage();
+
+ if (o instanceof OMElement) {
+
+ if (unwrap){
+ OMElement firstElement = ((OMElement)o).getFirstElement();
+ if (firstElement == null ) {
+ message.setText(null);
+ } else {
+ message.setText(firstElement.toString());
+ }
+ }else {
+ message.setText(o.toString());
+ }
+ } else if ((o instanceof Object[]) && ((Object[]) o)[0] instanceof OMElement) {
+ if (unwrap){
+ OMElement firstElement = ((OMElement)((Object[]) o)[0]).getFirstElement();
+ if (firstElement == null ) {
+ message.setText(null);
+ } else {
+ message.setText(firstElement.toString());
+ }
+ }else {
+ message.setText(((Object[]) o)[0].toString());
+ }
+ } else if (o != null) {
+ throw new IllegalStateException("expecting OMElement payload: " + o);
+ }
+
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ public Message createFaultJMSTextMessage(Session session, Throwable o) {
+
+ if (session == null) {
+ logger.fine("no response session to create fault message: " + String.valueOf(o));
+ return null;
+ }
+ if (o instanceof FaultException) {
+ try {
+
+ TextMessage message = session.createTextMessage();
+ message.setText(String.valueOf(((FaultException) o).getFaultInfo()));
+ message.setBooleanProperty(JMSBindingConstants.FAULT_PROPERTY, true);
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ } else {
+ // handle the non XML fault case
+ return super.createFaultMessage(session, o);
+ }
+ }
+
+ // handle bytes messages
+
+ public Object extractPayloadFromJMSBytesMessage(Message msg, OMElement wrapper) {
+
+ if (msg instanceof BytesMessage) {
+ try {
+ Object os;
+
+ long noOfBytes = ((BytesMessage) msg).getBodyLength();
+ byte[] bytes = new byte[(int) noOfBytes];
+ ((BytesMessage) msg).readBytes(bytes);
+ ((BytesMessage)msg).reset();
+
+ if ((bytes != null) && (bytes.length > 0)) {
+ XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new ByteArrayInputStream(bytes));
+ StAXOMBuilder builder = new StAXOMBuilder(reader);
+ os = builder.getDocumentElement();
+ } else {
+ os = null;
+ }
+
+ if (wrapper != null){
+ //don't modify the original wrapper since it will be reused
+ //clone the wrapper
+ OMElement newWrapper = wrapper.cloneOMElement();
+ if (os != null){
+ newWrapper.addChild((OMNode)os);
+ }
+ return newWrapper;
+ }
+
+ return os;
+
+ } catch (XMLStreamException e) {
+ throw new JMSBindingException(e);
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ } else {
+ // trap the non-bytes fault case
+ return super.extractPayloadFromJMSMessage(msg);
+ }
+ }
+
+ public Message insertPayloadIntoJMSBytesMessage(Session session, Object o, boolean unwrap) {
+
+ try {
+
+ BytesMessage message = session.createBytesMessage();
+
+
+ if (o instanceof OMElement) {
+ if (unwrap) {
+ OMElement firstElement = ((OMElement)o).getFirstElement();
+ if (firstElement == null ) {
+ //do nothing, the message will just be set with a byte[0]
+ } else {
+ message.writeBytes(firstElement.toString().getBytes());
+ }
+
+ } else {
+ message.writeBytes(o.toString().getBytes());
+ }
+
+ } else if ((o instanceof Object[]) && ((Object[]) o)[0] instanceof OMElement) {
+ if (unwrap){
+ OMElement firstElement = ((OMElement)((Object[]) o)[0]).getFirstElement();
+ if (firstElement == null ) {
+ //do nothing, the message will just be set with a byte[0]
+ } else {
+ message.writeBytes(firstElement.toString().getBytes());
+ }
+
+ }else {
+ message.writeBytes(((Object[]) o)[0].toString().getBytes());
+ }
+ } else if (o != null) {
+ throw new IllegalStateException("expecting OMElement payload: " + o);
+ }
+
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ public Message createFaultJMSBytesMessage(Session session, Throwable o) {
+
+ if (session == null) {
+ logger.fine("no response session to create fault message: " + String.valueOf(o));
+ return null;
+ }
+
+ if (o instanceof FaultException) {
+ try {
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(String.valueOf(((FaultException) o).getFaultInfo()).getBytes());
+ message.setBooleanProperty(JMSBindingConstants.FAULT_PROPERTY, true);
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ } else {
+ return super.createFaultMessage(session, o);
+ }
+ }
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java
new file mode 100644
index 0000000000..c9a11dc41e
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.host.jms.JMSHostExtensionPoint;
+import org.apache.tuscany.sca.host.jms.JMSServiceListenerFactory;
+import org.apache.tuscany.sca.provider.BindingProviderFactory;
+import org.apache.tuscany.sca.provider.ReferenceBindingProvider;
+import org.apache.tuscany.sca.provider.ServiceBindingProvider;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
+import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+
+/**
+ * A factory from creating the JMS binding provider.
+ *
+ * @version $Rev$ $Date$
+ */
+public class JMSBindingProviderFactory implements BindingProviderFactory<JMSBinding> {
+
+ private ExtensionPointRegistry extensionPoints;
+ private JMSResourceFactoryExtensionPoint jmsRFEP;
+ private JMSServiceListenerFactory serviceListenerFactory;
+
+ public JMSBindingProviderFactory(ExtensionPointRegistry extensionPoints) {
+ this.extensionPoints = extensionPoints;
+
+ jmsRFEP = (JMSResourceFactoryExtensionPoint)extensionPoints.getExtensionPoint(JMSResourceFactoryExtensionPoint.class);
+ if (jmsRFEP == null) {
+ jmsRFEP = new DefaultJMSResourceFactoryExtensionPoint();
+ extensionPoints.addExtensionPoint(jmsRFEP);
+ }
+
+ JMSHostExtensionPoint jmsHostExtensionPoint = (JMSHostExtensionPoint)extensionPoints.getExtensionPoint(JMSHostExtensionPoint.class);
+ serviceListenerFactory = jmsHostExtensionPoint.getJMSServiceListenerFactory();
+ }
+
+ public ReferenceBindingProvider createReferenceBindingProvider(RuntimeComponent component, RuntimeComponentReference reference, JMSBinding binding) {
+ JMSResourceFactory jmsRF = jmsRFEP.createJMSResourceFactory(binding);
+ return new JMSBindingReferenceBindingProvider(component, reference, binding, extensionPoints, jmsRF);
+ }
+
+ public ServiceBindingProvider createServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, JMSBinding binding) {
+ JMSResourceFactory jmsRF = jmsRFEP.createJMSResourceFactory(binding);
+ return new JMSBindingServiceBindingProvider(component, service, binding, binding, serviceListenerFactory, extensionPoints, jmsRF);
+ }
+
+ public Class<JMSBinding> getModelType() {
+ return JMSBinding.class;
+ }
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java
new file mode 100644
index 0000000000..0e44b84ea0
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.tuscany.sca.binding.jms.headers.HeaderReferenceInterceptor;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+import org.apache.tuscany.sca.binding.jms.transport.TransportReferenceInterceptor;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.interfacedef.InterfaceContract;
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.InvocationChain;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.Phase;
+import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint;
+import org.apache.tuscany.sca.provider.ReferenceBindingProviderRRB;
+import org.apache.tuscany.sca.provider.WireFormatProvider;
+import org.apache.tuscany.sca.provider.WireFormatProviderFactory;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
+import org.apache.tuscany.sca.runtime.RuntimeWire;
+
+/**
+ * Implementation of the JMS reference binding provider.
+ *
+ * @version $Rev$ $Date$
+ */
+public class JMSBindingReferenceBindingProvider implements ReferenceBindingProviderRRB {
+
+ private RuntimeComponentReference reference;
+ private JMSBinding jmsBinding;
+ private JMSResourceFactory jmsResourceFactory;
+ private RuntimeComponent component;
+ private InterfaceContract interfaceContract;
+ private ExtensionPointRegistry extensions;
+
+ private ProviderFactoryExtensionPoint providerFactories;
+
+ private WireFormatProviderFactory requestWireFormatProviderFactory;
+ private WireFormatProvider requestWireFormatProvider;
+
+ private WireFormatProviderFactory responseWireFormatProviderFactory;
+ private WireFormatProvider responseWireFormatProvider;
+
+ public JMSBindingReferenceBindingProvider(RuntimeComponent component, RuntimeComponentReference reference, JMSBinding binding, ExtensionPointRegistry extensions, JMSResourceFactory jmsResourceFactory) {
+ this.reference = reference;
+ this.jmsBinding = binding;
+ this.extensions = extensions;
+ this.component = component;
+ this.jmsResourceFactory = jmsResourceFactory;
+
+ // Get the factories/providers for operation selection
+ this.providerFactories = extensions.getExtensionPoint(ProviderFactoryExtensionPoint.class);
+
+ // Get the factories/providers for wire format
+ this.requestWireFormatProviderFactory =
+ (WireFormatProviderFactory)providerFactories.getProviderFactory(jmsBinding.getRequestWireFormat().getClass());
+ if (this.requestWireFormatProviderFactory != null){
+ this.requestWireFormatProvider = requestWireFormatProviderFactory.createReferenceWireFormatProvider(component, reference, jmsBinding);
+ }
+
+ this.responseWireFormatProviderFactory =
+ (WireFormatProviderFactory)providerFactories.getProviderFactory(jmsBinding.getResponseWireFormat().getClass());
+ if (this.responseWireFormatProviderFactory != null){
+ this.responseWireFormatProvider = responseWireFormatProviderFactory.createReferenceWireFormatProvider(component, reference, jmsBinding);
+ }
+
+ // create an interface contract that reflects both request and response
+ // wire formats
+ try {
+ interfaceContract = (InterfaceContract)reference.getInterfaceContract().clone();
+
+ requestWireFormatProvider.configureWireFormatInterfaceContract(interfaceContract);
+ responseWireFormatProvider.configureWireFormatInterfaceContract(interfaceContract);
+ } catch (CloneNotSupportedException ex){
+ interfaceContract = reference.getInterfaceContract();
+ }
+ }
+
+ public Invoker createInvoker(Operation operation) {
+
+ if (jmsBinding.getDestinationName() == null) {
+ if (!reference.isCallback()) {
+ throw new JMSBindingException("No destination specified for reference " + reference.getName());
+ }
+ }
+
+ Invoker invoker = null;
+ invoker = new RRBJMSBindingInvoker(jmsBinding, operation, jmsResourceFactory, reference);
+
+ return invoker;
+ }
+
+ public boolean supportsOneWayInvocation() {
+ return true;
+ }
+
+ public InterfaceContract getBindingInterfaceContract() {
+ return interfaceContract;
+ }
+
+ public void start() {
+
+ }
+
+ public void stop() {
+ try {
+ jmsResourceFactory.closeConnection();
+ jmsResourceFactory.closeResponseConnection();
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ /*
+ * set up the reference binding wire with the right set of jms reference
+ * interceptors
+ */
+ public void configureBindingChain(RuntimeWire runtimeWire) {
+
+ InvocationChain bindingChain = runtimeWire.getBindingInvocationChain();
+
+ // add transport interceptor
+ bindingChain.addInterceptor(Phase.REFERENCE_BINDING_TRANSPORT,
+ new TransportReferenceInterceptor(jmsBinding,
+ jmsResourceFactory,
+ runtimeWire) );
+
+ // add request wire format
+ bindingChain.addInterceptor(requestWireFormatProvider.getPhase(),
+ requestWireFormatProvider.createInterceptor());
+
+ // add response wire format, but only add it if it's different from the request
+ if (!jmsBinding.getRequestWireFormat().equals(jmsBinding.getResponseWireFormat())){
+ bindingChain.addInterceptor(responseWireFormatProvider.getPhase(),
+ responseWireFormatProvider.createInterceptor());
+ }
+
+ // add the header processor that comes after the wire formatter but before the
+ // policy interceptors
+ bindingChain.addInterceptor(Phase.REFERENCE_BINDING_WIREFORMAT,
+ new HeaderReferenceInterceptor(jmsBinding,
+ jmsResourceFactory,
+ runtimeWire) );
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
new file mode 100644
index 0000000000..de16edb044
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.util.logging.Logger;
+
+import org.apache.tuscany.sca.assembly.Binding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+import org.apache.tuscany.sca.binding.jms.transport.TransportServiceInterceptor;
+import org.apache.tuscany.sca.binding.jms.wire.CallbackDestinationInterceptor;
+import org.apache.tuscany.sca.binding.jms.wire.OperationPropertiesInterceptor;
+import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.host.jms.JMSServiceListener;
+import org.apache.tuscany.sca.host.jms.JMSServiceListenerDetails;
+import org.apache.tuscany.sca.host.jms.JMSServiceListenerFactory;
+import org.apache.tuscany.sca.interfacedef.InterfaceContract;
+import org.apache.tuscany.sca.invocation.InvocationChain;
+import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.invocation.Phase;
+import org.apache.tuscany.sca.provider.OperationSelectorProvider;
+import org.apache.tuscany.sca.provider.OperationSelectorProviderFactory;
+import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint;
+import org.apache.tuscany.sca.provider.ServiceBindingProviderRRB;
+import org.apache.tuscany.sca.provider.WireFormatProvider;
+import org.apache.tuscany.sca.provider.WireFormatProviderFactory;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.runtime.RuntimeWire;
+
+/**
+ * Implementation of the JMS service binding provider.
+ *
+ * @version $Rev$ $Date$
+ */
+public class JMSBindingServiceBindingProvider implements ServiceBindingProviderRRB, JMSServiceListenerDetails {
+ private static final Logger logger = Logger.getLogger(JMSBindingServiceBindingProvider.class.getName());
+
+ private RuntimeComponentService service;
+ private Binding targetBinding;
+ private JMSBinding jmsBinding;
+ private JMSResourceFactory jmsResourceFactory;
+ private JMSServiceListenerFactory serviceListenerFactory;
+ private JMSServiceListener serviceListener;
+
+ private RuntimeComponent component;
+ private InterfaceContract interfaceContract;
+
+ private ProviderFactoryExtensionPoint providerFactories;
+ private ModelFactoryExtensionPoint modelFactories;
+
+ private MessageFactory messageFactory;
+
+ private OperationSelectorProviderFactory operationSelectorProviderFactory;
+ private OperationSelectorProvider operationSelectorProvider;
+
+ private WireFormatProviderFactory requestWireFormatProviderFactory;
+ private WireFormatProvider requestWireFormatProvider;
+
+ private WireFormatProviderFactory responseWireFormatProviderFactory;
+ private WireFormatProvider responseWireFormatProvider;
+
+ public JMSBindingServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, Binding targetBinding, JMSBinding binding, JMSServiceListenerFactory serviceListenerFactory, ExtensionPointRegistry extensionPoints, JMSResourceFactory jmsResourceFactory) {
+ this.component = component;
+ this.service = service;
+ this.jmsBinding = binding;
+ this.serviceListenerFactory = serviceListenerFactory;
+ this.targetBinding = targetBinding;
+ this.jmsResourceFactory = jmsResourceFactory;
+
+ // Set the default destination when using a connection factory.
+ // If an activation spec is being used, do not set the destination
+ // because the activation spec provides the destination.
+ if (jmsBinding.getDestinationName() == null &&
+ (jmsBinding.getActivationSpecName() == null || jmsBinding.getActivationSpecName().equals(""))) {
+ if (!service.isCallback()) {
+ // use the SCA service name as the default destination name
+ jmsBinding.setDestinationName(service.getName());
+ }
+ }
+
+ // Get Message factory
+ modelFactories = extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class);
+ messageFactory = modelFactories.getFactory(MessageFactory.class);
+
+ // Get the factories/providers for operation selection
+ this.providerFactories = extensionPoints.getExtensionPoint(ProviderFactoryExtensionPoint.class);
+ this.operationSelectorProviderFactory =
+ (OperationSelectorProviderFactory)providerFactories.getProviderFactory(jmsBinding.getOperationSelector().getClass());
+ if (this.operationSelectorProviderFactory != null){
+ this.operationSelectorProvider = operationSelectorProviderFactory.createServiceOperationSelectorProvider(component, service, jmsBinding);
+ }
+
+ // Get the factories/providers for wire format
+ this.requestWireFormatProviderFactory =
+ (WireFormatProviderFactory)providerFactories.getProviderFactory(jmsBinding.getRequestWireFormat().getClass());
+ if (this.requestWireFormatProviderFactory != null){
+ this.requestWireFormatProvider = requestWireFormatProviderFactory.createServiceWireFormatProvider(component, service, jmsBinding);
+ }
+
+ this.responseWireFormatProviderFactory =
+ (WireFormatProviderFactory)providerFactories.getProviderFactory(jmsBinding.getResponseWireFormat().getClass());
+ if (this.responseWireFormatProviderFactory != null){
+ this.responseWireFormatProvider = responseWireFormatProviderFactory.createServiceWireFormatProvider(component, service, jmsBinding);
+ }
+
+ // create an interface contract that reflects both request and response
+ // wire formats
+ try {
+ interfaceContract = (InterfaceContract)service.getInterfaceContract().clone();
+
+ requestWireFormatProvider.configureWireFormatInterfaceContract(interfaceContract);
+ responseWireFormatProvider.configureWireFormatInterfaceContract(interfaceContract);
+ } catch (CloneNotSupportedException ex){
+ interfaceContract = service.getInterfaceContract();
+ }
+ }
+
+ public InterfaceContract getBindingInterfaceContract() {
+ return interfaceContract;
+ }
+
+ public boolean supportsOneWayInvocation() {
+ return true;
+ }
+
+ public void start() {
+ try {
+
+ this.serviceListener = serviceListenerFactory.createJMSServiceListener(this);
+ serviceListener.start();
+
+ } catch (Exception e) {
+ throw new JMSBindingException("Error starting JMSServiceBinding", e);
+ }
+ }
+
+ public void stop() {
+ try {
+ serviceListener.stop();
+ } catch (Exception e) {
+ throw new JMSBindingException("Error stopping JMSServiceBinding", e);
+ }
+ }
+
+ public String getDestinationName() {
+ return serviceListener.getDestinationName();
+ }
+
+ /*
+ * Adds JMS specific interceptors to the binding chain
+ */
+ public void configureBindingChain(RuntimeWire runtimeWire) {
+
+ InvocationChain bindingChain = runtimeWire.getBindingInvocationChain();
+
+ // add transport interceptor
+ bindingChain.addInterceptor(Phase.SERVICE_BINDING_TRANSPORT,
+ new TransportServiceInterceptor(jmsBinding,
+ jmsResourceFactory,
+ runtimeWire) );
+
+ // add operation selector interceptor
+ bindingChain.addInterceptor(operationSelectorProvider.getPhase(),
+ operationSelectorProvider.createInterceptor());
+
+ // add operationProperties interceptor after operation selector
+ bindingChain.addInterceptor(Phase.SERVICE_BINDING_OPERATION_SELECTOR,
+ new OperationPropertiesInterceptor(jmsBinding, runtimeWire));
+
+ // add callback destination interceptor after operation selector
+ bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT,
+ new CallbackDestinationInterceptor(runtimeWire));
+
+ // add request wire format
+ bindingChain.addInterceptor(requestWireFormatProvider.getPhase(),
+ requestWireFormatProvider.createInterceptor());
+
+ // add response wire format, but only add it if it's different from the request
+ if (!jmsBinding.getRequestWireFormat().equals(jmsBinding.getResponseWireFormat())){
+ bindingChain.addInterceptor(responseWireFormatProvider.getPhase(),
+ responseWireFormatProvider.createInterceptor());
+ }
+ }
+
+ public RuntimeComponent getComponent() {
+ return component;
+ }
+
+ public RuntimeComponentService getService() {
+ return service;
+ }
+
+ public Binding getTargetBinding() {
+ return targetBinding;
+ }
+
+ public JMSBinding getJmsBinding() {
+ return jmsBinding;
+ }
+
+ public MessageFactory getMessageFactory() {
+ return messageFactory;
+ }
+
+ public JMSResourceFactory getResourceFactory() {
+ return jmsResourceFactory;
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessor.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessor.java
new file mode 100644
index 0000000000..821b9d7873
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import javax.jms.Message;
+import javax.jms.Session;
+
+/**
+ * Interface for a component that does operation selection and message payload processing
+ *
+ * @version $Rev$ $Date$
+ */
+public interface JMSMessageProcessor {
+
+ /**
+ * Get the operation name from a JMS Message
+ */
+ String getOperationName(Message message);
+
+ /**
+ * Set the operation name on a JMS Message
+ */
+ void setOperationName(String operationName, Message message);
+
+ /**
+ * Extracts the payload from a JMS Message
+ */
+ Object extractPayloadFromJMSMessage(Message msg);
+
+ /**
+ * Create a JMS Message containing the payload
+ */
+ Message insertPayloadIntoJMSMessage(Session session, Object payload);
+
+ /**
+ * Create a JMS Message for reporting an exception
+ */
+ Message createFaultMessage(Session session, Throwable responsePayload);
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessorUtil.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessorUtil.java
new file mode 100644
index 0000000000..699b1cb176
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessorUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+
+/**
+ * Utility methods to load JMS message processors.
+ *
+ * @version $Rev: $ $Date: $
+ */
+public class JMSMessageProcessorUtil {
+
+ /**
+ * Used to create instances of the JMSResourceFactory and RequestMessageProcessor and ResponseMessageProcessor from
+ * string based class name provided in the configuration
+ *
+ * @param cl ClassLoader
+ * @param className the string based class name to load and instantiate
+ * @return the new object
+ */
+ private static Object instantiate(ClassLoader cl, String className, JMSBinding binding) {
+ Object instance;
+ if (cl == null) {
+ cl = binding.getClass().getClassLoader();
+ }
+
+ try {
+ Class<?> clazz;
+
+ try {
+ clazz = cl.loadClass(className);
+ } catch (ClassNotFoundException e) {
+ clazz = binding.getClass().getClassLoader().loadClass(className);
+ }
+
+ Constructor<?> constructor = clazz.getDeclaredConstructor(new Class[] {JMSBinding.class});
+ instance = constructor.newInstance(binding);
+
+ } catch (Throwable e) {
+ throw new JMSBindingException("Exception instantiating OperationAndDataBinding class", e);
+ }
+
+ return instance;
+ }
+
+ public static JMSMessageProcessor getRequestMessageProcessor(JMSBinding binding) {
+ return (JMSMessageProcessor)instantiate(null, binding.getRequestMessageProcessorName(), binding);
+ }
+
+ public static JMSMessageProcessor getResponseMessageProcessor(JMSBinding binding) {
+ return (JMSMessageProcessor)instantiate(null, binding.getResponseMessageProcessorName(), binding);
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactory.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactory.java
new file mode 100644
index 0000000000..348764fe43
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+public interface JMSResourceFactory {
+
+ /*
+ * This is a simple implementation where a connection is created per binding Ideally the resource factory should be
+ * able to leverage the host environment to provide connection pooling if it can. E.g. if Tuscany is running inside
+ * an AppServer Then we could leverage the JMS resources it provides
+ *
+ * @see org.apache.tuscany.binding.jms.JMSResourceFactory#getConnection()
+ */
+ public abstract Connection getConnection() throws NamingException, JMSException;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.binding.jms.JMSResourceFactory#createSession()
+ */
+ public abstract Session createSession() throws JMSException, NamingException;
+
+ public abstract void closeSession(Session session) throws JMSException;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.binding.jms.JMSResourceFactory#startConnection()
+ */
+ public abstract void startConnection() throws JMSException, NamingException;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.binding.jms.JMSResourceFactory#closeConnection()
+ */
+ public abstract void closeConnection() throws JMSException;
+
+ public abstract Destination lookupDestination(String destName) throws NamingException;
+
+ /**
+ * You can create a destination in ActiveMQ (and have it appear in JNDI) by putting "dynamicQueues/" in front of the queue name being looked up
+ */
+ public abstract Destination createDestination(String jndiName) throws NamingException;
+
+ /*
+ * This is a simple implementation where a connection is created per binding Ideally the resource factory should be
+ * able to leverage the host environment to provide connection pooling if it can. E.g. if Tuscany is running inside
+ * an AppServer Then we could leverage the JMS resources it provides
+ *
+ * @see org.apache.tuscany.binding.jms.JMSResourceFactory#getConnection()
+ */
+ public abstract Connection getResponseConnection() throws NamingException, JMSException;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.binding.jms.JMSResourceFactory#createSession()
+ */
+ public abstract Session createResponseSession() throws JMSException, NamingException;
+
+ public abstract void closeResponseSession(Session session) throws JMSException;
+
+ public abstract void closeResponseConnection() throws JMSException;
+
+ /*
+ * Indicates whether connections obtained using getConnection() or getResponseConnection()
+ * must be closed after each use. This is necessary in environments where connections are
+ * shared with other users, or where connections cannot be held across transaction boundaries.
+ */
+ public abstract boolean isConnectionClosedAfterUse();
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryExtensionPoint.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryExtensionPoint.java
new file mode 100644
index 0000000000..57fbfb3215
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryExtensionPoint.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+
+public interface JMSResourceFactoryExtensionPoint {
+
+ JMSResourceFactory createJMSResourceFactory(JMSBinding binding);
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryImpl.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryImpl.java
new file mode 100644
index 0000000000..1debc61dff
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryImpl.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+
+/**
+ * Abstracts away any JMS provide specific feature from the JMS binding
+ *
+ * @version $Rev$ $Date$
+ */
+public class JMSResourceFactoryImpl implements JMSResourceFactory {
+
+ protected String initialContextFactoryName;
+ protected String connectionFactoryName = "ConnectionFactory";
+ protected String jndiURL;
+
+ protected Connection connection;
+ protected Context context;
+ protected boolean isConnectionStarted;
+ private Connection responseConnection;
+ private String responseConnectionFactoryName;
+
+ public JMSResourceFactoryImpl(String connectionFactoryName, String responseConnectionFactoryName, String initialContextFactoryName, String jndiURL) {
+ if (connectionFactoryName != null && connectionFactoryName.trim().length() > 0) {
+ this.connectionFactoryName = connectionFactoryName.trim();
+ }
+ if (responseConnectionFactoryName != null && responseConnectionFactoryName.trim().length() > 0) {
+ this.responseConnectionFactoryName = responseConnectionFactoryName.trim();
+ }
+ if (initialContextFactoryName != null && initialContextFactoryName.trim().length() > 0) {
+ this.initialContextFactoryName = initialContextFactoryName.trim();
+ }
+ if (jndiURL != null) {
+ this.jndiURL = jndiURL.trim();
+ }
+ }
+
+ /*
+ * This is a simple implementation where a connection is created per binding Ideally the resource factory should be
+ * able to leverage the host environment to provide connection pooling if it can. E.g. if Tuscany is running inside
+ * an AppServer Then we could leverage the JMS resources it provides
+ *
+ * @see org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory#getConnection()
+ */
+ public Connection getConnection() throws NamingException, JMSException {
+ if (connection == null) {
+ createConnection();
+ }
+ return connection;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory#createSession()
+ */
+ public Session createSession() throws JMSException, NamingException {
+ return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory#closeSession(javax.jms.Session)
+ */
+ public void closeSession(Session session) throws JMSException {
+ session.close();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory#startConnection()
+ */
+ public void startConnection() throws JMSException, NamingException {
+ if (!isConnectionStarted) {
+ getConnection().start();
+ isConnectionStarted = true;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory#closeConnection()
+ */
+ public void closeConnection() throws JMSException {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (JMSException e) {
+ // if using an embedded broker then when shutting down Tuscany the broker may get closed
+ // before this stop method is called. I can't see how to detect that so for now just
+ // ignore the exception if the message is that the transport is already disposed
+ if (!e.getMessage().contains("disposed")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ protected void createConnection() throws NamingException, JMSException {
+ ConnectionFactory connectionFactory = (ConnectionFactory)jndiLookUp(connectionFactoryName);
+ if (connectionFactory == null) {
+ throw new JMSBindingException("connection factory not found: " + connectionFactoryName);
+ }
+ connection = connectionFactory.createConnection();
+ }
+
+ protected synchronized Context getInitialContext() throws NamingException {
+ if (context == null) {
+ Properties props = new Properties();
+
+ if (initialContextFactoryName != null) {
+ props.setProperty(Context.INITIAL_CONTEXT_FACTORY, initialContextFactoryName);
+ }
+ if (jndiURL != null) {
+ props.setProperty(Context.PROVIDER_URL, jndiURL);
+ }
+
+ initJREEnvironment(props);
+
+ context = new InitialContext(props);
+ }
+ return context;
+ }
+
+ /**
+ * If using the WAS JMS Client with a non-IBM JRE then an additional
+ * environment property needs to be set to initialize the ORB correctly.
+ * See: http://www-1.ibm.com/support/docview.wss?uid=swg24012804
+ */
+ protected void initJREEnvironment(Properties props) {
+ if ("com.ibm.websphere.naming.WsnInitialContextFactory".equals(props.get(Context.INITIAL_CONTEXT_FACTORY))) {
+ String vendor = System.getProperty("java.vendor");
+ if (vendor == null || !vendor.contains("IBM")) {
+ props.setProperty("com.ibm.CORBA.ORBInit", "com.ibm.ws.sib.client.ORB");
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory#lookupDestination(java.lang.String)
+ */
+ public Destination lookupDestination(String destName) throws NamingException {
+ if (destName == null) {
+ return null;
+ }
+
+ Destination dest = (Destination)jndiLookUp(destName);
+ if (dest == null) {
+ dest = lookupPhysical(destName);
+ }
+ return dest;
+ }
+
+ protected Destination lookupPhysical(String jndiName) {
+
+ // TODO: the SCA JMS spec says a destination name may be a non-jndi plain destination name
+
+// Session session = null;
+// try {
+//
+// Destination dest;
+// session = createSession();
+// dest = session.createQueue(jndiName);
+// return dest;
+//
+// } catch (JMSException e) {
+// throw new JMSBindingException(e);
+// } catch (NamingException e) {
+// throw new JMSBindingException(e);
+// } finally {
+// if (session != null) {
+// try {
+// session.close();
+// } catch (JMSException e) {
+// throw new JMSBindingException(e);
+// }
+// }
+// }
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory#createDestination(java.lang.String)
+ */
+ public Destination createDestination(String jndiName) throws NamingException {
+ return lookupDestination("dynamicQueues/" + jndiName);
+ }
+
+ protected Object jndiLookUp(String name) {
+ Object o = null;
+ try {
+ o = getInitialContext().lookup("java:comp/env/" + name);
+ } catch (Exception ex) {
+ // ignore
+ }
+ if (o == null) {
+ try {
+ o = getInitialContext().lookup(name);
+ } catch (NamingException ex) {
+ // ignore
+ }
+ }
+ return o;
+ }
+
+ public Session createResponseSession() throws JMSException, NamingException {
+ return getResponseConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void closeResponseSession(Session session) throws JMSException {
+ session.close();
+ }
+
+ public Connection getResponseConnection() throws NamingException, JMSException {
+ if (responseConnection == null) {
+ if (responseConnectionFactoryName != null) {
+ ConnectionFactory connectionFactory = (ConnectionFactory)jndiLookUp(responseConnectionFactoryName);
+ if (connectionFactory == null) {
+ throw new JMSBindingException("connection factory not found: " + responseConnectionFactoryName);
+ }
+ responseConnection = connectionFactory.createConnection();
+ } else {
+ // if no response connection is defined in the SCDL use the request connection
+ responseConnection = getConnection();
+ }
+ }
+ return responseConnection;
+ }
+
+ public void closeResponseConnection() throws JMSException {
+ if (responseConnection != null && !responseConnection.equals(connection)) {
+ try {
+ responseConnection.close();
+ } catch (JMSException e) {
+ // if using an embedded broker then when shutting down Tuscany the broker may get closed
+ // before this stop method is called. I can't see how to detect that so for now just
+ // ignore the exception if the message is that the transport is already disposed
+ if (!e.getMessage().contains("disposed")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ public boolean isConnectionClosedAfterUse() {
+ // It is assumed this resource factory is used in an environment
+ // where the connection can be held for the life of the binding.
+ return false;
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/ObjectMessageProcessor.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/ObjectMessageProcessor.java
new file mode 100644
index 0000000000..331c63f20f
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/ObjectMessageProcessor.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.logging.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+import org.osoa.sca.ServiceRuntimeException;
+
+/**
+ * MessageProcessor for sending/receiving Serializable objects with the JMSBinding.
+ *
+ */
+public class ObjectMessageProcessor extends AbstractMessageProcessor {
+ private static final Logger logger = Logger.getLogger(ObjectMessageProcessor.class.getName());
+
+ public ObjectMessageProcessor(JMSBinding jmsBinding) {
+ super(jmsBinding);
+ }
+
+ @Override
+ protected Message createJMSMessage(Session session, Object o) {
+ if (session == null) {
+ logger.fine("no response session to create message: " + String.valueOf(o));
+ return null;
+ }
+ try {
+
+ ObjectMessage message = session.createObjectMessage();
+
+ if (o != null){
+ if (!(o instanceof Serializable)) {
+ throw new IllegalStateException("JMS ObjectMessage payload not Serializable: " + o);
+ }
+
+ message.setObject((Serializable)o);
+ }
+
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ @Override
+ public Object extractPayloadFromJMSMessage(Message msg) {
+ try {
+ Object o = ((ObjectMessage)msg).getObject();
+ if (o instanceof Throwable ) {
+ if (o instanceof RuntimeException) {
+ throw new ServiceRuntimeException("remote service exception, see nested exception", (RuntimeException)o);
+ } else {
+ return new InvocationTargetException((Throwable) o);
+ }
+ }
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ return extractPayload(msg);
+ }
+
+ @Override
+ protected Object extractPayload(Message msg) {
+ try {
+
+ return ((ObjectMessage)msg).getObject();
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ // special methods for handling operations with single parameters
+
+ public Message createJMSMessageForSingleParamOperation(Session session, Object o, boolean wrapSingleInput) {
+ if (session == null) {
+ logger.fine("no response session to create message: " + String.valueOf(o));
+ return null;
+ }
+ try {
+
+ ObjectMessage message = session.createObjectMessage();
+
+ if (o != null) {
+ if (!(o instanceof Serializable)) {
+ throw new IllegalStateException("JMS ObjectMessage payload not Serializable: " + o);
+ }
+
+ // If the user has specifically requests that single parameters
+ // be wrapped then leave is as is as it will have already been
+ // wrapped by Tuscany. Otherwise unwrap it.
+ if (wrapSingleInput) {
+ message.setObject((Serializable) o);
+ } else { // unwrap from array
+ message.setObject((Serializable) ((Object[]) o)[0]);
+ }
+
+ }
+
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ public Object extractPayloadFromJMSMessageForSingleParamOperation(Message msg, Class<?> argType, boolean wrapSingle) {
+ // We always have a one arg operation if this method is called so we need to
+ // decide if the data on the wire is wrapped or not. This is the algorithm.
+ //
+ // If the payload is null then create an empty array and pass it on
+ // If the payload is not an array then it must represent an unwrapped
+ // single arg. Wrap it up and pass it on
+ // If the payload is an array then determine if it's a wrapped single arg or not
+ // If the service interface arg type matches the type of the array and not it's contents
+ // then it's an unwrapped argument so wrap it and pass it on
+ // If the service interface arg type matches the type of the contents and not the type
+ // of the array then the parameter is already wrapped so pass it on as is
+ // If the service interface arg type matches both the type of the
+ // array and the type of its contents then assume that the whole array is the
+ // parameter and decide whether to unwrap it or pass it on as is based on the
+ // setting of the wrapSingle attribute
+ //
+
+ try {
+ Object payload = ((ObjectMessage) msg).getObject();
+
+ if (payload instanceof Throwable) {
+ if (payload instanceof RuntimeException) {
+ throw new ServiceRuntimeException("remote service exception, see nested exception", (RuntimeException) payload);
+ } else {
+ return new InvocationTargetException((Throwable) payload);
+ }
+ }
+
+ if (payload == null) {
+ // methodA(null) was not wrapped on wire so wrap it here in order
+ // that it passes through the rest of the Tuscany wire successfully
+ return new Object[] { payload };
+ }
+
+ boolean payloadIsArray = payload.getClass().isArray();
+
+ // Non-array payload is single arg
+ if (!payloadIsArray) {
+ // methodB(arg) wasn't wrapped on wire so wrap it here in order
+ // that it passes through the rest of the Tuscany wire successfully
+ return new Object[] { payload };
+ } else {
+ int size = ((Object[]) payload).length;
+
+ // An initial quick check to determine whether the payload is not
+ // wrapped. If the array has anything other than a single entry
+ // then it's not the result of reference side wrapping so wrap it
+ // here and pass it on
+ if (size != 1) {
+ return new Object[] { payload };
+ }
+
+ // we know the array has only one entry now so get it
+ Object arrayContents = ((Object[]) payload)[0];
+
+ // Is the operation argument the same type as the array itself?
+ if (argType.isAssignableFrom(payload.getClass())) {
+
+ // So we believe that the whole array is the argument but need
+ // to check what is in the array to be sure
+ if (arrayContents == null) {
+ // There is nothing in the array so it could be an accident that
+ // the array type matches the argument type, e.g. op(Object)
+ // so rely on the wrapSingle setting to choose
+ if (wrapSingle) {
+ return payload;
+ } else {
+ return new Object[] { payload };
+ }
+ } else if (argType.isAssignableFrom(arrayContents.getClass())) {
+ // We can't tell as the argument type matches both the array type and
+ // the array contents type so use the wrapSingle setting to choose
+ if (wrapSingle) {
+ return payload;
+ } else {
+ return new Object[] { payload };
+ }
+ } else {
+ // So by now we know the whole array is intended to be the
+ // parameter to wrap it and send it on
+ return new Object[] { payload };
+ }
+
+ } else {
+ // The array type doesn't match the argument type so assume that the
+ // array contents will match the argument type and that hence the
+ // parameter is already wrapped so just send it as is. If the contents
+ // type doesn't match the argument type a exception will be thrown further
+ // along the wire
+ return payload;
+ }
+ }
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java
new file mode 100644
index 0000000000..ee9501bf89
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingInvoker.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.lang.reflect.InvocationTargetException;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.interfacedef.util.FaultException;
+import org.apache.tuscany.sca.invocation.DataExchangeSemantics;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
+import org.apache.tuscany.sca.runtime.RuntimeWire;
+import org.osoa.sca.ServiceRuntimeException;
+
+/**
+ * Invoker for the JMS binding.
+ *
+ * @version $Rev$ $Date$
+ */
+public class RRBJMSBindingInvoker implements Invoker, DataExchangeSemantics {
+
+ protected Operation operation;
+ protected String operationName;
+
+ protected JMSBinding jmsBinding;
+ protected JMSResourceFactory jmsResourceFactory;
+ protected Destination bindingRequestDest;
+ protected Destination bindingReplyDest;
+ protected RuntimeComponentReference reference;
+ protected RuntimeWire runtimeWire;
+
+ public RRBJMSBindingInvoker(JMSBinding jmsBinding, Operation operation, JMSResourceFactory jmsResourceFactory, RuntimeComponentReference reference) {
+
+ this.operation = operation;
+ operationName = operation.getName();
+
+ this.jmsBinding = jmsBinding;
+ this.jmsResourceFactory = jmsResourceFactory;
+ this.reference = reference;
+ this.runtimeWire = reference.getRuntimeWire(jmsBinding);
+
+ try {
+ // If this is a callback reference, the destination is determined dynamically based on
+ // properties of the inbound service request. We should not look for or require a
+ // statically-configured destination unless a message is received that does not have
+ // the necessary properties.
+ if (!reference.isCallback()) {
+ bindingRequestDest = lookupDestination();
+ }
+ bindingReplyDest = lookupResponseDestination();
+ } catch (NamingException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ /**
+ * Looks up the Destination Queue for the JMS Binding
+ *
+ * @return The Destination Queue
+ * @throws NamingException Failed to lookup Destination Queue
+ * @throws JMSBindingException Failed to lookup Destination Queue
+ * @see #lookupDestinationQueue(boolean)
+ */
+ protected Destination lookupDestination() throws NamingException, JMSBindingException {
+ return lookupDestinationQueue(false);
+ }
+
+ /**
+ * Looks up the Destination Response Queue for the JMS Binding
+ *
+ * @return The Destination Response Queue
+ * @throws NamingException Failed to lookup Destination Response Queue
+ * @throws JMSBindingException Failed to lookup Destination Response Queue
+ * @see #lookupDestinationQueue(boolean)
+ */
+ protected Destination lookupResponseDestination() throws NamingException, JMSBindingException {
+ return lookupDestinationQueue(true);
+ }
+
+ /**
+ * Looks up the Destination Queue for the JMS Binding.
+ * <p>
+ * What happens in the look up will depend on the create mode specified for the JMS Binding:
+ * <ul>
+ * <li>always - the JMS queue is always created. It is an error if the queue already exists
+ * <li>ifnotexist - the JMS queue is created if it does not exist. It is not an error if the queue already exists
+ * <li>never - the JMS queue is never created. It is an error if the queue does not exist
+ * </ul>
+ * See the SCA JMS Binding specification for more information.
+ * <p>
+ *
+ * @param isReponseQueue <code>true</code> if we are creating a response queue.
+ * <code>false</code> if we are creating a request queue
+ * @return The Destination queue.
+ * @throws NamingException Failed to lookup JMS queue
+ * @throws JMSBindingException Failed to lookup JMS Queue. Probable cause is that
+ * the JMS queue's current existence/non-existence is not compatible with
+ * the create mode specified on the binding
+ */
+ protected Destination lookupDestinationQueue(boolean isReponseQueue) throws NamingException, JMSBindingException {
+ String queueName;
+ String queueType;
+ String qCreateMode;
+
+ if (isReponseQueue) {
+ queueName = jmsBinding.getResponseDestinationName();
+ queueType = "JMS Response Destination ";
+ qCreateMode = jmsBinding.getResponseDestinationCreate();
+ if (queueName == null) {
+ return null;
+ }
+ } else {
+ queueName = jmsBinding.getDestinationName();
+ queueType = "JMS Destination ";
+ qCreateMode = jmsBinding.getDestinationCreate();
+ }
+
+ Destination dest = jmsResourceFactory.lookupDestination(queueName);
+
+ if (qCreateMode.equals(JMSBindingConstants.CREATE_ALWAYS)) {
+ // In this mode, the queue must not already exist as we are creating it
+ if (dest != null) {
+ throw new JMSBindingException(queueType + queueName
+ + " already exists but has create mode of \""
+ + qCreateMode
+ + "\" while registering binding "
+ + jmsBinding.getName()
+ + " invoker");
+ }
+ // Create the queue
+ dest = jmsResourceFactory.createDestination(queueName);
+
+ } else if (qCreateMode.equals(JMSBindingConstants.CREATE_IF_NOT_EXIST)) {
+ // In this mode, the queue may nor may not exist. It will be created if it does not exist
+ if (dest == null) {
+ dest = jmsResourceFactory.createDestination(queueName);
+ }
+
+ } else if (qCreateMode.equals(JMSBindingConstants.CREATE_NEVER)) {
+ // In this mode, the queue must have already been created.
+ if (dest == null) {
+ throw new JMSBindingException(queueType + queueName
+ + " not found but create mode of \""
+ + qCreateMode
+ + "\" while registering binding "
+ + jmsBinding.getName()
+ + " invoker");
+ }
+ }
+
+ // Make sure we ended up with a queue
+ if (dest == null) {
+ throw new JMSBindingException(queueType + queueName
+ + " not found with create mode of \""
+ + qCreateMode
+ + "\" while registering binding "
+ + jmsBinding.getName()
+ + " invoker");
+ }
+
+ return dest;
+ }
+
+ public org.apache.tuscany.sca.invocation.Message invoke(org.apache.tuscany.sca.invocation.Message tuscanyMsg) {
+ try {
+ // populate the message context with JMS binding information
+ JMSBindingContext context = new JMSBindingContext();
+ context.setJmsResourceFactory(jmsResourceFactory);
+ tuscanyMsg.setBindingContext(context);
+
+ // get a jms session to cover the creation and sending of the message
+ Session session = context.getJmsSession();
+
+ context.setRequestDestination(getRequestDestination(tuscanyMsg, session));
+ context.setReplyToDestination(getReplyToDestination(session));
+
+ try {
+ tuscanyMsg = runtimeWire.getBindingInvocationChain().getHeadInvoker().invoke(tuscanyMsg);
+ } catch (ServiceRuntimeException e) {
+ if (e.getCause() instanceof InvocationTargetException) {
+ if ((e.getCause().getCause() instanceof RuntimeException)) {
+ tuscanyMsg.setFaultBody(e.getCause());
+ } else {
+ tuscanyMsg.setFaultBody(((InvocationTargetException)e.getCause()).getTargetException());
+ }
+ } else if (e.getCause() instanceof FaultException) {
+ tuscanyMsg.setFaultBody(e.getCause());
+ } else {
+ tuscanyMsg.setFaultBody(e);
+ }
+ } catch (IllegalStateException e) {
+ tuscanyMsg.setFaultBody(e);
+ } catch (Throwable e) {
+ tuscanyMsg.setFaultBody(e);
+ } finally {
+ context.closeJmsSession();
+ if (jmsResourceFactory.isConnectionClosedAfterUse()) {
+ jmsResourceFactory.closeConnection();
+ }
+ }
+
+ return tuscanyMsg;
+ } catch (Exception e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ protected Destination getRequestDestination(org.apache.tuscany.sca.invocation.Message tuscanyMsg, Session session) throws JMSBindingException, NamingException, JMSException {
+ Destination requestDestination;
+ if (reference.isCallback()) {
+ String toURI = tuscanyMsg.getTo().getURI();
+ if (toURI != null && toURI.startsWith("jms:")) {
+ // the msg to uri contains the callback destination name
+ // this is an jms physical name not a jndi name so need to use session.createQueue
+ requestDestination = session.createQueue(toURI.substring(4));
+ } else {
+ requestDestination = lookupDestination();
+ }
+ } else {
+ requestDestination = bindingRequestDest;
+ }
+
+ return requestDestination;
+ }
+
+ protected Destination getReplyToDestination(Session session) throws JMSException, JMSBindingException, NamingException {
+ Destination replyToDest;
+ if (operation.isNonBlocking()) {
+ replyToDest = null;
+ } else {
+ if (bindingReplyDest != null) {
+ replyToDest = bindingReplyDest;
+ } else {
+ replyToDest = session.createTemporaryQueue();
+ }
+ }
+ return replyToDest;
+ }
+
+ public boolean allowsPassByReference() {
+ // JMS always pass by value
+ return true;
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/TextMessageProcessor.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/TextMessageProcessor.java
new file mode 100644
index 0000000000..c1b5e6562d
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/TextMessageProcessor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.util.logging.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+
+/**
+ * MessageProcessor for sending/receiving javax.jms.TextMessage with the JMSBinding.
+ *
+ * @version $Rev$ $Date$
+ */
+public class TextMessageProcessor extends AbstractMessageProcessor {
+ private static final Logger logger = Logger.getLogger(TextMessageProcessor.class.getName());
+
+ public TextMessageProcessor(JMSBinding jmsBinding) {
+ super(jmsBinding);
+ }
+
+ @Override
+ protected Object extractPayload(Message msg) {
+ try {
+
+ if (!(msg instanceof TextMessage)) {
+ throw new IllegalStateException("expecting JMS TextMessage: " + msg);
+ }
+
+ return ((TextMessage)msg).getText();
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ @Override
+ protected Message createJMSMessage(Session session, Object o) {
+ if (session == null) {
+ logger.fine("no response session to create message: " + String.valueOf(o));
+ return null;
+ }
+ try {
+
+ TextMessage message = session.createTextMessage();
+
+ if (o != null){
+ message.setText(String.valueOf(o));
+ }
+
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/XMLBytesMessageProcessor.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/XMLBytesMessageProcessor.java
new file mode 100644
index 0000000000..5a9e82b7da
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/XMLBytesMessageProcessor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.io.ByteArrayInputStream;
+import java.util.logging.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+import org.apache.tuscany.sca.interfacedef.util.FaultException;
+
+/**
+ * MessageProcessor for sending/receiving XML javax.jms.BytesMessage with the JMSBinding.
+ */
+public class XMLBytesMessageProcessor extends AbstractMessageProcessor {
+ private static final Logger logger = Logger.getLogger(XMLBytesMessageProcessor.class.getName());
+
+ public XMLBytesMessageProcessor(JMSBinding jmsBinding) {
+ super(jmsBinding);
+ }
+
+ @Override
+ protected Object extractPayload(Message msg) {
+ try {
+
+ if (!(msg instanceof BytesMessage)) {
+ throw new IllegalStateException("expecting JMS BytesMessage: " + msg);
+ }
+
+ long noOfBytes = ((BytesMessage)msg).getBodyLength();
+ byte [] bytes = new byte[(int)noOfBytes];
+ ((BytesMessage)msg).readBytes(bytes);
+ ((BytesMessage)msg).reset();
+
+ Object os;
+ if (noOfBytes > 0) {
+ XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new ByteArrayInputStream(bytes));
+ StAXOMBuilder builder = new StAXOMBuilder(reader);
+ os = builder.getDocumentElement();
+ } else {
+ os = null;
+ }
+ return os;
+ } catch (XMLStreamException e) {
+ throw new JMSBindingException(e);
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ @Override
+ public Object extractPayloadFromJMSMessage(Message msg) {
+ if (msg instanceof BytesMessage) {
+ return extractPayload(msg);
+ } else {
+ return super.extractPayloadFromJMSMessage(msg);
+ }
+ }
+
+ @Override
+ protected Message createJMSMessage(Session session, Object o) {
+ if (session == null) {
+ logger.fine("no response session to create message: " + String.valueOf(o));
+ return null;
+ }
+ try {
+ BytesMessage message = session.createBytesMessage();
+
+ if (o instanceof OMElement) {
+ message.writeBytes(o.toString().getBytes());
+ } else if ((o instanceof Object[]) && ((Object[])o)[0] instanceof OMElement) {
+ message.writeBytes(((Object[])o)[0].toString().getBytes());
+ } else if (o != null) {
+ throw new IllegalStateException("expecting OMElement payload: " + o);
+ }
+
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ @Override
+ public Message createFaultMessage(Session session, Throwable o) {
+
+ if (session == null) {
+ logger.fine("no response session to create fault message: " + String.valueOf(o));
+ return null;
+ }
+ if (o instanceof FaultException) {
+ try {
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(String.valueOf(((FaultException) o).getFaultInfo()).getBytes());
+ message.setBooleanProperty(JMSBindingConstants.FAULT_PROPERTY, true);
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ } else {
+ return super.createFaultMessage(session, o);
+ }
+ }
+
+}
diff --git a/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/XMLTextMessageProcessor.java b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/XMLTextMessageProcessor.java
new file mode 100644
index 0000000000..8794f80126
--- /dev/null
+++ b/tags/java/sca/1.5.1/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/XMLTextMessageProcessor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.jms.provider;
+
+import java.io.StringReader;
+import java.util.logging.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants;
+import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException;
+import org.apache.tuscany.sca.interfacedef.util.FaultException;
+
+/**
+ * MessageProcessor for sending/receiving XML javax.jms.TextMessage with the JMSBinding.
+ *
+ * @version $Rev$ $Date$
+ */
+public class XMLTextMessageProcessor extends AbstractMessageProcessor {
+ private static final Logger logger = Logger.getLogger(XMLTextMessageProcessor.class.getName());
+
+ public XMLTextMessageProcessor(JMSBinding jmsBinding) {
+ super(jmsBinding);
+ }
+
+ @Override
+ protected Object extractPayload(Message msg) {
+ try {
+
+ String xml = ((TextMessage)msg).getText();
+ Object os;
+ if (xml != null) {
+ XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
+ StAXOMBuilder builder = new StAXOMBuilder(reader);
+ os = builder.getDocumentElement();
+ } else {
+ os = null;
+ }
+ return os;
+
+ } catch (XMLStreamException e) {
+ throw new JMSBindingException(e);
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ @Override
+ public Object extractPayloadFromJMSMessage(Message msg) {
+ if (msg instanceof TextMessage) {
+ return extractPayload(msg);
+ } else {
+ return super.extractPayloadFromJMSMessage(msg);
+ }
+ }
+
+ @Override
+ protected Message createJMSMessage(Session session, Object o) {
+ if (session == null) {
+ logger.fine("no response session to create message: " + String.valueOf(o));
+ return null;
+ }
+ try {
+
+ TextMessage message = session.createTextMessage();
+
+ if (o instanceof OMElement) {
+ message.setText(o.toString());
+ } else if ((o instanceof Object[]) && ((Object[])o)[0] instanceof OMElement) {
+ message.setText(((Object[])o)[0].toString());
+ } else if (o != null) {
+ throw new IllegalStateException("expecting OMElement payload: " + o);
+ }
+
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ }
+
+ @Override
+ public Message createFaultMessage(Session session, Throwable o) {
+ if (session == null) {
+ logger.fine("no response session to create fault message: " + String.valueOf(o));
+ return null;
+ }
+ if (o instanceof FaultException) {
+ try {
+
+ TextMessage message = session.createTextMessage();
+ message.setText(String.valueOf(((FaultException)o).getFaultInfo()));
+ message.setBooleanProperty(JMSBindingConstants.FAULT_PROPERTY, true);
+ return message;
+
+ } catch (JMSException e) {
+ throw new JMSBindingException(e);
+ }
+ } else {
+ return super.createFaultMessage(session, o);
+ }
+ }
+
+}