summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x
diff options
context:
space:
mode:
authorantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2011-09-08 10:10:02 +0000
committerantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2011-09-08 10:10:02 +0000
commit158c0e5d605c25a3a4b55495c9cfb9bfef900f9c (patch)
tree468efdc3d50079f0f53bfdda1caa3a7f899b9a2f /sca-java-2.x
parent3a6bea46ba20cd39274d949a66171f9ff8e3ed79 (diff)
TUSCANY-3940: Apply patch from Greg Dritschler to Change AsyncJDKInvocationHandler to use WorkScheduler directly
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1166600 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x')
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java9
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java18
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java59
3 files changed, 48 insertions, 38 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
index 3d98de9e21..b35d493d3c 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
@@ -82,8 +82,13 @@ public class AsyncFaultWrapper {
Constructor cons = xclass.getConstructor(String.class, Throwable.class);
return (Throwable) cons.newInstance(faultMessage, getContainedFault().retrieveFault());
} else {
- Constructor cons = xclass.getConstructor(String.class);
- return (Throwable) cons.newInstance(faultMessage);
+ try {
+ Constructor cons = xclass.getConstructor(String.class);
+ return (Throwable) cons.newInstance(faultMessage);
+ } catch (NoSuchMethodException e) {
+ Constructor cons = xclass.getConstructor();
+ return (Throwable) cons.newInstance();
+ }
} // end if
} catch (Exception e) {
return e;
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
index 213fd536e9..f8f393d040 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import javax.xml.ws.AsyncHandler;
import javax.xml.ws.Response;
import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
@@ -58,7 +59,8 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
private String uniqueID = UUID.randomUUID().toString();
private ClassLoader classLoader = null;
-
+ private AsyncHandler callback;
+
protected AsyncInvocationFutureImpl() {
super();
} // end constructor
@@ -161,6 +163,9 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
} finally {
lock.unlock();
} // end try
+ if (callback != null) {
+ callback.handleResponse(this);
+ }
} // end method setFault( Throwable )
/**
@@ -203,6 +208,9 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
} finally {
lock.unlock();
} // end try
+ if (callback != null) {
+ callback.handleResponse(this);
+ }
} // end method setResponse
@@ -244,5 +252,13 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
this.classLoader = classLoader;
}
+ /**
+ * Sets the callback handler, when the client uses the async callback method
+ * @param classLoader - the classloader of the business interface
+ */
+ public void setCallback(AsyncHandler callback) {
+ this.callback = callback;
+ }
+
} // end class AsyncInvocationFutureImpl
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
index 29d2da1487..e37fa7754a 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
@@ -31,7 +31,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -111,17 +110,14 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
private static final long serialVersionUID = 1L;
- private static int invocationCount = 10; // # of threads to use
- private static long maxWaitTime = 30; // Max wait time for completion = 30sec
-
- // Run the async service invocations using a ThreadPoolExecutor
- private ExecutorService theExecutor;
+ // Run the async service invocations using a WorkScheduler
+ private WorkScheduler scheduler;
public AsyncJDKInvocationHandler(ExtensionPointRegistry registry,
MessageFactory messageFactory,
ServiceReference<?> callableReference ) {
super(messageFactory, callableReference);
- initExecutorService(registry);
+ initWorkScheduler(registry);
}
public AsyncJDKInvocationHandler(ExtensionPointRegistry registry,
@@ -129,15 +125,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
Class<?> businessInterface,
Invocable source ) {
super(messageFactory, businessInterface, source);
- initExecutorService(registry);
+ initWorkScheduler(registry);
}
- private final void initExecutorService(ExtensionPointRegistry registry) {
+ private final void initWorkScheduler(ExtensionPointRegistry registry) {
UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class);
- WorkScheduler scheduler = utilities.getUtility(WorkScheduler.class);
- theExecutor = scheduler.getExecutorService();
-
- } // end method initExecutorService
+ scheduler = utilities.getUtility(WorkScheduler.class);
+ } // end method initWorkScheduler
/**
* Perform the invocation of the operation
@@ -157,7 +151,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
if (isAsyncCallback(method)) {
return doInvokeAsyncCallback(proxy, method, args);
} else if (isAsyncPoll(method)) {
- return doInvokeAsyncPoll(proxy, method, args);
+ return doInvokeAsyncPoll(proxy, method, args, null);
} else {
// Regular synchronous method call
return doInvokeSync(proxy, method, args);
@@ -196,11 +190,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
* type of the response
*/
@SuppressWarnings("unchecked")
- protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) {
+ protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args, AsyncHandler callback) {
Method method = getNonAsyncMethod(asyncMethod);
Class<?> returnType = method.getReturnType();
// Allocate the Future<?> / Response<?> object - note: Response<?> is a subclass of Future<?>
AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance(returnType, getInterfaceClassloader());
+ if (callback != null)
+ future.setCallback(callback);
try {
invokeAsync(proxy, method, args, future, asyncMethod);
} catch (Exception e) {
@@ -226,12 +222,12 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
AsyncInvocationFutureImpl future =
AsyncInvocationFutureImpl.newInstance(returnType, getInterfaceClassloader());
invokeAsync(proxy, method, args, future, method);
- // Wait for some maximum time for the result - 1000 seconds here
+ // Wait for some maximum time for the result - 120 seconds here
// Really, if the service is async, the client should use async client methods to invoke the service
// - and be prepared to wait a *really* long time
Object response = null;
try {
- response = future.get(1000, TimeUnit.SECONDS);
+ response = future.get(120, TimeUnit.SECONDS);
} catch (ExecutionException ex) {
throw ex.getCause();
}
@@ -254,20 +250,10 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
@SuppressWarnings("unchecked")
private Object doInvokeAsyncCallback(final Object proxy, final Method asyncMethod, final Object[] args)
throws Exception {
- Future<Response> future = theExecutor.submit(new Callable<Response>() {
-
- @Override
- public Response call() {
- AsyncHandler handler = (AsyncHandler)args[args.length - 1];
- Response response = doInvokeAsyncPoll(proxy, asyncMethod, Arrays.copyOf(args, args.length - 1));
- // Invoke the callback handler, if present
- if (handler != null) {
- handler.handleResponse(response);
- } // end if
- return response;
- }
- });
- return future.get();
+
+ AsyncHandler callback = (AsyncHandler)args[args.length - 1];
+ Response response = doInvokeAsyncPoll(proxy, asyncMethod, Arrays.copyOf(args, args.length - 1), callback);
+ return response;
} // end method doInvokeAsyncCallback
@@ -326,7 +312,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
} // end if
// Perform the invocations on separate thread...
- theExecutor.submit(new separateThreadInvoker(chain, args, source, future, asyncMethod, isAsyncService));
+ scheduler.scheduleWork(new SeparateThreadInvoker(chain, args, source, future, asyncMethod, isAsyncService));
return;
} // end method invokeAsync
@@ -337,7 +323,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
*
* This supports both synchronous services and asynchronous services
*/
- private class separateThreadInvoker implements Runnable {
+ private class SeparateThreadInvoker implements Runnable {
private AsyncInvocationFutureImpl future;
private Method asyncMethod;
@@ -346,7 +332,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
private Invocable invocable;
private boolean isAsyncService;
- public separateThreadInvoker(InvocationChain chain,
+ public SeparateThreadInvoker(InvocationChain chain,
Object[] args,
Invocable invocable,
AsyncInvocationFutureImpl future,
@@ -420,6 +406,9 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
future.setWrappedFault(new AsyncFaultWrapper(s));
} // end if
} // end if
+ else {
+ future.setWrappedFault(new AsyncFaultWrapper(s));
+ }
} catch (AsyncResponseException ar) {
// This exception is received in the case where the Binding does not support async invocation
// natively - the initial invocation is effectively synchronous with this exception thrown to
@@ -431,7 +420,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
} // end method run
- } // end class separateThreadInvoker
+ } // end class SeparateThreadInvoker
/**
* Attaches a future to the callback endpoint - so that the Future is triggered when a response is