summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/core/src
diff options
context:
space:
mode:
authorslaws <slaws@13f79535-47bb-0310-9956-ffa450edef68>2010-12-03 15:22:31 +0000
committerslaws <slaws@13f79535-47bb-0310-9956-ffa450edef68>2010-12-03 15:22:31 +0000
commit471a23dbe35f1389a9fd43ee409ac4bb03d995c4 (patch)
tree570206b73e95462251d3f311076bcdb5a7da06d5 /sca-java-2.x/trunk/modules/core/src
parent4d1e8a5032010161a569df3f03285676a3d48fb0 (diff)
TUSCANY-3801 - Update the invocation chain infrastructure, and the enpoints/endpointreferences that call it, to allow async response messages to be processed backwards along the response part of the chain independently of the forward message processing.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1041866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk/modules/core/src')
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java41
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java20
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java99
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java29
-rw-r--r--sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java4
5 files changed, 178 insertions, 15 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
index 31738c3a5d..32aa0c0646 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
@@ -75,11 +75,13 @@ import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.invocation.Phase;
import org.apache.tuscany.sca.provider.BindingProviderFactory;
import org.apache.tuscany.sca.provider.EndpointProvider;
+import org.apache.tuscany.sca.provider.ImplementationAsyncProvider;
import org.apache.tuscany.sca.provider.ImplementationProvider;
import org.apache.tuscany.sca.provider.PolicyProvider;
import org.apache.tuscany.sca.provider.PolicyProviderFactory;
@@ -209,7 +211,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
public synchronized InvocationChain getBindingInvocationChain() {
if (bindingInvocationChain == null) {
- bindingInvocationChain = new InvocationChainImpl(null, null, false, phaseManager);
+ bindingInvocationChain = new InvocationChainImpl(null, null, false, phaseManager, isAsyncInvocation());
initServiceBindingInvocationChains();
}
@@ -224,7 +226,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
/**
* A dummy invocation chain representing null as ConcurrentHashMap doesn't allow null values
*/
- private static final InvocationChain NULL_CHAIN = new InvocationChainImpl(null, null, false, null);
+ private static final InvocationChain NULL_CHAIN = new InvocationChainImpl(null, null, false, null, false);
public InvocationChain getInvocationChain(Operation operation) {
InvocationChain cached = invocationChainMap.get(operation);
@@ -286,13 +288,17 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
return invoker.invoke(operation, msg);
}
- public void invokeAsync(Operation operation, Message msg) {
+ public void invokeAsync(Operation operation, Message msg) throws Throwable {
msg.setOperation(operation);
invoker.invokeAsync(msg);
}
+
+ public void invokeAsyncResponse(InvokerAsync tailInvoker, Message msg){
+ invoker.invokeAsyncResponse(tailInvoker, msg);
+ }
/**
- * Navigate the component/componentType inheritence chain to find the leaf contract
+ * Navigate the component/componentType inheritance chain to find the leaf contract
* @param contract
* @return
*/
@@ -344,7 +350,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
+ "#"
+ service.getName());
}
- InvocationChain chain = new InvocationChainImpl(operation, targetOperation, false, phaseManager);
+ InvocationChain chain = new InvocationChainImpl(operation, targetOperation, false, phaseManager, isAsyncInvocation());
if (operation.isNonBlocking()) {
addNonBlockingInterceptor(chain);
}
@@ -630,7 +636,30 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
if (provider != null) {
Invoker invoker = null;
- invoker = provider.createInvoker((RuntimeComponentService)service, operation);
+ RuntimeComponentService runtimeService = (RuntimeComponentService)service;
+ if (runtimeService.getName().endsWith("_asyncCallback")){
+ if (provider instanceof ImplementationAsyncProvider){
+ invoker = ((ImplementationAsyncProvider)provider).createAsyncResponseInvoker(operation);
+ } else {
+ // TODO - This should be an error but taking account of the
+ // existing non-native async support
+ invoker = provider.createInvoker((RuntimeComponentService)service, operation);
+/*
+ throw new ServiceRuntimeException("Component " +
+ this.getComponent().getName() +
+ " Service " +
+ getService().getName() +
+ " implementation provider doesn't implement ImplementationAsyncProvider but the implementation uses a " +
+ "refrence interface with the asyncInvocation intent set" +
+ " - [" + this.toString() + "]");
+*/
+ }
+ } else if (isAsyncInvocation() &&
+ provider instanceof ImplementationAsyncProvider){
+ invoker = ((ImplementationAsyncProvider)provider).createAsyncInvoker(this, (RuntimeComponentService)service, operation);
+ } else {
+ invoker = provider.createInvoker((RuntimeComponentService)service, operation);
+ }
chain.addInvoker(invoker);
}
// TODO - EPR - don't we need to get the policy from the right level in the
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java
index b18af33a39..2aadf34295 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java
@@ -61,9 +61,11 @@ import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
+import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterfaceContract;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.invocation.Phase;
@@ -198,7 +200,7 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
public synchronized InvocationChain getBindingInvocationChain() {
if (bindingInvocationChain == null) {
- bindingInvocationChain = new InvocationChainImpl(null, null, true, phaseManager);
+ bindingInvocationChain = new InvocationChainImpl(null, null, true, phaseManager, isAsyncInvocation());
initReferenceBindingInvocationChains();
}
return bindingInvocationChain;
@@ -238,10 +240,14 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
return invoker.invoke(operation, msg);
}
- public void invokeAsync(Operation operation, Message msg) {
+ public void invokeAsync(Operation operation, Message msg) throws Throwable {
msg.setOperation(operation);
invoker.invokeAsync(msg);
}
+
+ public void invokeAsyncResponse(InvokerAsync tailInvoker, Message msg){
+ invoker.invokeAsyncResponse(tailInvoker, msg);
+ }
/**
* Navigate the component/componentType inheritence chain to find the leaf contract
@@ -309,7 +315,7 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
+ "#"
+ reference.getName());
}
- InvocationChain chain = new InvocationChainImpl(operation, targetOperation, true, phaseManager);
+ InvocationChain chain = new InvocationChainImpl(operation, targetOperation, true, phaseManager, isAsyncInvocation());
if (operation.isNonBlocking()) {
addNonBlockingInterceptor(chain);
}
@@ -653,6 +659,7 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
} // end try
service.setInterfaceContract(interfaceContract);
+
String serviceName = getReference().getName() + "_asyncCallback";
service.setName(serviceName);
service.getEndpoints().add(endpoint);
@@ -661,6 +668,13 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen
// Set pseudo-service onto the component
getComponent().getServices().add(service);
+
+ // if the reference has a WSDL contract reset the response endpoint to be WSDL also
+ InterfaceContract referenceInterfaceContract = getComponentTypeReferenceInterfaceContract();
+ if (referenceInterfaceContract instanceof WSDLInterfaceContract){
+ WSDLInterfaceContract wsdlInterfaceContract = (WSDLInterfaceContract)endpoint.getGeneratedWSDLContract(interfaceContract);
+ service.setInterfaceContract(wsdlInterfaceContract);
+ }
// Create a binding
// Mike had to go via the XML but I don't remember why
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java
index b5e3eec282..7700eeb79c 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java
@@ -30,12 +30,20 @@ import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.Interceptor;
+import org.apache.tuscany.sca.invocation.InterceptorAsync;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.provider.EndpointAsyncProvider;
+import org.apache.tuscany.sca.provider.ImplementationAsyncProvider;
+import org.apache.tuscany.sca.provider.ImplementationProvider;
+import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.Invocable;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.apache.tuscany.sca.work.WorkScheduler;
@@ -43,7 +51,7 @@ import org.apache.tuscany.sca.work.WorkScheduler;
* Invoker for a endpoint or endpoint reference
* @version $Rev$ $Date$
*/
-public class RuntimeInvoker implements Invoker, InvokerAsync {
+public class RuntimeInvoker implements Invoker{
protected ExtensionPointRegistry registry;
protected MessageFactory messageFactory;
protected Invocable invocable;
@@ -120,6 +128,7 @@ public class RuntimeInvoker implements Invoker, InvokerAsync {
epr.rebuild();
}
msg.setFrom((EndpointReference)invocable);
+ msg.setTo(((EndpointReference)invocable).getTargetEndpoint());
}
Operation operation = msg.getOperation();
@@ -140,12 +149,96 @@ public class RuntimeInvoker implements Invoker, InvokerAsync {
Message msgContext = ThreadMessageContext.setMessageContext(msg);
try {
// TODO - is this the way we'll pass async messages down the chain?
- headInvoker.invokeAsync(msg);
+ Message resp = null;
+ try {
+ headInvoker.invokeAsyncRequest(msg);
+ } catch (Throwable ex) {
+ // temporary fix to swallow the dummy exception that's
+ // thrown back to get past the response chain processing.
+ if (!(ex instanceof AsyncResponseException)){
+ // throw ex;
+ }
+ }
+
+ // This is async but we check the response in case there is a
+ // fault reported on the forward request, i.e. before the
+ // request reaches the binding
+ if (resp != null){
+ Object body = resp.getBody();
+ if (resp.isFault()) {
+ //throw (Throwable)body;
+ }
+ }
} finally {
ThreadMessageContext.setMessageContext(msgContext);
}
return;
}
-
+
+ public void invokeAsyncResponse(InvokerAsync tailInvoker, Message msg) {
+
+ // TODO - I pass a tail invoker in as on the service side I have one handy
+ // but calculate it here if it's not passed in
+ if (tailInvoker == null){
+ Operation operation = msg.getOperation();
+ InvocationChain chain = invocable.getInvocationChain(operation);
+
+ // find the tail invoker
+ Invoker next = chain.getHeadInvoker();
+ Invoker tail = null;
+ while (next != null){
+ tail = next;
+ if (next instanceof Interceptor){
+ next = ((Interceptor)next).getNext();
+
+ // TODO - hack to get round SCA binding optimization
+ // On the refrence side this loop will go all the way
+ // across to the service invoker so stop the look if we find
+ // an invoker with no previous pointer. This will be the point
+ // where the SCA binding invoker points to the head of the
+ // service chain
+
+ if (!(next instanceof InterceptorAsync) ||
+ ((InterceptorAsync)next).getPrevious() == null){
+ break;
+ }
+ } else {
+ next = null;
+ }
+ }
+ tailInvoker = (InvokerAsync)tail;
+ }
+
+ Message asyncResponseMsg = tailInvoker.invokeAsyncResponse(msg);
+
+ // now get the asyncResponseInvoker
+ Invoker asyncResponseInvoker = null;
+
+ // We'd want to cache this based on the reference EPR
+ if (invocable instanceof Endpoint) {
+ // get it from the binding
+ RuntimeEndpoint ep = (RuntimeEndpoint)invocable;
+ ServiceBindingProvider serviceBindingProvider = ep.getBindingProvider();
+ if (serviceBindingProvider instanceof EndpointAsyncProvider){
+ EndpointAsyncProvider asyncEndpointProvider = (EndpointAsyncProvider)serviceBindingProvider;
+ asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(asyncResponseMsg.getOperation());
+
+ } else {
+ // TODO - throw error
+ }
+ } else if (invocable instanceof EndpointReference) {
+ // get it from the implementation
+ RuntimeEndpointReference epr = (RuntimeEndpointReference)invocable;
+ ImplementationProvider implementationProvider = ((RuntimeComponent)epr.getComponent()).getImplementationProvider();
+
+ if (implementationProvider instanceof ImplementationAsyncProvider){
+ asyncResponseInvoker = ((ImplementationAsyncProvider)implementationProvider).createAsyncResponseInvoker(asyncResponseMsg.getOperation());
+ } else {
+ // TODO - throw an error
+ }
+ }
+
+ asyncResponseInvoker.invoke(asyncResponseMsg);
+ }
}
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java
index 8569af0de8..477f84e690 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java
@@ -25,8 +25,10 @@ import java.util.ListIterator;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.DataExchangeSemantics;
import org.apache.tuscany.sca.invocation.Interceptor;
+import org.apache.tuscany.sca.invocation.InterceptorAsync;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsync;
import org.apache.tuscany.sca.invocation.Phase;
import org.apache.tuscany.sca.invocation.PhasedInterceptor;
@@ -43,12 +45,14 @@ public class InvocationChainImpl implements InvocationChain {
private final PhaseManager phaseManager;
private boolean forReference;
private boolean allowsPassByReference;
+ private boolean isAsyncInvocation;
- public InvocationChainImpl(Operation sourceOperation, Operation targetOperation, boolean forReference, PhaseManager phaseManager) {
+ public InvocationChainImpl(Operation sourceOperation, Operation targetOperation, boolean forReference, PhaseManager phaseManager, boolean isAsyncInvocation) {
this.targetOperation = targetOperation;
this.sourceOperation = sourceOperation;
this.forReference = forReference;
this.phaseManager = phaseManager;
+ this.isAsyncInvocation = isAsyncInvocation;
}
public Operation getTargetOperation() {
@@ -119,6 +123,17 @@ public class InvocationChainImpl implements InvocationChain {
}
private void addInvoker(String phase, Invoker invoker) {
+ if (isAsyncInvocation &&
+ !(invoker instanceof InvokerAsync)){
+ // TODO - should raise an error but don't want to break
+ // the existing non-native async support
+/*
+ throw new IllegalArgumentException("Trying to add synchronous invoker " +
+ invoker.getClass().getName() +
+ " to asynchronous chain");
+*/
+ }
+
int index = phaseManager.getAllPhases().indexOf(phase);
if (index == -1) {
throw new IllegalArgumentException("Invalid phase name: " + phase);
@@ -149,11 +164,19 @@ public class InvocationChainImpl implements InvocationChain {
if (before != null) {
if (before.getInvoker() instanceof Interceptor) {
((Interceptor)before.getInvoker()).setNext(invoker);
+ if (invoker instanceof InterceptorAsync &&
+ before.getInvoker() instanceof InvokerAsync){
+ ((InterceptorAsync) invoker).setPrevious((InvokerAsync)before.getInvoker());
+ }
}
}
if (after != null) {
if (invoker instanceof Interceptor) {
((Interceptor)invoker).setNext(after.getInvoker());
+ if (after.getInvoker() instanceof InterceptorAsync &&
+ invoker instanceof InvokerAsync){
+ ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsync)invoker);
+ }
}
}
@@ -204,5 +227,9 @@ public class InvocationChainImpl implements InvocationChain {
return "(" + phaseIndex + ")" + invoker;
}
}
+
+ public boolean isAsyncInvocation() {
+ return isAsyncInvocation;
+ }
}
diff --git a/sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java b/sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java
index 1302bed681..38306317b4 100644
--- a/sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java
+++ b/sca-java-2.x/trunk/modules/core/src/test/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImplTestCase.java
@@ -38,7 +38,7 @@ public class InvocationChainImplTestCase {
@Test
public void testInsertAtEnd() throws Exception {
Operation op = newOperation("foo");
- InvocationChain chain = new InvocationChainImpl(op, op, true, new PhaseManager(new DefaultExtensionPointRegistry()));
+ InvocationChain chain = new InvocationChainImpl(op, op, true, new PhaseManager(new DefaultExtensionPointRegistry()), false);
Interceptor inter2 = new MockInterceptor();
Interceptor inter1 = new MockInterceptor();
chain.addInterceptor(inter1);
@@ -51,7 +51,7 @@ public class InvocationChainImplTestCase {
@Test
public void testAddByPhase() throws Exception {
Operation op = newOperation("foo");
- InvocationChain chain = new InvocationChainImpl(op, op, false, new PhaseManager(new DefaultExtensionPointRegistry()));
+ InvocationChain chain = new InvocationChainImpl(op, op, false, new PhaseManager(new DefaultExtensionPointRegistry()), false);
Interceptor inter1 = new MockInterceptor();
Interceptor inter2 = new MockInterceptor();
Interceptor inter3 = new MockInterceptor();