summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslaws <slaws@13f79535-47bb-0310-9956-ffa450edef68>2010-12-03 15:22:31 +0000
committerslaws <slaws@13f79535-47bb-0310-9956-ffa450edef68>2010-12-03 15:22:31 +0000
commit471a23dbe35f1389a9fd43ee409ac4bb03d995c4 (patch)
tree570206b73e95462251d3f311076bcdb5a7da06d5
parent4d1e8a5032010161a569df3f03285676a3d48fb0 (diff)
TUSCANY-3801 - Update the invocation chain infrastructure, and the enpoints/endpointreferences that call it, to allow async response messages to be processed backwards along the response part of the chain independently of the forward message processing.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1041866 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--sca-java-2.x/trunk/modules/binding-corba-runtime/src/test/java/org/apache/tuscany/sca/binding/corba/testing/service/mocks/TestRuntimeWire.java9
-rw-r--r--sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvocationChain.java12
-rw-r--r--sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsync.java51
-rw-r--r--sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/runtime/Invocable.java11
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java41
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java20
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java99
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java29
-rw-r--r--sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java4
9 files changed, 251 insertions, 25 deletions
diff --git a/sca-java-2.x/trunk/modules/binding-corba-runtime/src/test/java/org/apache/tuscany/sca/binding/corba/testing/service/mocks/TestRuntimeWire.java b/sca-java-2.x/trunk/modules/binding-corba-runtime/src/test/java/org/apache/tuscany/sca/binding/corba/testing/service/mocks/TestRuntimeWire.java
index d109b74e34..756299696a 100644
--- a/sca-java-2.x/trunk/modules/binding-corba-runtime/src/test/java/org/apache/tuscany/sca/binding/corba/testing/service/mocks/TestRuntimeWire.java
+++ b/sca-java-2.x/trunk/modules/binding-corba-runtime/src/test/java/org/apache/tuscany/sca/binding/corba/testing/service/mocks/TestRuntimeWire.java
@@ -33,6 +33,7 @@ import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.InvocationChain;
+import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.policy.ExtensionType;
import org.apache.tuscany.sca.policy.Intent;
@@ -206,7 +207,9 @@ public class TestRuntimeWire implements RuntimeEndpoint {
}
public void invokeAsync(Operation operation, Message msg) {
- return;
+ }
+
+ public void invokeAsyncResponse(InvokerAsync tailInvoker, Message msg) {
}
public void unbind() {
@@ -311,4 +314,8 @@ public class TestRuntimeWire implements RuntimeEndpoint {
return null;
}
+ public boolean isAsyncInvocation() {
+ // TODO Auto-generated method stub
+ return false;
+ }
}
diff --git a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvocationChain.java b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvocationChain.java
index 579e0c3da9..d10a5689d4 100644
--- a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvocationChain.java
+++ b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvocationChain.java
@@ -105,4 +105,16 @@ public interface InvocationChain {
* @param allowsPBR
*/
void setAllowsPassByReference(boolean allowsPBR);
+
+ /**
+ * Returns true if this chain must be able to support async
+ * invocation. This will be as a consequence of the EPR/EP
+ * detecting the asyncInvocation intent. The flag is set on
+ * construction and used as an internal guard against non
+ * async interceptors being added to a chain that expect to
+ * be able to handle async calls
+ *
+ * @return true is the chain supports async invocation.
+ */
+ boolean isAsyncInvocation();
}
diff --git a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsync.java b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsync.java
index 624e8fdab4..a67a28a931 100644
--- a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsync.java
+++ b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsync.java
@@ -19,21 +19,56 @@
package org.apache.tuscany.sca.invocation;
/**
- * TUSCANY-3786 - Possibly temporary interface to describe an
- * async invocation. Need to make it work end to
- * end before committing to this.
- *
- * Asynchronous mediation associated with a client- or target- side wire.
+ * TUSCANY-3786
+ *
+ * Interface to describe an invoation where the request processing
+ * can be performed independently of the response processing. This
+ * has been instigated to allow async responses to be processed
+ * independently of the requests that instigated them. Due to the need
+ * to run the reponse processing interceptors effectively backwards the
+ * methods defined here are not responsible for finding the next invoker
+ * in the chain.
*
*/
public interface InvokerAsync {
+
+ /**
+ * Process the forward message and pass it down the chain
+ *
+ * @param msg The request Message
+ * @return the processed message
+ *
+ */
+ void invokeAsyncRequest(Message msg);
+
+ /**
+ * Process response message and pass it back up the chain.
+ * This returns the message that is processed by the chain
+ * so that it can be passes onto the appropriate invoker by the caller
+ * the response path doesn't have an invoker.
+ *
+ * @param msg The request Message
+ * @return the processed message
+ *
+ */
+ Message invokeAsyncResponse(Message msg);
/**
- * Process an asynchronous wire
+ * Process a request message
+ *
+ * @param msg The request Message
+ * @return the processed message
+ *
+ */
+ Message processRequest(Message msg);
+
+ /**
+ * Process a response message
*
- * @param msg The request Message for the wire
+ * @param msg The request Message
+ * @return the processed message
*
*/
- void invokeAsync(Message msg);
+ Message processResponse(Message msg);
}
diff --git a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/runtime/Invocable.java b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/runtime/Invocable.java
index 31260f1afa..cf9b8ac49e 100644
--- a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/runtime/Invocable.java
+++ b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/runtime/Invocable.java
@@ -29,6 +29,7 @@ import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.InvocationChain;
+import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.provider.PolicyProvider;
@@ -136,7 +137,15 @@ public interface Invocable {
* @return The ticket that can be used to identify this invocation
* @throws InvocationTargetException
*/
- void invokeAsync(Operation operation, Message msg);
+ void invokeAsync(Operation operation, Message msg) throws Throwable;
+ // TODO - this shouldn't throw an exception
+
+ /**
+ * Asynchronously invoke an operation with a context message
+ * @param tailInvoker the invoker at the end of the chain
+ * @param msg The request message
+ */
+ void invokeAsyncResponse(InvokerAsync tailInvoker, Message msg);
/**
* Get a list of policy providers
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
index 31738c3a5d..32aa0c0646 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
@@ -75,11 +75,13 @@ import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.invocation.Phase;
import org.apache.tuscany.sca.provider.BindingProviderFactory;
import org.apache.tuscany.sca.provider.EndpointProvider;
+import org.apache.tuscany.sca.provider.ImplementationAsyncProvider;
import org.apache.tuscany.sca.provider.ImplementationProvider;
import org.apache.tuscany.sca.provider.PolicyProvider;
import org.apache.tuscany.sca.provider.PolicyProviderFactory;
@@ -209,7 +211,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
public synchronized InvocationChain getBindingInvocationChain() {
if (bindingInvocationChain == null) {
- bindingInvocationChain = new InvocationChainImpl(null, null, false, phaseManager);
+ bindingInvocationChain = new InvocationChainImpl(null, null, false, phaseManager, isAsyncInvocation());
initServiceBindingInvocationChains();
}
@@ -224,7 +226,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
/**
* A dummy invocation chain representing null as ConcurrentHashMap doesn't allow null values
*/
- private static final InvocationChain NULL_CHAIN = new InvocationChainImpl(null, null, false, null);
+ private static final InvocationChain NULL_CHAIN = new InvocationChainImpl(null, null, false, null, false);
public InvocationChain getInvocationChain(Operation operation) {
InvocationChain cached = invocationChainMap.get(operation);
@@ -286,13 +288,17 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
return invoker.invoke(operation, msg);
}
- public void invokeAsync(Operation operation, Message msg) {
+ public void invokeAsync(Operation operation, Message msg) throws Throwable {
msg.setOperation(operation);
invoker.invokeAsync(msg);
}
+
+ public void invokeAsyncResponse(InvokerAsync tailInvoker, Message msg){
+ invoker.invokeAsyncResponse(tailInvoker, msg);
+ }
/**
- * Navigate the component/componentType inheritence chain to find the leaf contract
+ * Navigate the component/componentType inheritance chain to find the leaf contract
* @param contract
* @return
*/
@@ -344,7 +350,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
+ "#"
+ service.getName());
}
- InvocationChain chain = new InvocationChainImpl(operation, targetOperation, false, phaseManager);
+ InvocationChain chain = new InvocationChainImpl(operation, targetOperation, false, phaseManager, isAsyncInvocation());
if (operation.isNonBlocking()) {
addNonBlockingInterceptor(chain);
}
@@ -630,7 +636,30 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
if (provider != null) {
Invoker invoker = null;
- invoker = provider.createInvoker((RuntimeComponentService)service, operation);
+ RuntimeComponentService runtimeService = (RuntimeComponentService)service;
+ if (runtimeService.getName().endsWith("_asyncCallback")){
+ if (provider instanceof ImplementationAsyncProvider){
+ invoker = ((ImplementationAsyncProvider)provider).createAsyncResponseInvoker(operation);
+ } else {
+ // TODO - This should be an error but taking account of the
+ // existing non-native async support
+ invoker = provider.createInvoker((RuntimeComponentService)service, operation);
+/*
+ throw new ServiceRuntimeException("Component " +
+ this.getComponent().getName() +
+ " Service " +
+ getService().getName() +
+ " implementation provider doesn't implement ImplementationAsyncProvider but the implementation uses a " +
+ "refrence interface with the asyncInvocation intent set" +
+ " - [" + this.toString() + "]");
+*/
+ }
+ } else if (isAsyncInvocation() &&
+ provider instanceof ImplementationAsyncProvider){
+ invoker = ((ImplementationAsyncProvider)provider).createAsyncInvoker(this, (RuntimeComponentService)service, operation);
+ } else {
+ invoker = provider.createInvoker((RuntimeComponentService)service, operation);
+ }
chain.addInvoker(invoker);
}
// TODO - EPR - don't we need to get the policy from the right level in the
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java
index b18af33a39..2aadf34295 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java
@@ -61,9 +61,11 @@ import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
+import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterfaceContract;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.invocation.Phase;
@@ -198,7 +200,7 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
public synchronized InvocationChain getBindingInvocationChain() {
if (bindingInvocationChain == null) {
- bindingInvocationChain = new InvocationChainImpl(null, null, true, phaseManager);
+ bindingInvocationChain = new InvocationChainImpl(null, null, true, phaseManager, isAsyncInvocation());
initReferenceBindingInvocationChains();
}
return bindingInvocationChain;
@@ -238,10 +240,14 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
return invoker.invoke(operation, msg);
}
- public void invokeAsync(Operation operation, Message msg) {
+ public void invokeAsync(Operation operation, Message msg) throws Throwable {
msg.setOperation(operation);
invoker.invokeAsync(msg);
}
+
+ public void invokeAsyncResponse(InvokerAsync tailInvoker, Message msg){
+ invoker.invokeAsyncResponse(tailInvoker, msg);
+ }
/**
* Navigate the component/componentType inheritence chain to find the leaf contract
@@ -309,7 +315,7 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
+ "#"
+ reference.getName());
}
- InvocationChain chain = new InvocationChainImpl(operation, targetOperation, true, phaseManager);
+ InvocationChain chain = new InvocationChainImpl(operation, targetOperation, true, phaseManager, isAsyncInvocation());
if (operation.isNonBlocking()) {
addNonBlockingInterceptor(chain);
}
@@ -653,6 +659,7 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
} // end try
service.setInterfaceContract(interfaceContract);
+
String serviceName = getReference().getName() + "_asyncCallback";
service.setName(serviceName);
service.getEndpoints().add(endpoint);
@@ -661,6 +668,13 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
// Set pseudo-service onto the component
getComponent().getServices().add(service);
+
+ // if the reference has a WSDL contract reset the response endpoint to be WSDL also
+ InterfaceContract referenceInterfaceContract = getComponentTypeReferenceInterfaceContract();
+ if (referenceInterfaceContract instanceof WSDLInterfaceContract){
+ WSDLInterfaceContract wsdlInterfaceContract = (WSDLInterfaceContract)endpoint.getGeneratedWSDLContract(interfaceContract);
+ service.setInterfaceContract(wsdlInterfaceContract);
+ }
// Create a binding
// Mike had to go via the XML but I don't remember why
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java
index b5e3eec282..7700eeb79c 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java
@@ -30,12 +30,20 @@ import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.Interceptor;
+import org.apache.tuscany.sca.invocation.InterceptorAsync;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.provider.EndpointAsyncProvider;
+import org.apache.tuscany.sca.provider.ImplementationAsyncProvider;
+import org.apache.tuscany.sca.provider.ImplementationProvider;
+import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.Invocable;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.apache.tuscany.sca.work.WorkScheduler;
@@ -43,7 +51,7 @@ import org.apache.tuscany.sca.work.WorkScheduler;
* Invoker for a endpoint or endpoint reference
* @version $Rev$ $Date$
*/
-public class RuntimeInvoker implements Invoker, InvokerAsync {
+public class RuntimeInvoker implements Invoker{
protected ExtensionPointRegistry registry;
protected MessageFactory messageFactory;
protected Invocable invocable;
@@ -120,6 +128,7 @@ public class RuntimeInvoker implements Invoker, InvokerAsync {
epr.rebuild();
}
msg.setFrom((EndpointReference)invocable);
+ msg.setTo(((EndpointReference)invocable).getTargetEndpoint());
}
Operation operation = msg.getOperation();
@@ -140,12 +149,96 @@ public class RuntimeInvoker implements Invoker, InvokerAsync {
Message msgContext = ThreadMessageContext.setMessageContext(msg);
try {
// TODO - is this the way we'll pass async messages down the chain?
- headInvoker.invokeAsync(msg);
+ Message resp = null;
+ try {
+ headInvoker.invokeAsyncRequest(msg);
+ } catch (Throwable ex) {
+ // temporary fix to swallow the dummy exception that's
+ // thrown back to get past the response chain processing.
+ if (!(ex instanceof AsyncResponseException)){
+ // throw ex;
+ }
+ }
+
+ // This is async but we check the response in case there is a
+ // fault reported on the forward request, i.e. before the
+ // request reaches the binding
+ if (resp != null){
+ Object body = resp.getBody();
+ if (resp.isFault()) {
+ //throw (Throwable)body;
+ }
+ }
} finally {
ThreadMessageContext.setMessageContext(msgContext);
}
return;
}
-
+
+ public void invokeAsyncResponse(InvokerAsync tailInvoker, Message msg) {
+
+ // TODO - I pass a tail invoker in as on the service side I have one handy
+ // but calculate it here if it's not passed in
+ if (tailInvoker == null){
+ Operation operation = msg.getOperation();
+ InvocationChain chain = invocable.getInvocationChain(operation);
+
+ // find the tail invoker
+ Invoker next = chain.getHeadInvoker();
+ Invoker tail = null;
+ while (next != null){
+ tail = next;
+ if (next instanceof Interceptor){
+ next = ((Interceptor)next).getNext();
+
+ // TODO - hack to get round SCA binding optimization
+ // On the refrence side this loop will go all the way
+ // across to the service invoker so stop the look if we find
+ // an invoker with no previous pointer. This will be the point
+ // where the SCA binding invoker points to the head of the
+ // service chain
+
+ if (!(next instanceof InterceptorAsync) ||
+ ((InterceptorAsync)next).getPrevious() == null){
+ break;
+ }
+ } else {
+ next = null;
+ }
+ }
+ tailInvoker = (InvokerAsync)tail;
+ }
+
+ Message asyncResponseMsg = tailInvoker.invokeAsyncResponse(msg);
+
+ // now get the asyncResponseInvoker
+ Invoker asyncResponseInvoker = null;
+
+ // We'd want to cache this based on the reference EPR
+ if (invocable instanceof Endpoint) {
+ // get it from the binding
+ RuntimeEndpoint ep = (RuntimeEndpoint)invocable;
+ ServiceBindingProvider serviceBindingProvider = ep.getBindingProvider();
+ if (serviceBindingProvider instanceof EndpointAsyncProvider){
+ EndpointAsyncProvider asyncEndpointProvider = (EndpointAsyncProvider)serviceBindingProvider;
+ asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(asyncResponseMsg.getOperation());
+
+ } else {
+ // TODO - throw error
+ }
+ } else if (invocable instanceof EndpointReference) {
+ // get it from the implementation
+ RuntimeEndpointReference epr = (RuntimeEndpointReference)invocable;
+ ImplementationProvider implementationProvider = ((RuntimeComponent)epr.getComponent()).getImplementationProvider();
+
+ if (implementationProvider instanceof ImplementationAsyncProvider){
+ asyncResponseInvoker = ((ImplementationAsyncProvider)implementationProvider).createAsyncResponseInvoker(asyncResponseMsg.getOperation());
+ } else {
+ // TODO - throw an error
+ }
+ }
+
+ asyncResponseInvoker.invoke(asyncResponseMsg);
+ }
}
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java
index 8569af0de8..477f84e690 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java
@@ -25,8 +25,10 @@ import java.util.ListIterator;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.DataExchangeSemantics;
import org.apache.tuscany.sca.invocation.Interceptor;
+import org.apache.tuscany.sca.invocation.InterceptorAsync;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Phase;
import org.apache.tuscany.sca.invocation.PhasedInterceptor;
@@ -43,12 +45,14 @@ public class InvocationChainImpl implements InvocationChain {
private final PhaseManager phaseManager;
private boolean forReference;
private boolean allowsPassByReference;
+ private boolean isAsyncInvocation;
- public InvocationChainImpl(Operation sourceOperation, Operation targetOperation, boolean forReference, PhaseManager phaseManager) {
+ public InvocationChainImpl(Operation sourceOperation, Operation targetOperation, boolean forReference, PhaseManager phaseManager, boolean isAsyncInvocation) {
this.targetOperation = targetOperation;
this.sourceOperation = sourceOperation;
this.forReference = forReference;
this.phaseManager = phaseManager;
+ this.isAsyncInvocation = isAsyncInvocation;
}
public Operation getTargetOperation() {
@@ -119,6 +123,17 @@ public class InvocationChainImpl implements InvocationChain {
}
private void addInvoker(String phase, Invoker invoker) {
+ if (isAsyncInvocation &&
+ !(invoker instanceof InvokerAsync)){
+ // TODO - should raise an error but don't want to break
+ // the existing non-native async support
+/*
+ throw new IllegalArgumentException("Trying to add synchronous invoker " +
+ invoker.getClass().getName() +
+ " to asynchronous chain");
+*/
+ }
+
int index = phaseManager.getAllPhases().indexOf(phase);
if (index == -1) {
throw new IllegalArgumentException("Invalid phase name: " + phase);
@@ -149,11 +164,19 @@ public class InvocationChainImpl implements InvocationChain {
if (before != null) {
if (before.getInvoker() instanceof Interceptor) {
((Interceptor)before.getInvoker()).setNext(invoker);
+ if (invoker instanceof InterceptorAsync &&
+ before.getInvoker() instanceof InvokerAsync){
+ ((InterceptorAsync) invoker).setPrevious((InvokerAsync)before.getInvoker());
+ }
}
}
if (after != null) {
if (invoker instanceof Interceptor) {
((Interceptor)invoker).setNext(after.getInvoker());
+ if (after.getInvoker() instanceof InterceptorAsync &&
+ invoker instanceof InvokerAsync){
+ ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsync)invoker);
+ }
}
}
@@ -204,5 +227,9 @@ public class InvocationChainImpl implements InvocationChain {
return "(" + phaseIndex + ")" + invoker;
}
}
+
+ public boolean isAsyncInvocation() {
+ return isAsyncInvocation;
+ }
}
diff --git a/sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java b/sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java
index 1302bed681..38306317b4 100644
--- a/sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java
+++ b/sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java
@@ -38,7 +38,7 @@ public class InvocationChainImplTestCase {
@Test
public void testInsertAtEnd() throws Exception {
Operation op = newOperation("foo");
- InvocationChain chain = new InvocationChainImpl(op, op, true, new PhaseManager(new DefaultExtensionPointRegistry()));
+ InvocationChain chain = new InvocationChainImpl(op, op, true, new PhaseManager(new DefaultExtensionPointRegistry()), false);
Interceptor inter2 = new MockInterceptor();
Interceptor inter1 = new MockInterceptor();
chain.addInterceptor(inter1);
@@ -51,7 +51,7 @@ public class InvocationChainImplTestCase {
@Test
public void testAddByPhase() throws Exception {
Operation op = newOperation("foo");
- InvocationChain chain = new InvocationChainImpl(op, op, false, new PhaseManager(new DefaultExtensionPointRegistry()));
+ InvocationChain chain = new InvocationChainImpl(op, op, false, new PhaseManager(new DefaultExtensionPointRegistry()), false);
Interceptor inter1 = new MockInterceptor();
Interceptor inter2 = new MockInterceptor();
Interceptor inter3 = new MockInterceptor();