summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java128
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java95
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java15
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java25
4 files changed, 247 insertions, 16 deletions
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
index 97b7570faa..259e3a66ad 100644
--- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
@@ -29,6 +29,9 @@ import org.apache.tuscany.sca.interfacedef.DataType;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.interfacedef.java.JavaOperation;
+import org.apache.tuscany.sca.invocation.InterceptorAsync;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.oasisopen.sca.ResponseDispatch;
@@ -39,7 +42,7 @@ import org.oasisopen.sca.ServiceRuntimeException;
* implementation instance
*
*/
-public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
+public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker implements InterceptorAsync {
public JavaAsyncImplementationInvoker(Operation operation, Method method, RuntimeComponent component,
InterfaceContract interfaceContract) {
@@ -53,16 +56,13 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
Object payload = msg.getBody();
- Object contextId = null;
-
- // store the current thread context classloader
- // - replace it with the class loader used to load the java class as per SCA Spec
+ // Save the current thread context classloader
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
// The following call might create a new conversation, as a result, the msg.getConversationID() might
// return a new value
- InstanceWrapper wrapper = scopeContainer.getWrapper(contextId);
+ InstanceWrapper wrapper = scopeContainer.getWrapper(null);
Object instance = wrapper.getInstance();
@@ -89,12 +89,8 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
ret = method.invoke(instance, (Object[])payload2);
- //ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
throw new InvocationTargetException( new AsyncResponseException("AsyncResponse") );
- //scopeContainer.returnWrapper(wrapper, contextId);
-
- //msg.setBody(ret);
} catch (InvocationTargetException e) {
Throwable cause = e.getTargetException();
boolean isChecked = false;
@@ -104,7 +100,7 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
msg.setFaultBody(cause);
break;
}
- }
+ } // end for
if (!isChecked) {
if (cause instanceof RuntimeException) {
@@ -115,7 +111,7 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
} else {
throw new ServiceRuntimeException(cause.getMessage(), cause);
}
- }
+ } // end if
} catch (ObjectCreationException e) {
throw new ServiceRuntimeException(e.getMessage(), e);
@@ -128,4 +124,110 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
return msg;
} // end method invoke
-}
+ protected Invoker next;
+ protected InvokerAsyncResponse previous;
+
+ public void setNext(Invoker next) {
+ this.next = next;
+ }
+
+ public Invoker getNext() {
+ return next;
+ }
+
+ public void invokeAsyncRequest(Message msg) throws Throwable {
+ processRequest(msg);
+ } // end method invokeAsyncRequest
+
+ public void invokeAsyncResponse(Message msg) {
+ msg = processResponse(msg);
+ InvokerAsyncResponse thePrevious = (InvokerAsyncResponse)getPrevious();
+ if (thePrevious != null ) thePrevious.invokeAsyncResponse(msg);
+ } // end method invokeAsyncResponse
+
+ public void setPrevious(InvokerAsyncResponse previous) {
+ this.previous = previous;
+ }
+
+ public InvokerAsyncResponse getPrevious() {
+ return previous;
+ }
+
+ public Message processRequest(Message msg) {
+ Operation op = this.operation;
+ Object payload = msg.getBody();
+
+ // Replace TCCL with the class loader used to load the java class as per SCA Spec
+ ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+
+ try {
+ InstanceWrapper wrapper = scopeContainer.getWrapper(null);
+ Object instance = wrapper.getInstance();
+
+ // Set the TCCL to the classloader used to load the implementation class
+ Thread.currentThread().setContextClassLoader(instance.getClass().getClassLoader());
+
+ // For an async server method, there is an extra input parameter, which is a DispatchResponse instance
+ // which is typed by the type of the response
+ Class<?> responseType = op.getOutputType().getPhysical();
+ ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType, msg );
+
+ Object[] payload2;
+ if (payload != null && !payload.getClass().isArray()) {
+ payload2 = new Object[2];
+ payload2[0] = payload;
+ } else {
+ payload2 = new Object[ ((Object[])payload).length + 1 ];
+ for( int i = 0; i < ((Object[])payload).length; i++) {
+ payload2[i] = ((Object[])payload)[i];
+ } // end for
+ }
+ payload2[ payload2.length - 1 ] = dispatch;
+
+ method.invoke(instance, (Object[])payload2);
+
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getTargetException();
+ boolean isChecked = false;
+ for (DataType<?> d : operation.getFaultTypes()) {
+ if (d.getPhysical().isInstance(cause)) {
+ isChecked = true;
+ // Ignore these errors since they should be returned asynchronously
+ break;
+ }
+ } // end for
+
+ if (!isChecked) {
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException)cause;
+ } // end if
+ if (cause instanceof Error) {
+ throw (Error)cause;
+ } else {
+ throw new ServiceRuntimeException(cause.getMessage(), cause);
+ } // end if
+ } // end if
+
+ } catch (Exception e) {
+ throw new ServiceRuntimeException(e.getMessage(), e);
+ } finally {
+ // set the tccl
+ Thread.currentThread().setContextClassLoader(tccl);
+ }
+ return msg;
+ } // end method processRequest
+
+ public Message postProcessRequest(Message msg) {
+ return msg;
+ }
+
+ public Message postProcessRequest(Message msg, Throwable e)
+ throws Throwable {
+ throw e;
+ }
+
+ public Message processResponse(Message msg) {
+ return msg;
+ } // end method processResponse
+
+} // end class JavaAsyncImplementationInvoker
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java
new file mode 100644
index 0000000000..e7db583edd
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java
@@ -0,0 +1,95 @@
+package org.apache.tuscany.sca.implementation.java.invocation;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.JDKAsyncResponseInvoker;
+import org.apache.tuscany.sca.invocation.Message;
+import org.oasisopen.sca.ServiceRuntimeException;
+
+/**
+ * Class which handles the asynchronous response message from an async service back to the client Java component
+ *
+ * This class provides a registration function which permits the reference invoking code to register the Future
+ * which is used to return the response to the Java component code
+ */
+public class JavaAsyncResponseInvokerImpl implements JDKAsyncResponseInvoker {
+
+ // Map used to link between async requests and async responses
+ private ConcurrentMap<String, Object> asyncMessageMap;
+
+ public JavaAsyncResponseInvokerImpl() {
+
+ asyncMessageMap = new ConcurrentHashMap<String, Object>();
+ } // end constructor
+
+ /**
+ * Deal with the asynchronous response message
+ * @param msg - the response message
+ *
+ * The response message must contain a RELATES_TO id, which is used to identify the Future that represents
+ * the operation yet to complete. The Future is then completed with the content of the message.
+ * The Future either calls back to the application code, or it is expected that the application is polling
+ * the Future for its completion.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void invokeAsyncResponse(Message msg) {
+ // Obtain the Message ID for this message
+ String relatesID = getMessageRelatesID( msg );
+ if( relatesID == null )
+ throw new ServiceRuntimeException("JavaAsyncResponseInvoker - response message has no RELATES_TO id");
+
+ // Look up the response object & remove it from the Map
+ Object responseHandler = asyncMessageMap.remove(relatesID);
+
+ if( responseHandler == null )
+ throw new ServiceRuntimeException("JavaAsyncResponseInvoker - no Future matches the RELATES_TO id: " + relatesID);
+
+ // Invoke the response handler with the content of the message
+ // - in the case of a Java implementation, the response handler is a Future...
+ AsyncResponseHandler future = (AsyncResponseHandler) responseHandler;
+
+ Object payload = msg.getBody();
+ Object response;
+ if( payload == null ) {
+ System.out.println("Returned response message was null");
+ } else {
+ if (payload.getClass().isArray()) {
+ response = ((Object[])payload)[0];
+ } else {
+ response = payload;
+ } // end if
+ if( response.getClass().equals(AsyncFaultWrapper.class)) {
+ future.setWrappedFault((AsyncFaultWrapper) response );
+ } else if ( response instanceof Throwable ) {
+ future.setFault( (Throwable)response );
+ } else {
+ future.setResponse(response);
+ } // end if
+ } // end if
+
+ } // end method invokeAsyncResponse
+
+ /**
+ * Registers an Async response, which provides an ID which identifies a given response
+ * and an object which can handle the response
+ * @param id - the ID
+ * @param responseHandler - the response handler object
+ */
+ public void registerAsyncResponse( String id, Object responseHandler ) {
+ // Add the ID/response handler mapping into the table
+ if( id != null && responseHandler != null ) asyncMessageMap.put(id, responseHandler);
+ } // end method registerAsyncResponse
+
+ /**
+ * Extracts the RELATES_TO header from the message
+ * @param msg - the Tuscany message
+ * @return - the value of the RELATES_TO header as a String
+ */
+ private String getMessageRelatesID( Message msg ) {
+ return (String)msg.getHeaders().get("RELATES_TO");
+ } // end method getMessageRelatesID
+
+} // end class JavaAsyncResponseInvoker
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java
index d7abd021f6..3bc7a486a5 100644
--- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java
@@ -40,6 +40,9 @@ import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.interfacedef.java.JavaInterface;
import org.apache.tuscany.sca.interfacedef.java.impl.JavaInterfaceUtil;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsyncRequest;
+import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
+import org.apache.tuscany.sca.provider.ImplementationAsyncProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.oasisopen.sca.ComponentContext;
@@ -48,7 +51,7 @@ import org.oasisopen.sca.RequestContext;
/**
* @version $Rev$ $Date$
*/
-public class JavaImplementationProvider implements ScopedImplementationProvider {
+public class JavaImplementationProvider implements ScopedImplementationProvider, ImplementationAsyncProvider {
private JavaImplementation implementation;
private JavaComponentContextProvider componentContextProvider;
private RequestContextFactory requestContextFactory;
@@ -166,4 +169,14 @@ public class JavaImplementationProvider implements ScopedImplementationProvider
return implementation.isEagerInit();
}
+ public InvokerAsyncRequest createAsyncInvoker(RuntimeComponentService service, Operation operation) {
+ // createInvoker should automatically create a JavaAsyncImplementationInvoker - if not, then it means
+ // that the service is not async and the result will be an exception caused by the class cast.
+ return (InvokerAsyncRequest) createInvoker( service, operation );
+ } // end method createAsyncInvoker
+
+ public InvokerAsyncResponse createAsyncResponseInvoker(Operation operation) {
+ return new JavaAsyncResponseInvokerImpl();
+ } // end method createAsyncResponseInvoker
+
}
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
index af40d07409..8c1650096a 100644
--- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
@@ -37,6 +37,7 @@ import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.factory.ObjectFactory;
import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory;
import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory;
import org.apache.tuscany.sca.core.invocation.ProxyFactory;
@@ -45,6 +46,7 @@ import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.oasisopen.sca.ResponseDispatch;
import org.oasisopen.sca.ServiceReference;
+import org.oasisopen.sca.ServiceRuntimeException;
/**
* Implementation of the ResponseDispatch interface of the OASIS SCA Java API
@@ -80,12 +82,16 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
// Service Reference used for the callback
private ServiceReference<AsyncResponseHandler<?>> callbackRef;
- private String messageID;
+ private AsyncResponseInvoker<?> respInvoker;
+ private String messageID;
public ResponseDispatchImpl( Message msg ) {
super();
callbackRef = getAsyncCallbackRef( msg );
+ respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
+ //if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker");
+
// TODO - why is WS stuff bleeding into general code?
messageID = (String) msg.getHeaders().get(MESSAGE_ID);
if (messageID == null){
@@ -122,10 +128,18 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
} else {
throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
} // end if
+
+ // Use response invoker if present
+ if( respInvoker != null ) {
+ //respInvoker.invokeAsyncResponse(new AsyncFaultWrapper(e));
+ respInvoker.invokeAsyncResponse(e);
+ return;
+ } // end if
+
// Now dispatch the response to the callback...
AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
setResponseHeaders();
- handler.setFault(new AsyncFaultWrapper(e));
+ handler.setWrappedFault(new AsyncFaultWrapper(e));
} // end method sendFault
/**
@@ -145,6 +159,13 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
} else {
throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
} // end if
+
+ // Use response invoker if present
+ if( respInvoker != null ) {
+ respInvoker.invokeAsyncResponse(res);
+ return;
+ } // end if
+
// Now dispatch the response to the callback...
AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
setResponseHeaders();