diff options
Diffstat (limited to 'sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl')
13 files changed, 3191 insertions, 0 deletions
diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java new file mode 100644 index 0000000000..66b9516738 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncInvocationFutureImpl.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.xml.ws.AsyncHandler; +import javax.xml.ws.Response; + +import org.apache.tuscany.sca.core.invocation.AsyncContext; +import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; + +/** + * A class which provides an Implementation of a Future<V> and Response<V> for use with the JAXWS defined client + * asynchronous APIs. + * + * This implementation class provides the interfaces for use by the client code, but also provides methods for the + * Tuscany system code to set the result of the asynchronous service invocation, both Regular and Fault responses. + * + * This class is constructed to be fully thread-safe + * + * @param <V> - this is the type of the response message from the invoked service. + */ +public class AsyncInvocationFutureImpl<V> implements Future<V>, Response<V>, AsyncContext, AsyncResponseHandler<V> { + + // Lock for handling the completion of this Future + private final Lock lock = new ReentrantLock(); + private final Condition isDone = lock.newCondition(); + + // The result + private volatile V response = null; + private volatile Throwable fault = null; + + private String uniqueID = UUID.randomUUID().toString(); + + private Class businessInterface = null; + private AsyncHandler callback; + + private Map<String, Object> attributes = new HashMap<String, Object>(); + + protected AsyncInvocationFutureImpl() { + super(); + } // end constructor + + /** + * Public constructor for AsyncInvocationFutureImpl - newInstance is necessary in order to enable the Type variable + * to be set for the class instances + * @param <V> - the type of the response from the asynchronously invoked service + * @param type - the type of the AsyncInvocationFutureImpl expressed as a parameter + * @param classLoader - the classloader used for the business interface to which this Future applies + * @return - an instance of AsyncInvocationFutureImpl<V> + */ + public static <V> AsyncInvocationFutureImpl<V> newInstance( Class<V> type, Class businessInterface ) { + AsyncInvocationFutureImpl<V> future = new AsyncInvocationFutureImpl<V>(); + future.setBusinessInterface( businessInterface ); + return future; + } + + /** + * Cancels the asynchronous process + * - not possible in this version, so always returns false + */ + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + /** + * Gets the response value returned by the asynchronous process + * - waits forever + * @return - the response value of type V + * @throws InterruptedException if the get() method was interrupted while waiting for the async process to finish + * @throws ExecutionException if the async process threw an exception - the exception thrown is nested + */ + public V get() throws InterruptedException, ExecutionException { + try { + V response = get(Long.MAX_VALUE, TimeUnit.SECONDS); + return response; + } catch (TimeoutException t) { + throw new InterruptedException("Timed out waiting for Future to complete"); + } // end try + } // end method get() + + /** + * Gets the response value returned by the asynchronous process + * @return - the response value of type V + * @throws InterruptedException if the get() method was interrupted while waiting for the async process to finish + * @throws ExecutionException if the async process threw an exception - the exception thrown is nested + * @throws TimeoutException if the get() method timed out waiting for the async process to finish + */ + public V get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + lock.lock(); + try { + // wait for result to be available + if( notSetYet() ) isDone.await( timeout, unit); + if( response != null ) return response; + if( fault != null ) throw new ExecutionException( fault ); + throw new TimeoutException("get on this Future timed out"); + } finally { + lock.unlock(); + } // end try + + } // end method get(long timeout, TimeUnit unit) + + /** + * Indicates if the asynchronous process has been cancelled + * - not possible in this version so always returns false + */ + public boolean isCancelled() { + return false; + } + + /** + * Indicates if the asynchronous process is completed + * @return - true if the process is completed, false otherwise + */ + public boolean isDone() { + lock.lock(); + try { + return !notSetYet(); + } finally { + lock.unlock(); + } // end try + } // end method isDone + + /** + * Async process completed with a Fault. Must only be invoked once. + * @param e - the Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + public void setFault( Throwable e ) { + lock.lock(); + try { + if( notSetYet() ) { + fault = e; + isDone.signalAll(); + } else { + throw new IllegalStateException("setResponse() or setFault() has been called previously"); + } // end if + } finally { + lock.unlock(); + } // end try + if (callback != null) { + callback.handleResponse(this); + } + } // end method setFault( Throwable ) + + /** + * Async process completed with a wrapped Fault. Must only be invoked once. + * @param w - the wrapped Fault to send + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + */ + public void setWrappedFault(AsyncFaultWrapper w) { + + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + Throwable e; + try { + // Set the TCCL to the classloader of the business interface + Thread.currentThread().setContextClassLoader(this.getBusinessInterface().getClassLoader()); + e = w.retrieveFault(); + } finally { + Thread.currentThread().setContextClassLoader(tccl); + } // end try + + if( e == null ) throw new IllegalArgumentException("AsyncFaultWrapper did not return an Exception"); + setFault( e ); + + } // end method setFault( AsyncFaultWrapper ) + + /** + * Async process completed with a response message. Must only be invoked once + * @throws IllegalStateException if either the setResponse method or the setFault method have been called previously + * @param res - the response message, which is of type V + */ + public void setResponse(V res) { + + lock.lock(); + try { + if( notSetYet() ) { + response = res; + isDone.signalAll(); + } else { + throw new IllegalStateException("setResponse() or setFault() has been called previously"); + } + } finally { + lock.unlock(); + } // end try + if (callback != null) { + callback.handleResponse(this); + } + + } // end method setResponse + + /** + * Gets the unique ID of this future as a String + */ + public String getUniqueID() { return uniqueID; } + + /** + * Indicates that setting a response value is OK - can only set the response value or fault once + * @return - true if it is OK to set the response, false otherwise + */ + private boolean notSetYet() { + return ( response == null && fault == null ); + } + + /** + * Returns the JAXWS context for the response + * @return - a Map containing the context + */ + public Map<String, Object> getContext() { + // Intentionally returns null + return null; + } + + /** + * Gets the business interface to which this Future relates + * @return the business interface + */ + public Class getBusinessInterface() { + return businessInterface; + } + + /** + * Sets the business interface to which this Future relates + * @param classLoader - the classloader of the business interface + */ + public void setBusinessInterface(Class businessInterface) { + this.businessInterface = businessInterface; + } + + /** + * Sets the callback handler, when the client uses the async callback method + * @param callback - the client's callback object + */ + public void setCallback(AsyncHandler callback) { + this.callback = callback; + } + + /** + * Look up an attribute value by name. + * @param name The name of the attribute + * @return The value of the attribute + */ + public Object getAttribute(String name) { + return attributes.get(name); + } + + /** + * Set the value of an attribute. Allows extensions to associate other data with an async response. + * @param name The name of the attribute + * @param value + */ + public void setAttribute(String name, Object value) { + attributes.put(name, value); + } + +} // end class AsyncInvocationFutureImpl diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java new file mode 100644 index 0000000000..dc5738af96 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.beans.Introspector; +import java.beans.PropertyDescriptor; +import java.io.StringReader; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.transform.stream.StreamSource; +import javax.xml.ws.AsyncHandler; +import javax.xml.ws.Response; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.assembly.ComponentService; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.Implementation; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.assembly.builder.BindingBuilder; +import org.apache.tuscany.sca.assembly.builder.BuilderContext; +import org.apache.tuscany.sca.assembly.builder.BuilderExtensionPoint; +import org.apache.tuscany.sca.assembly.xml.Constants; +import org.apache.tuscany.sca.context.CompositeContext; +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.contribution.processor.ContributionReadException; +import org.apache.tuscany.sca.contribution.processor.ProcessorContext; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint; +import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; +import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseException; +import org.apache.tuscany.sca.core.invocation.AsyncResponseService; +import org.apache.tuscany.sca.core.invocation.JDKAsyncResponseInvoker; +import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; +import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory; +import org.apache.tuscany.sca.interfacedef.util.FaultException; +import org.apache.tuscany.sca.interfacedef.util.WrapperInfo; +import org.apache.tuscany.sca.invocation.InterceptorAsync; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.policy.Intent; +import org.apache.tuscany.sca.provider.EndpointReferenceAsyncProvider; +import org.apache.tuscany.sca.provider.PolicyProvider; +import org.apache.tuscany.sca.provider.ReferenceBindingProvider; +import org.apache.tuscany.sca.provider.ServiceBindingProvider; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.apache.tuscany.sca.work.WorkScheduler; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * An InvocationHandler which deals with JAXWS-defined asynchronous client Java API method calls + * + * 2 asynchronous mappings exist for any given synchronous service operation, as shown in this example: + * public interface StockQuote { + * float getPrice(String ticker); + * Response<Float> getPriceAsync(String ticker); + * Future<?> getPriceAsync(String ticker, AsyncHandler<Float> handler); + * } + * + * - the second method is called the "polling method", since the returned Response<?> object permits + * the client to poll to see if the async call has completed + * - the third method is called the "async callback method", since in this case the client application can specify + * a callback operation that is automatically called when the async call completes + */ +public class AsyncJDKInvocationHandler extends JDKInvocationHandler { + + private static final long serialVersionUID = 1L; + + // Run the async service invocations using a WorkScheduler + private WorkScheduler scheduler; + + public AsyncJDKInvocationHandler(ExtensionPointRegistry registry, + MessageFactory messageFactory, + ServiceReference<?> callableReference ) { + super(messageFactory, callableReference); + initWorkScheduler(registry); + } + + public AsyncJDKInvocationHandler(ExtensionPointRegistry registry, + MessageFactory messageFactory, + Class<?> businessInterface, + Invocable source ) { + super(messageFactory, businessInterface, source); + initWorkScheduler(registry); + } + + private final void initWorkScheduler(ExtensionPointRegistry registry) { + UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class); + scheduler = utilities.getUtility(WorkScheduler.class); + } // end method initWorkScheduler + + /** + * Perform the invocation of the operation + * - provides support for all 3 forms of client method: synchronous, polling and async callback + */ + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + + if (Object.class == method.getDeclaringClass()) { + return invokeObjectMethod(method, args); + } + + // force the bind of the reference so that we can look at the + // target contract to see if it's asynchronous + source.getInvocationChains(); + + if (isAsyncCallback(method)) { + return doInvokeAsyncCallback(proxy, method, args); + } else if (isAsyncPoll(method)) { + return doInvokeAsyncPoll(proxy, method, args, null); + } else { + // Regular synchronous method call + return doInvokeSync(proxy, method, args); + } + } + + /** + * Indicates if a supplied method has the form of an async callback method + * @param method - the method + * @return - true if the method has the form of an async callback + */ + protected boolean isAsyncCallback(Method method) { + if (method.getName().endsWith("Async") && (method.getReturnType() == Future.class)) { + if (method.getParameterTypes().length > 0) { + return method.getParameterTypes()[method.getParameterTypes().length - 1] == AsyncHandler.class; + } + } + return false; + } + + /** + * Indicates is a supplied method has the form of an async polling method + * @param method - the method + * @return - true if the method has the form of an async polling method + */ + protected boolean isAsyncPoll(Method method) { + return method.getName().endsWith("Async") && (method.getReturnType() == Response.class); + } + + /** + * Invoke an async polling method + * @param proxy - the reference proxy + * @param asyncMethod - the async method to invoke + * @param args - array of input arguments to the method + * @return - the Response<?> object that is returned to the client application, typed by the + * type of the response + */ + @SuppressWarnings("unchecked") + protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args, AsyncHandler callback) { + Method method = getNonAsyncMethod(asyncMethod); + Class<?> returnType = method.getReturnType(); + // Allocate the Future<?> / Response<?> object - note: Response<?> is a subclass of Future<?> + AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance(returnType, businessInterface); + if (callback != null) + future.setCallback(callback); + try { + invokeAsync(proxy, method, args, future, asyncMethod); + } catch (Throwable t) { + // invokeAsync schedules a separate Runnable to run the request. Any exception caught here + // is a runtime exception, not an application exception. + if (!(t instanceof ServiceRuntimeException)) { + t = new ServiceRuntimeException("Received Throwable: " + t.getClass().getName() + + " when invoking: " + + asyncMethod.getName(), t); + } + future.setFault(t); + } // end try + return future; + } // end method doInvokeAsyncPoll + + /** + * Provide a synchronous invocation of a service operation that is either synchronous or asynchronous + * @return + */ + protected Object doInvokeSync(Object proxy, Method method, Object[] args) throws Throwable { + if (isAsyncInvocation(source)) { + // Target service is asynchronous + Class<?> returnType = method.getReturnType(); + AsyncInvocationFutureImpl future = + AsyncInvocationFutureImpl.newInstance(returnType, businessInterface); + invokeAsync(proxy, method, args, future, method); + // Wait for some maximum time for the result - 120 seconds here + // Really, if the service is async, the client should use async client methods to invoke the service + // - and be prepared to wait a *really* long time + Object response = null; + try { + response = future.get(120, TimeUnit.SECONDS); + } catch (ExecutionException ex) { + throw ex.getCause(); + } + return response; + } else { + // Target service is not asynchronous, so perform sync invocation + return super.invoke(proxy, method, args); + } // end if + } // end method doInvokeSync + + /** + * Invoke an async callback method - note that this form of the async client API has as its final parameter + * an AsyncHandler method, used for callbacks to the client code + * @param proxy - the reference proxy + * @param asyncMethod - the async method to invoke + * @param args - array of input arguments to the method + * @return - the Future<?> object that is returned to the client application, typed by the type of + * the response + */ + @SuppressWarnings("unchecked") + private Object doInvokeAsyncCallback(final Object proxy, final Method asyncMethod, final Object[] args) + throws Exception { + + AsyncHandler callback = (AsyncHandler)args[args.length - 1]; + Response response = doInvokeAsyncPoll(proxy, asyncMethod, Arrays.copyOf(args, args.length - 1), callback); + return response; + + } // end method doInvokeAsyncCallback + + /** + * Invoke the target (synchronous) method asynchronously + * @param proxy - the reference proxy object + * @param method - the method to invoke + * @param args - arguments for the call + * @param future - Future for handling the response + * @return - returns the response from the invocation + * @throws Throwable - if an exception is thrown during the invocation + */ + @SuppressWarnings("unchecked") + private void invokeAsync(Object proxy, + Method method, + Object[] args, + AsyncInvocationFutureImpl<?> future, + Method asyncMethod) throws Throwable { + if (source == null) { + throw new ServiceRuntimeException("No runtime source is available"); + } + + if (source instanceof RuntimeEndpointReference) { + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + if (epr.isOutOfDate()) { + epr.rebuild(); + chains.clear(); + } + } // end if + + InvocationChain chain = getInvocationChain(method, source); + + if (chain == null) { + throw new IllegalArgumentException("No matching operation is found: " + method); + } + + // Organize for an async service + RuntimeEndpoint theEndpoint = getAsyncCallback(source); + boolean isAsyncService = false; + if (theEndpoint != null) { + // ... the service is asynchronous but binding does not support async natively ... + attachFuture(theEndpoint, future); + } // end if + + if( isAsyncInvocation((RuntimeEndpointReference)source ) ) { + isAsyncService = true; + // Get hold of the JavaAsyncResponseHandler from the chain dealing with the async response + Invoker theInvoker = chain.getHeadInvoker(); + if( theInvoker instanceof InterceptorAsync ) { + InvokerAsyncResponse responseInvoker = ((InterceptorAsync)theInvoker).getPrevious(); + if( responseInvoker instanceof JDKAsyncResponseInvoker ) { + // Register the future as the response object with its ID + ((JDKAsyncResponseInvoker)responseInvoker).registerAsyncResponse(future.getUniqueID(), future); + } // end if + } // end if + } // end if + + // Perform the invocations on separate thread... + scheduler.scheduleWork(new SeparateThreadInvoker(chain, args, source, future, asyncMethod, isAsyncService)); + + return; + } // end method invokeAsync + + /** + * An inner class which acts as a runnable task for invoking services asynchronously on threads that are separate from + * those used to execute operations of components + * + * This supports both synchronous services and asynchronous services + */ + private class SeparateThreadInvoker implements Runnable { + + private AsyncInvocationFutureImpl future; + private Method asyncMethod; + private InvocationChain chain; + private Object[] args; + private Invocable invocable; + private boolean isAsyncService; + + public SeparateThreadInvoker(InvocationChain chain, + Object[] args, + Invocable invocable, + AsyncInvocationFutureImpl future, + Method asyncMethod, + boolean isAsyncService) { + super(); + this.chain = chain; + this.asyncMethod = asyncMethod; + this.args = args; + this.invocable = invocable; + this.future = future; + this.isAsyncService = isAsyncService; + } // end constructor + + public void run() { + Object result; + + try { + if (isAsyncService) { + if( supportsNativeAsync(invocable) ) { + // Binding supports native async invocations + invokeAsync(chain, args, invocable, future.getUniqueID()); + } else { + // Binding does not support native async invocations + invoke(asyncMethod, chain, args, invocable, future.getUniqueID()); + } // end if + // The result is returned asynchronously via the future... + } else { + // ... the service is synchronous ... + result = invoke(asyncMethod, chain, args, invocable); + Type type = null; + if (asyncMethod.getReturnType() == Future.class) { + // For callback async method, where a Future is returned + Type[] types = asyncMethod.getGenericParameterTypes(); + if (types.length > 0 && asyncMethod.getParameterTypes()[types.length - 1] == AsyncHandler.class) { + // Last parameter is AsyncHandler<T> + type = types[types.length - 1]; + } // end if + } else if (asyncMethod.getReturnType() == Response.class) { + // For the polling method, Response<T> + type = asyncMethod.getGenericReturnType(); + } // end if + if (type instanceof ParameterizedType) { + // Check if the parameterized type of Response<T> is a doc-lit-wrapper class + Class<?> wrapperClass = (Class<?>)((ParameterizedType)type).getActualTypeArguments()[0]; + WrapperInfo wrapperInfo = chain.getSourceOperation().getOutputWrapper(); + if (wrapperInfo != null && wrapperInfo.getWrapperClass() == wrapperClass) { + Object wrapper = wrapperClass.newInstance(); + // Find the 1st matching property + for (PropertyDescriptor p : Introspector.getBeanInfo(wrapperClass).getPropertyDescriptors()) { + if (p.getWriteMethod() == null) { + // There is a "class" property ... + continue; + } // end if + if (p.getWriteMethod().getParameterTypes()[0].isInstance(result)) { + p.getWriteMethod().invoke(wrapper, result); + result = wrapper; + break; + } // end if + } // end for + } // end if + } // end if + future.setResponse(result); + } // end if + } catch (ServiceRuntimeException s) { + Throwable e = s.getCause(); + if (e != null && e instanceof FaultException) { + if ("AsyncResponse".equals(e.getMessage())) { + // Do nothing... + } else { + future.setFault(s); + } // end if + } // end if + else { + future.setFault(s); + } + } catch (AsyncResponseException ar) { + // This exception is received in the case where the Binding does not support async invocation + // natively - the initial invocation is effectively synchronous with this exception thrown to + // indicate that the service received the request but will send the response separately - do nothing + } catch (Throwable t) { + //System.out.println("Async invoke got exception: " + t.toString()); + // If we invoked a sync service, this might be an application exception. + // The databinding ensured the exception is type-compatible with the application. + future.setFault(t); + } // end try + + } // end method run + + } // end class SeparateThreadInvoker + + /** + * Attaches a future to the callback endpoint - so that the Future is triggered when a response is + * received from the asynchronous service invocation associated with the Future + * @param endpoint - the async callback endpoint + * @param future - the async invocation future to attach + */ + private void attachFuture(RuntimeEndpoint endpoint, AsyncInvocationFutureImpl<?> future) { + Implementation impl = endpoint.getComponent().getImplementation(); + AsyncResponseHandlerImpl<?> asyncHandler = (AsyncResponseHandlerImpl<?>)impl; + asyncHandler.addFuture(future); + } // end method attachFuture + + /** + * Perform an async invocation on the reference + * @param chain - the chain + * @param args - parameters for the invocation + * @param invocable - the reference + * @param msgID - a message ID + */ + public void invokeAsync(InvocationChain chain, Object[] args, Invocable invocable, String msgID) { + Message msg = messageFactory.createMessage(); + if (invocable instanceof RuntimeEndpointReference) { + msg.setFrom((RuntimeEndpointReference)invocable); + } // end if + if (target != null) { + msg.setTo(target); + } else if (source instanceof RuntimeEndpointReference) { + msg.setTo(((RuntimeEndpointReference)invocable).getTargetEndpoint()); + } // end if + + Operation operation = chain.getTargetOperation(); + msg.setOperation(operation); + msg.setBody(args); + + Message msgContext = ThreadMessageContext.getMessageContext(); + + // Deal with header information that needs to be copied from the message context to the new message... + transferMessageHeaders( msg, msgContext); + + ThreadMessageContext.setMessageContext(msg); + + // If there is a supplied message ID, place its value into the Message Header under "MESSAGE_ID" + if( msgID != null ){ + msg.getHeaders().put("MESSAGE_ID", msgID); + } // end if + + try { + // Invoke the reference + invocable.invokeAsync(msg); + return; + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } // end try + } // end method invokeAsync + + /** + * Get the async callback endpoint - if not already created, create and start it + * @param source - the RuntimeEndpointReference which needs an async callback endpoint + * @param future + * @return - the RuntimeEndpoint of the async callback + */ + private RuntimeEndpoint getAsyncCallback(Invocable source) { + if (!(source instanceof RuntimeEndpointReference)) + return null; + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + if (!isAsyncInvocation(epr)) return null; + + // Check to see if the binding supports async invocation natively + ReferenceBindingProvider eprProvider = epr.getBindingProvider(); + if( eprProvider instanceof EndpointReferenceAsyncProvider) { + if( ((EndpointReferenceAsyncProvider)eprProvider).supportsNativeAsync() ) return null; + } // end if + + RuntimeEndpoint endpoint; + synchronized (epr) { + endpoint = (RuntimeEndpoint)epr.getCallbackEndpoint(); + // If the async callback endpoint is already created, return it... + if (endpoint != null) + return endpoint; + // Create the endpoint for the async callback + endpoint = createAsyncCallbackEndpoint(epr); + epr.setCallbackEndpoint(endpoint); + } // end synchronized + + // Activate the new callback endpoint + startEndpoint(epr.getCompositeContext(), endpoint); + endpoint.getInvocationChains(); + + return endpoint; + } // end method setupAsyncCallback + + /** + * Start the callback endpoint + * @param compositeContext - the composite context + * @param ep - the endpoint to start + */ + private void startEndpoint(CompositeContext compositeContext, RuntimeEndpoint ep) { + for (PolicyProvider policyProvider : ep.getPolicyProviders()) { + policyProvider.start(); + } // end for + + final ServiceBindingProvider bindingProvider = ep.getBindingProvider(); + if (bindingProvider != null) { + // Allow bindings to add shutdown hooks. Requires RuntimePermission shutdownHooks in policy. + AccessController.doPrivileged(new PrivilegedAction<Object>() { + public Object run() { + bindingProvider.start(); + return null; + } + }); + compositeContext.getEndpointRegistry().addEndpoint(ep); + } + } // end method startEndpoint + + /** + * Create the async callback endpoint for a reference that is going to invoke an asyncInvocation service + * @param epr - the RuntimeEndpointReference for which the callback is created + * @return - a RuntimeEndpoint representing the callback endpoint + */ + private RuntimeEndpoint createAsyncCallbackEndpoint(RuntimeEndpointReference epr) { + CompositeContext compositeContext = epr.getCompositeContext(); + RuntimeAssemblyFactory assemblyFactory = getAssemblyFactory(compositeContext); + RuntimeEndpoint endpoint = (RuntimeEndpoint)assemblyFactory.createEndpoint(); + endpoint.bind(compositeContext); + + // Create a pseudo-component and pseudo-service + // - need to end with a chain with an invoker into the AsyncCallbackHandler class + RuntimeComponent fakeComponent = null; + try { + fakeComponent = (RuntimeComponent)epr.getComponent().clone(); + applyImplementation(fakeComponent); + } catch (CloneNotSupportedException e2) { + // will not happen + } // end try + endpoint.setComponent(fakeComponent); + + // Create pseudo-service + ComponentService service = assemblyFactory.createComponentService(); + ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry(); + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + JavaInterfaceFactory javaInterfaceFactory = + (JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class); + JavaInterfaceContract interfaceContract = javaInterfaceFactory.createJavaInterfaceContract(); + try { + interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseService.class)); + } catch (InvalidInterfaceException e1) { + // Nothing to do here - will not happen + } // end try + service.setInterfaceContract(interfaceContract); + String serviceName = epr.getReference().getName() + "_asyncCallback"; + service.setName(serviceName); + // MJE 06/12/2010 - fixup for JMS binding code which looks at the implementation service + // as well as the component service... + // Create a pseudo implementation service... + Service implService = assemblyFactory.createService(); + implService.setName(serviceName); + implService.setInterfaceContract(interfaceContract); + service.setService(implService); + // + endpoint.setService(service); + // Set pseudo-service onto the pseudo-component + List<ComponentService> services = fakeComponent.getServices(); + services.clear(); + services.add(service); + + // Create a binding + Binding binding = createMatchingBinding(epr.getBinding(), fakeComponent, service, registry); + endpoint.setBinding(binding); + + // Need to establish policies here (binding has some...) + endpoint.getRequiredIntents().addAll(epr.getRequiredIntents()); + endpoint.getPolicySets().addAll(epr.getPolicySets()); + String epURI = epr.getComponent().getName() + "#service-binding(" + serviceName + "/" + serviceName + ")"; + endpoint.setURI(epURI); + endpoint.setUnresolved(false); + return endpoint; + } + + /** + * Create a matching binding to a supplied binding + * - the matching binding has the same binding type, but is for the supplied component and service + * @param matchBinding - the binding to match + * @param component - the component + * @param service - the service + * @param registry - registry for extensions + * @return - the matching binding, or null if it could not be created + */ + @SuppressWarnings("unchecked") + private Binding createMatchingBinding(Binding matchBinding, + RuntimeComponent component, + ComponentService service, + ExtensionPointRegistry registry) { + // Since there is no simple way to obtain a Factory for a binding where the type is not known ahead of + // time, the process followed here is to generate the <binding.xxx/> XML element from the binding type QName + // and then read the XML using the processor for that XML... + QName bindingName = matchBinding.getType(); + String bindingXML = + "<ns1:" + bindingName.getLocalPart() + " xmlns:ns1='" + bindingName.getNamespaceURI() + "'/>"; + + StAXArtifactProcessorExtensionPoint processors = + registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); + StAXArtifactProcessor<?> processor = (StAXArtifactProcessor<?>)processors.getProcessor(bindingName); + + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + ValidatingXMLInputFactory inputFactory = modelFactories.getFactory(ValidatingXMLInputFactory.class); + StreamSource source = new StreamSource(new StringReader(bindingXML)); + + ProcessorContext context = new ProcessorContext(); + try { + XMLStreamReader reader = inputFactory.createXMLStreamReader(source); + reader.next(); + Binding newBinding = (Binding)processor.read(reader, context); + + // Create a URI address for the callback based on the Component_Name/Reference_Name pattern + String callbackURI = "/" + component.getName() + "/" + service.getName(); + newBinding.setURI(callbackURI); + + BuilderExtensionPoint builders = registry.getExtensionPoint(BuilderExtensionPoint.class); + BindingBuilder builder = builders.getBindingBuilder(newBinding.getType()); + if (builder != null) { + org.apache.tuscany.sca.assembly.builder.BuilderContext builderContext = new BuilderContext(registry); + builder.build(component, service, newBinding, builderContext, true); + } // end if + + return newBinding; + } catch (ContributionReadException e) { + e.printStackTrace(); + } catch (XMLStreamException e) { + e.printStackTrace(); + } + + return null; + } // end method createMatchingBinding + + /** + * Gets a RuntimeAssemblyFactory from the CompositeContext + * @param compositeContext + * @return the RuntimeAssemblyFactory + */ + private RuntimeAssemblyFactory getAssemblyFactory(CompositeContext compositeContext) { + ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry(); + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + return (RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class); + } // end method RuntimeAssemblyFactory + + /** + * Applies an AsyncResponseHandlerImpl as the implementation of a RuntimeComponent + * - the AsyncResponseHandlerImpl acts as both the implementation class and the implementation provider... + * @param component - the component + */ + private void applyImplementation(RuntimeComponent component) { + AsyncResponseHandlerImpl<?> asyncHandler = new AsyncResponseHandlerImpl<Object>(); + component.setImplementation(asyncHandler); + component.setImplementationProvider(asyncHandler); + return; + } // end method getImplementationProvider + + private static QName ASYNC_INVOKE = new QName(Constants.SCA11_NS, "asyncInvocation"); + + /** + * Determines if the service invocation is asynchronous + * @param source - the EPR involved in the invocation + * @return - true if the invocation is async + */ + private boolean isAsyncInvocation(Invocable source) { + if (!(source instanceof RuntimeEndpointReference)) + return false; + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + // First check is to see if the EPR itself has the asyncInvocation intent marked + for (Intent intent : epr.getRequiredIntents()) { + if (intent.getName().equals(ASYNC_INVOKE)) + return true; + } // end for + + // Second check is to see if the target service has the asyncInvocation intent marked + Endpoint ep = epr.getTargetEndpoint(); + for (Intent intent : ep.getRequiredIntents()) { + if (intent.getName().equals(ASYNC_INVOKE)) + return true; + } // end for + return false; + } // end isAsyncInvocation + + private boolean supportsNativeAsync(Invocable source) { + if (!(source instanceof RuntimeEndpointReference)) + return false; + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + + // TODO - need to update this once BindingProvider interface is refactored to contain + // supportsNativeAsync directly... + ReferenceBindingProvider provider = epr.getBindingProvider(); + if( provider instanceof EndpointReferenceAsyncProvider ) { + return ((EndpointReferenceAsyncProvider)provider).supportsNativeAsync(); + } else { + return false; + } // end if + } // end method supportsNativeAsync + + /** + * Return the synchronous method that is the equivalent of an async method + * @param asyncMethod - the async method + * @return - the equivalent synchronous method + */ + protected Method getNonAsyncMethod(Method asyncMethod) { + String methodName = asyncMethod.getName().substring(0, asyncMethod.getName().length() - 5); + for (Method m : businessInterface.getMethods()) { + if (methodName.equals(m.getName())) { + return m; + } + } + throw new IllegalStateException("No synchronous method matching async method " + asyncMethod.getName()); + } // end method getNonAsyncMethod +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java new file mode 100644 index 0000000000..7b459f3e7d --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.xml.ws.Response; + +public class AsyncResponse implements Response { + + private Object response; + private boolean isException; + + public AsyncResponse(Object response, boolean isException) { + this.response = response; + this.isException = isException; + } + + public Map getContext() { + return new HashMap(); + } + + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + public Object get() throws InterruptedException, ExecutionException { + if (isException) { + throw new ExecutionException((Throwable)response); + } else { + return response; + } + } + + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return get(); + } + + public boolean isCancelled() { + return false; + } + + public boolean isDone() { + return true; + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java new file mode 100644 index 0000000000..9de1809200 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import javax.xml.namespace.QName; + +import org.apache.tuscany.sca.assembly.Implementation; +import org.apache.tuscany.sca.assembly.Property; +import org.apache.tuscany.sca.assembly.Reference; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.Constants; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.policy.ExtensionType; +import org.apache.tuscany.sca.policy.Intent; +import org.apache.tuscany.sca.policy.PolicySet; +import org.apache.tuscany.sca.provider.ImplementationProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + +/** + * A class intended to form the final link in the chain calling into a Future which represents + * the response to an asynchronous service invocation + * + * Most methods are dummies, required to fulfil the contracts for ImplementationProvider, Implementation + * and Invoker, since this class collapses together the functions of these separate interfaces, due to its + * specialized nature, where most of the function will never be used. + * + * The class acts as the implementation object that terminates the chain - and also as the provider of the implementation. + * The class accepts Future objects which represent individual invocations of forward operations on the async service + * and expects that the responses it handles as invocations will carry the unique ID of one of the Future objects in the + * message header. On receipt of each message, the class seeks out the Future with that unique ID and completes the future + * either with a response message or with a Fault. + * + * @param <V> + */ +public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>, + ImplementationProvider, Implementation, Invoker { + + private ConcurrentHashMap< String, AsyncInvocationFutureImpl<?> > table = + new ConcurrentHashMap< String, AsyncInvocationFutureImpl<?> >(); + + /** + * This class is its own invoker... + */ + public Invoker createInvoker(RuntimeComponentService service, + Operation operation) { + return this; + } + + /** + * Add a future to this response handler + * @param future - the future + */ + public void addFuture( AsyncInvocationFutureImpl<?> future ) { + // The Future is stored in the table indexed by its unique ID + table.put(future.getUniqueID(), future); + } // end method addFuture + + public boolean supportsOneWayInvocation() { + return true; + } + + public void start() {} + + public void stop() {} + + public List<Operation> getOperations() { + return null; + } + + public QName getType() { + return null; + } + + public List<Property> getProperties() { + return null; + } + + public Property getProperty(String name) { + return null; + } + + public Reference getReference(String name) { + return null; + } + + public List<Reference> getReferences() { + return null; + } + + public Service getService(String name) { + return null; + } + + public List<Service> getServices() { + return null; + } + + public String getURI() { + return null; + } + + public void setURI(String uri) {} + + public boolean isUnresolved() { + return false; + } + + public void setUnresolved(boolean unresolved) {} + + public ExtensionType getExtensionType() { + return null; + } + + public List<PolicySet> getPolicySets() { + return null; + } + + public List<Intent> getRequiredIntents() { + return null; + } + + public void setExtensionType(ExtensionType type) {} + + public void setWrappedFault(AsyncFaultWrapper e) {} + + public void setFault(Throwable e) {} + + public void setResponse(V res) { } + + /** + * Method which is the termination for the invocation chain from the callback endpoint + * @param msg - the Tuscany message containing the response from the async service invocation + * which is either the Response message or an exception of some kind + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Message invoke(Message msg) { + // Get the unique ID from the RELATES_TO message header + String idValue = (String)msg.getHeaders().get(Constants.RELATES_TO); + + if( idValue == null ) { + System.out.println( "Async message ID not found "); + } else { + // Fetch the Future with that Unique ID + AsyncInvocationFutureImpl future = table.get(idValue); + if( future == null ) { + System.out.println("Future not found for id: " + idValue); + } else { + // Complete the Future with a Response message + Object payload = msg.getBody(); + Object response; + if( payload == null ) { + System.out.println("Returned response message was null"); + } else { + if (payload.getClass().isArray()) { + response = ((Object[])payload)[0]; + } else { + response = payload; + } // end if + if( response.getClass().equals(AsyncFaultWrapper.class)) { + future.setWrappedFault((AsyncFaultWrapper) response ); + } else { + future.setResponse(response); + } // end if + } // end if + } // end if + } // end if + + // Prepare an empty response message + msg.setBody(null); + return msg; + } // end method invoke + +} // end class diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java new file mode 100644 index 0000000000..0e4d4344d2 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.DataExchangeSemantics; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.InterceptorAsync; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; +import org.apache.tuscany.sca.invocation.Phase; +import org.apache.tuscany.sca.invocation.PhasedInterceptor; + +/** + * Default implementation of an invocation chain + * + * @version $Rev$ $Date$ + */ +public class InvocationChainImpl implements InvocationChain { + private Operation sourceOperation; + private Operation targetOperation; + private List<Node> nodes = new ArrayList<Node>(); + + private final PhaseManager phaseManager; + private boolean forReference; + private boolean allowsPassByReference; + private boolean isAsyncInvocation; + + public InvocationChainImpl(Operation sourceOperation, Operation targetOperation, boolean forReference, PhaseManager phaseManager, boolean isAsyncInvocation) { + this.targetOperation = targetOperation; + this.sourceOperation = sourceOperation; + this.forReference = forReference; + this.phaseManager = phaseManager; + this.isAsyncInvocation = isAsyncInvocation; + } + + public Operation getTargetOperation() { + return targetOperation; + } + + public void setTargetOperation(Operation operation) { + this.targetOperation = operation; + } + + public void addInterceptor(Interceptor interceptor) { + if (interceptor instanceof PhasedInterceptor) { + PhasedInterceptor pi = (PhasedInterceptor)interceptor; + if (pi.getPhase() != null) { + addInvoker(pi.getPhase(), pi); + return; + } + } + String phase = forReference ? Phase.REFERENCE : Phase.SERVICE; + addInterceptor(phase, interceptor); + } // end method addInterceptor + + public void addInvoker(Invoker invoker) { + if (invoker instanceof PhasedInterceptor) { + PhasedInterceptor pi = (PhasedInterceptor)invoker; + if (pi.getPhase() != null) { + addInvoker(pi.getPhase(), pi); + return; + } + } + String phase = forReference ? Phase.REFERENCE_BINDING : Phase.IMPLEMENTATION; + addInvoker(phase, invoker); + } + + public Invoker getHeadInvoker() { + return nodes.isEmpty() ? null : nodes.get(0).getInvoker(); + } + + public Invoker getTailInvoker() { + int nodeCount = nodes.size(); + if( nodeCount > 0 ) { + return nodes.get( nodeCount - 1).getInvoker(); + } // end if + + return null; + } // end method getTailInvoker + + public Invoker getHeadInvoker(String phase) { + int index = phaseManager.getAllPhases().indexOf(phase); + if (index == -1) { + throw new IllegalArgumentException("Invalid phase name: " + phase); + } + for (Node node : nodes) { + if (index <= node.getPhaseIndex()) { + return node.getInvoker(); + } + } + return null; + } + + /** + * @return the sourceOperation + */ + public Operation getSourceOperation() { + return sourceOperation; + } + + /** + * @param sourceOperation the sourceOperation to set + */ + public void setSourceOperation(Operation sourceOperation) { + this.sourceOperation = sourceOperation; + } + + public void addInterceptor(String phase, Interceptor interceptor) { + addInvoker(phase, interceptor); + } + + private void addInvoker(String phase, Invoker invoker) { + if (isAsyncInvocation && + !(invoker instanceof InvokerAsyncRequest) && + !(invoker instanceof InvokerAsyncResponse) ){ + // TODO - should raise an error but don't want to break + // the existing non-native async support +/* + throw new IllegalArgumentException("Trying to add synchronous invoker " + + invoker.getClass().getName() + + " to asynchronous chain"); +*/ + } + + int index = phaseManager.getAllPhases().indexOf(phase); + if (index == -1) { + throw new IllegalArgumentException("Invalid phase name: " + phase); + } + Node node = new Node(index, invoker); + ListIterator<Node> li = nodes.listIterator(); + Node before = null, after = null; + boolean found = false; + while (li.hasNext()) { + before = after; + after = li.next(); + if (after.getPhaseIndex() > index) { + // Move back + li.previous(); + li.add(node); + found = true; + break; + } + } + if (!found) { + // Add to the end + nodes.add(node); + before = after; + after = null; + } + + // Relink the interceptors + if (before != null) { + if (before.getInvoker() instanceof Interceptor) { + ((Interceptor)before.getInvoker()).setNext(invoker); + if ((invoker instanceof InterceptorAsync) && + (before.getInvoker() instanceof InvokerAsyncResponse)) { + ((InterceptorAsync) invoker).setPrevious((InvokerAsyncResponse)before.getInvoker()); + } + } + } + if (after != null) { + if (invoker instanceof Interceptor) { + ((Interceptor)invoker).setNext(after.getInvoker()); + if ((after.getInvoker() instanceof InterceptorAsync) && + (invoker instanceof InvokerAsyncResponse)){ + ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsyncResponse)invoker); + } + } + } + + } + + public boolean allowsPassByReference() { + if (allowsPassByReference) { + // No need to check the invokers + return true; + } + // Check if any of the invokers allows pass-by-reference + boolean allowsPBR = false; + for (Node i : nodes) { + if (i.getInvoker() instanceof DataExchangeSemantics) { + if (((DataExchangeSemantics)i.getInvoker()).allowsPassByReference()) { + allowsPBR = true; + break; + } + } + } + return allowsPBR; + } + + public void setAllowsPassByReference(boolean allowsPBR) { + this.allowsPassByReference = allowsPBR; + } + + private static class Node { + private int phaseIndex; + private Invoker invoker; + + public Node(int phaseIndex, Invoker invoker) { + super(); + this.phaseIndex = phaseIndex; + this.invoker = invoker; + } + + public int getPhaseIndex() { + return phaseIndex; + } + + public Invoker getInvoker() { + return invoker; + } + + @Override + public String toString() { + return "(" + phaseIndex + ")" + invoker; + } + } + + public boolean isAsyncInvocation() { + return isAsyncInvocation; + } + + public void addHeadInterceptor(Interceptor interceptor) { + String phase = forReference ? Phase.REFERENCE : Phase.SERVICE_BINDING; + if (interceptor instanceof PhasedInterceptor) { + PhasedInterceptor pi = (PhasedInterceptor)interceptor; + if (pi.getPhase() != null) { + phase = pi.getPhase(); + } // end if + } // end if + + addHeadInterceptor(phase, interceptor); + } // end method addHeadInterceptor + + public void addHeadInterceptor(String phase, Interceptor interceptor) { + // TODO Auto-generated method stub + Invoker invoker = (Invoker)interceptor; + + int index = phaseManager.getAllPhases().indexOf(phase); + if (index == -1) { + throw new IllegalArgumentException("Invalid phase name: " + phase); + } // end if + Node node = new Node(index, invoker); + + ListIterator<Node> li = nodes.listIterator(); + Node before = null, after = null; + boolean found = false; + while (li.hasNext()) { + before = after; + after = li.next(); + // Look for the first node with a phase index equal to or greater than the one provided + if (after.getPhaseIndex() >= index) { + // Move back + li.previous(); + li.add(node); + found = true; + break; + } + } + if (!found) { + // Add to the end + nodes.add(node); + before = after; + after = null; + } + + // Relink the interceptors + if (before != null) { + if (before.getInvoker() instanceof Interceptor) { + ((Interceptor)before.getInvoker()).setNext(invoker); + if ((invoker instanceof InterceptorAsync) && + (before.getInvoker() instanceof InvokerAsyncResponse)) { + ((InterceptorAsync) invoker).setPrevious((InvokerAsyncResponse)before.getInvoker()); + } + } + } + if (after != null) { + if (invoker instanceof Interceptor) { + ((Interceptor)invoker).setNext(after.getInvoker()); + if ((after.getInvoker() instanceof InterceptorAsync) && + (invoker instanceof InvokerAsyncResponse)){ + ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsyncResponse)invoker); + } + } + } + + } // end method addHeadInterceptor + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java new file mode 100644 index 0000000000..bde3e92c27 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl; +import org.apache.tuscany.sca.core.invocation.Constants; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * Responsible for dispatching to a callback through a wire. <p/> TODO cache + * target invoker + * + * @version $Rev$ $Date$ + */ +public class JDKCallbackInvocationHandler extends JDKInvocationHandler { + private static final long serialVersionUID = -3350283555825935609L; + + public JDKCallbackInvocationHandler(MessageFactory messageFactory, ServiceReference<?> ref) { + super(messageFactory, ref); + this.fixedWire = false; + } + + @Override + @SuppressWarnings( {"unchecked", "rawtypes"}) + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + + if (Object.class == method.getDeclaringClass()) { + return invokeObjectMethod(method, args); + } + + // obtain a dedicated wire to be used for this callback invocation + RuntimeEndpointReference wire = ((CallbackServiceReferenceImpl)callableReference).getCallbackEPR(); + if (wire == null) { + //FIXME: need better exception + throw new ServiceRuntimeException("No callback wire found"); + } + + setEndpoint(((CallbackServiceReferenceImpl)callableReference).getResolvedEndpoint()); + + InvocationChain chain = getInvocationChain(method, wire); + if (chain == null) { + throw new IllegalArgumentException("No matching operation is found: " + method); + } + + try { + String msgID = ((CallbackServiceReferenceImpl)callableReference).getMsgID(); + return invoke(method, chain, args, wire, msgID ); + } catch (InvocationTargetException e) { + Throwable t = e.getCause(); + throw t; + } finally { + // allow the cloned wire to be reused by subsequent callbacks + } + } + + /** + * Invoke the chain + * @param chain - the chain + * @param args - arguments to the invocation as an array of Objects + * @param source - the Endpoint or EndpointReference to which the chain relates + * @param msgID - ID of the message to which this invovation is a callback - ID ends up in "RELATES_TO" header + * @return - the Response message from the invocation + * @throws Throwable - if any exception occurs during the invocation + */ + @Override + protected Object invoke(Method method, InvocationChain chain, Object[] args, Invocable source, String msgID) + throws Throwable { + Message msg = messageFactory.createMessage(); + if (source instanceof RuntimeEndpointReference) { + msg.setFrom((RuntimeEndpointReference)source); + } + if (target != null) { + msg.setTo(target); + } else { + if (source instanceof RuntimeEndpointReference) { + msg.setTo(((RuntimeEndpointReference)source).getTargetEndpoint()); + } + } + + msg.getHeaders().put(Constants.CALLBACK, ((CallbackServiceReferenceImpl)callableReference).getCallbackHandler()); + + Invoker headInvoker = chain.getHeadInvoker(); + + Operation operation = null; + if(source instanceof RuntimeEndpoint) { + for (InvocationChain c : source.getInvocationChains()) { + Operation op = c.getTargetOperation(); + if (method.getName().equals(op.getName())) { + operation = op; + break; + } + } + } else { + operation = chain.getTargetOperation(); + } + msg.setOperation(operation); + msg.setBody(args); + + Message msgContext = ThreadMessageContext.getMessageContext(); + + // Deal with header information that needs to be copied from the message context to the new message... + transferMessageHeaders( msg, msgContext); + + ThreadMessageContext.setMessageContext(msg); + + // If there is a supplied message ID, place its value into the Message Header under "RELATES_TO" + if( msgID != null ){ + msg.getHeaders().put(Constants.RELATES_TO, msgID); + } // end if + + try { + // dispatch the source down the chain and get the response + Message resp = headInvoker.invoke(msg); + Object body = resp.getBody(); + if (resp.isFault()) { + throw (Throwable)body; + } + return body; + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } + } // end method invoke + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java new file mode 100644 index 0000000000..827008dc73 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.io.Serializable; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; + +import javax.xml.ws.Holder; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.core.context.ServiceReferenceExt; +import org.apache.tuscany.sca.interfacedef.DataType; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.ParameterMode; +import org.apache.tuscany.sca.interfacedef.java.JavaOperation; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * @version $Rev$ $Date$ + */ +public class JDKInvocationHandler implements InvocationHandler, Serializable { + private static final long serialVersionUID = -3366410500152201371L; + + protected MessageFactory messageFactory; + protected Endpoint target; + protected Invocable source; + protected ServiceReferenceExt<?> callableReference; + protected Class<?> businessInterface; + + protected boolean fixedWire = true; + + protected transient Map<Method, InvocationChain> chains = new IdentityHashMap<Method, InvocationChain>(); + + public JDKInvocationHandler(MessageFactory messageFactory, Class<?> businessInterface, Invocable source) { + this.messageFactory = messageFactory; + this.source = source; + this.businessInterface = businessInterface; + } + + public JDKInvocationHandler(MessageFactory messageFactory, ServiceReference<?> callableReference) { + this.messageFactory = messageFactory; + this.callableReference = (ServiceReferenceExt<?>)callableReference; + if (callableReference != null) { + this.businessInterface = callableReference.getBusinessInterface(); + this.source = (RuntimeEndpointReference) this.callableReference.getEndpointReference(); + } + } + + + public Class<?> getBusinessInterface() { + return businessInterface; + } + + protected Object getCallbackID() { + return null; + } + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (Object.class == method.getDeclaringClass()) { + return invokeObjectMethod(method, args); + } + if (source == null) { + throw new ServiceRuntimeException("No runtime source is available"); + } + + if (source instanceof RuntimeEndpointReference) { + RuntimeEndpointReference epr = (RuntimeEndpointReference)source; + if (epr.isOutOfDate()) { + epr.rebuild(); + chains.clear(); + } + } + + InvocationChain chain = getInvocationChain(method, source); + + if (chain == null) { + throw new IllegalArgumentException("No matching operation is found: " + method); + } + + // Holder pattern. Items stored in a Holder<T> are promoted to T. + // After the invoke, the returned data <T> are placed back in Holder<T>. + Object [] promotedArgs = promoteHolderArgs( args ); + + // Strip out OUT-only arguments. Not too sure if the presence + // of a sourceOperation is exactly the right check to use to + // know whether or not to do this, but will assume it is until + // learning otherwise. + Operation sourceOp = chain.getSourceOperation(); + if (sourceOp != null) { + promotedArgs = removeOutOnlyArgs(sourceOp, promotedArgs ); + } + + Object result = invoke(method, chain, promotedArgs, source); + + // TODO - Based on the code in JavaInterfaceIntrospectorImpl, it seems there are + // some cases involving generics that we're not taking into account. + boolean voidReturnType = (void.class == method.getReturnType() ? true : false); + + // Returned Holder data <T> are placed back in Holder<T>. + boolean holderPattern = false; + Class [] parameters = method.getParameterTypes(); + if ( parameters != null ) { + int resultIdx = (voidReturnType ? 0 : 1); + for ( int i = 0; i < parameters.length; i++ ) { + Class parameterType = parameters[ i ]; + if ( isHolder( parameterType ) ) { + holderPattern = true; + // Pop results and place in holder (demote). + Holder holder = (Holder) args[ i ]; + + Object[] results = (Object[])result; + if ( result != null ) { + holder.value = results[resultIdx++]; + } + } + } + } + if (holderPattern && result != null) { + if (voidReturnType) { + return null; + } else { + return ((Object[])result)[0]; + } + } else { + return result; + } + } + + /** + * Handle the methods on the Object.class + * @param method + * @param args + */ + protected Object invokeObjectMethod(Method method, Object[] args) throws Throwable { + String name = method.getName(); + if ("toString".equals(name)) { + return "[Proxy - " + toString() + "]"; + } else if ("equals".equals(name)) { + Object obj = args[0]; + if (obj == null) { + return false; + } + if (!Proxy.isProxyClass(obj.getClass())) { + return false; + } + return equals(Proxy.getInvocationHandler(obj)); + } else if ("hashCode".equals(name)) { + return hashCode(); + } else { + return method.invoke(this); + } + } + + /** + * Determines if the given operation matches the given method + * + * @return true if the operation matches, false if does not + */ + // FIXME: Should it be in the InterfaceContractMapper? + @SuppressWarnings("unchecked") + private static boolean match(Operation operation, Method method) { + if (operation instanceof JavaOperation) { + JavaOperation javaOp = (JavaOperation)operation; + Method m = javaOp.getJavaMethod(); + if (!method.getName().equals(m.getName())) { + return false; + } + if (method.equals(m)) { + return true; + } + } else { + if (!method.getName().equals(operation.getName())) { + return false; + } + } + + // For remotable interface, operation is not overloaded. + if (operation.getInterface().isRemotable()) { + return true; + } + + Class<?>[] params = method.getParameterTypes(); + + DataType<List<DataType>> inputType = null; + if (operation.isInputWrapperStyle()) { + inputType = operation.getInputWrapper().getUnwrappedType(); + } else { + inputType = operation.getInputType(); + } + List<DataType> types = inputType.getLogical(); + boolean matched = true; + if (types.size() == params.length && method.getName().equals(operation.getName())) { + for (int i = 0; i < params.length; i++) { + Class<?> clazz = params[i]; + Class<?> type = types.get(i).getPhysical(); + // Object.class.isAssignableFrom(int.class) returns false + if (type != Object.class && (!type.isAssignableFrom(clazz))) { + matched = false; + } + } + } else { + matched = false; + } + return matched; + + } + + protected synchronized InvocationChain getInvocationChain(Method method, Invocable source) { + if (source instanceof RuntimeEndpoint) { + // [rfeng] Start with the binding invocation chain + return source.getBindingInvocationChain(); + } + if (fixedWire && chains.containsKey(method)) { + return chains.get(method); + } + InvocationChain found = null; + for (InvocationChain chain : source.getInvocationChains()) { + Operation operation = chain.getSourceOperation(); + if (operation.isDynamic()) { + operation.setName(method.getName()); + found = chain; + break; + } else if (match(operation, method)) { + found = chain; + break; + } + } + if (fixedWire) { + chains.put(method, found); + } + return found; + } + + protected void setEndpoint(Endpoint endpoint) { + this.target = endpoint; + } + + protected Object invoke(Method method, InvocationChain chain, Object[] args, Invocable source) + throws Throwable { + return invoke( method, chain, args, source, null ); + } + + /** + * Invoke the chain + * @param chain - the chain + * @param args - arguments to the invocation as an array of Objects + * @param source - the Endpoint or EndpointReference to which the chain relates + * @param msgID - an ID for the message being sent, may be null + * @return - the Response message from the invocation + * @throws Throwable - if any exception occurs during the invocation + */ + protected Object invoke(Method method, InvocationChain chain, Object[] args, Invocable source, String msgID) + throws Throwable { + Message msg = messageFactory.createMessage(); + if (source instanceof RuntimeEndpointReference) { + msg.setFrom((RuntimeEndpointReference)source); + } + if (target != null) { + msg.setTo(target); + } else { + if (source instanceof RuntimeEndpointReference) { + msg.setTo(((RuntimeEndpointReference)source).getTargetEndpoint()); + } + } + Invoker headInvoker = chain.getHeadInvoker(); + Operation operation = null; + if(source instanceof RuntimeEndpoint) { + // [rfeng] We cannot use the targetOperation from the binding invocation chain. + // For each method, we need to find the matching operation so that we can set the operation on to the message + for (InvocationChain c : source.getInvocationChains()) { + Operation op = c.getTargetOperation(); + if (method.getName().equals(op.getName())) { + operation = op; + break; + } + } + } else { + operation = chain.getTargetOperation(); + } + msg.setOperation(operation); + msg.setBody(args); + + Message msgContext = ThreadMessageContext.getMessageContext(); + + // Deal with header information that needs to be copied from the message context to the new message... + transferMessageHeaders( msg, msgContext); + + ThreadMessageContext.setMessageContext(msg); + + // If there is a supplied message ID, place its value into the Message Header under "MESSAGE_ID" + if( msgID != null ){ + msg.getHeaders().put("MESSAGE_ID", msgID); + } // end if + + try { + // dispatch the source down the chain and get the response + Message resp = headInvoker.invoke(msg); + Object body = resp.getBody(); + if (resp.isFault()) { + throw (Throwable)body; + } + return body; + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } + } + + /** + * Transfer relevant header information from the old message (incoming) to the new message (outgoing) + * @param newMsg + * @param oldMsg + */ + protected void transferMessageHeaders( Message newMsg, Message oldMsg ) { + if( oldMsg == null ) return; + // For the present, simply copy all the headers + if( !oldMsg.getHeaders().isEmpty() ) newMsg.getHeaders().putAll( oldMsg.getHeaders() ); + } // end transferMessageHeaders + + /** + * @return the callableReference + */ + public ServiceReference<?> getCallableReference() { + return callableReference; + } + + /** + * @param callableReference the callableReference to set + */ + public void setCallableReference(ServiceReference<?> callableReference) { + this.callableReference = (ServiceReferenceExt<?>)callableReference; + } + + /** + * Creates a copy of arguments. Holder<T> values are promoted to T. + * Note. It is essential that arg Holders not be destroyed here. + * PromotedArgs should not destroy holders. They are used on response return. + * @param args containing Holders and other objects. + * @return Object [] + */ + protected static Object [] promoteHolderArgs( Object [] args ) { + if ( args == null ) + return args; + Object [] promotedArgs = new Object[ args.length ]; + + for ( int i = 0; i < args.length; i++ ) { + Object argument = args[ i ]; + if ( argument != null ) { + if ( isHolder( argument ) ) { + promotedArgs[ i ] = ((Holder)argument).value; + } else { + promotedArgs[ i ] = args[ i ]; + } + + } + } + return promotedArgs; + } + + /** + * Given an argument array, filters out (removes) OUT-only parameters + * @param sourceOp + * @return array of filtered arguments + */ + Object[] removeOutOnlyArgs(Operation sourceOp, Object[] args) { + if ( args == null ) + return args; + ArrayList<Object> retValList = new ArrayList<Object>(); + List<ParameterMode> parmList = sourceOp.getParameterModes(); + for (int i = 0; i < args.length; i++) { + if (parmList.get(i) != ParameterMode.OUT) { + retValList.add(args[i]); + } + } + return retValList.toArray(); + } + + /** + * Given a Class, tells if it is a Holder by comparing to "javax.xml.ws.Holder" + * @param testClass + * @return boolean whether class is Holder type. + */ + protected static boolean isHolder( Class testClass ) { + if ( testClass.getName().startsWith( "javax.xml.ws.Holder" )) { + return true; + } + return false; + } + + + /** + * Given an Object, tells if it is a Holder by comparing to "javax.xml.ws.Holder" + * @param testClass + * @return boolean stating whether Object is a Holder type. + * @author DOB + */ + protected static boolean isHolder( Object object ) { + String objectName = object.getClass().getName(); + if ( object instanceof javax.xml.ws.Holder ) { + return true; + } + return false; + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java new file mode 100644 index 0000000000..7163357042 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; + +import javax.xml.bind.JAXBContext; +import javax.xml.ws.AsyncHandler; +import javax.xml.ws.Response; + +import org.apache.tuscany.sca.common.java.collection.LRUCache; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.core.context.ServiceReferenceExt; +import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl; +import org.apache.tuscany.sca.core.context.impl.ServiceReferenceImpl; +import org.apache.tuscany.sca.core.invocation.ProxyCreationException; +import org.apache.tuscany.sca.core.invocation.ProxyFactory; +import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.Invocable; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * the default implementation of a wire service that uses JDK dynamic proxies + * + * @version $Rev$ $Date$ + */ +public class JDKProxyFactory implements ProxyFactory, LifeCycleListener { + protected ExtensionPointRegistry registry; + protected InterfaceContractMapper contractMapper; + private MessageFactory messageFactory; + + public JDKProxyFactory(ExtensionPointRegistry registry, + MessageFactory messageFactory, + InterfaceContractMapper mapper) { + this.registry = registry; + this.contractMapper = mapper; + this.messageFactory = messageFactory; + } + + /** + * The original createProxy method assumes that the proxy doesn't want to + * share conversation state so sets the conversation object to null + */ + public <T> T createProxy(final Class<T> interfaze, Invocable invocable) throws ProxyCreationException { + if (invocable instanceof RuntimeEndpoint) { + InvocationHandler handler; + // TUSCANY-3659 - Always install a asynch handler regardless of whether ref is sync or async + // needs tidying + // if (isAsync(interfaze)) { + handler = new AsyncJDKInvocationHandler(registry, messageFactory, interfaze, invocable); + // } else { + // handler = new JDKInvocationHandler(messageFactory, interfaze, invocable); + // } + // Allow privileged access to class loader. Requires RuntimePermission in security policy. + ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { + public ClassLoader run() { + return interfaze.getClassLoader(); + } + }); + T proxy = interfaze.cast(newProxyInstance(cl, new Class[] {interfaze}, handler)); + return proxy; + } + ServiceReference<T> serviceReference = new ServiceReferenceImpl<T>(interfaze, invocable, null); + return createProxy(serviceReference); + } + + public <T> T createProxy(ServiceReference<T> callableReference) throws ProxyCreationException { + assert callableReference != null; + final Class<T> interfaze = callableReference.getBusinessInterface(); + InvocationHandler handler; + // TUSCANY-3659 - Always install a asynch handler regardless of whether ref is sync or async + // needs tidying + // if (isAsync(interfaze)) { + handler = new AsyncJDKInvocationHandler(registry, messageFactory, callableReference); + // } else { + // handler = new JDKInvocationHandler(messageFactory, callableReference); + // } + // Allow privileged access to class loader. Requires RuntimePermission in security policy. + ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { + public ClassLoader run() { + return interfaze.getClassLoader(); + } + }); + T proxy = interfaze.cast(newProxyInstance(cl, new Class[] {interfaze}, handler)); + ((ServiceReferenceExt<T>)callableReference).setProxy(proxy); + return proxy; + } + + private boolean isAsync(Class<?> interfaze) { + for (Method method : interfaze.getMethods()) { + if (method.getName().endsWith("Async")) { + if (method.getReturnType().isAssignableFrom(Future.class)) { + if (method.getParameterTypes().length > 0) { + if (method.getParameterTypes()[method.getParameterTypes().length - 1] + .isAssignableFrom(AsyncHandler.class)) { + return true; + } + } + } + if (method.getReturnType().isAssignableFrom(Response.class)) { + return true; + } + } + } + return false; + } + + public <T> T createCallbackProxy(Class<T> interfaze, List<? extends Invocable> wires) throws ProxyCreationException { + ServiceReferenceImpl<T> callbackReference = null; + try { + callbackReference = new CallbackServiceReferenceImpl(interfaze, wires); + } catch (ServiceRuntimeException e) { + // [rfeng] In case that the call is not from a bidirectional interface, the field should be injected with null + callbackReference = null; + } + return callbackReference != null ? createCallbackProxy(callbackReference) : null; + } + + public <T> T createCallbackProxy(ServiceReference<T> callbackReference) throws ProxyCreationException { + assert callbackReference != null; + final Class<T> interfaze = callbackReference.getBusinessInterface(); + InvocationHandler handler = new JDKCallbackInvocationHandler(messageFactory, callbackReference); + ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { + public ClassLoader run() { + return interfaze.getClassLoader(); + } + }); + T proxy = interfaze.cast(newProxyInstance(cl, new Class[] {interfaze}, handler)); + ((ServiceReferenceExt<T>)callbackReference).setProxy(proxy); + return proxy; + } + + public <B, R extends ServiceReference<B>> R cast(B target) throws IllegalArgumentException { + InvocationHandler handler = Proxy.getInvocationHandler(target); + if (handler instanceof JDKInvocationHandler) { + return (R)((JDKInvocationHandler)handler).getCallableReference(); + } else { + throw new IllegalArgumentException("The object is not a known proxy."); + } + } + + /** + * @see org.apache.tuscany.sca.core.invocation.ProxyFactory#isProxyClass(java.lang.Class) + */ + public boolean isProxyClass(Class<?> clazz) { + return Proxy.isProxyClass(clazz); + } + + // This is a cache containing the proxy class constructor for each business interface. + // This improves performance compared to calling Proxy.newProxyInstance() + // every time that a proxy is needed. + private final LRUCache<Class<?>, Constructor<?>> cache = new LRUCache<Class<?>, Constructor<?>>(512); + + public Object newProxyInstance(ClassLoader classloader, Class<?> interfaces[], InvocationHandler invocationhandler) + throws IllegalArgumentException { + if (interfaces.length > 1) { + // We only cache the proxy constructors with one single interface which the case in SCA where + // one reference can have one interface + return Proxy.newProxyInstance(classloader, interfaces, invocationhandler); + } + try { + if (invocationhandler == null) + throw new NullPointerException("InvocationHandler is null"); + // Lookup cached constructor. aclass[0] is the reference's business interface. + Constructor<?> proxyCTOR; + synchronized (cache) { + proxyCTOR = cache.get(interfaces[0]); + } + if (proxyCTOR == null) { + Class<?> proxyClass = Proxy.getProxyClass(classloader, interfaces); + proxyCTOR = proxyClass.getConstructor(InvocationHandler.class); + synchronized (cache) { + cache.put(interfaces[0], proxyCTOR); + } + } + return proxyCTOR.newInstance(invocationhandler); + } catch (Throwable e) { + throw new IllegalArgumentException(e); + } + } + + public void start() { + } + + public void stop() { + cache.clear(); + } + + public void removeProxiesForContribution(ClassLoader contributionClassloader){ + try { + synchronized(cache) { + Set<Class<?>> objSet = cache.keySet(); + List<Class<?>> toRemove = new ArrayList<Class<?>>(); + Iterator<Class<?>> i = objSet.iterator(); + loop: + while(i.hasNext()) { + Class<?> cls = i.next(); + ClassLoader cl = cls.getClassLoader(); + while (cl != null){ + if (cl == contributionClassloader){ + toRemove.add(cls); + break loop; + } + // take account of generated classes + cl = cl.getParent(); + } + } + for (Class<?> cls : toRemove){ + cache.remove(cls); + } + } + } catch(Exception e) { + throw new ServiceRuntimeException(e); + } + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java new file mode 100644 index 0000000000..6f3e947631 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.io.Serializable; + +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; + +/** + * Implementation of MessageFactory. + * + * @version $Rev$ $Date$ + */ +public class MessageFactoryImpl implements MessageFactory, Serializable { + + /** + * + */ + private static final long serialVersionUID = -2112289169275106977L; + + public Message createMessage() { + return new MessageImpl(); + } + +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java new file mode 100644 index 0000000000..cc8cb48cc5 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Message; + +/** + * The default implementation of a message flowed through a wire during an invocation + * + * @version $Rev $Date$ + */ +public class MessageImpl implements Message { + private Map<String, Object> headers = new HashMap<String, Object>(); + private Object body; + private Object messageID; + private boolean isFault; + private Operation operation; + + private EndpointReference from; + private Endpoint to; + + private Object bindingContext; + + public MessageImpl() { + this.from = null; + this.to = null; + } + + @SuppressWarnings("unchecked") + public <T> T getBody() { + return (T)body; + } + + public <T> void setBody(T body) { + this.isFault = false; + this.body = body; + } + + public Object getMessageID() { + return messageID; + } + + public void setMessageID(Object messageId) { + this.messageID = messageId; + } + + public boolean isFault() { + return isFault; + } + + public void setFaultBody(Object fault) { + this.isFault = true; + this.body = fault; + } + + public EndpointReference getFrom() { + return from; + } + + public void setFrom(EndpointReference from) { + this.from = from; + } + + public Endpoint getTo() { + return to; + } + + public void setTo(Endpoint to) { + this.to = to; + } + + public Operation getOperation() { + return operation; + } + + public void setOperation(Operation op) { + this.operation = op; + } + + public Map<String, Object> getHeaders() { + return headers; + } + + @SuppressWarnings("unchecked") + public <T> T getBindingContext() { + return (T)bindingContext; + } + + public <T> void setBindingContext(T bindingContext) { + this.bindingContext = bindingContext; + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java new file mode 100644 index 0000000000..45f4bf52bf --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import org.apache.tuscany.sca.core.invocation.ProxyCreationException; + + +/** + * Thrown when an {@link org.apache.tuscany.sca.core.factory.model.Operation} cannot be mapped to a method on an interface + * @version $Rev$ $Date$ + */ +public class NoMethodForOperationException extends ProxyCreationException { + private static final long serialVersionUID = 5116536602309483679L; + + public NoMethodForOperationException() { + } + + public NoMethodForOperationException(String message) { + super(message); + } + + public NoMethodForOperationException(String message, Throwable cause) { + super(message, cause); + } + + public NoMethodForOperationException(Throwable cause) { + super(cause); + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java new file mode 100644 index 0000000000..85ef79b5d7 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import static org.apache.tuscany.sca.invocation.Phase.IMPLEMENTATION; +import static org.apache.tuscany.sca.invocation.Phase.IMPLEMENTATION_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING_TRANSPORT; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING_WIREFORMAT; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_INTERFACE; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_OPERATION_SELECTOR; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_TRANSPORT; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_WIREFORMAT; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_INTERFACE; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_POLICY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.extensibility.ServiceDeclaration; +import org.apache.tuscany.sca.invocation.Phase; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * @version $Rev$ $Date$ + */ +public class PhaseManager { + private static final Logger log = Logger.getLogger(PhaseManager.class.getName()); + + public static final String STAGE_REFERENCE = "reference"; + public static final String STAGE_REFERENCE_BINDING = "reference.binding"; + public static final String STAGE_SERVICE_BINDING = "service.binding"; + public static final String STAGE_SERVICE = "service"; + public static final String STAGE_IMPLEMENTATION = "implementation"; + + private static final String[] SYSTEM_REFERENCE_PHASES = + {REFERENCE, REFERENCE_POLICY, REFERENCE_INTERFACE, REFERENCE_BINDING}; + + private static final String[] SYSTEM_REFERENCE_BINDING_PHASES = + {REFERENCE_BINDING_WIREFORMAT, REFERENCE_BINDING_POLICY, REFERENCE_BINDING_TRANSPORT}; + + private static final String[] SYSTEM_SERVICE_BINDING_PHASES = + {SERVICE_BINDING_TRANSPORT, SERVICE_BINDING_OPERATION_SELECTOR, SERVICE_BINDING_WIREFORMAT, SERVICE_BINDING_POLICY}; + + private static final String[] SYSTEM_SERVICE_PHASES = + {SERVICE_BINDING, SERVICE_INTERFACE, SERVICE_POLICY, SERVICE}; + + private static final String[] SYSTEM_IMPLEMENTATION_PHASES = {IMPLEMENTATION_POLICY, IMPLEMENTATION}; + + private ExtensionPointRegistry registry; + private String pattern = Phase.class.getName(); + private Map<String, Stage> stages; + private List<String> phases; + + public class Stage { + private String name; + private PhaseSorter<String> sorter = new PhaseSorter<String>(); + private Set<String> firstSet = new HashSet<String>(); + private Set<String> lastSet = new HashSet<String>(); + private List<String> phases = new ArrayList<String>(); + + public Stage(String name) { + super(); + this.name = name; + } + + public String getName() { + return name; + } + + public PhaseSorter<String> getSorter() { + return sorter; + } + + public Set<String> getFirstSet() { + return firstSet; + } + + public Set<String> getLastSet() { + return lastSet; + } + + public List<String> getPhases() { + return phases; + } + + @Override + public String toString() { + return name + phases; + } + } + + /** + * @param registry + */ + public PhaseManager(ExtensionPointRegistry registry) { + super(); + this.registry = registry; + } + + public static PhaseManager getInstance(ExtensionPointRegistry registry) { + UtilityExtensionPoint utilityExtensionPoint = registry.getExtensionPoint(UtilityExtensionPoint.class); + return utilityExtensionPoint.getUtility(PhaseManager.class); + } + + // For unit test purpose + PhaseManager(String pattern) { + super(); + this.pattern = pattern; + this.registry = new DefaultExtensionPointRegistry(); + } + + private List<String> getPhases(String stage) { + Stage s = getStages().get(stage); + return s == null ? null : s.getPhases(); + } + + public List<String> getReferencePhases() { + return getPhases(STAGE_REFERENCE); + } + + public List<String> getServicePhases() { + return getPhases(STAGE_SERVICE); + } + + public List<String> getReferenceBindingPhases() { + return getPhases(STAGE_REFERENCE_BINDING); + } + + public List<String> getServiceBindingPhases() { + return getPhases(STAGE_SERVICE_BINDING); + } + + public List<String> getImplementationPhases() { + return getPhases(STAGE_IMPLEMENTATION); + } + + public synchronized List<String> getAllPhases() { + if (phases == null) { + phases = new ArrayList<String>(); + phases.addAll(getReferencePhases()); + phases.addAll(getReferenceBindingPhases()); + phases.addAll(getServiceBindingPhases()); + phases.addAll(getServicePhases()); + phases.addAll(getImplementationPhases()); + } + return phases; + } + + public synchronized Map<String, Stage> getStages() { + if (stages != null) { + return stages; + } + init(); + + Collection<ServiceDeclaration> services; + try { + services = registry.getServiceDiscovery().getServiceDeclarations(pattern); + } catch (IOException e) { + throw new ServiceRuntimeException(e); + } + + for (ServiceDeclaration d : services) { + if (log.isLoggable(Level.FINE)) { + log.fine(d.getLocation() + ": " + d.getAttributes()); + } + String name = d.getAttributes().get("name"); + if (name == null) { + throw new ServiceRuntimeException("Required attribute 'name' is missing."); + } + String stageName = d.getAttributes().get("stage"); + if (stageName == null) { + throw new ServiceRuntimeException("Required attribute 'stage' is missing."); + } + Stage stage = stages.get(stageName); + if (stage == null) { + throw new ServiceRuntimeException("Invalid stage: " + stageName); + } + PhaseSorter<String> graph = stage.getSorter(); + Set<String> firstSet = stage.getFirstSet(), lastSet = stage.getLastSet(); + + String before = d.getAttributes().get("before"); + String after = d.getAttributes().get("after"); + if (before != null) { + StringTokenizer tokenizer = new StringTokenizer(before); + while (tokenizer.hasMoreTokens()) { + String p = tokenizer.nextToken(); + if (!"*".equals(p)) { + graph.addEdge(name, p); + } else { + firstSet.add(name); + } + } + } + if (after != null) { + StringTokenizer tokenizer = new StringTokenizer(after); + while (tokenizer.hasMoreTokens()) { + String p = tokenizer.nextToken(); + if (!"*".equals(p)) { + graph.addEdge(p, name); + } else { + lastSet.add(name); + } + } + } + graph.addVertext(name); + if(firstSet.size()>1) { + log.warning("More than one phases are declared to be first: "+firstSet); + } + for (String s : firstSet) { + for (String v : new HashSet<String>(graph.getVertices().keySet())) { + if (!firstSet.contains(v)) { + graph.addEdge(s, v); + } + } + } + if(lastSet.size()>1) { + log.warning("More than one phases are declared to be the last: "+lastSet); + } + for (String s : lastSet) { + for (String v : new HashSet<String>(graph.getVertices().keySet())) { + if (!lastSet.contains(v)) { + graph.addEdge(v, s); + } + } + } + + } + + for (Stage s : stages.values()) { + List<String> phases = s.getSorter().topologicalSort(false); + s.getPhases().clear(); + s.getPhases().addAll(phases); + } + if (log.isLoggable(Level.FINE)) { + log.fine("Stages: " + stages); + } + return stages; + } + + private void init() { + stages = new HashMap<String, Stage>(); + + Stage referenceStage = new Stage(STAGE_REFERENCE); + for (int i = 1; i < SYSTEM_REFERENCE_PHASES.length; i++) { + referenceStage.getSorter().addEdge(SYSTEM_REFERENCE_PHASES[i - 1], SYSTEM_REFERENCE_PHASES[i]); + } + referenceStage.getLastSet().add(REFERENCE_BINDING); + stages.put(referenceStage.getName(), referenceStage); + + Stage referenceBindingStage = new Stage(STAGE_REFERENCE_BINDING); + for (int i = 1; i < SYSTEM_REFERENCE_BINDING_PHASES.length; i++) { + referenceBindingStage.getSorter().addEdge(SYSTEM_REFERENCE_BINDING_PHASES[i - 1], SYSTEM_REFERENCE_BINDING_PHASES[i]); + } + stages.put(referenceBindingStage.getName(), referenceBindingStage); + + Stage serviceBindingStage = new Stage(STAGE_SERVICE_BINDING); + for (int i = 1; i < SYSTEM_SERVICE_BINDING_PHASES.length; i++) { + serviceBindingStage.getSorter().addEdge(SYSTEM_SERVICE_BINDING_PHASES[i - 1], SYSTEM_SERVICE_BINDING_PHASES[i]); + } + stages.put(serviceBindingStage.getName(), serviceBindingStage); + + + Stage serviceStage = new Stage(STAGE_SERVICE); + for (int i = 1; i < SYSTEM_SERVICE_PHASES.length; i++) { + serviceStage.getSorter().addEdge(SYSTEM_SERVICE_PHASES[i - 1], SYSTEM_SERVICE_PHASES[i]); + } + stages.put(serviceStage.getName(), serviceStage); + + Stage implementationStage = new Stage(STAGE_IMPLEMENTATION); + for (int i = 1; i < SYSTEM_IMPLEMENTATION_PHASES.length; i++) { + implementationStage.getSorter().addEdge(SYSTEM_IMPLEMENTATION_PHASES[i - 1], + SYSTEM_IMPLEMENTATION_PHASES[i]); + } + implementationStage.getLastSet().add(IMPLEMENTATION); + stages.put(implementationStage.getName(), implementationStage); + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java new file mode 100644 index 0000000000..175f3463ad --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Directed, weighted graph + * + * @param <V> The type of vertex object + * @param <E> The type of edge object + * + * @version $Rev$ $Date$ + */ +public class PhaseSorter<V> implements Cloneable { + private final Map<V, Vertex> vertices = new HashMap<V, Vertex>(); + + /** + * Vertex of a graph + */ + public final class Vertex { + private V value; + + // TODO: Do we want to support multiple edges for a vertex pair? If so, + // we should use a List instead of Map + private Map<Vertex, Edge> outEdges = new HashMap<Vertex, Edge>(); + private Map<Vertex, Edge> inEdges = new HashMap<Vertex, Edge>(); + + private Vertex(V value) { + this.value = value; + } + + @Override + public String toString() { + return "(" + value + ")"; + } + + public V getValue() { + return value; + } + + public Map<Vertex, Edge> getOutEdges() { + return outEdges; + } + + public Map<Vertex, Edge> getInEdges() { + return inEdges; + } + + } + + /** + * An Edge connects two vertices in one direction + */ + public final class Edge { + private Vertex sourceVertex; + + private Vertex targetVertex; + + public Edge(Vertex source, Vertex target) { + this.sourceVertex = source; + this.targetVertex = target; + } + + @Override + public String toString() { + return sourceVertex + "->" + targetVertex; + } + + public Vertex getTargetVertex() { + return targetVertex; + } + + public void setTargetVertex(Vertex vertex) { + this.targetVertex = vertex; + } + + public Vertex getSourceVertex() { + return sourceVertex; + } + + public void setSourceVertex(Vertex sourceVertex) { + this.sourceVertex = sourceVertex; + } + } + + public void addEdge(V source, V target) { + Vertex s = getVertex(source); + if (s == null) { + s = new Vertex(source); + vertices.put(source, s); + } + Vertex t = getVertex(target); + if (t == null) { + t = new Vertex(target); + vertices.put(target, t); + } + Edge edge = new Edge(s, t); + s.outEdges.put(t, edge); + t.inEdges.put(s, edge); + } + + public void addVertext(V source) { + Vertex s = getVertex(source); + if (s == null) { + s = new Vertex(source); + vertices.put(source, s); + } + } + + public Vertex getVertex(V source) { + Vertex s = vertices.get(source); + return s; + } + + public boolean removeEdge(V source, V target) { + Vertex s = getVertex(source); + if (s == null) { + return false; + } + + Vertex t = getVertex(target); + if (t == null) { + return false; + } + + return s.outEdges.remove(t) != null && t.inEdges.remove(s) != null; + + } + + public void removeEdge(Edge edge) { + edge.sourceVertex.outEdges.remove(edge.targetVertex); + edge.targetVertex.inEdges.remove(edge.sourceVertex); + } + + public void removeVertex(Vertex vertex) { + vertices.remove(vertex.getValue()); + for (Edge e : new ArrayList<Edge>(vertex.outEdges.values())) { + removeEdge(e); + } + for (Edge e : new ArrayList<Edge>(vertex.inEdges.values())) { + removeEdge(e); + } + } + + public Edge getEdge(Vertex source, Vertex target) { + return source.outEdges.get(target); + } + + public Edge getEdge(V source, V target) { + Vertex sv = getVertex(source); + if (sv == null) { + return null; + } + Vertex tv = getVertex(target); + if (tv == null) { + return null; + } + return getEdge(getVertex(source), getVertex(target)); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + for (Vertex v : vertices.values()) { + sb.append(v.outEdges.values()).append("\n"); + } + return sb.toString(); + } + + public Map<V, Vertex> getVertices() { + return vertices; + } + + public void addGraph(PhaseSorter<V> otherGraph) { + for (Vertex v : otherGraph.vertices.values()) { + for (Edge e : v.outEdges.values()) { + addEdge(e.sourceVertex.value, e.targetVertex.value); + } + } + } + + private Vertex getFirst() { + for (Vertex v : vertices.values()) { + if (v.inEdges.isEmpty()) { + return v; + } + } + if (!vertices.isEmpty()) { + throw new IllegalArgumentException("Circular ordering has been detected: " + toString()); + } else { + return null; + } + } + + public List<V> topologicalSort(boolean readOnly) { + PhaseSorter<V> graph = (!readOnly) ? this : (PhaseSorter<V>)clone(); + List<V> list = new ArrayList<V>(); + while (true) { + Vertex v = graph.getFirst(); + if (v == null) { + break; + } + list.add(v.getValue()); + graph.removeVertex(v); + } + + return list; + } + + @Override + public Object clone() { + PhaseSorter<V> copy = new PhaseSorter<V>(); + copy.addGraph(this); + return copy; + } +} |