summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x
diff options
context:
space:
mode:
authorslaws <slaws@13f79535-47bb-0310-9956-ffa450edef68>2010-08-25 15:02:01 +0000
committerslaws <slaws@13f79535-47bb-0310-9956-ffa450edef68>2010-08-25 15:02:01 +0000
commitb55bad231eba7a750c94e0fcfea8212260bff1f0 (patch)
tree3523f61400dbbaba70d784ccb758d5823b5c1719 /sca-java-2.x
parent400a6d98bc4b9c140357a7ff3e9520b2613f8bb6 (diff)
TUSCANY-3659 - Enable asynch operation over the local SCA binding. This does what I think is the right thing but it doesn't necessarily do it in an optimal way yet.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@989157 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x')
-rw-r--r--sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java2
-rw-r--r--sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java15
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java17
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java4
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java20
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java9
6 files changed, 55 insertions, 12 deletions
diff --git a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java
index 88c6888ba9..08f2bee3b3 100644
--- a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java
+++ b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java
@@ -154,7 +154,7 @@ public class RuntimeSCAReferenceBindingProvider implements EndpointReferenceProv
// it turns out that the chain source and target operations are the same, and are the operation
// from the target, not sure if thats by design or a bug. The SCA binding invoker needs to know
// the source and target class loaders so pass in the real source operation in the constructor
- return chain == null ? null : new SCABindingInvoker(chain, operation, mediator, passByValue);
+ return chain == null ? null : new SCABindingInvoker(chain, operation, mediator, passByValue, epr);
}
}
return null;
diff --git a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java
index a0b976c7ad..c2a9038367 100644
--- a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java
+++ b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java
@@ -26,6 +26,8 @@ import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.Phase;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
/**
* @version $Rev$ $Date$
@@ -36,17 +38,21 @@ public class SCABindingInvoker implements Interceptor {
private Operation sourceOperation;
private Operation targetOperation;
private boolean passByValue;
+ private RuntimeEndpointReference epr;
+ private RuntimeEndpoint ep;
/**
* Construct a SCABindingInvoker that delegates to the service invocaiton chain
*/
- public SCABindingInvoker(InvocationChain chain, Operation sourceOperation, Mediator mediator, boolean passByValue) {
+ public SCABindingInvoker(InvocationChain chain, Operation sourceOperation, Mediator mediator, boolean passByValue, RuntimeEndpointReference epr) {
super();
this.chain = chain;
this.mediator = mediator;
this.sourceOperation = sourceOperation;
this.targetOperation = chain.getTargetOperation();
this.passByValue = passByValue;
+ this.epr = epr;
+ this.ep = (RuntimeEndpoint)epr.getTargetEndpoint();
}
/**
@@ -71,6 +77,13 @@ public class SCABindingInvoker implements Interceptor {
if (passByValue) {
msg.setBody(mediator.copyInput(msg.getBody(), sourceOperation, targetOperation));
}
+
+ ep.getInvocationChains();
+ if ( !ep.getCallbackEndpointReferences().isEmpty() ) {
+ RuntimeEndpointReference asyncEPR = (RuntimeEndpointReference) ep.getCallbackEndpointReferences().get(0);
+ // Place a link to the callback EPR into the message headers...
+ msg.getHeaders().put("ASYNC_CALLBACK", asyncEPR );
+ }
Message resultMsg = getNext().invoke(msg);
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
index e0e219d3f1..4371399f58 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
@@ -26,6 +26,7 @@ import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -57,6 +58,7 @@ import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseException;
import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
@@ -118,6 +120,11 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+
+ // force the bind of the reference so that we can look at the
+ // target contract to see if it's asynchronous
+ source.getInvocationChains();
+
if (isAsyncCallback(method)) {
return doInvokeAsyncCallback(proxy, method, args);
} else if (isAsyncPoll(method)) {
@@ -189,7 +196,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
// Wait for some maximum time for the result - 1000 seconds here
// Really, if the service is async, the client should use async client methods to invoke the service
// - and be prepared to wait a *really* long time
- return future.get(1000, TimeUnit.SECONDS);
+ Object response = null;
+ try {
+ response = future.get(1000, TimeUnit.SECONDS);
+ } catch(ExecutionException ex) {
+ throw ex.getCause();
+ }
+ return response;
} else {
// Target service is not asynchronous, so perform sync invocation
return super.invoke(proxy, method, args);
@@ -308,6 +321,8 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
future.setFault( new AsyncFaultWrapper( s ) );
} // end if
} // end if
+ } catch ( AsyncResponseException ar ) {
+ // do nothing
} catch ( Throwable t ) {
System.out.println("Async invoke got exception: " + t.toString());
future.setFault( new AsyncFaultWrapper( t ) );
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
index 8d56088c44..aa9cf4ad48 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
@@ -157,6 +157,10 @@ public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>,
public Message invoke(Message msg) {
// Get the unique ID from the message header
String idValue = (String)msg.getHeaders().get(WS_MESSAGE_ID);
+ if (idValue == null){
+ idValue = (String)msg.getHeaders().get("MESSAGE_ID");
+ }
+
if( idValue == null ) {
System.out.println( "Async message ID not found ");
} else {
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java
index a162110835..87a7f0910b 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java
@@ -66,11 +66,13 @@ public class JDKProxyFactory implements ProxyFactory, LifeCycleListener {
public <T> T createProxy(final Class<T> interfaze, Invocable invocable) throws ProxyCreationException {
if (invocable instanceof RuntimeEndpoint) {
InvocationHandler handler;
- if (isAsync(interfaze)) {
+// TUSCANY-3659 - Always install a asynch handler regardless of whether ref is sync or async
+// needs tidying
+// if (isAsync(interfaze)) {
handler = new AsyncJDKInvocationHandler(messageFactory, interfaze, invocable);
- } else {
- handler = new JDKInvocationHandler(messageFactory, interfaze, invocable);
- }
+// } else {
+// handler = new JDKInvocationHandler(messageFactory, interfaze, invocable);
+// }
// Allow privileged access to class loader. Requires RuntimePermission in security policy.
ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
@@ -88,11 +90,13 @@ public class JDKProxyFactory implements ProxyFactory, LifeCycleListener {
assert callableReference != null;
final Class<T> interfaze = callableReference.getBusinessInterface();
InvocationHandler handler;
- if (isAsync(interfaze)) {
+// TUSCANY-3659 - Always install a asynch handler regardless of whether ref is sync or async
+// needs tidying
+// if (isAsync(interfaze)) {
handler = new AsyncJDKInvocationHandler(messageFactory, callableReference);
- } else {
- handler = new JDKInvocationHandler(messageFactory, callableReference);
- }
+// } else {
+// handler = new JDKInvocationHandler(messageFactory, callableReference);
+// }
// Allow privileged access to class loader. Requires RuntimePermission in security policy.
ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
index 0d56a6ef9d..dc0bb94bde 100644
--- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
@@ -63,6 +63,7 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
*/
private static final long serialVersionUID = 300158355992568592L;
private static String WS_MESSAGE_ID = "WS_MESSAGE_ID";
+ private static String MESSAGE_ID = "MESSAGE_ID";
// A latch used to ensure that the sendResponse() and sendFault() operations are used at most once
// The latch is initialized with the value "false"
@@ -87,7 +88,12 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
callbackRef = getAsyncCallbackRef( msg );
callbackAddress = msg.getFrom().getCallbackEndpoint().getURI();
- messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID);
+
+ // TODO - why is WS stuff bleeding into general code?
+ messageID = (String) msg.getHeaders().get(MESSAGE_ID);
+ if (messageID == null){
+ messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID);
+ }
} // end constructor
@@ -206,6 +212,7 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
// Add in the header for the RelatesTo Message ID
msgContext.getHeaders().put(WS_MESSAGE_ID, messageID);
+ msgContext.getHeaders().put(MESSAGE_ID, messageID);
ThreadMessageContext.setMessageContext(msgContext);
} // end method setResponseHeaders