From 0232f3394601df2306b87b78cbf53e873ca74d78 Mon Sep 17 00:00:00 2001 From: edwardsmj Date: Mon, 24 Jan 2011 14:57:26 +0000 Subject: Fixing serialization of AsyncResponseInvoker in support of async services under TUSCANY-3783 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1062814 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/core/invocation/AsyncResponseInvoker.java | 152 ++++++++++++++++++--- 1 file changed, 133 insertions(+), 19 deletions(-) (limited to 'sca-java-2.x/trunk/modules/core/src') diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java index 38727d4670..2e675c5e7a 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java @@ -25,12 +25,16 @@ import java.io.Serializable; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.context.CompositeContext; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.core.assembly.impl.RuntimeEndpointImpl; +import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; @@ -40,6 +44,7 @@ import org.apache.tuscany.sca.node.NodeFactory; import org.apache.tuscany.sca.provider.EndpointAsyncProvider; import org.apache.tuscany.sca.runtime.DomainRegistryFactory; import org.apache.tuscany.sca.runtime.EndpointRegistry; +import org.apache.tuscany.sca.runtime.EndpointSerializer; import org.apache.tuscany.sca.runtime.ExtensibleDomainRegistryFactory; import org.apache.tuscany.sca.runtime.RuntimeEndpoint; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; @@ -60,8 +65,8 @@ public class AsyncResponseInvoker implements InvokerAsyncResponse, Serializab */ private static final long serialVersionUID = -7992598227671386588L; - private RuntimeEndpoint requestEndpoint; - private RuntimeEndpointReference responseEndpointReference; + private transient RuntimeEndpoint requestEndpoint; + private transient RuntimeEndpointReference responseEndpointReference; private T responseTargetAddress; private String relatesToMsgID; private String operationName; @@ -69,6 +74,12 @@ public class AsyncResponseInvoker implements InvokerAsyncResponse, Serializab private String bindingType = ""; private boolean isNativeAsync; + private String endpointURI; + private String endpointReferenceURI; + private String domainURI; + + private transient EndpointRegistry endpointRegistry; + public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint, RuntimeEndpointReference responseEndpointReference, T responseTargetAddress, String relatesToMsgID, @@ -81,6 +92,20 @@ public class AsyncResponseInvoker implements InvokerAsyncResponse, Serializab this.operationName = operationName; this.messageFactory = messageFactory; + CompositeContext context = null; + if(requestEndpoint != null ) { + endpointURI = requestEndpoint.getURI(); + context = requestEndpoint.getCompositeContext(); + } // end if + if(responseEndpointReference != null ) { + endpointReferenceURI = responseEndpointReference.getURI(); + context = responseEndpointReference.getCompositeContext(); + } + + if( context != null ) { + domainURI = context.getDomainURI(); + } // end if + if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) && (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){ isNativeAsync = true; @@ -126,8 +151,9 @@ public class AsyncResponseInvoker implements InvokerAsyncResponse, Serializab * - this method creates a Tuscany message * * @param args the response data + * @param headers - any header */ - public void invokeAsyncResponse(Object args) { + public void invokeAsyncResponse(Object args, Map headers) { Message msg = messageFactory.createMessage(); @@ -150,6 +176,10 @@ public class AsyncResponseInvoker implements InvokerAsyncResponse, Serializab msg.setTo(requestEndpoint); msg.setFrom(responseEndpointReference); + if( headers != null ) { + msg.getHeaders().putAll(headers); + } + if( args instanceof Throwable ) { msg.setFaultBody(args); } else { @@ -199,27 +229,111 @@ public class AsyncResponseInvoker implements InvokerAsyncResponse, Serializab this.responseEndpointReference = responseEndpointReference; } - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - // find the real endpoint - ExtensionPointRegistry extensionPointRegistry = NodeFactory.getInstance().getExtensionPointRegistry(); - DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(extensionPointRegistry); - Collection endpointRegistries = domainRegistryFactory.getEndpointRegistries(); - EndpointRegistry endpointRegistry = endpointRegistries.iterator().next(); - List endpoints = endpointRegistry.findEndpoint(requestEndpoint.getURI()); - requestEndpoint = (RuntimeEndpoint)endpoints.get(0); + requestEndpoint = retrieveEndpoint(endpointURI); + responseEndpointReference = retrieveEndpointReference(endpointReferenceURI); if (responseTargetAddress instanceof EndpointReference){ // fix the target as in this case it will be an EPR EndpointReference epr = (EndpointReference)responseTargetAddress; - List endpointReferences = endpointRegistry.getEndpointReferences(); - for (EndpointReference endpointReference : endpointReferences){ - if (endpointReference.getURI().equals(epr.getURI())){ - responseTargetAddress = (T)endpointReference; - } - } - } - } + responseTargetAddress = (T)retrieveEndpointReference(epr.getURI()); + } // end if + } // end method readObject + + /** + * Fetches the EndpointReference identified by an endpoint reference URI + * @param uri - the URI of the endpoint reference + * @return - the EndpointReference matching the supplied URI - null if no EPR is found which + * matches the URI + */ + private RuntimeEndpointReference retrieveEndpointReference(String uri) { + if( uri == null ) return null; + if( endpointRegistry == null ) return null; + List refs = endpointRegistry.findEndpointReferences( uri ); + // If there is more than EndpointReference with the uri... + if( refs.isEmpty() ) return null; + // TODO: what if there is more than 1 EPR with the given URI? + return (RuntimeEndpointReference) refs.get(0); + } // end method retrieveEndpointReference + + /** + * Fetches the Endpoint identified by an endpoint URI + * - the Endpoint is retrieved from the EndpointRegistry + * @param uri - the URI of the Endpoint + * @return - the Endpoint corresponding to the URI, or null if no Endpoint is found which has the + * supplied URI + */ + private RuntimeEndpoint retrieveEndpoint(String uri) { + if( uri == null ) return null; + if( endpointRegistry == null ) endpointRegistry = getEndpointRegistry( uri ); + if( endpointRegistry == null ) return null; + // TODO what if more than one Endpoint gets returned?? + return (RuntimeEndpoint) endpointRegistry.findEndpoint(uri).get(0); + } // end method retrieveEndpoint + + /** + * Gets the EndpointRegistry which contains an Endpoint with the supplied URI + * @param uri - The URI of an Endpoint + * @return - the EndpointRegistry containing the Endpoint with the supplied URI - null if no + * such EndpointRegistry can be found + */ + private EndpointRegistry getEndpointRegistry(String uri) { + ExtensionPointRegistry registry = null; + EndpointRegistry endpointRegistry = null; + + CompositeContext context = CompositeContext.getCurrentCompositeContext(); + if( context == null && requestEndpoint != null ) context = requestEndpoint.getCompositeContext(); + if( context != null ) { + registry = context.getExtensionPointRegistry(); + endpointRegistry = getEndpointRegistry( registry ); + if( endpointRegistry != null ) return endpointRegistry; + } // end if + + // Deal with the case where there is no context available + for( NodeFactory factory : NodeFactory.getNodeFactories() ) { + registry = factory.getExtensionPointRegistry(); + if( registry != null ) { + // Find the actual Endpoint in the EndpointRegistry + endpointRegistry = getEndpointRegistry( registry ); + + if( endpointRegistry != null ) { + for( Endpoint endpoint : endpointRegistry.findEndpoint(uri) ) { + // TODO: For the present, simply return the first registry with a matching endpoint + return endpointRegistry; + } // end for + } // end if + } // end if + } // end for + + return null; + } // end method getEndpointRegistry + + /** + * Get the EndpointRegistry + * @param registry - the ExtensionPoint registry + * @return the EndpointRegistry - will be null if the EndpointRegistry cannot be found + */ + private EndpointRegistry getEndpointRegistry( ExtensionPointRegistry registry) { + DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry); + + if( domainRegistryFactory == null ) return null; + + // Find the first endpoint registry that matches the domain name + if( domainURI != null ) { + for( EndpointRegistry endpointRegistry : domainRegistryFactory.getEndpointRegistries() ) { + if( domainURI.equals( endpointRegistry.getDomainURI() ) ) return endpointRegistry; + } // end for + } // end if + + // if there was no domainName to match, simply return the first EndpointRegistry... + EndpointRegistry endpointRegistry = (EndpointRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0]; + + + return endpointRegistry; + } // end method + } // end class -- cgit v1.2.3