diff options
Diffstat (limited to 'sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java')
3 files changed, 298 insertions, 0 deletions
diff --git a/sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java/org/apache/tuscany/service/discovery/jms/JmsDiscoveryService.java b/sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java/org/apache/tuscany/service/discovery/jms/JmsDiscoveryService.java new file mode 100644 index 0000000000..518d482245 --- /dev/null +++ b/sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java/org/apache/tuscany/service/discovery/jms/JmsDiscoveryService.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.discovery.jms; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.tuscany.spi.services.discovery.AbstractDiscoveryService; +import org.apache.tuscany.spi.services.discovery.DiscoveryException; +import org.apache.tuscany.spi.util.stax.StaxUtil; +import org.osoa.sca.annotations.Property; + +/** + * JMS based implementation of the discovery service. This class uses + * ActiveMQ specific API instead of JNDI based administered objects. This can + * be changed later if required. + * + * @version $Revision$ $Date$ + */ +public class JmsDiscoveryService extends AbstractDiscoveryService { + + // Connection factory + private ConnectionFactory connectionFactory; + + // Underlying JMS connection + private Connection connection; + + // Session used for reception + private Session receiverSession; + + // Message consumer + private MessageConsumer messageConsumer; + + // Topic to use + private Topic topic; + + // String broker url + private String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL; + + /** + * Injects the topic used for communication. + * @param topic Topic used for communication. + */ + @Property + public void setTopic(String topic) { + this.topic = new ActiveMQTopic(topic); + } + + /** + * Injects the broker URL. + * @param brokerUrl Broker URL to use. + */ + @Property + public void setBrokerUrl(String brokerUrl) { + this.brokerUrl = brokerUrl; + } + + /** + * Starts the service and sets up the message listener. + */ + @Override + protected synchronized void onStart() throws DiscoveryException { + + String runtimeId = getRuntimeInfo().getRuntimeId(); + + try { + + connectionFactory = new ActiveMQConnectionFactory(brokerUrl); + + connection = connectionFactory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + public void onException(JMSException jmsException) { + // Try restarting: TODO this may need further refinement + try { + onStop(); + } catch (DiscoveryException ex) { + ex.printStackTrace(); + } + try { + onStart(); + } catch (DiscoveryException ex) { + ex.printStackTrace(); + } + } + }); + receiverSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + messageConsumer = receiverSession.createConsumer(topic); + final MessageListener messageListener = new TuscanyMessageListener(this, runtimeId); + messageConsumer.setMessageListener(messageListener); + connection.start(); + + } catch (JMSException ex) { + throw new DiscoveryException(ex); + } + + } + + /** + * Closes the connection. + */ + @Override + protected synchronized void onStop() throws DiscoveryException { + + try { + receiverSession.close(); + } catch (JMSException ex) { + throw new DiscoveryException(ex); + } finally { + try { + connection.close(); + } catch (JMSException ex) { + throw new DiscoveryException(ex); + } + } + + } + + /** + * Sends the message. + */ + public synchronized int sendMessage(String runtimeId, XMLStreamReader reader) throws DiscoveryException { + + try { + + String text = StaxUtil.serialize(reader); + Session senderSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = senderSession.createProducer(topic); + + TextMessage textMessage = senderSession.createTextMessage(text); + + textMessage.setStringProperty("runtimeId", runtimeId); + messageProducer.send(textMessage); + senderSession.commit(); + senderSession.close(); + + return 1; + + } catch (XMLStreamException ex) { + throw new DiscoveryException(ex); + } catch (JMSException ex) { + throw new DiscoveryException(ex); + } + } + +} diff --git a/sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java/org/apache/tuscany/service/discovery/jms/TuscanyJmsException.java b/sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java/org/apache/tuscany/service/discovery/jms/TuscanyJmsException.java new file mode 100644 index 0000000000..6b5f4066bf --- /dev/null +++ b/sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java/org/apache/tuscany/service/discovery/jms/TuscanyJmsException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.discovery.jms; + +import org.apache.tuscany.api.TuscanyRuntimeException; + +/** + * @version $Revision$ $Date$ + */ +@SuppressWarnings("serial") +public class TuscanyJmsException extends TuscanyRuntimeException { + + /** + * Initializes the cause. + * @param th Root cause for the exception. + */ + public TuscanyJmsException(Throwable th) { + super(th); + } + +} diff --git a/sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java/org/apache/tuscany/service/discovery/jms/TuscanyMessageListener.java b/sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java/org/apache/tuscany/service/discovery/jms/TuscanyMessageListener.java new file mode 100644 index 0000000000..e34e1f6199 --- /dev/null +++ b/sca-java-1.x/branches/sca-java-0.99/modules/discovery-jms/src/main/java/org/apache/tuscany/service/discovery/jms/TuscanyMessageListener.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.discovery.jms; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import org.apache.tuscany.spi.services.discovery.RequestListener; +import org.apache.tuscany.spi.util.stax.StaxUtil; + +/** + * Tuscany JMS message listsner. + * + * @version $Revision$ $Date$ + */ +public class TuscanyMessageListener implements MessageListener { + + // Discovery service + private JmsDiscoveryService discoveryService; + + // Runtime id + private String runtimeId; + + /** + * Initializes the discovery service. + * @param discoveryService Discovery service. + */ + public TuscanyMessageListener(JmsDiscoveryService discoveryService, String runtimeId) { + this.discoveryService = discoveryService; + this.runtimeId = runtimeId; + } + + /** + * Message listener callback. + */ + public void onMessage(Message message) { + + try { + + // TODO investigate why selectors are not working + if(!runtimeId.equals(message.getStringProperty("runtimeId"))) { + return; + } + + final TextMessage textMessage = (TextMessage)message; + final String text = textMessage.getText(); + + final QName messageType = StaxUtil.getDocumentElementQName(text); + System.err.println("Message received: " + messageType); + + RequestListener messageListener = discoveryService.getRequestListener(messageType); + if (messageListener != null) { + XMLStreamReader requestReader = StaxUtil.createReader(text); + messageListener.onRequest(requestReader); + } + + } catch (JMSException ex) { + throw new TuscanyJmsException(ex); + } catch (XMLStreamException ex) { + throw new TuscanyJmsException(ex); + } + + } + +} |