diff options
Diffstat (limited to '')
9 files changed, 1045 insertions, 0 deletions
diff --git a/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java new file mode 100644 index 0000000000..c6e3b799f4 --- /dev/null +++ b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingInvoker.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.provider; + +import java.lang.reflect.InvocationTargetException; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; + +/** + * Interceptor for the JMS binding. + * + * @version $Rev$ $Date$ + */ +public class JMSBindingInvoker implements Invoker { + + protected Operation operation; + protected String operationName; + + protected JMSBinding jmsBinding; + protected JMSResourceFactory jmsResourceFactory; + protected JMSMessageProcessor requestMessageProcessor; + protected JMSMessageProcessor responseMessageProcessor; + protected Destination requestDest; + protected Destination replyDest; + + public JMSBindingInvoker(JMSBinding jmsBinding, + Operation operation ) { + + this.operation = operation; + operationName = operation.getName(); + + this.jmsBinding = jmsBinding; + jmsResourceFactory = jmsBinding.getJmsResourceFactory(); + requestMessageProcessor = jmsBinding.getRequestMessageProcessor(); + responseMessageProcessor = jmsBinding.getResponseMessageProcessor(); + try { + requestDest = jmsResourceFactory.lookupDestination(jmsBinding.getDestinationName()); + + if (requestDest == null){ + if (jmsBinding.getDestinationCreate().equals(JMSBindingConstants.CREATE_ALLWAYS)) { + requestDest = jmsResourceFactory.createDestination(jmsBinding.getDestinationName()); + } else { + throw new JMSBindingException("JMS Destination " + + jmsBinding.getDestinationName() + + " not found while registering binding " + + jmsBinding.getName() + + " invoker"); + } + } + + replyDest = jmsResourceFactory.lookupDestination(jmsBinding.getResponseDestinationName()); + + if (replyDest == null){ + if (jmsBinding.getResponseDestinationCreate().equals(JMSBindingConstants.CREATE_ALLWAYS)) { + replyDest = jmsResourceFactory.createDestination(jmsBinding.getResponseDestinationName()); + } else { + throw new JMSBindingException("JMS Response Destination " + + jmsBinding.getDestinationName() + + " not found while registering binding " + + jmsBinding.getName() + + " invoker"); + } + } + } catch (NamingException e) { + throw new JMSBindingException(e); + } + + } + + public org.apache.tuscany.sca.invocation.Message invoke(org.apache.tuscany.sca.invocation.Message msg) { + try { + Object resp = invokeTarget((Object[])msg.getBody(), (short)0); + msg.setBody(resp); + } catch (InvocationTargetException e) { + msg.setFaultBody(e.getCause()); + } catch (Throwable e) { + msg.setFaultBody(e); + } + return msg; + } + + public Object invokeTarget(Object payload, final short sequence) throws InvocationTargetException { + try { + Session session = jmsResourceFactory.createSession(); + try { + + Destination replyToDest = (replyDest != null) ? replyDest : session.createTemporaryQueue(); + + Message requestMsg = sendRequest((Object[])payload, session, replyToDest); + Message replyMsg = receiveReply(session, replyToDest, requestMsg.getJMSMessageID()); + + return responseMessageProcessor.extractPayloadFromJMSMessage(replyMsg); + + } finally { + session.close(); + } + } catch (JMSException e) { + throw new InvocationTargetException(e); + } catch (NamingException e) { + throw new InvocationTargetException(e); + } + } + + public void stop() throws NamingException, JMSException{ + jmsResourceFactory.closeConnection(); + } + + protected Message sendRequest(Object[] payload, Session session, Destination replyToDest) throws JMSException { + + Message requestMsg = requestMessageProcessor.insertPayloadIntoJMSMessage(session, payload); + + requestMsg.setJMSDeliveryMode(jmsBinding.getDeliveryMode()); + requestMsg.setJMSPriority(jmsBinding.getPriority()); + + requestMessageProcessor.setOperationName(operationName, requestMsg); + requestMsg.setJMSReplyTo(replyToDest); + + MessageProducer producer = session.createProducer(requestDest); + try { + producer.send(requestMsg); + } finally { + producer.close(); + } + return requestMsg; + } + + protected Message receiveReply(Session session, Destination replyToDest, String requestMsgId) throws JMSException, + NamingException { + String msgSelector = "JMSCorrelationID = '" + requestMsgId + "'"; + MessageConsumer consumer = session.createConsumer(replyToDest, msgSelector); + Message replyMsg; + try { + jmsResourceFactory.startConnection(); + replyMsg = consumer.receive(jmsBinding.getTimeToLive()); + } finally { + consumer.close(); + } + return replyMsg; + } + +} diff --git a/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java new file mode 100644 index 0000000000..418a6467bd --- /dev/null +++ b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingListener.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.provider; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.core.invocation.MessageFactoryImpl; +import org.apache.tuscany.sca.core.invocation.ThreadMessageContext; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + +public class JMSBindingListener implements MessageListener { + + + private JMSBinding jmsBinding; + private JMSResourceFactory jmsResourceFactory; + private RuntimeComponentService service; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + private String correlationScheme; + private MessageFactory messageFactory; + //private Method operationMethod; + + public JMSBindingListener(JMSBinding jmsBinding, + JMSResourceFactory jmsResourceFactory, + RuntimeComponentService service) + throws NamingException { + this.jmsBinding = jmsBinding; + this.jmsResourceFactory = jmsResourceFactory; + this.service = service; + requestMessageProcessor = jmsBinding.getRequestMessageProcessor(); + responseMessageProcessor = jmsBinding.getResponseMessageProcessor(); + correlationScheme = jmsBinding.getCorrelationScheme(); + messageFactory = new MessageFactoryImpl(); + } + + public void onMessage(Message requestJMSMsg) { + try { + Object responsePayload = invokeService(requestJMSMsg); + sendReply(requestJMSMsg, responsePayload); + } catch (Exception e) { + sendFaultReply(requestJMSMsg, e); + } + } + + /** + * Turn the JMS message back into a Tuscany message and invoke the + * target component + * + * @param requestJMSMsg + * @return + * @throws JMSException + * @throws InvocationTargetException + */ + protected Object invokeService(Message requestJMSMsg) + throws JMSException, InvocationTargetException { + + String operationName = requestMessageProcessor.getOperationName(requestJMSMsg); + Object requestPayload = requestMessageProcessor.extractPayloadFromJMSMessage(requestJMSMsg); + + org.apache.tuscany.sca.invocation.Message requestMsg = messageFactory.createMessage(); + + requestMsg.setBody(requestPayload); + + org.apache.tuscany.sca.invocation.Message workContext = ThreadMessageContext.getMessageContext(); + ThreadMessageContext.setMessageContext(requestMsg); + + try { + /* TODO - work out how to do this bit + + if (isConversational() && conversationID != null) { + requestMsg.setConversationID(conversationID); + } else { + requestMsg.setConversationID(null); + } + */ + // get the operation object + List<Operation> opList = service.getInterfaceContract().getInterface().getOperations(); + + Operation operation = null; + + for(Operation op : opList){ + if ( op.getName().equals(operationName)) { + operation = op; + break; + } + } + + if ( operation != null ){ + + // get the component invoker + Invoker invoker = service.getInvoker(jmsBinding, operation); + + org.apache.tuscany.sca.invocation.Message responseMsg = invoker.invoke(requestMsg); + + if (responseMsg.isFault()) { + throw new InvocationTargetException((Throwable)responseMsg.getBody()); + } + return responseMsg.getBody(); + } else { + throw new JMSBindingException("Can't find operation " + operationName ); + } + + } finally { + ThreadMessageContext.setMessageContext(workContext); + } + } + + protected void sendReply(Message requestJMSMsg, Object responsePayload) { + try { + + if (requestJMSMsg.getJMSReplyTo() == null) { + // assume no reply is expected + return; + } + + Session session = jmsResourceFactory.createSession(); + Message replyJMSMsg = responseMessageProcessor.insertPayloadIntoJMSMessage(session, responsePayload); + + replyJMSMsg.setJMSDeliveryMode(requestJMSMsg.getJMSDeliveryMode()); + replyJMSMsg.setJMSPriority(requestJMSMsg.getJMSPriority()); + + if (correlationScheme == null || + JMSBindingConstants.CORRELATE_MSG_ID.equalsIgnoreCase(correlationScheme)) { + replyJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSMessageID()); + } else if (JMSBindingConstants.CORRELATE_CORRELATION_ID.equalsIgnoreCase(correlationScheme)) { + replyJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSCorrelationID()); + } + + Destination destination = requestJMSMsg.getJMSReplyTo(); + MessageProducer producer = session.createProducer(destination); + + producer.send(replyJMSMsg); + + producer.close(); + session.close(); + + } catch (JMSException e) { + throw new JMSBindingException(e); + } catch (NamingException e) { + throw new JMSBindingException(e); + } + } + + protected void sendFaultReply(Message requestJMSMsg, Exception e) { + sendReply(requestJMSMsg, new JMSBindingException("exception invoking JMS service", e)); + } + +} diff --git a/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java new file mode 100644 index 0000000000..b61ccaaaee --- /dev/null +++ b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingProviderFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.jms.provider; + +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.provider.BindingProviderFactory; +import org.apache.tuscany.sca.provider.ReferenceBindingProvider; +import org.apache.tuscany.sca.provider.ServiceBindingProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + + + +/** + * A factory from creating the JMS binding provider. + * + * @version $Rev$ $Date$ + */ +public class JMSBindingProviderFactory implements BindingProviderFactory<JMSBinding> { + + public JMSBindingProviderFactory(ExtensionPointRegistry extensionPoints) { + } + + public ReferenceBindingProvider createReferenceBindingProvider(RuntimeComponent component, RuntimeComponentReference reference, JMSBinding binding) { + return new JMSBindingReferenceBindingProvider(component, reference, binding); + } + + public ServiceBindingProvider createServiceBindingProvider(RuntimeComponent component, RuntimeComponentService service, JMSBinding binding) { + return new JMSBindingServiceBindingProvider(component, service, binding); + } + + public Class<JMSBinding> getModelType() { + return JMSBinding.class; + } +} diff --git a/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java new file mode 100644 index 0000000000..7c5b677745 --- /dev/null +++ b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingReferenceBindingProvider.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.jms.provider; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.interfacedef.InterfaceContract; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.provider.ReferenceBindingProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; + + +/** + * Implementation of the JMS reference binding provider. + * + * @version $Rev$ $Date$ + */ +public class JMSBindingReferenceBindingProvider implements ReferenceBindingProvider { + + private RuntimeComponentReference reference; + private JMSBinding jmsBinding; + private List<JMSBindingInvoker> jmsBindingInvokers = new ArrayList<JMSBindingInvoker>(); + + public JMSBindingReferenceBindingProvider(RuntimeComponent component, + RuntimeComponentReference reference, + JMSBinding binding) { + this.reference = reference; + this.jmsBinding = binding; + + } + + public Invoker createInvoker(Operation operation, boolean isCallback) { + + if (jmsBinding.getDestinationName().equals(JMSBindingConstants.DEFAULT_DESTINATION_NAME)){ + throw new JMSBindingException("No destination specified for reference " + + reference.getName()); + } + + if (jmsBinding.getResponseDestinationName().equals(JMSBindingConstants.DEFAULT_RESPONSE_DESTINATION_NAME)){ + throw new JMSBindingException("No response destination specified for reference " + + reference.getName()); + } +/* The following doesn't work as I can't get to the + * target list on the composite reference + // if the default destination queue name is set + // set the destination queue name to the wired service name + // so that any wires can be assured a unique endpoint. + + if (jmsBinding.getDestinationName().equals(JMSBindingConstants.DEFAULT_DESTINATION_NAME)){ + // get the name of the target service + List<ComponentService> targets = reference.getTargets(); + + if (targets.size() < 1){ + throw new JMSBindingException("No target specified for reference " + + reference.getName() + + " so destination queue name can't be determined"); + } + + if (targets.size() > 1){ + throw new JMSBindingException("More than one target specified for reference " + + reference.getName() + + " so destination queue name can't be determined"); + } + + ComponentService service = targets.get(0); + jmsBinding.setDestinationName(service.getName()); + } + + + // if the default response queue name is set + // set the response queue to the names of this + // reference + if (jmsBinding.getResponseDestinationName().equals(JMSBindingConstants.DEFAULT_RESPONSE_DESTINATION_NAME)){ + jmsBinding.setResponseDestinationName(reference.getName()); + } +*/ + if (isCallback) { + throw new UnsupportedOperationException(); + } else { + JMSBindingInvoker invoker = new JMSBindingInvoker(jmsBinding, + operation); + jmsBindingInvokers.add(invoker); + return invoker; + } + } + + public InterfaceContract getBindingInterfaceContract() { + return reference.getInterfaceContract(); + } + + public void start() { + + } + + public void stop() { + try { + for (JMSBindingInvoker invoker : jmsBindingInvokers) { + invoker.stop(); + + } + } catch (Exception e) { + throw new JMSBindingException("Error stopping JMSReferenceBinding", e); + } + } + +} diff --git a/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java new file mode 100644 index 0000000000..f6f34324f2 --- /dev/null +++ b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.jms.provider; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.interfacedef.InterfaceContract; +import org.apache.tuscany.sca.provider.ServiceBindingProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + +/** + * Implementation of the JMS service binding provider. + * + * @version $Rev$ $Date$ + */ +public class JMSBindingServiceBindingProvider implements ServiceBindingProvider { + + + private RuntimeComponentService service; + private JMSBinding jmsBinding; + private JMSResourceFactory jmsResourceFactory; + private MessageConsumer consumer; + + public JMSBindingServiceBindingProvider(RuntimeComponent component, + RuntimeComponentService service, + JMSBinding binding) { + this.service = service; + this.jmsBinding = binding; + + jmsResourceFactory = jmsBinding.getJmsResourceFactory(); + + // if the default destination queue names is set + // set the destinate queue name to the reference name + // so that any wires can be assured a unique endpoint. + if (jmsBinding.getDestinationName().equals(JMSBindingConstants.DEFAULT_DESTINATION_NAME)){ + //jmsBinding.setDestinationName(service.getName()); + throw new JMSBindingException("No destination specified for service " + + service.getName()); + } + + } + + public InterfaceContract getBindingInterfaceContract() { + return service.getInterfaceContract(); + } + + public void start() { + + try { + registerListerner(); + } catch (Exception e) { + throw new JMSBindingException("Error starting JMSServiceBinding", e); + } + } + + public void stop() { + try { + consumer.close(); + jmsResourceFactory.closeConnection(); + } catch (Exception e) { + throw new JMSBindingException("Error stopping JMSServiceBinding", e); + } + } + + private void registerListerner() throws NamingException, JMSException { + + Session session = jmsResourceFactory.createSession(); + Destination destination = jmsResourceFactory.lookupDestination(jmsBinding.getDestinationName()); + + if (destination == null){ + if (jmsBinding.getDestinationCreate().equals(JMSBindingConstants.CREATE_ALLWAYS)) { + destination = jmsResourceFactory.createDestination(jmsBinding.getDestinationName()); + } else { + throw new JMSBindingException("JMS Destination " + + jmsBinding.getDestinationName() + + "not found while registering service " + + service.getName() + + " listener"); + } + } + + consumer = session.createConsumer(destination); + + // TODO - We assume the target is a Java class here!!! + //Class<?> aClass = getTargetJavaClass(getBindingInterfaceContract().getInterface()); + // Object instance = component.createSelfReference(aClass).getService(); + + consumer.setMessageListener(new JMSBindingListener(jmsBinding, jmsResourceFactory, service)); + + jmsResourceFactory.startConnection(); + + } +} diff --git a/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessor.java b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessor.java new file mode 100644 index 0000000000..f51b62b5ac --- /dev/null +++ b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.provider; + +import javax.jms.Message; +import javax.jms.Session; + +/** + * Interface for a component that does operation selection and message payload + * processing + */ +public interface JMSMessageProcessor { + + /** + * Get the operation name from a JMS Message + */ + public abstract String getOperationName(Message message); + + /** + * Set the operation name on a JMS Message + */ + public abstract void setOperationName(String operationName, Message message); + + /** + * Extracts the payload from a JMS Message + */ + public abstract Object extractPayloadFromJMSMessage(Message msg); + + /** + * Create a JMS Message containing the payload + */ + public abstract Message insertPayloadIntoJMSMessage(Session session, Object payload); +} diff --git a/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessorImpl.java b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessorImpl.java new file mode 100644 index 0000000000..0a0a4cde2b --- /dev/null +++ b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSMessageProcessorImpl.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.provider; + +import java.io.Serializable; +import java.io.StringReader; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.impl.builder.StAXOMBuilder; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; + +public class JMSMessageProcessorImpl implements JMSMessageProcessor { + + protected String operationPropertyName; + protected boolean xmlFormat; + + public JMSMessageProcessorImpl(JMSBinding jmsBinding) { + this.operationPropertyName = jmsBinding.getOperationSelectorPropertyName(); + this.xmlFormat = jmsBinding.getXMLFormat(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.tuscany.binding.jms.OperationAndDataBinding#getOperationName(javax.jms.Message) + */ + public String getOperationName(Message message) { + try { + + return message.getStringProperty(operationPropertyName); + + } catch (JMSException e) { + throw new JMSBindingException("Exception retreiving operation name from message", e); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.tuscany.binding.jms.OperationAndDataBinding#setOperationName(javax.jms.Message, + * java.lang.String) + */ + public void setOperationName(String operationName, Message message) { + try { + + message.setStringProperty(operationPropertyName, operationName); + + } catch (JMSException e) { + throw new JMSBindingException("Exception setting the operation name on message", e); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.tuscany.binding.jms.OperationAndDataBinding#extractPayload(javax.jms.Session, + * java.lang.Object) + */ + public Message insertPayloadIntoJMSMessage(Session session, Object o) { + if (xmlFormat) { + return createXMLJMSMessage(session, o); + } else { + return createObjectJMSMessage(session, o); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.tuscany.binding.jms.OperationAndDataBinding#extractPayload(javax.jms.Message) + */ + public Object extractPayloadFromJMSMessage(Message msg) { + if (xmlFormat) { + return extractXMLPayload(msg); + } else { + return extractObjectPayload(msg); + } + } + + protected Object extractXMLPayload(Message msg) { + try { + + String xml = ((TextMessage)msg).getText(); + + XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml)); + StAXOMBuilder builder = new StAXOMBuilder(reader); + OMElement omElement = builder.getDocumentElement(); + + return new Object[] {omElement}; + + } catch (XMLStreamException e) { + throw new JMSBindingException(e); + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } + + protected Object extractObjectPayload(Message msg) { + try { + + return ((ObjectMessage)msg).getObject(); + + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } + + protected Message createXMLJMSMessage(Session session, Object o) { + try { + + TextMessage message = session.createTextMessage(); + + if (o instanceof OMElement) { + message.setText(o.toString()); + } else { + message.setText(((Object[])o)[0].toString()); + } + + return message; + + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } + + protected Message createObjectJMSMessage(Session session, Object o) { + try { + + ObjectMessage message = session.createObjectMessage(); // default + message.setObject((Serializable)o); + return message; + + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } +} diff --git a/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactory.java b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactory.java new file mode 100644 index 0000000000..21a6847840 --- /dev/null +++ b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.provider; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.naming.NamingException; + +/* + * Brings together the JMS binding description and the + * API used for generating and manageing JMS resources + */ + +public interface JMSResourceFactory { + + public abstract Connection getConnection() throws NamingException, JMSException; + + public abstract Session createSession() throws JMSException, NamingException; + + public abstract void startConnection() throws JMSException, NamingException; + + public abstract void closeConnection() throws JMSException, NamingException; + + public abstract Destination lookupDestination(String jndiName) throws NamingException; + + public abstract Destination createDestination(String jndiName) throws NamingException; +} diff --git a/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryActiveMQImpl.java b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryActiveMQImpl.java new file mode 100644 index 0000000000..8d9a0ef210 --- /dev/null +++ b/branches/sca-java-0.99/modules/binding-jms/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSResourceFactoryActiveMQImpl.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.provider; + +import java.util.Properties; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; + +/** + * Abstracts away any JMS provide specific feature from the JMS binding + * + * @version $Rev$ $Date$ + */ +public class JMSResourceFactoryActiveMQImpl implements JMSResourceFactory { + + private JMSBinding jmsBinding; + private Connection connection; + private Context context; + private boolean isConnectionStarted; + + public JMSResourceFactoryActiveMQImpl(JMSBinding jmsBinding) { + this.jmsBinding = jmsBinding; + } + + /* + * This is a simple implementation where a connection is created per binding + * Ideally the resource factory should be able to leverage the host + * environment to provide connection pooling if it can. E.g. if Tuscany is + * running inside an AppServer Then we could leverage the JMS resources it + * provides + * + * @see org.apache.tuscany.binding.jms.JMSResourceFactory#getConnection() + */ + public Connection getConnection() throws NamingException, JMSException { + if (connection == null) { + createConnection(); + } + return connection; + } + + /* + * (non-Javadoc) + * + * @see org.apache.tuscany.binding.jms.JMSResourceFactory#createSession() + */ + public Session createSession() throws JMSException, NamingException { + return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + /* + * (non-Javadoc) + * + * @see org.apache.tuscany.binding.jms.JMSResourceFactory#startConnection() + */ + public void startConnection() throws JMSException, NamingException { + if (!isConnectionStarted) { + getConnection().start(); + isConnectionStarted = true; + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.tuscany.binding.jms.JMSResourceFactory#closeConnection() + */ + public void closeConnection() throws JMSException, NamingException { + if (connection != null) { + connection.close(); + } + } + + private void createConnection() throws NamingException, JMSException { + if (context == null) { + createInitialContext(); + } + ConnectionFactory connectionFactory = (ConnectionFactory)context.lookup(jmsBinding.getConnectionFactoryName()); + connection = connectionFactory.createConnection(); + } + + private void createInitialContext() throws NamingException { + Properties props = new Properties(); + props.setProperty(Context.INITIAL_CONTEXT_FACTORY, + jmsBinding.getInitialContextFactoryName().trim()); + props.setProperty(Context.PROVIDER_URL, + jmsBinding.getJndiURL().trim()); + + context = new InitialContext(props); + } + + public Destination lookupDestination(String jndiName) throws NamingException { + if (context == null) { + createInitialContext(); + } + + Destination dest = null; + + try { + dest = (Destination)context.lookup(jndiName); + } catch(NamingException ex){ + + } + return dest; + } + + /** + * You can create a destination in ActiveMQ (and have it appear in JNDI) + * by putting "dynamicQueues/" in front of the queue name being looked up + * + */ + public Destination createDestination(String jndiName) throws NamingException { + return lookupDestination("dynamicQueues/" + jndiName); + } +} |