summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/core
diff options
context:
space:
mode:
authoredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2010-07-11 10:09:27 +0000
committeredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2010-07-11 10:09:27 +0000
commit2b6ccef38686cc118bf89904114d40652b9464f3 (patch)
treeeee21b81b77ef0d34943d76c083a4ee7f0949af2 /sca-java-2.x/trunk/modules/core
parent045bcb9ff57dab5c20786414e962adcae86f462f (diff)
Changes and additions to core in support of Client-side and Server-side asynchronous services and @asyncInvocation as described in TUSCANY-3608, 3611 & 3612
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@963034 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java173
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java72
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java47
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java3
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java41
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java180
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java53
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java21
8 files changed, 519 insertions, 71 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
index d1c81ef8ee..eeadfef9c1 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
@@ -23,24 +23,43 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.StringReader;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.assembly.Component;
import org.apache.tuscany.sca.assembly.ComponentReference;
import org.apache.tuscany.sca.assembly.ComponentService;
import org.apache.tuscany.sca.assembly.CompositeReference;
import org.apache.tuscany.sca.assembly.CompositeService;
import org.apache.tuscany.sca.assembly.Contract;
+import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.assembly.Service;
+import org.apache.tuscany.sca.assembly.builder.BindingBuilder;
+import org.apache.tuscany.sca.assembly.builder.BuilderContext;
+import org.apache.tuscany.sca.assembly.builder.BuilderExtensionPoint;
import org.apache.tuscany.sca.assembly.impl.EndpointImpl;
import org.apache.tuscany.sca.context.CompositeContext;
+import org.apache.tuscany.sca.contribution.processor.ContributionReadException;
+import org.apache.tuscany.sca.contribution.processor.ProcessorContext;
+import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor;
+import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint;
+import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
+import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor;
import org.apache.tuscany.sca.core.invocation.NonBlockingInterceptor;
import org.apache.tuscany.sca.core.invocation.RuntimeInvoker;
@@ -49,7 +68,10 @@ import org.apache.tuscany.sca.core.invocation.impl.PhaseManager;
import org.apache.tuscany.sca.interfacedef.Compatibility;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper;
+import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
+import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
import org.apache.tuscany.sca.invocation.Interceptor;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
@@ -68,6 +90,7 @@ import org.apache.tuscany.sca.runtime.EndpointSerializer;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.apache.tuscany.sca.runtime.RuntimeWireProcessor;
import org.apache.tuscany.sca.runtime.RuntimeWireProcessorExtensionPoint;
import org.apache.tuscany.sca.work.WorkScheduler;
@@ -101,7 +124,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
protected InterfaceContract serviceInterfaceContract;
/**
- * No-arg constructor for Java serilization
+ * No-arg constructor for Java serialization
*/
public RuntimeEndpointImpl() {
super(null);
@@ -218,6 +241,15 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
}
public Message invoke(Message msg) {
+ // Deal with async callback
+ // Ensure invocation chains are built...
+ getInvocationChains();
+ if ( !this.getCallbackEndpointReferences().isEmpty() ) {
+ RuntimeEndpointReference asyncEPR = (RuntimeEndpointReference) this.getCallbackEndpointReferences().get(0);
+ // Place a link to the callback EPR into the message headers...
+ msg.getHeaders().put("ASYNC_CALLBACK", asyncEPR );
+ }
+ // end of async callback handling
return invoker.invokeBinding(msg);
}
@@ -288,10 +320,149 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
addServiceBindingInterceptor(chain, operation);
addImplementationInterceptor(serviceComponent, service, chain, targetOperation);
chains.add(chain);
+
+ // Handle cases where the operation is an async server
+ if( targetOperation.isAsyncServer() ) {
+ createAsyncServerCallback( this, operation );
+ } // end if
}
wireProcessor.process(this);
}
+
+ /**
+ * Creates the async callback for the supplied Endpoint and Operation, if it does not already exist
+ * and stores it into the Endpoint
+ * @param endpoint - the Endpoint
+ * @param operation - the Operation
+ */
+ private void createAsyncServerCallback( RuntimeEndpoint endpoint, Operation operation ) {
+ // Check to see if the callback already exists
+ if( asyncCallbackExists( endpoint ) ) return;
+
+ RuntimeEndpointReference asyncEPR = createAsyncEPR( endpoint );
+
+ // Store the new callback EPR into the Endpoint
+ endpoint.getCallbackEndpointReferences().add(asyncEPR);
+ } // end method createAsyncServerCallback
+
+ /**
+ * Creates the Endpoint object for the async callback
+ * @param endpoint - the endpoint which has the async server operations
+ * @return the EndpointReference object representing the callback
+ */
+ private RuntimeEndpointReference createAsyncEPR( RuntimeEndpoint endpoint ){
+ CompositeContext compositeContext = endpoint.getCompositeContext();
+ RuntimeAssemblyFactory assemblyFactory = getAssemblyFactory( compositeContext );
+ RuntimeEndpointReference epr = (RuntimeEndpointReference)assemblyFactory.createEndpointReference();
+ epr.bind( compositeContext );
+
+ // Create pseudo-reference
+ ComponentReference reference = assemblyFactory.createComponentReference();
+ ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry();
+ FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
+ JavaInterfaceFactory javaInterfaceFactory = (JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class);
+ JavaInterfaceContract interfaceContract = javaInterfaceFactory.createJavaInterfaceContract();
+ try {
+ interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseHandler.class));
+ } catch (InvalidInterfaceException e1) {
+ // Nothing to do here - will not happen
+ } // end try
+ reference.setInterfaceContract(interfaceContract);
+ String referenceName = endpoint.getService().getName() + "_asyncCallback";
+ reference.setName(referenceName);
+ reference.setForCallback(true);
+ epr.setReference(reference);
+
+ // Create a binding
+ Binding binding = createMatchingBinding( endpoint.getBinding(), (RuntimeComponent)endpoint.getComponent(), reference, registry );
+ epr.setBinding(binding);
+
+ // Need to establish policies here (binding has some...)
+ epr.getRequiredIntents().addAll( endpoint.getRequiredIntents() );
+ epr.getPolicySets().addAll( endpoint.getPolicySets() );
+ String eprURI = endpoint.getComponent().getName() + "#reference-binding(" + referenceName + "/" + referenceName + ")";
+ epr.setURI(eprURI);
+
+ // Attach a dummy endpoint to the epr
+ RuntimeEndpoint ep = (RuntimeEndpoint)assemblyFactory.createEndpoint();
+ ep.setUnresolved(false);
+ epr.setTargetEndpoint(ep);
+ //epr.setStatus(EndpointReference.Status.RESOLVED_BINDING);
+ epr.setStatus(EndpointReference.Status.WIRED_TARGET_FOUND_AND_MATCHED);
+ epr.setUnresolved(false);
+
+ return epr;
+ } // end method RuntimeEndpointReference
+
+ private boolean asyncCallbackExists( RuntimeEndpoint endpoint ) {
+ if( endpoint.getCallbackEndpointReferences().isEmpty() ) return false;
+ return true;
+ } // end method asyncCallbackExists
+
+ /**
+ * Create a matching binding to a supplied binding
+ * - the matching binding has the same binding type, but is for the supplied component and service
+ * @param matchBinding - the binding to match
+ * @param component - the component
+ * @param service - the service
+ * @param registry - registry for extensions
+ * @return - the matching binding, or null if it could not be created
+ */
+ @SuppressWarnings("unchecked")
+ private Binding createMatchingBinding( Binding matchBinding, RuntimeComponent component,
+ ComponentReference reference, ExtensionPointRegistry registry ) {
+ // Since there is no simple way to obtain a Factory for a binding where the type is not known ahead of
+ // time, the process followed here is to generate the <binding.xxx/> XML element from the binding type QName
+ // and then read the XML using the processor for that XML...
+ QName bindingName = matchBinding.getType();
+ String bindingXML = "<ns1:" + bindingName.getLocalPart() + " xmlns:ns1='" + bindingName.getNamespaceURI() + "'/>";
+
+ StAXArtifactProcessorExtensionPoint processors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class);
+ StAXArtifactProcessor<?> processor = (StAXArtifactProcessor<?>)processors.getProcessor(bindingName);
+
+ FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
+ ValidatingXMLInputFactory inputFactory = modelFactories.getFactory(ValidatingXMLInputFactory.class);
+ StreamSource source = new StreamSource( new StringReader(bindingXML) );
+
+ ProcessorContext context = new ProcessorContext();
+ try {
+ XMLStreamReader reader = inputFactory.createXMLStreamReader(source);
+ reader.next();
+ Binding newBinding = (Binding) processor.read(reader, context );
+ newBinding.setName("asyncCallback");
+
+ // Create a URI address for the callback based on the Component_Name/Reference_Name pattern
+ String callbackURI = "/" + component.getName() + "/" + reference.getName();
+ //newBinding.setURI(callbackURI);
+
+ BuilderExtensionPoint builders = registry.getExtensionPoint(BuilderExtensionPoint.class);
+ BindingBuilder builder = builders.getBindingBuilder(newBinding.getType());
+ if (builder != null) {
+ org.apache.tuscany.sca.assembly.builder.BuilderContext builderContext = new BuilderContext(registry);
+ builder.build(component, reference, newBinding, builderContext, true);
+ } // end if
+
+ return newBinding;
+ } catch (ContributionReadException e) {
+ e.printStackTrace();
+ } catch (XMLStreamException e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ } // end method createMatchingBinding
+
+ /**
+ * Gets a RuntimeAssemblyFactory from the CompositeContext
+ * @param compositeContext
+ * @return the RuntimeAssemblyFactory
+ */
+ private RuntimeAssemblyFactory getAssemblyFactory( CompositeContext compositeContext ) {
+ ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry();
+ FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
+ return (RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class);
+ } // end method RuntimeAssemblyFactory
private void initServiceBindingInvocationChains() {
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
index c76e02597d..3d98de9e21 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java
@@ -19,6 +19,9 @@
package org.apache.tuscany.sca.core.invocation;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
/**
* A class which is used to wrap an Exception of any type thrown by an asynchronous service operation and
* which is returned through a separate one-way message sent asynchronously from the server to the client.
@@ -27,26 +30,73 @@ package org.apache.tuscany.sca.core.invocation;
public class AsyncFaultWrapper {
private String faultClassName = null;
- private Exception e = null;
+ private String faultMessage = null;
+ private AsyncFaultWrapper containedFault = null;
+
public AsyncFaultWrapper() {
super();
}
- public AsyncFaultWrapper( Exception e ) {
+ /**
+ * Constructor which creates an AsyncFaultWrapper which wraps the supplied Throwable
+ * @param e - a Throwable which is wrapped by this AsyncFaultWrapper
+ */
+ public AsyncFaultWrapper( Throwable e ) {
super();
storeFault( e );
}
- public void storeFault( Exception e ) {
- faultClassName = e.getClass().getCanonicalName();
- this.e = e;
+ /**
+ * Stores a given Throwable in this AsyncFaultWrapper
+ * If the supplied Throwable itself contains an embedded Throwable ("cause"), this is recursively
+ * wrapped by a nested AsyncFaultWrapper
+ * @param e - the Throwable
+ */
+ public void storeFault( Throwable e ) {
+ setFaultClassName( e.getClass().getCanonicalName() );
+ setFaultMessage( e.getMessage() );
+ Throwable cause = e.getCause();
+ if( cause != null ) setContainedFault( new AsyncFaultWrapper( cause ) );
}
- public Exception retrieveFault( ) {
- if( e != null ) return e;
- System.out.println( "Tried to retrieve Exception reom AsyncFaultWrapper: " + faultClassName);
- return null;
- }
+ /**
+ * Retrieves the Throwable wrapped by this AsyncFaultWrapper
+ *
+ * Note: When this method is invoked, the method attempts to instantiate an instance of the wrapped Throwable.
+ * It does this using the Thread Context Class Loader (TCCL) - the caller *MUST* ensure that the TCCL has access
+ * to the class of the wrapped Throwable and also to the classes of any nested Throwables. If this is not done,
+ * a ClassNotFound exception is thrown
+ *
+ * @return - the Throwable wrapped by this AsyncFaultWrapper - the Throwable will contain any nested Throwable(s)
+ * in its cause property
+ * @throws ClassNotFound exception, if the class of the wrapped Throwable is not accessible from the TCCL
+ */
+ public Throwable retrieveFault( ) {
+ try {
+ ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ Class<?> faultClass = tccl.loadClass(faultClassName);
+ Class<Throwable> xclass = (Class<Throwable>) faultClass;
+ if( containedFault != null ) {
+ // If there is a nested fault, retrieve this recursively
+ Constructor cons = xclass.getConstructor(String.class, Throwable.class);
+ return (Throwable) cons.newInstance(faultMessage, getContainedFault().retrieveFault());
+ } else {
+ Constructor cons = xclass.getConstructor(String.class);
+ return (Throwable) cons.newInstance(faultMessage);
+ } // end if
+ } catch (Exception e) {
+ return e;
+ } // end try
+ } // end method retrieveFault
+
+ public void setFaultClassName( String name ) { this.faultClassName = name; }
+ public String getFaultClassName() { return this.faultClassName; }
+
+ public String getFaultMessage() { return faultMessage; }
+ public void setFaultMessage(String faultMessage) { this.faultMessage = faultMessage; }
+
+ public AsyncFaultWrapper getContainedFault() { return containedFault; }
+ public void setContainedFault(AsyncFaultWrapper containedFault) { this.containedFault = containedFault; }
-}
+} // end class AsyncFaultWrapper
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java
new file mode 100644
index 0000000000..df90c3286d
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.core.invocation;
+
+/**
+ * An exception which is used to signal that a service has been invoked asynchronously
+ * and that the result will be sent separately
+ *
+ */
+public class AsyncResponseException extends RuntimeException {
+
+ private static final long serialVersionUID = 457954562860541631L;
+
+ public AsyncResponseException() {
+ super();
+ }
+
+ public AsyncResponseException(String arg0, Throwable arg1) {
+ super(arg0, arg1);
+ }
+
+ public AsyncResponseException(String arg0) {
+ super(arg0);
+ }
+
+ public AsyncResponseException(Throwable arg0) {
+ super(arg0);
+ }
+
+} // end class AsyncResponseException
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java
index e3e883c79d..d8eac8a166 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java
@@ -19,6 +19,7 @@
package org.apache.tuscany.sca.core.invocation;
+import org.oasisopen.sca.annotation.OneWay;
import org.oasisopen.sca.annotation.Remotable;
/**
@@ -34,6 +35,7 @@ public interface AsyncResponseHandler<V> {
* @param e - the wrapper containing the Fault to send
* @throws IllegalStateException if either the setResponse method or the setFault method have been called previously
*/
+ @OneWay
public void setFault(AsyncFaultWrapper e);
/**
@@ -41,6 +43,7 @@ public interface AsyncResponseHandler<V> {
* @throws IllegalStateException if either the setResponse method or the setFault method have been called previously
* @param res - the response message, which is of type V
*/
+ @OneWay
public void setResponse(V res);
}
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
index 4d9abafe54..8db469b25e 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java
@@ -57,6 +57,8 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
private String uniqueID = UUID.randomUUID().toString();
+ private ClassLoader classLoader = null;
+
protected AsyncInvocationFutureImpl() {
super();
} // end constructor
@@ -66,10 +68,13 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
* to be set for the class instances
* @param <V> - the type of the response from the asynchronously invoked service
* @param type - the type of the AsyncInvocationFutureImpl expressed as a parameter
+ * @param classLoader - the classloader used for the business interface to which this Future applies
* @return - an instance of AsyncInvocationFutureImpl<V>
*/
- public static <V> AsyncInvocationFutureImpl<V> newInstance( Class<V> type ) {
- return new AsyncInvocationFutureImpl<V>();
+ public static <V> AsyncInvocationFutureImpl<V> newInstance( Class<V> type, ClassLoader classLoader ) {
+ AsyncInvocationFutureImpl<V> future = new AsyncInvocationFutureImpl<V>();
+ future.setClassLoader( classLoader );
+ return future;
}
/**
@@ -146,8 +151,17 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
*/
public void setFault(AsyncFaultWrapper w) {
- Exception e = w.retrieveFault();
- if( e != null ) throw new IllegalArgumentException("AsyncFaultWrapper did not return an Exception");
+ ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ Throwable e;
+ try {
+ // Set the TCCL to the classloader of the business interface
+ Thread.currentThread().setContextClassLoader(this.getClassLoader());
+ e = w.retrieveFault();
+ } finally {
+ Thread.currentThread().setContextClassLoader(tccl);
+ } // end try
+
+ if( e == null ) throw new IllegalArgumentException("AsyncFaultWrapper did not return an Exception");
lock.lock();
try {
if( notSetYet() ) {
@@ -201,8 +215,25 @@ public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, Asy
* @return - a Map containing the context
*/
public Map<String, Object> getContext() {
- // TODO Auto-generated method stub
+ // Intentionally returns null
return null;
}
+
+ /**
+ * Gets the classloader associated with the business interface to which this Future relates
+ * @return the ClassLoader of the business interface
+ */
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
+ /**
+ * Sets the classloader associated with the business interface to which this Future relates
+ * @param classLoader - the classloader of the business interface
+ */
+ public void setClassLoader(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
} // end class AsyncInvocationFutureImpl
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
index a16bd1fc74..e0e219d3f1 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
@@ -21,15 +21,17 @@ package org.apache.tuscany.sca.core.invocation.impl;
import java.io.StringReader;
import java.lang.reflect.Method;
-import java.lang.reflect.Type;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.xml.namespace.QName;
-import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.stream.StreamSource;
@@ -54,29 +56,23 @@ import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
-import org.apache.tuscany.sca.core.assembly.impl.RuntimeEndpointReferenceImpl;
import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
-import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
+import org.apache.tuscany.sca.interfacedef.util.FaultException;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.policy.Intent;
-import org.apache.tuscany.sca.provider.ImplementationProvider;
-import org.apache.tuscany.sca.provider.ImplementationProviderFactory;
import org.apache.tuscany.sca.provider.PolicyProvider;
-import org.apache.tuscany.sca.provider.RuntimeProvider;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.Invocable;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
-import org.oasisopen.sca.ComponentContext;
import org.oasisopen.sca.ServiceReference;
-import org.oasisopen.sca.ServiceRuntimeException;
-import org.oasisopen.sca.annotation.AsyncInvocation;
+import org.oasisopen.sca.ServiceRuntimeException;
/**
* An InvocationHandler which deals with JAXWS-defined asynchronous client Java API method calls
@@ -96,6 +92,15 @@ import org.oasisopen.sca.annotation.AsyncInvocation;
public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
private static final long serialVersionUID = 1L;
+
+ private static int invocationCount = 10; // # of threads to use
+ private static long maxWaitTime = 30; // Max wait time for completion = 30sec
+
+ // Run the async service invocations using a ThreadPoolExecutor
+ private static ThreadPoolExecutor theExecutor = new ThreadPoolExecutor( invocationCount, invocationCount,
+ maxWaitTime, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>( invocationCount ) );
+
public AsyncJDKInvocationHandler(MessageFactory messageFactory, ServiceReference<?> callableReference) {
super(messageFactory, callableReference);
@@ -119,7 +124,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
return doInvokeAsyncPoll(proxy, method, args);
} else {
// Regular synchronous method call
- return super.invoke(proxy, method, args);
+ return doInvokeSync(proxy, method, args);
}
}
@@ -156,13 +161,11 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
*/
@SuppressWarnings("unchecked")
protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) {
- Object response;
Class<?> returnType = getNonAsyncMethod(asyncMethod).getReturnType();
// Allocate the Future<?> / Response<?> object - note: Response<?> is a subclass of Future<?>
- AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType );
+ AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType, getInterfaceClassloader() );
try {
- response = invokeAsync(proxy, getNonAsyncMethod(asyncMethod), args, future);
- future.setResponse(response);
+ invokeAsync(proxy, getNonAsyncMethod(asyncMethod), args, future);
} catch (Exception e) {
future.setFault( new AsyncFaultWrapper(e) );
} catch (Throwable t ) {
@@ -171,11 +174,31 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
future.setFault( new AsyncFaultWrapper(e) );
} // end try
return future;
- //return new AsyncResponse(response, isException);
} // end method doInvokeAsyncPoll
+
+ /**
+ * Provide a synchronous invocation of a service operation that is either synchronous or asynchronous
+ * @return
+ */
+ protected Object doInvokeSync(Object proxy, Method method, Object[] args) throws Throwable {
+ if ( isAsyncInvocation( source ) ) {
+ // Target service is asynchronous
+ Class<?> returnType = method.getReturnType();
+ AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance( returnType, getInterfaceClassloader() );
+ invokeAsync(proxy, method, args, future);
+ // Wait for some maximum time for the result - 1000 seconds here
+ // Really, if the service is async, the client should use async client methods to invoke the service
+ // - and be prepared to wait a *really* long time
+ return future.get(1000, TimeUnit.SECONDS);
+ } else {
+ // Target service is not asynchronous, so perform sync invocation
+ return super.invoke(proxy, method, args);
+ } // end if
+ } // end method doInvokeSync
/**
- * Invoke an async callback method
+ * Invoke an async callback method - note that this form of the async client API has as its final parameter
+ * an AsyncHandler method, used for callbacks to the client code
* @param proxy - the reference proxy
* @param asyncMethod - the async method to invoke
* @param args - array of input arguments to the method
@@ -186,14 +209,17 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
private Object doInvokeAsyncCallback(Object proxy, Method asyncMethod, Object[] args) {
AsyncHandler handler = (AsyncHandler)args[args.length-1];
Response response = doInvokeAsyncPoll(proxy,asyncMethod,Arrays.copyOf(args, args.length-1));
- handler.handleResponse(response);
+ // Invoke the callback handler, if present
+ if( handler != null ) {
+ handler.handleResponse(response);
+ } // end if
return response;
} // end method doInvokeAsyncCallback
/**
- * Invoke the target method on
- * @param proxy
+ * Invoke the target (synchronous) method asynchronously
+ * @param proxy - the reference proxy object
* @param method - the method to invoke
* @param args - arguments for the call
* @param future - Future for handling the response
@@ -201,10 +227,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
* @throws Throwable - if an exception is thrown during the invocation
*/
@SuppressWarnings("unchecked")
- private Object invokeAsync(Object proxy, Method method, Object[] args, AsyncInvocationFutureImpl future) throws Throwable {
- if (Object.class == method.getDeclaringClass()) {
- return invokeObjectMethod(method, args);
- }
+ private void invokeAsync(Object proxy, Method method, Object[] args, AsyncInvocationFutureImpl future) throws Throwable {
if (source == null) {
throw new ServiceRuntimeException("No runtime source is available");
}
@@ -215,7 +238,7 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
epr.rebuild();
chains.clear();
}
- }
+ } // end if
InvocationChain chain = getInvocationChain(method, source);
@@ -223,22 +246,84 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
throw new IllegalArgumentException("No matching operation is found: " + method);
}
+ // Organize for an async service
RuntimeEndpoint theEndpoint = getAsyncCallback( source );
- attachFuture( theEndpoint, future );
+ boolean isAsyncService = false;
+ if( theEndpoint != null ) {
+ // ... the service is asynchronous ...
+ attachFuture( theEndpoint, future );
+ isAsyncService = true;
+ } else {
+ // ... the service is synchronous ...
+ } // end if
- // send the invocation down the source
- Object result = super.invoke(chain, args, source);
+ // Perform the invocations on separate thread...
+ theExecutor.execute( new separateThreadInvoker( chain, args, source, future, isAsyncService ) );
- return result;
+ return;
} // end method invokeAsync
+ /**
+ * An inner class which acts as a runnable task for invoking services asynchronously on threads that are separate from
+ * those used to execute operations of components
+ *
+ * This supports both synchronous services and asynchronous services
+ */
+ private class separateThreadInvoker implements Runnable {
+
+ private AsyncInvocationFutureImpl future;
+ private InvocationChain chain;
+ private Object[] args;
+ private Invocable invocable;
+ private boolean isAsyncService;
+
+ public separateThreadInvoker( InvocationChain chain, Object[] args, Invocable invocable,
+ AsyncInvocationFutureImpl future, boolean isAsyncService ) {
+ super();
+ this.chain = chain;
+ this.args = args;
+ this.invocable = invocable;
+ this.future = future;
+ this.isAsyncService = isAsyncService;
+ } // end constructor
+
+ public void run() {
+ Object result;
+
+ try {
+ if( isAsyncService ) {
+ invoke(chain, args, invocable, future.getUniqueID());
+ // The result is returned asynchronously via the future...
+ } else {
+ // ... the service is synchronous ...
+ result = invoke(chain, args, invocable);
+ future.setResponse(result);
+ } // end if
+ } catch ( ServiceRuntimeException s ) {
+ Throwable e = s.getCause();
+ if( e != null && e instanceof FaultException ) {
+ if( "AsyncResponse".equals(e.getMessage()) ) {
+ // Do nothing...
+ } else {
+ future.setFault( new AsyncFaultWrapper( s ) );
+ } // end if
+ } // end if
+ } catch ( Throwable t ) {
+ System.out.println("Async invoke got exception: " + t.toString());
+ future.setFault( new AsyncFaultWrapper( t ) );
+ } // end try
+
+ } // end method run
+
+ } // end class separateThreadInvoker
+
/**
* Attaches a future to the callback endpoint - so that the Future is triggered when a response is
* received from the asynchronous service invocation associated with the Future
* @param endpoint - the async callback endpoint
* @param future - the async invocation future to attach
*/
- private void attachFuture( RuntimeEndpoint endpoint, AsyncInvocationFutureImpl future ) {
+ private void attachFuture( RuntimeEndpoint endpoint, AsyncInvocationFutureImpl<?> future ) {
Implementation impl = endpoint.getComponent().getImplementation();
AsyncResponseHandlerImpl<?> asyncHandler = (AsyncResponseHandlerImpl<?>) impl;
asyncHandler.addFuture(future);
@@ -257,11 +342,12 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
RuntimeEndpoint endpoint;
synchronized( epr ) {
endpoint = (RuntimeEndpoint)epr.getCallbackEndpoint();
+ // If the async callback endpoint is already created, return it...
if( endpoint != null ) return endpoint;
// Create the endpoint for the async callback
endpoint = createAsyncCallbackEndpoint( epr );
epr.setCallbackEndpoint(endpoint);
- }
+ } // end synchronized
// Activate the new callback endpoint
startEndpoint( epr.getCompositeContext(), endpoint );
@@ -335,19 +421,9 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
services.clear();
services.add(service);
- Binding eprBinding = epr.getBinding();
- try {
- Binding binding = (Binding)eprBinding.clone();
- // Create a binding
- binding = createMatchingBinding( eprBinding, fakeComponent, service, registry );
-
- // Create a URI address for the callback based on the Component_Name/Reference_Name pattern
- //String callbackURI = "/" + epr.getComponent().getName() + "/" + serviceName;
- //binding.setURI(callbackURI);
- endpoint.setBinding(binding);
- } catch (CloneNotSupportedException e) {
- // will not happen
- } // end try
+ // Create a binding
+ Binding binding = createMatchingBinding( epr.getBinding(), fakeComponent, service, registry );
+ endpoint.setBinding(binding);
// Need to establish policies here (binding has some...)
endpoint.getRequiredIntents().addAll( epr.getRequiredIntents() );
@@ -421,6 +497,11 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
return (RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class);
} // end method RuntimeAssemblyFactory
+ /**
+ * Applies an AsyncResponseHandlerImpl as the implementation of a RuntimeComponent
+ * - the AsyncResponseHandlerImpl acts as both the implementation class and the implementation provider...
+ * @param component - the component
+ */
private void applyImplementation( RuntimeComponent component ) {
AsyncResponseHandlerImpl<?> asyncHandler = new AsyncResponseHandlerImpl<Object>();
component.setImplementation( asyncHandler );
@@ -434,7 +515,8 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
* @param source - the EPR involved in the invocation
* @return - true if the invocation is async
*/
- private boolean isAsyncInvocation( RuntimeEndpointReference source ) {
+ private boolean isAsyncInvocation( Invocable source ) {
+ if( !(source instanceof RuntimeEndpointReference) ) return false;
RuntimeEndpointReference epr = (RuntimeEndpointReference) source;
// First check is to see if the EPR itself has the asyncInvocation intent marked
for( Intent intent : epr.getRequiredIntents() ) {
@@ -462,5 +544,13 @@ public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
}
}
throw new IllegalStateException("No synchronous method matching async method " + asyncMethod.getName());
+ } // end method getNonAsyncMethod
+
+ /**
+ * Gets the classloader of the business interface
+ * @return
+ */
+ private ClassLoader getInterfaceClassloader( ) {
+ return businessInterface.getClassLoader();
}
}
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
index 99b4b26b8f..8d56088c44 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
@@ -42,6 +42,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService;
/**
* A class intended to form the final link in the chain calling into a Future which represents
* the response to an asynchronous service invocation
+ *
* Most methods are dummies, required to fulfil the contracts for ImplementationProvider, Implementation
* and Invoker, since this class collapses together the functions of these separate interfaces, due to its
* specialized nature, where most of the function will never be used.
@@ -52,7 +53,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService;
* message header. On receipt of each message, the class seeks out the Future with that unique ID and completes the future
* either with a response message or with a Fault.
*
- * @param <V>
+ * @param <V>
*/
public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>,
ImplementationProvider, Implementation, Invoker {
@@ -60,6 +61,9 @@ public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>,
private ConcurrentHashMap< String, AsyncInvocationFutureImpl<?> > table =
new ConcurrentHashMap< String, AsyncInvocationFutureImpl<?> >();
+ /**
+ * This class is its own invoker...
+ */
public Invoker createInvoker(RuntimeComponentService service,
Operation operation) {
return this;
@@ -144,13 +148,46 @@ public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>,
public void setResponse(V res) { }
- public Message invoke(Message msg) {
- // TODO Auto-generated method stub
+ /**
+ * Method which is the termination for the invocation chain from the callback endpoint
+ * @param msg - the Tuscany message containing the response from the async service invocation
+ * which is either the Response message or an exception of some kind
+ */
+ private static final String WS_MESSAGE_ID = "WS_MESSAGE_ID";
+ public Message invoke(Message msg) {
// Get the unique ID from the message header
- // Fetch the Future with that Unique ID
- // Complete the Future with a Response message
- // ...or complete the Future with a Fault
- return null;
- }
+ String idValue = (String)msg.getHeaders().get(WS_MESSAGE_ID);
+ if( idValue == null ) {
+ System.out.println( "Async message ID not found ");
+ } else {
+ // Fetch the Future with that Unique ID
+ AsyncInvocationFutureImpl future = table.get(idValue);
+ if( future == null ) {
+ System.out.println("Future not found for id: " + idValue);
+ } else {
+ // Complete the Future with a Response message
+ Object payload = msg.getBody();
+ Object response;
+ if( payload == null ) {
+ System.out.println("Returned response message was null");
+ } else {
+ if (payload.getClass().isArray()) {
+ response = ((Object[])payload)[0];
+ } else {
+ response = payload;
+ } // end if
+ if( response.getClass().equals(AsyncFaultWrapper.class)) {
+ future.setFault((AsyncFaultWrapper) response );
+ } else {
+ future.setResponse(response);
+ } // end if
+ } // end if
+ } // end if
+ } // end if
+
+ // Prepare an empty response message
+ msg.setBody(null);
+ return msg;
+ } // end method invoke
} // end class
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java
index 412481886f..d0095f45ee 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java
@@ -225,8 +225,22 @@ public class JDKInvocationHandler implements InvocationHandler, Serializable {
protected void setEndpoint(Endpoint endpoint) {
this.target = endpoint;
}
-
+
protected Object invoke(InvocationChain chain, Object[] args, Invocable source)
+ throws Throwable {
+ return invoke( chain, args, source, null );
+ }
+
+ /**
+ * Invoke the chain
+ * @param chain - the chain
+ * @param args - arguments to the invocation as an array of Objects
+ * @param source - the Endpoint or EndpointReference to which the chain relates
+ * @param msgID - an ID for the message being sent, may be null
+ * @return - the Response message from the invocation
+ * @throws Throwable - if any exception occurs during the invocation
+ */
+ protected Object invoke(InvocationChain chain, Object[] args, Invocable source, String msgID)
throws Throwable {
Message msg = messageFactory.createMessage();
if (source instanceof RuntimeEndpointReference) {
@@ -250,6 +264,11 @@ public class JDKInvocationHandler implements InvocationHandler, Serializable {
transferMessageHeaders( msg, msgContext);
ThreadMessageContext.setMessageContext(msg);
+
+ // If there is a supplied message ID, place its value into the Message Header under "MESSAGE_ID"
+ if( msgID != null ){
+ msg.getHeaders().put("MESSAGE_ID", msgID);
+ } // end if
try {
// dispatch the source down the chain and get the response