summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/binding-sca-runtime/src
diff options
context:
space:
mode:
authoredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2011-01-11 14:18:49 +0000
committeredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2011-01-11 14:18:49 +0000
commitcb90f665644dc236e77ffca9ca508749c9fc2e42 (patch)
treeb8b78f2567b5a3e7986e875f1eead6a42d8004c8 /sca-java-2.x/trunk/modules/binding-sca-runtime/src
parent2c3a68e1c46978fea1603cd106ccc630d13ad68f (diff)
Complete enablement of the SCA Binding to deal with local Async invocations - as under TUSCANY-3811
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1057653 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk/modules/binding-sca-runtime/src')
-rw-r--r--sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java6
-rw-r--r--sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAServiceBindingProvider.java27
-rw-r--r--sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingAsyncResponseInvoker.java23
-rw-r--r--sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java52
-rw-r--r--sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingLocalInvocationInterceptor.java109
5 files changed, 202 insertions, 15 deletions
diff --git a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java
index dff89b8d79..6bda9f0c2d 100644
--- a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java
+++ b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAReferenceBindingProvider.java
@@ -63,10 +63,12 @@ public class RuntimeSCAReferenceBindingProvider implements EndpointReferenceAsyn
private Mediator mediator;
private InterfaceContractMapper interfaceContractMapper;
private SCABindingMapper scaBindingMapper;
+ private ExtensionPointRegistry registry;
public RuntimeSCAReferenceBindingProvider(ExtensionPointRegistry extensionPoints,
RuntimeEndpointReference endpointReference) {
- this.endpointReference = endpointReference;
+ this.registry = extensionPoints;
+ this.endpointReference = endpointReference;
this.component = (RuntimeComponent)endpointReference.getComponent();
this.reference = (RuntimeComponentReference)endpointReference.getReference();
this.binding = (SCABinding)endpointReference.getBinding();
@@ -155,7 +157,7 @@ public class RuntimeSCAReferenceBindingProvider implements EndpointReferenceAsyn
// it turns out that the chain source and target operations are the same, and are the operation
// from the target, not sure if thats by design or a bug. The SCA binding invoker needs to know
// the source and target class loaders so pass in the real source operation in the constructor
- return chain == null ? null : new SCABindingInvoker(chain, operation, mediator, passByValue, epr);
+ return chain == null ? null : new SCABindingInvoker(chain, operation, mediator, passByValue, epr, registry);
}
}
return null;
diff --git a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAServiceBindingProvider.java b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAServiceBindingProvider.java
index 559ee59bc1..f5745f75e3 100644
--- a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAServiceBindingProvider.java
+++ b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/RuntimeSCAServiceBindingProvider.java
@@ -19,11 +19,16 @@
package org.apache.tuscany.sca.binding.sca.provider;
+import java.util.Iterator;
+
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
+import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
+import org.apache.tuscany.sca.invocation.Phase;
import org.apache.tuscany.sca.provider.EndpointAsyncProvider;
+import org.apache.tuscany.sca.provider.OptimisingBindingProvider;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
@@ -36,7 +41,7 @@ import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
*
* @version $Rev$ $Date$
*/
-public class RuntimeSCAServiceBindingProvider implements EndpointAsyncProvider {
+public class RuntimeSCAServiceBindingProvider implements EndpointAsyncProvider, OptimisingBindingProvider {
private RuntimeEndpoint endpoint;
private RuntimeComponentService service;
@@ -112,5 +117,23 @@ public class RuntimeSCAServiceBindingProvider implements EndpointAsyncProvider {
public InvokerAsyncResponse createAsyncResponseInvoker() {
return new SCABindingAsyncResponseInvoker(null, null);
}
+
+ /**
+ * Handles the optimisation for the service side chain, which provides a mechanism for direct local
+ * invocation of the service in cases where the component reference is in the same JVM as the
+ * component service. Effectively, this means skipping any Remote Binding listener and its associated
+ * binding chain and data binding processors.
+ *
+ * This means inserting a SCABindingLocalInvocationInterceptor into the chains for the Endpoint,
+ * which is placed immediately before the Policy processors (and after any Databinding processors)
+ */
+ public void optimiseBinding(RuntimeEndpoint ep) {
+ // To optimise, place an SCA binding Local Invocation interceptor at the start of the POLICY phase
+ // of the service chain...
+ for (InvocationChain chain : ep.getInvocationChains()) {
+ chain.addHeadInterceptor( Phase.SERVICE_POLICY, new SCABindingLocalInvocationInterceptor() );
+ } // end for
+
+ } // end method optimiseBinding
-}
+} // end class RuntimeSCAServiceBinding
diff --git a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingAsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingAsyncResponseInvoker.java
index 240ba69d62..fb7b6210d6 100644
--- a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingAsyncResponseInvoker.java
+++ b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingAsyncResponseInvoker.java
@@ -20,9 +20,11 @@
package org.apache.tuscany.sca.binding.sca.provider;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
+import org.oasisopen.sca.ServiceRuntimeException;
/**
* @version $Rev: 989157 $ $Date: 2010-08-25 16:02:01 +0100 (Wed, 25 Aug 2010) $
@@ -34,8 +36,21 @@ public class SCABindingAsyncResponseInvoker implements InvokerAsyncResponse {
}
// TODO - this only works for the local case!
- public void invokeAsyncResponse(Message msg) {
- RuntimeEndpointReference epr = (RuntimeEndpointReference)msg.getFrom();
- epr.invokeAsyncResponse(msg);
- }
+ @SuppressWarnings("unchecked")
+ public void invokeAsyncResponse(Message msg) {
+ AsyncResponseInvoker<RuntimeEndpointReference> asyncInvoker =
+ (AsyncResponseInvoker<RuntimeEndpointReference>)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
+ RuntimeEndpointReference epr;
+ if( asyncInvoker != null ) {
+ epr = asyncInvoker.getResponseTargetAddress();
+ } else {
+ epr = (RuntimeEndpointReference)msg.getFrom();
+ } // end if
+ if( epr != null ) {
+ epr.invokeAsyncResponse(msg);
+ } else {
+ throw new ServiceRuntimeException("SCABindingAsyncResponseInvoker - invokeAsyncResponse has null epr");
+ } // end if
+
+ } // end method invokeAsyncResponse
} \ No newline at end of file
diff --git a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java
index 8ea754c8fc..de0b1cfe0e 100644
--- a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java
+++ b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingInvoker.java
@@ -19,13 +19,18 @@
package org.apache.tuscany.sca.binding.sca.provider;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl;
import org.apache.tuscany.sca.databinding.Mediator;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsyncRequest;
+import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.invocation.Phase;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
@@ -42,11 +47,13 @@ public class SCABindingInvoker extends InterceptorAsyncImpl {
private boolean passByValue;
private RuntimeEndpointReference epr;
private RuntimeEndpoint ep;
+ private ExtensionPointRegistry registry;
/**
- * Construct a SCABindingInvoker that delegates to the service invocaiton chain
+ * Construct a SCABindingInvoker that delegates to the service invocation chain
*/
- public SCABindingInvoker(InvocationChain chain, Operation sourceOperation, Mediator mediator, boolean passByValue, RuntimeEndpointReference epr) {
+ public SCABindingInvoker(InvocationChain chain, Operation sourceOperation, Mediator mediator,
+ boolean passByValue, RuntimeEndpointReference epr, ExtensionPointRegistry registry) {
super();
this.chain = chain;
this.mediator = mediator;
@@ -55,6 +62,7 @@ public class SCABindingInvoker extends InterceptorAsyncImpl {
this.passByValue = passByValue;
this.epr = epr;
this.ep = (RuntimeEndpoint)epr.getTargetEndpoint();
+ this.registry = registry;
}
/**
@@ -87,15 +95,21 @@ public class SCABindingInvoker extends InterceptorAsyncImpl {
// Get the message ID
String msgID = (String)msg.getHeaders().get("MESSAGE_ID");
+ String operationName = msg.getOperation().getName();
+
// Create a response invoker and add it to the message headers
AsyncResponseInvoker<RuntimeEndpointReference> respInvoker =
- new AsyncResponseInvoker<RuntimeEndpointReference>(ep, null, epr, msgID);
+ new AsyncResponseInvoker<RuntimeEndpointReference>(ep, null, epr, msgID, operationName, getMessageFactory());
+ respInvoker.setBindingType("SCA_LOCAL");
msg.getHeaders().put("ASYNC_RESPONSE_INVOKER", respInvoker);
} // end if
return msg;
- }
+ } // end method processRequest
+ /**
+ * Regular (sync) processing of response message
+ */
public Message processResponse(Message msg){
if (passByValue) {
// Note source and target operation swapped so result is in source class loader
@@ -108,22 +122,46 @@ public class SCABindingInvoker extends InterceptorAsyncImpl {
} // end if
} // end if
+ return msg;
+ } // end method processResponse
+
+ public void invokeAsyncRequest(Message msg) throws Throwable {
+ try{
+ msg = processRequest(msg);
+ InvokerAsyncRequest theNext = (InvokerAsyncRequest)getNext();
+ if( theNext != null ) theNext.invokeAsyncRequest(msg);
+ postProcessRequest(msg);
+ } catch (Throwable e) {
+ postProcessRequest(msg, e);
+ } // end try
+ } // end method invokeAsyncRequest
+
+ public void invokeAsyncResponse(Message msg) {
+ msg = processResponse(msg);
+
// Handle async response Relates_To message ID value
@SuppressWarnings("unchecked")
AsyncResponseInvoker<RuntimeEndpointReference> respInvoker =
(AsyncResponseInvoker<RuntimeEndpointReference>)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
- if( respInvoker != null ) {
+ // TODO - this deals with the Local case only - not distributed
+ if( respInvoker != null && "SCA_LOCAL".equals(respInvoker.getBindingType()) ) {
RuntimeEndpointReference responseEPR = respInvoker.getResponseTargetAddress();
msg.setFrom(responseEPR);
String msgID = respInvoker.getRelatesToMsgID();
msg.getHeaders().put("RELATES_TO", msgID);
} // end if
- return msg;
- } // end method processResponse
+ InvokerAsyncResponse thePrevious = (InvokerAsyncResponse)getPrevious();
+ if (thePrevious != null ) thePrevious.invokeAsyncResponse(msg);
+ } // end method invokeAsyncResponse
public boolean isLocalSCABIndingInvoker() {
return true;
}
+
+ private MessageFactory getMessageFactory() {
+ FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
+ return modelFactories.getFactory(MessageFactory.class);
+ } // end method getMessageFactory
}
diff --git a/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingLocalInvocationInterceptor.java b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingLocalInvocationInterceptor.java
new file mode 100644
index 0000000000..45933bde63
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/binding-sca-runtime/src/main/java/org/apache/tuscany/sca/binding/sca/provider/SCABindingLocalInvocationInterceptor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.sca.provider;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
+import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
+import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
+
+/**
+ * Interceptor used by the SCA Binding on the service side chain to provide a mechanism for optimising
+ * invocations when the reference and the service involved are both in the same JVM, and thus the need
+ * to use a transport of any kind is unnecessary.
+ *
+ */
+public class SCABindingLocalInvocationInterceptor extends InterceptorAsyncImpl {
+ private static final Logger logger = Logger.getLogger(SCABindingLocalInvocationInterceptor.class.getName());
+
+ private Invoker next;
+
+ private boolean skipPrevious;
+
+ public SCABindingLocalInvocationInterceptor() {
+ super();
+ } // end constructor
+
+ public Message invoke(Message msg) {
+ return next.invoke(msg);
+ } // end method invoke
+
+ public Invoker getNext() {
+ return next;
+ } // end method getNext
+
+ public void setNext(Invoker next) {
+ this.next = next;
+ } // end method setNext
+
+ /**
+ * Process request method is simply a passthrough
+ */
+ public Message processRequest(Message msg) {
+ return msg ;
+ } // end method processRequest
+
+
+ /**
+ * Handle an async response
+ * - deals with the local SCA binding case only (at present)
+ * - in this case, extract the async response invoker from the message header and call the EPR
+ * that is present in the invoker, which is in fact the local EPR from which the original forward
+ * request came
+ */
+ public void invokeAsyncResponse(Message msg) {
+ @SuppressWarnings("unchecked")
+ AsyncResponseInvoker<?> respInvoker =
+ (AsyncResponseInvoker<?>)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
+ if( respInvoker != null && "SCA_LOCAL".equals(respInvoker.getBindingType()) ) {
+ // Handle the locally optimised case
+ RuntimeEndpointReference responseEPR = (RuntimeEndpointReference)respInvoker.getResponseTargetAddress();
+ msg.setFrom(responseEPR);
+ // Handle async response Relates_To message ID value
+ String msgID = respInvoker.getRelatesToMsgID();
+ msg.getHeaders().put("RELATES_TO", msgID);
+
+ // Call the processing on the reference chain directly
+ responseEPR.invokeAsyncResponse(msg);
+
+ // Prevent the response being processed by the rest of the service chain
+ return;
+ } else {
+ // Carry on processing the response by the rest of the service chain
+ InvokerAsyncResponse thePrevious = (InvokerAsyncResponse)getPrevious();
+ if (thePrevious != null ) thePrevious.invokeAsyncResponse(msg);
+ return;
+ } // end if
+
+ } // end method invokeAsyncResponse
+
+ /**
+ * processResponse is not called during async response handling (all handled by invokeAsyncResponse)
+ * - this version is a dummy which does nothing.
+ */
+ public Message processResponse(Message msg) {
+ return msg;
+ } // end method processResponse
+
+} // end class