From 4e3151407cee25a65b2c07de2c31873183a9395e Mon Sep 17 00:00:00 2001 From: edwardsmj Date: Tue, 29 Jun 2010 22:27:13 +0000 Subject: Client side asynchronous support - extending the async client invokers to fully support the JAXWS async client API pattern as described in TUSCANY-3612 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@959131 13f79535-47bb-0310-9956-ffa450edef68 --- .../invocation/impl/AsyncInvocationFutureImpl.java | 177 +++++++++++++++++++++ .../invocation/impl/AsyncJDKInvocationHandler.java | 66 +++++++- 2 files changed, 239 insertions(+), 4 deletions(-) create mode 100644 sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java (limited to 'sca-java-2.x') diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java new file mode 100644 index 0000000000..20eada7432 --- /dev/null +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java @@ -0,0 +1,177 @@ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.xml.ws.Response; + +/** + * A class which provides an Implementation of a Future and Response for use with the JAXWS defined client + * asynchronous APIs. + * + * This implementation class provides the interfaces for use by the client code, but also provides methods for the + * Tuscany system code to set the result of the asynchronous service invocation, both Regular and Fault responses. + * + * This class is constructed to be fully thread-safe + * + * @param - this is the type of the response message from the invoked service. + */ +public class AsyncInvocationFutureImpl implements Future, Response { + + // Lock for handling the completion of this Future + private final Lock lock = new ReentrantLock(); + private final Condition isDone = lock.newCondition(); + + // The result + private volatile V response = null; + private volatile Throwable fault = null; + + protected AsyncInvocationFutureImpl() { + super(); + } // end constructor + + /** + * Public constructor for AsyncInvocationFutureImpl - newInstance is necessary in order to enable the Type variable + * to be set for the class instances + * @param - the type of the response from the asynchronously invoked service + * @param type - the type of the AsyncInvocationFutureImpl expressed as a parameter + * @return - an instance of AsyncInvocationFutureImpl + */ + public static AsyncInvocationFutureImpl newInstance( Class type ) { + return new AsyncInvocationFutureImpl(); + } + + /** + * Cancels the asynchronous process + * - not possible in this version, so always returns false + */ + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + /** + * Gets the response value returned by the asynchronous process + * - waits forever + * @return - the response value of type V + * @throws InterruptedException if the get() method was interrupted while waiting for the async process to finish + * @throws ExecutionException if the async process threw an exception - the exception thrown is nested + */ + public V get() throws InterruptedException, ExecutionException { + try { + V response = get(Long.MAX_VALUE, TimeUnit.SECONDS); + return response; + } catch (TimeoutException t) { + throw new InterruptedException("Timed out waiting for Future to complete"); + } // end try + } // end method get() + + /** + * Gets the response value returned by the asynchronous process + * @return - the response value of type V + * @throws InterruptedException if the get() method was interrupted while waiting for the async process to finish + * @throws ExecutionException if the async process threw an exception - the exception thrown is nested + * @throws TimeoutException if the get() method timed out waiting for the async process to finish + */ + public V get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + lock.lock(); + try { + // wait for result to be available + if( notSetYet() ) isDone.await( timeout, unit); + if( response != null ) return response; + if( fault != null ) throw new ExecutionException( fault ); + throw new TimeoutException("get on this Future timed out"); + } finally { + lock.unlock(); + } // end try + + } // end method get(long timeout, TimeUnit unit) + + /** + * Indicates if the asynchronous process has been cancelled + * - not possible in this version so always returns false + */ + public boolean isCancelled() { + return false; + } + + /** + * Indicates if the asynchronous process is completed + * @return - true if the process is completed, false otherwise + */ + public boolean isDone() { + lock.lock(); + try { + return !notSetYet(); + } finally { + lock.unlock(); + } // end try + } // end method isDone + + /** + * Async process completed with a Fault. Must only be invoked once + * @param e - the Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + public void setFault(Throwable e) { + + lock.lock(); + try { + if( notSetYet() ) { + fault = e; + isDone.signalAll(); + } else { + throw new IllegalStateException("setResponse() or setFault() has been called previously"); + } // end if + } finally { + lock.unlock(); + } // end try + + } // end method setFault + + /** + * Async process completed with a response message. Must only be invoked once + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + * @param res - the response message, which is of type V + */ + public void setResponse(V res) { + + lock.lock(); + try { + if( notSetYet() ) { + response = res; + isDone.signalAll(); + } else { + throw new IllegalStateException("setResponse() or setFault() has been called previously"); + } + } finally { + lock.unlock(); + } // end try + + } // end method setResponse + + /** + * Indicates that setting a response value is OK - can only set the response value or fault once + * @return - true if it is OK to set the response, false otherwise + */ + private boolean notSetYet() { + return ( response == null && fault == null ); + } + + /** + * Returns the JAXWS context for the response + * @return - a Map containing the context + */ + public Map getContext() { + // TODO Auto-generated method stub + return null; + } + +} // end class AsyncInvocationFutureImpl diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java index bb2e34b311..940ac37935 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java @@ -20,6 +20,7 @@ package org.apache.tuscany.sca.core.invocation.impl; import java.lang.reflect.Method; +import java.lang.reflect.Type; import java.util.Arrays; import java.util.concurrent.Future; @@ -30,6 +31,21 @@ import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.runtime.Invocable; import org.oasisopen.sca.ServiceReference; +/** + * An InvocationHandler which deals with JAXWS-defined asynchronous client Java API method calls + * + * 2 asynchronous mappings exist for any given synchronous service operation, as shown in this example: + * public interface StockQuote { + * float getPrice(String ticker); + * Response getPriceAsync(String ticker); + * Future getPriceAsync(String ticker, AsyncHandler handler); + * } + * + * - the second method is called the "polling method", since the returned Response object permits + * the client to poll to see if the async call has completed + * - the third method is called the "async callback method", since in this case the client application can specify + * a callback operation that is automatically called when the async call completes + */ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { private static final long serialVersionUID = 1L; @@ -44,6 +60,10 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { super(messageFactory, businessInterface, source); } + /** + * Perform the invocation of the operation + * - provides support for all 3 forms of client method: synchronous, polling and async callback + */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (isAsyncCallback(method)) { @@ -51,10 +71,16 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { } else if (isAsyncPoll(method)) { return doInvokeAsyncPoll(proxy, method, args); } else { + // Regular synchronous method call return super.invoke(proxy, method, args); } } + /** + * Indicates if a supplied method has the form of an async callback method + * @param method - the method + * @return - true if the method has the form of an async callback + */ protected boolean isAsyncCallback(Method method) { if (method.getName().endsWith("Async") && (method.getReturnType().isAssignableFrom(Future.class))) { if (method.getParameterTypes().length > 0) { @@ -64,31 +90,63 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { return false; } + /** + * Indicates is a supplied method has the form of an async polling method + * @param method - the method + * @return - true if the method has the form of an async polling method + */ protected boolean isAsyncPoll(Method method) { return method.getName().endsWith("Async") && (method.getReturnType().isAssignableFrom(Response.class)); } - protected AsyncResponse doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) { + /** + * Invoke an async polling method + * @param proxy - the reference proxy + * @param asyncMethod - the async method to invoke + * @param args - array of input arguments to the method + * @return - the Response object that is returned to the client application, typed by the + * type of the response + */ + protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) { Object response; boolean isException; + Class returnType = getNonAsyncMethod(asyncMethod).getReturnType(); + // Allocate the Future / Response object - note: Response is a subclass of Future + AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType ); try { response = super.invoke(proxy, getNonAsyncMethod(asyncMethod), args); isException = false; + future.setResponse(response); } catch (Throwable e) { response = e; isException = true; + future.setFault(e); } - return new AsyncResponse(response, isException); + return future; + //return new AsyncResponse(response, isException); } + /** + * Invoke an async callback method + * @param proxy - the reference proxy + * @param asyncMethod - the async method to invoke + * @param args - array of input arguments to the method + * @return - the Future object that is returned to the client application, typed by the type of + * the response + */ private Object doInvokeAsyncCallback(Object proxy, Method asyncMethod, Object[] args) { AsyncHandler handler = (AsyncHandler)args[args.length-1]; Response response = doInvokeAsyncPoll(proxy,asyncMethod,Arrays.copyOf(args, args.length-1)); handler.handleResponse(response); - return null; + return response; } + /** + * Return the synchronous method that is the equivalent of an async method + * @param asyncMethod - the async method + * @return - the equivalent synchronous method + */ protected Method getNonAsyncMethod(Method asyncMethod) { String methodName = asyncMethod.getName().substring(0, asyncMethod.getName().length()-5); for (Method m : businessInterface.getMethods()) { @@ -96,6 +154,6 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { return m; } } - throw new IllegalStateException("No non-async method matching async method " + asyncMethod.getName()); + throw new IllegalStateException("No synchronous method matching async method " + asyncMethod.getName()); } } -- cgit v1.2.3