summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/implementation-java-runtime
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java16
-rw-r--r--sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java87
2 files changed, 91 insertions, 12 deletions
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
index 005a508217..de810a0f02 100644
--- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.tuscany.sca.core.factory.InstanceWrapper;
import org.apache.tuscany.sca.core.factory.ObjectCreationException;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseException;
import org.apache.tuscany.sca.interfacedef.DataType;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.interfacedef.java.JavaOperation;
@@ -70,7 +71,7 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
// For an async server method, there is an extra input parameter, which is a DispatchResponse instance
// which is typed by the type of the response
Class<?> responseType = op.getOutputType().getPhysical();
- ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType);
+ ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType, msg );
Object ret;
Object[] payload2;
@@ -87,15 +88,12 @@ public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
ret = method.invoke(instance, (Object[])payload2);
- try {
- ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
- } catch (Throwable t) {
- throw new InvocationTargetException(t);
- } // end try
-
- scopeContainer.returnWrapper(wrapper, contextId);
+ //ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
+ throw new InvocationTargetException( new AsyncResponseException("AsyncResponse") );
+
+ //scopeContainer.returnWrapper(wrapper, contextId);
- msg.setBody(ret);
+ //msg.setBody(ret);
} catch (InvocationTargetException e) {
Throwable cause = e.getTargetException();
boolean isChecked = false;
diff --git a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
index ea23f06ab5..0d56a6ef9d 100644
--- a/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
+++ b/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
@@ -20,6 +20,8 @@
package org.apache.tuscany.sca.implementation.java.invocation;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,7 +29,22 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.context.CompositeContext;
+import org.apache.tuscany.sca.context.ThreadMessageContext;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.core.factory.ObjectFactory;
+import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory;
+import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory;
+import org.apache.tuscany.sca.core.invocation.ProxyFactory;
+import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.oasisopen.sca.ResponseDispatch;
+import org.oasisopen.sca.ServiceReference;
/**
* Implementation of the ResponseDispatch interface of the OASIS SCA Java API
@@ -45,6 +62,7 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
* Generated serialVersionUID value
*/
private static final long serialVersionUID = 300158355992568592L;
+ private static String WS_MESSAGE_ID = "WS_MESSAGE_ID";
// A latch used to ensure that the sendResponse() and sendFault() operations are used at most once
// The latch is initialized with the value "false"
@@ -57,12 +75,24 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
private volatile T response = null;
private volatile Throwable fault = null;
- public ResponseDispatchImpl( ) {
+ private ExtensionPointRegistry registry;
+
+ // Service Reference used for the callback
+ private ServiceReference<AsyncResponseHandler<?>> callbackRef;
+ private String callbackAddress;
+ private String messageID;
+
+ public ResponseDispatchImpl( Message msg ) {
super();
+ callbackRef = getAsyncCallbackRef( msg );
+
+ callbackAddress = msg.getFrom().getCallbackEndpoint().getURI();
+ messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID);
+
} // end constructor
- public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type ) {
- return new ResponseDispatchImpl<T>();
+ public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type, Message msg ) {
+ return new ResponseDispatchImpl<T>( msg );
}
/**
@@ -89,6 +119,10 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
} else {
throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
} // end if
+ // Now dispatch the response to the callback...
+ AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
+ setResponseHeaders();
+ handler.setFault(new AsyncFaultWrapper(e));
} // end method sendFault
/**
@@ -108,6 +142,10 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
} else {
throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
} // end if
+ // Now dispatch the response to the callback...
+ AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
+ setResponseHeaders();
+ handler.setResponse(res);
} // end method sendResponse
public T get(long timeout, TimeUnit unit) throws Throwable {
@@ -133,4 +171,47 @@ public class ResponseDispatchImpl<T> implements ResponseDispatch<T>, Serializabl
private boolean sendOK() {
return latch.compareAndSet(false, true);
}
+
+ /**
+ * Creates a service reference for the async callback, based on information contained in the supplied message
+ * @param msg - the incoming message
+ * @return - a CallBackServiceReference
+ */
+ @SuppressWarnings("unchecked")
+ private ServiceReference<AsyncResponseHandler<?>> getAsyncCallbackRef( Message msg ) {
+ RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get("ASYNC_CALLBACK");
+ if( callbackEPR == null ) return null;
+
+ CompositeContext compositeContext = callbackEPR.getCompositeContext();
+ registry = compositeContext.getExtensionPointRegistry();
+ ProxyFactory proxyFactory = ExtensibleProxyFactory.getInstance(registry);
+ List<EndpointReference> eprList = new ArrayList<EndpointReference>();
+ eprList.add(callbackEPR);
+ ObjectFactory<?> factory = new CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, eprList);
+
+ return (ServiceReference<AsyncResponseHandler<?>>) factory.getInstance();
+
+ } // end method getAsyncCallbackEPR
+
+ /**
+ * Sets the values of various headers in the response message
+ */
+ private void setResponseHeaders() {
+ // Is there an existing message context?
+ Message msgContext = ThreadMessageContext.getMessageContext();
+ if( msgContext == null ) {
+ // Create a message context
+ msgContext = getMessageFactory().createMessage();
+ } // end if
+
+ // Add in the header for the RelatesTo Message ID
+ msgContext.getHeaders().put(WS_MESSAGE_ID, messageID);
+
+ ThreadMessageContext.setMessageContext(msgContext);
+ } // end method setResponseHeaders
+
+ private MessageFactory getMessageFactory() {
+ FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
+ return modelFactories.getFactory(MessageFactory.class);
+ } // end method getMessageFactory
}