summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk
diff options
context:
space:
mode:
authoredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2010-06-25 10:24:08 +0000
committeredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2010-06-25 10:24:08 +0000
commit84224f4f575d39eb3e3a2a906a8b4733973287d8 (patch)
tree3d06bd535d407d2e5c0a7cc0d688a4d64a0da654 /sca-java-2.x/trunk
parent7d7d7269a21d2e857f7e69ae82b5604af503f3e9 (diff)
Provide basic support for invocation of Async Server style service methods, as described in TUSCANY-3611
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@957880 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk')
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java136
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaComponentContextProvider.java8
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java137
3 files changed, 279 insertions, 2 deletions
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
new file mode 100644
index 0000000000..2e5ca19dd5
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.implementation.java.invocation;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.core.factory.InstanceWrapper;
+import org.apache.tuscany.sca.core.factory.ObjectCreationException;
+import org.apache.tuscany.sca.core.scope.ScopedRuntimeComponent;
+import org.apache.tuscany.sca.implementation.java.JavaImplementation;
+import org.apache.tuscany.sca.interfacedef.DataType;
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.interfacedef.java.impl.JavaInterfaceUtil;
+import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.oasisopen.sca.ResponseDispatch;
+import org.oasisopen.sca.ServiceRuntimeException;
+
+/**
+ * Responsible for asynchronously dispatching an invocation to a Java component
+ * implementation instance
+ *
+ */
+public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
+
+ public JavaAsyncImplementationInvoker(Operation operation, Method method, RuntimeComponent component) {
+ super( operation, method, component);
+ assert method != null : "Operation method cannot be null";
+ assert operation.isAsyncServer() : "Operation must be async";
+ } // end constructor
+
+ public Message invoke(Message msg) {
+ Operation op = this.operation;
+
+ Object payload = msg.getBody();
+
+ Object contextId = null;
+
+ // store the current thread context classloader
+ // - replace it with the class loader used to load the java class as per SCA Spec
+ ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+
+ try {
+ // The following call might create a new conversation, as a result, the msg.getConversationID() might
+ // return a new value
+ InstanceWrapper wrapper = scopeContainer.getWrapper(contextId);
+
+ Object instance = wrapper.getInstance();
+
+ // Set the TCCL to the classloader used to load the implementation class
+ Thread.currentThread().setContextClassLoader(instance.getClass().getClassLoader());
+
+ // For an async server method, there is an extra input parameter, which is a DispatchResponse instance
+ // which is typed by the type of the response
+ Class<?> responseType = op.getOutputType().getPhysical();
+ ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType);
+
+ Object ret;
+ Object[] payload2;
+ if (payload != null && !payload.getClass().isArray()) {
+ payload2 = new Object[2];
+ payload2[0] = payload;
+ } else {
+ payload2 = new Object[ ((Object[])payload).length + 1 ];
+ for( int i = 0; i < ((Object[])payload).length; i++) {
+ payload2[i] = ((Object[])payload)[i];
+ } // end for
+ }
+ payload2[ payload2.length - 1 ] = dispatch;
+
+ ret = method.invoke(instance, (Object[])payload2);
+
+ try {
+ ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
+ } catch (Throwable t) {
+ throw new InvocationTargetException(t);
+ } // end try
+
+ scopeContainer.returnWrapper(wrapper, contextId);
+
+ msg.setBody(ret);
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getTargetException();
+ boolean isChecked = false;
+ for (DataType<?> d : operation.getFaultTypes()) {
+ if (d.getPhysical().isInstance(cause)) {
+ isChecked = true;
+ msg.setFaultBody(cause);
+ break;
+ }
+ }
+
+ if (!isChecked) {
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException)cause;
+ }
+ if (cause instanceof Error) {
+ throw (Error)cause;
+ } else {
+ throw new ServiceRuntimeException(cause.getMessage(), cause);
+ }
+ }
+
+ } catch (ObjectCreationException e) {
+ throw new ServiceRuntimeException(e.getMessage(), e);
+ } catch (Exception e) {
+ msg.setFaultBody(e);
+ } finally {
+ // set the tccl
+ Thread.currentThread().setContextClassLoader(tccl);
+ }
+ return msg;
+ } // end method invoke
+
+}
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaComponentContextProvider.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaComponentContextProvider.java
index 614590d738..4114193e4e 100644
--- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaComponentContextProvider.java
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaComponentContextProvider.java
@@ -291,8 +291,12 @@ public class JavaComponentContextProvider {
Class<?> implClass = instanceFactoryProvider.getImplementationClass();
Method method = JavaInterfaceUtil.findMethod(implClass, operation);
- return new JavaImplementationInvoker(operation, method, component);
- }
+ if( operation.isAsyncServer() ) {
+ return new JavaAsyncImplementationInvoker(operation, method, component);
+ } else {
+ return new JavaImplementationInvoker(operation, method, component);
+ } // end if
+ } // end
private static class OptimizedObjectFactory<T> implements ObjectFactory<T> {
private ScopeContainer scopeContainer;
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
new file mode 100644
index 0000000000..576202a5de
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.implementation.java.invocation;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.oasisopen.sca.ResponseDispatch;
+
+/**
+ * Implementation of the ResponseDispatch interface of the OASIS SCA Java API
+ *
+ * This is used for invocations of asynchronous services, where it is passed as a parameter on async service operations
+ * and it provides the path by which the service implementation returns the response to the request, or a Fault
+ *
+ * Note that this class is serializable and can be serialized, stored and deserialized by the service implementation
+ *
+ * @param <T> - type of the response message
+ */
+public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializable {
+
+ /**
+ * Generated serialVersionUID value
+ */
+ private static final long serialVersionUID = 300158355992568592L;
+
+ // A latch used to ensure that the sendResponse() and sendFault() operations are used at most once
+ // The latch is initialized with the value "false"
+ private AtomicBoolean latch = new AtomicBoolean();
+
+ private final Lock lock = new ReentrantLock();
+ private final Condition completed = lock.newCondition();
+
+ // The result
+ private T response = null;
+ private Throwable fault = null;
+
+ public ResponseDispatchImpl( ) {
+ super();
+ } // end constructor
+
+ public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type ) {
+ return new ResponseDispatchImpl<T>();
+ }
+
+ /**
+ * Provide Context data for this ResponseDispatch that the service implementation can use
+ */
+ @Override
+ public Map<String, Object> getContext() {
+ return null;
+ }
+
+ /**
+ * Send a Fault. Must only be invoked once for this ResponseDispatch object
+ * @param e - the Fault to send
+ * @throws IllegalStateException if either the sendResponse method or the sendFault method have been called previously
+ */
+ @Override
+ public void sendFault(Throwable e) {
+ if( sendOK() ) {
+ lock.lock();
+ try {
+ fault = e;
+ } finally {
+ lock.unlock();
+ } // end try
+ } else {
+ throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
+ } // end if
+ } // end method sendFault
+
+ /**
+ * Send the response message. Must only be invoked once for this ResponseDispatch object
+ * @throws IllegalStateException if either the sendResponse method or the sendFault method have been called previously
+ * @param res - the response message, which is of type T
+ */
+ @Override
+ public void sendResponse(T res) {
+ if( sendOK() ) {
+ lock.lock();
+ try {
+ response = res;
+ } finally {
+ lock.unlock();
+ } // end try
+ } else {
+ throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
+ } // end if
+ } // end method sendResponse
+
+ public T get(long timeout, TimeUnit unit) throws Throwable {
+ lock.lock();
+ try {
+ // wait for result to be available
+ if( response == null && fault == null ) completed.await( timeout, unit);
+ if( response != null ) return response;
+ if( fault != null ) throw fault;
+ } finally {
+ lock.unlock();
+ } // end try
+
+ return null;
+ } // end method get
+
+ /**
+ * Indicates that sending a response is OK - this is a transactional
+ * query in that it also updates the state of this ResponseDispatch, so
+ * that it will return true once and once only
+ * @return - true if it is OK to send the response, false otherwise
+ */
+ private boolean sendOK() {
+ return latch.compareAndSet(false, true);
+ }
+}