From 2ea0eebe2f68ad06dc19abc7be2b6597ee80ed49 Mon Sep 17 00:00:00 2001 From: slaws Date: Fri, 21 Jan 2011 15:39:42 +0000 Subject: TUSCANY-3783 - Fix a hole in the AsyncReponseInvoker serialization to cover the case there invoker is de-serialized inside the same context that serialized it. Update the sample to demonstrate a stop/start scenario. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1061851 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/sample/impl/SampleProvider.java | 4 +- .../main/java/sample/impl/SampleWSDLInvoker.java | 69 +++++++++++++++++----- .../java/sample/UpperSampleAsyncReferenceImpl.java | 24 +++++--- .../sample/impl/SampleNativeAsyncTestCase.java | 67 ++++++++++++++++++++- 4 files changed, 137 insertions(+), 27 deletions(-) (limited to 'sca-java-2.x/trunk/samples/extending-tuscany') diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java index a7e68cfee6..7186c0a0d2 100644 --- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java +++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java @@ -49,7 +49,9 @@ class SampleProvider implements ImplementationAsyncProvider { final ProxyFactory pxf; final ExtensionPointRegistry ep; Object instance; - Map asyncMessageMap = new HashMap(); + + // make this static rather than worrying about persistence on the reference side + static Map asyncMessageMap = new HashMap(); SampleProvider(final RuntimeComponent comp, final SampleImplementation impl, ProxyFactory pf, ExtensionPointRegistry ep) { this.comp = comp; diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java index 41e53d90b7..5c76cfd90a 100644 --- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java +++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java @@ -19,6 +19,10 @@ package sample.impl; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.lang.reflect.Method; import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker; @@ -57,23 +61,58 @@ class SampleWSDLInvoker extends InterceptorAsyncImpl { } public void invokeAsyncRequest(Message msg) { - // Retrieve the async callback information - AsyncResponseInvoker respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER"); - if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); - - Message responseMsg = processRequest(msg); + if (msg.getOperation().getName().equals("upper")){ + // Retrieve the async callback information + AsyncResponseInvoker respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER"); + if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); + + Message responseMsg = processRequest(msg); + + // in this sample programming model we make the async + // response from the implementation provider. The + // component implementation itself doesn't get a chance to + // do async responses. - // in this sample programming model we make the async - // response from the implementation provider. The - // component implementation itself doesn't get a chance to - // do async responses. - - // At this point we could serialize the AsyncResponseInvoker and pick it up again - // later to send the async response - - if (responseMsg.getBody() != null){ - respInvoker.invokeAsyncResponse(responseMsg); + // At this point we could serialize the AsyncResponseInvoker and pick it up again + // later to send the async response + + try { + FileOutputStream fos = new FileOutputStream("ari.dat"); + ObjectOutputStream oos = new ObjectOutputStream(fos); + oos.writeObject(respInvoker); + oos.close(); + respInvoker.invokeAsyncResponse(responseMsg); + } catch (Exception ex) { + ex.printStackTrace(); + } + + } else if (msg.getOperation().getName().equals("upper2")){ + Message responseMsg = processRequest(msg); + + // read the async response invoker back in and call it + FileInputStream fis = null; + ObjectInputStream ois = null; + try { + fis = new FileInputStream("ari.dat"); + ois = new ObjectInputStream(fis); + AsyncResponseInvoker respInvoker = (AsyncResponseInvoker) ois.readObject(); + ois.close(); + respInvoker.invokeAsyncResponse(responseMsg); + } catch (Exception ex) { + ex.printStackTrace(); + } + } else { + // Retrieve the async callback information + AsyncResponseInvoker respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER"); + if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); + + Message responseMsg = processRequest(msg); + + if (responseMsg.getBody() != null){ + respInvoker.invokeAsyncResponse(responseMsg); + } } + } // end method invokeAsyncRequest public Message processRequest(Message msg) { diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java index 4bfc8356c4..e21ab5ac90 100644 --- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java +++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java @@ -62,14 +62,17 @@ public class UpperSampleAsyncReferenceImpl { upper.callAsync("upper", ureq); try { - Thread.sleep(500); - latch.await(500, TimeUnit.SECONDS); + //Thread.sleep(500); + latch.await(5, TimeUnit.SECONDS); } catch (Exception ex) { // do nothing } - if( response != null ) return response.getTextContent(); - else return "upper did not get called back"; + if( response != null ) { + return response.getTextContent(); + } else { + return "upper did not get called back"; + } } /** @@ -95,14 +98,19 @@ public class UpperSampleAsyncReferenceImpl { upper.callAsync("upper2", ureq); try { - Thread.sleep(500); - latch.await(500, TimeUnit.SECONDS); + //Thread.sleep(500); + latch.await(5, TimeUnit.SECONDS); } catch (Exception ex) { // do nothing } - if( response2 != null ) return response2.getTextContent(); - else return "upper did not get called back"; + // because we serialize the upper request and re-use it in upper2 + // the response to upper2 comes back to the upper callback + if( response != null ) { + return response.getTextContent(); + } else { + return "upper2 did not get called back"; + } } /** diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java index 28d54647c5..9667150063 100644 --- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java +++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java @@ -24,7 +24,9 @@ import static org.junit.Assert.assertEquals; import org.apache.tuscany.sca.node.Contribution; import org.apache.tuscany.sca.node.Node; import org.apache.tuscany.sca.node.NodeFactory; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -39,9 +41,10 @@ import sample.Upper; public class SampleNativeAsyncTestCase { static Node node; +/* @BeforeClass public static void setUp() throws Exception { - final NodeFactory nf = NodeFactory.newInstance(); + final NodeFactory nf = NodeFactory.getInstance(); String here = SampleNativeAsyncTestCase.class.getProtectionDomain().getCodeSource().getLocation().toString(); // Create the node using the pattern "name of composite file to start" / Contribution to use node = nf.createNode("testnativeasync.composite", new Contribution("test", here)); @@ -52,7 +55,25 @@ public class SampleNativeAsyncTestCase { public static void tearDown() throws Exception { node.stop(); } +*/ + + @Before + public void setUp() throws Exception { + final NodeFactory nf = NodeFactory.getInstance(); + String here = SampleNativeAsyncTestCase.class.getProtectionDomain().getCodeSource().getLocation().toString(); + // Create the node using the pattern "name of composite file to start" / Contribution to use + node = nf.createNode("testnativeasync.composite", new Contribution("test", here)); + node.start(); + } + @After + public void tearDown() throws Exception { + node.stop(); + } + + /** + * Show that we can make a basic call + */ @Test public void testUpper() { System.out.println("SampleNaiveAsyncTestCase.testUpper"); @@ -62,15 +83,55 @@ public class SampleNativeAsyncTestCase { assertEquals("ASYNC", r); } + /** + * Show that we can make a call that requires us to persist the + * AsyncResponseInvoker + */ @Test - public void testUpper2() { + public void testPersistAsyncResponseInvoker() { System.out.println("SampleNaiveAsyncTestCase.testUpper2"); Upper upper = node.getService(Upper.class, "SampleNativeAsyncReference"); - final String r = upper.upper2("async2"); + // call upper to write out the async response invoker + String r = upper.upper("async"); + // call upper2 to read it back in again + r = upper.upper2("async2"); System.out.println(r); assertEquals("ASYNC2", r); } + /** + * Show that we can make a call that works over service restarts + */ + @Test + public void testServiceRestart() { + System.out.println("SampleNaiveAsyncTestCase.testUpper2"); + System.out.println("Starting first node"); + Upper upper = node.getService(Upper.class, "SampleNativeAsyncReference"); + String r = upper.upper("async"); + System.out.println(r); + assertEquals("ASYNC", r); + + System.out.println("Stopping first node"); + node.stop(); + + // now start another node and try call back in to get the + // async response to come back + + System.out.println("Starting second node"); + final NodeFactory nf = NodeFactory.getInstance(); + String here = SampleNativeAsyncTestCase.class.getProtectionDomain().getCodeSource().getLocation().toString(); + // Create the node using the pattern "name of composite file to start" / Contribution to use + node = nf.createNode("testnativeasync.composite", new Contribution("test", here)); + node.start(); + upper = node.getService(Upper.class, "SampleNativeAsyncReference"); + r = upper.upper2("async2"); + System.out.println(r); + assertEquals("ASYNC2", r); + } + + /** + * Show that one-way operations work in the async case + */ @Test public void testVoid() { System.out.println("SampleNaiveAsyncTestCase.testUpperVoid"); -- cgit v1.2.3