summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java1
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java34
-rw-r--r--sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java4
-rw-r--r--sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java69
-rw-r--r--sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java24
-rw-r--r--sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java67
6 files changed, 171 insertions, 28 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
index e598be277a..0f86ee79ba 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
@@ -308,6 +308,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
} // end method invokeAsync(Operation, Message)
public void invokeAsyncResponse(Message msg){
+ resolve();
invoker.invokeAsyncResponse(msg);
}
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java
index 55c8d7fcab..38727d4670 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java
@@ -19,11 +19,15 @@
package org.apache.tuscany.sca.core.invocation;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.io.Serializable;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
@@ -32,7 +36,11 @@ import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.node.NodeFactory;
import org.apache.tuscany.sca.provider.EndpointAsyncProvider;
+import org.apache.tuscany.sca.runtime.DomainRegistryFactory;
+import org.apache.tuscany.sca.runtime.EndpointRegistry;
+import org.apache.tuscany.sca.runtime.ExtensibleDomainRegistryFactory;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.oasisopen.sca.ComponentContext;
@@ -189,5 +197,29 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab
public void setResponseEndpointReference(
RuntimeEndpointReference responseEndpointReference) {
this.responseEndpointReference = responseEndpointReference;
- }
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
+ {
+ in.defaultReadObject();
+
+ // find the real endpoint
+ ExtensionPointRegistry extensionPointRegistry = NodeFactory.getInstance().getExtensionPointRegistry();
+ DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(extensionPointRegistry);
+ Collection<EndpointRegistry> endpointRegistries = domainRegistryFactory.getEndpointRegistries();
+ EndpointRegistry endpointRegistry = endpointRegistries.iterator().next();
+ List<Endpoint> endpoints = endpointRegistry.findEndpoint(requestEndpoint.getURI());
+ requestEndpoint = (RuntimeEndpoint)endpoints.get(0);
+
+ if (responseTargetAddress instanceof EndpointReference){
+ // fix the target as in this case it will be an EPR
+ EndpointReference epr = (EndpointReference)responseTargetAddress;
+ List<EndpointReference> endpointReferences = endpointRegistry.getEndpointReferences();
+ for (EndpointReference endpointReference : endpointReferences){
+ if (endpointReference.getURI().equals(epr.getURI())){
+ responseTargetAddress = (T)endpointReference;
+ }
+ }
+ }
+ }
} // end class
diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java
index a7e68cfee6..7186c0a0d2 100644
--- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java
+++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java
@@ -49,7 +49,9 @@ class SampleProvider implements ImplementationAsyncProvider {
final ProxyFactory pxf;
final ExtensionPointRegistry ep;
Object instance;
- Map<String, Object> asyncMessageMap = new HashMap<String, Object>();
+
+ // make this static rather than worrying about persistence on the reference side
+ static Map<String, Object> asyncMessageMap = new HashMap<String, Object>();
SampleProvider(final RuntimeComponent comp, final SampleImplementation impl, ProxyFactory pf, ExtensionPointRegistry ep) {
this.comp = comp;
diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java
index 41e53d90b7..5c76cfd90a 100644
--- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java
+++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java
@@ -19,6 +19,10 @@
package sample.impl;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
@@ -57,23 +61,58 @@ class SampleWSDLInvoker extends InterceptorAsyncImpl {
}
public void invokeAsyncRequest(Message msg) {
- // Retrieve the async callback information
- AsyncResponseInvoker respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
- if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker");
-
- Message responseMsg = processRequest(msg);
+ if (msg.getOperation().getName().equals("upper")){
+ // Retrieve the async callback information
+ AsyncResponseInvoker respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
+ if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker");
+
+ Message responseMsg = processRequest(msg);
+
+ // in this sample programming model we make the async
+ // response from the implementation provider. The
+ // component implementation itself doesn't get a chance to
+ // do async responses.
- // in this sample programming model we make the async
- // response from the implementation provider. The
- // component implementation itself doesn't get a chance to
- // do async responses.
-
- // At this point we could serialize the AsyncResponseInvoker and pick it up again
- // later to send the async response
-
- if (responseMsg.getBody() != null){
- respInvoker.invokeAsyncResponse(responseMsg);
+ // At this point we could serialize the AsyncResponseInvoker and pick it up again
+ // later to send the async response
+
+ try {
+ FileOutputStream fos = new FileOutputStream("ari.dat");
+ ObjectOutputStream oos = new ObjectOutputStream(fos);
+ oos.writeObject(respInvoker);
+ oos.close();
+ respInvoker.invokeAsyncResponse(responseMsg);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+
+ } else if (msg.getOperation().getName().equals("upper2")){
+ Message responseMsg = processRequest(msg);
+
+ // read the async response invoker back in and call it
+ FileInputStream fis = null;
+ ObjectInputStream ois = null;
+ try {
+ fis = new FileInputStream("ari.dat");
+ ois = new ObjectInputStream(fis);
+ AsyncResponseInvoker respInvoker = (AsyncResponseInvoker) ois.readObject();
+ ois.close();
+ respInvoker.invokeAsyncResponse(responseMsg);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ } else {
+ // Retrieve the async callback information
+ AsyncResponseInvoker respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
+ if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker");
+
+ Message responseMsg = processRequest(msg);
+
+ if (responseMsg.getBody() != null){
+ respInvoker.invokeAsyncResponse(responseMsg);
+ }
}
+
} // end method invokeAsyncRequest
public Message processRequest(Message msg) {
diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java
index 4bfc8356c4..e21ab5ac90 100644
--- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java
+++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java
@@ -62,14 +62,17 @@ public class UpperSampleAsyncReferenceImpl {
upper.callAsync("upper", ureq);
try {
- Thread.sleep(500);
- latch.await(500, TimeUnit.SECONDS);
+ //Thread.sleep(500);
+ latch.await(5, TimeUnit.SECONDS);
} catch (Exception ex) {
// do nothing
}
- if( response != null ) return response.getTextContent();
- else return "upper did not get called back";
+ if( response != null ) {
+ return response.getTextContent();
+ } else {
+ return "upper did not get called back";
+ }
}
/**
@@ -95,14 +98,19 @@ public class UpperSampleAsyncReferenceImpl {
upper.callAsync("upper2", ureq);
try {
- Thread.sleep(500);
- latch.await(500, TimeUnit.SECONDS);
+ //Thread.sleep(500);
+ latch.await(5, TimeUnit.SECONDS);
} catch (Exception ex) {
// do nothing
}
- if( response2 != null ) return response2.getTextContent();
- else return "upper did not get called back";
+ // because we serialize the upper request and re-use it in upper2
+ // the response to upper2 comes back to the upper callback
+ if( response != null ) {
+ return response.getTextContent();
+ } else {
+ return "upper2 did not get called back";
+ }
}
/**
diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java
index 28d54647c5..9667150063 100644
--- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java
+++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertEquals;
import org.apache.tuscany.sca.node.Contribution;
import org.apache.tuscany.sca.node.Node;
import org.apache.tuscany.sca.node.NodeFactory;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -39,9 +41,10 @@ import sample.Upper;
public class SampleNativeAsyncTestCase {
static Node node;
+/*
@BeforeClass
public static void setUp() throws Exception {
- final NodeFactory nf = NodeFactory.newInstance();
+ final NodeFactory nf = NodeFactory.getInstance();
String here = SampleNativeAsyncTestCase.class.getProtectionDomain().getCodeSource().getLocation().toString();
// Create the node using the pattern "name of composite file to start" / Contribution to use
node = nf.createNode("testnativeasync.composite", new Contribution("test", here));
@@ -52,7 +55,25 @@ public class SampleNativeAsyncTestCase {
public static void tearDown() throws Exception {
node.stop();
}
+*/
+
+ @Before
+ public void setUp() throws Exception {
+ final NodeFactory nf = NodeFactory.getInstance();
+ String here = SampleNativeAsyncTestCase.class.getProtectionDomain().getCodeSource().getLocation().toString();
+ // Create the node using the pattern "name of composite file to start" / Contribution to use
+ node = nf.createNode("testnativeasync.composite", new Contribution("test", here));
+ node.start();
+ }
+ @After
+ public void tearDown() throws Exception {
+ node.stop();
+ }
+
+ /**
+ * Show that we can make a basic call
+ */
@Test
public void testUpper() {
System.out.println("SampleNaiveAsyncTestCase.testUpper");
@@ -62,15 +83,55 @@ public class SampleNativeAsyncTestCase {
assertEquals("ASYNC", r);
}
+ /**
+ * Show that we can make a call that requires us to persist the
+ * AsyncResponseInvoker
+ */
@Test
- public void testUpper2() {
+ public void testPersistAsyncResponseInvoker() {
System.out.println("SampleNaiveAsyncTestCase.testUpper2");
Upper upper = node.getService(Upper.class, "SampleNativeAsyncReference");
- final String r = upper.upper2("async2");
+ // call upper to write out the async response invoker
+ String r = upper.upper("async");
+ // call upper2 to read it back in again
+ r = upper.upper2("async2");
System.out.println(r);
assertEquals("ASYNC2", r);
}
+ /**
+ * Show that we can make a call that works over service restarts
+ */
+ @Test
+ public void testServiceRestart() {
+ System.out.println("SampleNaiveAsyncTestCase.testUpper2");
+ System.out.println("Starting first node");
+ Upper upper = node.getService(Upper.class, "SampleNativeAsyncReference");
+ String r = upper.upper("async");
+ System.out.println(r);
+ assertEquals("ASYNC", r);
+
+ System.out.println("Stopping first node");
+ node.stop();
+
+ // now start another node and try call back in to get the
+ // async response to come back
+
+ System.out.println("Starting second node");
+ final NodeFactory nf = NodeFactory.getInstance();
+ String here = SampleNativeAsyncTestCase.class.getProtectionDomain().getCodeSource().getLocation().toString();
+ // Create the node using the pattern "name of composite file to start" / Contribution to use
+ node = nf.createNode("testnativeasync.composite", new Contribution("test", here));
+ node.start();
+ upper = node.getService(Upper.class, "SampleNativeAsyncReference");
+ r = upper.upper2("async2");
+ System.out.println(r);
+ assertEquals("ASYNC2", r);
+ }
+
+ /**
+ * Show that one-way operations work in the async case
+ */
@Test
public void testVoid() {
System.out.println("SampleNaiveAsyncTestCase.testUpperVoid");