diff options
Diffstat (limited to '')
2 files changed, 91 insertions, 12 deletions
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java index 005a508217..de810a0f02 100644 --- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.tuscany.sca.core.factory.InstanceWrapper; import org.apache.tuscany.sca.core.factory.ObjectCreationException; +import org.apache.tuscany.sca.core.invocation.AsyncResponseException; import org.apache.tuscany.sca.interfacedef.DataType; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.interfacedef.java.JavaOperation; @@ -70,7 +71,7 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker { // For an async server method, there is an extra input parameter, which is a DispatchResponse instance // which is typed by the type of the response Class<?> responseType = op.getOutputType().getPhysical(); - ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType); + ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType, msg ); Object ret; Object[] payload2; @@ -87,15 +88,12 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker { ret = method.invoke(instance, (Object[])payload2); - try { - ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS); - } catch (Throwable t) { - throw new InvocationTargetException(t); - } // end try - - scopeContainer.returnWrapper(wrapper, contextId); + //ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS); + throw new InvocationTargetException( new AsyncResponseException("AsyncResponse") ); + + //scopeContainer.returnWrapper(wrapper, contextId); - msg.setBody(ret); + //msg.setBody(ret); } catch (InvocationTargetException e) { Throwable cause = e.getTargetException(); boolean isChecked = false; diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java index ea23f06ab5..0d56a6ef9d 100644 --- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java @@ -20,6 +20,8 @@ package org.apache.tuscany.sca.implementation.java.invocation; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -27,7 +29,22 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.context.CompositeContext; +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.factory.ObjectFactory; +import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory; +import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory; +import org.apache.tuscany.sca.core.invocation.ProxyFactory; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; import org.oasisopen.sca.ResponseDispatch; +import org.oasisopen.sca.ServiceReference; /** * Implementation of the ResponseDispatch interface of the OASIS SCA Java API @@ -45,6 +62,7 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl * Generated serialVersionUID value */ private static final long serialVersionUID = 300158355992568592L; + private static String WS_MESSAGE_ID = "WS_MESSAGE_ID"; // A latch used to ensure that the sendResponse() and sendFault() operations are used at most once // The latch is initialized with the value "false" @@ -57,12 +75,24 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl private volatile T response = null; private volatile Throwable fault = null; - public ResponseDispatchImpl( ) { + private ExtensionPointRegistry registry; + + // Service Reference used for the callback + private ServiceReference<AsyncResponseHandler<?>> callbackRef; + private String callbackAddress; + private String messageID; + + public ResponseDispatchImpl( Message msg ) { super(); + callbackRef = getAsyncCallbackRef( msg ); + + callbackAddress = msg.getFrom().getCallbackEndpoint().getURI(); + messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID); + } // end constructor - public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type ) { - return new ResponseDispatchImpl<T>(); + public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type, Message msg ) { + return new ResponseDispatchImpl<T>( msg ); } /** @@ -89,6 +119,10 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl } else { throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); } // end if + // Now dispatch the response to the callback... + AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService(); + setResponseHeaders(); + handler.setFault(new AsyncFaultWrapper(e)); } // end method sendFault /** @@ -108,6 +142,10 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl } else { throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); } // end if + // Now dispatch the response to the callback... + AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService(); + setResponseHeaders(); + handler.setResponse(res); } // end method sendResponse public T get(long timeout, TimeUnit unit) throws Throwable { @@ -133,4 +171,47 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl private boolean sendOK() { return latch.compareAndSet(false, true); } + + /** + * Creates a service reference for the async callback, based on information contained in the supplied message + * @param msg - the incoming message + * @return - a CallBackServiceReference + */ + @SuppressWarnings("unchecked") + private ServiceReference<AsyncResponseHandler<?>> getAsyncCallbackRef( Message msg ) { + RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get("ASYNC_CALLBACK"); + if( callbackEPR == null ) return null; + + CompositeContext compositeContext = callbackEPR.getCompositeContext(); + registry = compositeContext.getExtensionPointRegistry(); + ProxyFactory proxyFactory = ExtensibleProxyFactory.getInstance(registry); + List<EndpointReference> eprList = new ArrayList<EndpointReference>(); + eprList.add(callbackEPR); + ObjectFactory<?> factory = new CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, eprList); + + return (ServiceReference<AsyncResponseHandler<?>>) factory.getInstance(); + + } // end method getAsyncCallbackEPR + + /** + * Sets the values of various headers in the response message + */ + private void setResponseHeaders() { + // Is there an existing message context? + Message msgContext = ThreadMessageContext.getMessageContext(); + if( msgContext == null ) { + // Create a message context + msgContext = getMessageFactory().createMessage(); + } // end if + + // Add in the header for the RelatesTo Message ID + msgContext.getHeaders().put(WS_MESSAGE_ID, messageID); + + ThreadMessageContext.setMessageContext(msgContext); + } // end method setResponseHeaders + + private MessageFactory getMessageFactory() { + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + return modelFactories.getFactory(MessageFactory.class); + } // end method getMessageFactory } |