diff options
author | slaws <slaws@13f79535-47bb-0310-9956-ffa450edef68> | 2010-12-07 11:23:53 +0000 |
---|---|---|
committer | slaws <slaws@13f79535-47bb-0310-9956-ffa450edef68> | 2010-12-07 11:23:53 +0000 |
commit | 74c8e2f04c98bd599c29709203737009b75f224a (patch) | |
tree | 7791b5c061679f94f88aeed0da51a776d53fbb7a /sca-java-2.x/trunk/modules | |
parent | c8fc4b58a3c30107f403e53f1213af0946121e5a (diff) |
TUSCANY-3801 - Allow the response chain "previous" link to be attached to an async response handler so that the invokerAsyncResponse can have a void return type.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1042976 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk/modules')
11 files changed, 193 insertions, 129 deletions
diff --git a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsync.java b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsync.java deleted file mode 100644 index a67a28a931..0000000000 --- a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsync.java +++ /dev/null @@ -1,74 +0,0 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tuscany.sca.invocation;
-
-/**
- * TUSCANY-3786
- *
- * Interface to describe an invoation where the request processing
- * can be performed independently of the response processing. This
- * has been instigated to allow async responses to be processed
- * independently of the requests that instigated them. Due to the need
- * to run the reponse processing interceptors effectively backwards the
- * methods defined here are not responsible for finding the next invoker
- * in the chain.
- *
- */
-public interface InvokerAsync {
-
- /**
- * Process the forward message and pass it down the chain
- *
- * @param msg The request Message
- * @return the processed message
- *
- */
- void invokeAsyncRequest(Message msg);
-
- /**
- * Process response message and pass it back up the chain.
- * This returns the message that is processed by the chain
- * so that it can be passes onto the appropriate invoker by the caller
- * the response path doesn't have an invoker.
- *
- * @param msg The request Message
- * @return the processed message
- *
- */
- Message invokeAsyncResponse(Message msg);
-
- /**
- * Process a request message
- *
- * @param msg The request Message
- * @return the processed message
- *
- */
- Message processRequest(Message msg);
-
- /**
- * Process a response message
- *
- * @param msg The request Message
- * @return the processed message
- *
- */
- Message processResponse(Message msg);
-
-}
diff --git a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsyncRequest.java b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsyncRequest.java new file mode 100644 index 0000000000..5a53504ffb --- /dev/null +++ b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/invocation/InvokerAsyncRequest.java @@ -0,0 +1,51 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.invocation;
+
+/**
+ * TUSCANY-3786
+ *
+ * The request side of an Interface to describe an invocation where
+ * the request processing can be performed independently of the
+ * response processing.
+ */
+public interface InvokerAsyncRequest {
+
+ /**
+ * Process the request message and pass it down the chain
+ *
+ * @param msg The request Message
+ * @return the processed message
+ *
+ */
+ void invokeAsyncRequest(Message msg);
+
+ /**
+ * Process a request message. Provided so that the synchronous
+ * and asynchronous patterns can re-use the request message
+ * processing
+ *
+ * @param msg The request Message
+ * @return the processed message
+ *
+ */
+ Message processRequest(Message msg);
+
+
+}
diff --git a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/EndpointAsyncProvider.java b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/EndpointAsyncProvider.java index dbf0c0fd8d..b74b1dd9a7 100644 --- a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/EndpointAsyncProvider.java +++ b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/EndpointAsyncProvider.java @@ -21,6 +21,7 @@ package org.apache.tuscany.sca.provider; import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
/**
@@ -51,5 +52,5 @@ public interface EndpointAsyncProvider extends EndpointProvider { * @para operation
* @return the invoker that will dispatch the async response
*/
- Invoker createAsyncResponseInvoker(Operation operation);
+ InvokerAsyncResponse createAsyncResponseInvoker();
}
diff --git a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/EndpointReferenceAsyncProvider.java b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/EndpointReferenceAsyncProvider.java index 536b6f2ea5..044ba523b9 100644 --- a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/EndpointReferenceAsyncProvider.java +++ b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/EndpointReferenceAsyncProvider.java @@ -31,11 +31,10 @@ public interface EndpointReferenceAsyncProvider extends EndpointReferenceProvide /**
* TUSCANY-3801
- * Returns true if the service binding provider is natively able
- * to dispatch async responses.
+ * Returns true if the reference binding provider is natively able
+ * to receive async responses.
*
* @return true if the service provide support async operation natively
*/
boolean supportsNativeAsync();
-
}
diff --git a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/ImplementationAsyncProvider.java b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/ImplementationAsyncProvider.java index b555087d82..1ddf015568 100644 --- a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/ImplementationAsyncProvider.java +++ b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/provider/ImplementationAsyncProvider.java @@ -22,6 +22,8 @@ package org.apache.tuscany.sca.provider; import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsyncRequest;
+import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
/**
@@ -36,9 +38,10 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService; public interface ImplementationAsyncProvider extends ImplementationProvider {
/**
+ * TUSCANY-3801
* Create an async invoker for the component implementation in the invocation
* chain. The invoker will be responsible for calling the implementation
- * logic for the given component. The only realy difference between this and
+ * logic for the given component. The only real difference between this and
* createInvoker is that the Endpoint is passed in so that the invoker can
* engineer the async response
*
@@ -48,9 +51,10 @@ public interface ImplementationAsyncProvider extends ImplementationProvider { * @return An invoker that handles the invocation logic, null should be
* returned if no invoker is required
*/
- Invoker createAsyncInvoker(Endpoint endpoint, RuntimeComponentService service, Operation operation);
+ InvokerAsyncRequest createAsyncInvoker(Endpoint endpoint, RuntimeComponentService service, Operation operation);
/**
+ * TUSCANY-3801
* Create an invoker for the asynchronous responses in the invocation
* chain. The invoker will be responsible for processing the async
* response including correlating it with the forward call using
@@ -60,6 +64,6 @@ public interface ImplementationAsyncProvider extends ImplementationProvider { * @param operation The operation that the interceptor will handle
* @return An AsyncResponseHandler<T> instance
*/
- Invoker createAsyncResponseInvoker(Operation operation);
+ InvokerAsyncResponse createAsyncResponseInvoker(Operation operation);
}
diff --git a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/runtime/Invocable.java b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/runtime/Invocable.java index 7c7c104947..9a936bf141 100644 --- a/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/runtime/Invocable.java +++ b/sca-java-2.x/trunk/modules/core-spi/src/main/java/org/apache/tuscany/sca/runtime/Invocable.java @@ -29,7 +29,6 @@ import org.apache.tuscany.sca.context.CompositeContext; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.InvocationChain; -import org.apache.tuscany.sca.invocation.InvokerAsync; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.provider.PolicyProvider; diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java index d74ee4a8f9..26fc6722aa 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java @@ -73,13 +73,15 @@ import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory; import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.InvokerAsync; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.invocation.Phase; import org.apache.tuscany.sca.provider.BindingProviderFactory; +import org.apache.tuscany.sca.provider.EndpointAsyncProvider; import org.apache.tuscany.sca.provider.EndpointProvider; import org.apache.tuscany.sca.provider.ImplementationAsyncProvider; import org.apache.tuscany.sca.provider.ImplementationProvider; @@ -365,6 +367,31 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } wireProcessor.process(this); + + // If we have to support async and there is no binding chain + // then set the response path to point directly to the + // binding provided async response handler + if (isAsyncInvocation() && + bindingInvocationChain == null){ + // fix up the operation chain response path to point back to the + // binding provided async response handler + ServiceBindingProvider serviceBindingProvider = getBindingProvider(); + if (serviceBindingProvider instanceof EndpointAsyncProvider){ + EndpointAsyncProvider asyncEndpointProvider = (EndpointAsyncProvider)serviceBindingProvider; + InvokerAsyncResponse asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(); + + for (InvocationChain chain : getInvocationChains()){ + Invoker invoker = chain.getHeadInvoker(); + if (invoker instanceof InterceptorAsync){ + ((InterceptorAsync)invoker).setPrevious(asyncResponseInvoker); + } else { + //TODO - throw error once the old async code is removed + } + } + } else { + // TODO - throw error once the old async code is removed + } + } } /** @@ -570,11 +597,41 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } - // TODO - add something on the end of the wire to invoke the - // invocation chain. Need to split out the runtime - // wire invoker into conversation, callback interceptors etc + // Add the runtime invoker to the end of the binding chain. + // It mediates between the binding chain and selects the + // correct invocation chain based on the operation that's + // been selected bindingInvocationChain.addInvoker(invoker); - + + if (isAsyncInvocation()){ + // fix up the invocation chains to point back to the + // binding chain so that async response messages + // are processed correctly + for (InvocationChain chain : getInvocationChains()){ + Invoker invoker = chain.getHeadInvoker(); + if (invoker instanceof InterceptorAsync){ + ((InterceptorAsync)invoker).setPrevious((InvokerAsyncResponse)bindingInvocationChain.getTailInvoker()); + } else { + // TODO - raise an error. Not doing that while + // we have the old async mechanism in play + } + } + + // fix up the binding chain response path to point back to the + // binding provided async response handler + ServiceBindingProvider serviceBindingProvider = getBindingProvider(); + if (serviceBindingProvider instanceof EndpointAsyncProvider){ + EndpointAsyncProvider asyncEndpointProvider = (EndpointAsyncProvider)serviceBindingProvider; + InvokerAsyncResponse asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(); + if (bindingInvocationChain.getHeadInvoker() instanceof InterceptorAsync){ + ((InterceptorAsync)bindingInvocationChain.getHeadInvoker()).setPrevious(asyncResponseInvoker); + } else { + //TODO - throw error once the old async code is removed + } + } else { + //TODO - throw error once the old async code is removed + } + } } /** @@ -639,7 +696,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint RuntimeComponentService runtimeService = (RuntimeComponentService)service; if (runtimeService.getName().endsWith("_asyncCallback")){ if (provider instanceof ImplementationAsyncProvider){ - invoker = ((ImplementationAsyncProvider)provider).createAsyncResponseInvoker(operation); + invoker = (Invoker)((ImplementationAsyncProvider)provider).createAsyncResponseInvoker(operation); } else { // TODO - This should be an error but taking account of the // existing non-native async support @@ -656,7 +713,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } } else if (isAsyncInvocation() && provider instanceof ImplementationAsyncProvider){ - invoker = ((ImplementationAsyncProvider)provider).createAsyncInvoker(this, (RuntimeComponentService)service, operation); + invoker = (Invoker)((ImplementationAsyncProvider)provider).createAsyncInvoker(this, (RuntimeComponentService)service, operation); } else { invoker = provider.createInvoker((RuntimeComponentService)service, operation); } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java index b8ca59b214..a854e833ac 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointReferenceImpl.java @@ -63,14 +63,17 @@ import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory; import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterfaceContract; import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.InvokerAsync; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.MessageFactory; import org.apache.tuscany.sca.invocation.Phase; import org.apache.tuscany.sca.provider.BindingProviderFactory; import org.apache.tuscany.sca.provider.EndpointReferenceProvider; +import org.apache.tuscany.sca.provider.ImplementationAsyncProvider; +import org.apache.tuscany.sca.provider.ImplementationProvider; import org.apache.tuscany.sca.provider.PolicyProvider; import org.apache.tuscany.sca.provider.PolicyProviderFactory; import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint; @@ -324,10 +327,27 @@ public class RuntimeEndpointReferenceImpl extends EndpointReferenceImpl implemen } } - // Set the chains until it's fully populated. If we initialize too early, any exeception could + // Set the chains until it's fully populated. If we initialize too early, any exception could // leave this endpoint reference in a wrong state with an empty chain. chains = chainList; wireProcessor.process(this); + + if (isAsyncInvocation()){ + // fix up all of the operation chain response paths + // to point back to the implementation provided + // async response handler + ImplementationProvider implementationProvider = ((RuntimeComponent)getComponent()).getImplementationProvider(); + if (implementationProvider instanceof ImplementationAsyncProvider){ + for (InvocationChain chain : getInvocationChains()){ + InvokerAsyncResponse asyncResponseInvoker = ((ImplementationAsyncProvider)implementationProvider).createAsyncResponseInvoker(chain.getSourceOperation()); + if (chain.getHeadInvoker() instanceof InterceptorAsync){ + ((InterceptorAsync)chain.getHeadInvoker()).setPrevious(asyncResponseInvoker); + } else { + //TODO - throw error once the old async code is removed + } + } + } + } } /** diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java index 0c42a523a6..8495445951 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/InterceptorAsyncImpl.java @@ -22,7 +22,8 @@ package org.apache.tuscany.sca.core.invocation; import org.apache.tuscany.sca.invocation.InterceptorAsync;
import org.apache.tuscany.sca.invocation.Invoker;
-import org.apache.tuscany.sca.invocation.InvokerAsync;
+import org.apache.tuscany.sca.invocation.InvokerAsyncRequest;
+import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
import org.apache.tuscany.sca.invocation.Message;
/**
@@ -33,22 +34,22 @@ import org.apache.tuscany.sca.invocation.Message; */
public abstract class InterceptorAsyncImpl implements InterceptorAsync {
- protected InvokerAsync next;
- protected InvokerAsync previous;
+ protected Invoker next;
+ protected InvokerAsyncResponse previous;
public Invoker getNext() {
return (Invoker)next;
}
public void setNext(Invoker next) {
- this.next = (InvokerAsync)next;
+ this.next = next;
}
- public InvokerAsync getPrevious() {
+ public InvokerAsyncResponse getPrevious() {
return previous;
}
- public void setPrevious(InvokerAsync previous) {
+ public void setPrevious(InvokerAsyncResponse previous) {
this.previous = previous;
}
@@ -61,14 +62,23 @@ public abstract class InterceptorAsyncImpl implements InterceptorAsync { public void invokeAsyncRequest(Message msg) {
msg = processRequest(msg);
- ((InvokerAsync)getNext()).invokeAsyncRequest(msg);
+ ((InvokerAsyncRequest)getNext()).invokeAsyncRequest(msg);
}
- public Message invokeAsyncResponse(Message msg) {
+ public void invokeAsyncResponse(Message msg) {
msg = processResponse(msg);
- if (getPrevious() != null){
- return ((InvokerAsync)getPrevious()).invokeAsyncResponse(msg);
- }
- return msg;
+ ((InvokerAsyncResponse)getPrevious()).invokeAsyncResponse(msg);
+ }
+
+ /**
+ * A testing method while I use the local SCA binding wire to look
+ * at how the async response path works. This allows me to detect the
+ * point where the reference wire turns into the service with in the
+ * optimized case
+ *
+ * @return
+ */
+ public boolean isLocalSCABIndingInvoker() {
+ return false;
}
}
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java index e1ec899fa5..6c9f13ff17 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeInvoker.java @@ -30,20 +30,13 @@ import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.interfacedef.Operation; -import org.apache.tuscany.sca.invocation.Interceptor; -import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.InvokerAsync; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.MessageFactory; -import org.apache.tuscany.sca.provider.EndpointAsyncProvider; -import org.apache.tuscany.sca.provider.ImplementationAsyncProvider; -import org.apache.tuscany.sca.provider.ImplementationProvider; -import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.Invocable; -import org.apache.tuscany.sca.runtime.RuntimeComponent; -import org.apache.tuscany.sca.runtime.RuntimeEndpoint; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; import org.apache.tuscany.sca.work.WorkScheduler; @@ -144,14 +137,14 @@ public class RuntimeInvoker implements Invoker{ } // Perform the async invocation - InvokerAsync headInvoker = (InvokerAsync)chain.getHeadInvoker(); + Invoker headInvoker = chain.getHeadInvoker(); Message msgContext = ThreadMessageContext.setMessageContext(msg); try { // TODO - is this the way we'll pass async messages down the chain? Message resp = null; try { - headInvoker.invokeAsyncRequest(msg); + ((InvokerAsyncRequest)headInvoker).invokeAsyncRequest(msg); } catch (Throwable ex) { // temporary fix to swallow the dummy exception that's // thrown back to get past the response chain processing. @@ -179,10 +172,11 @@ public class RuntimeInvoker implements Invoker{ public void invokeAsyncResponse(Message msg) { InvocationChain chain = invocable.getInvocationChain(msg.getOperation()); - InvokerAsync tailInvoker = (InvokerAsync)chain.getTailInvoker(); + Invoker tailInvoker = chain.getTailInvoker(); - Message asyncResponseMsg = tailInvoker.invokeAsyncResponse(msg); + ((InvokerAsyncResponse)tailInvoker).invokeAsyncResponse(msg); +/* now statically configured // now get the asyncResponseInvoker Invoker asyncResponseInvoker = null; @@ -193,7 +187,7 @@ public class RuntimeInvoker implements Invoker{ ServiceBindingProvider serviceBindingProvider = ep.getBindingProvider(); if (serviceBindingProvider instanceof EndpointAsyncProvider){ EndpointAsyncProvider asyncEndpointProvider = (EndpointAsyncProvider)serviceBindingProvider; - asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(asyncResponseMsg.getOperation()); + asyncResponseInvoker = asyncEndpointProvider.createAsyncResponseInvoker(); } else { // TODO - throw error @@ -211,5 +205,6 @@ public class RuntimeInvoker implements Invoker{ } asyncResponseInvoker.invoke(asyncResponseMsg); +*/ } } diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java index 716379d141..69158683ef 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java @@ -22,13 +22,15 @@ import java.util.ArrayList; import java.util.List; import java.util.ListIterator; +import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.DataExchangeSemantics; import org.apache.tuscany.sca.invocation.Interceptor; import org.apache.tuscany.sca.invocation.InterceptorAsync; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Invoker; -import org.apache.tuscany.sca.invocation.InvokerAsync; +import org.apache.tuscany.sca.invocation.InvokerAsyncRequest; +import org.apache.tuscany.sca.invocation.InvokerAsyncResponse; import org.apache.tuscany.sca.invocation.Phase; import org.apache.tuscany.sca.invocation.PhasedInterceptor; @@ -98,19 +100,18 @@ public class InvocationChainImpl implements InvocationChain { while (next != null){ tail = next; if (next instanceof Interceptor){ - next = ((Interceptor)next).getNext(); - // TODO - hack to get round SCA binding optimization // On the reference side this loop will go all the way // across to the service invoker so stop looking if we find // an invoker with no "previous" pointer. This will be the point // where the SCA binding invoker points to the head of the // service chain - if (!(next instanceof InterceptorAsync) || - ((InterceptorAsync)next).getPrevious() == null){ + ((InterceptorAsyncImpl)next).isLocalSCABIndingInvoker()){ break; } + + next = ((Interceptor)next).getNext(); } else { next = null; } @@ -152,7 +153,8 @@ public class InvocationChainImpl implements InvocationChain { private void addInvoker(String phase, Invoker invoker) { if (isAsyncInvocation && - !(invoker instanceof InvokerAsync)){ + !(invoker instanceof InvokerAsyncRequest) && + !(invoker instanceof InvokerAsyncResponse) ){ // TODO - should raise an error but don't want to break // the existing non-native async support /* @@ -192,18 +194,18 @@ public class InvocationChainImpl implements InvocationChain { if (before != null) { if (before.getInvoker() instanceof Interceptor) { ((Interceptor)before.getInvoker()).setNext(invoker); - if (invoker instanceof InterceptorAsync && - before.getInvoker() instanceof InvokerAsync){ - ((InterceptorAsync) invoker).setPrevious((InvokerAsync)before.getInvoker()); + if ((invoker instanceof InterceptorAsync) && + (before.getInvoker() instanceof InvokerAsyncResponse)) { + ((InterceptorAsync) invoker).setPrevious((InvokerAsyncResponse)before.getInvoker()); } } } if (after != null) { if (invoker instanceof Interceptor) { ((Interceptor)invoker).setNext(after.getInvoker()); - if (after.getInvoker() instanceof InterceptorAsync && - invoker instanceof InvokerAsync){ - ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsync)invoker); + if ((after.getInvoker() instanceof InterceptorAsync) && + (invoker instanceof InvokerAsyncResponse)){ + ((InterceptorAsync) after.getInvoker()).setPrevious((InvokerAsyncResponse)invoker); } } } |