Event prototype: event binding with JMS-based default binding

git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@713362 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
rsivaram 2008-11-12 13:16:06 +00:00
parent 29f3940a57
commit af0cfc4995
35 changed files with 1170 additions and 66 deletions

View file

@ -32,5 +32,14 @@ public interface EventBindingFactory {
* @return a new Event binding
*/
EventBinding createEventBinding();
/**
* Create a new Event binding with the provided base binding. If the base binding specified
* is an event binding, return that event binding.
*
* @param baseBinding The base binding for this event binding
* @return a new Event binding
*/
EventBinding createEventBinding(Binding baseBinding);
}

View file

@ -1372,11 +1372,12 @@ public abstract class BaseConfigurationBuilderImpl {
reference.getPolicySets().addAll(producer.getPolicySets());
reference.getRequiredIntents().addAll(producer.getRequiredIntents());
for (Binding binding : producer.getBindings()) {
EventBinding eventBinding = eventBindingFactory.createEventBinding();
eventBinding.setBaseBinding(binding);
reference.getBindings().add(binding);// FIXME: remove
//reference.getBindings().add(eventBinding);
reference.getBindings().add(eventBindingFactory.createEventBinding(binding));
}
// TODO: Uncomment the following code to make Event binding the default binding for producers and consumers
// if (producer.getBindings().size() == 0) {
// reference.getBindings().add(eventBindingFactory.createEventBinding());
// }
impl.getReferences().add(reference);
}
@ -1391,11 +1392,12 @@ public abstract class BaseConfigurationBuilderImpl {
service.getPolicySets().addAll(consumer.getPolicySets());
service.getRequiredIntents().addAll(consumer.getRequiredIntents());
for (Binding binding : consumer.getBindings()) {
EventBinding eventBinding = eventBindingFactory.createEventBinding();
eventBinding.setBaseBinding(binding);
service.getBindings().add(binding);// FIXME: remove
//service.getBindings().add(eventBinding);
service.getBindings().add(eventBindingFactory.createEventBinding(binding));
}
// TODO: Uncomment the following code to make Event binding the default binding for producers and consumers
// if (consumer.getBindings().size() == 0) {
// service.getBindings().add(eventBindingFactory.createEventBinding());
// }
impl.getServices().add(service);
}
}
@ -1423,11 +1425,13 @@ public abstract class BaseConfigurationBuilderImpl {
reference.getPolicySets().addAll(producer.getPolicySets());
reference.getRequiredIntents().addAll(producer.getRequiredIntents());
for (Binding binding : producer.getBindings()) {
EventBinding eventBinding = eventBindingFactory.createEventBinding();
eventBinding.setBaseBinding(binding);
reference.getBindings().add(binding);// FIXME: remove
//reference.getBindings().add(eventBinding);
reference.getBindings().add(eventBindingFactory.createEventBinding(binding));
}
// TODO: Uncomment the following code to make Event binding the default binding for producers and consumers
// if (producer.getBindings().size() == 0) {
// reference.getBindings().add(eventBindingFactory.createEventBinding());
// }
component.getReferences().add(reference);
}
@ -1443,11 +1447,12 @@ public abstract class BaseConfigurationBuilderImpl {
service.getPolicySets().addAll(consumer.getPolicySets());
service.getRequiredIntents().addAll(consumer.getRequiredIntents());
for (Binding binding : consumer.getBindings()) {
EventBinding eventBinding = eventBindingFactory.createEventBinding();
eventBinding.setBaseBinding(binding);
service.getBindings().add(binding); // FIXME: remove
//service.getBindings().add(eventBinding);
service.getBindings().add(eventBindingFactory.createEventBinding(binding));
}
// TODO: Uncomment the following code to make Event binding the default binding for producers and consumers
// if (consumer.getBindings().size() == 0) {
// service.getBindings().add(eventBindingFactory.createEventBinding());
// }
component.getServices().add(service);
}
}

View file

@ -0,0 +1,124 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-modules</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>tuscany-binding-event-jms</artifactId>
<name>Apache Tuscany Event Binding using JMS</name>
<dependencies>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-assembly</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-assembly-xml</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-core-spi</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-core</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-contribution-impl</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-interface-wsdl</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-databinding-axiom</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-binding-event</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-binding-jms-runtime</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>2.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<configuration>
<instructions>
<Bundle-Version>${tuscany.version}</Bundle-Version>
<Bundle-SymbolicName>org.apache.tuscany.sca.binding.sca</Bundle-SymbolicName>
<Bundle-Description>${pom.name}</Bundle-Description>
<Export-Package>org.apache.tuscany.sca.binding.sca*</Export-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.binding.event.jms.impl;
import java.net.URI;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.ModuleActivator;
/**
* The JMS broker for the JMS based SCA binding TODO: configure from a binding.jms in definitions.xml
*/
public class JMSBroker implements ModuleActivator {
private static BrokerService broker;
public void start(ExtensionPointRegistry arg0) {
if (broker == null) {
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
try {
TransportConnector tc = broker.addConnector("tcp://localhost:0");
tc.setDiscoveryUri(URI.create("multicast://default"));
broker.addNetworkConnector("multicast://default");
broker.start();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
public void stop(ExtensionPointRegistry arg0) {
if (broker != null) {
try {
broker.stop();
broker = null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

View file

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.binding.event.jms.impl;
import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.binding.event.DefaultEventBindingFactory;
import org.apache.tuscany.sca.binding.jms.impl.JMSBinding;
import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants;
/**
* Default JMS binding for events
*/
public class JmsEventBindingFactoryImpl implements DefaultEventBindingFactory {
public Binding createDefaultEventBinding() {
JMSBinding b = new JMSBinding();
b.setInitialContextFactoryName("org.apache.activemq.jndi.ActiveMQInitialContextFactory");
b.setJndiURL("vm://localhost"); // TODO: plug in jndi url from definitions.xml
b.setRequestMessageProcessorName(JMSBindingConstants.OBJECT_MP_CLASSNAME);
b.setResponseMessageProcessorName(JMSBindingConstants.OBJECT_MP_CLASSNAME);
b.setDestinationName("DEFAULT"); // FIXME: What should this be?
b.setDestinationType("topic");
return b;
}
}

View file

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
org.apache.tuscany.sca.binding.event.jms.impl.JmsEventBindingFactoryImpl

View file

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
org.apache.tuscany.sca.binding.event.jms.impl.JMSBroker

View file

@ -28,7 +28,7 @@
</parent>
<artifactId>tuscany-binding-event</artifactId>
<name>Apache Tuscany SCA Default Binding Model</name>
<name>Apache Tuscany SCA Event Binding</name>
<dependencies>
@ -61,6 +61,25 @@
<artifactId>tuscany-contribution-impl</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-interface-wsdl</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-databinding-axiom</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-binding-ws-wsdlgen</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.easymock</groupId>

View file

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.binding.event;
import org.apache.tuscany.sca.assembly.Binding;
/**
* Factory to create base binding for producers and consumers
*/
public interface DefaultEventBindingFactory {
public Binding createDefaultEventBinding();
}

View file

@ -19,6 +19,7 @@
package org.apache.tuscany.sca.binding.event.impl;
import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.assembly.EventBinding;
import org.apache.tuscany.sca.assembly.EventBindingFactory;
@ -29,12 +30,33 @@ import org.apache.tuscany.sca.assembly.EventBindingFactory;
*/
public class EventBindingFactoryImpl implements EventBindingFactory {
/*
* Constructor
*/
public EventBindingFactoryImpl (){
}
/*
* (non-Javadoc)
* @see org.apache.tuscany.sca.assembly.EventBindingFactory#createEventBinding()
*/
public EventBinding createEventBinding() {
return new EventBindingImpl();
}
/*
* (non-Javadoc)
* @see org.apache.tuscany.sca.assembly.EventBindingFactory#createEventBinding(org.apache.tuscany.sca.assembly.Binding)
*/
public EventBinding createEventBinding(Binding baseBinding) {
if (baseBinding instanceof EventBinding)
return (EventBinding)baseBinding;
EventBindingImpl eventBinding = new EventBindingImpl();
eventBinding.setBaseBinding(baseBinding);
return eventBinding;
}
}

View file

@ -26,8 +26,8 @@ import org.apache.tuscany.sca.assembly.Component;
import org.apache.tuscany.sca.assembly.ComponentService;
import org.apache.tuscany.sca.assembly.EventBinding;
import org.apache.tuscany.sca.assembly.Extensible;
import org.apache.tuscany.sca.assembly.OptimizableBinding;
import org.apache.tuscany.sca.assembly.builder.AutomaticBinding;
import org.apache.tuscany.sca.binding.event.DefaultEventBindingFactory;
import org.apache.tuscany.sca.policy.Intent;
import org.apache.tuscany.sca.policy.IntentAttachPointType;
import org.apache.tuscany.sca.policy.PolicySet;
@ -38,7 +38,7 @@ import org.apache.tuscany.sca.policy.PolicySetAttachPoint;
*
* @version $$
*/
public class EventBindingImpl implements EventBinding, Extensible, PolicySetAttachPoint, OptimizableBinding, AutomaticBinding {
public class EventBindingImpl implements EventBinding, Extensible, PolicySetAttachPoint, AutomaticBinding {
private String name;
private String uri;
private Binding baseBinding;
@ -59,7 +59,7 @@ public class EventBindingImpl implements EventBinding, Extensible, PolicySetAtta
}
/**
* Constructs a new SCA binding.
* Constructs a new Event binding.
*/
protected EventBindingImpl() {
}
@ -240,4 +240,12 @@ public class EventBindingImpl implements EventBinding, Extensible, PolicySetAtta
public boolean getIsAutomatic(){
return this.isAutomatic;
}
public Binding createDefaultBaseBinding(DefaultEventBindingFactory factory) {
if (baseBinding == null)
baseBinding = factory.createDefaultEventBinding();
return baseBinding;
}
}

View file

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.binding.event.impl;
import java.util.List;
import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeWire;
import org.osoa.sca.annotations.EventType;
/**
* Policy interceptor for events
*
* @version $$
*/
public class EventBindingInterceptor implements Interceptor {
private Invoker next;
@SuppressWarnings("unused")
private Operation operation;
private RuntimeComponentService consumerService;
private Binding binding;
private InterfaceContractMapper interfaceContractMapper;
public EventBindingInterceptor(RuntimeComponentService consumerService,
Binding binding,
Operation operation,
InterfaceContractMapper interfaceContractMapper) {
super();
this.consumerService = consumerService;
this.operation = operation;
this.binding = binding;
this.interfaceContractMapper = interfaceContractMapper;
}
public Message invoke(Message msg) {
try {
Object arg = msg.getBody();
if (arg instanceof Object[]) {
arg = ((Object [])arg)[0];
}
Operation consumerOperation = findOperation(consumerService.getInterfaceContract().getInterface().getOperations(), arg);
InvocationChain chain = getInvocationChain(consumerService.getRuntimeWire(binding), consumerOperation);
Invoker invoker = chain.getHeadInvoker();
while (invoker instanceof Interceptor && ((Interceptor)invoker).getNext() != null) {
invoker = ((Interceptor)invoker).getNext();
}
msg = invoker.invoke(msg);
} catch (Exception e) {
e.printStackTrace();
msg.setBody(e);
}
return msg;
}
public Invoker getNext() {
return next;
}
public void setNext(Invoker next) {
this.next = next;
}
private String getEventType(Class<?> clazz) {
EventType eventTypeAnnotation = clazz.getAnnotation(EventType.class);
if (eventTypeAnnotation != null)
return eventTypeAnnotation.name();
if (clazz.getSuperclass() != null && clazz.getSuperclass() != Object.class) {
String eventType = getEventType(clazz.getSuperclass());
if (eventType != null)
return eventType;
}
for (Class<?> interfaze : clazz.getInterfaces()) {
String eventType = getEventType(interfaze);
if (eventType != null)
return eventType;
}
return null;
}
@SuppressWarnings("unchecked")
private Operation findOperation(List<Operation> operations, Object arg) {
if (arg == null) {
return operations.get(0);
}
else {
String eventType = getEventType(arg.getClass());
Operation firstMatching = null;
for (Operation op : operations) {
String[] eventTypes = op.getEventTypes();
if (eventTypes == null || eventTypes.length == 0) {
if (op.getInputType().getLogical().get(0).getPhysical().isAssignableFrom(arg.getClass())) {
// If no event type is specified, return the first method which
// (1) has assignable parameters
// (2) has no explicit event types specified
if (eventType == null) {
return op;
}
else if (firstMatching == null) {
// Remember first matching method without event types and
// return this operation if no operation has explicitly specified
// this event type.
firstMatching = op;
}
}
} else {
for (String type : eventTypes) {
if (type.equals(eventType)) {
return op;
}
}
}
}
return firstMatching;
}
}
public InvocationChain getInvocationChain(RuntimeWire wire, Operation operation) {
for (InvocationChain chain : wire.getInvocationChains()) {
Operation op = chain.getTargetOperation();
if (interfaceContractMapper.isCompatible(operation, op, op.getInterface().isRemotable())) {
return chain;
}
}
return null;
}
}

View file

@ -19,54 +19,50 @@
package org.apache.tuscany.sca.binding.event.impl;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.DataExchangeSemantics;
import org.apache.tuscany.sca.provider.ReferenceBindingProvider;
/**
* TODO: Implement event binding invoker
* Event binding invoker
* @version $$
*/
public class EventBindingInvoker implements Interceptor, DataExchangeSemantics {
private InvocationChain chain;
public class EventBindingInvoker implements Invoker, DataExchangeSemantics {
private Invoker baseInvoker;
/**
* Construct a EventBindingInvoker that delegates to the service invocaiton chain
* Construct a EventBindingInvoker that delegates to the service invocation chain
* @param chain
*/
public EventBindingInvoker(InvocationChain chain) {
public EventBindingInvoker(ReferenceBindingProvider baseBindingProvider, Operation operation) {
super();
this.chain = chain;
// TODO: Should we match producer event types with the event types of the operation?
baseInvoker = baseBindingProvider.createInvoker(operation);
}
/**
* @see org.apache.tuscany.sca.invocation.Interceptor#getNext()
*/
public Invoker getNext() {
return chain.getHeadInvoker();
}
/**
* @see org.apache.tuscany.sca.invocation.Interceptor#setNext(org.apache.tuscany.sca.invocation.Invoker)
*/
public void setNext(Invoker next) {
// NOOP
}
/**
* @see org.apache.tuscany.sca.invocation.Invoker#invoke(org.apache.tuscany.sca.invocation.Message)
*/
public Message invoke(Message msg) {
return getNext().invoke(msg);
if (baseInvoker != null) {
return baseInvoker.invoke(msg);
}
else {
msg.setBody(null);
return msg;
}
}
/**
* @see org.apache.tuscany.sca.invocation.DataExchangeSemantics#allowsPassByReference()
*/
public boolean allowsPassByReference() {
return chain.allowsPassByReference();
return true;
}
}

View file

@ -21,10 +21,17 @@ package org.apache.tuscany.sca.binding.event.impl;
import java.util.logging.Logger;
import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.binding.event.DefaultEventBindingFactory;
import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.provider.BindingProviderFactory;
import org.apache.tuscany.sca.provider.PolicyProvider;
import org.apache.tuscany.sca.provider.PolicyProviderFactory;
import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint;
import org.apache.tuscany.sca.provider.ReferenceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
@ -36,18 +43,20 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
*/
public class EventReferenceBindingProvider implements ReferenceBindingProvider {
@SuppressWarnings("unused")
private static final Logger logger = Logger.getLogger(EventReferenceBindingProvider.class.getName());
@SuppressWarnings("unused")
private RuntimeComponent component;
@SuppressWarnings("unused")
private EventBindingImpl binding;
@SuppressWarnings("unused")
private RuntimeComponentReference reference;
private boolean started = false;
private ReferenceBindingProvider baseBindingProvider;
@SuppressWarnings("unchecked")
public EventReferenceBindingProvider(ExtensionPointRegistry extensionPoints,
RuntimeComponent component,
RuntimeComponentReference reference,
@ -55,17 +64,47 @@ public class EventReferenceBindingProvider implements ReferenceBindingProvider {
this.component = component;
this.reference = reference;
this.binding = binding;
// TODO: Set baseBindingProvider
ProviderFactoryExtensionPoint providerFactories = extensionPoints.getExtensionPoint(ProviderFactoryExtensionPoint.class);
ModelFactoryExtensionPoint modelFactories = extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class);
Binding baseBinding = binding.getBaseBinding();
if (baseBinding == null) {
DefaultEventBindingFactory defaultEventBindingFactory =
(DefaultEventBindingFactory)modelFactories.getFactory(DefaultEventBindingFactory.class);
if (defaultEventBindingFactory == null) {
logger.severe("Default binding for Event binding not available");
}
baseBinding = binding.createDefaultBaseBinding(defaultEventBindingFactory);
}
BindingProviderFactory providerFactory =
(BindingProviderFactory)providerFactories.getProviderFactory(baseBinding.getClass());
if (providerFactory != null) {
baseBindingProvider =
providerFactory.createReferenceBindingProvider((RuntimeComponent)component,
(RuntimeComponentReference)reference,
baseBinding);
for (PolicyProviderFactory f : providerFactories.getPolicyProviderFactories()) {
PolicyProvider policyProvider = f.createReferencePolicyProvider(component, reference, baseBinding);
if (policyProvider != null) {
reference.addPolicyProvider(baseBinding, policyProvider);
}
}
}
if (baseBindingProvider == null) {
throw new IllegalStateException("Provider factory not found for class: " + baseBinding.getClass().getName());
}
}
public InterfaceContract getBindingInterfaceContract() {
if (reference.getReference() != null) {
return reference.getReference().getInterfaceContract();
} else {
return reference.getInterfaceContract();
}
return baseBindingProvider.getBindingInterfaceContract();
}
public boolean supportsOneWayInvocation() {
@ -74,13 +113,14 @@ public class EventReferenceBindingProvider implements ReferenceBindingProvider {
public Invoker createInvoker(Operation operation) {
return baseBindingProvider.createInvoker(operation);
return new EventBindingInvoker(baseBindingProvider, operation);
}
public void start() {
if (started) {
return;
} else {
baseBindingProvider.start();
started = true;
}
}
@ -89,6 +129,7 @@ public class EventReferenceBindingProvider implements ReferenceBindingProvider {
if (!started) {
return;
} else {
baseBindingProvider.stop();
started = false;
}

View file

@ -20,11 +20,30 @@
package org.apache.tuscany.sca.binding.event.impl;
import java.util.List;
import java.util.logging.Logger;
import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.assembly.EventBinding;
import org.apache.tuscany.sca.assembly.Service;
import org.apache.tuscany.sca.binding.event.DefaultEventBindingFactory;
import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper;
import org.apache.tuscany.sca.interfacedef.java.JavaInterface;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Phase;
import org.apache.tuscany.sca.provider.AsyncServiceBindingProvider;
import org.apache.tuscany.sca.provider.BindingProviderFactory;
import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeWire;
/**
* The event service binding provider
@ -32,22 +51,92 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService;
* @version $$
*/
public class EventServiceBindingProvider implements ServiceBindingProvider {
private RuntimeComponentService service;
private static final Logger logger = Logger.getLogger(EventReferenceBindingProvider.class.getName());
private RuntimeComponentService service;
private RuntimeComponentService eventService;
private ServiceBindingProvider baseBindingProvider;
private InterfaceContract eventInterfaceContract;
private EventBinding eventBinding;
private InterfaceContractMapper interfaceContractMapper;
@SuppressWarnings("unchecked")
public EventServiceBindingProvider(ExtensionPointRegistry extensionPoints,
RuntimeComponent component,
RuntimeComponentService service,
EventBindingImpl binding) {
this.service = service;
eventBinding = binding;
ProviderFactoryExtensionPoint providerFactories = extensionPoints.getExtensionPoint(ProviderFactoryExtensionPoint.class);
ModelFactoryExtensionPoint modelFactories = extensionPoints.getExtensionPoint(ModelFactoryExtensionPoint.class);
Binding baseBinding = binding.getBaseBinding();
if (baseBinding == null) {
DefaultEventBindingFactory defaultEventBindingFactory =
(DefaultEventBindingFactory)modelFactories.getFactory(DefaultEventBindingFactory.class);
if (defaultEventBindingFactory == null) {
logger.severe("Default binding for Event binding not available");
}
baseBinding = binding.createDefaultBaseBinding(defaultEventBindingFactory);
}
BindingProviderFactory providerFactory =
(BindingProviderFactory)providerFactories.getProviderFactory(baseBinding.getClass());
JavaInterfaceFactory javaInterfaceFactory = modelFactories.getFactory(JavaInterfaceFactory.class);
UtilityExtensionPoint utilities = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class);
interfaceContractMapper = utilities.getUtility(InterfaceContractMapper.class);
if (providerFactory != null) {
try {
this.service = service;
eventService = (RuntimeComponentService)service.clone();
JavaInterface iface = javaInterfaceFactory.createJavaInterface(EventServiceInterface.class);
eventInterfaceContract = javaInterfaceFactory.createJavaInterfaceContract();
eventInterfaceContract.setInterface(iface);
eventService.setInterfaceContract(eventInterfaceContract);
if (service.getService() != null) {
eventService.setService((Service)service.getService().clone());
eventService.getService().setInterfaceContract(eventInterfaceContract);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
baseBindingProvider =
providerFactory.createServiceBindingProvider((RuntimeComponent)component,
(RuntimeComponentService)eventService,
baseBinding);
if (baseBindingProvider instanceof AsyncServiceBindingProvider) {
((AsyncServiceBindingProvider)baseBindingProvider).setTargetBinding(binding);
}
}
if (baseBindingProvider == null) {
throw new IllegalStateException("Provider factory not found for class: " + baseBinding.getClass().getName());
}
}
public InterfaceContract getBindingInterfaceContract() {
if (service.getService() != null) {
return service.getService().getInterfaceContract();
} else {
return service.getInterfaceContract();
}
return service.getInterfaceContract();
}
public boolean supportsOneWayInvocation() {
@ -55,9 +144,35 @@ public class EventServiceBindingProvider implements ServiceBindingProvider {
}
public void start() {
try {
for (RuntimeWire wire : service.getRuntimeWires()) {
RuntimeWire clonedWire = (RuntimeWire)wire.clone();
clonedWire.rebuild();
clonedWire.getSource().setInterfaceContract(eventInterfaceContract);
clonedWire.getTarget().setInterfaceContract(eventInterfaceContract);
if (clonedWire.getTarget().getContract() == service)
clonedWire.getTarget().setContract(eventService);
eventService.getRuntimeWires().add(clonedWire);
}
List<InvocationChain> chains = eventService.getRuntimeWire(eventBinding).getInvocationChains();
for (InvocationChain chain : chains) {
Interceptor interceptor = new EventBindingInterceptor(service,
eventBinding,
chain.getSourceOperation(),
interfaceContractMapper);
chain.addInterceptor(Phase.IMPLEMENTATION_POLICY, interceptor);
}
baseBindingProvider.start();
} catch (CloneNotSupportedException e) {
// Should never happen
}
}
public void stop() {
baseBindingProvider.stop();
}
}

View file

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.binding.event.impl;
import org.osoa.sca.annotations.Remotable;
/**
* Interface for event services
*
* @version $$
*/
@Remotable
public interface EventServiceInterface {
void onEvent(Object event);
}

View file

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.binding.event.xml;
import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.stream.XMLStreamWriter;
import org.apache.tuscany.sca.assembly.EventBinding;
import org.apache.tuscany.sca.assembly.EventBindingFactory;
import org.apache.tuscany.sca.assembly.xml.Constants;
import org.apache.tuscany.sca.assembly.xml.PolicyAttachPointProcessor;
import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint;
import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor;
import org.apache.tuscany.sca.contribution.resolver.ModelResolver;
import org.apache.tuscany.sca.contribution.service.ContributionReadException;
import org.apache.tuscany.sca.contribution.service.ContributionResolveException;
import org.apache.tuscany.sca.contribution.service.ContributionWriteException;
import org.apache.tuscany.sca.monitor.Monitor;
import org.apache.tuscany.sca.policy.IntentAttachPointType;
import org.apache.tuscany.sca.policy.IntentAttachPointTypeFactory;
import org.apache.tuscany.sca.policy.PolicyFactory;
import org.apache.tuscany.sca.policy.PolicySetAttachPoint;
/**
* A processor to read the XML that describes the event binding.
*
* @version $$
*/
public class EventBindingProcessor implements StAXArtifactProcessor<EventBinding>, Constants{
private PolicyFactory policyFactory;
private EventBindingFactory eventBindingFactory;
private PolicyAttachPointProcessor policyProcessor;
private IntentAttachPointTypeFactory intentAttachPointTypeFactory;
@SuppressWarnings("unused")
private Monitor monitor;
protected static final String BINDING_EVENT = "binding.event";
protected static final QName BINDING_EVENT_QNAME = new QName(Constants.SCA10_TUSCANY_NS, BINDING_EVENT);
public EventBindingProcessor(ModelFactoryExtensionPoint modelFactories, Monitor monitor) {
this.policyFactory = modelFactories.getFactory(PolicyFactory.class);
this.eventBindingFactory = modelFactories.getFactory(EventBindingFactory.class);
policyProcessor = new PolicyAttachPointProcessor(policyFactory);
this.intentAttachPointTypeFactory = modelFactories.getFactory(IntentAttachPointTypeFactory.class);
this.monitor = monitor;
}
public QName getArtifactType() {
return BINDING_EVENT_QNAME;
}
public Class<EventBinding> getModelType() {
return EventBinding.class;
}
public EventBinding read(XMLStreamReader reader) throws ContributionReadException, XMLStreamException {
EventBinding eventBinding = eventBindingFactory.createEventBinding();
IntentAttachPointType bindingType = intentAttachPointTypeFactory.createBindingType();
bindingType.setName(getArtifactType());
bindingType.setUnresolved(true);
((PolicySetAttachPoint)eventBinding).setType(bindingType);
// Read policies
policyProcessor.readPolicies(eventBinding, reader);
// Read binding name
String name = reader.getAttributeValue(null, NAME);
if (name != null) {
eventBinding.setName(name);
}
// Read binding URI
String uri = reader.getAttributeValue(null, URI);
if (uri != null) {
eventBinding.setURI(uri);
}
// Skip to end element
while (reader.hasNext()) {
if (reader.next() == END_ELEMENT && BINDING_EVENT_QNAME.equals(reader.getName())) {
break;
}
}
return eventBinding;
}
public void resolve(EventBinding model, ModelResolver resolver) throws ContributionResolveException {
policyProcessor.resolvePolicies(model, resolver);
}
public void write(EventBinding eventBinding, XMLStreamWriter writer) throws ContributionWriteException, XMLStreamException {
// Write <binding.event>
policyProcessor.writePolicyPrefixes(eventBinding, writer);
writer.writeStartElement(Constants.SCA10_TUSCANY_NS, BINDING_EVENT);
policyProcessor.writePolicyAttributes(eventBinding, writer);
// Write binding name
if (eventBinding.getName() != null) {
writer.writeAttribute(NAME, eventBinding.getName());
}
// Write binding URI
if (eventBinding.getURI() != null) {
writer.writeAttribute(URI, eventBinding.getURI());
}
writer.writeEndElement();
}
}

View file

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Implementation class for the artifact processor extension
org.apache.tuscany.sca.binding.event.xml.EventBindingProcessor;qname=http://tuscany.apache.org/xmlns/sca/1.0#binding.event,model=org.apache.tuscany.sca.binding.event.impl.EventBindingImpl

View file

@ -41,6 +41,7 @@ import org.apache.tuscany.sca.binding.ws.WebServiceBindingFactory;
import org.apache.tuscany.sca.binding.ws.wsdlgen.BindingWSDLGenerator;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.provider.AsyncServiceBindingProvider;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
@ -51,10 +52,11 @@ import org.apache.tuscany.sca.work.WorkScheduler;
*
* @version $Rev$ $Date$
*/
public class JMSBindingServiceBindingProvider implements ServiceBindingProvider {
public class JMSBindingServiceBindingProvider implements ServiceBindingProvider, AsyncServiceBindingProvider {
private static final Logger logger = Logger.getLogger(JMSBindingServiceBindingProvider.class.getName());
private RuntimeComponentService service;
private RuntimeComponentService targetService;
private Binding targetBinding;
private JMSBinding jmsBinding;
private JMSResourceFactory jmsResourceFactory;
@ -75,6 +77,7 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider
this.jmsBinding = binding;
this.workScheduler = workScheduler;
this.targetBinding = targetBinding;
this.targetService = service;
this.extensionPoints = extensionPoints;
this.jmsResourceFactory = jmsResourceFactory;
@ -159,7 +162,7 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider
// if using an embedded broker then when shutting down Tuscany the broker may get closed
// before this stop method is called. I can't see how to detect that so for now just
// ignore the exception if the message is that the transport is already disposed
if (!"Transport disposed.".equals(e.getMessage())) {
if (e.getMessage() == null || !e.getMessage().contains("disposed")) {
throw new JMSBindingException("Error stopping JMSServiceBinding", e);
}
}
@ -179,7 +182,7 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider
consumer = session.createConsumer(destination);
}
final JMSBindingListener listener = new JMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding);
final JMSBindingListener listener = new JMSBindingListener(jmsBinding, jmsResourceFactory, targetService, targetBinding);
try {
consumer.setMessageListener(listener);
@ -303,4 +306,14 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider
throw new JMSBindingException(e);
}
}
public Binding getTargetBinding() {
return targetBinding;
}
public void setTargetBinding(Binding binding) {
this.targetBinding = binding;
}
}

View file

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.provider;
import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
/**
* A service binding can optionally implement this interface to enable
* it to be used for event handling
*
* @version $$
*/
public interface AsyncServiceBindingProvider {
Binding getTargetBinding();
void setTargetBinding(Binding binding);
}

View file

@ -40,6 +40,12 @@ public interface EndpointReference extends Cloneable {
* @return The component service or reference, null if the EPR is for a non-SCA service
*/
Contract getContract();
/**
* Set the component service or reference for the endpoint
*
*/
void setContract(Contract contract);
/**
* Get the binding for the endpoint

View file

@ -50,6 +50,7 @@ import org.apache.tuscany.sca.interfacedef.util.JavaXMLMapper;
import org.apache.tuscany.sca.interfacedef.util.XMLType;
import org.osoa.sca.annotations.Conversational;
import org.osoa.sca.annotations.EndsConversation;
import org.osoa.sca.annotations.EventTypes;
import org.osoa.sca.annotations.OneWay;
import org.osoa.sca.annotations.Remotable;
@ -198,6 +199,15 @@ public class JavaInterfaceIntrospectorImpl {
method);
}
}
EventTypes eventTypesAnnotation = method.getAnnotation(EventTypes.class);
String[] eventTypes = null;
if (eventTypesAnnotation != null && eventTypesAnnotation.value().trim().length() > 0) {
eventTypes = eventTypesAnnotation.value().split(",");
for (int i = 0; i < eventTypes.length; i++)
eventTypes[i] = eventTypes[i].trim();
}
ConversationSequence conversationSequence = ConversationSequence.CONVERSATION_NONE;
if (method.isAnnotationPresent(EndsConversation.class)) {
@ -250,6 +260,7 @@ public class JavaInterfaceIntrospectorImpl {
operation.setConversationSequence(conversationSequence);
operation.setNonBlocking(nonBlocking);
operation.setJavaMethod(method);
operation.setEventTypes(eventTypes);
operations.add(operation);
}
return operations;

View file

@ -194,6 +194,18 @@ public interface Operation extends Cloneable, PolicySetAttachPoint {
* @param faultBeans
*/
void setFaultBeans(Map<QName, List<DataType<XMLType>>> faultBeans);
/**
* Event types supported by this operation
* @return event types if specified, or null otherwise
*/
String[] getEventTypes();
/**
* Set event types supported by this operation
* @param eventType list of event types
*/
void setEventTypes(String[] eventTypes);
/**
* Implementations must support cloning.

View file

@ -54,6 +54,7 @@ public class OperationImpl implements Operation {
private WrapperInfo wrapper;
private boolean dynamic;
private Map<QName, List<DataType<XMLType>>> faultBeans;
private String[] eventTypes;
private List<PolicySet> applicablePolicySets = new ArrayList<PolicySet>();
private List<PolicySet> policySets = new ArrayList<PolicySet>();
@ -288,6 +289,16 @@ public class OperationImpl implements Operation {
public void setFaultBeans(Map<QName, List<DataType<XMLType>>> faultBeans) {
this.faultBeans = faultBeans;
}
public String[] getEventTypes() {
return eventTypes;
}
public void setEventTypes(String[] eventTypes) {
this.eventTypes = eventTypes;
}
@Override
public OperationImpl clone() throws CloneNotSupportedException {

View file

@ -53,6 +53,7 @@
<module>binding-ws-wsdlgen</module>
<module>binding-ws-xml</module>
<module>binding-event</module>
<module>binding-event-jms</module>
<module>contribution</module>
<module>contribution-namespace</module>
<module>contribution-java</module>

View file

@ -71,6 +71,13 @@
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-binding-event-jms</artifactId>
<version>1.4-EVENT-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View file

@ -28,5 +28,7 @@ public interface WeatherPublisher {
@EventTypes("ExampleEvent")
void publishWeatherReport(String report);
void publishWeatherWarning(WeatherWarning warning);
}

View file

@ -36,8 +36,16 @@ public class WeatherPublisherComponent implements WeatherService {
public void start() {
System.out.println("weatherPublisher code - start() called");
try {
WeatherWarning warning = new WeatherWarning();
warning.setWarning("Heavy rains expected");
warning.setReportTime(new Date().toString());
weatherPublisher.publishWeatherWarning(warning);
for (int i = 0; i < 1; i++) {
generateWeatherReport();
Thread.sleep(500);
}
} catch (InterruptedException e) {

View file

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package weather;
/**
* The interface for the weather service
*/
public interface WeatherSubscriber {
public int[] getMessageList();
}

View file

@ -23,19 +23,49 @@ import java.util.Date;
import org.osoa.sca.annotations.Consumer;
import org.osoa.sca.annotations.Remotable;
import org.osoa.sca.annotations.EventTypes;
import org.osoa.sca.annotations.Scope;
import org.osoa.sca.annotations.Service;
/**
* The WeatherService subscriber implementation - receives weather reports and prints it on stdout
*/
@Remotable
public class WeatherSubscriberComponent {
@Scope("COMPOSITE")
@Service(WeatherSubscriber.class)
public class WeatherSubscriberComponent implements WeatherSubscriber {
private int[] messagesReceivedList = new int[3];
@Consumer(name="weatherSubscriber")
@EventTypes("ExampleEvent")
public void onWeather(String report) {
System.out.println("Weather report received at " + new Date() + ": " + report);
messagesReceivedList[0]++;
}
@Consumer(name="weatherSubscriber")
@EventTypes("WeatherWarning")
public void onWeatherWarning(WeatherWarning warning) {
System.out.println("WEATHER WARNING received at " + new Date() + ": " + warning);
messagesReceivedList[1]++;
}
@Consumer(name="weatherSubscriber")
public void onWeatherInfo(String info) {
System.out.println("Weather info received at " + new Date() + ": " + info);
messagesReceivedList[2]++;
}
public int[] getMessageList() {
return messagesReceivedList;
}
}

View file

@ -0,0 +1,32 @@
package weather;
import java.io.Serializable;
import org.osoa.sca.annotations.EventType;
@EventType(name="WeatherWarning")
public class WeatherWarning implements Serializable {
private static final long serialVersionUID = 1;
private String reportTime;
private String warning;
public String getReportTime() {
return reportTime;
}
public void setReportTime(String reportTime) {
this.reportTime = reportTime;
}
public String getWarning() {
return warning;
}
public void setWarning(String warning) {
this.warning = warning;
}
public String toString() {
return warning;
}
}

View file

@ -31,10 +31,14 @@
<producer name="weatherPublisher" eventTypes="WeatherEvent">
<tuscany:binding.event/>
<!--
<binding.jms initialContextFactory="org.apache.activemq.jndi.ActiveMQInitialContextFactory"
jndiURL="tcp://localhost:61619">
jndiURL="tcp://localhost:61619"
messageProcessor="org.apache.tuscany.sca.binding.jms.provider.ObjectMessageProcessor">
<destination name="WeatherQueue" create="ifnotexist"/>
</binding.jms>
-->
</producer>
</component>
@ -51,6 +55,9 @@
<component name="WeatherSubscriberComponent2">
<implementation.java class="weather.WeatherSubscriberComponent"/>
<service name="WeatherSubscriber">
<interface.java interface="weather.WeatherSubscriber"/>
</service>
<consumer name="weatherSubscriber"/>
</component>

View file

@ -26,12 +26,20 @@
<component name="WeatherSubscriberComponent">
<implementation.java class="weather.WeatherSubscriberComponent"/>
<service name="WeatherSubscriber">
<interface.java interface="weather.WeatherSubscriber"/>
</service>
<consumer name="weatherSubscriber" eventTypes="WeatherEvent">
<tuscany:binding.event/>
<!--
<binding.jms initialContextFactory="org.apache.activemq.jndi.ActiveMQInitialContextFactory"
jndiURL="tcp://localhost:61619">
jndiURL="tcp://localhost:61619"
messageProcessor="org.apache.tuscany.sca.binding.jms.provider.ObjectMessageProcessor">
<destination name="WeatherQueue" create="ifnotexist"/>
</binding.jms>
-->
</consumer>
</component>

View file

@ -20,6 +20,8 @@
package weather;
import junit.framework.Assert;
import org.apache.activemq.broker.BrokerService;
import org.apache.tuscany.sca.host.embedded.SCADomain;
import org.junit.After;
@ -38,6 +40,8 @@ public class WeatherTestCase {
private WeatherService weatherService;
private WeatherService weatherService2;
private WeatherSubscriber weatherSubscriber;
private WeatherSubscriber weatherSubscriber2;
private SCADomain weatherSubscriberDomain;
private SCADomain weatherPublisherDomain;
@ -52,6 +56,8 @@ public class WeatherTestCase {
weatherSubscriberDomain = SCADomain.newInstance("weatherSubscriber.composite");
weatherService = weatherPublisherDomain.getService(WeatherService.class, "WeatherPublisherComponent");
weatherService2 = weatherPublisherDomain.getService(WeatherService.class, "WeatherPublisherComponent2");
weatherSubscriber = weatherSubscriberDomain.getService(WeatherSubscriber.class, "WeatherSubscriberComponent/WeatherSubscriber");
weatherSubscriber2 = weatherPublisherDomain.getService(WeatherSubscriber.class, "WeatherSubscriberComponent2/WeatherSubscriber");
} catch (Throwable e) {
e.printStackTrace();
}
@ -59,8 +65,27 @@ public class WeatherTestCase {
@Test
public void runWeatherTest() throws Exception {
weatherService.start();
weatherService2.start();
Thread.sleep(2000);
int[] messageList = weatherSubscriber.getMessageList();
Assert.assertNotNull("Message list is null.", messageList);
Assert.assertEquals("Message list invalid.", 3, messageList.length);
int[] expectedMessages = {0, 1, 1};
for (int i = 0; i < messageList.length; i++) {
Assert.assertEquals("Message type " + i + " not received correctly.", expectedMessages[i], messageList[i]);
}
int[] messageList2 = weatherSubscriber2.getMessageList();
Assert.assertNotNull("Message list is null.", messageList2);
Assert.assertEquals("Message list invalid.", 3, messageList2.length);
int[] expectedMessages2 = {1, 0, 0};
for (int i = 0; i < messageList2.length; i++) {
Assert.assertEquals("Message type " + i + " not received correctly.", expectedMessages2[i], messageList2[i]);
}
}

View file

@ -45,6 +45,7 @@
<modules>
<module>helloworld-reference-jms</module>
<module>helloworld-service-jms</module>
<module>event-jms</module>
</modules>
</profile>
</profiles>