From 3c7130f5e2a0ef58e77997fd70b678172d2e1cce Mon Sep 17 00:00:00 2001 From: edwardsmj Date: Mon, 20 Dec 2010 19:26:33 +0000 Subject: Updating core invocation code to support bindings that provide native async support - as described in TUSCANY-3801 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1051249 13f79535-47bb-0310-9956-ffa450edef68 --- .../core/assembly/impl/RuntimeEndpointImpl.java | 6 ++- .../impl/RuntimeEndpointReferenceImpl.java | 17 ++++++- .../sca/core/invocation/AsyncResponseInvoker.java | 55 +++++++++++++++++----- .../sca/core/invocation/InterceptorAsyncImpl.java | 39 ++++++++++++--- .../sca/core/invocation/RuntimeInvoker.java | 30 +++++++++--- .../core/invocation/impl/InvocationChainImpl.java | 7 +++ 6 files changed, 127 insertions(+), 27 deletions(-) (limited to 'sca-java-2.x/trunk') diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java index 2d87e50ed0..6339b96a1b 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java @@ -291,10 +291,14 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint return invoker.invoke(operation, msg); } + public void invokeAsync(Message msg){ + invoker.invokeBindingAsync(msg); + } // end method invokeAsync(Message) + public void invokeAsync(Operation operation, Message msg){ msg.setOperation(operation); invoker.invokeAsync(msg); - } + } // end method invokeAsync(Operation, Message) public void invokeAsyncResponse(Message msg){ invoker.invokeAsyncResponse(msg); diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java index ad5c9124fa..0a1d5b87eb 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java @@ -248,9 +248,22 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen invoker.invokeAsync(msg); } + public void invokeAsync(Message msg){ + invoker.invokeAsync(msg); + } + public void invokeAsyncResponse(Message msg){ - invoker.invokeAsyncResponse(msg); - } + // If there is a Binding Chain, invoke it first... + InvocationChain chain = this.getBindingInvocationChain(); + if( chain != null ) { + Invoker tailInvoker = chain.getTailInvoker(); + ((InvokerAsyncResponse)tailInvoker).invokeAsyncResponse(msg); + } // end if + + chain = this.getInvocationChain(msg.getOperation()); + Invoker tailInvoker = chain.getTailInvoker(); + ((InvokerAsyncResponse)tailInvoker).invokeAsyncResponse(msg); + } // end method invokeAsyncResponse /** * Navigate the component/componentType inheritence chain to find the leaf contract diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java index 9c327defbd..fd07911084 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java @@ -19,6 +19,8 @@ package org.apache.tuscany.sca.core.invocation; +import java.io.Serializable; + import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.provider.EndpointAsyncProvider; @@ -30,20 +32,33 @@ import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; * and hides the decision about whether the response will be processed * natively or non-natively */ -public class AsyncResponseInvoker implements InvokerAsyncResponse { - - RuntimeEndpoint requestEndpoint; - RuntimeEndpointReference responseEndpointReference; +public class AsyncResponseInvoker implements InvokerAsyncResponse, Serializable { - public AsyncResponseInvoker(Message requestMessage){ - requestEndpoint = (RuntimeEndpoint)requestMessage.getTo(); - responseEndpointReference = (RuntimeEndpointReference)requestMessage.getFrom(); - } + /** + * + */ + private static final long serialVersionUID = -7992598227671386588L; + RuntimeEndpoint requestEndpoint; + RuntimeEndpointReference responseEndpointReference; + String responseTargetAddress; + String relatesToMsgID; + + public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint, + RuntimeEndpointReference responseEndpointReference, + String responseTargetAddress, String relatesToMsgID) { + super(); + this.requestEndpoint = requestEndpoint; + this.responseEndpointReference = responseEndpointReference; + this.responseTargetAddress = responseTargetAddress; + this.relatesToMsgID = relatesToMsgID; + } // end constructor + /** * If you have a Tuscany message you can call this */ public void invokeAsyncResponse(Message responseMessage) { + responseMessage.getHeaders().put("ASYNC_RESPONSE_INVOKER", this); if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) && (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){ // process the response as a native async response @@ -52,9 +67,25 @@ public class AsyncResponseInvoker implements InvokerAsyncResponse { // process the response as a non-native async response responseEndpointReference.invoke(responseMessage); } - } + } // end method invokeAsyncReponse(Message) - /** + public String getResponseTargetAddress() { + return responseTargetAddress; + } + + public void setResponseTargetAddress(String responseTargetAddress) { + this.responseTargetAddress = responseTargetAddress; + } + + public String getRelatesToMsgID() { + return relatesToMsgID; + } + + public void setRelatesToMsgID(String relatesToMsgID) { + this.relatesToMsgID = relatesToMsgID; + } + + /** * If you have Java beans you can call this and we'll create * a Tuscany message * @@ -66,5 +97,5 @@ public class AsyncResponseInvoker implements InvokerAsyncResponse { // turn args into a message Message responseMessage = null; invokeAsyncResponse(responseMessage); - } -} + } // end method invokeAsyncResponse(Object) +} // end class diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java index 8495445951..265311fe6b 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java @@ -60,15 +60,42 @@ public abstract class InterceptorAsyncImpl implements InterceptorAsync { return resultMsg; } - public void invokeAsyncRequest(Message msg) { - msg = processRequest(msg); - ((InvokerAsyncRequest)getNext()).invokeAsyncRequest(msg); - } + public void invokeAsyncRequest(Message msg) throws Throwable { + try{ + msg = processRequest(msg); + InvokerAsyncRequest theNext = (InvokerAsyncRequest)getNext(); + if( theNext != null ) theNext.invokeAsyncRequest(msg); + postProcessRequest(msg); + } catch (Throwable e) { + postProcessRequest(msg, e); + } // end try + } // end method invokeAsyncRequest public void invokeAsyncResponse(Message msg) { msg = processResponse(msg); - ((InvokerAsyncResponse)getPrevious()).invokeAsyncResponse(msg); - } + InvokerAsyncResponse thePrevious = (InvokerAsyncResponse)getPrevious(); + if (thePrevious != null ) thePrevious.invokeAsyncResponse(msg); + } // end method invokeAsyncResponse + + /** + * Basic null version of postProcessRequest - subclasses should override for any required + * real processing + */ + public Message postProcessRequest(Message msg) { + // Default processing is to do nothing + return msg; + } // end method postProcessRequest + + /** + * Basic null version of postProcessRequest - subclasses should override for any required + * real processing + * @throws Throwable + */ + public Message postProcessRequest(Message msg, Throwable e) throws Throwable { + // Default processing is to rethrow the exception + throw e; + } // end method postProcessRequest + /** * A testing method while I use the local SCA binding wire to look diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java index f894290b3e..62593ba895 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java @@ -45,7 +45,7 @@ import org.oasisopen.sca.ServiceRuntimeException; * Invoker for a endpoint or endpoint reference * @version $Rev$ $Date$ */ -public class RuntimeInvoker implements Invoker{ +public class RuntimeInvoker implements Invoker, InvokerAsyncRequest { protected ExtensionPointRegistry registry; protected MessageFactory messageFactory; protected Invocable invocable; @@ -70,7 +70,22 @@ public class RuntimeInvoker implements Invoker{ } finally { ThreadMessageContext.setMessageContext(context); } - } + } // end method invokeBinding + + /** + * Async Invoke of the Binding Chain + * @param msg - the message to use in the invocation + */ + public void invokeBindingAsync(Message msg) { + Message context = ThreadMessageContext.setMessageContext(msg); + try { + ((InvokerAsyncRequest)invocable.getBindingInvocationChain().getHeadInvoker()).invokeAsyncRequest(msg); + } catch (Throwable t ) { + // TODO - consider what best to do with exception + } finally { + ThreadMessageContext.setMessageContext(context); + } // end try + } // end method invokeBindingAsync public Message invoke(Message msg) { return invoke(msg.getOperation(), msg); @@ -183,10 +198,13 @@ public class RuntimeInvoker implements Invoker{ * @param msg the response message */ public void invokeAsyncResponse(Message msg) { - - InvocationChain chain = invocable.getInvocationChain(msg.getOperation()); + InvocationChain chain = invocable.getInvocationChain(msg.getOperation()); Invoker tailInvoker = chain.getTailInvoker(); - ((InvokerAsyncResponse)tailInvoker).invokeAsyncResponse(msg); - } + } // end method invokeAsyncResponse + + @Override + public void invokeAsyncRequest(Message msg) throws Throwable { + invokeAsync(msg); + } // end method invokeAsyncRequest } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java index 69158683ef..fff5fb3991 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java @@ -94,6 +94,13 @@ public class InvocationChainImpl implements InvocationChain { } public Invoker getTailInvoker() { + // *** + int nodeCount = nodes.size(); + if( nodeCount > 0 ) { + return nodes.get( nodeCount - 1).getInvoker(); + } // end if + // *** + // find the tail invoker Invoker next = getHeadInvoker(); Invoker tail = null; -- cgit v1.2.3