summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2011-01-20 16:56:44 +0000
committeredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2011-01-20 16:56:44 +0000
commit873ca9dc544083b4e686a05ec0529a3a2459689f (patch)
tree686094341e075990f549fee9fa4abfe7b255f474
parentc57e6bf5db9304120dd85036492206593ef7424e (diff)
Stage 1 of Axis2 binding-ws support of async callbacks, as described in TUSCANY-3821
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1061391 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java56
1 files changed, 32 insertions, 24 deletions
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
index dcbf83262e..53dc708a30 100644
--- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
@@ -39,6 +39,7 @@ import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory;
+import org.apache.tuscany.sca.core.invocation.Constants;
import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory;
import org.apache.tuscany.sca.core.invocation.ProxyFactory;
import org.apache.tuscany.sca.invocation.Message;
@@ -64,42 +65,47 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
* Generated serialVersionUID value
*/
private static final long serialVersionUID = 300158355992568592L;
- private static String WS_MESSAGE_ID = "WS_MESSAGE_ID";
- private static String MESSAGE_ID = "MESSAGE_ID";
// A latch used to ensure that the sendResponse() and sendFault() operations are used at most once
// The latch is initialized with the value "false"
- private transient AtomicBoolean latch = new AtomicBoolean();
+ private AtomicBoolean latch = new AtomicBoolean();
- private transient final Lock lock = new ReentrantLock();
- private transient final Condition completed = lock.newCondition();
+ private final Lock lock = new ReentrantLock();
+ private final Condition completed = lock.newCondition();
// The result
private transient volatile T response = null;
private transient volatile Throwable fault = null;
- private ExtensionPointRegistry registry;
+ private transient ExtensionPointRegistry registry;
+ private MessageFactory msgFactory;
// Service Reference used for the callback
- private ServiceReference<AsyncResponseHandler<?>> callbackRef;
+ private volatile ServiceReference<AsyncResponseHandler<?>> callbackRef;
private AsyncResponseInvoker<?> respInvoker;
private String messageID;
+ /**
+ * No-arg constructor for serialization purposes
+ */
+ public ResponseDispatchImpl() {
+ super();
+ } // end constructor
+
public ResponseDispatchImpl( Message msg ) {
super();
- callbackRef = getAsyncCallbackRef( msg );
- respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
+ respInvoker = (AsyncResponseInvoker<?>)msg.getHeaders().get(Constants.ASYNC_RESPONSE_INVOKER);
//if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker");
+
+ if( respInvoker == null ) {
+ callbackRef = getAsyncCallbackRef( msg );
+ } // end if
- // TODO - why is WS stuff bleeding into general code?
- messageID = (String) msg.getHeaders().get(MESSAGE_ID);
- if (messageID == null){
- messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID);
- }
+ messageID = (String) msg.getHeaders().get(Constants.MESSAGE_ID);
} // end constructor
-
+
public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type, Message msg ) {
return new ResponseDispatchImpl<T>( msg );
}
@@ -160,16 +166,18 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
} // end if
+ // Now dispatch the response to the callback, if present...
+ if( callbackRef != null ) {
+ AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
+ setResponseHeaders();
+ handler.setResponse(res);
+ } // end if
+
// Use response invoker if present
if( respInvoker != null ) {
respInvoker.invokeAsyncResponse(res);
return;
} // end if
-
- // Now dispatch the response to the callback...
- AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
- setResponseHeaders();
- handler.setResponse(res);
} // end method sendResponse
public T get(long timeout, TimeUnit unit) throws Throwable {
@@ -203,12 +211,13 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
*/
@SuppressWarnings("unchecked")
private ServiceReference<AsyncResponseHandler<?>> getAsyncCallbackRef( Message msg ) {
- RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get("ASYNC_CALLBACK");
+ RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get(Constants.ASYNC_CALLBACK);
if( callbackEPR == null ) return null;
CompositeContext compositeContext = callbackEPR.getCompositeContext();
registry = compositeContext.getExtensionPointRegistry();
ProxyFactory proxyFactory = ExtensibleProxyFactory.getInstance(registry);
+ msgFactory = getMessageFactory();
List<EndpointReference> eprList = new ArrayList<EndpointReference>();
eprList.add(callbackEPR);
ObjectFactory<?> factory = new CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, eprList);
@@ -225,12 +234,11 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
Message msgContext = ThreadMessageContext.getMessageContext();
if( msgContext == null ) {
// Create a message context
- msgContext = getMessageFactory().createMessage();
+ msgContext = msgFactory.createMessage();
} // end if
// Add in the header for the RelatesTo Message ID
- msgContext.getHeaders().put(WS_MESSAGE_ID, messageID);
- msgContext.getHeaders().put(MESSAGE_ID, messageID);
+ msgContext.getHeaders().put(Constants.RELATES_TO, messageID);
ThreadMessageContext.setMessageContext(msgContext);
} // end method setResponseHeaders