From 132aa8a77685ec92bc90c03f987650d275a7b639 Mon Sep 17 00:00:00 2001 From: lresende Date: Mon, 30 Sep 2013 06:59:11 +0000 Subject: 2.0.1 RC1 release tag git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1527464 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/core/invocation/AsyncResponseInvoker.java | 351 +++++++++++++++++++++ 1 file changed, 351 insertions(+) create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java (limited to 'sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java') diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java new file mode 100644 index 0000000000..1ebc9c633a --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.context.CompositeContext; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.ExtensionPointRegistryLocator; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.provider.EndpointAsyncProvider; +import org.apache.tuscany.sca.runtime.DomainRegistryFactory; +import org.apache.tuscany.sca.runtime.DomainRegistry; +import org.apache.tuscany.sca.runtime.ExtensibleDomainRegistryFactory; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; + +/** + * A class that wraps the mechanics for sending async responses + * and hides the decision about whether the response will be processed + * natively or non-natively + * + * This class is generic, based on the type of targetAddress information required by + * the Binding that creates it + */ +public class AsyncResponseInvoker implements InvokerAsyncResponse, Serializable { + + /** + * + */ + private static final long serialVersionUID = -7992598227671386588L; + + private transient RuntimeEndpoint requestEndpoint; + private transient RuntimeEndpointReference responseEndpointReference; + private T responseTargetAddress; + private String relatesToMsgID; + private String operationName; + private transient MessageFactory messageFactory; + private String bindingType = ""; + private boolean isNativeAsync; + + private String endpointURI; + private String endpointReferenceURI; + private String domainURI; + + private transient DomainRegistry domainRegistry; + private transient ExtensionPointRegistry registry; + + public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint, + RuntimeEndpointReference responseEndpointReference, + T responseTargetAddress, String relatesToMsgID, + String operationName, MessageFactory messageFactory) { + super(); + this.requestEndpoint = requestEndpoint; + this.responseEndpointReference = responseEndpointReference; + this.responseTargetAddress = responseTargetAddress; + this.relatesToMsgID = relatesToMsgID; + this.operationName = operationName; + this.messageFactory = messageFactory; + + CompositeContext context = null; + if(requestEndpoint != null ) { + endpointURI = requestEndpoint.getURI(); + context = requestEndpoint.getCompositeContext(); + } // end if + if(responseEndpointReference != null ) { + endpointReferenceURI = responseEndpointReference.getURI(); + context = responseEndpointReference.getCompositeContext(); + } + + if( context != null ) { + domainURI = context.getDomainURI(); + registry = context.getExtensionPointRegistry(); + } // end if + + if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) && + (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){ + isNativeAsync = true; + } else { + isNativeAsync = false; + } // end if + } // end constructor + + /** + * If you have a Tuscany message you can call this + */ + public void invokeAsyncResponse(Message responseMessage) { + responseMessage.getHeaders().put(Constants.ASYNC_RESPONSE_INVOKER, this); + responseMessage.getHeaders().put(Constants.RELATES_TO, relatesToMsgID); + + if (isNativeAsync){ + // process the response as a native async response + requestEndpoint.invokeAsyncResponse(responseMessage); + } else { + // process the response as a non-native async response + responseEndpointReference.invoke(responseMessage); + } + } // end method invokeAsyncReponse(Message) + + public T getResponseTargetAddress() { + return responseTargetAddress; + } + + public void setResponseTargetAddress(T responseTargetAddress) { + this.responseTargetAddress = responseTargetAddress; + } + + public String getRelatesToMsgID() { + return relatesToMsgID; + } + + public void setRelatesToMsgID(String relatesToMsgID) { + this.relatesToMsgID = relatesToMsgID; + } + + /** + * Invokes the async response where the parameter is Java bean(s) + * - this method creates a Tuscany message + * + * @param args the response data + * @param headers - any header + */ + public void invokeAsyncResponse(Object args, Map headers) { + + Message msg = messageFactory.createMessage(); + + msg.setOperation(getOperation( args )); + + // If this is not native async, then any Throwable is being passed as a parameter and + // requires wrapping + if( !isNativeAsync && args instanceof Throwable ) { + args = new AsyncFaultWrapper( (Throwable) args ); + } // end if + + // If this is not native async, then the message must contain an array of args since + // this is what is expected when invoking an EPR for the async response... + if( !isNativeAsync ) { + Object[] objs = new Object[1]; + objs[0] = args; + args = objs; + } // end if + + msg.setTo(requestEndpoint); + msg.setFrom(responseEndpointReference); + + if( headers != null ) { + msg.getHeaders().putAll(headers); + } + + if( args instanceof Throwable ) { + msg.setFaultBody(args); + } else { + msg.setBody(args); + } // end if + + invokeAsyncResponse(msg); + + } // end method invokeAsyncResponse(Object) + + private Operation getOperation( Object args ) { + if( isNativeAsync ) { + List ops = requestEndpoint.getService().getInterfaceContract().getInterface().getOperations(); + for (Operation op : ops) { + if( operationName.equals(op.getName()) ) return op; + } // end for + return null; + } else { + operationName = "setResponse"; + if( args instanceof Throwable ) { operationName = "setWrappedFault"; } + List ops = responseEndpointReference.getReference().getInterfaceContract().getInterface().getOperations(); + for (Operation op : ops) { + if( operationName.equals(op.getName()) ) return op; + } // end for + return null; + } // end if + } // end getOperation + + public void setBindingType(String bindingType) { + this.bindingType = bindingType; + } // end method setBindingType + + public String getBindingType() { + return bindingType; + } // end method getBindingType + + public RuntimeEndpoint getRequestEndpoint() { + return this.requestEndpoint; + } + + public RuntimeEndpointReference getResponseEndpointReference() { + return this.responseEndpointReference; + } + + public void setResponseEndpointReference( + RuntimeEndpointReference responseEndpointReference) { + this.responseEndpointReference = responseEndpointReference; + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException + { + in.defaultReadObject(); + + requestEndpoint = retrieveEndpoint(endpointURI); + responseEndpointReference = retrieveEndpointReference(endpointReferenceURI); + + messageFactory = getMessageFactory(); + + if (responseTargetAddress instanceof EndpointReference){ + // fix the target as in this case it will be an EPR + EndpointReference epr = (EndpointReference)responseTargetAddress; + responseTargetAddress = (T)retrieveEndpointReference(epr.getURI()); + } // end if + } // end method readObject + + /** + * Gets a message factory + * @return + */ + private MessageFactory getMessageFactory() { + return registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(MessageFactory.class); + } // end method getMessageFactory + + /** + * Fetches the EndpointReference identified by an endpoint reference URI + * @param uri - the URI of the endpoint reference + * @return - the EndpointReference matching the supplied URI - null if no EPR is found which + * matches the URI + */ + private RuntimeEndpointReference retrieveEndpointReference(String uri) { + if( uri == null ) return null; + if( domainRegistry == null ) return null; + List refs = domainRegistry.findEndpointReferences( uri ); + // If there is more than EndpointReference with the uri... + if( refs.isEmpty() ) return null; + // TODO: what if there is more than 1 EPR with the given URI? + return (RuntimeEndpointReference) refs.get(0); + } // end method retrieveEndpointReference + + /** + * Fetches the Endpoint identified by an endpoint URI + * - the Endpoint is retrieved from the DomainRegistry + * @param uri - the URI of the Endpoint + * @return - the Endpoint corresponding to the URI, or null if no Endpoint is found which has the + * supplied URI + */ + private RuntimeEndpoint retrieveEndpoint(String uri) { + if( uri == null ) return null; + if( domainRegistry == null ) domainRegistry = getEndpointRegistry( uri ); + if( domainRegistry == null ) return null; + // TODO what if more than one Endpoint gets returned?? + return (RuntimeEndpoint) domainRegistry.findEndpoint(uri).get(0); + } // end method retrieveEndpoint + + /** + * Gets the DomainRegistry which contains an Endpoint with the supplied URI + * @param uri - The URI of an Endpoint + * @return - the DomainRegistry containing the Endpoint with the supplied URI - null if no + * such DomainRegistry can be found + */ + private DomainRegistry getEndpointRegistry(String uri) { + ExtensionPointRegistry registry = null; + DomainRegistry domainRegistry = null; + + CompositeContext context = CompositeContext.getCurrentCompositeContext(); + if( context == null && requestEndpoint != null ) context = requestEndpoint.getCompositeContext(); + if( context != null ) { + registry = context.getExtensionPointRegistry(); + domainRegistry = getEndpointRegistry( registry ); + if( domainRegistry != null ) { + this.registry = registry; + return domainRegistry; + } // end if + } // end if + + // Deal with the case where there is no context available + for(ExtensionPointRegistry r : ExtensionPointRegistryLocator.getExtensionPointRegistries()) { + registry = r; + if( registry != null ) { + // Find the actual Endpoint in the DomainRegistry + domainRegistry = getEndpointRegistry( registry ); + + if( domainRegistry != null ) { + for( Endpoint endpoint : domainRegistry.findEndpoint(uri) ) { + // TODO: For the present, simply return the first registry with a matching endpoint + this.registry = registry; + return domainRegistry; + } // end for + } // end if + } // end if + } // end for + + return null; + } // end method getEndpointRegistry + + /** + * Get the DomainRegistry + * @param registry - the ExtensionPoint registry + * @return the DomainRegistry - will be null if the DomainRegistry cannot be found + */ + private DomainRegistry getEndpointRegistry( ExtensionPointRegistry registry) { + DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry); + + if( domainRegistryFactory == null ) return null; + + // Find the first endpoint registry that matches the domain name + if( domainURI != null ) { + for( DomainRegistry domainRegistry : domainRegistryFactory.getEndpointRegistries() ) { + if( domainURI.equals( domainRegistry.getDomainURI() ) ) return domainRegistry; + } // end for + } // end if + + // if there was no domainName to match, simply return the first DomainRegistry if there is one... + + if (domainRegistryFactory.getEndpointRegistries().size() > 0){ + DomainRegistry domainRegistry = (DomainRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0]; + return domainRegistry; + } else { + return null; + } + + } // end method + +} // end class -- cgit v1.2.3