From 132aa8a77685ec92bc90c03f987650d275a7b639 Mon Sep 17 00:00:00 2001 From: lresende Date: Mon, 30 Sep 2013 06:59:11 +0000 Subject: 2.0.1 RC1 release tag git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1527464 13f79535-47bb-0310-9956-ffa450edef68 --- .../tuscany/sca/core/invocation/AsyncContext.java | 42 ++ .../sca/core/invocation/AsyncFaultWrapper.java | 107 +++ .../core/invocation/AsyncResponseException.java | 47 ++ .../sca/core/invocation/AsyncResponseHandler.java | 58 ++ .../sca/core/invocation/AsyncResponseInvoker.java | 351 ++++++++++ .../sca/core/invocation/AsyncResponseService.java | 50 ++ .../invocation/CallableReferenceObjectFactory.java | 58 ++ .../sca/core/invocation/CallbackHandler.java | 53 ++ .../invocation/CallbackInterfaceInterceptor.java | 60 ++ .../invocation/CallbackReferenceObjectFactory.java | 50 ++ .../core/invocation/CallbackWireObjectFactory.java | 48 ++ .../sca/core/invocation/CglibProxyFactory.java | 155 +++++ .../tuscany/sca/core/invocation/Constants.java | 38 ++ .../DefaultProxyFactoryExtensionPoint.java | 95 +++ .../core/invocation/ExtensibleProxyFactory.java | 128 ++++ .../core/invocation/ExtensibleWireProcessor.java | 51 ++ .../sca/core/invocation/InterceptorAsyncImpl.java | 111 ++++ .../core/invocation/JDKAsyncResponseInvoker.java | 40 ++ .../core/invocation/NonBlockingInterceptor.java | 197 ++++++ .../core/invocation/ProxyCreationException.java | 48 ++ .../tuscany/sca/core/invocation/ProxyFactory.java | 96 +++ .../invocation/ProxyFactoryExtensionPoint.java | 54 ++ .../sca/core/invocation/RuntimeInvoker.java | 210 ++++++ .../sca/core/invocation/WireObjectFactory.java | 56 ++ .../invocation/impl/AsyncInvocationFutureImpl.java | 285 ++++++++ .../invocation/impl/AsyncJDKInvocationHandler.java | 740 +++++++++++++++++++++ .../sca/core/invocation/impl/AsyncResponse.java | 68 ++ .../invocation/impl/AsyncResponseHandlerImpl.java | 197 ++++++ .../core/invocation/impl/InvocationChainImpl.java | 313 +++++++++ .../impl/JDKCallbackInvocationHandler.java | 153 +++++ .../core/invocation/impl/JDKInvocationHandler.java | 436 ++++++++++++ .../sca/core/invocation/impl/JDKProxyFactory.java | 248 +++++++ .../core/invocation/impl/MessageFactoryImpl.java | 43 ++ .../sca/core/invocation/impl/MessageImpl.java | 114 ++++ .../impl/NoMethodForOperationException.java | 45 ++ .../sca/core/invocation/impl/PhaseManager.java | 313 +++++++++ .../sca/core/invocation/impl/PhaseSorter.java | 236 +++++++ 37 files changed, 5394 insertions(+) create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncContext.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseService.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallableReferenceObjectFactory.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackHandler.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackInterfaceInterceptor.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackReferenceObjectFactory.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackWireObjectFactory.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CglibProxyFactory.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/DefaultProxyFactoryExtensionPoint.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ExtensibleProxyFactory.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ExtensibleWireProcessor.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKAsyncResponseInvoker.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/NonBlockingInterceptor.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyCreationException.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyFactory.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyFactoryExtensionPoint.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/WireObjectFactory.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java (limited to 'sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation') diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncContext.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncContext.java new file mode 100644 index 0000000000..b91bfcca8e --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncContext.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +/** + * Allows extensions to associate data with a client's async request. + */ +public interface AsyncContext { + + /** + * Looks up an attribute value by name. + * @param name The name of the attribute + * @return The value of the attribute + */ + public Object getAttribute(String name); + + /** + * Sets the value of an attribute. + * + * @param name The name of the attribute + * @param value + */ + public void setAttribute(String name, Object value); + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java new file mode 100644 index 0000000000..b35d493d3c --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncFaultWrapper.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * A class which is used to wrap an Exception of any type thrown by an asynchronous service operation and + * which is returned through a separate one-way message sent asynchronously from the server to the client. + * + */ +public class AsyncFaultWrapper { + + private String faultClassName = null; + private String faultMessage = null; + private AsyncFaultWrapper containedFault = null; + + + public AsyncFaultWrapper() { + super(); + } + + /** + * Constructor which creates an AsyncFaultWrapper which wraps the supplied Throwable + * @param e - a Throwable which is wrapped by this AsyncFaultWrapper + */ + public AsyncFaultWrapper( Throwable e ) { + super(); + storeFault( e ); + } + + /** + * Stores a given Throwable in this AsyncFaultWrapper + * If the supplied Throwable itself contains an embedded Throwable ("cause"), this is recursively + * wrapped by a nested AsyncFaultWrapper + * @param e - the Throwable + */ + public void storeFault( Throwable e ) { + setFaultClassName( e.getClass().getCanonicalName() ); + setFaultMessage( e.getMessage() ); + Throwable cause = e.getCause(); + if( cause != null ) setContainedFault( new AsyncFaultWrapper( cause ) ); + } + + /** + * Retrieves the Throwable wrapped by this AsyncFaultWrapper + * + * Note: When this method is invoked, the method attempts to instantiate an instance of the wrapped Throwable. + * It does this using the Thread Context Class Loader (TCCL) - the caller *MUST* ensure that the TCCL has access + * to the class of the wrapped Throwable and also to the classes of any nested Throwables. If this is not done, + * a ClassNotFound exception is thrown + * + * @return - the Throwable wrapped by this AsyncFaultWrapper - the Throwable will contain any nested Throwable(s) + * in its cause property + * @throws ClassNotFound exception, if the class of the wrapped Throwable is not accessible from the TCCL + */ + public Throwable retrieveFault( ) { + try { + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + Class faultClass = tccl.loadClass(faultClassName); + Class xclass = (Class) faultClass; + if( containedFault != null ) { + // If there is a nested fault, retrieve this recursively + Constructor cons = xclass.getConstructor(String.class, Throwable.class); + return (Throwable) cons.newInstance(faultMessage, getContainedFault().retrieveFault()); + } else { + try { + Constructor cons = xclass.getConstructor(String.class); + return (Throwable) cons.newInstance(faultMessage); + } catch (NoSuchMethodException e) { + Constructor cons = xclass.getConstructor(); + return (Throwable) cons.newInstance(); + } + } // end if + } catch (Exception e) { + return e; + } // end try + } // end method retrieveFault + + public void setFaultClassName( String name ) { this.faultClassName = name; } + public String getFaultClassName() { return this.faultClassName; } + + public String getFaultMessage() { return faultMessage; } + public void setFaultMessage(String faultMessage) { this.faultMessage = faultMessage; } + + public AsyncFaultWrapper getContainedFault() { return containedFault; } + public void setContainedFault(AsyncFaultWrapper containedFault) { this.containedFault = containedFault; } + +} // end class AsyncFaultWrapper diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java new file mode 100644 index 0000000000..df90c3286d --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +/** + * An exception which is used to signal that a service has been invoked asynchronously + * and that the result will be sent separately + * + */ +public class AsyncResponseException extends RuntimeException { + + private static final long serialVersionUID = 457954562860541631L; + + public AsyncResponseException() { + super(); + } + + public AsyncResponseException(String arg0, Throwable arg1) { + super(arg0, arg1); + } + + public AsyncResponseException(String arg0) { + super(arg0); + } + + public AsyncResponseException(Throwable arg0) { + super(arg0); + } + +} // end class AsyncResponseException diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java new file mode 100644 index 0000000000..306a141433 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseHandler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import org.oasisopen.sca.annotation.OneWay; +import org.oasisopen.sca.annotation.Remotable; + +/** + * An interface which describes a general response pattern for the asynchronous invocation of a service + * + * @param - the type of the non-fault response + */ +public interface AsyncResponseHandler extends AsyncResponseService { + + /** + * Async process completed with a wrapped Fault. Must only be invoked once + * + * @param e - the wrapper containing the Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + @OneWay + public void setWrappedFault(AsyncFaultWrapper w); + + /** + * Async process completed with a Fault. Must only be invoked once. + * @param e - the Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + @OneWay + public void setFault( Throwable e ); + + /** + * Async process completed with a response message. Must only be invoked once + * + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + * @param res - the response message, which is of type V + */ + @OneWay + public void setResponse(V res); + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java new file mode 100644 index 0000000000..1ebc9c633a --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.context.CompositeContext; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.ExtensionPointRegistryLocator; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.provider.EndpointAsyncProvider; +import org.apache.tuscany.sca.runtime.DomainRegistryFactory; +import org.apache.tuscany.sca.runtime.DomainRegistry; +import org.apache.tuscany.sca.runtime.ExtensibleDomainRegistryFactory; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; + +/** + * A class that wraps the mechanics for sending async responses + * and hides the decision about whether the response will be processed + * natively or non-natively + * + * This class is generic, based on the type of targetAddress information required by + * the Binding that creates it + */ +public class AsyncResponseInvoker implements InvokerAsyncResponse, Serializable { + + /** + * + */ + private static final long serialVersionUID = -7992598227671386588L; + + private transient RuntimeEndpoint requestEndpoint; + private transient RuntimeEndpointReference responseEndpointReference; + private T responseTargetAddress; + private String relatesToMsgID; + private String operationName; + private transient MessageFactory messageFactory; + private String bindingType = ""; + private boolean isNativeAsync; + + private String endpointURI; + private String endpointReferenceURI; + private String domainURI; + + private transient DomainRegistry domainRegistry; + private transient ExtensionPointRegistry registry; + + public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint, + RuntimeEndpointReference responseEndpointReference, + T responseTargetAddress, String relatesToMsgID, + String operationName, MessageFactory messageFactory) { + super(); + this.requestEndpoint = requestEndpoint; + this.responseEndpointReference = responseEndpointReference; + this.responseTargetAddress = responseTargetAddress; + this.relatesToMsgID = relatesToMsgID; + this.operationName = operationName; + this.messageFactory = messageFactory; + + CompositeContext context = null; + if(requestEndpoint != null ) { + endpointURI = requestEndpoint.getURI(); + context = requestEndpoint.getCompositeContext(); + } // end if + if(responseEndpointReference != null ) { + endpointReferenceURI = responseEndpointReference.getURI(); + context = responseEndpointReference.getCompositeContext(); + } + + if( context != null ) { + domainURI = context.getDomainURI(); + registry = context.getExtensionPointRegistry(); + } // end if + + if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) && + (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){ + isNativeAsync = true; + } else { + isNativeAsync = false; + } // end if + } // end constructor + + /** + * If you have a Tuscany message you can call this + */ + public void invokeAsyncResponse(Message responseMessage) { + responseMessage.getHeaders().put(Constants.ASYNC_RESPONSE_INVOKER, this); + responseMessage.getHeaders().put(Constants.RELATES_TO, relatesToMsgID); + + if (isNativeAsync){ + // process the response as a native async response + requestEndpoint.invokeAsyncResponse(responseMessage); + } else { + // process the response as a non-native async response + responseEndpointReference.invoke(responseMessage); + } + } // end method invokeAsyncReponse(Message) + + public T getResponseTargetAddress() { + return responseTargetAddress; + } + + public void setResponseTargetAddress(T responseTargetAddress) { + this.responseTargetAddress = responseTargetAddress; + } + + public String getRelatesToMsgID() { + return relatesToMsgID; + } + + public void setRelatesToMsgID(String relatesToMsgID) { + this.relatesToMsgID = relatesToMsgID; + } + + /** + * Invokes the async response where the parameter is Java bean(s) + * - this method creates a Tuscany message + * + * @param args the response data + * @param headers - any header + */ + public void invokeAsyncResponse(Object args, Map headers) { + + Message msg = messageFactory.createMessage(); + + msg.setOperation(getOperation( args )); + + // If this is not native async, then any Throwable is being passed as a parameter and + // requires wrapping + if( !isNativeAsync && args instanceof Throwable ) { + args = new AsyncFaultWrapper( (Throwable) args ); + } // end if + + // If this is not native async, then the message must contain an array of args since + // this is what is expected when invoking an EPR for the async response... + if( !isNativeAsync ) { + Object[] objs = new Object[1]; + objs[0] = args; + args = objs; + } // end if + + msg.setTo(requestEndpoint); + msg.setFrom(responseEndpointReference); + + if( headers != null ) { + msg.getHeaders().putAll(headers); + } + + if( args instanceof Throwable ) { + msg.setFaultBody(args); + } else { + msg.setBody(args); + } // end if + + invokeAsyncResponse(msg); + + } // end method invokeAsyncResponse(Object) + + private Operation getOperation( Object args ) { + if( isNativeAsync ) { + List ops = requestEndpoint.getService().getInterfaceContract().getInterface().getOperations(); + for (Operation op : ops) { + if( operationName.equals(op.getName()) ) return op; + } // end for + return null; + } else { + operationName = "setResponse"; + if( args instanceof Throwable ) { operationName = "setWrappedFault"; } + List ops = responseEndpointReference.getReference().getInterfaceContract().getInterface().getOperations(); + for (Operation op : ops) { + if( operationName.equals(op.getName()) ) return op; + } // end for + return null; + } // end if + } // end getOperation + + public void setBindingType(String bindingType) { + this.bindingType = bindingType; + } // end method setBindingType + + public String getBindingType() { + return bindingType; + } // end method getBindingType + + public RuntimeEndpoint getRequestEndpoint() { + return this.requestEndpoint; + } + + public RuntimeEndpointReference getResponseEndpointReference() { + return this.responseEndpointReference; + } + + public void setResponseEndpointReference( + RuntimeEndpointReference responseEndpointReference) { + this.responseEndpointReference = responseEndpointReference; + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException + { + in.defaultReadObject(); + + requestEndpoint = retrieveEndpoint(endpointURI); + responseEndpointReference = retrieveEndpointReference(endpointReferenceURI); + + messageFactory = getMessageFactory(); + + if (responseTargetAddress instanceof EndpointReference){ + // fix the target as in this case it will be an EPR + EndpointReference epr = (EndpointReference)responseTargetAddress; + responseTargetAddress = (T)retrieveEndpointReference(epr.getURI()); + } // end if + } // end method readObject + + /** + * Gets a message factory + * @return + */ + private MessageFactory getMessageFactory() { + return registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(MessageFactory.class); + } // end method getMessageFactory + + /** + * Fetches the EndpointReference identified by an endpoint reference URI + * @param uri - the URI of the endpoint reference + * @return - the EndpointReference matching the supplied URI - null if no EPR is found which + * matches the URI + */ + private RuntimeEndpointReference retrieveEndpointReference(String uri) { + if( uri == null ) return null; + if( domainRegistry == null ) return null; + List refs = domainRegistry.findEndpointReferences( uri ); + // If there is more than EndpointReference with the uri... + if( refs.isEmpty() ) return null; + // TODO: what if there is more than 1 EPR with the given URI? + return (RuntimeEndpointReference) refs.get(0); + } // end method retrieveEndpointReference + + /** + * Fetches the Endpoint identified by an endpoint URI + * - the Endpoint is retrieved from the DomainRegistry + * @param uri - the URI of the Endpoint + * @return - the Endpoint corresponding to the URI, or null if no Endpoint is found which has the + * supplied URI + */ + private RuntimeEndpoint retrieveEndpoint(String uri) { + if( uri == null ) return null; + if( domainRegistry == null ) domainRegistry = getEndpointRegistry( uri ); + if( domainRegistry == null ) return null; + // TODO what if more than one Endpoint gets returned?? + return (RuntimeEndpoint) domainRegistry.findEndpoint(uri).get(0); + } // end method retrieveEndpoint + + /** + * Gets the DomainRegistry which contains an Endpoint with the supplied URI + * @param uri - The URI of an Endpoint + * @return - the DomainRegistry containing the Endpoint with the supplied URI - null if no + * such DomainRegistry can be found + */ + private DomainRegistry getEndpointRegistry(String uri) { + ExtensionPointRegistry registry = null; + DomainRegistry domainRegistry = null; + + CompositeContext context = CompositeContext.getCurrentCompositeContext(); + if( context == null && requestEndpoint != null ) context = requestEndpoint.getCompositeContext(); + if( context != null ) { + registry = context.getExtensionPointRegistry(); + domainRegistry = getEndpointRegistry( registry ); + if( domainRegistry != null ) { + this.registry = registry; + return domainRegistry; + } // end if + } // end if + + // Deal with the case where there is no context available + for(ExtensionPointRegistry r : ExtensionPointRegistryLocator.getExtensionPointRegistries()) { + registry = r; + if( registry != null ) { + // Find the actual Endpoint in the DomainRegistry + domainRegistry = getEndpointRegistry( registry ); + + if( domainRegistry != null ) { + for( Endpoint endpoint : domainRegistry.findEndpoint(uri) ) { + // TODO: For the present, simply return the first registry with a matching endpoint + this.registry = registry; + return domainRegistry; + } // end for + } // end if + } // end if + } // end for + + return null; + } // end method getEndpointRegistry + + /** + * Get the DomainRegistry + * @param registry - the ExtensionPoint registry + * @return the DomainRegistry - will be null if the DomainRegistry cannot be found + */ + private DomainRegistry getEndpointRegistry( ExtensionPointRegistry registry) { + DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry); + + if( domainRegistryFactory == null ) return null; + + // Find the first endpoint registry that matches the domain name + if( domainURI != null ) { + for( DomainRegistry domainRegistry : domainRegistryFactory.getEndpointRegistries() ) { + if( domainURI.equals( domainRegistry.getDomainURI() ) ) return domainRegistry; + } // end for + } // end if + + // if there was no domainName to match, simply return the first DomainRegistry if there is one... + + if (domainRegistryFactory.getEndpointRegistries().size() > 0){ + DomainRegistry domainRegistry = (DomainRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0]; + return domainRegistry; + } else { + return null; + } + + } // end method + +} // end class diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseService.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseService.java new file mode 100644 index 0000000000..62d7f74505 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import org.oasisopen.sca.annotation.OneWay; +import org.oasisopen.sca.annotation.Remotable; + +/** + * An interface which describes the client response service interface for a non-native binding + * performing an asynchronous invocation of a service + * + * @param - the type of the non-fault response + */ +@Remotable() +public interface AsyncResponseService { + + /** + * Async process completed with a wrapped Fault. Must only be invoked once + * @param e - the wrapper containing the Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + @OneWay + public void setWrappedFault(AsyncFaultWrapper w); + + /** + * Async process completed with a response message. Must only be invoked once + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + * @param res - the response message, which is of type V + */ + @OneWay + public void setResponse(V res); + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallableReferenceObjectFactory.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallableReferenceObjectFactory.java new file mode 100644 index 0000000000..400bb333fd --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallableReferenceObjectFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import org.apache.tuscany.sca.core.factory.ObjectCreationException; +import org.apache.tuscany.sca.core.factory.ObjectFactory; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.oasisopen.sca.ServiceReference; + +/** + * Uses a wire to return a CallableReference + * + * @version $Rev$ $Date$ + * @tuscany.spi.extension.asclient + */ +public class CallableReferenceObjectFactory implements ObjectFactory> { + private Class businessInterface; + private RuntimeEndpointReference endpointReference; + + /** + * Constructor. + * + * To support the @Reference protected CallableReference ref; + * + * @param businessInterface the interface to inject + * @param component the component defining the reference to be injected + * @param reference the reference to be injected + * @param binding the binding for the reference + */ + public CallableReferenceObjectFactory(Class businessInterface, + RuntimeEndpointReference endpointReference) { + this.businessInterface = businessInterface; + this.endpointReference = endpointReference; + } + + public ServiceReference getInstance() throws ObjectCreationException { + RuntimeComponent component = (RuntimeComponent) endpointReference.getComponent(); + return component.getComponentContext().getServiceReference(businessInterface, endpointReference); + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackHandler.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackHandler.java new file mode 100644 index 0000000000..bccce0d20a --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +/** + * Models callback information arriving in the forward message + */ +public class CallbackHandler { + private String callbackTargetURI; + private boolean cloneCallbackWire = true; + + public CallbackHandler(String callbackTargetURI){ + setCallbackTargetURI(callbackTargetURI); + } + + public CallbackHandler(String callbackTargetURI, boolean cloneCallbackWire){ + setCallbackTargetURI(callbackTargetURI); + setCloneCallbackWire(cloneCallbackWire); + } + + public String getCallbackTargetURI() { + return callbackTargetURI; + } + + public boolean getCloneCallbackWire() { + return cloneCallbackWire; + } + + public void setCallbackTargetURI(String callbackTargetURI) { + this.callbackTargetURI = callbackTargetURI; + } + + public void setCloneCallbackWire(boolean cloneCallbackWire) { + this.cloneCallbackWire = cloneCallbackWire; + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackInterfaceInterceptor.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackInterfaceInterceptor.java new file mode 100644 index 0000000000..588eaa2d15 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackInterfaceInterceptor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; + +/** + * An interceptor applied to the forward direction of a wire that ensures the callback target implements the required + * service contract. This is required as callback targets may be set dynamically by service implementations. + * + * @version $Rev$ $Date$ + */ +public class CallbackInterfaceInterceptor implements Interceptor { + private Invoker next; + + public CallbackInterfaceInterceptor() { + } + + public Message invoke(Message msg) { + + /* TODO - EPR - not required for OASIS + ReferenceParameters parameters = msg.getFrom().getReferenceParameters(); + if (parameters.getCallbackObjectID() != null || parameters.getCallbackReference() != msg.getFrom() + .getCallbackEndpoint()) { + */ + return next.invoke(msg); + /* + } else { + throw new NoRegisteredCallbackException("Callback target does not implement the callback interface"); + } + */ + } + + public void setNext(Invoker next) { + this.next = next; + } + + public Invoker getNext() { + return next; + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackReferenceObjectFactory.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackReferenceObjectFactory.java new file mode 100644 index 0000000000..a56983a5d3 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackReferenceObjectFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import java.util.List; + +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl; +import org.apache.tuscany.sca.core.factory.ObjectCreationException; +import org.apache.tuscany.sca.core.factory.ObjectFactory; +import org.oasisopen.sca.ServiceReference; + +/** + * Uses a wire to return a CallableReference + * + * @version $Rev$ $Date$ + * @tuscany.spi.extension.asclient + */ +public class CallbackReferenceObjectFactory implements ObjectFactory> { + private Class businessInterface; + private ProxyFactory proxyFactory; + private List wires; + + public CallbackReferenceObjectFactory(Class interfaze, ProxyFactory proxyFactory, List wires) { + this.businessInterface = interfaze; + this.proxyFactory = proxyFactory; + this.wires = wires; + } + + public ServiceReference getInstance() throws ObjectCreationException { + return new CallbackServiceReferenceImpl(businessInterface, wires); + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackWireObjectFactory.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackWireObjectFactory.java new file mode 100644 index 0000000000..fff3727d87 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CallbackWireObjectFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import java.util.List; + +import org.apache.tuscany.sca.core.factory.ObjectCreationException; +import org.apache.tuscany.sca.core.factory.ObjectFactory; +import org.apache.tuscany.sca.runtime.Invocable; + +/** + * Returns proxy instance for a wire callback + * + * @version $Rev$ $Date$ + * @tuscany.spi.extension.asclient + */ +public class CallbackWireObjectFactory implements ObjectFactory { + private Class businessInterface; + private ProxyFactory proxyFactory; + private List wires; + + public CallbackWireObjectFactory(Class interfaze, ProxyFactory proxyFactory, List wires) { + this.businessInterface = interfaze; + this.proxyFactory = proxyFactory; + this.wires = wires; + } + + public B getInstance() throws ObjectCreationException { + return proxyFactory.createCallbackProxy(businessInterface, wires); + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CglibProxyFactory.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CglibProxyFactory.java new file mode 100644 index 0000000000..a597f8b1ee --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/CglibProxyFactory.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import java.lang.reflect.Method; +import java.util.List; + +import net.sf.cglib.proxy.Callback; +import net.sf.cglib.proxy.Enhancer; +import net.sf.cglib.proxy.Factory; +import net.sf.cglib.proxy.MethodInterceptor; +import net.sf.cglib.proxy.MethodProxy; + +import org.apache.tuscany.sca.core.context.ServiceReferenceExt; +import org.apache.tuscany.sca.core.context.impl.ServiceReferenceImpl; +import org.apache.tuscany.sca.core.invocation.impl.JDKCallbackInvocationHandler; +import org.apache.tuscany.sca.core.invocation.impl.JDKInvocationHandler; +import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.oasisopen.sca.ServiceReference; + +/** + * The implementation of a wire service that uses cglib dynamic proxies + * + * @version $Rev$ $Date$ + * @tuscany.spi.extension.asclient + */ +public class CglibProxyFactory implements ProxyFactory { + private MessageFactory messageFactory; + + public CglibProxyFactory(MessageFactory messageFactory, InterfaceContractMapper mapper) { + this.messageFactory = messageFactory; + + } + + public T createProxy(final Class interfaze, Invocable invocable) throws ProxyCreationException { + if (invocable instanceof RuntimeEndpoint) { + Enhancer enhancer = new Enhancer(); + enhancer.setSuperclass(interfaze); + enhancer.setCallback(new CglibMethodInterceptor(interfaze, invocable)); + Object proxy = enhancer.create(); + return interfaze.cast(proxy); + } + ServiceReference serviceReference = new ServiceReferenceImpl(interfaze, invocable, null); + return createProxy(serviceReference); + } + + /** + * create the proxy with cglib. use the same JDKInvocationHandler as + * JDKProxyService. + */ + public T createProxy(ServiceReference callableReference) throws ProxyCreationException { + Enhancer enhancer = new Enhancer(); + Class interfaze = callableReference.getBusinessInterface(); + enhancer.setSuperclass(interfaze); + enhancer.setCallback(new CglibMethodInterceptor(callableReference)); + Object proxy = enhancer.create(); + ((ServiceReferenceImpl)callableReference).setProxy(proxy); + return interfaze.cast(proxy); + } + + /** + * create the callback proxy with cglib. use the same + * JDKCallbackInvocationHandler as JDKProxyService. + */ + public T createCallbackProxy(Class interfaze, final List wires) throws ProxyCreationException { + ServiceReferenceImpl callbackReference = new ServiceReferenceImpl(interfaze, wires.get(0), null); + return callbackReference != null ? createCallbackProxy(callbackReference) : null; + } + + /** + * create the callback proxy with cglib. use the same + * JDKCallbackInvocationHandler as JDKProxyService. + */ + public T createCallbackProxy(ServiceReference callbackReference) throws ProxyCreationException { + Enhancer enhancer = new Enhancer(); + Class interfaze = callbackReference.getBusinessInterface(); + enhancer.setSuperclass(interfaze); + enhancer.setCallback(new CglibMethodInterceptor(callbackReference)); + Object object = enhancer.create(); + T proxy = interfaze.cast(object); + ((ServiceReferenceExt)callbackReference).setProxy(proxy); + return proxy; + } + + @SuppressWarnings("unchecked") + public > R cast(B target) throws IllegalArgumentException { + if (isProxyClass(target.getClass())) { + Factory factory = (Factory)target; + Callback[] callbacks = factory.getCallbacks(); + if (callbacks.length != 1 || !(callbacks[0] instanceof CglibMethodInterceptor)) { + throw new IllegalArgumentException("The object is not a known proxy."); + } + CglibMethodInterceptor interceptor = (CglibMethodInterceptor)callbacks[0]; + return (R)interceptor.invocationHandler.getCallableReference(); + } else { + throw new IllegalArgumentException("The object is not a known proxy."); + } + } + + /** + * @see org.apache.tuscany.sca.core.invocation.ProxyFactory#isProxyClass(java.lang.Class) + */ + public boolean isProxyClass(Class clazz) { + return Factory.class.isAssignableFrom(clazz); + } + + private class CglibMethodInterceptor implements MethodInterceptor { + private JDKInvocationHandler invocationHandler; + + public CglibMethodInterceptor(ServiceReference callableReference) { + invocationHandler = new JDKInvocationHandler(messageFactory, callableReference); + } + + public CglibMethodInterceptor(Class interfaze, Invocable invocable) { + invocationHandler = new JDKInvocationHandler(messageFactory, interfaze, invocable); + } + + public CglibMethodInterceptor(ServiceReferenceImpl callbackReference) { + invocationHandler = new JDKCallbackInvocationHandler(messageFactory, callbackReference); + } + + /** + * @see net.sf.cglib.proxy.MethodInterceptor#intercept(java.lang.Object, java.lang.reflect.Method, java.lang.Object[], net.sf.cglib.proxy.MethodProxy) + */ + public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable { + Object result = invocationHandler.invoke(proxy, method, args); + return result; + } + + } + + public void removeProxiesForContribution(ClassLoader contributionClassloader){ + // do nothing, no cache to clear + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java new file mode 100644 index 0000000000..00c650ada6 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +/** + * Constants used during invocation in the runtime + * + */ +public interface Constants { + public static final String MESSAGE_ID = "MESSAGE_ID"; + public static final String RELATES_TO = "RELATES_TO"; + public static final String ASYNC_RESPONSE_INVOKER = "ASYNC_RESPONSE_INVOKER"; + public static final String ASYNC_CALLBACK = "ASYNC_CALLBACK"; + public static final String CALLBACK = "CALLBACK"; + + /** + * If you've set the TCCL in your binding impl according to OASIS rules you can prevent + * the implementation provider from repeating the process by including this header + */ + public static final String SUPPRESS_TCCL_SWAP = "SUPPRESS_TCCL_SWAP"; +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/DefaultProxyFactoryExtensionPoint.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/DefaultProxyFactoryExtensionPoint.java new file mode 100644 index 0000000000..03505da302 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/DefaultProxyFactoryExtensionPoint.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.core.invocation.impl.JDKProxyFactory; +import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper; +import org.apache.tuscany.sca.invocation.MessageFactory; + +/** + * Default implementation of a ProxyFactoryExtensionPoint. + * + * @version $Rev$ $Date$ + */ +public class DefaultProxyFactoryExtensionPoint implements ProxyFactoryExtensionPoint, LifeCycleListener { + private InterfaceContractMapper interfaceContractMapper; + private MessageFactory messageFactory; + + private ProxyFactory interfaceFactory; + private ProxyFactory classFactory; + + public DefaultProxyFactoryExtensionPoint(ExtensionPointRegistry registry) { + UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class); + this.interfaceContractMapper = utilities.getUtility(InterfaceContractMapper.class); + + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + this.messageFactory = modelFactories.getFactory(MessageFactory.class); + + interfaceFactory = new JDKProxyFactory(registry, messageFactory, interfaceContractMapper); + } + + // public DefaultProxyFactoryExtensionPoint(MessageFactory messageFactory, InterfaceContractMapper mapper) { + // this.interfaceContractMapper = mapper; + // this.messageFactory = messageFactory; + // interfaceFactory = new JDKProxyFactory(null, messageFactory, mapper); + // } + + public ProxyFactory getClassProxyFactory() { + return classFactory; + } + + public ProxyFactory getInterfaceProxyFactory() { + return interfaceFactory; + } + + public void setClassProxyFactory(ProxyFactory factory) { + this.classFactory = factory; + + } + + public void setInterfaceProxyFactory(ProxyFactory factory) { + this.interfaceFactory = factory; + + } + + public void start() { + if (interfaceFactory instanceof LifeCycleListener) { + ((LifeCycleListener)interfaceFactory).start(); + } + if (classFactory instanceof LifeCycleListener) { + ((LifeCycleListener)classFactory).start(); + } + } + + public void stop() { + if (interfaceFactory instanceof LifeCycleListener) { + ((LifeCycleListener)interfaceFactory).stop(); + } + if (classFactory instanceof LifeCycleListener) { + ((LifeCycleListener)classFactory).stop(); + } + + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ExtensibleProxyFactory.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ExtensibleProxyFactory.java new file mode 100644 index 0000000000..3cb9445d2d --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ExtensibleProxyFactory.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import java.util.List; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.runtime.Invocable; +import org.oasisopen.sca.ServiceReference; + +/** + * An extensible proxy factory. + * + * @version $Rev$ $Date$ + * @tuscany.spi.extension.asclient + */ +public class ExtensibleProxyFactory implements ProxyFactory { + + private ProxyFactoryExtensionPoint proxyFactories; + + public ExtensibleProxyFactory(ProxyFactoryExtensionPoint proxyFactories) { + this.proxyFactories = proxyFactories; + } + + public ExtensibleProxyFactory(ExtensionPointRegistry registry) { + this.proxyFactories = registry.getExtensionPoint(ProxyFactoryExtensionPoint.class); + } + + public static ExtensibleProxyFactory getInstance(ExtensionPointRegistry registry) { + UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class); + return utilities.getUtility(ExtensibleProxyFactory.class); + } + + /** + * @see org.apache.tuscany.sca.core.invocation.ProxyFactory#cast(java.lang.Object) + */ + @SuppressWarnings("unchecked") + public > R cast(B target) throws IllegalArgumentException { + ProxyFactory interfaceFactory = proxyFactories.getInterfaceProxyFactory(); + ProxyFactory classFactory = proxyFactories.getClassProxyFactory(); + if (interfaceFactory.isProxyClass(target.getClass())) { + return (R)interfaceFactory.cast(target); + } else if (classFactory != null && classFactory.isProxyClass(target.getClass())) { + return (R)classFactory.cast(target); + } else { + throw new IllegalArgumentException("The target is not a callable proxy"); + } + } + + /** + * @see org.apache.tuscany.sca.core.invocation.ProxyFactory#createCallbackProxy(java.lang.Class, + * java.util.List) + */ + public T createCallbackProxy(Class interfaze, List wires) throws ProxyCreationException { + ProxyFactory interfaceFactory = proxyFactories.getInterfaceProxyFactory(); + ProxyFactory classFactory = proxyFactories.getClassProxyFactory(); + if (interfaze.isInterface()) { + return interfaceFactory.createCallbackProxy(interfaze, wires); + } else { + return classFactory.createCallbackProxy(interfaze, wires); + } + } + + public T createProxy(ServiceReference callableReference) throws ProxyCreationException { + ProxyFactory interfaceFactory = proxyFactories.getInterfaceProxyFactory(); + ProxyFactory classFactory = proxyFactories.getClassProxyFactory(); + if (callableReference.getBusinessInterface().isInterface()) { + return interfaceFactory.createProxy(callableReference); + } else { + return classFactory.createProxy(callableReference); + } + } + + public T createCallbackProxy(ServiceReference callbackReference) throws ProxyCreationException { + ProxyFactory interfaceFactory = proxyFactories.getInterfaceProxyFactory(); + ProxyFactory classFactory = proxyFactories.getClassProxyFactory(); + if (callbackReference.getBusinessInterface().isInterface()) { + return interfaceFactory.createCallbackProxy(callbackReference); + } else { + return classFactory.createCallbackProxy(callbackReference); + } + } + + /** + * @see org.apache.tuscany.sca.core.invocation.ProxyFactory#createProxy(java.lang.Class, + * org.apache.tuscany.sca.runtime.Invocable) + */ + public T createProxy(Class interfaze, Invocable wire) throws ProxyCreationException { + ProxyFactory interfaceFactory = proxyFactories.getInterfaceProxyFactory(); + ProxyFactory classFactory = proxyFactories.getClassProxyFactory(); + if (interfaze.isInterface()) { + return interfaceFactory.createProxy(interfaze, wire); + } else { + return classFactory.createProxy(interfaze, wire); + } + } + + /** + * @see org.apache.tuscany.sca.core.invocation.ProxyFactory#isProxyClass(java.lang.Class) + */ + public boolean isProxyClass(Class clazz) { + ProxyFactory interfaceFactory = proxyFactories.getInterfaceProxyFactory(); + ProxyFactory classFactory = proxyFactories.getClassProxyFactory(); + return interfaceFactory.isProxyClass(clazz) || (classFactory != null && classFactory.isProxyClass(clazz)); + } + + public void removeProxiesForContribution(ClassLoader contributionClassloader){ + // do nothing, no cache to clear + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ExtensibleWireProcessor.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ExtensibleWireProcessor.java new file mode 100644 index 0000000000..08019ec3e2 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ExtensibleWireProcessor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.apache.tuscany.sca.runtime.RuntimeWireProcessor; +import org.apache.tuscany.sca.runtime.RuntimeWireProcessorExtensionPoint; + +/** + * The default implementation of an extensible WireProcessor + * + * @version $Rev$ $Date$ + */ +public class ExtensibleWireProcessor implements RuntimeWireProcessor { + + private RuntimeWireProcessorExtensionPoint processors; + + public ExtensibleWireProcessor(RuntimeWireProcessorExtensionPoint processors) { + this.processors = processors; + } + + public void process(RuntimeEndpoint endpoint) { + for (RuntimeWireProcessor processor : processors.getWireProcessors()) { + processor.process(endpoint); + } + } + + public void process(RuntimeEndpointReference endpointReference) { + for (RuntimeWireProcessor processor : processors.getWireProcessors()) { + processor.process(endpointReference); + } + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java new file mode 100644 index 0000000000..265311fe6b --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + + +import org.apache.tuscany.sca.invocation.InterceptorAsync; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Message; + +/** + * A base class that holds the mechanics for representing + * chained interceptors and for driving processing up and + * down the chain. + * + */ +public abstract class InterceptorAsyncImpl implements InterceptorAsync { + + protected Invoker next; + protected InvokerAsyncResponse previous; + + public Invoker getNext() { + return (Invoker)next; + } + + public void setNext(Invoker next) { + this.next = next; + } + + public InvokerAsyncResponse getPrevious() { + return previous; + } + + public void setPrevious(InvokerAsyncResponse previous) { + this.previous = previous; + } + + public Message invoke(Message msg) { + msg = processRequest(msg); + Message resultMsg = getNext().invoke(msg); + resultMsg = processResponse(resultMsg); + return resultMsg; + } + + public void invokeAsyncRequest(Message msg) throws Throwable { + try{ + msg = processRequest(msg); + InvokerAsyncRequest theNext = (InvokerAsyncRequest)getNext(); + if( theNext != null ) theNext.invokeAsyncRequest(msg); + postProcessRequest(msg); + } catch (Throwable e) { + postProcessRequest(msg, e); + } // end try + } // end method invokeAsyncRequest + + public void invokeAsyncResponse(Message msg) { + msg = processResponse(msg); + InvokerAsyncResponse thePrevious = (InvokerAsyncResponse)getPrevious(); + if (thePrevious != null ) thePrevious.invokeAsyncResponse(msg); + } // end method invokeAsyncResponse + + /** + * Basic null version of postProcessRequest - subclasses should override for any required + * real processing + */ + public Message postProcessRequest(Message msg) { + // Default processing is to do nothing + return msg; + } // end method postProcessRequest + + /** + * Basic null version of postProcessRequest - subclasses should override for any required + * real processing + * @throws Throwable + */ + public Message postProcessRequest(Message msg, Throwable e) throws Throwable { + // Default processing is to rethrow the exception + throw e; + } // end method postProcessRequest + + + /** + * A testing method while I use the local SCA binding wire to look + * at how the async response path works. This allows me to detect the + * point where the reference wire turns into the service with in the + * optimized case + * + * @return + */ + public boolean isLocalSCABIndingInvoker() { + return false; + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKAsyncResponseInvoker.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKAsyncResponseInvoker.java new file mode 100644 index 0000000000..60958ed6f9 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKAsyncResponseInvoker.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; + +public interface JDKAsyncResponseInvoker extends InvokerAsyncResponse { + + /** + * Registers an Async response, which provides an ID which identifies a given response + * and an object which can handle the response + * @param id - the ID + * @param responseHandler - the response handler object + */ + public void registerAsyncResponse( String id, Object responseHandler ); + + /** + * Returns the registered async response for a given ID + * @param id - the ID + * @return responseHandler - the response handler object + */ + public Object getAsyncResponse( String id ); + +} // end interface JDKAsyncResponseInvoker diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/NonBlockingInterceptor.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/NonBlockingInterceptor.java new file mode 100644 index 0000000000..7108728206 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/NonBlockingInterceptor.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.work.WorkScheduler; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * Adds non-blocking behavior to an invocation chain + * + * @version $Rev$ $Date$ + */ +public class NonBlockingInterceptor extends InterceptorAsyncImpl { + + private static final Message RESPONSE = new ImmutableMessage(); + + /** + * The JDK logger that will be used to log messages. + */ + private static final Logger LOGGER = Logger.getLogger(NonBlockingInterceptor.class.getName()); + + private WorkScheduler workScheduler; + + public NonBlockingInterceptor(WorkScheduler workScheduler) { + this.workScheduler = workScheduler; + } + + public NonBlockingInterceptor(WorkScheduler workScheduler, Interceptor next) { + this.workScheduler = workScheduler; + this.next = next; + } + + /** + * Sets desired workScheduler to NonBlockingInterceptor. This is a useful function for the extension framework + * to set desired workmanager on the InvocationChain, other than default workmanager which is set per Tuscany runtime. + * Using this function, extension framework can set desired workmanager on InvocationChain during post wire processing. + * @param workScheduler workScheduler which contains workmanager + */ + public void setWorkScheduler(WorkScheduler workScheduler){ + this.workScheduler = workScheduler; + } + + /** + * For request/response messages use the workScheduler to break the connection between + * requests and the void response + */ + @Override + public Message invoke(final Message msg) { + // Schedule the invocation of the next interceptor in a new Work instance + try { + workScheduler.scheduleWork(new Runnable() { + public void run() { + Message context = ThreadMessageContext.setMessageContext(msg); + try { + Message response = null; + + Throwable ex = null; + try { + response = next.invoke(msg); + } catch (Throwable t) { + ex = t; + } + + // Tuscany-2225 - Did the @OneWay method complete successfully? + // (i.e. no exceptions) + if (response != null && response.isFault()) { + // The @OneWay method threw an Exception. Lets log it and + // then pass it on to the WorkScheduler so it can notify any + // listeners + ex = (Throwable)response.getBody(); + } + if (ex != null) { + LOGGER.log(Level.SEVERE, "Exception from @OneWay invocation", ex); + throw new ServiceRuntimeException("Exception from @OneWay invocation", ex); + } + } finally { + ThreadMessageContext.setMessageContext(context); + } + } + }); + } catch (Exception e) { + throw new ServiceRuntimeException(e); + } + return RESPONSE; + } + + /** + * For forward async responses we just pass the message along + * as this is naturally one way + */ + public Message processRequest(Message msg) { + return msg; + } + + /** + * This should never be called as a one way message won't + * expect a response + */ + public Message processResponse(Message msg) { + return null; + } + + /** + * A dummy message passed back on an invocation + */ + private static class ImmutableMessage implements Message { + + public T getBody() { + return null; + } + + public void setBody(Object body) { + if (body != null) { + throw new UnsupportedOperationException(); + } + } + + public Object getMessageID() { + return null; + } + + public void setMessageID(Object messageId) { + throw new UnsupportedOperationException(); + } + + public boolean isFault() { + return false; + } + + public void setFaultBody(Object fault) { + throw new UnsupportedOperationException(); + } + + public EndpointReference getFrom() { + return null; + } + + public Endpoint getTo() { + return null; + } + + public void setFrom(EndpointReference from) { + throw new UnsupportedOperationException(); + } + + public void setTo(Endpoint to) { + throw new UnsupportedOperationException(); + } + + public Operation getOperation() { + return null; + } + + public void setOperation(Operation op) { + throw new UnsupportedOperationException(); + } + + public Map getHeaders() { + return null; + } + public T getBindingContext() { + return null; + } + + public void setBindingContext(T bindingContext) { + } + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyCreationException.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyCreationException.java new file mode 100644 index 0000000000..0b36b178f3 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyCreationException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import org.apache.tuscany.sca.core.factory.ObjectCreationException; + + +/** + * Denotes an error creating a proxy + * + * @version $Rev$ $Date$ + */ +public class ProxyCreationException extends ObjectCreationException { + private static final long serialVersionUID = 8002454344828513781L; + + public ProxyCreationException() { + super(); + } + + public ProxyCreationException(String message, Throwable cause) { + super(message, cause); + } + + public ProxyCreationException(String message) { + super(message); + } + + public ProxyCreationException(Throwable cause) { + super(cause); + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyFactory.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyFactory.java new file mode 100644 index 0000000000..8d9612315a --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyFactory.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import java.util.List; + +import org.apache.tuscany.sca.runtime.Invocable; +import org.oasisopen.sca.ServiceReference; + +/** + * Creates proxies that implement Java interfaces and invocation handlers for fronting wires + * + * @version $Rev$ $Date$ + * @tuscany.spi.extension.asclient + */ + +public interface ProxyFactory { + + /** + * Creates a Java proxy for the given wire + * + * @param interfaze the interface the proxy implements + * @param invocable the wire to proxy + * @return the proxy + * @throws ProxyCreationException + */ + T createProxy(Class interfaze, Invocable invocable) throws ProxyCreationException; + + /** + * Creates a Java proxy for the given CallableReference + * + * @param callableReference The CallableReference + * @return the proxy + * @throws ProxyCreationException + */ + T createProxy(ServiceReference callableReference) throws ProxyCreationException; + + /** + * Creates a Java proxy for the service contract callback + * + * @param interfaze the interface the proxy should implement + * @return the proxy + * @throws ProxyCreationException + */ + T createCallbackProxy(Class interfaze, List invocables) throws ProxyCreationException; + + /** + * Creates a Java proxy for the given callback reference + * + * @param callableReference The CallableReference + * @return the proxy + * @throws ProxyCreationException + */ + T createCallbackProxy(ServiceReference callbackReference) throws ProxyCreationException; + + /** + * Cast a proxy to a CallableReference. + * + * @param target a proxy generated by this implementation + * @return a CallableReference (or subclass) equivalent to this proxy + * @throws IllegalArgumentException if the object supplied is not a proxy + */ + > R cast(B target) throws IllegalArgumentException; + + /** + * Test if a given class is a generated proxy class by this factory + * @param clazz A java class or interface + * @return true if the class is a generated proxy class by this factory + */ + boolean isProxyClass(Class clazz); + + /** + * Allow cached proxies to be removed when a contribution is removed. The proxy + * cache holds the application interface so will pin the contribution classloader + * + * @param contributionClassloader the classloader of the contribution being removed + */ + void removeProxiesForContribution(ClassLoader contributionClassloader); + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyFactoryExtensionPoint.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyFactoryExtensionPoint.java new file mode 100644 index 0000000000..3e0bd7cb17 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/ProxyFactoryExtensionPoint.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + + +/** + * The extension point to plug in proxy factories + * @version $Rev$ $Date$ + * @tuscany.spi.extension.asclient + */ +public interface ProxyFactoryExtensionPoint { + + /** + * Get the proxy factory for java interfaces + * @return + */ + ProxyFactory getInterfaceProxyFactory(); + + /** + * Get the proxy factory for java classes + * @return + */ + ProxyFactory getClassProxyFactory(); + + /** + * Set the proxy factory for java interfaces + * @param factory + */ + void setInterfaceProxyFactory(ProxyFactory factory); + + /** + * Set the proxy factory for java classes + * @param factory + */ + void setClassProxyFactory(ProxyFactory factory); + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java new file mode 100644 index 0000000000..8f093d62ed --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation; + +import java.lang.reflect.InvocationTargetException; +import java.util.UUID; +import java.util.concurrent.ExecutorService; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.apache.tuscany.sca.work.WorkScheduler; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * Invoker for a endpoint or endpoint reference + * @version $Rev$ $Date$ + */ +public class RuntimeInvoker implements Invoker, InvokerAsyncRequest { + protected ExtensionPointRegistry registry; + protected MessageFactory messageFactory; + protected Invocable invocable; + + // Run async service invocations using a ThreadPoolExecutor + private ExecutorService theExecutor; + + public RuntimeInvoker(ExtensionPointRegistry registry, Invocable invocable) { + this.registry = registry; + this.messageFactory = registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(MessageFactory.class); + this.invocable = invocable; + + UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class); + WorkScheduler scheduler = utilities.getUtility(WorkScheduler.class); + theExecutor = scheduler.getExecutorService(); + } + + public Message invokeBinding(Message msg) { + Message context = ThreadMessageContext.setMessageContext(msg); + try { + return invocable.getBindingInvocationChain().getHeadInvoker().invoke(msg); + } finally { + ThreadMessageContext.setMessageContext(context); + } + } // end method invokeBinding + + /** + * Async Invoke of the Binding Chain + * @param msg - the message to use in the invocation + */ + public void invokeBindingAsync(Message msg) { + Message context = ThreadMessageContext.setMessageContext(msg); + try { + ((InvokerAsyncRequest)invocable.getBindingInvocationChain().getHeadInvoker()).invokeAsyncRequest(msg); + } catch (Throwable t ) { + // TODO - consider what best to do with exception + t.printStackTrace(); + } finally { + ThreadMessageContext.setMessageContext(context); + } // end try + } // end method invokeBindingAsync + + public Message invoke(Message msg) { + return invoke(msg.getOperation(), msg); + } + + public Object invoke(Operation operation, Object[] args) throws InvocationTargetException { + Message msg = messageFactory.createMessage(); + msg.setBody(args); + Message resp = invoke(operation, msg); + Object body = resp.getBody(); + if (resp.isFault()) { + throw new InvocationTargetException((Throwable)body); + } + return body; + } + + + public Message invoke(Operation operation, Message msg) { + InvocationChain chain = invocable.getInvocationChain(operation); + return invoke(chain, msg); + } + + public Message invoke(InvocationChain chain, Message msg) { + + if (invocable instanceof Endpoint) { + msg.setTo((Endpoint)invocable); + } else if (invocable instanceof EndpointReference) { + msg.setFrom((EndpointReference)invocable); + } + + Invoker headInvoker = chain.getHeadInvoker(); + Operation operation = chain.getTargetOperation(); + msg.setOperation(operation); + + Message msgContext = ThreadMessageContext.setMessageContext(msg); + try { + return headInvoker.invoke(msg); + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } + } + + /** + * Initiate the sending of the forward part of an asynchronous + * exchange along the request part of the wire. + * + * @param msg the request message + */ + public void invokeAsync(Message msg) { + if (invocable instanceof Endpoint) { + Endpoint ep = (Endpoint)invocable; + msg.setTo(ep); + if (!ep.isAsyncInvocation()){ + throw new ServiceRuntimeException("Calling invokeAsync on a non-async endpoint - " + + ep); + } + } else if (invocable instanceof EndpointReference) { + RuntimeEndpointReference epr = (RuntimeEndpointReference)invocable; + if (!epr.isAsyncInvocation()){ + throw new ServiceRuntimeException("Calling invokeAsync on a non-async endpoint reference - " + + epr); + } + if (epr.isOutOfDate()) { + epr.rebuild(); + } + msg.setFrom(epr); + msg.setTo(epr.getTargetEndpoint()); + } + + Operation operation = msg.getOperation(); + InvocationChain chain = invocable.getInvocationChain(operation); + + if (chain == null) { + throw new IllegalArgumentException("No matching operation is found: " + operation.getName()); + } + + // create an async message ID if there isn't one there already + if (!msg.getHeaders().containsKey(Constants.MESSAGE_ID)){ + msg.getHeaders().put(Constants.MESSAGE_ID, UUID.randomUUID().toString());UUID.randomUUID().toString(); + } + + // Perform the async invocation + Invoker headInvoker = chain.getHeadInvoker(); + + Message msgContext = ThreadMessageContext.setMessageContext(msg); + try { + try { + ((InvokerAsyncRequest)headInvoker).invokeAsyncRequest(msg); + } catch (ServiceRuntimeException ex) { + throw ex; + } catch (Throwable ex) { + // temporary fix to swallow the dummy exception that's + // thrown back to get past the response chain processing. + if (!(ex instanceof AsyncResponseException)){ + throw new ServiceRuntimeException(ex); + } + } + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } + + return; + } + + /** + * Initiate the sending of the response part of an asynchronous + * exchange along the response part of the wire. + * + * @param msg the response message + */ + public void invokeAsyncResponse(Message msg) { + InvocationChain chain = invocable.getInvocationChain(msg.getOperation()); + Invoker tailInvoker = chain.getTailInvoker(); + ((InvokerAsyncResponse)tailInvoker).invokeAsyncResponse(msg); + } // end method invokeAsyncResponse + + @Override + public void invokeAsyncRequest(Message msg) throws Throwable { + invokeAsync(msg); + } // end method invokeAsyncRequest +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/WireObjectFactory.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/WireObjectFactory.java new file mode 100644 index 0000000000..489d06a3e1 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/WireObjectFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation; + +import org.apache.tuscany.sca.core.context.impl.ServiceReferenceImpl; +import org.apache.tuscany.sca.core.factory.ObjectCreationException; +import org.apache.tuscany.sca.core.factory.ObjectFactory; +import org.apache.tuscany.sca.core.invocation.impl.NoMethodForOperationException; +import org.apache.tuscany.sca.runtime.Invocable; + +/** + * Uses a wire to return an object instance + * + * @version $Rev$ $Date$ + * @tuscany.spi.extension.asclient + */ +public class WireObjectFactory implements ObjectFactory { + private Class interfaze; + private Invocable wire; + private ProxyFactory proxyService; + + /** + * Constructor. + * + * @param interfaze the interface to inject on the client + * @param wire the backing wire + * @param proxyService the wire service to create the proxy + * @throws NoMethodForOperationException + */ + public WireObjectFactory(Class interfaze, Invocable wire, ProxyFactory proxyService) { + this.interfaze = interfaze; + this.wire = wire; + this.proxyService = proxyService; + } + + public T getInstance() throws ObjectCreationException { + return new ServiceReferenceImpl(interfaze, wire, null).getProxy(); + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java new file mode 100644 index 0000000000..66b9516738 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.xml.ws.AsyncHandler; +import javax.xml.ws.Response; + +import org.apache.tuscany.sca.core.invocation.AsyncContext; +import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; + +/** + * A class which provides an Implementation of a Future and Response for use with the JAXWS defined client + * asynchronous APIs. + * + * This implementation class provides the interfaces for use by the client code, but also provides methods for the + * Tuscany system code to set the result of the asynchronous service invocation, both Regular and Fault responses. + * + * This class is constructed to be fully thread-safe + * + * @param - this is the type of the response message from the invoked service. + */ +public class AsyncInvocationFutureImpl implements Future, Response, AsyncContext, AsyncResponseHandler { + + // Lock for handling the completion of this Future + private final Lock lock = new ReentrantLock(); + private final Condition isDone = lock.newCondition(); + + // The result + private volatile V response = null; + private volatile Throwable fault = null; + + private String uniqueID = UUID.randomUUID().toString(); + + private Class businessInterface = null; + private AsyncHandler callback; + + private Map attributes = new HashMap(); + + protected AsyncInvocationFutureImpl() { + super(); + } // end constructor + + /** + * Public constructor for AsyncInvocationFutureImpl - newInstance is necessary in order to enable the Type variable + * to be set for the class instances + * @param - the type of the response from the asynchronously invoked service + * @param type - the type of the AsyncInvocationFutureImpl expressed as a parameter + * @param classLoader - the classloader used for the business interface to which this Future applies + * @return - an instance of AsyncInvocationFutureImpl + */ + public static AsyncInvocationFutureImpl newInstance( Class type, Class businessInterface ) { + AsyncInvocationFutureImpl future = new AsyncInvocationFutureImpl(); + future.setBusinessInterface( businessInterface ); + return future; + } + + /** + * Cancels the asynchronous process + * - not possible in this version, so always returns false + */ + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + /** + * Gets the response value returned by the asynchronous process + * - waits forever + * @return - the response value of type V + * @throws InterruptedException if the get() method was interrupted while waiting for the async process to finish + * @throws ExecutionException if the async process threw an exception - the exception thrown is nested + */ + public V get() throws InterruptedException, ExecutionException { + try { + V response = get(Long.MAX_VALUE, TimeUnit.SECONDS); + return response; + } catch (TimeoutException t) { + throw new InterruptedException("Timed out waiting for Future to complete"); + } // end try + } // end method get() + + /** + * Gets the response value returned by the asynchronous process + * @return - the response value of type V + * @throws InterruptedException if the get() method was interrupted while waiting for the async process to finish + * @throws ExecutionException if the async process threw an exception - the exception thrown is nested + * @throws TimeoutException if the get() method timed out waiting for the async process to finish + */ + public V get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + lock.lock(); + try { + // wait for result to be available + if( notSetYet() ) isDone.await( timeout, unit); + if( response != null ) return response; + if( fault != null ) throw new ExecutionException( fault ); + throw new TimeoutException("get on this Future timed out"); + } finally { + lock.unlock(); + } // end try + + } // end method get(long timeout, TimeUnit unit) + + /** + * Indicates if the asynchronous process has been cancelled + * - not possible in this version so always returns false + */ + public boolean isCancelled() { + return false; + } + + /** + * Indicates if the asynchronous process is completed + * @return - true if the process is completed, false otherwise + */ + public boolean isDone() { + lock.lock(); + try { + return !notSetYet(); + } finally { + lock.unlock(); + } // end try + } // end method isDone + + /** + * Async process completed with a Fault. Must only be invoked once. + * @param e - the Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + public void setFault( Throwable e ) { + lock.lock(); + try { + if( notSetYet() ) { + fault = e; + isDone.signalAll(); + } else { + throw new IllegalStateException("setResponse() or setFault() has been called previously"); + } // end if + } finally { + lock.unlock(); + } // end try + if (callback != null) { + callback.handleResponse(this); + } + } // end method setFault( Throwable ) + + /** + * Async process completed with a wrapped Fault. Must only be invoked once. + * @param w - the wrapped Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + public void setWrappedFault(AsyncFaultWrapper w) { + + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + Throwable e; + try { + // Set the TCCL to the classloader of the business interface + Thread.currentThread().setContextClassLoader(this.getBusinessInterface().getClassLoader()); + e = w.retrieveFault(); + } finally { + Thread.currentThread().setContextClassLoader(tccl); + } // end try + + if( e == null ) throw new IllegalArgumentException("AsyncFaultWrapper did not return an Exception"); + setFault( e ); + + } // end method setFault( AsyncFaultWrapper ) + + /** + * Async process completed with a response message. Must only be invoked once + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + * @param res - the response message, which is of type V + */ + public void setResponse(V res) { + + lock.lock(); + try { + if( notSetYet() ) { + response = res; + isDone.signalAll(); + } else { + throw new IllegalStateException("setResponse() or setFault() has been called previously"); + } + } finally { + lock.unlock(); + } // end try + if (callback != null) { + callback.handleResponse(this); + } + + } // end method setResponse + + /** + * Gets the unique ID of this future as a String + */ + public String getUniqueID() { return uniqueID; } + + /** + * Indicates that setting a response value is OK - can only set the response value or fault once + * @return - true if it is OK to set the response, false otherwise + */ + private boolean notSetYet() { + return ( response == null && fault == null ); + } + + /** + * Returns the JAXWS context for the response + * @return - a Map containing the context + */ + public Map getContext() { + // Intentionally returns null + return null; + } + + /** + * Gets the business interface to which this Future relates + * @return the business interface + */ + public Class getBusinessInterface() { + return businessInterface; + } + + /** + * Sets the business interface to which this Future relates + * @param classLoader - the classloader of the business interface + */ + public void setBusinessInterface(Class businessInterface) { + this.businessInterface = businessInterface; + } + + /** + * Sets the callback handler, when the client uses the async callback method + * @param callback - the client's callback object + */ + public void setCallback(AsyncHandler callback) { + this.callback = callback; + } + + /** + * Look up an attribute value by name. + * @param name The name of the attribute + * @return The value of the attribute + */ + public Object getAttribute(String name) { + return attributes.get(name); + } + + /** + * Set the value of an attribute. Allows extensions to associate other data with an async response. + * @param name The name of the attribute + * @param value + */ + public void setAttribute(String name, Object value) { + attributes.put(name, value); + } + +} // end class AsyncInvocationFutureImpl diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java new file mode 100644 index 0000000000..dc5738af96 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.beans.Introspector; +import java.beans.PropertyDescriptor; +import java.io.StringReader; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.transform.stream.StreamSource; +import javax.xml.ws.AsyncHandler; +import javax.xml.ws.Response; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.assembly.ComponentService; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.Implementation; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.assembly.builder.BindingBuilder; +import org.apache.tuscany.sca.assembly.builder.BuilderContext; +import org.apache.tuscany.sca.assembly.builder.BuilderExtensionPoint; +import org.apache.tuscany.sca.assembly.xml.Constants; +import org.apache.tuscany.sca.context.CompositeContext; +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.contribution.processor.ContributionReadException; +import org.apache.tuscany.sca.contribution.processor.ProcessorContext; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint; +import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; +import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseException; +import org.apache.tuscany.sca.core.invocation.AsyncResponseService; +import org.apache.tuscany.sca.core.invocation.JDKAsyncResponseInvoker; +import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; +import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory; +import org.apache.tuscany.sca.interfacedef.util.FaultException; +import org.apache.tuscany.sca.interfacedef.util.WrapperInfo; +import org.apache.tuscany.sca.invocation.InterceptorAsync; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.policy.Intent; +import org.apache.tuscany.sca.provider.EndpointReferenceAsyncProvider; +import org.apache.tuscany.sca.provider.PolicyProvider; +import org.apache.tuscany.sca.provider.ReferenceBindingProvider; +import org.apache.tuscany.sca.provider.ServiceBindingProvider; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.apache.tuscany.sca.work.WorkScheduler; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * An InvocationHandler which deals with JAXWS-defined asynchronous client Java API method calls + * + * 2 asynchronous mappings exist for any given synchronous service operation, as shown in this example: + * public interface StockQuote { + * float getPrice(String ticker); + * Response getPriceAsync(String ticker); + * Future getPriceAsync(String ticker, AsyncHandler handler); + * } + * + * - the second method is called the "polling method", since the returned Response object permits + * the client to poll to see if the async call has completed + * - the third method is called the "async callback method", since in this case the client application can specify + * a callback operation that is automatically called when the async call completes + */ +public class AsyncJDKInvocationHandler extends JDKInvocationHandler { + + private static final long serialVersionUID = 1L; + + // Run the async service invocations using a WorkScheduler + private WorkScheduler scheduler; + + public AsyncJDKInvocationHandler(ExtensionPointRegistry registry, + MessageFactory messageFactory, + ServiceReference callableReference ) { + super(messageFactory, callableReference); + initWorkScheduler(registry); + } + + public AsyncJDKInvocationHandler(ExtensionPointRegistry registry, + MessageFactory messageFactory, + Class businessInterface, + Invocable source ) { + super(messageFactory, businessInterface, source); + initWorkScheduler(registry); + } + + private final void initWorkScheduler(ExtensionPointRegistry registry) { + UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class); + scheduler = utilities.getUtility(WorkScheduler.class); + } // end method initWorkScheduler + + /** + * Perform the invocation of the operation + * - provides support for all 3 forms of client method: synchronous, polling and async callback + */ + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + + if (Object.class == method.getDeclaringClass()) { + return invokeObjectMethod(method, args); + } + + // force the bind of the reference so that we can look at the + // target contract to see if it's asynchronous + source.getInvocationChains(); + + if (isAsyncCallback(method)) { + return doInvokeAsyncCallback(proxy, method, args); + } else if (isAsyncPoll(method)) { + return doInvokeAsyncPoll(proxy, method, args, null); + } else { + // Regular synchronous method call + return doInvokeSync(proxy, method, args); + } + } + + /** + * Indicates if a supplied method has the form of an async callback method + * @param method - the method + * @return - true if the method has the form of an async callback + */ + protected boolean isAsyncCallback(Method method) { + if (method.getName().endsWith("Async") && (method.getReturnType() == Future.class)) { + if (method.getParameterTypes().length > 0) { + return method.getParameterTypes()[method.getParameterTypes().length - 1] == AsyncHandler.class; + } + } + return false; + } + + /** + * Indicates is a supplied method has the form of an async polling method + * @param method - the method + * @return - true if the method has the form of an async polling method + */ + protected boolean isAsyncPoll(Method method) { + return method.getName().endsWith("Async") && (method.getReturnType() == Response.class); + } + + /** + * Invoke an async polling method + * @param proxy - the reference proxy + * @param asyncMethod - the async method to invoke + * @param args - array of input arguments to the method + * @return - the Response object that is returned to the client application, typed by the + * type of the response + */ + @SuppressWarnings("unchecked") + protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args, AsyncHandler callback) { + Method method = getNonAsyncMethod(asyncMethod); + Class returnType = method.getReturnType(); + // Allocate the Future / Response object - note: Response is a subclass of Future + AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance(returnType, businessInterface); + if (callback != null) + future.setCallback(callback); + try { + invokeAsync(proxy, method, args, future, asyncMethod); + } catch (Throwable t) { + // invokeAsync schedules a separate Runnable to run the request. Any exception caught here + // is a runtime exception, not an application exception. + if (!(t instanceof ServiceRuntimeException)) { + t = new ServiceRuntimeException("Received Throwable: " + t.getClass().getName() + + " when invoking: " + + asyncMethod.getName(), t); + } + future.setFault(t); + } // end try + return future; + } // end method doInvokeAsyncPoll + + /** + * Provide a synchronous invocation of a service operation that is either synchronous or asynchronous + * @return + */ + protected Object doInvokeSync(Object proxy, Method method, Object[] args) throws Throwable { + if (isAsyncInvocation(source)) { + // Target service is asynchronous + Class returnType = method.getReturnType(); + AsyncInvocationFutureImpl future = + AsyncInvocationFutureImpl.newInstance(returnType, businessInterface); + invokeAsync(proxy, method, args, future, method); + // Wait for some maximum time for the result - 120 seconds here + // Really, if the service is async, the client should use async client methods to invoke the service + // - and be prepared to wait a *really* long time + Object response = null; + try { + response = future.get(120, TimeUnit.SECONDS); + } catch (ExecutionException ex) { + throw ex.getCause(); + } + return response; + } else { + // Target service is not asynchronous, so perform sync invocation + return super.invoke(proxy, method, args); + } // end if + } // end method doInvokeSync + + /** + * Invoke an async callback method - note that this form of the async client API has as its final parameter + * an AsyncHandler method, used for callbacks to the client code + * @param proxy - the reference proxy + * @param asyncMethod - the async method to invoke + * @param args - array of input arguments to the method + * @return - the Future object that is returned to the client application, typed by the type of + * the response + */ + @SuppressWarnings("unchecked") + private Object doInvokeAsyncCallback(final Object proxy, final Method asyncMethod, final Object[] args) + throws Exception { + + AsyncHandler callback = (AsyncHandler)args[args.length - 1]; + Response response = doInvokeAsyncPoll(proxy, asyncMethod, Arrays.copyOf(args, args.length - 1), callback); + return response; + + } // end method doInvokeAsyncCallback + + /** + * Invoke the target (synchronous) method asynchronously + * @param proxy - the reference proxy object + * @param method - the method to invoke + * @param args - arguments for the call + * @param future - Future for handling the response + * @return - returns the response from the invocation + * @throws Throwable - if an exception is thrown during the invocation + */ + @SuppressWarnings("unchecked") + private void invokeAsync(Object proxy, + Method method, + Object[] args, + AsyncInvocationFutureImpl future, + Method asyncMethod) throws Throwable { + if (source == null) { + throw new ServiceRuntimeException("No runtime source is available"); + } + + if (source instanceof RuntimeEndpointReference) { + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + if (epr.isOutOfDate()) { + epr.rebuild(); + chains.clear(); + } + } // end if + + InvocationChain chain = getInvocationChain(method, source); + + if (chain == null) { + throw new IllegalArgumentException("No matching operation is found: " + method); + } + + // Organize for an async service + RuntimeEndpoint theEndpoint = getAsyncCallback(source); + boolean isAsyncService = false; + if (theEndpoint != null) { + // ... the service is asynchronous but binding does not support async natively ... + attachFuture(theEndpoint, future); + } // end if + + if( isAsyncInvocation((RuntimeEndpointReference)source ) ) { + isAsyncService = true; + // Get hold of the JavaAsyncResponseHandler from the chain dealing with the async response + Invoker theInvoker = chain.getHeadInvoker(); + if( theInvoker instanceof InterceptorAsync ) { + InvokerAsyncResponse responseInvoker = ((InterceptorAsync)theInvoker).getPrevious(); + if( responseInvoker instanceof JDKAsyncResponseInvoker ) { + // Register the future as the response object with its ID + ((JDKAsyncResponseInvoker)responseInvoker).registerAsyncResponse(future.getUniqueID(), future); + } // end if + } // end if + } // end if + + // Perform the invocations on separate thread... + scheduler.scheduleWork(new SeparateThreadInvoker(chain, args, source, future, asyncMethod, isAsyncService)); + + return; + } // end method invokeAsync + + /** + * An inner class which acts as a runnable task for invoking services asynchronously on threads that are separate from + * those used to execute operations of components + * + * This supports both synchronous services and asynchronous services + */ + private class SeparateThreadInvoker implements Runnable { + + private AsyncInvocationFutureImpl future; + private Method asyncMethod; + private InvocationChain chain; + private Object[] args; + private Invocable invocable; + private boolean isAsyncService; + + public SeparateThreadInvoker(InvocationChain chain, + Object[] args, + Invocable invocable, + AsyncInvocationFutureImpl future, + Method asyncMethod, + boolean isAsyncService) { + super(); + this.chain = chain; + this.asyncMethod = asyncMethod; + this.args = args; + this.invocable = invocable; + this.future = future; + this.isAsyncService = isAsyncService; + } // end constructor + + public void run() { + Object result; + + try { + if (isAsyncService) { + if( supportsNativeAsync(invocable) ) { + // Binding supports native async invocations + invokeAsync(chain, args, invocable, future.getUniqueID()); + } else { + // Binding does not support native async invocations + invoke(asyncMethod, chain, args, invocable, future.getUniqueID()); + } // end if + // The result is returned asynchronously via the future... + } else { + // ... the service is synchronous ... + result = invoke(asyncMethod, chain, args, invocable); + Type type = null; + if (asyncMethod.getReturnType() == Future.class) { + // For callback async method, where a Future is returned + Type[] types = asyncMethod.getGenericParameterTypes(); + if (types.length > 0 && asyncMethod.getParameterTypes()[types.length - 1] == AsyncHandler.class) { + // Last parameter is AsyncHandler + type = types[types.length - 1]; + } // end if + } else if (asyncMethod.getReturnType() == Response.class) { + // For the polling method, Response + type = asyncMethod.getGenericReturnType(); + } // end if + if (type instanceof ParameterizedType) { + // Check if the parameterized type of Response is a doc-lit-wrapper class + Class wrapperClass = (Class)((ParameterizedType)type).getActualTypeArguments()[0]; + WrapperInfo wrapperInfo = chain.getSourceOperation().getOutputWrapper(); + if (wrapperInfo != null && wrapperInfo.getWrapperClass() == wrapperClass) { + Object wrapper = wrapperClass.newInstance(); + // Find the 1st matching property + for (PropertyDescriptor p : Introspector.getBeanInfo(wrapperClass).getPropertyDescriptors()) { + if (p.getWriteMethod() == null) { + // There is a "class" property ... + continue; + } // end if + if (p.getWriteMethod().getParameterTypes()[0].isInstance(result)) { + p.getWriteMethod().invoke(wrapper, result); + result = wrapper; + break; + } // end if + } // end for + } // end if + } // end if + future.setResponse(result); + } // end if + } catch (ServiceRuntimeException s) { + Throwable e = s.getCause(); + if (e != null && e instanceof FaultException) { + if ("AsyncResponse".equals(e.getMessage())) { + // Do nothing... + } else { + future.setFault(s); + } // end if + } // end if + else { + future.setFault(s); + } + } catch (AsyncResponseException ar) { + // This exception is received in the case where the Binding does not support async invocation + // natively - the initial invocation is effectively synchronous with this exception thrown to + // indicate that the service received the request but will send the response separately - do nothing + } catch (Throwable t) { + //System.out.println("Async invoke got exception: " + t.toString()); + // If we invoked a sync service, this might be an application exception. + // The databinding ensured the exception is type-compatible with the application. + future.setFault(t); + } // end try + + } // end method run + + } // end class SeparateThreadInvoker + + /** + * Attaches a future to the callback endpoint - so that the Future is triggered when a response is + * received from the asynchronous service invocation associated with the Future + * @param endpoint - the async callback endpoint + * @param future - the async invocation future to attach + */ + private void attachFuture(RuntimeEndpoint endpoint, AsyncInvocationFutureImpl future) { + Implementation impl = endpoint.getComponent().getImplementation(); + AsyncResponseHandlerImpl asyncHandler = (AsyncResponseHandlerImpl)impl; + asyncHandler.addFuture(future); + } // end method attachFuture + + /** + * Perform an async invocation on the reference + * @param chain - the chain + * @param args - parameters for the invocation + * @param invocable - the reference + * @param msgID - a message ID + */ + public void invokeAsync(InvocationChain chain, Object[] args, Invocable invocable, String msgID) { + Message msg = messageFactory.createMessage(); + if (invocable instanceof RuntimeEndpointReference) { + msg.setFrom((RuntimeEndpointReference)invocable); + } // end if + if (target != null) { + msg.setTo(target); + } else if (source instanceof RuntimeEndpointReference) { + msg.setTo(((RuntimeEndpointReference)invocable).getTargetEndpoint()); + } // end if + + Operation operation = chain.getTargetOperation(); + msg.setOperation(operation); + msg.setBody(args); + + Message msgContext = ThreadMessageContext.getMessageContext(); + + // Deal with header information that needs to be copied from the message context to the new message... + transferMessageHeaders( msg, msgContext); + + ThreadMessageContext.setMessageContext(msg); + + // If there is a supplied message ID, place its value into the Message Header under "MESSAGE_ID" + if( msgID != null ){ + msg.getHeaders().put("MESSAGE_ID", msgID); + } // end if + + try { + // Invoke the reference + invocable.invokeAsync(msg); + return; + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } // end try + } // end method invokeAsync + + /** + * Get the async callback endpoint - if not already created, create and start it + * @param source - the RuntimeEndpointReference which needs an async callback endpoint + * @param future + * @return - the RuntimeEndpoint of the async callback + */ + private RuntimeEndpoint getAsyncCallback(Invocable source) { + if (!(source instanceof RuntimeEndpointReference)) + return null; + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + if (!isAsyncInvocation(epr)) return null; + + // Check to see if the binding supports async invocation natively + ReferenceBindingProvider eprProvider = epr.getBindingProvider(); + if( eprProvider instanceof EndpointReferenceAsyncProvider) { + if( ((EndpointReferenceAsyncProvider)eprProvider).supportsNativeAsync() ) return null; + } // end if + + RuntimeEndpoint endpoint; + synchronized (epr) { + endpoint = (RuntimeEndpoint)epr.getCallbackEndpoint(); + // If the async callback endpoint is already created, return it... + if (endpoint != null) + return endpoint; + // Create the endpoint for the async callback + endpoint = createAsyncCallbackEndpoint(epr); + epr.setCallbackEndpoint(endpoint); + } // end synchronized + + // Activate the new callback endpoint + startEndpoint(epr.getCompositeContext(), endpoint); + endpoint.getInvocationChains(); + + return endpoint; + } // end method setupAsyncCallback + + /** + * Start the callback endpoint + * @param compositeContext - the composite context + * @param ep - the endpoint to start + */ + private void startEndpoint(CompositeContext compositeContext, RuntimeEndpoint ep) { + for (PolicyProvider policyProvider : ep.getPolicyProviders()) { + policyProvider.start(); + } // end for + + final ServiceBindingProvider bindingProvider = ep.getBindingProvider(); + if (bindingProvider != null) { + // Allow bindings to add shutdown hooks. Requires RuntimePermission shutdownHooks in policy. + AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + bindingProvider.start(); + return null; + } + }); + compositeContext.getEndpointRegistry().addEndpoint(ep); + } + } // end method startEndpoint + + /** + * Create the async callback endpoint for a reference that is going to invoke an asyncInvocation service + * @param epr - the RuntimeEndpointReference for which the callback is created + * @return - a RuntimeEndpoint representing the callback endpoint + */ + private RuntimeEndpoint createAsyncCallbackEndpoint(RuntimeEndpointReference epr) { + CompositeContext compositeContext = epr.getCompositeContext(); + RuntimeAssemblyFactory assemblyFactory = getAssemblyFactory(compositeContext); + RuntimeEndpoint endpoint = (RuntimeEndpoint)assemblyFactory.createEndpoint(); + endpoint.bind(compositeContext); + + // Create a pseudo-component and pseudo-service + // - need to end with a chain with an invoker into the AsyncCallbackHandler class + RuntimeComponent fakeComponent = null; + try { + fakeComponent = (RuntimeComponent)epr.getComponent().clone(); + applyImplementation(fakeComponent); + } catch (CloneNotSupportedException e2) { + // will not happen + } // end try + endpoint.setComponent(fakeComponent); + + // Create pseudo-service + ComponentService service = assemblyFactory.createComponentService(); + ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry(); + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + JavaInterfaceFactory javaInterfaceFactory = + (JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class); + JavaInterfaceContract interfaceContract = javaInterfaceFactory.createJavaInterfaceContract(); + try { + interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseService.class)); + } catch (InvalidInterfaceException e1) { + // Nothing to do here - will not happen + } // end try + service.setInterfaceContract(interfaceContract); + String serviceName = epr.getReference().getName() + "_asyncCallback"; + service.setName(serviceName); + // MJE 06/12/2010 - fixup for JMS binding code which looks at the implementation service + // as well as the component service... + // Create a pseudo implementation service... + Service implService = assemblyFactory.createService(); + implService.setName(serviceName); + implService.setInterfaceContract(interfaceContract); + service.setService(implService); + // + endpoint.setService(service); + // Set pseudo-service onto the pseudo-component + List services = fakeComponent.getServices(); + services.clear(); + services.add(service); + + // Create a binding + Binding binding = createMatchingBinding(epr.getBinding(), fakeComponent, service, registry); + endpoint.setBinding(binding); + + // Need to establish policies here (binding has some...) + endpoint.getRequiredIntents().addAll(epr.getRequiredIntents()); + endpoint.getPolicySets().addAll(epr.getPolicySets()); + String epURI = epr.getComponent().getName() + "#service-binding(" + serviceName + "/" + serviceName + ")"; + endpoint.setURI(epURI); + endpoint.setUnresolved(false); + return endpoint; + } + + /** + * Create a matching binding to a supplied binding + * - the matching binding has the same binding type, but is for the supplied component and service + * @param matchBinding - the binding to match + * @param component - the component + * @param service - the service + * @param registry - registry for extensions + * @return - the matching binding, or null if it could not be created + */ + @SuppressWarnings("unchecked") + private Binding createMatchingBinding(Binding matchBinding, + RuntimeComponent component, + ComponentService service, + ExtensionPointRegistry registry) { + // Since there is no simple way to obtain a Factory for a binding where the type is not known ahead of + // time, the process followed here is to generate the XML element from the binding type QName + // and then read the XML using the processor for that XML... + QName bindingName = matchBinding.getType(); + String bindingXML = + ""; + + StAXArtifactProcessorExtensionPoint processors = + registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); + StAXArtifactProcessor processor = (StAXArtifactProcessor)processors.getProcessor(bindingName); + + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + ValidatingXMLInputFactory inputFactory = modelFactories.getFactory(ValidatingXMLInputFactory.class); + StreamSource source = new StreamSource(new StringReader(bindingXML)); + + ProcessorContext context = new ProcessorContext(); + try { + XMLStreamReader reader = inputFactory.createXMLStreamReader(source); + reader.next(); + Binding newBinding = (Binding)processor.read(reader, context); + + // Create a URI address for the callback based on the Component_Name/Reference_Name pattern + String callbackURI = "/" + component.getName() + "/" + service.getName(); + newBinding.setURI(callbackURI); + + BuilderExtensionPoint builders = registry.getExtensionPoint(BuilderExtensionPoint.class); + BindingBuilder builder = builders.getBindingBuilder(newBinding.getType()); + if (builder != null) { + org.apache.tuscany.sca.assembly.builder.BuilderContext builderContext = new BuilderContext(registry); + builder.build(component, service, newBinding, builderContext, true); + } // end if + + return newBinding; + } catch (ContributionReadException e) { + e.printStackTrace(); + } catch (XMLStreamException e) { + e.printStackTrace(); + } + + return null; + } // end method createMatchingBinding + + /** + * Gets a RuntimeAssemblyFactory from the CompositeContext + * @param compositeContext + * @return the RuntimeAssemblyFactory + */ + private RuntimeAssemblyFactory getAssemblyFactory(CompositeContext compositeContext) { + ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry(); + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + return (RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class); + } // end method RuntimeAssemblyFactory + + /** + * Applies an AsyncResponseHandlerImpl as the implementation of a RuntimeComponent + * - the AsyncResponseHandlerImpl acts as both the implementation class and the implementation provider... + * @param component - the component + */ + private void applyImplementation(RuntimeComponent component) { + AsyncResponseHandlerImpl asyncHandler = new AsyncResponseHandlerImpl(); + component.setImplementation(asyncHandler); + component.setImplementationProvider(asyncHandler); + return; + } // end method getImplementationProvider + + private static QName ASYNC_INVOKE = new QName(Constants.SCA11_NS, "asyncInvocation"); + + /** + * Determines if the service invocation is asynchronous + * @param source - the EPR involved in the invocation + * @return - true if the invocation is async + */ + private boolean isAsyncInvocation(Invocable source) { + if (!(source instanceof RuntimeEndpointReference)) + return false; + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + // First check is to see if the EPR itself has the asyncInvocation intent marked + for (Intent intent : epr.getRequiredIntents()) { + if (intent.getName().equals(ASYNC_INVOKE)) + return true; + } // end for + + // Second check is to see if the target service has the asyncInvocation intent marked + Endpoint ep = epr.getTargetEndpoint(); + for (Intent intent : ep.getRequiredIntents()) { + if (intent.getName().equals(ASYNC_INVOKE)) + return true; + } // end for + return false; + } // end isAsyncInvocation + + private boolean supportsNativeAsync(Invocable source) { + if (!(source instanceof RuntimeEndpointReference)) + return false; + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + + // TODO - need to update this once BindingProvider interface is refactored to contain + // supportsNativeAsync directly... + ReferenceBindingProvider provider = epr.getBindingProvider(); + if( provider instanceof EndpointReferenceAsyncProvider ) { + return ((EndpointReferenceAsyncProvider)provider).supportsNativeAsync(); + } else { + return false; + } // end if + } // end method supportsNativeAsync + + /** + * Return the synchronous method that is the equivalent of an async method + * @param asyncMethod - the async method + * @return - the equivalent synchronous method + */ + protected Method getNonAsyncMethod(Method asyncMethod) { + String methodName = asyncMethod.getName().substring(0, asyncMethod.getName().length() - 5); + for (Method m : businessInterface.getMethods()) { + if (methodName.equals(m.getName())) { + return m; + } + } + throw new IllegalStateException("No synchronous method matching async method " + asyncMethod.getName()); + } // end method getNonAsyncMethod +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java new file mode 100644 index 0000000000..7b459f3e7d --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.xml.ws.Response; + +public class AsyncResponse implements Response { + + private Object response; + private boolean isException; + + public AsyncResponse(Object response, boolean isException) { + this.response = response; + this.isException = isException; + } + + public Map getContext() { + return new HashMap(); + } + + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + public Object get() throws InterruptedException, ExecutionException { + if (isException) { + throw new ExecutionException((Throwable)response); + } else { + return response; + } + } + + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return get(); + } + + public boolean isCancelled() { + return false; + } + + public boolean isDone() { + return true; + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java new file mode 100644 index 0000000000..9de1809200 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import javax.xml.namespace.QName; + +import org.apache.tuscany.sca.assembly.Implementation; +import org.apache.tuscany.sca.assembly.Property; +import org.apache.tuscany.sca.assembly.Reference; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.Constants; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.policy.ExtensionType; +import org.apache.tuscany.sca.policy.Intent; +import org.apache.tuscany.sca.policy.PolicySet; +import org.apache.tuscany.sca.provider.ImplementationProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + +/** + * A class intended to form the final link in the chain calling into a Future which represents + * the response to an asynchronous service invocation + * + * Most methods are dummies, required to fulfil the contracts for ImplementationProvider, Implementation + * and Invoker, since this class collapses together the functions of these separate interfaces, due to its + * specialized nature, where most of the function will never be used. + * + * The class acts as the implementation object that terminates the chain - and also as the provider of the implementation. + * The class accepts Future objects which represent individual invocations of forward operations on the async service + * and expects that the responses it handles as invocations will carry the unique ID of one of the Future objects in the + * message header. On receipt of each message, the class seeks out the Future with that unique ID and completes the future + * either with a response message or with a Fault. + * + * @param + */ +public class AsyncResponseHandlerImpl implements AsyncResponseHandler, + ImplementationProvider, Implementation, Invoker { + + private ConcurrentHashMap< String, AsyncInvocationFutureImpl > table = + new ConcurrentHashMap< String, AsyncInvocationFutureImpl >(); + + /** + * This class is its own invoker... + */ + public Invoker createInvoker(RuntimeComponentService service, + Operation operation) { + return this; + } + + /** + * Add a future to this response handler + * @param future - the future + */ + public void addFuture( AsyncInvocationFutureImpl future ) { + // The Future is stored in the table indexed by its unique ID + table.put(future.getUniqueID(), future); + } // end method addFuture + + public boolean supportsOneWayInvocation() { + return true; + } + + public void start() {} + + public void stop() {} + + public List getOperations() { + return null; + } + + public QName getType() { + return null; + } + + public List getProperties() { + return null; + } + + public Property getProperty(String name) { + return null; + } + + public Reference getReference(String name) { + return null; + } + + public List getReferences() { + return null; + } + + public Service getService(String name) { + return null; + } + + public List getServices() { + return null; + } + + public String getURI() { + return null; + } + + public void setURI(String uri) {} + + public boolean isUnresolved() { + return false; + } + + public void setUnresolved(boolean unresolved) {} + + public ExtensionType getExtensionType() { + return null; + } + + public List getPolicySets() { + return null; + } + + public List getRequiredIntents() { + return null; + } + + public void setExtensionType(ExtensionType type) {} + + public void setWrappedFault(AsyncFaultWrapper e) {} + + public void setFault(Throwable e) {} + + public void setResponse(V res) { } + + /** + * Method which is the termination for the invocation chain from the callback endpoint + * @param msg - the Tuscany message containing the response from the async service invocation + * which is either the Response message or an exception of some kind + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Message invoke(Message msg) { + // Get the unique ID from the RELATES_TO message header + String idValue = (String)msg.getHeaders().get(Constants.RELATES_TO); + + if( idValue == null ) { + System.out.println( "Async message ID not found "); + } else { + // Fetch the Future with that Unique ID + AsyncInvocationFutureImpl future = table.get(idValue); + if( future == null ) { + System.out.println("Future not found for id: " + idValue); + } else { + // Complete the Future with a Response message + Object payload = msg.getBody(); + Object response; + if( payload == null ) { + System.out.println("Returned response message was null"); + } else { + if (payload.getClass().isArray()) { + response = ((Object[])payload)[0]; + } else { + response = payload; + } // end if + if( response.getClass().equals(AsyncFaultWrapper.class)) { + future.setWrappedFault((AsyncFaultWrapper) response ); + } else { + future.setResponse(response); + } // end if + } // end if + } // end if + } // end if + + // Prepare an empty response message + msg.setBody(null); + return msg; + } // end method invoke + +} // end class diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java new file mode 100644 index 0000000000..0e4d4344d2 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.DataExchangeSemantics; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.InterceptorAsync; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Phase; +import org.apache.tuscany.sca.invocation.PhasedInterceptor; + +/** + * Default implementation of an invocation chain + * + * @version $Rev$ $Date$ + */ +public class InvocationChainImpl implements InvocationChain { + private Operation sourceOperation; + private Operation targetOperation; + private List nodes = new ArrayList(); + + private final PhaseManager phaseManager; + private boolean forReference; + private boolean allowsPassByReference; + private boolean isAsyncInvocation; + + public InvocationChainImpl(Operation sourceOperation, Operation targetOperation, boolean forReference, PhaseManager phaseManager, boolean isAsyncInvocation) { + this.targetOperation = targetOperation; + this.sourceOperation = sourceOperation; + this.forReference = forReference; + this.phaseManager = phaseManager; + this.isAsyncInvocation = isAsyncInvocation; + } + + public Operation getTargetOperation() { + return targetOperation; + } + + public void setTargetOperation(Operation operation) { + this.targetOperation = operation; + } + + public void addInterceptor(Interceptor interceptor) { + if (interceptor instanceof PhasedInterceptor) { + PhasedInterceptor pi = (PhasedInterceptor)interceptor; + if (pi.getPhase() != null) { + addInvoker(pi.getPhase(), pi); + return; + } + } + String phase = forReference ? Phase.REFERENCE : Phase.SERVICE; + addInterceptor(phase, interceptor); + } // end method addInterceptor + + public void addInvoker(Invoker invoker) { + if (invoker instanceof PhasedInterceptor) { + PhasedInterceptor pi = (PhasedInterceptor)invoker; + if (pi.getPhase() != null) { + addInvoker(pi.getPhase(), pi); + return; + } + } + String phase = forReference ? Phase.REFERENCE_BINDING : Phase.IMPLEMENTATION; + addInvoker(phase, invoker); + } + + public Invoker getHeadInvoker() { + return nodes.isEmpty() ? null : nodes.get(0).getInvoker(); + } + + public Invoker getTailInvoker() { + int nodeCount = nodes.size(); + if( nodeCount > 0 ) { + return nodes.get( nodeCount - 1).getInvoker(); + } // end if + + return null; + } // end method getTailInvoker + + public Invoker getHeadInvoker(String phase) { + int index = phaseManager.getAllPhases().indexOf(phase); + if (index == -1) { + throw new IllegalArgumentException("Invalid phase name: " + phase); + } + for (Node node : nodes) { + if (index <= node.getPhaseIndex()) { + return node.getInvoker(); + } + } + return null; + } + + /** + * @return the sourceOperation + */ + public Operation getSourceOperation() { + return sourceOperation; + } + + /** + * @param sourceOperation the sourceOperation to set + */ + public void setSourceOperation(Operation sourceOperation) { + this.sourceOperation = sourceOperation; + } + + public void addInterceptor(String phase, Interceptor interceptor) { + addInvoker(phase, interceptor); + } + + private void addInvoker(String phase, Invoker invoker) { + if (isAsyncInvocation && + !(invoker instanceof InvokerAsyncRequest) && + !(invoker instanceof InvokerAsyncResponse) ){ + // TODO - should raise an error but don't want to break + // the existing non-native async support +/* + throw new IllegalArgumentException("Trying to add synchronous invoker " + + invoker.getClass().getName() + + " to asynchronous chain"); +*/ + } + + int index = phaseManager.getAllPhases().indexOf(phase); + if (index == -1) { + throw new IllegalArgumentException("Invalid phase name: " + phase); + } + Node node = new Node(index, invoker); + ListIterator li = nodes.listIterator(); + Node before = null, after = null; + boolean found = false; + while (li.hasNext()) { + before = after; + after = li.next(); + if (after.getPhaseIndex() > index) { + // Move back + li.previous(); + li.add(node); + found = true; + break; + } + } + if (!found) { + // Add to the end + nodes.add(node); + before = after; + after = null; + } + + // Relink the interceptors + if (before != null) { + if (before.getInvoker() instanceof Interceptor) { + ((Interceptor)before.getInvoker()).setNext(invoker); + if ((invoker instanceof InterceptorAsync) && + (before.getInvoker() instanceof InvokerAsyncResponse)) { + ((InterceptorAsync) invoker).setPrevious((InvokerAsyncResponse)before.getInvoker()); + } + } + } + if (after != null) { + if (invoker instanceof Interceptor) { + ((Interceptor)invoker).setNext(after.getInvoker()); + if ((after.getInvoker() instanceof InterceptorAsync) && + (invoker instanceof InvokerAsyncResponse)){ + ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsyncResponse)invoker); + } + } + } + + } + + public boolean allowsPassByReference() { + if (allowsPassByReference) { + // No need to check the invokers + return true; + } + // Check if any of the invokers allows pass-by-reference + boolean allowsPBR = false; + for (Node i : nodes) { + if (i.getInvoker() instanceof DataExchangeSemantics) { + if (((DataExchangeSemantics)i.getInvoker()).allowsPassByReference()) { + allowsPBR = true; + break; + } + } + } + return allowsPBR; + } + + public void setAllowsPassByReference(boolean allowsPBR) { + this.allowsPassByReference = allowsPBR; + } + + private static class Node { + private int phaseIndex; + private Invoker invoker; + + public Node(int phaseIndex, Invoker invoker) { + super(); + this.phaseIndex = phaseIndex; + this.invoker = invoker; + } + + public int getPhaseIndex() { + return phaseIndex; + } + + public Invoker getInvoker() { + return invoker; + } + + @Override + public String toString() { + return "(" + phaseIndex + ")" + invoker; + } + } + + public boolean isAsyncInvocation() { + return isAsyncInvocation; + } + + public void addHeadInterceptor(Interceptor interceptor) { + String phase = forReference ? Phase.REFERENCE : Phase.SERVICE_BINDING; + if (interceptor instanceof PhasedInterceptor) { + PhasedInterceptor pi = (PhasedInterceptor)interceptor; + if (pi.getPhase() != null) { + phase = pi.getPhase(); + } // end if + } // end if + + addHeadInterceptor(phase, interceptor); + } // end method addHeadInterceptor + + public void addHeadInterceptor(String phase, Interceptor interceptor) { + // TODO Auto-generated method stub + Invoker invoker = (Invoker)interceptor; + + int index = phaseManager.getAllPhases().indexOf(phase); + if (index == -1) { + throw new IllegalArgumentException("Invalid phase name: " + phase); + } // end if + Node node = new Node(index, invoker); + + ListIterator li = nodes.listIterator(); + Node before = null, after = null; + boolean found = false; + while (li.hasNext()) { + before = after; + after = li.next(); + // Look for the first node with a phase index equal to or greater than the one provided + if (after.getPhaseIndex() >= index) { + // Move back + li.previous(); + li.add(node); + found = true; + break; + } + } + if (!found) { + // Add to the end + nodes.add(node); + before = after; + after = null; + } + + // Relink the interceptors + if (before != null) { + if (before.getInvoker() instanceof Interceptor) { + ((Interceptor)before.getInvoker()).setNext(invoker); + if ((invoker instanceof InterceptorAsync) && + (before.getInvoker() instanceof InvokerAsyncResponse)) { + ((InterceptorAsync) invoker).setPrevious((InvokerAsyncResponse)before.getInvoker()); + } + } + } + if (after != null) { + if (invoker instanceof Interceptor) { + ((Interceptor)invoker).setNext(after.getInvoker()); + if ((after.getInvoker() instanceof InterceptorAsync) && + (invoker instanceof InvokerAsyncResponse)){ + ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsyncResponse)invoker); + } + } + } + + } // end method addHeadInterceptor + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java new file mode 100644 index 0000000000..bde3e92c27 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl; +import org.apache.tuscany.sca.core.invocation.Constants; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * Responsible for dispatching to a callback through a wire.

TODO cache + * target invoker + * + * @version $Rev$ $Date$ + */ +public class JDKCallbackInvocationHandler extends JDKInvocationHandler { + private static final long serialVersionUID = -3350283555825935609L; + + public JDKCallbackInvocationHandler(MessageFactory messageFactory, ServiceReference ref) { + super(messageFactory, ref); + this.fixedWire = false; + } + + @Override + @SuppressWarnings( {"unchecked", "rawtypes"}) + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + + if (Object.class == method.getDeclaringClass()) { + return invokeObjectMethod(method, args); + } + + // obtain a dedicated wire to be used for this callback invocation + RuntimeEndpointReference wire = ((CallbackServiceReferenceImpl)callableReference).getCallbackEPR(); + if (wire == null) { + //FIXME: need better exception + throw new ServiceRuntimeException("No callback wire found"); + } + + setEndpoint(((CallbackServiceReferenceImpl)callableReference).getResolvedEndpoint()); + + InvocationChain chain = getInvocationChain(method, wire); + if (chain == null) { + throw new IllegalArgumentException("No matching operation is found: " + method); + } + + try { + String msgID = ((CallbackServiceReferenceImpl)callableReference).getMsgID(); + return invoke(method, chain, args, wire, msgID ); + } catch (InvocationTargetException e) { + Throwable t = e.getCause(); + throw t; + } finally { + // allow the cloned wire to be reused by subsequent callbacks + } + } + + /** + * Invoke the chain + * @param chain - the chain + * @param args - arguments to the invocation as an array of Objects + * @param source - the Endpoint or EndpointReference to which the chain relates + * @param msgID - ID of the message to which this invovation is a callback - ID ends up in "RELATES_TO" header + * @return - the Response message from the invocation + * @throws Throwable - if any exception occurs during the invocation + */ + @Override + protected Object invoke(Method method, InvocationChain chain, Object[] args, Invocable source, String msgID) + throws Throwable { + Message msg = messageFactory.createMessage(); + if (source instanceof RuntimeEndpointReference) { + msg.setFrom((RuntimeEndpointReference)source); + } + if (target != null) { + msg.setTo(target); + } else { + if (source instanceof RuntimeEndpointReference) { + msg.setTo(((RuntimeEndpointReference)source).getTargetEndpoint()); + } + } + + msg.getHeaders().put(Constants.CALLBACK, ((CallbackServiceReferenceImpl)callableReference).getCallbackHandler()); + + Invoker headInvoker = chain.getHeadInvoker(); + + Operation operation = null; + if(source instanceof RuntimeEndpoint) { + for (InvocationChain c : source.getInvocationChains()) { + Operation op = c.getTargetOperation(); + if (method.getName().equals(op.getName())) { + operation = op; + break; + } + } + } else { + operation = chain.getTargetOperation(); + } + msg.setOperation(operation); + msg.setBody(args); + + Message msgContext = ThreadMessageContext.getMessageContext(); + + // Deal with header information that needs to be copied from the message context to the new message... + transferMessageHeaders( msg, msgContext); + + ThreadMessageContext.setMessageContext(msg); + + // If there is a supplied message ID, place its value into the Message Header under "RELATES_TO" + if( msgID != null ){ + msg.getHeaders().put(Constants.RELATES_TO, msgID); + } // end if + + try { + // dispatch the source down the chain and get the response + Message resp = headInvoker.invoke(msg); + Object body = resp.getBody(); + if (resp.isFault()) { + throw (Throwable)body; + } + return body; + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } + } // end method invoke + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java new file mode 100644 index 0000000000..827008dc73 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.io.Serializable; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; + +import javax.xml.ws.Holder; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.core.context.ServiceReferenceExt; +import org.apache.tuscany.sca.interfacedef.DataType; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.ParameterMode; +import org.apache.tuscany.sca.interfacedef.java.JavaOperation; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * @version $Rev$ $Date$ + */ +public class JDKInvocationHandler implements InvocationHandler, Serializable { + private static final long serialVersionUID = -3366410500152201371L; + + protected MessageFactory messageFactory; + protected Endpoint target; + protected Invocable source; + protected ServiceReferenceExt callableReference; + protected Class businessInterface; + + protected boolean fixedWire = true; + + protected transient Map chains = new IdentityHashMap(); + + public JDKInvocationHandler(MessageFactory messageFactory, Class businessInterface, Invocable source) { + this.messageFactory = messageFactory; + this.source = source; + this.businessInterface = businessInterface; + } + + public JDKInvocationHandler(MessageFactory messageFactory, ServiceReference callableReference) { + this.messageFactory = messageFactory; + this.callableReference = (ServiceReferenceExt)callableReference; + if (callableReference != null) { + this.businessInterface = callableReference.getBusinessInterface(); + this.source = (RuntimeEndpointReference) this.callableReference.getEndpointReference(); + } + } + + + public Class getBusinessInterface() { + return businessInterface; + } + + protected Object getCallbackID() { + return null; + } + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (Object.class == method.getDeclaringClass()) { + return invokeObjectMethod(method, args); + } + if (source == null) { + throw new ServiceRuntimeException("No runtime source is available"); + } + + if (source instanceof RuntimeEndpointReference) { + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + if (epr.isOutOfDate()) { + epr.rebuild(); + chains.clear(); + } + } + + InvocationChain chain = getInvocationChain(method, source); + + if (chain == null) { + throw new IllegalArgumentException("No matching operation is found: " + method); + } + + // Holder pattern. Items stored in a Holder are promoted to T. + // After the invoke, the returned data are placed back in Holder. + Object [] promotedArgs = promoteHolderArgs( args ); + + // Strip out OUT-only arguments. Not too sure if the presence + // of a sourceOperation is exactly the right check to use to + // know whether or not to do this, but will assume it is until + // learning otherwise. + Operation sourceOp = chain.getSourceOperation(); + if (sourceOp != null) { + promotedArgs = removeOutOnlyArgs(sourceOp, promotedArgs ); + } + + Object result = invoke(method, chain, promotedArgs, source); + + // TODO - Based on the code in JavaInterfaceIntrospectorImpl, it seems there are + // some cases involving generics that we're not taking into account. + boolean voidReturnType = (void.class == method.getReturnType() ? true : false); + + // Returned Holder data are placed back in Holder. + boolean holderPattern = false; + Class [] parameters = method.getParameterTypes(); + if ( parameters != null ) { + int resultIdx = (voidReturnType ? 0 : 1); + for ( int i = 0; i < parameters.length; i++ ) { + Class parameterType = parameters[ i ]; + if ( isHolder( parameterType ) ) { + holderPattern = true; + // Pop results and place in holder (demote). + Holder holder = (Holder) args[ i ]; + + Object[] results = (Object[])result; + if ( result != null ) { + holder.value = results[resultIdx++]; + } + } + } + } + if (holderPattern && result != null) { + if (voidReturnType) { + return null; + } else { + return ((Object[])result)[0]; + } + } else { + return result; + } + } + + /** + * Handle the methods on the Object.class + * @param method + * @param args + */ + protected Object invokeObjectMethod(Method method, Object[] args) throws Throwable { + String name = method.getName(); + if ("toString".equals(name)) { + return "[Proxy - " + toString() + "]"; + } else if ("equals".equals(name)) { + Object obj = args[0]; + if (obj == null) { + return false; + } + if (!Proxy.isProxyClass(obj.getClass())) { + return false; + } + return equals(Proxy.getInvocationHandler(obj)); + } else if ("hashCode".equals(name)) { + return hashCode(); + } else { + return method.invoke(this); + } + } + + /** + * Determines if the given operation matches the given method + * + * @return true if the operation matches, false if does not + */ + // FIXME: Should it be in the InterfaceContractMapper? + @SuppressWarnings("unchecked") + private static boolean match(Operation operation, Method method) { + if (operation instanceof JavaOperation) { + JavaOperation javaOp = (JavaOperation)operation; + Method m = javaOp.getJavaMethod(); + if (!method.getName().equals(m.getName())) { + return false; + } + if (method.equals(m)) { + return true; + } + } else { + if (!method.getName().equals(operation.getName())) { + return false; + } + } + + // For remotable interface, operation is not overloaded. + if (operation.getInterface().isRemotable()) { + return true; + } + + Class[] params = method.getParameterTypes(); + + DataType> inputType = null; + if (operation.isInputWrapperStyle()) { + inputType = operation.getInputWrapper().getUnwrappedType(); + } else { + inputType = operation.getInputType(); + } + List types = inputType.getLogical(); + boolean matched = true; + if (types.size() == params.length && method.getName().equals(operation.getName())) { + for (int i = 0; i < params.length; i++) { + Class clazz = params[i]; + Class type = types.get(i).getPhysical(); + // Object.class.isAssignableFrom(int.class) returns false + if (type != Object.class && (!type.isAssignableFrom(clazz))) { + matched = false; + } + } + } else { + matched = false; + } + return matched; + + } + + protected synchronized InvocationChain getInvocationChain(Method method, Invocable source) { + if (source instanceof RuntimeEndpoint) { + // [rfeng] Start with the binding invocation chain + return source.getBindingInvocationChain(); + } + if (fixedWire && chains.containsKey(method)) { + return chains.get(method); + } + InvocationChain found = null; + for (InvocationChain chain : source.getInvocationChains()) { + Operation operation = chain.getSourceOperation(); + if (operation.isDynamic()) { + operation.setName(method.getName()); + found = chain; + break; + } else if (match(operation, method)) { + found = chain; + break; + } + } + if (fixedWire) { + chains.put(method, found); + } + return found; + } + + protected void setEndpoint(Endpoint endpoint) { + this.target = endpoint; + } + + protected Object invoke(Method method, InvocationChain chain, Object[] args, Invocable source) + throws Throwable { + return invoke( method, chain, args, source, null ); + } + + /** + * Invoke the chain + * @param chain - the chain + * @param args - arguments to the invocation as an array of Objects + * @param source - the Endpoint or EndpointReference to which the chain relates + * @param msgID - an ID for the message being sent, may be null + * @return - the Response message from the invocation + * @throws Throwable - if any exception occurs during the invocation + */ + protected Object invoke(Method method, InvocationChain chain, Object[] args, Invocable source, String msgID) + throws Throwable { + Message msg = messageFactory.createMessage(); + if (source instanceof RuntimeEndpointReference) { + msg.setFrom((RuntimeEndpointReference)source); + } + if (target != null) { + msg.setTo(target); + } else { + if (source instanceof RuntimeEndpointReference) { + msg.setTo(((RuntimeEndpointReference)source).getTargetEndpoint()); + } + } + Invoker headInvoker = chain.getHeadInvoker(); + Operation operation = null; + if(source instanceof RuntimeEndpoint) { + // [rfeng] We cannot use the targetOperation from the binding invocation chain. + // For each method, we need to find the matching operation so that we can set the operation on to the message + for (InvocationChain c : source.getInvocationChains()) { + Operation op = c.getTargetOperation(); + if (method.getName().equals(op.getName())) { + operation = op; + break; + } + } + } else { + operation = chain.getTargetOperation(); + } + msg.setOperation(operation); + msg.setBody(args); + + Message msgContext = ThreadMessageContext.getMessageContext(); + + // Deal with header information that needs to be copied from the message context to the new message... + transferMessageHeaders( msg, msgContext); + + ThreadMessageContext.setMessageContext(msg); + + // If there is a supplied message ID, place its value into the Message Header under "MESSAGE_ID" + if( msgID != null ){ + msg.getHeaders().put("MESSAGE_ID", msgID); + } // end if + + try { + // dispatch the source down the chain and get the response + Message resp = headInvoker.invoke(msg); + Object body = resp.getBody(); + if (resp.isFault()) { + throw (Throwable)body; + } + return body; + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } + } + + /** + * Transfer relevant header information from the old message (incoming) to the new message (outgoing) + * @param newMsg + * @param oldMsg + */ + protected void transferMessageHeaders( Message newMsg, Message oldMsg ) { + if( oldMsg == null ) return; + // For the present, simply copy all the headers + if( !oldMsg.getHeaders().isEmpty() ) newMsg.getHeaders().putAll( oldMsg.getHeaders() ); + } // end transferMessageHeaders + + /** + * @return the callableReference + */ + public ServiceReference getCallableReference() { + return callableReference; + } + + /** + * @param callableReference the callableReference to set + */ + public void setCallableReference(ServiceReference callableReference) { + this.callableReference = (ServiceReferenceExt)callableReference; + } + + /** + * Creates a copy of arguments. Holder values are promoted to T. + * Note. It is essential that arg Holders not be destroyed here. + * PromotedArgs should not destroy holders. They are used on response return. + * @param args containing Holders and other objects. + * @return Object [] + */ + protected static Object [] promoteHolderArgs( Object [] args ) { + if ( args == null ) + return args; + Object [] promotedArgs = new Object[ args.length ]; + + for ( int i = 0; i < args.length; i++ ) { + Object argument = args[ i ]; + if ( argument != null ) { + if ( isHolder( argument ) ) { + promotedArgs[ i ] = ((Holder)argument).value; + } else { + promotedArgs[ i ] = args[ i ]; + } + + } + } + return promotedArgs; + } + + /** + * Given an argument array, filters out (removes) OUT-only parameters + * @param sourceOp + * @return array of filtered arguments + */ + Object[] removeOutOnlyArgs(Operation sourceOp, Object[] args) { + if ( args == null ) + return args; + ArrayList retValList = new ArrayList(); + List parmList = sourceOp.getParameterModes(); + for (int i = 0; i < args.length; i++) { + if (parmList.get(i) != ParameterMode.OUT) { + retValList.add(args[i]); + } + } + return retValList.toArray(); + } + + /** + * Given a Class, tells if it is a Holder by comparing to "javax.xml.ws.Holder" + * @param testClass + * @return boolean whether class is Holder type. + */ + protected static boolean isHolder( Class testClass ) { + if ( testClass.getName().startsWith( "javax.xml.ws.Holder" )) { + return true; + } + return false; + } + + + /** + * Given an Object, tells if it is a Holder by comparing to "javax.xml.ws.Holder" + * @param testClass + * @return boolean stating whether Object is a Holder type. + * @author DOB + */ + protected static boolean isHolder( Object object ) { + String objectName = object.getClass().getName(); + if ( object instanceof javax.xml.ws.Holder ) { + return true; + } + return false; + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java new file mode 100644 index 0000000000..7163357042 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; + +import javax.xml.bind.JAXBContext; +import javax.xml.ws.AsyncHandler; +import javax.xml.ws.Response; + +import org.apache.tuscany.sca.common.java.collection.LRUCache; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.core.context.ServiceReferenceExt; +import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl; +import org.apache.tuscany.sca.core.context.impl.ServiceReferenceImpl; +import org.apache.tuscany.sca.core.invocation.ProxyCreationException; +import org.apache.tuscany.sca.core.invocation.ProxyFactory; +import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * the default implementation of a wire service that uses JDK dynamic proxies + * + * @version $Rev$ $Date$ + */ +public class JDKProxyFactory implements ProxyFactory, LifeCycleListener { + protected ExtensionPointRegistry registry; + protected InterfaceContractMapper contractMapper; + private MessageFactory messageFactory; + + public JDKProxyFactory(ExtensionPointRegistry registry, + MessageFactory messageFactory, + InterfaceContractMapper mapper) { + this.registry = registry; + this.contractMapper = mapper; + this.messageFactory = messageFactory; + } + + /** + * The original createProxy method assumes that the proxy doesn't want to + * share conversation state so sets the conversation object to null + */ + public T createProxy(final Class interfaze, Invocable invocable) throws ProxyCreationException { + if (invocable instanceof RuntimeEndpoint) { + InvocationHandler handler; + // TUSCANY-3659 - Always install a asynch handler regardless of whether ref is sync or async + // needs tidying + // if (isAsync(interfaze)) { + handler = new AsyncJDKInvocationHandler(registry, messageFactory, interfaze, invocable); + // } else { + // handler = new JDKInvocationHandler(messageFactory, interfaze, invocable); + // } + // Allow privileged access to class loader. Requires RuntimePermission in security policy. + ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction() { + public ClassLoader run() { + return interfaze.getClassLoader(); + } + }); + T proxy = interfaze.cast(newProxyInstance(cl, new Class[] {interfaze}, handler)); + return proxy; + } + ServiceReference serviceReference = new ServiceReferenceImpl(interfaze, invocable, null); + return createProxy(serviceReference); + } + + public T createProxy(ServiceReference callableReference) throws ProxyCreationException { + assert callableReference != null; + final Class interfaze = callableReference.getBusinessInterface(); + InvocationHandler handler; + // TUSCANY-3659 - Always install a asynch handler regardless of whether ref is sync or async + // needs tidying + // if (isAsync(interfaze)) { + handler = new AsyncJDKInvocationHandler(registry, messageFactory, callableReference); + // } else { + // handler = new JDKInvocationHandler(messageFactory, callableReference); + // } + // Allow privileged access to class loader. Requires RuntimePermission in security policy. + ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction() { + public ClassLoader run() { + return interfaze.getClassLoader(); + } + }); + T proxy = interfaze.cast(newProxyInstance(cl, new Class[] {interfaze}, handler)); + ((ServiceReferenceExt)callableReference).setProxy(proxy); + return proxy; + } + + private boolean isAsync(Class interfaze) { + for (Method method : interfaze.getMethods()) { + if (method.getName().endsWith("Async")) { + if (method.getReturnType().isAssignableFrom(Future.class)) { + if (method.getParameterTypes().length > 0) { + if (method.getParameterTypes()[method.getParameterTypes().length - 1] + .isAssignableFrom(AsyncHandler.class)) { + return true; + } + } + } + if (method.getReturnType().isAssignableFrom(Response.class)) { + return true; + } + } + } + return false; + } + + public T createCallbackProxy(Class interfaze, List wires) throws ProxyCreationException { + ServiceReferenceImpl callbackReference = null; + try { + callbackReference = new CallbackServiceReferenceImpl(interfaze, wires); + } catch (ServiceRuntimeException e) { + // [rfeng] In case that the call is not from a bidirectional interface, the field should be injected with null + callbackReference = null; + } + return callbackReference != null ? createCallbackProxy(callbackReference) : null; + } + + public T createCallbackProxy(ServiceReference callbackReference) throws ProxyCreationException { + assert callbackReference != null; + final Class interfaze = callbackReference.getBusinessInterface(); + InvocationHandler handler = new JDKCallbackInvocationHandler(messageFactory, callbackReference); + ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction() { + public ClassLoader run() { + return interfaze.getClassLoader(); + } + }); + T proxy = interfaze.cast(newProxyInstance(cl, new Class[] {interfaze}, handler)); + ((ServiceReferenceExt)callbackReference).setProxy(proxy); + return proxy; + } + + public > R cast(B target) throws IllegalArgumentException { + InvocationHandler handler = Proxy.getInvocationHandler(target); + if (handler instanceof JDKInvocationHandler) { + return (R)((JDKInvocationHandler)handler).getCallableReference(); + } else { + throw new IllegalArgumentException("The object is not a known proxy."); + } + } + + /** + * @see org.apache.tuscany.sca.core.invocation.ProxyFactory#isProxyClass(java.lang.Class) + */ + public boolean isProxyClass(Class clazz) { + return Proxy.isProxyClass(clazz); + } + + // This is a cache containing the proxy class constructor for each business interface. + // This improves performance compared to calling Proxy.newProxyInstance() + // every time that a proxy is needed. + private final LRUCache, Constructor> cache = new LRUCache, Constructor>(512); + + public Object newProxyInstance(ClassLoader classloader, Class interfaces[], InvocationHandler invocationhandler) + throws IllegalArgumentException { + if (interfaces.length > 1) { + // We only cache the proxy constructors with one single interface which the case in SCA where + // one reference can have one interface + return Proxy.newProxyInstance(classloader, interfaces, invocationhandler); + } + try { + if (invocationhandler == null) + throw new NullPointerException("InvocationHandler is null"); + // Lookup cached constructor. aclass[0] is the reference's business interface. + Constructor proxyCTOR; + synchronized (cache) { + proxyCTOR = cache.get(interfaces[0]); + } + if (proxyCTOR == null) { + Class proxyClass = Proxy.getProxyClass(classloader, interfaces); + proxyCTOR = proxyClass.getConstructor(InvocationHandler.class); + synchronized (cache) { + cache.put(interfaces[0], proxyCTOR); + } + } + return proxyCTOR.newInstance(invocationhandler); + } catch (Throwable e) { + throw new IllegalArgumentException(e); + } + } + + public void start() { + } + + public void stop() { + cache.clear(); + } + + public void removeProxiesForContribution(ClassLoader contributionClassloader){ + try { + synchronized(cache) { + Set> objSet = cache.keySet(); + List> toRemove = new ArrayList>(); + Iterator> i = objSet.iterator(); + loop: + while(i.hasNext()) { + Class cls = i.next(); + ClassLoader cl = cls.getClassLoader(); + while (cl != null){ + if (cl == contributionClassloader){ + toRemove.add(cls); + break loop; + } + // take account of generated classes + cl = cl.getParent(); + } + } + for (Class cls : toRemove){ + cache.remove(cls); + } + } + } catch(Exception e) { + throw new ServiceRuntimeException(e); + } + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java new file mode 100644 index 0000000000..6f3e947631 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.io.Serializable; + +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; + +/** + * Implementation of MessageFactory. + * + * @version $Rev$ $Date$ + */ +public class MessageFactoryImpl implements MessageFactory, Serializable { + + /** + * + */ + private static final long serialVersionUID = -2112289169275106977L; + + public Message createMessage() { + return new MessageImpl(); + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java new file mode 100644 index 0000000000..cc8cb48cc5 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Message; + +/** + * The default implementation of a message flowed through a wire during an invocation + * + * @version $Rev $Date$ + */ +public class MessageImpl implements Message { + private Map headers = new HashMap(); + private Object body; + private Object messageID; + private boolean isFault; + private Operation operation; + + private EndpointReference from; + private Endpoint to; + + private Object bindingContext; + + public MessageImpl() { + this.from = null; + this.to = null; + } + + @SuppressWarnings("unchecked") + public T getBody() { + return (T)body; + } + + public void setBody(T body) { + this.isFault = false; + this.body = body; + } + + public Object getMessageID() { + return messageID; + } + + public void setMessageID(Object messageId) { + this.messageID = messageId; + } + + public boolean isFault() { + return isFault; + } + + public void setFaultBody(Object fault) { + this.isFault = true; + this.body = fault; + } + + public EndpointReference getFrom() { + return from; + } + + public void setFrom(EndpointReference from) { + this.from = from; + } + + public Endpoint getTo() { + return to; + } + + public void setTo(Endpoint to) { + this.to = to; + } + + public Operation getOperation() { + return operation; + } + + public void setOperation(Operation op) { + this.operation = op; + } + + public Map getHeaders() { + return headers; + } + + @SuppressWarnings("unchecked") + public T getBindingContext() { + return (T)bindingContext; + } + + public void setBindingContext(T bindingContext) { + this.bindingContext = bindingContext; + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java new file mode 100644 index 0000000000..45f4bf52bf --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import org.apache.tuscany.sca.core.invocation.ProxyCreationException; + + +/** + * Thrown when an {@link org.apache.tuscany.sca.core.factory.model.Operation} cannot be mapped to a method on an interface + * @version $Rev$ $Date$ + */ +public class NoMethodForOperationException extends ProxyCreationException { + private static final long serialVersionUID = 5116536602309483679L; + + public NoMethodForOperationException() { + } + + public NoMethodForOperationException(String message) { + super(message); + } + + public NoMethodForOperationException(String message, Throwable cause) { + super(message, cause); + } + + public NoMethodForOperationException(Throwable cause) { + super(cause); + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java new file mode 100644 index 0000000000..85ef79b5d7 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import static org.apache.tuscany.sca.invocation.Phase.IMPLEMENTATION; +import static org.apache.tuscany.sca.invocation.Phase.IMPLEMENTATION_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING_TRANSPORT; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING_WIREFORMAT; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_INTERFACE; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_OPERATION_SELECTOR; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_TRANSPORT; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_WIREFORMAT; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_INTERFACE; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_POLICY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.extensibility.ServiceDeclaration; +import org.apache.tuscany.sca.invocation.Phase; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * @version $Rev$ $Date$ + */ +public class PhaseManager { + private static final Logger log = Logger.getLogger(PhaseManager.class.getName()); + + public static final String STAGE_REFERENCE = "reference"; + public static final String STAGE_REFERENCE_BINDING = "reference.binding"; + public static final String STAGE_SERVICE_BINDING = "service.binding"; + public static final String STAGE_SERVICE = "service"; + public static final String STAGE_IMPLEMENTATION = "implementation"; + + private static final String[] SYSTEM_REFERENCE_PHASES = + {REFERENCE, REFERENCE_POLICY, REFERENCE_INTERFACE, REFERENCE_BINDING}; + + private static final String[] SYSTEM_REFERENCE_BINDING_PHASES = + {REFERENCE_BINDING_WIREFORMAT, REFERENCE_BINDING_POLICY, REFERENCE_BINDING_TRANSPORT}; + + private static final String[] SYSTEM_SERVICE_BINDING_PHASES = + {SERVICE_BINDING_TRANSPORT, SERVICE_BINDING_OPERATION_SELECTOR, SERVICE_BINDING_WIREFORMAT, SERVICE_BINDING_POLICY}; + + private static final String[] SYSTEM_SERVICE_PHASES = + {SERVICE_BINDING, SERVICE_INTERFACE, SERVICE_POLICY, SERVICE}; + + private static final String[] SYSTEM_IMPLEMENTATION_PHASES = {IMPLEMENTATION_POLICY, IMPLEMENTATION}; + + private ExtensionPointRegistry registry; + private String pattern = Phase.class.getName(); + private Map stages; + private List phases; + + public class Stage { + private String name; + private PhaseSorter sorter = new PhaseSorter(); + private Set firstSet = new HashSet(); + private Set lastSet = new HashSet(); + private List phases = new ArrayList(); + + public Stage(String name) { + super(); + this.name = name; + } + + public String getName() { + return name; + } + + public PhaseSorter getSorter() { + return sorter; + } + + public Set getFirstSet() { + return firstSet; + } + + public Set getLastSet() { + return lastSet; + } + + public List getPhases() { + return phases; + } + + @Override + public String toString() { + return name + phases; + } + } + + /** + * @param registry + */ + public PhaseManager(ExtensionPointRegistry registry) { + super(); + this.registry = registry; + } + + public static PhaseManager getInstance(ExtensionPointRegistry registry) { + UtilityExtensionPoint utilityExtensionPoint = registry.getExtensionPoint(UtilityExtensionPoint.class); + return utilityExtensionPoint.getUtility(PhaseManager.class); + } + + // For unit test purpose + PhaseManager(String pattern) { + super(); + this.pattern = pattern; + this.registry = new DefaultExtensionPointRegistry(); + } + + private List getPhases(String stage) { + Stage s = getStages().get(stage); + return s == null ? null : s.getPhases(); + } + + public List getReferencePhases() { + return getPhases(STAGE_REFERENCE); + } + + public List getServicePhases() { + return getPhases(STAGE_SERVICE); + } + + public List getReferenceBindingPhases() { + return getPhases(STAGE_REFERENCE_BINDING); + } + + public List getServiceBindingPhases() { + return getPhases(STAGE_SERVICE_BINDING); + } + + public List getImplementationPhases() { + return getPhases(STAGE_IMPLEMENTATION); + } + + public synchronized List getAllPhases() { + if (phases == null) { + phases = new ArrayList(); + phases.addAll(getReferencePhases()); + phases.addAll(getReferenceBindingPhases()); + phases.addAll(getServiceBindingPhases()); + phases.addAll(getServicePhases()); + phases.addAll(getImplementationPhases()); + } + return phases; + } + + public synchronized Map getStages() { + if (stages != null) { + return stages; + } + init(); + + Collection services; + try { + services = registry.getServiceDiscovery().getServiceDeclarations(pattern); + } catch (IOException e) { + throw new ServiceRuntimeException(e); + } + + for (ServiceDeclaration d : services) { + if (log.isLoggable(Level.FINE)) { + log.fine(d.getLocation() + ": " + d.getAttributes()); + } + String name = d.getAttributes().get("name"); + if (name == null) { + throw new ServiceRuntimeException("Required attribute 'name' is missing."); + } + String stageName = d.getAttributes().get("stage"); + if (stageName == null) { + throw new ServiceRuntimeException("Required attribute 'stage' is missing."); + } + Stage stage = stages.get(stageName); + if (stage == null) { + throw new ServiceRuntimeException("Invalid stage: " + stageName); + } + PhaseSorter graph = stage.getSorter(); + Set firstSet = stage.getFirstSet(), lastSet = stage.getLastSet(); + + String before = d.getAttributes().get("before"); + String after = d.getAttributes().get("after"); + if (before != null) { + StringTokenizer tokenizer = new StringTokenizer(before); + while (tokenizer.hasMoreTokens()) { + String p = tokenizer.nextToken(); + if (!"*".equals(p)) { + graph.addEdge(name, p); + } else { + firstSet.add(name); + } + } + } + if (after != null) { + StringTokenizer tokenizer = new StringTokenizer(after); + while (tokenizer.hasMoreTokens()) { + String p = tokenizer.nextToken(); + if (!"*".equals(p)) { + graph.addEdge(p, name); + } else { + lastSet.add(name); + } + } + } + graph.addVertext(name); + if(firstSet.size()>1) { + log.warning("More than one phases are declared to be first: "+firstSet); + } + for (String s : firstSet) { + for (String v : new HashSet(graph.getVertices().keySet())) { + if (!firstSet.contains(v)) { + graph.addEdge(s, v); + } + } + } + if(lastSet.size()>1) { + log.warning("More than one phases are declared to be the last: "+lastSet); + } + for (String s : lastSet) { + for (String v : new HashSet(graph.getVertices().keySet())) { + if (!lastSet.contains(v)) { + graph.addEdge(v, s); + } + } + } + + } + + for (Stage s : stages.values()) { + List phases = s.getSorter().topologicalSort(false); + s.getPhases().clear(); + s.getPhases().addAll(phases); + } + if (log.isLoggable(Level.FINE)) { + log.fine("Stages: " + stages); + } + return stages; + } + + private void init() { + stages = new HashMap(); + + Stage referenceStage = new Stage(STAGE_REFERENCE); + for (int i = 1; i < SYSTEM_REFERENCE_PHASES.length; i++) { + referenceStage.getSorter().addEdge(SYSTEM_REFERENCE_PHASES[i - 1], SYSTEM_REFERENCE_PHASES[i]); + } + referenceStage.getLastSet().add(REFERENCE_BINDING); + stages.put(referenceStage.getName(), referenceStage); + + Stage referenceBindingStage = new Stage(STAGE_REFERENCE_BINDING); + for (int i = 1; i < SYSTEM_REFERENCE_BINDING_PHASES.length; i++) { + referenceBindingStage.getSorter().addEdge(SYSTEM_REFERENCE_BINDING_PHASES[i - 1], SYSTEM_REFERENCE_BINDING_PHASES[i]); + } + stages.put(referenceBindingStage.getName(), referenceBindingStage); + + Stage serviceBindingStage = new Stage(STAGE_SERVICE_BINDING); + for (int i = 1; i < SYSTEM_SERVICE_BINDING_PHASES.length; i++) { + serviceBindingStage.getSorter().addEdge(SYSTEM_SERVICE_BINDING_PHASES[i - 1], SYSTEM_SERVICE_BINDING_PHASES[i]); + } + stages.put(serviceBindingStage.getName(), serviceBindingStage); + + + Stage serviceStage = new Stage(STAGE_SERVICE); + for (int i = 1; i < SYSTEM_SERVICE_PHASES.length; i++) { + serviceStage.getSorter().addEdge(SYSTEM_SERVICE_PHASES[i - 1], SYSTEM_SERVICE_PHASES[i]); + } + stages.put(serviceStage.getName(), serviceStage); + + Stage implementationStage = new Stage(STAGE_IMPLEMENTATION); + for (int i = 1; i < SYSTEM_IMPLEMENTATION_PHASES.length; i++) { + implementationStage.getSorter().addEdge(SYSTEM_IMPLEMENTATION_PHASES[i - 1], + SYSTEM_IMPLEMENTATION_PHASES[i]); + } + implementationStage.getLastSet().add(IMPLEMENTATION); + stages.put(implementationStage.getName(), implementationStage); + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java new file mode 100644 index 0000000000..175f3463ad --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Directed, weighted graph + * + * @param The type of vertex object + * @param The type of edge object + * + * @version $Rev$ $Date$ + */ +public class PhaseSorter implements Cloneable { + private final Map vertices = new HashMap(); + + /** + * Vertex of a graph + */ + public final class Vertex { + private V value; + + // TODO: Do we want to support multiple edges for a vertex pair? If so, + // we should use a List instead of Map + private Map outEdges = new HashMap(); + private Map inEdges = new HashMap(); + + private Vertex(V value) { + this.value = value; + } + + @Override + public String toString() { + return "(" + value + ")"; + } + + public V getValue() { + return value; + } + + public Map getOutEdges() { + return outEdges; + } + + public Map getInEdges() { + return inEdges; + } + + } + + /** + * An Edge connects two vertices in one direction + */ + public final class Edge { + private Vertex sourceVertex; + + private Vertex targetVertex; + + public Edge(Vertex source, Vertex target) { + this.sourceVertex = source; + this.targetVertex = target; + } + + @Override + public String toString() { + return sourceVertex + "->" + targetVertex; + } + + public Vertex getTargetVertex() { + return targetVertex; + } + + public void setTargetVertex(Vertex vertex) { + this.targetVertex = vertex; + } + + public Vertex getSourceVertex() { + return sourceVertex; + } + + public void setSourceVertex(Vertex sourceVertex) { + this.sourceVertex = sourceVertex; + } + } + + public void addEdge(V source, V target) { + Vertex s = getVertex(source); + if (s == null) { + s = new Vertex(source); + vertices.put(source, s); + } + Vertex t = getVertex(target); + if (t == null) { + t = new Vertex(target); + vertices.put(target, t); + } + Edge edge = new Edge(s, t); + s.outEdges.put(t, edge); + t.inEdges.put(s, edge); + } + + public void addVertext(V source) { + Vertex s = getVertex(source); + if (s == null) { + s = new Vertex(source); + vertices.put(source, s); + } + } + + public Vertex getVertex(V source) { + Vertex s = vertices.get(source); + return s; + } + + public boolean removeEdge(V source, V target) { + Vertex s = getVertex(source); + if (s == null) { + return false; + } + + Vertex t = getVertex(target); + if (t == null) { + return false; + } + + return s.outEdges.remove(t) != null && t.inEdges.remove(s) != null; + + } + + public void removeEdge(Edge edge) { + edge.sourceVertex.outEdges.remove(edge.targetVertex); + edge.targetVertex.inEdges.remove(edge.sourceVertex); + } + + public void removeVertex(Vertex vertex) { + vertices.remove(vertex.getValue()); + for (Edge e : new ArrayList(vertex.outEdges.values())) { + removeEdge(e); + } + for (Edge e : new ArrayList(vertex.inEdges.values())) { + removeEdge(e); + } + } + + public Edge getEdge(Vertex source, Vertex target) { + return source.outEdges.get(target); + } + + public Edge getEdge(V source, V target) { + Vertex sv = getVertex(source); + if (sv == null) { + return null; + } + Vertex tv = getVertex(target); + if (tv == null) { + return null; + } + return getEdge(getVertex(source), getVertex(target)); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + for (Vertex v : vertices.values()) { + sb.append(v.outEdges.values()).append("\n"); + } + return sb.toString(); + } + + public Map getVertices() { + return vertices; + } + + public void addGraph(PhaseSorter otherGraph) { + for (Vertex v : otherGraph.vertices.values()) { + for (Edge e : v.outEdges.values()) { + addEdge(e.sourceVertex.value, e.targetVertex.value); + } + } + } + + private Vertex getFirst() { + for (Vertex v : vertices.values()) { + if (v.inEdges.isEmpty()) { + return v; + } + } + if (!vertices.isEmpty()) { + throw new IllegalArgumentException("Circular ordering has been detected: " + toString()); + } else { + return null; + } + } + + public List topologicalSort(boolean readOnly) { + PhaseSorter graph = (!readOnly) ? this : (PhaseSorter)clone(); + List list = new ArrayList(); + while (true) { + Vertex v = graph.getFirst(); + if (v == null) { + break; + } + list.add(v.getValue()); + graph.removeVertex(v); + } + + return list; + } + + @Override + public Object clone() { + PhaseSorter copy = new PhaseSorter(); + copy.addGraph(this); + return copy; + } +} -- cgit v1.2.3