From 84224f4f575d39eb3e3a2a906a8b4733973287d8 Mon Sep 17 00:00:00 2001 From: edwardsmj Date: Fri, 25 Jun 2010 10:24:08 +0000 Subject: Provide basic support for invocation of Async Server style service methods, as described in TUSCANY-3611 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@957880 13f79535-47bb-0310-9956-ffa450edef68 --- .../invocation/JavaAsyncImplementationInvoker.java | 136 ++++++++++++++++++++ .../invocation/JavaComponentContextProvider.java | 8 +- .../java/invocation/ResponseDispatchImpl.java | 137 +++++++++++++++++++++ 3 files changed, 279 insertions(+), 2 deletions(-) create mode 100644 sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java create mode 100644 sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java (limited to 'sca-java-2.x/trunk') diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java new file mode 100644 index 0000000000..2e5ca19dd5 --- /dev/null +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.java.invocation; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.concurrent.TimeUnit; + +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.core.factory.InstanceWrapper; +import org.apache.tuscany.sca.core.factory.ObjectCreationException; +import org.apache.tuscany.sca.core.scope.ScopedRuntimeComponent; +import org.apache.tuscany.sca.implementation.java.JavaImplementation; +import org.apache.tuscany.sca.interfacedef.DataType; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.java.impl.JavaInterfaceUtil; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.oasisopen.sca.ResponseDispatch; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * Responsible for asynchronously dispatching an invocation to a Java component + * implementation instance + * + */ +public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker { + + public JavaAsyncImplementationInvoker(Operation operation, Method method, RuntimeComponent component) { + super( operation, method, component); + assert method != null : "Operation method cannot be null"; + assert operation.isAsyncServer() : "Operation must be async"; + } // end constructor + + public Message invoke(Message msg) { + Operation op = this.operation; + + Object payload = msg.getBody(); + + Object contextId = null; + + // store the current thread context classloader + // - replace it with the class loader used to load the java class as per SCA Spec + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); + + try { + // The following call might create a new conversation, as a result, the msg.getConversationID() might + // return a new value + InstanceWrapper wrapper = scopeContainer.getWrapper(contextId); + + Object instance = wrapper.getInstance(); + + // Set the TCCL to the classloader used to load the implementation class + Thread.currentThread().setContextClassLoader(instance.getClass().getClassLoader()); + + // For an async server method, there is an extra input parameter, which is a DispatchResponse instance + // which is typed by the type of the response + Class responseType = op.getOutputType().getPhysical(); + ResponseDispatch dispatch = ResponseDispatchImpl.newInstance(responseType); + + Object ret; + Object[] payload2; + if (payload != null && !payload.getClass().isArray()) { + payload2 = new Object[2]; + payload2[0] = payload; + } else { + payload2 = new Object[ ((Object[])payload).length + 1 ]; + for( int i = 0; i < ((Object[])payload).length; i++) { + payload2[i] = ((Object[])payload)[i]; + } // end for + } + payload2[ payload2.length - 1 ] = dispatch; + + ret = method.invoke(instance, (Object[])payload2); + + try { + ret = ((ResponseDispatchImpl)dispatch).get(50, TimeUnit.SECONDS); + } catch (Throwable t) { + throw new InvocationTargetException(t); + } // end try + + scopeContainer.returnWrapper(wrapper, contextId); + + msg.setBody(ret); + } catch (InvocationTargetException e) { + Throwable cause = e.getTargetException(); + boolean isChecked = false; + for (DataType d : operation.getFaultTypes()) { + if (d.getPhysical().isInstance(cause)) { + isChecked = true; + msg.setFaultBody(cause); + break; + } + } + + if (!isChecked) { + if (cause instanceof RuntimeException) { + throw (RuntimeException)cause; + } + if (cause instanceof Error) { + throw (Error)cause; + } else { + throw new ServiceRuntimeException(cause.getMessage(), cause); + } + } + + } catch (ObjectCreationException e) { + throw new ServiceRuntimeException(e.getMessage(), e); + } catch (Exception e) { + msg.setFaultBody(e); + } finally { + // set the tccl + Thread.currentThread().setContextClassLoader(tccl); + } + return msg; + } // end method invoke + +} diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaComponentContextProvider.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaComponentContextProvider.java index 614590d738..4114193e4e 100644 --- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaComponentContextProvider.java +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaComponentContextProvider.java @@ -291,8 +291,12 @@ public class JavaComponentContextProvider { Class implClass = instanceFactoryProvider.getImplementationClass(); Method method = JavaInterfaceUtil.findMethod(implClass, operation); - return new JavaImplementationInvoker(operation, method, component); - } + if( operation.isAsyncServer() ) { + return new JavaAsyncImplementationInvoker(operation, method, component); + } else { + return new JavaImplementationInvoker(operation, method, component); + } // end if + } // end private static class OptimizedObjectFactory implements ObjectFactory { private ScopeContainer scopeContainer; diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java new file mode 100644 index 0000000000..576202a5de --- /dev/null +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.java.invocation; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.oasisopen.sca.ResponseDispatch; + +/** + * Implementation of the ResponseDispatch interface of the OASIS SCA Java API + * + * This is used for invocations of asynchronous services, where it is passed as a parameter on async service operations + * and it provides the path by which the service implementation returns the response to the request, or a Fault + * + * Note that this class is serializable and can be serialized, stored and deserialized by the service implementation + * + * @param - type of the response message + */ +public class ResponseDispatchImpl implements ResponseDispatch, Serializable { + + /** + * Generated serialVersionUID value + */ + private static final long serialVersionUID = 300158355992568592L; + + // A latch used to ensure that the sendResponse() and sendFault() operations are used at most once + // The latch is initialized with the value "false" + private AtomicBoolean latch = new AtomicBoolean(); + + private final Lock lock = new ReentrantLock(); + private final Condition completed = lock.newCondition(); + + // The result + private T response = null; + private Throwable fault = null; + + public ResponseDispatchImpl( ) { + super(); + } // end constructor + + public static ResponseDispatchImpl newInstance( Class type ) { + return new ResponseDispatchImpl(); + } + + /** + * Provide Context data for this ResponseDispatch that the service implementation can use + */ + @Override + public Map getContext() { + return null; + } + + /** + * Send a Fault. Must only be invoked once for this ResponseDispatch object + * @param e - the Fault to send + * @throws IllegalStateException if either the sendResponse method or the sendFault method have been called previously + */ + @Override + public void sendFault(Throwable e) { + if( sendOK() ) { + lock.lock(); + try { + fault = e; + } finally { + lock.unlock(); + } // end try + } else { + throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); + } // end if + } // end method sendFault + + /** + * Send the response message. Must only be invoked once for this ResponseDispatch object + * @throws IllegalStateException if either the sendResponse method or the sendFault method have been called previously + * @param res - the response message, which is of type T + */ + @Override + public void sendResponse(T res) { + if( sendOK() ) { + lock.lock(); + try { + response = res; + } finally { + lock.unlock(); + } // end try + } else { + throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); + } // end if + } // end method sendResponse + + public T get(long timeout, TimeUnit unit) throws Throwable { + lock.lock(); + try { + // wait for result to be available + if( response == null && fault == null ) completed.await( timeout, unit); + if( response != null ) return response; + if( fault != null ) throw fault; + } finally { + lock.unlock(); + } // end try + + return null; + } // end method get + + /** + * Indicates that sending a response is OK - this is a transactional + * query in that it also updates the state of this ResponseDispatch, so + * that it will return true once and once only + * @return - true if it is OK to send the response, false otherwise + */ + private boolean sendOK() { + return latch.compareAndSet(false, true); + } +} -- cgit v1.2.3