summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x
diff options
context:
space:
mode:
authoredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2010-06-29 22:27:13 +0000
committeredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2010-06-29 22:27:13 +0000
commit4e3151407cee25a65b2c07de2c31873183a9395e (patch)
tree565bd16d950b837d66dbc8733c3436f037a3f9c4 /sca-java-2.x
parent67bed8671084b529f037c8fb8ebb209272e99305 (diff)
Client side asynchronous support - extending the async client invokers to fully support the JAXWS async client API pattern as described in TUSCANY-3612
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@959131 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x')
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java177
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java66
2 files changed, 239 insertions, 4 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
new file mode 100644
index 0000000000..20eada7432
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
@@ -0,0 +1,177 @@
+
+package org.apache.tuscany.sca.core.invocation.impl;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.xml.ws.Response;
+
+/**
+ * A class which provides an Implementation of a Future<V> and Response<V> for use with the JAXWS defined client
+ * asynchronous APIs.
+ *
+ * This implementation class provides the interfaces for use by the client code, but also provides methods for the
+ * Tuscany system code to set the result of the asynchronous service invocation, both Regular and Fault responses.
+ *
+ * This class is constructed to be fully thread-safe
+ *
+ * @param <V> - this is the type of the response message from the invoked service.
+ */
+public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V> {
+
+ // Lock for handling the completion of this Future
+ private final Lock lock = new ReentrantLock();
+ private final Condition isDone = lock.newCondition();
+
+ // The result
+ private volatile V response = null;
+ private volatile Throwable fault = null;
+
+ protected AsyncInvocationFutureImpl() {
+ super();
+ } // end constructor
+
+ /**
+ * Public constructor for AsyncInvocationFutureImpl - newInstance is necessary in order to enable the Type variable
+ * to be set for the class instances
+ * @param <V> - the type of the response from the asynchronously invoked service
+ * @param type - the type of the AsyncInvocationFutureImpl expressed as a parameter
+ * @return - an instance of AsyncInvocationFutureImpl<V>
+ */
+ public static <V> AsyncInvocationFutureImpl<V> newInstance( Class<V> type ) {
+ return new AsyncInvocationFutureImpl<V>();
+ }
+
+ /**
+ * Cancels the asynchronous process
+ * - not possible in this version, so always returns false
+ */
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ /**
+ * Gets the response value returned by the asynchronous process
+ * - waits forever
+ * @return - the response value of type V
+ * @throws InterruptedException if the get() method was interrupted while waiting for the async process to finish
+ * @throws ExecutionException if the async process threw an exception - the exception thrown is nested
+ */
+ public V get() throws InterruptedException, ExecutionException {
+ try {
+ V response = get(Long.MAX_VALUE, TimeUnit.SECONDS);
+ return response;
+ } catch (TimeoutException t) {
+ throw new InterruptedException("Timed out waiting for Future to complete");
+ } // end try
+ } // end method get()
+
+ /**
+ * Gets the response value returned by the asynchronous process
+ * @return - the response value of type V
+ * @throws InterruptedException if the get() method was interrupted while waiting for the async process to finish
+ * @throws ExecutionException if the async process threw an exception - the exception thrown is nested
+ * @throws TimeoutException if the get() method timed out waiting for the async process to finish
+ */
+ public V get(long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ lock.lock();
+ try {
+ // wait for result to be available
+ if( notSetYet() ) isDone.await( timeout, unit);
+ if( response != null ) return response;
+ if( fault != null ) throw new ExecutionException( fault );
+ throw new TimeoutException("get on this Future timed out");
+ } finally {
+ lock.unlock();
+ } // end try
+
+ } // end method get(long timeout, TimeUnit unit)
+
+ /**
+ * Indicates if the asynchronous process has been cancelled
+ * - not possible in this version so always returns false
+ */
+ public boolean isCancelled() {
+ return false;
+ }
+
+ /**
+ * Indicates if the asynchronous process is completed
+ * @return - true if the process is completed, false otherwise
+ */
+ public boolean isDone() {
+ lock.lock();
+ try {
+ return !notSetYet();
+ } finally {
+ lock.unlock();
+ } // end try
+ } // end method isDone
+
+ /**
+ * Async process completed with a Fault. Must only be invoked once
+ * @param e - the Fault to send
+ * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously
+ */
+ public void setFault(Throwable e) {
+
+ lock.lock();
+ try {
+ if( notSetYet() ) {
+ fault = e;
+ isDone.signalAll();
+ } else {
+ throw new IllegalStateException("setResponse() or setFault() has been called previously");
+ } // end if
+ } finally {
+ lock.unlock();
+ } // end try
+
+ } // end method setFault
+
+ /**
+ * Async process completed with a response message. Must only be invoked once
+ * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously
+ * @param res - the response message, which is of type V
+ */
+ public void setResponse(V res) {
+
+ lock.lock();
+ try {
+ if( notSetYet() ) {
+ response = res;
+ isDone.signalAll();
+ } else {
+ throw new IllegalStateException("setResponse() or setFault() has been called previously");
+ }
+ } finally {
+ lock.unlock();
+ } // end try
+
+ } // end method setResponse
+
+ /**
+ * Indicates that setting a response value is OK - can only set the response value or fault once
+ * @return - true if it is OK to set the response, false otherwise
+ */
+ private boolean notSetYet() {
+ return ( response == null && fault == null );
+ }
+
+ /**
+ * Returns the JAXWS context for the response
+ * @return - a Map containing the context
+ */
+ public Map<String, Object> getContext() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+} // end class AsyncInvocationFutureImpl
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
index bb2e34b311..940ac37935 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
@@ -20,6 +20,7 @@
package org.apache.tuscany.sca.core.invocation.impl;
import java.lang.reflect.Method;
+import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.concurrent.Future;
@@ -30,6 +31,21 @@ import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.runtime.Invocable;
import org.oasisopen.sca.ServiceReference;
+/**
+ * An InvocationHandler which deals with JAXWS-defined asynchronous client Java API method calls
+ *
+ * 2 asynchronous mappings exist for any given synchronous service operation, as shown in this example:
+ * public interface StockQuote {
+ * float getPrice(String ticker);
+ * Response<Float> getPriceAsync(String ticker);
+ * Future<?> getPriceAsync(String ticker, AsyncHandler<Float> handler);
+ * }
+ *
+ * - the second method is called the "polling method", since the returned Response<?> object permits
+ * the client to poll to see if the async call has completed
+ * - the third method is called the "async callback method", since in this case the client application can specify
+ * a callback operation that is automatically called when the async call completes
+ */
public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
private static final long serialVersionUID = 1L;
@@ -44,6 +60,10 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
super(messageFactory, businessInterface, source);
}
+ /**
+ * Perform the invocation of the operation
+ * - provides support for all 3 forms of client method: synchronous, polling and async callback
+ */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (isAsyncCallback(method)) {
@@ -51,10 +71,16 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
} else if (isAsyncPoll(method)) {
return doInvokeAsyncPoll(proxy, method, args);
} else {
+ // Regular synchronous method call
return super.invoke(proxy, method, args);
}
}
+ /**
+ * Indicates if a supplied method has the form of an async callback method
+ * @param method - the method
+ * @return - true if the method has the form of an async callback
+ */
protected boolean isAsyncCallback(Method method) {
if (method.getName().endsWith("Async") && (method.getReturnType().isAssignableFrom(Future.class))) {
if (method.getParameterTypes().length > 0) {
@@ -64,31 +90,63 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
return false;
}
+ /**
+ * Indicates is a supplied method has the form of an async polling method
+ * @param method - the method
+ * @return - true if the method has the form of an async polling method
+ */
protected boolean isAsyncPoll(Method method) {
return method.getName().endsWith("Async") && (method.getReturnType().isAssignableFrom(Response.class));
}
- protected AsyncResponse doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) {
+ /**
+ * Invoke an async polling method
+ * @param proxy - the reference proxy
+ * @param asyncMethod - the async method to invoke
+ * @param args - array of input arguments to the method
+ * @return - the Response<?> object that is returned to the client application, typed by the
+ * type of the response
+ */
+ protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) {
Object response;
boolean isException;
+ Class<?> returnType = getNonAsyncMethod(asyncMethod).getReturnType();
+ // Allocate the Future<?> / Response<?> object - note: Response<?> is a subclass of Future<?>
+ AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType );
try {
response = super.invoke(proxy, getNonAsyncMethod(asyncMethod), args);
isException = false;
+ future.setResponse(response);
} catch (Throwable e) {
response = e;
isException = true;
+ future.setFault(e);
}
- return new AsyncResponse(response, isException);
+ return future;
+ //return new AsyncResponse(response, isException);
}
+ /**
+ * Invoke an async callback method
+ * @param proxy - the reference proxy
+ * @param asyncMethod - the async method to invoke
+ * @param args - array of input arguments to the method
+ * @return - the Future<?> object that is returned to the client application, typed by the type of
+ * the response
+ */
private Object doInvokeAsyncCallback(Object proxy, Method asyncMethod, Object[] args) {
AsyncHandler handler = (AsyncHandler)args[args.length-1];
Response response = doInvokeAsyncPoll(proxy,asyncMethod,Arrays.copyOf(args, args.length-1));
handler.handleResponse(response);
- return null;
+ return response;
}
+ /**
+ * Return the synchronous method that is the equivalent of an async method
+ * @param asyncMethod - the async method
+ * @return - the equivalent synchronous method
+ */
protected Method getNonAsyncMethod(Method asyncMethod) {
String methodName = asyncMethod.getName().substring(0, asyncMethod.getName().length()-5);
for (Method m : businessInterface.getMethods()) {
@@ -96,6 +154,6 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
return m;
}
}
- throw new IllegalStateException("No non-async method matching async method " + asyncMethod.getName());
+ throw new IllegalStateException("No synchronous method matching async method " + asyncMethod.getName());
}
}