Changes and additions to core in support of Client-side and Server-side asynchronous services and @asyncInvocation as described in TUSCANY-3608, 3611 & 3612

git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@963034 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
edwardsmj 2010-07-11 10:09:27 +00:00
commit 2b6ccef386
8 changed files with 519 additions and 71 deletions

View file

@ -23,24 +23,43 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.StringReader;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.stream.StreamSource;
import org.apache.tuscany.sca.assembly.AssemblyFactory;
import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.assembly.Component;
import org.apache.tuscany.sca.assembly.ComponentReference;
import org.apache.tuscany.sca.assembly.ComponentService;
import org.apache.tuscany.sca.assembly.CompositeReference;
import org.apache.tuscany.sca.assembly.CompositeService;
import org.apache.tuscany.sca.assembly.Contract;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.assembly.Service;
import org.apache.tuscany.sca.assembly.builder.BindingBuilder;
import org.apache.tuscany.sca.assembly.builder.BuilderContext;
import org.apache.tuscany.sca.assembly.builder.BuilderExtensionPoint;
import org.apache.tuscany.sca.assembly.impl.EndpointImpl;
import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.contribution.processor.ContributionReadException;
import org.apache.tuscany.sca.contribution.processor.ProcessorContext;
import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor;
import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint;
import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor;
import org.apache.tuscany.sca.core.invocation.NonBlockingInterceptor;
import org.apache.tuscany.sca.core.invocation.RuntimeInvoker;
@ -49,7 +68,10 @@ import org.apache.tuscany.sca.core.invocation.impl.PhaseManager;
import org.apache.tuscany.sca.interfacedef.Compatibility;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper;
import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
@ -68,6 +90,7 @@ import org.apache.tuscany.sca.runtime.EndpointSerializer;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.apache.tuscany.sca.runtime.RuntimeWireProcessor;
import org.apache.tuscany.sca.runtime.RuntimeWireProcessorExtensionPoint;
import org.apache.tuscany.sca.work.WorkScheduler;
@ -101,7 +124,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
protected InterfaceContract serviceInterfaceContract;
/**
* No-arg constructor for Java serilization
* No-arg constructor for Java serialization
*/
public RuntimeEndpointImpl() {
super(null);
@ -218,6 +241,15 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
}
public Message invoke(Message msg) {
// Deal with async callback
// Ensure invocation chains are built...
getInvocationChains();
if ( !this.getCallbackEndpointReferences().isEmpty() ) {
RuntimeEndpointReference asyncEPR = (RuntimeEndpointReference) this.getCallbackEndpointReferences().get(0);
// Place a link to the callback EPR into the message headers...
msg.getHeaders().put("ASYNC_CALLBACK", asyncEPR );
}
// end of async callback handling
return invoker.invokeBinding(msg);
}
@ -288,10 +320,149 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
addServiceBindingInterceptor(chain, operation);
addImplementationInterceptor(serviceComponent, service, chain, targetOperation);
chains.add(chain);
// Handle cases where the operation is an async server
if( targetOperation.isAsyncServer() ) {
createAsyncServerCallback( this, operation );
} // end if
}
wireProcessor.process(this);
}
/**
* Creates the async callback for the supplied Endpoint and Operation, if it does not already exist
* and stores it into the Endpoint
* @param endpoint - the Endpoint
* @param operation - the Operation
*/
private void createAsyncServerCallback( RuntimeEndpoint endpoint, Operation operation ) {
// Check to see if the callback already exists
if( asyncCallbackExists( endpoint ) ) return;
RuntimeEndpointReference asyncEPR = createAsyncEPR( endpoint );
// Store the new callback EPR into the Endpoint
endpoint.getCallbackEndpointReferences().add(asyncEPR);
} // end method createAsyncServerCallback
/**
* Creates the Endpoint object for the async callback
* @param endpoint - the endpoint which has the async server operations
* @return the EndpointReference object representing the callback
*/
private RuntimeEndpointReference createAsyncEPR( RuntimeEndpoint endpoint ){
CompositeContext compositeContext = endpoint.getCompositeContext();
RuntimeAssemblyFactory assemblyFactory = getAssemblyFactory( compositeContext );
RuntimeEndpointReference epr = (RuntimeEndpointReference)assemblyFactory.createEndpointReference();
epr.bind( compositeContext );
// Create pseudo-reference
ComponentReference reference = assemblyFactory.createComponentReference();
ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry();
FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
JavaInterfaceFactory javaInterfaceFactory = (JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class);
JavaInterfaceContract interfaceContract = javaInterfaceFactory.createJavaInterfaceContract();
try {
interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseHandler.class));
} catch (InvalidInterfaceException e1) {
// Nothing to do here - will not happen
} // end try
reference.setInterfaceContract(interfaceContract);
String referenceName = endpoint.getService().getName() + "_asyncCallback";
reference.setName(referenceName);
reference.setForCallback(true);
epr.setReference(reference);
// Create a binding
Binding binding = createMatchingBinding( endpoint.getBinding(), (RuntimeComponent)endpoint.getComponent(), reference, registry );
epr.setBinding(binding);
// Need to establish policies here (binding has some...)
epr.getRequiredIntents().addAll( endpoint.getRequiredIntents() );
epr.getPolicySets().addAll( endpoint.getPolicySets() );
String eprURI = endpoint.getComponent().getName() + "#reference-binding(" + referenceName + "/" + referenceName + ")";
epr.setURI(eprURI);
// Attach a dummy endpoint to the epr
RuntimeEndpoint ep = (RuntimeEndpoint)assemblyFactory.createEndpoint();
ep.setUnresolved(false);
epr.setTargetEndpoint(ep);
//epr.setStatus(EndpointReference.Status.RESOLVED_BINDING);
epr.setStatus(EndpointReference.Status.WIRED_TARGET_FOUND_AND_MATCHED);
epr.setUnresolved(false);
return epr;
} // end method RuntimeEndpointReference
private boolean asyncCallbackExists( RuntimeEndpoint endpoint ) {
if( endpoint.getCallbackEndpointReferences().isEmpty() ) return false;
return true;
} // end method asyncCallbackExists
/**
* Create a matching binding to a supplied binding
* - the matching binding has the same binding type, but is for the supplied component and service
* @param matchBinding - the binding to match
* @param component - the component
* @param service - the service
* @param registry - registry for extensions
* @return - the matching binding, or null if it could not be created
*/
@SuppressWarnings("unchecked")
private Binding createMatchingBinding( Binding matchBinding, RuntimeComponent component,
ComponentReference reference, ExtensionPointRegistry registry ) {
// Since there is no simple way to obtain a Factory for a binding where the type is not known ahead of
// time, the process followed here is to generate the <binding.xxx/> XML element from the binding type QName
// and then read the XML using the processor for that XML...
QName bindingName = matchBinding.getType();
String bindingXML = "<ns1:" + bindingName.getLocalPart() + " xmlns:ns1='" + bindingName.getNamespaceURI() + "'/>";
StAXArtifactProcessorExtensionPoint processors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class);
StAXArtifactProcessor<?> processor = (StAXArtifactProcessor<?>)processors.getProcessor(bindingName);
FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
ValidatingXMLInputFactory inputFactory = modelFactories.getFactory(ValidatingXMLInputFactory.class);
StreamSource source = new StreamSource( new StringReader(bindingXML) );
ProcessorContext context = new ProcessorContext();
try {
XMLStreamReader reader = inputFactory.createXMLStreamReader(source);
reader.next();
Binding newBinding = (Binding) processor.read(reader, context );
newBinding.setName("asyncCallback");
// Create a URI address for the callback based on the Component_Name/Reference_Name pattern
String callbackURI = "/" + component.getName() + "/" + reference.getName();
//newBinding.setURI(callbackURI);
BuilderExtensionPoint builders = registry.getExtensionPoint(BuilderExtensionPoint.class);
BindingBuilder builder = builders.getBindingBuilder(newBinding.getType());
if (builder != null) {
org.apache.tuscany.sca.assembly.builder.BuilderContext builderContext = new BuilderContext(registry);
builder.build(component, reference, newBinding, builderContext, true);
} // end if
return newBinding;
} catch (ContributionReadException e) {
e.printStackTrace();
} catch (XMLStreamException e) {
e.printStackTrace();
}
return null;
} // end method createMatchingBinding
/**
* Gets a RuntimeAssemblyFactory from the CompositeContext
* @param compositeContext
* @return the RuntimeAssemblyFactory
*/
private RuntimeAssemblyFactory getAssemblyFactory( CompositeContext compositeContext ) {
ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry();
FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
return (RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class);
} // end method RuntimeAssemblyFactory
private void initServiceBindingInvocationChains() {

View file

@ -19,6 +19,9 @@
package org.apache.tuscany.sca.core.invocation;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
/**
* A class which is used to wrap an Exception of any type thrown by an asynchronous service operation and
* which is returned through a separate one-way message sent asynchronously from the server to the client.
@ -27,26 +30,73 @@ package org.apache.tuscany.sca.core.invocation;
public class AsyncFaultWrapper {
private String faultClassName = null;
private Exception e = null;
private String faultMessage = null;
private AsyncFaultWrapper containedFault = null;
public AsyncFaultWrapper() {
super();
}
public AsyncFaultWrapper( Exception e ) {
/**
* Constructor which creates an AsyncFaultWrapper which wraps the supplied Throwable
* @param e - a Throwable which is wrapped by this AsyncFaultWrapper
*/
public AsyncFaultWrapper( Throwable e ) {
super();
storeFault( e );
}
public void storeFault( Exception e ) {
faultClassName = e.getClass().getCanonicalName();
this.e = e;
/**
* Stores a given Throwable in this AsyncFaultWrapper
* If the supplied Throwable itself contains an embedded Throwable ("cause"), this is recursively
* wrapped by a nested AsyncFaultWrapper
* @param e - the Throwable
*/
public void storeFault( Throwable e ) {
setFaultClassName( e.getClass().getCanonicalName() );
setFaultMessage( e.getMessage() );
Throwable cause = e.getCause();
if( cause != null ) setContainedFault( new AsyncFaultWrapper( cause ) );
}
public Exception retrieveFault( ) {
if( e != null ) return e;
System.out.println( "Tried to retrieve Exception reom AsyncFaultWrapper: " + faultClassName);
return null;
}
/**
* Retrieves the Throwable wrapped by this AsyncFaultWrapper
*
* Note: When this method is invoked, the method attempts to instantiate an instance of the wrapped Throwable.
* It does this using the Thread Context Class Loader (TCCL) - the caller *MUST* ensure that the TCCL has access
* to the class of the wrapped Throwable and also to the classes of any nested Throwables. If this is not done,
* a ClassNotFound exception is thrown
*
* @return - the Throwable wrapped by this AsyncFaultWrapper - the Throwable will contain any nested Throwable(s)
* in its cause property
* @throws ClassNotFound exception, if the class of the wrapped Throwable is not accessible from the TCCL
*/
public Throwable retrieveFault( ) {
try {
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
Class<?> faultClass = tccl.loadClass(faultClassName);
Class<Throwable> xclass = (Class<Throwable>) faultClass;
if( containedFault != null ) {
// If there is a nested fault, retrieve this recursively
Constructor cons = xclass.getConstructor(String.class, Throwable.class);
return (Throwable) cons.newInstance(faultMessage, getContainedFault().retrieveFault());
} else {
Constructor cons = xclass.getConstructor(String.class);
return (Throwable) cons.newInstance(faultMessage);
} // end if
} catch (Exception e) {
return e;
} // end try
} // end method retrieveFault
public void setFaultClassName( String name ) { this.faultClassName = name; }
public String getFaultClassName() { return this.faultClassName; }
public String getFaultMessage() { return faultMessage; }
public void setFaultMessage(String faultMessage) { this.faultMessage = faultMessage; }
public AsyncFaultWrapper getContainedFault() { return containedFault; }
public void setContainedFault(AsyncFaultWrapper containedFault) { this.containedFault = containedFault; }
}
} // end class AsyncFaultWrapper

View file

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.core.invocation;
/**
* An exception which is used to signal that a service has been invoked asynchronously
* and that the result will be sent separately
*
*/
public class AsyncResponseException extends RuntimeException {
private static final long serialVersionUID = 457954562860541631L;
public AsyncResponseException() {
super();
}
public AsyncResponseException(String arg0, Throwable arg1) {
super(arg0, arg1);
}
public AsyncResponseException(String arg0) {
super(arg0);
}
public AsyncResponseException(Throwable arg0) {
super(arg0);
}
} // end class AsyncResponseException

View file

@ -19,6 +19,7 @@
package org.apache.tuscany.sca.core.invocation;
import org.oasisopen.sca.annotation.OneWay;
import org.oasisopen.sca.annotation.Remotable;
/**
@ -34,6 +35,7 @@ public interface AsyncResponseHandler<V> {
* @param e - the wrapper containing the Fault to send
* @throws IllegalStateException if either the setResponse method or the setFault method have been called previously
*/
@OneWay
public void setFault(AsyncFaultWrapper e);
/**
@ -41,6 +43,7 @@ public interface AsyncResponseHandler<V> {
* @throws IllegalStateException if either the setResponse method or the setFault method have been called previously
* @param res - the response message, which is of type V
*/
@OneWay
public void setResponse(V res);
}

View file

@ -57,6 +57,8 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
private String uniqueID = UUID.randomUUID().toString();
private ClassLoader classLoader = null;
protected AsyncInvocationFutureImpl() {
super();
} // end constructor
@ -66,10 +68,13 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
* to be set for the class instances
* @param <V> - the type of the response from the asynchronously invoked service
* @param type - the type of the AsyncInvocationFutureImpl expressed as a parameter
* @param classLoader - the classloader used for the business interface to which this Future applies
* @return - an instance of AsyncInvocationFutureImpl<V>
*/
public static <V> AsyncInvocationFutureImpl<V> newInstance( Class<V> type ) {
return new AsyncInvocationFutureImpl<V>();
public static <V> AsyncInvocationFutureImpl<V> newInstance( Class<V> type, ClassLoader classLoader ) {
AsyncInvocationFutureImpl<V> future = new AsyncInvocationFutureImpl<V>();
future.setClassLoader( classLoader );
return future;
}
/**
@ -146,8 +151,17 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
*/
public void setFault(AsyncFaultWrapper w) {
Exception e = w.retrieveFault();
if( e != null ) throw new IllegalArgumentException("AsyncFaultWrapper did not return an Exception");
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
Throwable e;
try {
// Set the TCCL to the classloader of the business interface
Thread.currentThread().setContextClassLoader(this.getClassLoader());
e = w.retrieveFault();
} finally {
Thread.currentThread().setContextClassLoader(tccl);
} // end try
if( e == null ) throw new IllegalArgumentException("AsyncFaultWrapper did not return an Exception");
lock.lock();
try {
if( notSetYet() ) {
@ -201,8 +215,25 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
* @return - a Map containing the context
*/
public Map<String, Object> getContext() {
// TODO Auto-generated method stub
// Intentionally returns null
return null;
}
/**
* Gets the classloader associated with the business interface to which this Future relates
* @return the ClassLoader of the business interface
*/
public ClassLoader getClassLoader() {
return classLoader;
}
/**
* Sets the classloader associated with the business interface to which this Future relates
* @param classLoader - the classloader of the business interface
*/
public void setClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
} // end class AsyncInvocationFutureImpl

View file

@ -21,15 +21,17 @@ package org.apache.tuscany.sca.core.invocation.impl;
import java.io.StringReader;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.stream.StreamSource;
@ -54,29 +56,23 @@ import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
import org.apache.tuscany.sca.core.assembly.impl.RuntimeEndpointReferenceImpl;
import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
import org.apache.tuscany.sca.interfacedef.util.FaultException;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.policy.Intent;
import org.apache.tuscany.sca.provider.ImplementationProvider;
import org.apache.tuscany.sca.provider.ImplementationProviderFactory;
import org.apache.tuscany.sca.provider.PolicyProvider;
import org.apache.tuscany.sca.provider.RuntimeProvider;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.Invocable;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.oasisopen.sca.ComponentContext;
import org.oasisopen.sca.ServiceReference;
import org.oasisopen.sca.ServiceRuntimeException;
import org.oasisopen.sca.annotation.AsyncInvocation;
import org.oasisopen.sca.ServiceRuntimeException;
/**
* An InvocationHandler which deals with JAXWS-defined asynchronous client Java API method calls
@ -96,6 +92,15 @@ import org.oasisopen.sca.annotation.AsyncInvocation;
public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
private static final long serialVersionUID = 1L;
private static int invocationCount = 10; // # of threads to use
private static long maxWaitTime = 30; // Max wait time for completion = 30sec
// Run the async service invocations using a ThreadPoolExecutor
private static ThreadPoolExecutor theExecutor = new ThreadPoolExecutor( invocationCount, invocationCount,
maxWaitTime, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>( invocationCount ) );
public AsyncJDKInvocationHandler(MessageFactory messageFactory, ServiceReference<?> callableReference) {
super(messageFactory, callableReference);
@ -119,7 +124,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
return doInvokeAsyncPoll(proxy, method, args);
} else {
// Regular synchronous method call
return super.invoke(proxy, method, args);
return doInvokeSync(proxy, method, args);
}
}
@ -156,13 +161,11 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
*/
@SuppressWarnings("unchecked")
protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) {
Object response;
Class<?> returnType = getNonAsyncMethod(asyncMethod).getReturnType();
// Allocate the Future<?> / Response<?> object - note: Response<?> is a subclass of Future<?>
AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType );
AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType, getInterfaceClassloader() );
try {
response = invokeAsync(proxy, getNonAsyncMethod(asyncMethod), args, future);
future.setResponse(response);
invokeAsync(proxy, getNonAsyncMethod(asyncMethod), args, future);
} catch (Exception e) {
future.setFault( new AsyncFaultWrapper(e) );
} catch (Throwable t ) {
@ -171,11 +174,31 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
future.setFault( new AsyncFaultWrapper(e) );
} // end try
return future;
//return new AsyncResponse(response, isException);
} // end method doInvokeAsyncPoll
/**
* Provide a synchronous invocation of a service operation that is either synchronous or asynchronous
* @return
*/
protected Object doInvokeSync(Object proxy, Method method, Object[] args) throws Throwable {
if ( isAsyncInvocation( source ) ) {
// Target service is asynchronous
Class<?> returnType = method.getReturnType();
AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType, getInterfaceClassloader() );
invokeAsync(proxy, method, args, future);
// Wait for some maximum time for the result - 1000 seconds here
// Really, if the service is async, the client should use async client methods to invoke the service
// - and be prepared to wait a *really* long time
return future.get(1000, TimeUnit.SECONDS);
} else {
// Target service is not asynchronous, so perform sync invocation
return super.invoke(proxy, method, args);
} // end if
} // end method doInvokeSync
/**
* Invoke an async callback method
* Invoke an async callback method - note that this form of the async client API has as its final parameter
* an AsyncHandler method, used for callbacks to the client code
* @param proxy - the reference proxy
* @param asyncMethod - the async method to invoke
* @param args - array of input arguments to the method
@ -186,14 +209,17 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
private Object doInvokeAsyncCallback(Object proxy, Method asyncMethod, Object[] args) {
AsyncHandler handler = (AsyncHandler)args[args.length-1];
Response response = doInvokeAsyncPoll(proxy,asyncMethod,Arrays.copyOf(args, args.length-1));
handler.handleResponse(response);
// Invoke the callback handler, if present
if( handler != null ) {
handler.handleResponse(response);
} // end if
return response;
} // end method doInvokeAsyncCallback
/**
* Invoke the target method on
* @param proxy
* Invoke the target (synchronous) method asynchronously
* @param proxy - the reference proxy object
* @param method - the method to invoke
* @param args - arguments for the call
* @param future - Future for handling the response
@ -201,10 +227,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
* @throws Throwable - if an exception is thrown during the invocation
*/
@SuppressWarnings("unchecked")
private Object invokeAsync(Object proxy, Method method, Object[] args, AsyncInvocationFutureImpl future) throws Throwable {
if (Object.class == method.getDeclaringClass()) {
return invokeObjectMethod(method, args);
}
private void invokeAsync(Object proxy, Method method, Object[] args, AsyncInvocationFutureImpl future) throws Throwable {
if (source == null) {
throw new ServiceRuntimeException("No runtime source is available");
}
@ -215,7 +238,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
epr.rebuild();
chains.clear();
}
}
} // end if
InvocationChain chain = getInvocationChain(method, source);
@ -223,22 +246,84 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
throw new IllegalArgumentException("No matching operation is found: " + method);
}
// Organize for an async service
RuntimeEndpoint theEndpoint = getAsyncCallback( source );
attachFuture( theEndpoint, future );
boolean isAsyncService = false;
if( theEndpoint != null ) {
// ... the service is asynchronous ...
attachFuture( theEndpoint, future );
isAsyncService = true;
} else {
// ... the service is synchronous ...
} // end if
// send the invocation down the source
Object result = super.invoke(chain, args, source);
// Perform the invocations on separate thread...
theExecutor.execute( new separateThreadInvoker( chain, args, source, future, isAsyncService ) );
return result;
return;
} // end method invokeAsync
/**
* An inner class which acts as a runnable task for invoking services asynchronously on threads that are separate from
* those used to execute operations of components
*
* This supports both synchronous services and asynchronous services
*/
private class separateThreadInvoker implements Runnable {
private AsyncInvocationFutureImpl future;
private InvocationChain chain;
private Object[] args;
private Invocable invocable;
private boolean isAsyncService;
public separateThreadInvoker( InvocationChain chain, Object[] args, Invocable invocable,
AsyncInvocationFutureImpl future, boolean isAsyncService ) {
super();
this.chain = chain;
this.args = args;
this.invocable = invocable;
this.future = future;
this.isAsyncService = isAsyncService;
} // end constructor
public void run() {
Object result;
try {
if( isAsyncService ) {
invoke(chain, args, invocable, future.getUniqueID());
// The result is returned asynchronously via the future...
} else {
// ... the service is synchronous ...
result = invoke(chain, args, invocable);
future.setResponse(result);
} // end if
} catch ( ServiceRuntimeException s ) {
Throwable e = s.getCause();
if( e != null && e instanceof FaultException ) {
if( "AsyncResponse".equals(e.getMessage()) ) {
// Do nothing...
} else {
future.setFault( new AsyncFaultWrapper( s ) );
} // end if
} // end if
} catch ( Throwable t ) {
System.out.println("Async invoke got exception: " + t.toString());
future.setFault( new AsyncFaultWrapper( t ) );
} // end try
} // end method run
} // end class separateThreadInvoker
/**
* Attaches a future to the callback endpoint - so that the Future is triggered when a response is
* received from the asynchronous service invocation associated with the Future
* @param endpoint - the async callback endpoint
* @param future - the async invocation future to attach
*/
private void attachFuture( RuntimeEndpoint endpoint, AsyncInvocationFutureImpl future ) {
private void attachFuture( RuntimeEndpoint endpoint, AsyncInvocationFutureImpl<?> future ) {
Implementation impl = endpoint.getComponent().getImplementation();
AsyncResponseHandlerImpl<?> asyncHandler = (AsyncResponseHandlerImpl<?>) impl;
asyncHandler.addFuture(future);
@ -257,11 +342,12 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
RuntimeEndpoint endpoint;
synchronized( epr ) {
endpoint = (RuntimeEndpoint)epr.getCallbackEndpoint();
// If the async callback endpoint is already created, return it...
if( endpoint != null ) return endpoint;
// Create the endpoint for the async callback
endpoint = createAsyncCallbackEndpoint( epr );
epr.setCallbackEndpoint(endpoint);
}
} // end synchronized
// Activate the new callback endpoint
startEndpoint( epr.getCompositeContext(), endpoint );
@ -335,19 +421,9 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
services.clear();
services.add(service);
Binding eprBinding = epr.getBinding();
try {
Binding binding = (Binding)eprBinding.clone();
// Create a binding
binding = createMatchingBinding( eprBinding, fakeComponent, service, registry );
// Create a URI address for the callback based on the Component_Name/Reference_Name pattern
//String callbackURI = "/" + epr.getComponent().getName() + "/" + serviceName;
//binding.setURI(callbackURI);
endpoint.setBinding(binding);
} catch (CloneNotSupportedException e) {
// will not happen
} // end try
// Create a binding
Binding binding = createMatchingBinding( epr.getBinding(), fakeComponent, service, registry );
endpoint.setBinding(binding);
// Need to establish policies here (binding has some...)
endpoint.getRequiredIntents().addAll( epr.getRequiredIntents() );
@ -421,6 +497,11 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
return (RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class);
} // end method RuntimeAssemblyFactory
/**
* Applies an AsyncResponseHandlerImpl as the implementation of a RuntimeComponent
* - the AsyncResponseHandlerImpl acts as both the implementation class and the implementation provider...
* @param component - the component
*/
private void applyImplementation( RuntimeComponent component ) {
AsyncResponseHandlerImpl<?> asyncHandler = new AsyncResponseHandlerImpl<Object>();
component.setImplementation( asyncHandler );
@ -434,7 +515,8 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
* @param source - the EPR involved in the invocation
* @return - true if the invocation is async
*/
private boolean isAsyncInvocation( RuntimeEndpointReference source ) {
private boolean isAsyncInvocation( Invocable source ) {
if( !(source instanceof RuntimeEndpointReference) ) return false;
RuntimeEndpointReference epr = (RuntimeEndpointReference) source;
// First check is to see if the EPR itself has the asyncInvocation intent marked
for( Intent intent : epr.getRequiredIntents() ) {
@ -462,5 +544,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
}
}
throw new IllegalStateException("No synchronous method matching async method " + asyncMethod.getName());
} // end method getNonAsyncMethod
/**
* Gets the classloader of the business interface
* @return
*/
private ClassLoader getInterfaceClassloader( ) {
return businessInterface.getClassLoader();
}
}

View file

@ -42,6 +42,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService;
/**
* A class intended to form the final link in the chain calling into a Future which represents
* the response to an asynchronous service invocation
*
* Most methods are dummies, required to fulfil the contracts for ImplementationProvider, Implementation
* and Invoker, since this class collapses together the functions of these separate interfaces, due to its
* specialized nature, where most of the function will never be used.
@ -52,7 +53,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService;
* message header. On receipt of each message, the class seeks out the Future with that unique ID and completes the future
* either with a response message or with a Fault.
*
* @param <V>
* @param <V>
*/
public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>,
ImplementationProvider, Implementation, Invoker {
@ -60,6 +61,9 @@ public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>,
private ConcurrentHashMap< String, AsyncInvocationFutureImpl<?> > table =
new ConcurrentHashMap< String, AsyncInvocationFutureImpl<?> >();
/**
* This class is its own invoker...
*/
public Invoker createInvoker(RuntimeComponentService service,
Operation operation) {
return this;
@ -144,13 +148,46 @@ public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>,
public void setResponse(V res) { }
public Message invoke(Message msg) {
// TODO Auto-generated method stub
/**
* Method which is the termination for the invocation chain from the callback endpoint
* @param msg - the Tuscany message containing the response from the async service invocation
* which is either the Response message or an exception of some kind
*/
private static final String WS_MESSAGE_ID = "WS_MESSAGE_ID";
public Message invoke(Message msg) {
// Get the unique ID from the message header
// Fetch the Future with that Unique ID
// Complete the Future with a Response message
// ...or complete the Future with a Fault
return null;
}
String idValue = (String)msg.getHeaders().get(WS_MESSAGE_ID);
if( idValue == null ) {
System.out.println( "Async message ID not found ");
} else {
// Fetch the Future with that Unique ID
AsyncInvocationFutureImpl future = table.get(idValue);
if( future == null ) {
System.out.println("Future not found for id: " + idValue);
} else {
// Complete the Future with a Response message
Object payload = msg.getBody();
Object response;
if( payload == null ) {
System.out.println("Returned response message was null");
} else {
if (payload.getClass().isArray()) {
response = ((Object[])payload)[0];
} else {
response = payload;
} // end if
if( response.getClass().equals(AsyncFaultWrapper.class)) {
future.setFault((AsyncFaultWrapper) response );
} else {
future.setResponse(response);
} // end if
} // end if
} // end if
} // end if
// Prepare an empty response message
msg.setBody(null);
return msg;
} // end method invoke
} // end class

View file

@ -225,8 +225,22 @@ public class JDKInvocationHandler implements InvocationHandler, Serializable {
protected void setEndpoint(Endpoint endpoint) {
this.target = endpoint;
}
protected Object invoke(InvocationChain chain, Object[] args, Invocable source)
throws Throwable {
return invoke( chain, args, source, null );
}
/**
* Invoke the chain
* @param chain - the chain
* @param args - arguments to the invocation as an array of Objects
* @param source - the Endpoint or EndpointReference to which the chain relates
* @param msgID - an ID for the message being sent, may be null
* @return - the Response message from the invocation
* @throws Throwable - if any exception occurs during the invocation
*/
protected Object invoke(InvocationChain chain, Object[] args, Invocable source, String msgID)
throws Throwable {
Message msg = messageFactory.createMessage();
if (source instanceof RuntimeEndpointReference) {
@ -250,6 +264,11 @@ public class JDKInvocationHandler implements InvocationHandler, Serializable {
transferMessageHeaders( msg, msgContext);
ThreadMessageContext.setMessageContext(msg);
// If there is a supplied message ID, place its value into the Message Header under "MESSAGE_ID"
if( msgID != null ){
msg.getHeaders().put("MESSAGE_ID", msgID);
} // end if
try {
// dispatch the source down the chain and get the response