summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/core/src
diff options
context:
space:
mode:
authoredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2011-01-20 16:52:03 +0000
committeredwardsmj <edwardsmj@13f79535-47bb-0310-9956-ffa450edef68>2011-01-20 16:52:03 +0000
commit2a85e28de723360da8fe53efa9d0a4021683493e (patch)
tree8b92996aa5676385475ec489be22a7a0c4d0fb8e /sca-java-2.x/trunk/modules/core/src
parentaf86dfd4530823849827aecf12c4215c335f27cc (diff)
Stage 1 of Axis2 binding-ws support of async callbacks, as described in TUSCANY-3821
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1061385 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk/modules/core/src')
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java69
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java16
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java61
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java8
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java12
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java64
6 files changed, 181 insertions, 49 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
index 263ba6a8c7..e598be277a 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
@@ -62,6 +62,7 @@ import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
import org.apache.tuscany.sca.core.invocation.AsyncResponseService;
+import org.apache.tuscany.sca.core.invocation.Constants;
import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor;
import org.apache.tuscany.sca.core.invocation.NonBlockingInterceptor;
import org.apache.tuscany.sca.core.invocation.RuntimeInvoker;
@@ -279,10 +280,11 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
// Deal with async callback
// Ensure invocation chains are built...
getInvocationChains();
- if ( !this.getCallbackEndpointReferences().isEmpty() ) {
+ // async callback handling
+ if( this.isAsyncInvocation() && !this.getCallbackEndpointReferences().isEmpty() ) {
RuntimeEndpointReference asyncEPR = (RuntimeEndpointReference) this.getCallbackEndpointReferences().get(0);
// Place a link to the callback EPR into the message headers...
- msg.getHeaders().put("ASYNC_CALLBACK", asyncEPR );
+ msg.getHeaders().put(Constants.ASYNC_CALLBACK, asyncEPR );
}
// end of async callback handling
return invoker.invokeBinding(msg);
@@ -372,7 +374,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
// Handle cases where the operation is an async server
if( targetOperation.isAsyncServer() ) {
- createAsyncServerCallback( this, operation );
+ createAsyncServerCallback();
} // end if
}
@@ -412,24 +414,32 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
} // end method initInvocationChains
/**
- * Creates the async callback for the supplied Endpoint and Operation, if it does not already exist
+ * Creates the async callback for this Endpoint, if it does not already exist
* and stores it into the Endpoint
- * @param endpoint - the Endpoint
- * @param operation - the Operation
*/
- private void createAsyncServerCallback( RuntimeEndpoint endpoint, Operation operation ) {
+ public void createAsyncServerCallback( ) {
// No need to create a callback if the Binding supports async natively...
- if( hasNativeAsyncBinding(endpoint) ) return;
+ if( hasNativeAsyncBinding(this) ) return;
// Check to see if the callback already exists
- if( asyncCallbackExists( endpoint ) ) return;
+ if( asyncCallbackExists( this ) ) return;
- RuntimeEndpointReference asyncEPR = createAsyncEPR( endpoint );
+ RuntimeEndpointReference asyncEPR = createAsyncEPR( this );
// Store the new callback EPR into the Endpoint
- endpoint.getCallbackEndpointReferences().add(asyncEPR);
+ this.getCallbackEndpointReferences().add(asyncEPR);
+
+ // Also store the callback EPR into the EndpointRegistry
+ EndpointRegistry epReg = getEndpointRegistry( registry );
+ if( epReg != null ) epReg.addEndpointReference(asyncEPR);
} // end method createAsyncServerCallback
+ public RuntimeEndpointReference getAsyncServerCallback() {
+
+ return (RuntimeEndpointReference) this.getCallbackEndpointReferences().get(0);
+ } // end method getAsyncServerCallback
+
+
/**
* Indicates if a given endpoint has a Binding that supports native async invocation
* @param endpoint - the endpoint
@@ -455,6 +465,9 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
RuntimeEndpointReference epr = (RuntimeEndpointReference)assemblyFactory.createEndpointReference();
epr.bind( compositeContext );
+ // Create pseudo-component
+ epr.setComponent(component);
+
// Create pseudo-reference
ComponentReference reference = assemblyFactory.createComponentReference();
ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry();
@@ -487,8 +500,6 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
// Need to establish policies here (binding has some...)
epr.getRequiredIntents().addAll( endpoint.getRequiredIntents() );
epr.getPolicySets().addAll( endpoint.getPolicySets() );
- String eprURI = endpoint.getComponent().getName() + "#reference-binding(" + referenceName + "/" + referenceName + ")";
- epr.setURI(eprURI);
// Attach a dummy endpoint to the epr
RuntimeEndpoint ep = (RuntimeEndpoint)assemblyFactory.createEndpoint();
@@ -497,6 +508,10 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
//epr.setStatus(EndpointReference.Status.RESOLVED_BINDING);
epr.setStatus(EndpointReference.Status.WIRED_TARGET_FOUND_AND_MATCHED);
epr.setUnresolved(false);
+
+ // Set the URI for the EPR
+ String eprURI = endpoint.getComponent().getName() + "#reference-binding(" + referenceName + "/" + referenceName + ")";
+ epr.setURI(eprURI);
return epr;
} // end method RuntimeEndpointReference
@@ -536,10 +551,10 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
XMLStreamReader reader = inputFactory.createXMLStreamReader(source);
reader.next();
Binding newBinding = (Binding) processor.read(reader, context );
- newBinding.setName("asyncCallback");
+ newBinding.setName(reference.getName());
// Create a URI address for the callback based on the Component_Name/Reference_Name pattern
- String callbackURI = "/" + component.getName() + "/" + reference.getName();
+ //String callbackURI = "/" + component.getName() + "/" + reference.getName();
//newBinding.setURI(callbackURI);
BuilderExtensionPoint builders = registry.getExtensionPoint(BuilderExtensionPoint.class);
@@ -934,13 +949,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
*/
private RuntimeEndpointImpl findActualEP(RuntimeEndpointImpl ep,
ExtensionPointRegistry registry) {
- // Get the EndpointRegistry
- DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry);
-
- if( domainRegistryFactory == null ) return null;
-
- // TODO: For the moment, just use the first (and only!) EndpointRegistry...
- EndpointRegistry endpointRegistry = (EndpointRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0];
+ EndpointRegistry endpointRegistry = getEndpointRegistry( registry );
if( endpointRegistry == null ) return null;
@@ -951,6 +960,22 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
return null;
} // end method findActualEP
+
+ /**
+ * Get the EndpointRegistry
+ * @param registry - the ExtensionPoint registry
+ * @return the EndpointRegistry - will be null if the EndpointRegistry cannot be found
+ */
+ private EndpointRegistry getEndpointRegistry( ExtensionPointRegistry registry) {
+ DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry);
+
+ if( domainRegistryFactory == null ) return null;
+
+ // TODO: For the moment, just use the first (and only!) EndpointRegistry...
+ EndpointRegistry endpointRegistry = (EndpointRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0];
+
+ return endpointRegistry;
+ } // end method
public InterfaceContract getBindingInterfaceContract() {
resolve();
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java
index 542a80f926..6f5b0be0e7 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java
@@ -24,6 +24,7 @@ import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.context.ThreadMessageContext;
+import org.apache.tuscany.sca.core.invocation.Constants;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.oasisopen.sca.ServiceRuntimeException;
@@ -32,8 +33,18 @@ public class CallbackServiceReferenceImpl<B> extends ServiceReferenceImpl<B> {
private RuntimeEndpointReference callbackEPR;
private List<? extends EndpointReference> callbackEPRs;
private Endpoint resolvedEndpoint;
+ // Holds the ID of the Message that caused the creation of this CallbackServiceReference
+ private String msgID;
- /*
+ /**
+ * Gets the message ID associated with this callback reference
+ * @return the message ID
+ */
+ public String getMsgID() {
+ return msgID;
+ }
+
+ /*
* Public constructor for Externalizable serialization/deserialization
*/
public CallbackServiceReferenceImpl() {
@@ -62,6 +73,9 @@ public class CallbackServiceReferenceImpl<B> extends ServiceReferenceImpl<B> {
throw new ServiceRuntimeException("No callback binding found for " + msgContext.getTo().toString());
}
resolvedEndpoint = msgContext.getFrom().getCallbackEndpoint();
+
+ // Capture the Message ID from the message which caused the creation of this CallBackServiceReference
+ this.msgID = (String) msgContext.getHeaders().get(Constants.MESSAGE_ID);
}
@Override
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java
index 8bcda4efb2..55c8d7fcab 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
+import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
@@ -58,6 +59,7 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab
private String operationName;
private MessageFactory messageFactory;
private String bindingType = "";
+ private boolean isNativeAsync;
public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint,
RuntimeEndpointReference responseEndpointReference,
@@ -70,6 +72,13 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab
this.relatesToMsgID = relatesToMsgID;
this.operationName = operationName;
this.messageFactory = messageFactory;
+
+ if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) &&
+ (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){
+ isNativeAsync = true;
+ } else {
+ isNativeAsync = false;
+ } // end if
} // end constructor
/**
@@ -79,8 +88,7 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab
responseMessage.getHeaders().put(Constants.ASYNC_RESPONSE_INVOKER, this);
responseMessage.getHeaders().put(Constants.RELATES_TO, relatesToMsgID);
- if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) &&
- (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){
+ if (isNativeAsync){
// process the response as a native async response
requestEndpoint.invokeAsyncResponse(responseMessage);
} else {
@@ -106,8 +114,8 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab
}
/**
- * If you have Java beans you can call this and we'll create
- * a Tuscany message
+ * Invokes the async response where the parameter is Java bean(s)
+ * - this method creates a Tuscany message
*
* @param args the response data
*/
@@ -115,10 +123,22 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab
Message msg = messageFactory.createMessage();
- msg.setOperation(getOperation());
+ msg.setOperation(getOperation( args ));
+
+ // If this is not native async, then any Throwable is being passed as a parameter and
+ // requires wrapping
+ if( !isNativeAsync && args instanceof Throwable ) {
+ args = new AsyncFaultWrapper( (Throwable) args );
+ } // end if
- // on the the following will be null depending
- // on whether this is native or non-native async
+ // If this is not native async, then the message must contain an array of args since
+ // this is what is expected when invoking an EPR for the async response...
+ if( !isNativeAsync ) {
+ Object[] objs = new Object[1];
+ objs[0] = args;
+ args = objs;
+ } // end if
+
msg.setTo(requestEndpoint);
msg.setFrom(responseEndpointReference);
@@ -132,12 +152,22 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab
} // end method invokeAsyncResponse(Object)
- private Operation getOperation() {
- List<Operation> ops = requestEndpoint.getService().getInterfaceContract().getInterface().getOperations();
- for (Operation op : ops) {
- if( operationName.equals(op.getName()) ) return op;
- } // end for
- return null;
+ private Operation getOperation( Object args ) {
+ if( isNativeAsync ) {
+ List<Operation> ops = requestEndpoint.getService().getInterfaceContract().getInterface().getOperations();
+ for (Operation op : ops) {
+ if( operationName.equals(op.getName()) ) return op;
+ } // end for
+ return null;
+ } else {
+ operationName = "setResponse";
+ if( args instanceof Throwable ) { operationName = "setWrappedFault"; }
+ List<Operation> ops = responseEndpointReference.getReference().getInterfaceContract().getInterface().getOperations();
+ for (Operation op : ops) {
+ if( operationName.equals(op.getName()) ) return op;
+ } // end for
+ return null;
+ } // end if
} // end getOperation
public void setBindingType(String bindingType) {
@@ -155,4 +185,9 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab
public RuntimeEndpointReference getResponseEndpointReference() {
return this.responseEndpointReference;
}
+
+ public void setResponseEndpointReference(
+ RuntimeEndpointReference responseEndpointReference) {
+ this.responseEndpointReference = responseEndpointReference;
+ }
} // end class
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java
index 5534828b8a..d6f872d00a 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java
@@ -22,10 +22,10 @@ package org.apache.tuscany.sca.core.invocation;
/**
* Constants used during invocation in the runtime
*
-
*/
public interface Constants {
- String MESSAGE_ID = "MESSAGE_ID";
- String RELATES_TO = "RELATES_TO";
- String ASYNC_RESPONSE_INVOKER = "ASYNC_RESPONSE_INVOKER";
+ String MESSAGE_ID = "MESSAGE_ID";
+ String RELATES_TO = "RELATES_TO";
+ String ASYNC_RESPONSE_INVOKER = "ASYNC_RESPONSE_INVOKER";
+ String ASYNC_CALLBACK = "ASYNC_CALLBACK";
}
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
index 1e49767880..9de1809200 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
@@ -30,6 +30,7 @@ import org.apache.tuscany.sca.assembly.Reference;
import org.apache.tuscany.sca.assembly.Service;
import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.Constants;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.Message;
@@ -155,13 +156,10 @@ public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>,
* @param msg - the Tuscany message containing the response from the async service invocation
* which is either the Response message or an exception of some kind
*/
- private static final String WS_MESSAGE_ID = "WS_MESSAGE_ID";
- public Message invoke(Message msg) {
- // Get the unique ID from the message header
- String idValue = (String)msg.getHeaders().get(WS_MESSAGE_ID);
- if (idValue == null){
- idValue = (String)msg.getHeaders().get("MESSAGE_ID");
- }
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Message invoke(Message msg) {
+ // Get the unique ID from the RELATES_TO message header
+ String idValue = (String)msg.getHeaders().get(Constants.RELATES_TO);
if( idValue == null ) {
System.out.println( "Async message ID not found ");
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java
index 168af952db..9b51aefe39 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java
@@ -21,9 +21,15 @@ package org.apache.tuscany.sca.core.invocation.impl;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import org.apache.tuscany.sca.context.ThreadMessageContext;
import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl;
+import org.apache.tuscany.sca.core.invocation.Constants;
+import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.InvocationChain;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.runtime.Invocable;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.oasisopen.sca.ServiceReference;
import org.oasisopen.sca.ServiceRuntimeException;
@@ -43,7 +49,7 @@ public class JDKCallbackInvocationHandler extends JDKInvocationHandler {
}
@Override
- @SuppressWarnings( {"unchecked"})
+ @SuppressWarnings( {"unchecked", "rawtypes"})
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class == method.getDeclaringClass()) {
@@ -65,7 +71,8 @@ public class JDKCallbackInvocationHandler extends JDKInvocationHandler {
}
try {
- return invoke(chain, args, wire);
+ String msgID = ((CallbackServiceReferenceImpl)callableReference).getMsgID();
+ return invoke(chain, args, wire, msgID );
} catch (InvocationTargetException e) {
Throwable t = e.getCause();
throw t;
@@ -73,5 +80,58 @@ public class JDKCallbackInvocationHandler extends JDKInvocationHandler {
// allow the cloned wire to be reused by subsequent callbacks
}
}
+
+ /**
+ * Invoke the chain
+ * @param chain - the chain
+ * @param args - arguments to the invocation as an array of Objects
+ * @param source - the Endpoint or EndpointReference to which the chain relates
+ * @param msgID - ID of the message to which this invovation is a callback - ID ends up in "RELATES_TO" header
+ * @return - the Response message from the invocation
+ * @throws Throwable - if any exception occurs during the invocation
+ */
+ @Override
+ protected Object invoke(InvocationChain chain, Object[] args, Invocable source, String msgID)
+ throws Throwable {
+ Message msg = messageFactory.createMessage();
+ if (source instanceof RuntimeEndpointReference) {
+ msg.setFrom((RuntimeEndpointReference)source);
+ }
+ if (target != null) {
+ msg.setTo(target);
+ } else {
+ if (source instanceof RuntimeEndpointReference) {
+ msg.setTo(((RuntimeEndpointReference)source).getTargetEndpoint());
+ }
+ }
+ Invoker headInvoker = chain.getHeadInvoker();
+ Operation operation = chain.getTargetOperation();
+ msg.setOperation(operation);
+ msg.setBody(args);
+
+ Message msgContext = ThreadMessageContext.getMessageContext();
+
+ // Deal with header information that needs to be copied from the message context to the new message...
+ transferMessageHeaders( msg, msgContext);
+
+ ThreadMessageContext.setMessageContext(msg);
+
+ // If there is a supplied message ID, place its value into the Message Header under "RELATES_TO"
+ if( msgID != null ){
+ msg.getHeaders().put(Constants.RELATES_TO, msgID);
+ } // end if
+
+ try {
+ // dispatch the source down the chain and get the response
+ Message resp = headInvoker.invoke(msg);
+ Object body = resp.getBody();
+ if (resp.isFault()) {
+ throw (Throwable)body;
+ }
+ return body;
+ } finally {
+ ThreadMessageContext.setMessageContext(msgContext);
+ }
+ } // end method invoke
}