From 74c8e2f04c98bd599c29709203737009b75f224a Mon Sep 17 00:00:00 2001 From: slaws Date: Tue, 7 Dec 2010 11:23:53 +0000 Subject: TUSCANY-3801 - Allow the response chain "previous" link to be attached to an async response handler so that the invokerAsyncResponse can have a void return type. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1042976 13f79535-47bb-0310-9956-ffa450edef68 --- .../core/assembly/impl/RuntimeEndpointImpl.java | 71 +++++++++++++++++++--- .../impl/RuntimeEndpointReferenceImpl.java | 24 +++++++- .../sca/core/invocation/InterceptorAsyncImpl.java | 34 +++++++---- .../sca/core/invocation/RuntimeInvoker.java | 23 +++---- .../core/invocation/impl/InvocationChainImpl.java | 26 ++++---- 5 files changed, 131 insertions(+), 47 deletions(-) (limited to 'sca-java-2.x/trunk/modules/core/src/main/java') diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java index d74ee4a8f9..26fc6722aa 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java @@ -73,13 +73,15 @@ import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory; import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.InvokerAsync; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.invocation.Phase; import org.apache.tuscany.sca.provider.BindingProviderFactory; +import org.apache.tuscany.sca.provider.EndpointAsyncProvider; import org.apache.tuscany.sca.provider.EndpointProvider; import org.apache.tuscany.sca.provider.ImplementationAsyncProvider; import org.apache.tuscany.sca.provider.ImplementationProvider; @@ -365,6 +367,31 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } wireProcessor.process(this); + + // If we have to support async and there is no binding chain + // then set the response path to point directly to the + // binding provided async response handler + if (isAsyncInvocation() && + bindingInvocationChain == null){ + // fix up the operation chain response path to point back to the + // binding provided async response handler + ServiceBindingProvider serviceBindingProvider = getBindingProvider(); + if (serviceBindingProvider instanceof EndpointAsyncProvider){ + EndpointAsyncProvider asyncEndpointProvider = (EndpointAsyncProvider)serviceBindingProvider; + InvokerAsyncResponse asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(); + + for (InvocationChain chain : getInvocationChains()){ + Invoker invoker = chain.getHeadInvoker(); + if (invoker instanceof InterceptorAsync){ + ((InterceptorAsync)invoker).setPrevious(asyncResponseInvoker); + } else { + //TODO - throw error once the old async code is removed + } + } + } else { + // TODO - throw error once the old async code is removed + } + } } /** @@ -570,11 +597,41 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } - // TODO - add something on the end of the wire to invoke the - // invocation chain. Need to split out the runtime - // wire invoker into conversation, callback interceptors etc + // Add the runtime invoker to the end of the binding chain. + // It mediates between the binding chain and selects the + // correct invocation chain based on the operation that's + // been selected bindingInvocationChain.addInvoker(invoker); - + + if (isAsyncInvocation()){ + // fix up the invocation chains to point back to the + // binding chain so that async response messages + // are processed correctly + for (InvocationChain chain : getInvocationChains()){ + Invoker invoker = chain.getHeadInvoker(); + if (invoker instanceof InterceptorAsync){ + ((InterceptorAsync)invoker).setPrevious((InvokerAsyncResponse)bindingInvocationChain.getTailInvoker()); + } else { + // TODO - raise an error. Not doing that while + // we have the old async mechanism in play + } + } + + // fix up the binding chain response path to point back to the + // binding provided async response handler + ServiceBindingProvider serviceBindingProvider = getBindingProvider(); + if (serviceBindingProvider instanceof EndpointAsyncProvider){ + EndpointAsyncProvider asyncEndpointProvider = (EndpointAsyncProvider)serviceBindingProvider; + InvokerAsyncResponse asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(); + if (bindingInvocationChain.getHeadInvoker() instanceof InterceptorAsync){ + ((InterceptorAsync)bindingInvocationChain.getHeadInvoker()).setPrevious(asyncResponseInvoker); + } else { + //TODO - throw error once the old async code is removed + } + } else { + //TODO - throw error once the old async code is removed + } + } } /** @@ -639,7 +696,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint RuntimeComponentService runtimeService = (RuntimeComponentService)service; if (runtimeService.getName().endsWith("_asyncCallback")){ if (provider instanceof ImplementationAsyncProvider){ - invoker = ((ImplementationAsyncProvider)provider).createAsyncResponseInvoker(operation); + invoker = (Invoker)((ImplementationAsyncProvider)provider).createAsyncResponseInvoker(operation); } else { // TODO - This should be an error but taking account of the // existing non-native async support @@ -656,7 +713,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } } else if (isAsyncInvocation() && provider instanceof ImplementationAsyncProvider){ - invoker = ((ImplementationAsyncProvider)provider).createAsyncInvoker(this, (RuntimeComponentService)service, operation); + invoker = (Invoker)((ImplementationAsyncProvider)provider).createAsyncInvoker(this, (RuntimeComponentService)service, operation); } else { invoker = provider.createInvoker((RuntimeComponentService)service, operation); } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java index b8ca59b214..a854e833ac 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java @@ -63,14 +63,17 @@ import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory; import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterfaceContract; import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.InvokerAsync; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.invocation.Phase; import org.apache.tuscany.sca.provider.BindingProviderFactory; import org.apache.tuscany.sca.provider.EndpointReferenceProvider; +import org.apache.tuscany.sca.provider.ImplementationAsyncProvider; +import org.apache.tuscany.sca.provider.ImplementationProvider; import org.apache.tuscany.sca.provider.PolicyProvider; import org.apache.tuscany.sca.provider.PolicyProviderFactory; import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint; @@ -324,10 +327,27 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen } } - // Set the chains until it's fully populated. If we initialize too early, any exeception could + // Set the chains until it's fully populated. If we initialize too early, any exception could // leave this endpoint reference in a wrong state with an empty chain. chains = chainList; wireProcessor.process(this); + + if (isAsyncInvocation()){ + // fix up all of the operation chain response paths + // to point back to the implementation provided + // async response handler + ImplementationProvider implementationProvider = ((RuntimeComponent)getComponent()).getImplementationProvider(); + if (implementationProvider instanceof ImplementationAsyncProvider){ + for (InvocationChain chain : getInvocationChains()){ + InvokerAsyncResponse asyncResponseInvoker = ((ImplementationAsyncProvider)implementationProvider).createAsyncResponseInvoker(chain.getSourceOperation()); + if (chain.getHeadInvoker() instanceof InterceptorAsync){ + ((InterceptorAsync)chain.getHeadInvoker()).setPrevious(asyncResponseInvoker); + } else { + //TODO - throw error once the old async code is removed + } + } + } + } } /** diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java index 0c42a523a6..8495445951 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java @@ -22,7 +22,8 @@ package org.apache.tuscany.sca.core.invocation; import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.InvokerAsync; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Message; /** @@ -33,22 +34,22 @@ import org.apache.tuscany.sca.invocation.Message; */ public abstract class InterceptorAsyncImpl implements InterceptorAsync { - protected InvokerAsync next; - protected InvokerAsync previous; + protected Invoker next; + protected InvokerAsyncResponse previous; public Invoker getNext() { return (Invoker)next; } public void setNext(Invoker next) { - this.next = (InvokerAsync)next; + this.next = next; } - public InvokerAsync getPrevious() { + public InvokerAsyncResponse getPrevious() { return previous; } - public void setPrevious(InvokerAsync previous) { + public void setPrevious(InvokerAsyncResponse previous) { this.previous = previous; } @@ -61,14 +62,23 @@ public abstract class InterceptorAsyncImpl implements InterceptorAsync { public void invokeAsyncRequest(Message msg) { msg = processRequest(msg); - ((InvokerAsync)getNext()).invokeAsyncRequest(msg); + ((InvokerAsyncRequest)getNext()).invokeAsyncRequest(msg); } - public Message invokeAsyncResponse(Message msg) { + public void invokeAsyncResponse(Message msg) { msg = processResponse(msg); - if (getPrevious() != null){ - return ((InvokerAsync)getPrevious()).invokeAsyncResponse(msg); - } - return msg; + ((InvokerAsyncResponse)getPrevious()).invokeAsyncResponse(msg); + } + + /** + * A testing method while I use the local SCA binding wire to look + * at how the async response path works. This allows me to detect the + * point where the reference wire turns into the service with in the + * optimized case + * + * @return + */ + public boolean isLocalSCABIndingInvoker() { + return false; } } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java index e1ec899fa5..6c9f13ff17 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java @@ -30,20 +30,13 @@ import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.interfacedef.Operation; -import org.apache.tuscany.sca.invocation.Interceptor; -import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.InvokerAsync; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.MessageFactory; -import org.apache.tuscany.sca.provider.EndpointAsyncProvider; -import org.apache.tuscany.sca.provider.ImplementationAsyncProvider; -import org.apache.tuscany.sca.provider.ImplementationProvider; -import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.Invocable; -import org.apache.tuscany.sca.runtime.RuntimeComponent; -import org.apache.tuscany.sca.runtime.RuntimeEndpoint; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; import org.apache.tuscany.sca.work.WorkScheduler; @@ -144,14 +137,14 @@ public class RuntimeInvoker implements Invoker{ } // Perform the async invocation - InvokerAsync headInvoker = (InvokerAsync)chain.getHeadInvoker(); + Invoker headInvoker = chain.getHeadInvoker(); Message msgContext = ThreadMessageContext.setMessageContext(msg); try { // TODO - is this the way we'll pass async messages down the chain? Message resp = null; try { - headInvoker.invokeAsyncRequest(msg); + ((InvokerAsyncRequest)headInvoker).invokeAsyncRequest(msg); } catch (Throwable ex) { // temporary fix to swallow the dummy exception that's // thrown back to get past the response chain processing. @@ -179,10 +172,11 @@ public class RuntimeInvoker implements Invoker{ public void invokeAsyncResponse(Message msg) { InvocationChain chain = invocable.getInvocationChain(msg.getOperation()); - InvokerAsync tailInvoker = (InvokerAsync)chain.getTailInvoker(); + Invoker tailInvoker = chain.getTailInvoker(); - Message asyncResponseMsg = tailInvoker.invokeAsyncResponse(msg); + ((InvokerAsyncResponse)tailInvoker).invokeAsyncResponse(msg); +/* now statically configured // now get the asyncResponseInvoker Invoker asyncResponseInvoker = null; @@ -193,7 +187,7 @@ public class RuntimeInvoker implements Invoker{ ServiceBindingProvider serviceBindingProvider = ep.getBindingProvider(); if (serviceBindingProvider instanceof EndpointAsyncProvider){ EndpointAsyncProvider asyncEndpointProvider = (EndpointAsyncProvider)serviceBindingProvider; - asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(asyncResponseMsg.getOperation()); + asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(); } else { // TODO - throw error @@ -211,5 +205,6 @@ public class RuntimeInvoker implements Invoker{ } asyncResponseInvoker.invoke(asyncResponseMsg); +*/ } } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java index 716379d141..69158683ef 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java @@ -22,13 +22,15 @@ import java.util.ArrayList; import java.util.List; import java.util.ListIterator; +import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.DataExchangeSemantics; import org.apache.tuscany.sca.invocation.Interceptor; import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.InvokerAsync; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Phase; import org.apache.tuscany.sca.invocation.PhasedInterceptor; @@ -98,19 +100,18 @@ public class InvocationChainImpl implements InvocationChain { while (next != null){ tail = next; if (next instanceof Interceptor){ - next = ((Interceptor)next).getNext(); - // TODO - hack to get round SCA binding optimization // On the reference side this loop will go all the way // across to the service invoker so stop looking if we find // an invoker with no "previous" pointer. This will be the point // where the SCA binding invoker points to the head of the // service chain - if (!(next instanceof InterceptorAsync) || - ((InterceptorAsync)next).getPrevious() == null){ + ((InterceptorAsyncImpl)next).isLocalSCABIndingInvoker()){ break; } + + next = ((Interceptor)next).getNext(); } else { next = null; } @@ -152,7 +153,8 @@ public class InvocationChainImpl implements InvocationChain { private void addInvoker(String phase, Invoker invoker) { if (isAsyncInvocation && - !(invoker instanceof InvokerAsync)){ + !(invoker instanceof InvokerAsyncRequest) && + !(invoker instanceof InvokerAsyncResponse) ){ // TODO - should raise an error but don't want to break // the existing non-native async support /* @@ -192,18 +194,18 @@ public class InvocationChainImpl implements InvocationChain { if (before != null) { if (before.getInvoker() instanceof Interceptor) { ((Interceptor)before.getInvoker()).setNext(invoker); - if (invoker instanceof InterceptorAsync && - before.getInvoker() instanceof InvokerAsync){ - ((InterceptorAsync) invoker).setPrevious((InvokerAsync)before.getInvoker()); + if ((invoker instanceof InterceptorAsync) && + (before.getInvoker() instanceof InvokerAsyncResponse)) { + ((InterceptorAsync) invoker).setPrevious((InvokerAsyncResponse)before.getInvoker()); } } } if (after != null) { if (invoker instanceof Interceptor) { ((Interceptor)invoker).setNext(after.getInvoker()); - if (after.getInvoker() instanceof InterceptorAsync && - invoker instanceof InvokerAsync){ - ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsync)invoker); + if ((after.getInvoker() instanceof InterceptorAsync) && + (invoker instanceof InvokerAsyncResponse)){ + ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsyncResponse)invoker); } } } -- cgit v1.2.3