diff options
Diffstat (limited to '')
6 files changed, 171 insertions, 28 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java index e598be277a..0f86ee79ba 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java @@ -308,6 +308,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } // end method invokeAsync(Operation, Message) public void invokeAsyncResponse(Message msg){ + resolve(); invoker.invokeAsyncResponse(msg); } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java index 55c8d7fcab..38727d4670 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java @@ -19,11 +19,15 @@ package org.apache.tuscany.sca.core.invocation;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.io.Serializable;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
@@ -32,7 +36,11 @@ import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.node.NodeFactory;
import org.apache.tuscany.sca.provider.EndpointAsyncProvider;
+import org.apache.tuscany.sca.runtime.DomainRegistryFactory;
+import org.apache.tuscany.sca.runtime.EndpointRegistry;
+import org.apache.tuscany.sca.runtime.ExtensibleDomainRegistryFactory;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.oasisopen.sca.ComponentContext;
@@ -189,5 +197,29 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab public void setResponseEndpointReference(
RuntimeEndpointReference responseEndpointReference) {
this.responseEndpointReference = responseEndpointReference;
- }
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
+ {
+ in.defaultReadObject();
+
+ // find the real endpoint
+ ExtensionPointRegistry extensionPointRegistry = NodeFactory.getInstance().getExtensionPointRegistry();
+ DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(extensionPointRegistry);
+ Collection<EndpointRegistry> endpointRegistries = domainRegistryFactory.getEndpointRegistries();
+ EndpointRegistry endpointRegistry = endpointRegistries.iterator().next();
+ List<Endpoint> endpoints = endpointRegistry.findEndpoint(requestEndpoint.getURI());
+ requestEndpoint = (RuntimeEndpoint)endpoints.get(0);
+
+ if (responseTargetAddress instanceof EndpointReference){
+ // fix the target as in this case it will be an EPR
+ EndpointReference epr = (EndpointReference)responseTargetAddress;
+ List<EndpointReference> endpointReferences = endpointRegistry.getEndpointReferences();
+ for (EndpointReference endpointReference : endpointReferences){
+ if (endpointReference.getURI().equals(epr.getURI())){
+ responseTargetAddress = (T)endpointReference;
+ }
+ }
+ }
+ }
} // end class
diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java index a7e68cfee6..7186c0a0d2 100644 --- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java +++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleProvider.java @@ -49,7 +49,9 @@ class SampleProvider implements ImplementationAsyncProvider { final ProxyFactory pxf; final ExtensionPointRegistry ep; Object instance; - Map<String, Object> asyncMessageMap = new HashMap<String, Object>(); + + // make this static rather than worrying about persistence on the reference side + static Map<String, Object> asyncMessageMap = new HashMap<String, Object>(); SampleProvider(final RuntimeComponent comp, final SampleImplementation impl, ProxyFactory pf, ExtensionPointRegistry ep) { this.comp = comp; diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java index 41e53d90b7..5c76cfd90a 100644 --- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java +++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/main/java/sample/impl/SampleWSDLInvoker.java @@ -19,6 +19,10 @@ package sample.impl; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.lang.reflect.Method; import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker; @@ -57,23 +61,58 @@ class SampleWSDLInvoker extends InterceptorAsyncImpl { } public void invokeAsyncRequest(Message msg) { - // Retrieve the async callback information - AsyncResponseInvoker respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER"); - if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); - - Message responseMsg = processRequest(msg); + if (msg.getOperation().getName().equals("upper")){ + // Retrieve the async callback information + AsyncResponseInvoker respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER"); + if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); + + Message responseMsg = processRequest(msg); + + // in this sample programming model we make the async + // response from the implementation provider. The + // component implementation itself doesn't get a chance to + // do async responses. - // in this sample programming model we make the async - // response from the implementation provider. The - // component implementation itself doesn't get a chance to - // do async responses. - - // At this point we could serialize the AsyncResponseInvoker and pick it up again - // later to send the async response - - if (responseMsg.getBody() != null){ - respInvoker.invokeAsyncResponse(responseMsg); + // At this point we could serialize the AsyncResponseInvoker and pick it up again + // later to send the async response + + try { + FileOutputStream fos = new FileOutputStream("ari.dat"); + ObjectOutputStream oos = new ObjectOutputStream(fos); + oos.writeObject(respInvoker); + oos.close(); + respInvoker.invokeAsyncResponse(responseMsg); + } catch (Exception ex) { + ex.printStackTrace(); + } + + } else if (msg.getOperation().getName().equals("upper2")){ + Message responseMsg = processRequest(msg); + + // read the async response invoker back in and call it + FileInputStream fis = null; + ObjectInputStream ois = null; + try { + fis = new FileInputStream("ari.dat"); + ois = new ObjectInputStream(fis); + AsyncResponseInvoker respInvoker = (AsyncResponseInvoker) ois.readObject(); + ois.close(); + respInvoker.invokeAsyncResponse(responseMsg); + } catch (Exception ex) { + ex.printStackTrace(); + } + } else { + // Retrieve the async callback information + AsyncResponseInvoker respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER"); + if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); + + Message responseMsg = processRequest(msg); + + if (responseMsg.getBody() != null){ + respInvoker.invokeAsyncResponse(responseMsg); + } } + } // end method invokeAsyncRequest public Message processRequest(Message msg) { diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java index 4bfc8356c4..e21ab5ac90 100644 --- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java +++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/UpperSampleAsyncReferenceImpl.java @@ -62,14 +62,17 @@ public class UpperSampleAsyncReferenceImpl { upper.callAsync("upper", ureq); try { - Thread.sleep(500); - latch.await(500, TimeUnit.SECONDS); + //Thread.sleep(500); + latch.await(5, TimeUnit.SECONDS); } catch (Exception ex) { // do nothing } - if( response != null ) return response.getTextContent(); - else return "upper did not get called back"; + if( response != null ) { + return response.getTextContent(); + } else { + return "upper did not get called back"; + } } /** @@ -95,14 +98,19 @@ public class UpperSampleAsyncReferenceImpl { upper.callAsync("upper2", ureq); try { - Thread.sleep(500); - latch.await(500, TimeUnit.SECONDS); + //Thread.sleep(500); + latch.await(5, TimeUnit.SECONDS); } catch (Exception ex) { // do nothing } - if( response2 != null ) return response2.getTextContent(); - else return "upper did not get called back"; + // because we serialize the upper request and re-use it in upper2 + // the response to upper2 comes back to the upper callback + if( response != null ) { + return response.getTextContent(); + } else { + return "upper2 did not get called back"; + } } /** diff --git a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java index 28d54647c5..9667150063 100644 --- a/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java +++ b/sca-java-2.x/trunk/samples/extending-tuscany/implementation-sample/src/test/java/sample/impl/SampleNativeAsyncTestCase.java @@ -24,7 +24,9 @@ import static org.junit.Assert.assertEquals; import org.apache.tuscany.sca.node.Contribution; import org.apache.tuscany.sca.node.Node; import org.apache.tuscany.sca.node.NodeFactory; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -39,9 +41,10 @@ import sample.Upper; public class SampleNativeAsyncTestCase { static Node node; +/* @BeforeClass public static void setUp() throws Exception { - final NodeFactory nf = NodeFactory.newInstance(); + final NodeFactory nf = NodeFactory.getInstance(); String here = SampleNativeAsyncTestCase.class.getProtectionDomain().getCodeSource().getLocation().toString(); // Create the node using the pattern "name of composite file to start" / Contribution to use node = nf.createNode("testnativeasync.composite", new Contribution("test", here)); @@ -52,7 +55,25 @@ public class SampleNativeAsyncTestCase { public static void tearDown() throws Exception { node.stop(); } +*/ + + @Before + public void setUp() throws Exception { + final NodeFactory nf = NodeFactory.getInstance(); + String here = SampleNativeAsyncTestCase.class.getProtectionDomain().getCodeSource().getLocation().toString(); + // Create the node using the pattern "name of composite file to start" / Contribution to use + node = nf.createNode("testnativeasync.composite", new Contribution("test", here)); + node.start(); + } + @After + public void tearDown() throws Exception { + node.stop(); + } + + /** + * Show that we can make a basic call + */ @Test public void testUpper() { System.out.println("SampleNaiveAsyncTestCase.testUpper"); @@ -62,15 +83,55 @@ public class SampleNativeAsyncTestCase { assertEquals("ASYNC", r); } + /** + * Show that we can make a call that requires us to persist the + * AsyncResponseInvoker + */ @Test - public void testUpper2() { + public void testPersistAsyncResponseInvoker() { System.out.println("SampleNaiveAsyncTestCase.testUpper2"); Upper upper = node.getService(Upper.class, "SampleNativeAsyncReference"); - final String r = upper.upper2("async2"); + // call upper to write out the async response invoker + String r = upper.upper("async"); + // call upper2 to read it back in again + r = upper.upper2("async2"); System.out.println(r); assertEquals("ASYNC2", r); } + /** + * Show that we can make a call that works over service restarts + */ + @Test + public void testServiceRestart() { + System.out.println("SampleNaiveAsyncTestCase.testUpper2"); + System.out.println("Starting first node"); + Upper upper = node.getService(Upper.class, "SampleNativeAsyncReference"); + String r = upper.upper("async"); + System.out.println(r); + assertEquals("ASYNC", r); + + System.out.println("Stopping first node"); + node.stop(); + + // now start another node and try call back in to get the + // async response to come back + + System.out.println("Starting second node"); + final NodeFactory nf = NodeFactory.getInstance(); + String here = SampleNativeAsyncTestCase.class.getProtectionDomain().getCodeSource().getLocation().toString(); + // Create the node using the pattern "name of composite file to start" / Contribution to use + node = nf.createNode("testnativeasync.composite", new Contribution("test", here)); + node.start(); + upper = node.getService(Upper.class, "SampleNativeAsyncReference"); + r = upper.upper2("async2"); + System.out.println(r); + assertEquals("ASYNC2", r); + } + + /** + * Show that one-way operations work in the async case + */ @Test public void testVoid() { System.out.println("SampleNaiveAsyncTestCase.testUpperVoid"); |