diff options
author | edwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68> | 2011-01-11 14:14:06 +0000 |
---|---|---|
committer | edwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68> | 2011-01-11 14:14:06 +0000 |
commit | c714d7ff510918662faf56dd51f904732cb9bb67 (patch) | |
tree | 05fbf5bd6dc6696bdd43173ccd0626f9f55ee16d /sca-java-2.x/trunk | |
parent | 26ed4a51216e4d8fe65e035ed540780c74cd4555 (diff) |
Add capability to support Bindings that support Async invocations natively - as under TUSCANY-3801
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1057648 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk')
13 files changed, 409 insertions, 135 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java index 6339b96a1b..5c21041413 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java @@ -60,7 +60,7 @@ import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; -import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.AsyncResponseService; import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor; import org.apache.tuscany.sca.core.invocation.NonBlockingInterceptor; import org.apache.tuscany.sca.core.invocation.RuntimeInvoker; @@ -86,6 +86,7 @@ import org.apache.tuscany.sca.provider.EndpointAsyncProvider; import org.apache.tuscany.sca.provider.EndpointProvider; import org.apache.tuscany.sca.provider.ImplementationAsyncProvider; import org.apache.tuscany.sca.provider.ImplementationProvider; +import org.apache.tuscany.sca.provider.OptimisingBindingProvider; import org.apache.tuscany.sca.provider.PolicyProvider; import org.apache.tuscany.sca.provider.PolicyProviderFactory; import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint; @@ -391,13 +392,20 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint ((InterceptorAsync)invoker).setPrevious(asyncResponseInvoker); } else { //TODO - throw error once the old async code is removed - } - } + } // end if + } // end for } else { // TODO - throw error once the old async code is removed - } - } - } + } // end if + } // end if + + ServiceBindingProvider provider = getBindingProvider(); + if ((provider != null) && (provider instanceof OptimisingBindingProvider)) { + //TODO - remove this comment once optimisation codepath is tested + ((OptimisingBindingProvider)provider).optimiseBinding( this ); + } // end if + + } // end method initInvocationChains /** * Creates the async callback for the supplied Endpoint and Operation, if it does not already exist @@ -406,6 +414,9 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint * @param operation - the Operation */ private void createAsyncServerCallback( RuntimeEndpoint endpoint, Operation operation ) { + // No need to create a callback if the Binding supports async natively... + if( hasNativeAsyncBinding(endpoint) ) return; + // Check to see if the callback already exists if( asyncCallbackExists( endpoint ) ) return; @@ -416,6 +427,20 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } // end method createAsyncServerCallback /** + * Indicates if a given endpoint has a Binding that supports native async invocation + * @param endpoint - the endpoint + * @return - true if the endpoint has a binding that supports native async, false otherwise + */ + private boolean hasNativeAsyncBinding(RuntimeEndpoint endpoint) { + ServiceBindingProvider provider = endpoint.getBindingProvider(); + if( provider instanceof EndpointAsyncProvider ) { + EndpointAsyncProvider asyncProvider = (EndpointAsyncProvider) provider; + if( asyncProvider.supportsNativeAsync() ) return true; + } // end if + return false; + } // end method hasNativeAsyncBinding + + /** * Creates the Endpoint object for the async callback * @param endpoint - the endpoint which has the async server operations * @return the EndpointReference object representing the callback @@ -433,7 +458,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint JavaInterfaceFactory javaInterfaceFactory = (JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class); JavaInterfaceContract interfaceContract = javaInterfaceFactory.createJavaInterfaceContract(); try { - interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseHandler.class)); + interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseService.class)); } catch (InvalidInterfaceException e1) { // Nothing to do here - will not happen } // end try @@ -605,10 +630,9 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint Interceptor interceptor = p.createBindingInterceptor(); if (interceptor != null) { bindingInvocationChain.addInterceptor(interceptor); - } - } - - } + } // end if + } // end for + } // end if // This is strategically placed before the RuntimeInvoker is added to the end of the // binding chain as the RuntimeInvoker doesn't need to take part in the response @@ -625,7 +649,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint // TODO - raise an error. Not doing that while // we have the old async mechanism in play } - } + } // end for // fix up the binding chain response path to point back to the // binding provided async response handler @@ -640,15 +664,16 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } } else { //TODO - throw error once the old async code is removed - } - } + } // end if + } // end if // Add the runtime invoker to the end of the binding chain. // It mediates between the binding chain and selects the // correct invocation chain based on the operation that's // been selected - bindingInvocationChain.addInvoker(invoker); - } + bindingInvocationChain.addInvoker(invoker); + + } // end method initServiceBindingInvocationChains /** * Add the interceptor for a binding diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java index 89d0e444af..f86f90c6e6 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java @@ -48,7 +48,7 @@ import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; -import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.AsyncResponseService; import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor; import org.apache.tuscany.sca.core.invocation.NonBlockingInterceptor; import org.apache.tuscany.sca.core.invocation.RuntimeInvoker; @@ -348,10 +348,12 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen wireProcessor.process(this); if (isAsyncInvocation()){ - // fix up all of the operation chain response paths - // to point back to the implementation provided + // Fix up all of the operation chain response paths to point back to the implementation provided // async response handler - ImplementationProvider implementationProvider = ((RuntimeComponent)getComponent()).getImplementationProvider(); + //ImplementationProvider implementationProvider = ((RuntimeComponent)getComponent()).getImplementationProvider(); + RuntimeComponentReference theReference = (RuntimeComponentReference)this.getReference(); + RuntimeComponent theComponent = theReference.getComponent(); + ImplementationProvider implementationProvider = theComponent.getImplementationProvider(); if (implementationProvider instanceof ImplementationAsyncProvider){ for (InvocationChain chain : getInvocationChains()){ InvokerAsyncResponse asyncResponseInvoker = ((ImplementationAsyncProvider)implementationProvider).createAsyncResponseInvoker(chain.getSourceOperation()); @@ -359,11 +361,11 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen ((InterceptorAsync)chain.getHeadInvoker()).setPrevious(asyncResponseInvoker); } else { //TODO - throw error once the old async code is removed - } - } - } - } - } + } // end if + } // end for + } // end if + } // end if + } // end method initInvocationChains /** * Check that endpoint reference has compatible interface at the component and binding ends. @@ -688,7 +690,7 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen (JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class); JavaInterfaceContract interfaceContract = javaInterfaceFactory.createJavaInterfaceContract(); try { - interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseHandler.class)); + interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseService.class)); } catch (InvalidInterfaceException e1) { // Nothing to do here - will not happen } // end try diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java index d8eac8a166..306a141433 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java @@ -27,19 +27,28 @@ import org.oasisopen.sca.annotation.Remotable; * * @param <V> - the type of the non-fault response */ -@Remotable() -public interface AsyncResponseHandler<V> { +public interface AsyncResponseHandler<V> extends AsyncResponseService<V> { /** - * Async process completed with a Fault. Must only be invoked once + * Async process completed with a wrapped Fault. Must only be invoked once + * <inherited> * @param e - the wrapper containing the Fault to send * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously */ @OneWay - public void setFault(AsyncFaultWrapper e); + public void setWrappedFault(AsyncFaultWrapper w); + + /** + * Async process completed with a Fault. Must only be invoked once. + * @param e - the Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + @OneWay + public void setFault( Throwable e ); /** * Async process completed with a response message. Must only be invoked once + * <inherited> * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously * @param res - the response message, which is of type V */ diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java index 0a28bed480..652f67f5e1 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java @@ -20,12 +20,21 @@ package org.apache.tuscany.sca.core.invocation;
import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.tuscany.sca.context.CompositeContext;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.provider.EndpointAsyncProvider;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
+import org.oasisopen.sca.ComponentContext;
/**
* A class that wraps the mechanics for sending async responses
@@ -42,19 +51,25 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab */
private static final long serialVersionUID = -7992598227671386588L;
- RuntimeEndpoint requestEndpoint;
- RuntimeEndpointReference responseEndpointReference;
- T responseTargetAddress;
- String relatesToMsgID;
+ private RuntimeEndpoint requestEndpoint;
+ private RuntimeEndpointReference responseEndpointReference;
+ private T responseTargetAddress;
+ private String relatesToMsgID;
+ private String operationName;
+ private MessageFactory messageFactory;
+ private String bindingType = "";
public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint,
RuntimeEndpointReference responseEndpointReference,
- T responseTargetAddress, String relatesToMsgID) {
+ T responseTargetAddress, String relatesToMsgID,
+ String operationName, MessageFactory messageFactory) {
super();
this.requestEndpoint = requestEndpoint;
this.responseEndpointReference = responseEndpointReference;
this.responseTargetAddress = responseTargetAddress;
this.relatesToMsgID = relatesToMsgID;
+ this.operationName = operationName;
+ this.messageFactory = messageFactory;
} // end constructor
/**
@@ -95,10 +110,34 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab * @param args the response data
*/
public void invokeAsyncResponse(Object args) {
- // TODO - how to get at the code that translates from args to msg?
- // turn args into a message
- Message responseMessage = null;
- invokeAsyncResponse(responseMessage);
+ Message msg = messageFactory.createMessage();
+
+ msg.setOperation(getOperation());
+ if( args instanceof Throwable ) {
+ msg.setFaultBody(args);
+ } else {
+ msg.setBody(args);
+ } // end if
+
+ invokeAsyncResponse(msg);
+
} // end method invokeAsyncResponse(Object)
+
+ private Operation getOperation() {
+ List<Operation> ops = requestEndpoint.getService().getInterfaceContract().getInterface().getOperations();
+ for (Operation op : ops) {
+ if( operationName.equals(op.getName()) ) return op;
+ } // end for
+ return null;
+ } // end getOperation
+
+ public void setBindingType(String bindingType) {
+ this.bindingType = bindingType;
+ } // end method setBindingType
+
+ public String getBindingType() {
+ return bindingType;
+ } // end method getBindingType
+
} // end class
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseService.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseService.java new file mode 100644 index 0000000000..62d7f74505 --- /dev/null +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import org.oasisopen.sca.annotation.OneWay; +import org.oasisopen.sca.annotation.Remotable; + +/** + * An interface which describes the client response service interface for a non-native binding + * performing an asynchronous invocation of a service + * + * @param <V> - the type of the non-fault response + */ +@Remotable() +public interface AsyncResponseService<V> { + + /** + * Async process completed with a wrapped Fault. Must only be invoked once + * @param e - the wrapper containing the Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + @OneWay + public void setWrappedFault(AsyncFaultWrapper w); + + /** + * Async process completed with a response message. Must only be invoked once + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + * @param res - the response message, which is of type V + */ + @OneWay + public void setResponse(V res); + +} diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKAsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKAsyncResponseInvoker.java new file mode 100644 index 0000000000..3dc40e8d67 --- /dev/null +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKAsyncResponseInvoker.java @@ -0,0 +1,15 @@ +package org.apache.tuscany.sca.core.invocation; + +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; + +public interface JDKAsyncResponseInvoker extends InvokerAsyncResponse { + + /** + * Registers an Async response, which provides an ID which identifies a given response + * and an object which can handle the response + * @param id - the ID + * @param responseHandler - the response handler object + */ + public void registerAsyncResponse( String id, Object responseHandler ); + +} // end interface JDKAsyncResponseInvoker diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java index 34c00dbb94..62593ba895 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java @@ -178,7 +178,10 @@ public class RuntimeInvoker implements Invoker, InvokerAsyncRequest { // temporary fix to swallow the dummy exception that's // thrown back to get past the response chain processing. if (!(ex instanceof AsyncResponseException)){ - throw new ServiceRuntimeException(ex); + // send the exception in through the + // async response processing path + msg.setFaultBody(ex); + invokeAsyncResponse(msg); } } } finally { @@ -197,15 +200,11 @@ public class RuntimeInvoker implements Invoker, InvokerAsyncRequest { public void invokeAsyncResponse(Message msg) { InvocationChain chain = invocable.getInvocationChain(msg.getOperation()); Invoker tailInvoker = chain.getTailInvoker(); - try { - ((InvokerAsyncResponse)tailInvoker).invokeAsyncResponse(msg); - } catch (Throwable ex) { - throw new ServiceRuntimeException(ex); - } + ((InvokerAsyncResponse)tailInvoker).invokeAsyncResponse(msg); } // end method invokeAsyncResponse - @Override - public void invokeAsyncRequest(Message msg) throws Throwable { - invokeAsync(msg); - } // end method invokeAsyncRequest + @Override + public void invokeAsyncRequest(Message msg) throws Throwable { + invokeAsync(msg); + } // end method invokeAsyncRequest } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java index 8db469b25e..213fd536e9 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java @@ -145,11 +145,30 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy } // end method isDone /** - * Async process completed with a Fault. Must only be invoked once + * Async process completed with a Fault. Must only be invoked once. * @param e - the Fault to send * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously */ - public void setFault(AsyncFaultWrapper w) { + public void setFault( Throwable e ) { + lock.lock(); + try { + if( notSetYet() ) { + fault = e; + isDone.signalAll(); + } else { + throw new IllegalStateException("setResponse() or setFault() has been called previously"); + } // end if + } finally { + lock.unlock(); + } // end try + } // end method setFault( Throwable ) + + /** + * Async process completed with a wrapped Fault. Must only be invoked once. + * @param w - the wrapped Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + public void setWrappedFault(AsyncFaultWrapper w) { ClassLoader tccl = Thread.currentThread().getContextClassLoader(); Throwable e; @@ -162,19 +181,9 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy } // end try if( e == null ) throw new IllegalArgumentException("AsyncFaultWrapper did not return an Exception"); - lock.lock(); - try { - if( notSetYet() ) { - fault = e; - isDone.signalAll(); - } else { - throw new IllegalStateException("setResponse() or setFault() has been called previously"); - } // end if - } finally { - lock.unlock(); - } // end try + setFault( e ); - } // end method setFault + } // end method setFault( AsyncFaultWrapper ) /** * Async process completed with a response message. Must only be invoked once diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java index be65a51c1b..a36ee8d2f8 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java @@ -29,6 +29,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -53,6 +54,7 @@ import org.apache.tuscany.sca.assembly.builder.BuilderContext; import org.apache.tuscany.sca.assembly.builder.BuilderExtensionPoint; import org.apache.tuscany.sca.assembly.xml.Constants; import org.apache.tuscany.sca.context.CompositeContext; +import org.apache.tuscany.sca.context.ThreadMessageContext; import org.apache.tuscany.sca.contribution.processor.ContributionReadException; import org.apache.tuscany.sca.contribution.processor.ProcessorContext; import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; @@ -65,15 +67,24 @@ import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; import org.apache.tuscany.sca.core.invocation.AsyncResponseException; import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.AsyncResponseService; +import org.apache.tuscany.sca.core.invocation.JDKAsyncResponseInvoker; import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException; +import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory; import org.apache.tuscany.sca.interfacedef.util.FaultException; import org.apache.tuscany.sca.interfacedef.util.WrapperInfo; +import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.policy.Intent; +import org.apache.tuscany.sca.provider.EndpointReferenceAsyncProvider; import org.apache.tuscany.sca.provider.PolicyProvider; +import org.apache.tuscany.sca.provider.ReferenceBindingProvider; import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.Invocable; import org.apache.tuscany.sca.runtime.RuntimeComponent; @@ -104,13 +115,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { private static int invocationCount = 10; // # of threads to use private static long maxWaitTime = 30; // Max wait time for completion = 30sec - + // Run the async service invocations using a ThreadPoolExecutor private ExecutorService theExecutor; public AsyncJDKInvocationHandler(ExtensionPointRegistry registry, MessageFactory messageFactory, - ServiceReference<?> callableReference) { + ServiceReference<?> callableReference ) { super(messageFactory, callableReference); initExecutorService(registry); } @@ -118,7 +129,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { public AsyncJDKInvocationHandler(ExtensionPointRegistry registry, MessageFactory messageFactory, Class<?> businessInterface, - Invocable source) { + Invocable source ) { super(messageFactory, businessInterface, source); initExecutorService(registry); } @@ -128,18 +139,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { WorkScheduler scheduler = utilities.getUtility(WorkScheduler.class); theExecutor = scheduler.getExecutorService(); - /* - synchronized (AsyncJDKInvocationHandler.class) { - theExecutor = utilities.getUtility(ExecutorService.class); - if (theExecutor == null) { - theExecutor = - new ThreadPoolExecutor(invocationCount, invocationCount, maxWaitTime, TimeUnit.SECONDS, - new ArrayBlockingQueue<Runnable>(invocationCount)); - utilities.addUtility(ExecutorService.class, theExecutor); - } - } - */ - } + } // end method initExecutorService /** * Perform the invocation of the operation @@ -202,13 +202,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { try { invokeAsync(proxy, method, args, future, asyncMethod); } catch (Exception e) { - future.setFault(new AsyncFaultWrapper(e)); + future.setWrappedFault(new AsyncFaultWrapper(e)); } catch (Throwable t) { Exception e = new ServiceRuntimeException("Received Throwable: " + t.getClass().getName() + " when invoking: " + asyncMethod.getName(), t); - future.setFault(new AsyncFaultWrapper(e)); + future.setWrappedFault(new AsyncFaultWrapper(e)); } // end try return future; } // end method doInvokeAsyncPoll @@ -279,10 +279,10 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { * @throws Throwable - if an exception is thrown during the invocation */ @SuppressWarnings("unchecked") - private void invokeAsync(Object proxy, + private void invokeAsync(Object proxy, Method method, Object[] args, - AsyncInvocationFutureImpl future, + AsyncInvocationFutureImpl<?> future, Method asyncMethod) throws Throwable { if (source == null) { throw new ServiceRuntimeException("No runtime source is available"); @@ -306,12 +306,22 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { RuntimeEndpoint theEndpoint = getAsyncCallback(source); boolean isAsyncService = false; if (theEndpoint != null) { - // ... the service is asynchronous ... + // ... the service is asynchronous but binding does not support async natively ... attachFuture(theEndpoint, future); - isAsyncService = true; - } else { - // ... the service is synchronous ... } // end if + + if( isAsyncInvocation((RuntimeEndpointReference)source ) ) { + isAsyncService = true; + // Get hold of the JavaAsyncResponseHandler from the chain dealing with the async response + Invoker theInvoker = chain.getHeadInvoker(); + if( theInvoker instanceof InterceptorAsync ) { + InvokerAsyncResponse responseInvoker = ((InterceptorAsync)theInvoker).getPrevious(); + if( responseInvoker instanceof JDKAsyncResponseInvoker ) { + // Register the future as the response object with its ID + ((JDKAsyncResponseInvoker)responseInvoker).registerAsyncResponse(future.getUniqueID(), future); + } // end if + } // end if + } // end if // Perform the invocations on separate thread... theExecutor.submit(new separateThreadInvoker(chain, args, source, future, asyncMethod, isAsyncService)); @@ -354,23 +364,29 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { try { if (isAsyncService) { - invoke(chain, args, invocable, future.getUniqueID()); + if( supportsNativeAsync(invocable) ) { + // Binding supports native async invocations + invokeAsync(chain, args, invocable, future.getUniqueID()); + } else { + // Binding does not support native async invocations + invoke(chain, args, invocable, future.getUniqueID()); + } // end if // The result is returned asynchronously via the future... } else { // ... the service is synchronous ... result = invoke(chain, args, invocable); Type type = null; if (asyncMethod.getReturnType() == Future.class) { - // For callback async menthod + // For callback async method, where a Future is returned Type[] types = asyncMethod.getGenericParameterTypes(); if (types.length > 0 && asyncMethod.getParameterTypes()[types.length - 1] == AsyncHandler.class) { - // Last paremeter, AsyncHandler<T> + // Last parameter is AsyncHandler<T> type = types[types.length - 1]; - } + } // end if } else if (asyncMethod.getReturnType() == Response.class) { // For the polling method, Response<T> type = asyncMethod.getGenericReturnType(); - } + } // end if if (type instanceof ParameterizedType) { // Check if the parameterized type of Response<T> is a doc-lit-wrapper class Class<?> wrapperClass = (Class<?>)((ParameterizedType)type).getActualTypeArguments()[0]; @@ -382,16 +398,15 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { if (p.getWriteMethod() == null) { // There is a "class" property ... continue; - } + } // end if if (p.getWriteMethod().getParameterTypes()[0].isInstance(result)) { p.getWriteMethod().invoke(wrapper, result); result = wrapper; break; - } - } - - } - } + } // end if + } // end for + } // end if + } // end if future.setResponse(result); } // end if } catch (ServiceRuntimeException s) { @@ -400,14 +415,16 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { if ("AsyncResponse".equals(e.getMessage())) { // Do nothing... } else { - future.setFault(new AsyncFaultWrapper(s)); + future.setWrappedFault(new AsyncFaultWrapper(s)); } // end if } // end if } catch (AsyncResponseException ar) { - // do nothing + // This exception is received in the case where the Binding does not support async invocation + // natively - the initial invocation is effectively synchronous with this exception thrown to + // indicate that the service received the request but will send the response separately - do nothing } catch (Throwable t) { - System.out.println("Async invoke got exception: " + t.toString()); - future.setFault(new AsyncFaultWrapper(t)); + //System.out.println("Async invoke got exception: " + t.toString()); + future.setWrappedFault(new AsyncFaultWrapper(t)); } // end try } // end method run @@ -427,6 +444,49 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { } // end method attachFuture /** + * Perform an async invocation on the reference + * @param chain - the chain + * @param args - parameters for the invocation + * @param invocable - the reference + * @param msgID - a message ID + */ + public void invokeAsync(InvocationChain chain, Object[] args, Invocable invocable, String msgID) { + Message msg = messageFactory.createMessage(); + if (invocable instanceof RuntimeEndpointReference) { + msg.setFrom((RuntimeEndpointReference)invocable); + } // end if + if (target != null) { + msg.setTo(target); + } else if (source instanceof RuntimeEndpointReference) { + msg.setTo(((RuntimeEndpointReference)invocable).getTargetEndpoint()); + } // end if + + Operation operation = chain.getTargetOperation(); + msg.setOperation(operation); + msg.setBody(args); + + Message msgContext = ThreadMessageContext.getMessageContext(); + + // Deal with header information that needs to be copied from the message context to the new message... + transferMessageHeaders( msg, msgContext); + + ThreadMessageContext.setMessageContext(msg); + + // If there is a supplied message ID, place its value into the Message Header under "MESSAGE_ID" + if( msgID != null ){ + msg.getHeaders().put("MESSAGE_ID", msgID); + } // end if + + try { + // Invoke the reference + invocable.invokeAsync(msg); + return; + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } // end try + } // end method invokeAsync + + /** * Get the async callback endpoint - if not already created, create and start it * @param source - the RuntimeEndpointReference which needs an async callback endpoint * @param future @@ -436,8 +496,14 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { if (!(source instanceof RuntimeEndpointReference)) return null; RuntimeEndpointReference epr = (RuntimeEndpointReference)source; - if (!isAsyncInvocation(epr)) - return null; + if (!isAsyncInvocation(epr)) return null; + + // Check to see if the binding supports async invocation natively + ReferenceBindingProvider eprProvider = epr.getBindingProvider(); + if( eprProvider instanceof EndpointReferenceAsyncProvider) { + if( ((EndpointReferenceAsyncProvider)eprProvider).supportsNativeAsync() ) return null; + } // end if + RuntimeEndpoint endpoint; synchronized (epr) { endpoint = (RuntimeEndpoint)epr.getCallbackEndpoint(); @@ -509,7 +575,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { (JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class); JavaInterfaceContract interfaceContract = javaInterfaceFactory.createJavaInterfaceContract(); try { - interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseHandler.class)); + interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseService.class)); } catch (InvalidInterfaceException e1) { // Nothing to do here - will not happen } // end try @@ -647,6 +713,21 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { } // end for return false; } // end isAsyncInvocation + + private boolean supportsNativeAsync(Invocable source) { + if (!(source instanceof RuntimeEndpointReference)) + return false; + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + + // TODO - need to update this once BindingProvider interface is refactored to contain + // supportsNativeAsync directly... + ReferenceBindingProvider provider = epr.getBindingProvider(); + if( provider instanceof EndpointReferenceAsyncProvider ) { + return ((EndpointReferenceAsyncProvider)provider).supportsNativeAsync(); + } else { + return false; + } // end if + } // end method supportsNativeAsync /** * Return the synchronous method that is the equivalent of an async method diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java index aa9cf4ad48..1e49767880 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java @@ -144,7 +144,9 @@ public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>, public void setExtensionType(ExtensionType type) {} - public void setFault(AsyncFaultWrapper e) {} + public void setWrappedFault(AsyncFaultWrapper e) {} + + public void setFault(Throwable e) {} public void setResponse(V res) { } @@ -181,7 +183,7 @@ public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>, response = payload; } // end if if( response.getClass().equals(AsyncFaultWrapper.class)) { - future.setFault((AsyncFaultWrapper) response ); + future.setWrappedFault((AsyncFaultWrapper) response ); } else { future.setResponse(response); } // end if diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java index fff5fb3991..0e4d4344d2 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java @@ -75,7 +75,7 @@ public class InvocationChainImpl implements InvocationChain { } String phase = forReference ? Phase.REFERENCE : Phase.SERVICE; addInterceptor(phase, interceptor); - } + } // end method addInterceptor public void addInvoker(Invoker invoker) { if (invoker instanceof PhasedInterceptor) { @@ -94,38 +94,13 @@ public class InvocationChainImpl implements InvocationChain { } public Invoker getTailInvoker() { - // *** int nodeCount = nodes.size(); if( nodeCount > 0 ) { return nodes.get( nodeCount - 1).getInvoker(); } // end if - // *** - // find the tail invoker - Invoker next = getHeadInvoker(); - Invoker tail = null; - while (next != null){ - tail = next; - if (next instanceof Interceptor){ - // TODO - hack to get round SCA binding optimization - // On the reference side this loop will go all the way - // across to the service invoker so stop looking if we find - // an invoker with no "previous" pointer. This will be the point - // where the SCA binding invoker points to the head of the - // service chain - if (!(next instanceof InterceptorAsync) || - ((InterceptorAsyncImpl)next).isLocalSCABIndingInvoker()){ - break; - } - - next = ((Interceptor)next).getNext(); - } else { - next = null; - } - } - - return tail; - } + return null; + } // end method getTailInvoker public Invoker getHeadInvoker(String phase) { int index = phaseManager.getAllPhases().indexOf(phase); @@ -269,4 +244,70 @@ public class InvocationChainImpl implements InvocationChain { return isAsyncInvocation; } + public void addHeadInterceptor(Interceptor interceptor) { + String phase = forReference ? Phase.REFERENCE : Phase.SERVICE_BINDING; + if (interceptor instanceof PhasedInterceptor) { + PhasedInterceptor pi = (PhasedInterceptor)interceptor; + if (pi.getPhase() != null) { + phase = pi.getPhase(); + } // end if + } // end if + + addHeadInterceptor(phase, interceptor); + } // end method addHeadInterceptor + + public void addHeadInterceptor(String phase, Interceptor interceptor) { + // TODO Auto-generated method stub + Invoker invoker = (Invoker)interceptor; + + int index = phaseManager.getAllPhases().indexOf(phase); + if (index == -1) { + throw new IllegalArgumentException("Invalid phase name: " + phase); + } // end if + Node node = new Node(index, invoker); + + ListIterator<Node> li = nodes.listIterator(); + Node before = null, after = null; + boolean found = false; + while (li.hasNext()) { + before = after; + after = li.next(); + // Look for the first node with a phase index equal to or greater than the one provided + if (after.getPhaseIndex() >= index) { + // Move back + li.previous(); + li.add(node); + found = true; + break; + } + } + if (!found) { + // Add to the end + nodes.add(node); + before = after; + after = null; + } + + // Relink the interceptors + if (before != null) { + if (before.getInvoker() instanceof Interceptor) { + ((Interceptor)before.getInvoker()).setNext(invoker); + if ((invoker instanceof InterceptorAsync) && + (before.getInvoker() instanceof InvokerAsyncResponse)) { + ((InterceptorAsync) invoker).setPrevious((InvokerAsyncResponse)before.getInvoker()); + } + } + } + if (after != null) { + if (invoker instanceof Interceptor) { + ((Interceptor)invoker).setNext(after.getInvoker()); + if ((after.getInvoker() instanceof InterceptorAsync) && + (invoker instanceof InvokerAsyncResponse)){ + ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsyncResponse)invoker); + } + } + } + + } // end method addHeadInterceptor + } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java index afdad37e2d..df378e55a0 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java @@ -314,7 +314,7 @@ public class JDKInvocationHandler implements InvocationHandler, Serializable { * @param newMsg * @param oldMsg */ - private void transferMessageHeaders( Message newMsg, Message oldMsg ) { + protected void transferMessageHeaders( Message newMsg, Message oldMsg ) { if( oldMsg == null ) return; // For the present, simply copy all the headers if( !oldMsg.getHeaders().isEmpty() ) newMsg.getHeaders().putAll( oldMsg.getHeaders() ); diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java index f072d66227..7109dad1fd 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java @@ -24,7 +24,9 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Future; import javax.xml.ws.AsyncHandler; |