From 873ca9dc544083b4e686a05ec0529a3a2459689f Mon Sep 17 00:00:00 2001 From: edwardsmj Date: Thu, 20 Jan 2011 16:56:44 +0000 Subject: Stage 1 of Axis2 binding-ws support of async callbacks, as described in TUSCANY-3821 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1061391 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/invocation/ResponseDispatchImpl.java | 56 ++++++++++++---------- 1 file changed, 32 insertions(+), 24 deletions(-) (limited to 'sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation') diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java index dcbf83262e..53dc708a30 100644 --- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java @@ -39,6 +39,7 @@ import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker; import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory; +import org.apache.tuscany.sca.core.invocation.Constants; import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory; import org.apache.tuscany.sca.core.invocation.ProxyFactory; import org.apache.tuscany.sca.invocation.Message; @@ -64,42 +65,47 @@ public class ResponseDispatchImpl implements ResponseDispatch, Serializabl * Generated serialVersionUID value */ private static final long serialVersionUID = 300158355992568592L; - private static String WS_MESSAGE_ID = "WS_MESSAGE_ID"; - private static String MESSAGE_ID = "MESSAGE_ID"; // A latch used to ensure that the sendResponse() and sendFault() operations are used at most once // The latch is initialized with the value "false" - private transient AtomicBoolean latch = new AtomicBoolean(); + private AtomicBoolean latch = new AtomicBoolean(); - private transient final Lock lock = new ReentrantLock(); - private transient final Condition completed = lock.newCondition(); + private final Lock lock = new ReentrantLock(); + private final Condition completed = lock.newCondition(); // The result private transient volatile T response = null; private transient volatile Throwable fault = null; - private ExtensionPointRegistry registry; + private transient ExtensionPointRegistry registry; + private MessageFactory msgFactory; // Service Reference used for the callback - private ServiceReference> callbackRef; + private volatile ServiceReference> callbackRef; private AsyncResponseInvoker respInvoker; private String messageID; + /** + * No-arg constructor for serialization purposes + */ + public ResponseDispatchImpl() { + super(); + } // end constructor + public ResponseDispatchImpl( Message msg ) { super(); - callbackRef = getAsyncCallbackRef( msg ); - respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER"); + respInvoker = (AsyncResponseInvoker)msg.getHeaders().get(Constants.ASYNC_RESPONSE_INVOKER); //if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); + + if( respInvoker == null ) { + callbackRef = getAsyncCallbackRef( msg ); + } // end if - // TODO - why is WS stuff bleeding into general code? - messageID = (String) msg.getHeaders().get(MESSAGE_ID); - if (messageID == null){ - messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID); - } + messageID = (String) msg.getHeaders().get(Constants.MESSAGE_ID); } // end constructor - + public static ResponseDispatchImpl newInstance( Class type, Message msg ) { return new ResponseDispatchImpl( msg ); } @@ -160,16 +166,18 @@ public class ResponseDispatchImpl implements ResponseDispatch, Serializabl throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); } // end if + // Now dispatch the response to the callback, if present... + if( callbackRef != null ) { + AsyncResponseHandler handler = (AsyncResponseHandler) callbackRef.getService(); + setResponseHeaders(); + handler.setResponse(res); + } // end if + // Use response invoker if present if( respInvoker != null ) { respInvoker.invokeAsyncResponse(res); return; } // end if - - // Now dispatch the response to the callback... - AsyncResponseHandler handler = (AsyncResponseHandler) callbackRef.getService(); - setResponseHeaders(); - handler.setResponse(res); } // end method sendResponse public T get(long timeout, TimeUnit unit) throws Throwable { @@ -203,12 +211,13 @@ public class ResponseDispatchImpl implements ResponseDispatch, Serializabl */ @SuppressWarnings("unchecked") private ServiceReference> getAsyncCallbackRef( Message msg ) { - RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get("ASYNC_CALLBACK"); + RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get(Constants.ASYNC_CALLBACK); if( callbackEPR == null ) return null; CompositeContext compositeContext = callbackEPR.getCompositeContext(); registry = compositeContext.getExtensionPointRegistry(); ProxyFactory proxyFactory = ExtensibleProxyFactory.getInstance(registry); + msgFactory = getMessageFactory(); List eprList = new ArrayList(); eprList.add(callbackEPR); ObjectFactory factory = new CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, eprList); @@ -225,12 +234,11 @@ public class ResponseDispatchImpl implements ResponseDispatch, Serializabl Message msgContext = ThreadMessageContext.getMessageContext(); if( msgContext == null ) { // Create a message context - msgContext = getMessageFactory().createMessage(); + msgContext = msgFactory.createMessage(); } // end if // Add in the header for the RelatesTo Message ID - msgContext.getHeaders().put(WS_MESSAGE_ID, messageID); - msgContext.getHeaders().put(MESSAGE_ID, messageID); + msgContext.getHeaders().put(Constants.RELATES_TO, messageID); ThreadMessageContext.setMessageContext(msgContext); } // end method setResponseHeaders -- cgit v1.2.3