diff options
4 files changed, 247 insertions, 16 deletions
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java index 97b7570faa..259e3a66ad 100644 --- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java @@ -29,6 +29,9 @@ import org.apache.tuscany.sca.interfacedef.DataType; import org.apache.tuscany.sca.interfacedef.InterfaceContract; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.interfacedef.java.JavaOperation; +import org.apache.tuscany.sca.invocation.InterceptorAsync; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.oasisopen.sca.ResponseDispatch; @@ -39,7 +42,7 @@ import org.oasisopen.sca.ServiceRuntimeException; * implementation instance * */ -public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker { +public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker implements InterceptorAsync { public JavaAsyncImplementationInvoker(Operation operation, Method method, RuntimeComponent component, InterfaceContract interfaceContract) { @@ -53,16 +56,13 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker { Object payload = msg.getBody(); - Object contextId = null; - - // store the current thread context classloader - // - replace it with the class loader used to load the java class as per SCA Spec + // Save the current thread context classloader ClassLoader tccl = Thread.currentThread().getContextClassLoader(); try { // The following call might create a new conversation, as a result, the msg.getConversationID() might // return a new value - InstanceWrapper wrapper = scopeContainer.getWrapper(contextId); + InstanceWrapper wrapper = scopeContainer.getWrapper(null); Object instance = wrapper.getInstance(); @@ -89,12 +89,8 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker { ret = method.invoke(instance, (Object[])payload2); - //ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS); throw new InvocationTargetException( new AsyncResponseException("AsyncResponse") ); - //scopeContainer.returnWrapper(wrapper, contextId); - - //msg.setBody(ret); } catch (InvocationTargetException e) { Throwable cause = e.getTargetException(); boolean isChecked = false; @@ -104,7 +100,7 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker { msg.setFaultBody(cause); break; } - } + } // end for if (!isChecked) { if (cause instanceof RuntimeException) { @@ -115,7 +111,7 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker { } else { throw new ServiceRuntimeException(cause.getMessage(), cause); } - } + } // end if } catch (ObjectCreationException e) { throw new ServiceRuntimeException(e.getMessage(), e); @@ -128,4 +124,110 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker { return msg; } // end method invoke -} + protected Invoker next; + protected InvokerAsyncResponse previous; + + public void setNext(Invoker next) { + this.next = next; + } + + public Invoker getNext() { + return next; + } + + public void invokeAsyncRequest(Message msg) throws Throwable { + processRequest(msg); + } // end method invokeAsyncRequest + + public void invokeAsyncResponse(Message msg) { + msg = processResponse(msg); + InvokerAsyncResponse thePrevious = (InvokerAsyncResponse)getPrevious(); + if (thePrevious != null ) thePrevious.invokeAsyncResponse(msg); + } // end method invokeAsyncResponse + + public void setPrevious(InvokerAsyncResponse previous) { + this.previous = previous; + } + + public InvokerAsyncResponse getPrevious() { + return previous; + } + + public Message processRequest(Message msg) { + Operation op = this.operation; + Object payload = msg.getBody(); + + // Replace TCCL with the class loader used to load the java class as per SCA Spec + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + + try { + InstanceWrapper wrapper = scopeContainer.getWrapper(null); + Object instance = wrapper.getInstance(); + + // Set the TCCL to the classloader used to load the implementation class + Thread.currentThread().setContextClassLoader(instance.getClass().getClassLoader()); + + // For an async server method, there is an extra input parameter, which is a DispatchResponse instance + // which is typed by the type of the response + Class<?> responseType = op.getOutputType().getPhysical(); + ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType, msg ); + + Object[] payload2; + if (payload != null && !payload.getClass().isArray()) { + payload2 = new Object[2]; + payload2[0] = payload; + } else { + payload2 = new Object[ ((Object[])payload).length + 1 ]; + for( int i = 0; i < ((Object[])payload).length; i++) { + payload2[i] = ((Object[])payload)[i]; + } // end for + } + payload2[ payload2.length - 1 ] = dispatch; + + method.invoke(instance, (Object[])payload2); + + } catch (InvocationTargetException e) { + Throwable cause = e.getTargetException(); + boolean isChecked = false; + for (DataType<?> d : operation.getFaultTypes()) { + if (d.getPhysical().isInstance(cause)) { + isChecked = true; + // Ignore these errors since they should be returned asynchronously + break; + } + } // end for + + if (!isChecked) { + if (cause instanceof RuntimeException) { + throw (RuntimeException)cause; + } // end if + if (cause instanceof Error) { + throw (Error)cause; + } else { + throw new ServiceRuntimeException(cause.getMessage(), cause); + } // end if + } // end if + + } catch (Exception e) { + throw new ServiceRuntimeException(e.getMessage(), e); + } finally { + // set the tccl + Thread.currentThread().setContextClassLoader(tccl); + } + return msg; + } // end method processRequest + + public Message postProcessRequest(Message msg) { + return msg; + } + + public Message postProcessRequest(Message msg, Throwable e) + throws Throwable { + throw e; + } + + public Message processResponse(Message msg) { + return msg; + } // end method processResponse + +} // end class JavaAsyncImplementationInvoker diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java new file mode 100644 index 0000000000..e7db583edd --- /dev/null +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java @@ -0,0 +1,95 @@ +package org.apache.tuscany.sca.implementation.java.invocation; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.JDKAsyncResponseInvoker; +import org.apache.tuscany.sca.invocation.Message; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * Class which handles the asynchronous response message from an async service back to the client Java component + * + * This class provides a registration function which permits the reference invoking code to register the Future + * which is used to return the response to the Java component code + */ +public class JavaAsyncResponseInvokerImpl implements JDKAsyncResponseInvoker { + + // Map used to link between async requests and async responses + private ConcurrentMap<String, Object> asyncMessageMap; + + public JavaAsyncResponseInvokerImpl() { + + asyncMessageMap = new ConcurrentHashMap<String, Object>(); + } // end constructor + + /** + * Deal with the asynchronous response message + * @param msg - the response message + * + * The response message must contain a RELATES_TO id, which is used to identify the Future that represents + * the operation yet to complete. The Future is then completed with the content of the message. + * The Future either calls back to the application code, or it is expected that the application is polling + * the Future for its completion. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void invokeAsyncResponse(Message msg) { + // Obtain the Message ID for this message + String relatesID = getMessageRelatesID( msg ); + if( relatesID == null ) + throw new ServiceRuntimeException("JavaAsyncResponseInvoker - response message has no RELATES_TO id"); + + // Look up the response object & remove it from the Map + Object responseHandler = asyncMessageMap.remove(relatesID); + + if( responseHandler == null ) + throw new ServiceRuntimeException("JavaAsyncResponseInvoker - no Future matches the RELATES_TO id: " + relatesID); + + // Invoke the response handler with the content of the message + // - in the case of a Java implementation, the response handler is a Future... + AsyncResponseHandler future = (AsyncResponseHandler) responseHandler; + + Object payload = msg.getBody(); + Object response; + if( payload == null ) { + System.out.println("Returned response message was null"); + } else { + if (payload.getClass().isArray()) { + response = ((Object[])payload)[0]; + } else { + response = payload; + } // end if + if( response.getClass().equals(AsyncFaultWrapper.class)) { + future.setWrappedFault((AsyncFaultWrapper) response ); + } else if ( response instanceof Throwable ) { + future.setFault( (Throwable)response ); + } else { + future.setResponse(response); + } // end if + } // end if + + } // end method invokeAsyncResponse + + /** + * Registers an Async response, which provides an ID which identifies a given response + * and an object which can handle the response + * @param id - the ID + * @param responseHandler - the response handler object + */ + public void registerAsyncResponse( String id, Object responseHandler ) { + // Add the ID/response handler mapping into the table + if( id != null && responseHandler != null ) asyncMessageMap.put(id, responseHandler); + } // end method registerAsyncResponse + + /** + * Extracts the RELATES_TO header from the message + * @param msg - the Tuscany message + * @return - the value of the RELATES_TO header as a String + */ + private String getMessageRelatesID( Message msg ) { + return (String)msg.getHeaders().get("RELATES_TO"); + } // end method getMessageRelatesID + +} // end class JavaAsyncResponseInvoker diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java index d7abd021f6..3bc7a486a5 100644 --- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java @@ -40,6 +40,9 @@ import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.interfacedef.java.JavaInterface; import org.apache.tuscany.sca.interfacedef.java.impl.JavaInterfaceUtil; import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.provider.ImplementationAsyncProvider; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.oasisopen.sca.ComponentContext; @@ -48,7 +51,7 @@ import org.oasisopen.sca.RequestContext; /** * @version $Rev$ $Date$ */ -public class JavaImplementationProvider implements ScopedImplementationProvider { +public class JavaImplementationProvider implements ScopedImplementationProvider, ImplementationAsyncProvider { private JavaImplementation implementation; private JavaComponentContextProvider componentContextProvider; private RequestContextFactory requestContextFactory; @@ -166,4 +169,14 @@ public class JavaImplementationProvider implements ScopedImplementationProvider return implementation.isEagerInit(); } + public InvokerAsyncRequest createAsyncInvoker(RuntimeComponentService service, Operation operation) { + // createInvoker should automatically create a JavaAsyncImplementationInvoker - if not, then it means + // that the service is not async and the result will be an exception caused by the class cast. + return (InvokerAsyncRequest) createInvoker( service, operation ); + } // end method createAsyncInvoker + + public InvokerAsyncResponse createAsyncResponseInvoker(Operation operation) { + return new JavaAsyncResponseInvokerImpl(); + } // end method createAsyncResponseInvoker + } diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java index af40d07409..8c1650096a 100644 --- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java @@ -37,6 +37,7 @@ import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.factory.ObjectFactory; import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker; import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory; import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory; import org.apache.tuscany.sca.core.invocation.ProxyFactory; @@ -45,6 +46,7 @@ import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; import org.oasisopen.sca.ResponseDispatch; import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; /** * Implementation of the ResponseDispatch interface of the OASIS SCA Java API @@ -80,12 +82,16 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl // Service Reference used for the callback private ServiceReference<AsyncResponseHandler<?>> callbackRef; - private String messageID; + private AsyncResponseInvoker<?> respInvoker; + private String messageID; public ResponseDispatchImpl( Message msg ) { super(); callbackRef = getAsyncCallbackRef( msg ); + respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER"); + //if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); + // TODO - why is WS stuff bleeding into general code? messageID = (String) msg.getHeaders().get(MESSAGE_ID); if (messageID == null){ @@ -122,10 +128,18 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl } else { throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); } // end if + + // Use response invoker if present + if( respInvoker != null ) { + //respInvoker.invokeAsyncResponse(new AsyncFaultWrapper(e)); + respInvoker.invokeAsyncResponse(e); + return; + } // end if + // Now dispatch the response to the callback... AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService(); setResponseHeaders(); - handler.setFault(new AsyncFaultWrapper(e)); + handler.setWrappedFault(new AsyncFaultWrapper(e)); } // end method sendFault /** @@ -145,6 +159,13 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl } else { throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); } // end if + + // Use response invoker if present + if( respInvoker != null ) { + respInvoker.invokeAsyncResponse(res); + return; + } // end if + // Now dispatch the response to the callback... AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService(); setResponseHeaders(); |