Provide basic support for invocation of Async Server style service methods, as described in TUSCANY-3611
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@957880 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7d7d7269a2
commit
84224f4f57
3 changed files with 279 additions and 2 deletions
sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.tuscany.sca.implementation.java.invocation;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.tuscany.sca.assembly.EndpointReference;
|
||||
import org.apache.tuscany.sca.core.factory.InstanceWrapper;
|
||||
import org.apache.tuscany.sca.core.factory.ObjectCreationException;
|
||||
import org.apache.tuscany.sca.core.scope.ScopedRuntimeComponent;
|
||||
import org.apache.tuscany.sca.implementation.java.JavaImplementation;
|
||||
import org.apache.tuscany.sca.interfacedef.DataType;
|
||||
import org.apache.tuscany.sca.interfacedef.Operation;
|
||||
import org.apache.tuscany.sca.interfacedef.java.impl.JavaInterfaceUtil;
|
||||
import org.apache.tuscany.sca.invocation.Message;
|
||||
import org.apache.tuscany.sca.runtime.RuntimeComponent;
|
||||
import org.oasisopen.sca.ResponseDispatch;
|
||||
import org.oasisopen.sca.ServiceRuntimeException;
|
||||
|
||||
/**
|
||||
* Responsible for asynchronously dispatching an invocation to a Java component
|
||||
* implementation instance
|
||||
*
|
||||
*/
|
||||
public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
|
||||
|
||||
public JavaAsyncImplementationInvoker(Operation operation, Method method, RuntimeComponent component) {
|
||||
super( operation, method, component);
|
||||
assert method != null : "Operation method cannot be null";
|
||||
assert operation.isAsyncServer() : "Operation must be async";
|
||||
} // end constructor
|
||||
|
||||
public Message invoke(Message msg) {
|
||||
Operation op = this.operation;
|
||||
|
||||
Object payload = msg.getBody();
|
||||
|
||||
Object contextId = null;
|
||||
|
||||
// store the current thread context classloader
|
||||
// - replace it with the class loader used to load the java class as per SCA Spec
|
||||
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
|
||||
|
||||
try {
|
||||
// The following call might create a new conversation, as a result, the msg.getConversationID() might
|
||||
// return a new value
|
||||
InstanceWrapper wrapper = scopeContainer.getWrapper(contextId);
|
||||
|
||||
Object instance = wrapper.getInstance();
|
||||
|
||||
// Set the TCCL to the classloader used to load the implementation class
|
||||
Thread.currentThread().setContextClassLoader(instance.getClass().getClassLoader());
|
||||
|
||||
// For an async server method, there is an extra input parameter, which is a DispatchResponse instance
|
||||
// which is typed by the type of the response
|
||||
Class<?> responseType = op.getOutputType().getPhysical();
|
||||
ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType);
|
||||
|
||||
Object ret;
|
||||
Object[] payload2;
|
||||
if (payload != null && !payload.getClass().isArray()) {
|
||||
payload2 = new Object[2];
|
||||
payload2[0] = payload;
|
||||
} else {
|
||||
payload2 = new Object[ ((Object[])payload).length + 1 ];
|
||||
for( int i = 0; i < ((Object[])payload).length; i++) {
|
||||
payload2[i] = ((Object[])payload)[i];
|
||||
} // end for
|
||||
}
|
||||
payload2[ payload2.length - 1 ] = dispatch;
|
||||
|
||||
ret = method.invoke(instance, (Object[])payload2);
|
||||
|
||||
try {
|
||||
ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
|
||||
} catch (Throwable t) {
|
||||
throw new InvocationTargetException(t);
|
||||
} // end try
|
||||
|
||||
scopeContainer.returnWrapper(wrapper, contextId);
|
||||
|
||||
msg.setBody(ret);
|
||||
} catch (InvocationTargetException e) {
|
||||
Throwable cause = e.getTargetException();
|
||||
boolean isChecked = false;
|
||||
for (DataType<?> d : operation.getFaultTypes()) {
|
||||
if (d.getPhysical().isInstance(cause)) {
|
||||
isChecked = true;
|
||||
msg.setFaultBody(cause);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!isChecked) {
|
||||
if (cause instanceof RuntimeException) {
|
||||
throw (RuntimeException)cause;
|
||||
}
|
||||
if (cause instanceof Error) {
|
||||
throw (Error)cause;
|
||||
} else {
|
||||
throw new ServiceRuntimeException(cause.getMessage(), cause);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (ObjectCreationException e) {
|
||||
throw new ServiceRuntimeException(e.getMessage(), e);
|
||||
} catch (Exception e) {
|
||||
msg.setFaultBody(e);
|
||||
} finally {
|
||||
// set the tccl
|
||||
Thread.currentThread().setContextClassLoader(tccl);
|
||||
}
|
||||
return msg;
|
||||
} // end method invoke
|
||||
|
||||
}
|
|
@ -291,8 +291,12 @@ public class JavaComponentContextProvider {
|
|||
Class<?> implClass = instanceFactoryProvider.getImplementationClass();
|
||||
|
||||
Method method = JavaInterfaceUtil.findMethod(implClass, operation);
|
||||
return new JavaImplementationInvoker(operation, method, component);
|
||||
}
|
||||
if( operation.isAsyncServer() ) {
|
||||
return new JavaAsyncImplementationInvoker(operation, method, component);
|
||||
} else {
|
||||
return new JavaImplementationInvoker(operation, method, component);
|
||||
} // end if
|
||||
} // end
|
||||
|
||||
private static class OptimizedObjectFactory<T> implements ObjectFactory<T> {
|
||||
private ScopeContainer scopeContainer;
|
||||
|
|
|
@ -0,0 +1,137 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.tuscany.sca.implementation.java.invocation;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.oasisopen.sca.ResponseDispatch;
|
||||
|
||||
/**
|
||||
* Implementation of the ResponseDispatch interface of the OASIS SCA Java API
|
||||
*
|
||||
* This is used for invocations of asynchronous services, where it is passed as a parameter on async service operations
|
||||
* and it provides the path by which the service implementation returns the response to the request, or a Fault
|
||||
*
|
||||
* Note that this class is serializable and can be serialized, stored and deserialized by the service implementation
|
||||
*
|
||||
* @param <T> - type of the response message
|
||||
*/
|
||||
public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializable {
|
||||
|
||||
/**
|
||||
* Generated serialVersionUID value
|
||||
*/
|
||||
private static final long serialVersionUID = 300158355992568592L;
|
||||
|
||||
// A latch used to ensure that the sendResponse() and sendFault() operations are used at most once
|
||||
// The latch is initialized with the value "false"
|
||||
private AtomicBoolean latch = new AtomicBoolean();
|
||||
|
||||
private final Lock lock = new ReentrantLock();
|
||||
private final Condition completed = lock.newCondition();
|
||||
|
||||
// The result
|
||||
private T response = null;
|
||||
private Throwable fault = null;
|
||||
|
||||
public ResponseDispatchImpl( ) {
|
||||
super();
|
||||
} // end constructor
|
||||
|
||||
public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type ) {
|
||||
return new ResponseDispatchImpl<T>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide Context data for this ResponseDispatch that the service implementation can use
|
||||
*/
|
||||
@Override
|
||||
public Map<String, Object> getContext() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a Fault. Must only be invoked once for this ResponseDispatch object
|
||||
* @param e - the Fault to send
|
||||
* @throws IllegalStateException if either the sendResponse method or the sendFault method have been called previously
|
||||
*/
|
||||
@Override
|
||||
public void sendFault(Throwable e) {
|
||||
if( sendOK() ) {
|
||||
lock.lock();
|
||||
try {
|
||||
fault = e;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
} // end try
|
||||
} else {
|
||||
throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
|
||||
} // end if
|
||||
} // end method sendFault
|
||||
|
||||
/**
|
||||
* Send the response message. Must only be invoked once for this ResponseDispatch object
|
||||
* @throws IllegalStateException if either the sendResponse method or the sendFault method have been called previously
|
||||
* @param res - the response message, which is of type T
|
||||
*/
|
||||
@Override
|
||||
public void sendResponse(T res) {
|
||||
if( sendOK() ) {
|
||||
lock.lock();
|
||||
try {
|
||||
response = res;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
} // end try
|
||||
} else {
|
||||
throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
|
||||
} // end if
|
||||
} // end method sendResponse
|
||||
|
||||
public T get(long timeout, TimeUnit unit) throws Throwable {
|
||||
lock.lock();
|
||||
try {
|
||||
// wait for result to be available
|
||||
if( response == null && fault == null ) completed.await( timeout, unit);
|
||||
if( response != null ) return response;
|
||||
if( fault != null ) throw fault;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
} // end try
|
||||
|
||||
return null;
|
||||
} // end method get
|
||||
|
||||
/**
|
||||
* Indicates that sending a response is OK - this is a transactional
|
||||
* query in that it also updates the state of this ResponseDispatch, so
|
||||
* that it will return true once and once only
|
||||
* @return - true if it is OK to send the response, false otherwise
|
||||
*/
|
||||
private boolean sendOK() {
|
||||
return latch.compareAndSet(false, true);
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue