From 158c0e5d605c25a3a4b55495c9cfb9bfef900f9c Mon Sep 17 00:00:00 2001 From: antelder Date: Thu, 8 Sep 2011 10:10:02 +0000 Subject: TUSCANY-3940: Apply patch from Greg Dritschler to Change AsyncJDKInvocationHandler to use WorkScheduler directly git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1166600 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/core/invocation/AsyncFaultWrapper.java | 9 +++- .../invocation/impl/AsyncInvocationFutureImpl.java | 18 ++++++- .../invocation/impl/AsyncJDKInvocationHandler.java | 59 +++++++++------------- 3 files changed, 48 insertions(+), 38 deletions(-) diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java index 3d98de9e21..b35d493d3c 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java @@ -82,8 +82,13 @@ public class AsyncFaultWrapper { Constructor cons = xclass.getConstructor(String.class, Throwable.class); return (Throwable) cons.newInstance(faultMessage, getContainedFault().retrieveFault()); } else { - Constructor cons = xclass.getConstructor(String.class); - return (Throwable) cons.newInstance(faultMessage); + try { + Constructor cons = xclass.getConstructor(String.class); + return (Throwable) cons.newInstance(faultMessage); + } catch (NoSuchMethodException e) { + Constructor cons = xclass.getConstructor(); + return (Throwable) cons.newInstance(); + } } // end if } catch (Exception e) { return e; diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java index 213fd536e9..f8f393d040 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import javax.xml.ws.AsyncHandler; import javax.xml.ws.Response; import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; @@ -58,7 +59,8 @@ public class AsyncInvocationFutureImpl implements Future, Response, Asy private String uniqueID = UUID.randomUUID().toString(); private ClassLoader classLoader = null; - + private AsyncHandler callback; + protected AsyncInvocationFutureImpl() { super(); } // end constructor @@ -161,6 +163,9 @@ public class AsyncInvocationFutureImpl implements Future, Response, Asy } finally { lock.unlock(); } // end try + if (callback != null) { + callback.handleResponse(this); + } } // end method setFault( Throwable ) /** @@ -203,6 +208,9 @@ public class AsyncInvocationFutureImpl implements Future, Response, Asy } finally { lock.unlock(); } // end try + if (callback != null) { + callback.handleResponse(this); + } } // end method setResponse @@ -244,5 +252,13 @@ public class AsyncInvocationFutureImpl implements Future, Response, Asy this.classLoader = classLoader; } + /** + * Sets the callback handler, when the client uses the async callback method + * @param classLoader - the classloader of the business interface + */ + public void setCallback(AsyncHandler callback) { + this.callback = callback; + } + } // end class AsyncInvocationFutureImpl diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java index 29d2da1487..e37fa7754a 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java @@ -31,7 +31,6 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -111,17 +110,14 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { private static final long serialVersionUID = 1L; - private static int invocationCount = 10; // # of threads to use - private static long maxWaitTime = 30; // Max wait time for completion = 30sec - - // Run the async service invocations using a ThreadPoolExecutor - private ExecutorService theExecutor; + // Run the async service invocations using a WorkScheduler + private WorkScheduler scheduler; public AsyncJDKInvocationHandler(ExtensionPointRegistry registry, MessageFactory messageFactory, ServiceReference callableReference ) { super(messageFactory, callableReference); - initExecutorService(registry); + initWorkScheduler(registry); } public AsyncJDKInvocationHandler(ExtensionPointRegistry registry, @@ -129,15 +125,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { Class businessInterface, Invocable source ) { super(messageFactory, businessInterface, source); - initExecutorService(registry); + initWorkScheduler(registry); } - private final void initExecutorService(ExtensionPointRegistry registry) { + private final void initWorkScheduler(ExtensionPointRegistry registry) { UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class); - WorkScheduler scheduler = utilities.getUtility(WorkScheduler.class); - theExecutor = scheduler.getExecutorService(); - - } // end method initExecutorService + scheduler = utilities.getUtility(WorkScheduler.class); + } // end method initWorkScheduler /** * Perform the invocation of the operation @@ -157,7 +151,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { if (isAsyncCallback(method)) { return doInvokeAsyncCallback(proxy, method, args); } else if (isAsyncPoll(method)) { - return doInvokeAsyncPoll(proxy, method, args); + return doInvokeAsyncPoll(proxy, method, args, null); } else { // Regular synchronous method call return doInvokeSync(proxy, method, args); @@ -196,11 +190,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { * type of the response */ @SuppressWarnings("unchecked") - protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) { + protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args, AsyncHandler callback) { Method method = getNonAsyncMethod(asyncMethod); Class returnType = method.getReturnType(); // Allocate the Future / Response object - note: Response is a subclass of Future AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance(returnType, getInterfaceClassloader()); + if (callback != null) + future.setCallback(callback); try { invokeAsync(proxy, method, args, future, asyncMethod); } catch (Exception e) { @@ -226,12 +222,12 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance(returnType, getInterfaceClassloader()); invokeAsync(proxy, method, args, future, method); - // Wait for some maximum time for the result - 1000 seconds here + // Wait for some maximum time for the result - 120 seconds here // Really, if the service is async, the client should use async client methods to invoke the service // - and be prepared to wait a *really* long time Object response = null; try { - response = future.get(1000, TimeUnit.SECONDS); + response = future.get(120, TimeUnit.SECONDS); } catch (ExecutionException ex) { throw ex.getCause(); } @@ -254,20 +250,10 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { @SuppressWarnings("unchecked") private Object doInvokeAsyncCallback(final Object proxy, final Method asyncMethod, final Object[] args) throws Exception { - Future future = theExecutor.submit(new Callable() { - - @Override - public Response call() { - AsyncHandler handler = (AsyncHandler)args[args.length - 1]; - Response response = doInvokeAsyncPoll(proxy, asyncMethod, Arrays.copyOf(args, args.length - 1)); - // Invoke the callback handler, if present - if (handler != null) { - handler.handleResponse(response); - } // end if - return response; - } - }); - return future.get(); + + AsyncHandler callback = (AsyncHandler)args[args.length - 1]; + Response response = doInvokeAsyncPoll(proxy, asyncMethod, Arrays.copyOf(args, args.length - 1), callback); + return response; } // end method doInvokeAsyncCallback @@ -326,7 +312,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { } // end if // Perform the invocations on separate thread... - theExecutor.submit(new separateThreadInvoker(chain, args, source, future, asyncMethod, isAsyncService)); + scheduler.scheduleWork(new SeparateThreadInvoker(chain, args, source, future, asyncMethod, isAsyncService)); return; } // end method invokeAsync @@ -337,7 +323,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { * * This supports both synchronous services and asynchronous services */ - private class separateThreadInvoker implements Runnable { + private class SeparateThreadInvoker implements Runnable { private AsyncInvocationFutureImpl future; private Method asyncMethod; @@ -346,7 +332,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { private Invocable invocable; private boolean isAsyncService; - public separateThreadInvoker(InvocationChain chain, + public SeparateThreadInvoker(InvocationChain chain, Object[] args, Invocable invocable, AsyncInvocationFutureImpl future, @@ -420,6 +406,9 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { future.setWrappedFault(new AsyncFaultWrapper(s)); } // end if } // end if + else { + future.setWrappedFault(new AsyncFaultWrapper(s)); + } } catch (AsyncResponseException ar) { // This exception is received in the case where the Binding does not support async invocation // natively - the initial invocation is effectively synchronous with this exception thrown to @@ -431,7 +420,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { } // end method run - } // end class separateThreadInvoker + } // end class SeparateThreadInvoker /** * Attaches a future to the callback endpoint - so that the Future is triggered when a response is -- cgit v1.2.3