From b55bad231eba7a750c94e0fcfea8212260bff1f0 Mon Sep 17 00:00:00 2001 From: slaws Date: Wed, 25 Aug 2010 15:02:01 +0000 Subject: TUSCANY-3659 - Enable asynch operation over the local SCA binding. This does what I think is the right thing but it doesn't necessarily do it in an optimal way yet. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@989157 13f79535-47bb-0310-9956-ffa450edef68 --- .../provider/RuntimeSCAReferenceBindingProvider.java | 2 +- .../sca/binding/sca/provider/SCABindingInvoker.java | 15 ++++++++++++++- .../invocation/impl/AsyncJDKInvocationHandler.java | 17 ++++++++++++++++- .../invocation/impl/AsyncResponseHandlerImpl.java | 4 ++++ .../sca/core/invocation/impl/JDKProxyFactory.java | 20 ++++++++++++-------- .../java/invocation/ResponseDispatchImpl.java | 9 ++++++++- 6 files changed, 55 insertions(+), 12 deletions(-) (limited to 'sca-java-2.x/trunk') diff --git a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java index 88c6888ba9..08f2bee3b3 100644 --- a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java +++ b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java @@ -154,7 +154,7 @@ public class RuntimeSCAReferenceBindingProvider implements EndpointReferenceProv // it turns out that the chain source and target operations are the same, and are the operation // from the target, not sure if thats by design or a bug. The SCA binding invoker needs to know // the source and target class loaders so pass in the real source operation in the constructor - return chain == null ? null : new SCABindingInvoker(chain, operation, mediator, passByValue); + return chain == null ? null : new SCABindingInvoker(chain, operation, mediator, passByValue, epr); } } return null; diff --git a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java index a0b976c7ad..c2a9038367 100644 --- a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java +++ b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java @@ -26,6 +26,8 @@ import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.Phase; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; /** * @version $Rev$ $Date$ @@ -36,17 +38,21 @@ public class SCABindingInvoker implements Interceptor { private Operation sourceOperation; private Operation targetOperation; private boolean passByValue; + private RuntimeEndpointReference epr; + private RuntimeEndpoint ep; /** * Construct a SCABindingInvoker that delegates to the service invocaiton chain */ - public SCABindingInvoker(InvocationChain chain, Operation sourceOperation, Mediator mediator, boolean passByValue) { + public SCABindingInvoker(InvocationChain chain, Operation sourceOperation, Mediator mediator, boolean passByValue, RuntimeEndpointReference epr) { super(); this.chain = chain; this.mediator = mediator; this.sourceOperation = sourceOperation; this.targetOperation = chain.getTargetOperation(); this.passByValue = passByValue; + this.epr = epr; + this.ep = (RuntimeEndpoint)epr.getTargetEndpoint(); } /** @@ -71,6 +77,13 @@ public class SCABindingInvoker implements Interceptor { if (passByValue) { msg.setBody(mediator.copyInput(msg.getBody(), sourceOperation, targetOperation)); } + + ep.getInvocationChains(); + if ( !ep.getCallbackEndpointReferences().isEmpty() ) { + RuntimeEndpointReference asyncEPR = (RuntimeEndpointReference) ep.getCallbackEndpointReferences().get(0); + // Place a link to the callback EPR into the message headers... + msg.getHeaders().put("ASYNC_CALLBACK", asyncEPR ); + } Message resultMsg = getNext().invoke(msg); diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java index e0e219d3f1..4371399f58 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java @@ -26,6 +26,7 @@ import java.security.PrivilegedAction; import java.util.Arrays; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -57,6 +58,7 @@ import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseException; import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; @@ -118,6 +120,11 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + + // force the bind of the reference so that we can look at the + // target contract to see if it's asynchronous + source.getInvocationChains(); + if (isAsyncCallback(method)) { return doInvokeAsyncCallback(proxy, method, args); } else if (isAsyncPoll(method)) { @@ -189,7 +196,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { // Wait for some maximum time for the result - 1000 seconds here // Really, if the service is async, the client should use async client methods to invoke the service // - and be prepared to wait a *really* long time - return future.get(1000, TimeUnit.SECONDS); + Object response = null; + try { + response = future.get(1000, TimeUnit.SECONDS); + } catch(ExecutionException ex) { + throw ex.getCause(); + } + return response; } else { // Target service is not asynchronous, so perform sync invocation return super.invoke(proxy, method, args); @@ -308,6 +321,8 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { future.setFault( new AsyncFaultWrapper( s ) ); } // end if } // end if + } catch ( AsyncResponseException ar ) { + // do nothing } catch ( Throwable t ) { System.out.println("Async invoke got exception: " + t.toString()); future.setFault( new AsyncFaultWrapper( t ) ); diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java index 8d56088c44..aa9cf4ad48 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java @@ -157,6 +157,10 @@ public class AsyncResponseHandlerImpl implements AsyncResponseHandler, public Message invoke(Message msg) { // Get the unique ID from the message header String idValue = (String)msg.getHeaders().get(WS_MESSAGE_ID); + if (idValue == null){ + idValue = (String)msg.getHeaders().get("MESSAGE_ID"); + } + if( idValue == null ) { System.out.println( "Async message ID not found "); } else { diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java index a162110835..87a7f0910b 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java @@ -66,11 +66,13 @@ public class JDKProxyFactory implements ProxyFactory, LifeCycleListener { public T createProxy(final Class interfaze, Invocable invocable) throws ProxyCreationException { if (invocable instanceof RuntimeEndpoint) { InvocationHandler handler; - if (isAsync(interfaze)) { +// TUSCANY-3659 - Always install a asynch handler regardless of whether ref is sync or async +// needs tidying +// if (isAsync(interfaze)) { handler = new AsyncJDKInvocationHandler(messageFactory, interfaze, invocable); - } else { - handler = new JDKInvocationHandler(messageFactory, interfaze, invocable); - } +// } else { +// handler = new JDKInvocationHandler(messageFactory, interfaze, invocable); +// } // Allow privileged access to class loader. Requires RuntimePermission in security policy. ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction() { public ClassLoader run() { @@ -88,11 +90,13 @@ public class JDKProxyFactory implements ProxyFactory, LifeCycleListener { assert callableReference != null; final Class interfaze = callableReference.getBusinessInterface(); InvocationHandler handler; - if (isAsync(interfaze)) { +// TUSCANY-3659 - Always install a asynch handler regardless of whether ref is sync or async +// needs tidying +// if (isAsync(interfaze)) { handler = new AsyncJDKInvocationHandler(messageFactory, callableReference); - } else { - handler = new JDKInvocationHandler(messageFactory, callableReference); - } +// } else { +// handler = new JDKInvocationHandler(messageFactory, callableReference); +// } // Allow privileged access to class loader. Requires RuntimePermission in security policy. ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction() { public ClassLoader run() { diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java index 0d56a6ef9d..dc0bb94bde 100644 --- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java @@ -63,6 +63,7 @@ public class ResponseDispatchImpl implements ResponseDispatch, Serializabl */ private static final long serialVersionUID = 300158355992568592L; private static String WS_MESSAGE_ID = "WS_MESSAGE_ID"; + private static String MESSAGE_ID = "MESSAGE_ID"; // A latch used to ensure that the sendResponse() and sendFault() operations are used at most once // The latch is initialized with the value "false" @@ -87,7 +88,12 @@ public class ResponseDispatchImpl implements ResponseDispatch, Serializabl callbackRef = getAsyncCallbackRef( msg ); callbackAddress = msg.getFrom().getCallbackEndpoint().getURI(); - messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID); + + // TODO - why is WS stuff bleeding into general code? + messageID = (String) msg.getHeaders().get(MESSAGE_ID); + if (messageID == null){ + messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID); + } } // end constructor @@ -206,6 +212,7 @@ public class ResponseDispatchImpl implements ResponseDispatch, Serializabl // Add in the header for the RelatesTo Message ID msgContext.getHeaders().put(WS_MESSAGE_ID, messageID); + msgContext.getHeaders().put(MESSAGE_ID, messageID); ThreadMessageContext.setMessageContext(msgContext); } // end method setResponseHeaders -- cgit v1.2.3