From 2b6ccef38686cc118bf89904114d40652b9464f3 Mon Sep 17 00:00:00 2001 From: edwardsmj Date: Sun, 11 Jul 2010 10:09:27 +0000 Subject: Changes and additions to core in support of Client-side and Server-side asynchronous services and @asyncInvocation as described in TUSCANY-3608, 3611 & 3612 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@963034 13f79535-47bb-0310-9956-ffa450edef68 --- .../core/assembly/impl/RuntimeEndpointImpl.java | 173 +++++++++++++++++++- .../sca/core/invocation/AsyncFaultWrapper.java | 72 +++++++-- .../core/invocation/AsyncResponseException.java | 47 ++++++ .../sca/core/invocation/AsyncResponseHandler.java | 3 + .../invocation/impl/AsyncInvocationFutureImpl.java | 41 ++++- .../invocation/impl/AsyncJDKInvocationHandler.java | 180 +++++++++++++++------ .../invocation/impl/AsyncResponseHandlerImpl.java | 53 +++++- .../core/invocation/impl/JDKInvocationHandler.java | 21 ++- 8 files changed, 519 insertions(+), 71 deletions(-) create mode 100644 sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java (limited to 'sca-java-2.x/trunk/modules/core') diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java index d1c81ef8ee..eeadfef9c1 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java @@ -23,24 +23,43 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.io.StringReader; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.transform.stream.StreamSource; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.Binding; import org.apache.tuscany.sca.assembly.Component; import org.apache.tuscany.sca.assembly.ComponentReference; import org.apache.tuscany.sca.assembly.ComponentService; import org.apache.tuscany.sca.assembly.CompositeReference; import org.apache.tuscany.sca.assembly.CompositeService; import org.apache.tuscany.sca.assembly.Contract; +import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.assembly.builder.BindingBuilder; +import org.apache.tuscany.sca.assembly.builder.BuilderContext; +import org.apache.tuscany.sca.assembly.builder.BuilderExtensionPoint; import org.apache.tuscany.sca.assembly.impl.EndpointImpl; import org.apache.tuscany.sca.context.CompositeContext; +import org.apache.tuscany.sca.contribution.processor.ContributionReadException; +import org.apache.tuscany.sca.contribution.processor.ProcessorContext; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint; +import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; +import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor; import org.apache.tuscany.sca.core.invocation.NonBlockingInterceptor; import org.apache.tuscany.sca.core.invocation.RuntimeInvoker; @@ -49,7 +68,10 @@ import org.apache.tuscany.sca.core.invocation.impl.PhaseManager; import org.apache.tuscany.sca.interfacedef.Compatibility; import org.apache.tuscany.sca.interfacedef.InterfaceContract; import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper; +import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException; import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; +import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory; import org.apache.tuscany.sca.invocation.Interceptor; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; @@ -68,6 +90,7 @@ import org.apache.tuscany.sca.runtime.EndpointSerializer; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; import org.apache.tuscany.sca.runtime.RuntimeWireProcessor; import org.apache.tuscany.sca.runtime.RuntimeWireProcessorExtensionPoint; import org.apache.tuscany.sca.work.WorkScheduler; @@ -101,7 +124,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint protected InterfaceContract serviceInterfaceContract; /** - * No-arg constructor for Java serilization + * No-arg constructor for Java serialization */ public RuntimeEndpointImpl() { super(null); @@ -218,6 +241,15 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } public Message invoke(Message msg) { + // Deal with async callback + // Ensure invocation chains are built... + getInvocationChains(); + if ( !this.getCallbackEndpointReferences().isEmpty() ) { + RuntimeEndpointReference asyncEPR = (RuntimeEndpointReference) this.getCallbackEndpointReferences().get(0); + // Place a link to the callback EPR into the message headers... + msg.getHeaders().put("ASYNC_CALLBACK", asyncEPR ); + } + // end of async callback handling return invoker.invokeBinding(msg); } @@ -288,10 +320,149 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint addServiceBindingInterceptor(chain, operation); addImplementationInterceptor(serviceComponent, service, chain, targetOperation); chains.add(chain); + + // Handle cases where the operation is an async server + if( targetOperation.isAsyncServer() ) { + createAsyncServerCallback( this, operation ); + } // end if } wireProcessor.process(this); } + + /** + * Creates the async callback for the supplied Endpoint and Operation, if it does not already exist + * and stores it into the Endpoint + * @param endpoint - the Endpoint + * @param operation - the Operation + */ + private void createAsyncServerCallback( RuntimeEndpoint endpoint, Operation operation ) { + // Check to see if the callback already exists + if( asyncCallbackExists( endpoint ) ) return; + + RuntimeEndpointReference asyncEPR = createAsyncEPR( endpoint ); + + // Store the new callback EPR into the Endpoint + endpoint.getCallbackEndpointReferences().add(asyncEPR); + } // end method createAsyncServerCallback + + /** + * Creates the Endpoint object for the async callback + * @param endpoint - the endpoint which has the async server operations + * @return the EndpointReference object representing the callback + */ + private RuntimeEndpointReference createAsyncEPR( RuntimeEndpoint endpoint ){ + CompositeContext compositeContext = endpoint.getCompositeContext(); + RuntimeAssemblyFactory assemblyFactory = getAssemblyFactory( compositeContext ); + RuntimeEndpointReference epr = (RuntimeEndpointReference)assemblyFactory.createEndpointReference(); + epr.bind( compositeContext ); + + // Create pseudo-reference + ComponentReference reference = assemblyFactory.createComponentReference(); + ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry(); + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + JavaInterfaceFactory javaInterfaceFactory = (JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class); + JavaInterfaceContract interfaceContract = javaInterfaceFactory.createJavaInterfaceContract(); + try { + interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseHandler.class)); + } catch (InvalidInterfaceException e1) { + // Nothing to do here - will not happen + } // end try + reference.setInterfaceContract(interfaceContract); + String referenceName = endpoint.getService().getName() + "_asyncCallback"; + reference.setName(referenceName); + reference.setForCallback(true); + epr.setReference(reference); + + // Create a binding + Binding binding = createMatchingBinding( endpoint.getBinding(), (RuntimeComponent)endpoint.getComponent(), reference, registry ); + epr.setBinding(binding); + + // Need to establish policies here (binding has some...) + epr.getRequiredIntents().addAll( endpoint.getRequiredIntents() ); + epr.getPolicySets().addAll( endpoint.getPolicySets() ); + String eprURI = endpoint.getComponent().getName() + "#reference-binding(" + referenceName + "/" + referenceName + ")"; + epr.setURI(eprURI); + + // Attach a dummy endpoint to the epr + RuntimeEndpoint ep = (RuntimeEndpoint)assemblyFactory.createEndpoint(); + ep.setUnresolved(false); + epr.setTargetEndpoint(ep); + //epr.setStatus(EndpointReference.Status.RESOLVED_BINDING); + epr.setStatus(EndpointReference.Status.WIRED_TARGET_FOUND_AND_MATCHED); + epr.setUnresolved(false); + + return epr; + } // end method RuntimeEndpointReference + + private boolean asyncCallbackExists( RuntimeEndpoint endpoint ) { + if( endpoint.getCallbackEndpointReferences().isEmpty() ) return false; + return true; + } // end method asyncCallbackExists + + /** + * Create a matching binding to a supplied binding + * - the matching binding has the same binding type, but is for the supplied component and service + * @param matchBinding - the binding to match + * @param component - the component + * @param service - the service + * @param registry - registry for extensions + * @return - the matching binding, or null if it could not be created + */ + @SuppressWarnings("unchecked") + private Binding createMatchingBinding( Binding matchBinding, RuntimeComponent component, + ComponentReference reference, ExtensionPointRegistry registry ) { + // Since there is no simple way to obtain a Factory for a binding where the type is not known ahead of + // time, the process followed here is to generate the XML element from the binding type QName + // and then read the XML using the processor for that XML... + QName bindingName = matchBinding.getType(); + String bindingXML = ""; + + StAXArtifactProcessorExtensionPoint processors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); + StAXArtifactProcessor processor = (StAXArtifactProcessor)processors.getProcessor(bindingName); + + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + ValidatingXMLInputFactory inputFactory = modelFactories.getFactory(ValidatingXMLInputFactory.class); + StreamSource source = new StreamSource( new StringReader(bindingXML) ); + + ProcessorContext context = new ProcessorContext(); + try { + XMLStreamReader reader = inputFactory.createXMLStreamReader(source); + reader.next(); + Binding newBinding = (Binding) processor.read(reader, context ); + newBinding.setName("asyncCallback"); + + // Create a URI address for the callback based on the Component_Name/Reference_Name pattern + String callbackURI = "/" + component.getName() + "/" + reference.getName(); + //newBinding.setURI(callbackURI); + + BuilderExtensionPoint builders = registry.getExtensionPoint(BuilderExtensionPoint.class); + BindingBuilder builder = builders.getBindingBuilder(newBinding.getType()); + if (builder != null) { + org.apache.tuscany.sca.assembly.builder.BuilderContext builderContext = new BuilderContext(registry); + builder.build(component, reference, newBinding, builderContext, true); + } // end if + + return newBinding; + } catch (ContributionReadException e) { + e.printStackTrace(); + } catch (XMLStreamException e) { + e.printStackTrace(); + } + + return null; + } // end method createMatchingBinding + + /** + * Gets a RuntimeAssemblyFactory from the CompositeContext + * @param compositeContext + * @return the RuntimeAssemblyFactory + */ + private RuntimeAssemblyFactory getAssemblyFactory( CompositeContext compositeContext ) { + ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry(); + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + return (RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class); + } // end method RuntimeAssemblyFactory private void initServiceBindingInvocationChains() { diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java index c76e02597d..3d98de9e21 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java @@ -19,6 +19,9 @@ package org.apache.tuscany.sca.core.invocation; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + /** * A class which is used to wrap an Exception of any type thrown by an asynchronous service operation and * which is returned through a separate one-way message sent asynchronously from the server to the client. @@ -27,26 +30,73 @@ package org.apache.tuscany.sca.core.invocation; public class AsyncFaultWrapper { private String faultClassName = null; - private Exception e = null; + private String faultMessage = null; + private AsyncFaultWrapper containedFault = null; + public AsyncFaultWrapper() { super(); } - public AsyncFaultWrapper( Exception e ) { + /** + * Constructor which creates an AsyncFaultWrapper which wraps the supplied Throwable + * @param e - a Throwable which is wrapped by this AsyncFaultWrapper + */ + public AsyncFaultWrapper( Throwable e ) { super(); storeFault( e ); } - public void storeFault( Exception e ) { - faultClassName = e.getClass().getCanonicalName(); - this.e = e; + /** + * Stores a given Throwable in this AsyncFaultWrapper + * If the supplied Throwable itself contains an embedded Throwable ("cause"), this is recursively + * wrapped by a nested AsyncFaultWrapper + * @param e - the Throwable + */ + public void storeFault( Throwable e ) { + setFaultClassName( e.getClass().getCanonicalName() ); + setFaultMessage( e.getMessage() ); + Throwable cause = e.getCause(); + if( cause != null ) setContainedFault( new AsyncFaultWrapper( cause ) ); } - public Exception retrieveFault( ) { - if( e != null ) return e; - System.out.println( "Tried to retrieve Exception reom AsyncFaultWrapper: " + faultClassName); - return null; - } + /** + * Retrieves the Throwable wrapped by this AsyncFaultWrapper + * + * Note: When this method is invoked, the method attempts to instantiate an instance of the wrapped Throwable. + * It does this using the Thread Context Class Loader (TCCL) - the caller *MUST* ensure that the TCCL has access + * to the class of the wrapped Throwable and also to the classes of any nested Throwables. If this is not done, + * a ClassNotFound exception is thrown + * + * @return - the Throwable wrapped by this AsyncFaultWrapper - the Throwable will contain any nested Throwable(s) + * in its cause property + * @throws ClassNotFound exception, if the class of the wrapped Throwable is not accessible from the TCCL + */ + public Throwable retrieveFault( ) { + try { + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + Class faultClass = tccl.loadClass(faultClassName); + Class xclass = (Class) faultClass; + if( containedFault != null ) { + // If there is a nested fault, retrieve this recursively + Constructor cons = xclass.getConstructor(String.class, Throwable.class); + return (Throwable) cons.newInstance(faultMessage, getContainedFault().retrieveFault()); + } else { + Constructor cons = xclass.getConstructor(String.class); + return (Throwable) cons.newInstance(faultMessage); + } // end if + } catch (Exception e) { + return e; + } // end try + } // end method retrieveFault + + public void setFaultClassName( String name ) { this.faultClassName = name; } + public String getFaultClassName() { return this.faultClassName; } + + public String getFaultMessage() { return faultMessage; } + public void setFaultMessage(String faultMessage) { this.faultMessage = faultMessage; } + + public AsyncFaultWrapper getContainedFault() { return containedFault; } + public void setContainedFault(AsyncFaultWrapper containedFault) { this.containedFault = containedFault; } -} +} // end class AsyncFaultWrapper diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java new file mode 100644 index 0000000000..df90c3286d --- /dev/null +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +/** + * An exception which is used to signal that a service has been invoked asynchronously + * and that the result will be sent separately + * + */ +public class AsyncResponseException extends RuntimeException { + + private static final long serialVersionUID = 457954562860541631L; + + public AsyncResponseException() { + super(); + } + + public AsyncResponseException(String arg0, Throwable arg1) { + super(arg0, arg1); + } + + public AsyncResponseException(String arg0) { + super(arg0); + } + + public AsyncResponseException(Throwable arg0) { + super(arg0); + } + +} // end class AsyncResponseException diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java index e3e883c79d..d8eac8a166 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java @@ -19,6 +19,7 @@ package org.apache.tuscany.sca.core.invocation; +import org.oasisopen.sca.annotation.OneWay; import org.oasisopen.sca.annotation.Remotable; /** @@ -34,6 +35,7 @@ public interface AsyncResponseHandler { * @param e - the wrapper containing the Fault to send * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously */ + @OneWay public void setFault(AsyncFaultWrapper e); /** @@ -41,6 +43,7 @@ public interface AsyncResponseHandler { * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously * @param res - the response message, which is of type V */ + @OneWay public void setResponse(V res); } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java index 4d9abafe54..8db469b25e 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java @@ -57,6 +57,8 @@ public class AsyncInvocationFutureImpl implements Future, Response, Asy private String uniqueID = UUID.randomUUID().toString(); + private ClassLoader classLoader = null; + protected AsyncInvocationFutureImpl() { super(); } // end constructor @@ -66,10 +68,13 @@ public class AsyncInvocationFutureImpl implements Future, Response, Asy * to be set for the class instances * @param - the type of the response from the asynchronously invoked service * @param type - the type of the AsyncInvocationFutureImpl expressed as a parameter + * @param classLoader - the classloader used for the business interface to which this Future applies * @return - an instance of AsyncInvocationFutureImpl */ - public static AsyncInvocationFutureImpl newInstance( Class type ) { - return new AsyncInvocationFutureImpl(); + public static AsyncInvocationFutureImpl newInstance( Class type, ClassLoader classLoader ) { + AsyncInvocationFutureImpl future = new AsyncInvocationFutureImpl(); + future.setClassLoader( classLoader ); + return future; } /** @@ -146,8 +151,17 @@ public class AsyncInvocationFutureImpl implements Future, Response, Asy */ public void setFault(AsyncFaultWrapper w) { - Exception e = w.retrieveFault(); - if( e != null ) throw new IllegalArgumentException("AsyncFaultWrapper did not return an Exception"); + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + Throwable e; + try { + // Set the TCCL to the classloader of the business interface + Thread.currentThread().setContextClassLoader(this.getClassLoader()); + e = w.retrieveFault(); + } finally { + Thread.currentThread().setContextClassLoader(tccl); + } // end try + + if( e == null ) throw new IllegalArgumentException("AsyncFaultWrapper did not return an Exception"); lock.lock(); try { if( notSetYet() ) { @@ -201,8 +215,25 @@ public class AsyncInvocationFutureImpl implements Future, Response, Asy * @return - a Map containing the context */ public Map getContext() { - // TODO Auto-generated method stub + // Intentionally returns null return null; } + + /** + * Gets the classloader associated with the business interface to which this Future relates + * @return the ClassLoader of the business interface + */ + public ClassLoader getClassLoader() { + return classLoader; + } + + /** + * Sets the classloader associated with the business interface to which this Future relates + * @param classLoader - the classloader of the business interface + */ + public void setClassLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + } + } // end class AsyncInvocationFutureImpl diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java index a16bd1fc74..e0e219d3f1 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java @@ -21,15 +21,17 @@ package org.apache.tuscany.sca.core.invocation.impl; import java.io.StringReader; import java.lang.reflect.Method; -import java.lang.reflect.Type; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.xml.namespace.QName; -import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamReader; import javax.xml.transform.stream.StreamSource; @@ -54,29 +56,23 @@ import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; -import org.apache.tuscany.sca.core.assembly.impl.RuntimeEndpointReferenceImpl; import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; -import org.apache.tuscany.sca.interfacedef.InterfaceContract; import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory; +import org.apache.tuscany.sca.interfacedef.util.FaultException; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.policy.Intent; -import org.apache.tuscany.sca.provider.ImplementationProvider; -import org.apache.tuscany.sca.provider.ImplementationProviderFactory; import org.apache.tuscany.sca.provider.PolicyProvider; -import org.apache.tuscany.sca.provider.RuntimeProvider; import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.Invocable; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeEndpoint; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; -import org.oasisopen.sca.ComponentContext; import org.oasisopen.sca.ServiceReference; -import org.oasisopen.sca.ServiceRuntimeException; -import org.oasisopen.sca.annotation.AsyncInvocation; +import org.oasisopen.sca.ServiceRuntimeException; /** * An InvocationHandler which deals with JAXWS-defined asynchronous client Java API method calls @@ -96,6 +92,15 @@ import org.oasisopen.sca.annotation.AsyncInvocation; public class AsyncJDKInvocationHandler extends JDKInvocationHandler { private static final long serialVersionUID = 1L; + + private static int invocationCount = 10; // # of threads to use + private static long maxWaitTime = 30; // Max wait time for completion = 30sec + + // Run the async service invocations using a ThreadPoolExecutor + private static ThreadPoolExecutor theExecutor = new ThreadPoolExecutor( invocationCount, invocationCount, + maxWaitTime, TimeUnit.SECONDS, + new ArrayBlockingQueue( invocationCount ) ); + public AsyncJDKInvocationHandler(MessageFactory messageFactory, ServiceReference callableReference) { super(messageFactory, callableReference); @@ -119,7 +124,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { return doInvokeAsyncPoll(proxy, method, args); } else { // Regular synchronous method call - return super.invoke(proxy, method, args); + return doInvokeSync(proxy, method, args); } } @@ -156,13 +161,11 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { */ @SuppressWarnings("unchecked") protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) { - Object response; Class returnType = getNonAsyncMethod(asyncMethod).getReturnType(); // Allocate the Future / Response object - note: Response is a subclass of Future - AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType ); + AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType, getInterfaceClassloader() ); try { - response = invokeAsync(proxy, getNonAsyncMethod(asyncMethod), args, future); - future.setResponse(response); + invokeAsync(proxy, getNonAsyncMethod(asyncMethod), args, future); } catch (Exception e) { future.setFault( new AsyncFaultWrapper(e) ); } catch (Throwable t ) { @@ -171,11 +174,31 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { future.setFault( new AsyncFaultWrapper(e) ); } // end try return future; - //return new AsyncResponse(response, isException); } // end method doInvokeAsyncPoll + + /** + * Provide a synchronous invocation of a service operation that is either synchronous or asynchronous + * @return + */ + protected Object doInvokeSync(Object proxy, Method method, Object[] args) throws Throwable { + if ( isAsyncInvocation( source ) ) { + // Target service is asynchronous + Class returnType = method.getReturnType(); + AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType, getInterfaceClassloader() ); + invokeAsync(proxy, method, args, future); + // Wait for some maximum time for the result - 1000 seconds here + // Really, if the service is async, the client should use async client methods to invoke the service + // - and be prepared to wait a *really* long time + return future.get(1000, TimeUnit.SECONDS); + } else { + // Target service is not asynchronous, so perform sync invocation + return super.invoke(proxy, method, args); + } // end if + } // end method doInvokeSync /** - * Invoke an async callback method + * Invoke an async callback method - note that this form of the async client API has as its final parameter + * an AsyncHandler method, used for callbacks to the client code * @param proxy - the reference proxy * @param asyncMethod - the async method to invoke * @param args - array of input arguments to the method @@ -186,14 +209,17 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { private Object doInvokeAsyncCallback(Object proxy, Method asyncMethod, Object[] args) { AsyncHandler handler = (AsyncHandler)args[args.length-1]; Response response = doInvokeAsyncPoll(proxy,asyncMethod,Arrays.copyOf(args, args.length-1)); - handler.handleResponse(response); + // Invoke the callback handler, if present + if( handler != null ) { + handler.handleResponse(response); + } // end if return response; } // end method doInvokeAsyncCallback /** - * Invoke the target method on - * @param proxy + * Invoke the target (synchronous) method asynchronously + * @param proxy - the reference proxy object * @param method - the method to invoke * @param args - arguments for the call * @param future - Future for handling the response @@ -201,10 +227,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { * @throws Throwable - if an exception is thrown during the invocation */ @SuppressWarnings("unchecked") - private Object invokeAsync(Object proxy, Method method, Object[] args, AsyncInvocationFutureImpl future) throws Throwable { - if (Object.class == method.getDeclaringClass()) { - return invokeObjectMethod(method, args); - } + private void invokeAsync(Object proxy, Method method, Object[] args, AsyncInvocationFutureImpl future) throws Throwable { if (source == null) { throw new ServiceRuntimeException("No runtime source is available"); } @@ -215,7 +238,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { epr.rebuild(); chains.clear(); } - } + } // end if InvocationChain chain = getInvocationChain(method, source); @@ -223,22 +246,84 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { throw new IllegalArgumentException("No matching operation is found: " + method); } + // Organize for an async service RuntimeEndpoint theEndpoint = getAsyncCallback( source ); - attachFuture( theEndpoint, future ); + boolean isAsyncService = false; + if( theEndpoint != null ) { + // ... the service is asynchronous ... + attachFuture( theEndpoint, future ); + isAsyncService = true; + } else { + // ... the service is synchronous ... + } // end if - // send the invocation down the source - Object result = super.invoke(chain, args, source); + // Perform the invocations on separate thread... + theExecutor.execute( new separateThreadInvoker( chain, args, source, future, isAsyncService ) ); - return result; + return; } // end method invokeAsync + /** + * An inner class which acts as a runnable task for invoking services asynchronously on threads that are separate from + * those used to execute operations of components + * + * This supports both synchronous services and asynchronous services + */ + private class separateThreadInvoker implements Runnable { + + private AsyncInvocationFutureImpl future; + private InvocationChain chain; + private Object[] args; + private Invocable invocable; + private boolean isAsyncService; + + public separateThreadInvoker( InvocationChain chain, Object[] args, Invocable invocable, + AsyncInvocationFutureImpl future, boolean isAsyncService ) { + super(); + this.chain = chain; + this.args = args; + this.invocable = invocable; + this.future = future; + this.isAsyncService = isAsyncService; + } // end constructor + + public void run() { + Object result; + + try { + if( isAsyncService ) { + invoke(chain, args, invocable, future.getUniqueID()); + // The result is returned asynchronously via the future... + } else { + // ... the service is synchronous ... + result = invoke(chain, args, invocable); + future.setResponse(result); + } // end if + } catch ( ServiceRuntimeException s ) { + Throwable e = s.getCause(); + if( e != null && e instanceof FaultException ) { + if( "AsyncResponse".equals(e.getMessage()) ) { + // Do nothing... + } else { + future.setFault( new AsyncFaultWrapper( s ) ); + } // end if + } // end if + } catch ( Throwable t ) { + System.out.println("Async invoke got exception: " + t.toString()); + future.setFault( new AsyncFaultWrapper( t ) ); + } // end try + + } // end method run + + } // end class separateThreadInvoker + /** * Attaches a future to the callback endpoint - so that the Future is triggered when a response is * received from the asynchronous service invocation associated with the Future * @param endpoint - the async callback endpoint * @param future - the async invocation future to attach */ - private void attachFuture( RuntimeEndpoint endpoint, AsyncInvocationFutureImpl future ) { + private void attachFuture( RuntimeEndpoint endpoint, AsyncInvocationFutureImpl future ) { Implementation impl = endpoint.getComponent().getImplementation(); AsyncResponseHandlerImpl asyncHandler = (AsyncResponseHandlerImpl) impl; asyncHandler.addFuture(future); @@ -257,11 +342,12 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { RuntimeEndpoint endpoint; synchronized( epr ) { endpoint = (RuntimeEndpoint)epr.getCallbackEndpoint(); + // If the async callback endpoint is already created, return it... if( endpoint != null ) return endpoint; // Create the endpoint for the async callback endpoint = createAsyncCallbackEndpoint( epr ); epr.setCallbackEndpoint(endpoint); - } + } // end synchronized // Activate the new callback endpoint startEndpoint( epr.getCompositeContext(), endpoint ); @@ -335,19 +421,9 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { services.clear(); services.add(service); - Binding eprBinding = epr.getBinding(); - try { - Binding binding = (Binding)eprBinding.clone(); - // Create a binding - binding = createMatchingBinding( eprBinding, fakeComponent, service, registry ); - - // Create a URI address for the callback based on the Component_Name/Reference_Name pattern - //String callbackURI = "/" + epr.getComponent().getName() + "/" + serviceName; - //binding.setURI(callbackURI); - endpoint.setBinding(binding); - } catch (CloneNotSupportedException e) { - // will not happen - } // end try + // Create a binding + Binding binding = createMatchingBinding( epr.getBinding(), fakeComponent, service, registry ); + endpoint.setBinding(binding); // Need to establish policies here (binding has some...) endpoint.getRequiredIntents().addAll( epr.getRequiredIntents() ); @@ -421,6 +497,11 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { return (RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class); } // end method RuntimeAssemblyFactory + /** + * Applies an AsyncResponseHandlerImpl as the implementation of a RuntimeComponent + * - the AsyncResponseHandlerImpl acts as both the implementation class and the implementation provider... + * @param component - the component + */ private void applyImplementation( RuntimeComponent component ) { AsyncResponseHandlerImpl asyncHandler = new AsyncResponseHandlerImpl(); component.setImplementation( asyncHandler ); @@ -434,7 +515,8 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { * @param source - the EPR involved in the invocation * @return - true if the invocation is async */ - private boolean isAsyncInvocation( RuntimeEndpointReference source ) { + private boolean isAsyncInvocation( Invocable source ) { + if( !(source instanceof RuntimeEndpointReference) ) return false; RuntimeEndpointReference epr = (RuntimeEndpointReference) source; // First check is to see if the EPR itself has the asyncInvocation intent marked for( Intent intent : epr.getRequiredIntents() ) { @@ -462,5 +544,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler { } } throw new IllegalStateException("No synchronous method matching async method " + asyncMethod.getName()); + } // end method getNonAsyncMethod + + /** + * Gets the classloader of the business interface + * @return + */ + private ClassLoader getInterfaceClassloader( ) { + return businessInterface.getClassLoader(); } } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java index 99b4b26b8f..8d56088c44 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java @@ -42,6 +42,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService; /** * A class intended to form the final link in the chain calling into a Future which represents * the response to an asynchronous service invocation + * * Most methods are dummies, required to fulfil the contracts for ImplementationProvider, Implementation * and Invoker, since this class collapses together the functions of these separate interfaces, due to its * specialized nature, where most of the function will never be used. @@ -52,7 +53,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService; * message header. On receipt of each message, the class seeks out the Future with that unique ID and completes the future * either with a response message or with a Fault. * - * @param + * @param */ public class AsyncResponseHandlerImpl implements AsyncResponseHandler, ImplementationProvider, Implementation, Invoker { @@ -60,6 +61,9 @@ public class AsyncResponseHandlerImpl implements AsyncResponseHandler, private ConcurrentHashMap< String, AsyncInvocationFutureImpl > table = new ConcurrentHashMap< String, AsyncInvocationFutureImpl >(); + /** + * This class is its own invoker... + */ public Invoker createInvoker(RuntimeComponentService service, Operation operation) { return this; @@ -144,13 +148,46 @@ public class AsyncResponseHandlerImpl implements AsyncResponseHandler, public void setResponse(V res) { } - public Message invoke(Message msg) { - // TODO Auto-generated method stub + /** + * Method which is the termination for the invocation chain from the callback endpoint + * @param msg - the Tuscany message containing the response from the async service invocation + * which is either the Response message or an exception of some kind + */ + private static final String WS_MESSAGE_ID = "WS_MESSAGE_ID"; + public Message invoke(Message msg) { // Get the unique ID from the message header - // Fetch the Future with that Unique ID - // Complete the Future with a Response message - // ...or complete the Future with a Fault - return null; - } + String idValue = (String)msg.getHeaders().get(WS_MESSAGE_ID); + if( idValue == null ) { + System.out.println( "Async message ID not found "); + } else { + // Fetch the Future with that Unique ID + AsyncInvocationFutureImpl future = table.get(idValue); + if( future == null ) { + System.out.println("Future not found for id: " + idValue); + } else { + // Complete the Future with a Response message + Object payload = msg.getBody(); + Object response; + if( payload == null ) { + System.out.println("Returned response message was null"); + } else { + if (payload.getClass().isArray()) { + response = ((Object[])payload)[0]; + } else { + response = payload; + } // end if + if( response.getClass().equals(AsyncFaultWrapper.class)) { + future.setFault((AsyncFaultWrapper) response ); + } else { + future.setResponse(response); + } // end if + } // end if + } // end if + } // end if + + // Prepare an empty response message + msg.setBody(null); + return msg; + } // end method invoke } // end class diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java index 412481886f..d0095f45ee 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java @@ -225,8 +225,22 @@ public class JDKInvocationHandler implements InvocationHandler, Serializable { protected void setEndpoint(Endpoint endpoint) { this.target = endpoint; } - + protected Object invoke(InvocationChain chain, Object[] args, Invocable source) + throws Throwable { + return invoke( chain, args, source, null ); + } + + /** + * Invoke the chain + * @param chain - the chain + * @param args - arguments to the invocation as an array of Objects + * @param source - the Endpoint or EndpointReference to which the chain relates + * @param msgID - an ID for the message being sent, may be null + * @return - the Response message from the invocation + * @throws Throwable - if any exception occurs during the invocation + */ + protected Object invoke(InvocationChain chain, Object[] args, Invocable source, String msgID) throws Throwable { Message msg = messageFactory.createMessage(); if (source instanceof RuntimeEndpointReference) { @@ -250,6 +264,11 @@ public class JDKInvocationHandler implements InvocationHandler, Serializable { transferMessageHeaders( msg, msgContext); ThreadMessageContext.setMessageContext(msg); + + // If there is a supplied message ID, place its value into the Message Header under "MESSAGE_ID" + if( msgID != null ){ + msg.getHeaders().put("MESSAGE_ID", msgID); + } // end if try { // dispatch the source down the chain and get the response -- cgit v1.2.3