diff options
Diffstat (limited to 'sca-java-2.x/trunk')
-rw-r--r-- | sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java | 56 |
1 files changed, 32 insertions, 24 deletions
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java index dcbf83262e..53dc708a30 100644 --- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java +++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java @@ -39,6 +39,7 @@ import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker; import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory; +import org.apache.tuscany.sca.core.invocation.Constants; import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory; import org.apache.tuscany.sca.core.invocation.ProxyFactory; import org.apache.tuscany.sca.invocation.Message; @@ -64,42 +65,47 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl * Generated serialVersionUID value */ private static final long serialVersionUID = 300158355992568592L; - private static String WS_MESSAGE_ID = "WS_MESSAGE_ID"; - private static String MESSAGE_ID = "MESSAGE_ID"; // A latch used to ensure that the sendResponse() and sendFault() operations are used at most once // The latch is initialized with the value "false" - private transient AtomicBoolean latch = new AtomicBoolean(); + private AtomicBoolean latch = new AtomicBoolean(); - private transient final Lock lock = new ReentrantLock(); - private transient final Condition completed = lock.newCondition(); + private final Lock lock = new ReentrantLock(); + private final Condition completed = lock.newCondition(); // The result private transient volatile T response = null; private transient volatile Throwable fault = null; - private ExtensionPointRegistry registry; + private transient ExtensionPointRegistry registry; + private MessageFactory msgFactory; // Service Reference used for the callback - private ServiceReference<AsyncResponseHandler<?>> callbackRef; + private volatile ServiceReference<AsyncResponseHandler<?>> callbackRef; private AsyncResponseInvoker<?> respInvoker; private String messageID; + /** + * No-arg constructor for serialization purposes + */ + public ResponseDispatchImpl() { + super(); + } // end constructor + public ResponseDispatchImpl( Message msg ) { super(); - callbackRef = getAsyncCallbackRef( msg ); - respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER"); + respInvoker = (AsyncResponseInvoker<?>)msg.getHeaders().get(Constants.ASYNC_RESPONSE_INVOKER); //if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); + + if( respInvoker == null ) { + callbackRef = getAsyncCallbackRef( msg ); + } // end if - // TODO - why is WS stuff bleeding into general code? - messageID = (String) msg.getHeaders().get(MESSAGE_ID); - if (messageID == null){ - messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID); - } + messageID = (String) msg.getHeaders().get(Constants.MESSAGE_ID); } // end constructor - + public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type, Message msg ) { return new ResponseDispatchImpl<T>( msg ); } @@ -160,16 +166,18 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); } // end if + // Now dispatch the response to the callback, if present... + if( callbackRef != null ) { + AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService(); + setResponseHeaders(); + handler.setResponse(res); + } // end if + // Use response invoker if present if( respInvoker != null ) { respInvoker.invokeAsyncResponse(res); return; } // end if - - // Now dispatch the response to the callback... - AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService(); - setResponseHeaders(); - handler.setResponse(res); } // end method sendResponse public T get(long timeout, TimeUnit unit) throws Throwable { @@ -203,12 +211,13 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl */ @SuppressWarnings("unchecked") private ServiceReference<AsyncResponseHandler<?>> getAsyncCallbackRef( Message msg ) { - RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get("ASYNC_CALLBACK"); + RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get(Constants.ASYNC_CALLBACK); if( callbackEPR == null ) return null; CompositeContext compositeContext = callbackEPR.getCompositeContext(); registry = compositeContext.getExtensionPointRegistry(); ProxyFactory proxyFactory = ExtensibleProxyFactory.getInstance(registry); + msgFactory = getMessageFactory(); List<EndpointReference> eprList = new ArrayList<EndpointReference>(); eprList.add(callbackEPR); ObjectFactory<?> factory = new CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, eprList); @@ -225,12 +234,11 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl Message msgContext = ThreadMessageContext.getMessageContext(); if( msgContext == null ) { // Create a message context - msgContext = getMessageFactory().createMessage(); + msgContext = msgFactory.createMessage(); } // end if // Add in the header for the RelatesTo Message ID - msgContext.getHeaders().put(WS_MESSAGE_ID, messageID); - msgContext.getHeaders().put(MESSAGE_ID, messageID); + msgContext.getHeaders().put(Constants.RELATES_TO, messageID); ThreadMessageContext.setMessageContext(msgContext); } // end method setResponseHeaders |