diff options
6 files changed, 181 insertions, 49 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java index 263ba6a8c7..e598be277a 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java @@ -62,6 +62,7 @@ import org.apache.tuscany.sca.core.FactoryExtensionPoint; import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; import org.apache.tuscany.sca.core.invocation.AsyncResponseService; +import org.apache.tuscany.sca.core.invocation.Constants; import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor; import org.apache.tuscany.sca.core.invocation.NonBlockingInterceptor; import org.apache.tuscany.sca.core.invocation.RuntimeInvoker; @@ -279,10 +280,11 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint // Deal with async callback // Ensure invocation chains are built... getInvocationChains(); - if ( !this.getCallbackEndpointReferences().isEmpty() ) { + // async callback handling + if( this.isAsyncInvocation() && !this.getCallbackEndpointReferences().isEmpty() ) { RuntimeEndpointReference asyncEPR = (RuntimeEndpointReference) this.getCallbackEndpointReferences().get(0); // Place a link to the callback EPR into the message headers... - msg.getHeaders().put("ASYNC_CALLBACK", asyncEPR ); + msg.getHeaders().put(Constants.ASYNC_CALLBACK, asyncEPR ); } // end of async callback handling return invoker.invokeBinding(msg); @@ -372,7 +374,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint // Handle cases where the operation is an async server if( targetOperation.isAsyncServer() ) { - createAsyncServerCallback( this, operation ); + createAsyncServerCallback(); } // end if } @@ -412,24 +414,32 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint } // end method initInvocationChains /** - * Creates the async callback for the supplied Endpoint and Operation, if it does not already exist + * Creates the async callback for this Endpoint, if it does not already exist * and stores it into the Endpoint - * @param endpoint - the Endpoint - * @param operation - the Operation */ - private void createAsyncServerCallback( RuntimeEndpoint endpoint, Operation operation ) { + public void createAsyncServerCallback( ) { // No need to create a callback if the Binding supports async natively... - if( hasNativeAsyncBinding(endpoint) ) return; + if( hasNativeAsyncBinding(this) ) return; // Check to see if the callback already exists - if( asyncCallbackExists( endpoint ) ) return; + if( asyncCallbackExists( this ) ) return; - RuntimeEndpointReference asyncEPR = createAsyncEPR( endpoint ); + RuntimeEndpointReference asyncEPR = createAsyncEPR( this ); // Store the new callback EPR into the Endpoint - endpoint.getCallbackEndpointReferences().add(asyncEPR); + this.getCallbackEndpointReferences().add(asyncEPR); + + // Also store the callback EPR into the EndpointRegistry + EndpointRegistry epReg = getEndpointRegistry( registry ); + if( epReg != null ) epReg.addEndpointReference(asyncEPR); } // end method createAsyncServerCallback + public RuntimeEndpointReference getAsyncServerCallback() { + + return (RuntimeEndpointReference) this.getCallbackEndpointReferences().get(0); + } // end method getAsyncServerCallback + + /** * Indicates if a given endpoint has a Binding that supports native async invocation * @param endpoint - the endpoint @@ -455,6 +465,9 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint RuntimeEndpointReference epr = (RuntimeEndpointReference)assemblyFactory.createEndpointReference(); epr.bind( compositeContext ); + // Create pseudo-component + epr.setComponent(component); + // Create pseudo-reference ComponentReference reference = assemblyFactory.createComponentReference(); ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry(); @@ -487,8 +500,6 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint // Need to establish policies here (binding has some...) epr.getRequiredIntents().addAll( endpoint.getRequiredIntents() ); epr.getPolicySets().addAll( endpoint.getPolicySets() ); - String eprURI = endpoint.getComponent().getName() + "#reference-binding(" + referenceName + "/" + referenceName + ")"; - epr.setURI(eprURI); // Attach a dummy endpoint to the epr RuntimeEndpoint ep = (RuntimeEndpoint)assemblyFactory.createEndpoint(); @@ -497,6 +508,10 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint //epr.setStatus(EndpointReference.Status.RESOLVED_BINDING); epr.setStatus(EndpointReference.Status.WIRED_TARGET_FOUND_AND_MATCHED); epr.setUnresolved(false); + + // Set the URI for the EPR + String eprURI = endpoint.getComponent().getName() + "#reference-binding(" + referenceName + "/" + referenceName + ")"; + epr.setURI(eprURI); return epr; } // end method RuntimeEndpointReference @@ -536,10 +551,10 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint XMLStreamReader reader = inputFactory.createXMLStreamReader(source); reader.next(); Binding newBinding = (Binding) processor.read(reader, context ); - newBinding.setName("asyncCallback"); + newBinding.setName(reference.getName()); // Create a URI address for the callback based on the Component_Name/Reference_Name pattern - String callbackURI = "/" + component.getName() + "/" + reference.getName(); + //String callbackURI = "/" + component.getName() + "/" + reference.getName(); //newBinding.setURI(callbackURI); BuilderExtensionPoint builders = registry.getExtensionPoint(BuilderExtensionPoint.class); @@ -934,13 +949,7 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint */ private RuntimeEndpointImpl findActualEP(RuntimeEndpointImpl ep, ExtensionPointRegistry registry) { - // Get the EndpointRegistry - DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry); - - if( domainRegistryFactory == null ) return null; - - // TODO: For the moment, just use the first (and only!) EndpointRegistry... - EndpointRegistry endpointRegistry = (EndpointRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0]; + EndpointRegistry endpointRegistry = getEndpointRegistry( registry ); if( endpointRegistry == null ) return null; @@ -951,6 +960,22 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint return null; } // end method findActualEP + + /** + * Get the EndpointRegistry + * @param registry - the ExtensionPoint registry + * @return the EndpointRegistry - will be null if the EndpointRegistry cannot be found + */ + private EndpointRegistry getEndpointRegistry( ExtensionPointRegistry registry) { + DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry); + + if( domainRegistryFactory == null ) return null; + + // TODO: For the moment, just use the first (and only!) EndpointRegistry... + EndpointRegistry endpointRegistry = (EndpointRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0]; + + return endpointRegistry; + } // end method public InterfaceContract getBindingInterfaceContract() { resolve(); diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java index 542a80f926..6f5b0be0e7 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java @@ -24,6 +24,7 @@ import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.context.CompositeContext; import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.core.invocation.Constants; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; import org.oasisopen.sca.ServiceRuntimeException; @@ -32,8 +33,18 @@ public class CallbackServiceReferenceImpl<B> extends ServiceReferenceImpl<B> { private RuntimeEndpointReference callbackEPR; private List<? extends EndpointReference> callbackEPRs; private Endpoint resolvedEndpoint; + // Holds the ID of the Message that caused the creation of this CallbackServiceReference + private String msgID; - /* + /** + * Gets the message ID associated with this callback reference + * @return the message ID + */ + public String getMsgID() { + return msgID; + } + + /* * Public constructor for Externalizable serialization/deserialization */ public CallbackServiceReferenceImpl() { @@ -62,6 +73,9 @@ public class CallbackServiceReferenceImpl<B> extends ServiceReferenceImpl<B> { throw new ServiceRuntimeException("No callback binding found for " + msgContext.getTo().toString()); } resolvedEndpoint = msgContext.getFrom().getCallbackEndpoint(); + + // Capture the Message ID from the message which caused the creation of this CallBackServiceReference + this.msgID = (String) msgContext.getHeaders().get(Constants.MESSAGE_ID); } @Override diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java index 8bcda4efb2..55c8d7fcab 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java @@ -23,6 +23,7 @@ import java.io.Serializable; import java.util.Iterator;
import java.util.List;
+import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
@@ -58,6 +59,7 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab private String operationName;
private MessageFactory messageFactory;
private String bindingType = "";
+ private boolean isNativeAsync;
public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint,
RuntimeEndpointReference responseEndpointReference,
@@ -70,6 +72,13 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab this.relatesToMsgID = relatesToMsgID;
this.operationName = operationName;
this.messageFactory = messageFactory;
+
+ if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) &&
+ (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){
+ isNativeAsync = true;
+ } else {
+ isNativeAsync = false;
+ } // end if
} // end constructor
/**
@@ -79,8 +88,7 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab responseMessage.getHeaders().put(Constants.ASYNC_RESPONSE_INVOKER, this);
responseMessage.getHeaders().put(Constants.RELATES_TO, relatesToMsgID);
- if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) &&
- (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){
+ if (isNativeAsync){
// process the response as a native async response
requestEndpoint.invokeAsyncResponse(responseMessage);
} else {
@@ -106,8 +114,8 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab }
/**
- * If you have Java beans you can call this and we'll create
- * a Tuscany message
+ * Invokes the async response where the parameter is Java bean(s)
+ * - this method creates a Tuscany message
*
* @param args the response data
*/
@@ -115,10 +123,22 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab Message msg = messageFactory.createMessage();
- msg.setOperation(getOperation());
+ msg.setOperation(getOperation( args ));
+
+ // If this is not native async, then any Throwable is being passed as a parameter and
+ // requires wrapping
+ if( !isNativeAsync && args instanceof Throwable ) {
+ args = new AsyncFaultWrapper( (Throwable) args );
+ } // end if
- // on the the following will be null depending
- // on whether this is native or non-native async
+ // If this is not native async, then the message must contain an array of args since
+ // this is what is expected when invoking an EPR for the async response...
+ if( !isNativeAsync ) {
+ Object[] objs = new Object[1];
+ objs[0] = args;
+ args = objs;
+ } // end if
+
msg.setTo(requestEndpoint);
msg.setFrom(responseEndpointReference);
@@ -132,12 +152,22 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab } // end method invokeAsyncResponse(Object)
- private Operation getOperation() {
- List<Operation> ops = requestEndpoint.getService().getInterfaceContract().getInterface().getOperations();
- for (Operation op : ops) {
- if( operationName.equals(op.getName()) ) return op;
- } // end for
- return null;
+ private Operation getOperation( Object args ) {
+ if( isNativeAsync ) {
+ List<Operation> ops = requestEndpoint.getService().getInterfaceContract().getInterface().getOperations();
+ for (Operation op : ops) {
+ if( operationName.equals(op.getName()) ) return op;
+ } // end for
+ return null;
+ } else {
+ operationName = "setResponse";
+ if( args instanceof Throwable ) { operationName = "setWrappedFault"; }
+ List<Operation> ops = responseEndpointReference.getReference().getInterfaceContract().getInterface().getOperations();
+ for (Operation op : ops) {
+ if( operationName.equals(op.getName()) ) return op;
+ } // end for
+ return null;
+ } // end if
} // end getOperation
public void setBindingType(String bindingType) {
@@ -155,4 +185,9 @@ public class AsyncResponseInvoker<T> implements InvokerAsyncResponse, Serializab public RuntimeEndpointReference getResponseEndpointReference() {
return this.responseEndpointReference;
}
+
+ public void setResponseEndpointReference(
+ RuntimeEndpointReference responseEndpointReference) {
+ this.responseEndpointReference = responseEndpointReference;
+ }
} // end class
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java index 5534828b8a..d6f872d00a 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java @@ -22,10 +22,10 @@ package org.apache.tuscany.sca.core.invocation; /**
* Constants used during invocation in the runtime
*
-
*/
public interface Constants {
- String MESSAGE_ID = "MESSAGE_ID";
- String RELATES_TO = "RELATES_TO";
- String ASYNC_RESPONSE_INVOKER = "ASYNC_RESPONSE_INVOKER";
+ String MESSAGE_ID = "MESSAGE_ID";
+ String RELATES_TO = "RELATES_TO";
+ String ASYNC_RESPONSE_INVOKER = "ASYNC_RESPONSE_INVOKER";
+ String ASYNC_CALLBACK = "ASYNC_CALLBACK";
}
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java index 1e49767880..9de1809200 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java @@ -30,6 +30,7 @@ import org.apache.tuscany.sca.assembly.Reference; import org.apache.tuscany.sca.assembly.Service; import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.Constants; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.Message; @@ -155,13 +156,10 @@ public class AsyncResponseHandlerImpl<V> implements AsyncResponseHandler<V>, * @param msg - the Tuscany message containing the response from the async service invocation * which is either the Response message or an exception of some kind */ - private static final String WS_MESSAGE_ID = "WS_MESSAGE_ID"; - public Message invoke(Message msg) { - // Get the unique ID from the message header - String idValue = (String)msg.getHeaders().get(WS_MESSAGE_ID); - if (idValue == null){ - idValue = (String)msg.getHeaders().get("MESSAGE_ID"); - } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Message invoke(Message msg) { + // Get the unique ID from the RELATES_TO message header + String idValue = (String)msg.getHeaders().get(Constants.RELATES_TO); if( idValue == null ) { System.out.println( "Async message ID not found "); diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java index 168af952db..9b51aefe39 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java @@ -21,9 +21,15 @@ package org.apache.tuscany.sca.core.invocation.impl; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import org.apache.tuscany.sca.context.ThreadMessageContext; import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl; +import org.apache.tuscany.sca.core.invocation.Constants; +import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.Invocable; import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; import org.oasisopen.sca.ServiceReference; import org.oasisopen.sca.ServiceRuntimeException; @@ -43,7 +49,7 @@ public class JDKCallbackInvocationHandler extends JDKInvocationHandler { } @Override - @SuppressWarnings( {"unchecked"}) + @SuppressWarnings( {"unchecked", "rawtypes"}) public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (Object.class == method.getDeclaringClass()) { @@ -65,7 +71,8 @@ public class JDKCallbackInvocationHandler extends JDKInvocationHandler { } try { - return invoke(chain, args, wire); + String msgID = ((CallbackServiceReferenceImpl)callableReference).getMsgID(); + return invoke(chain, args, wire, msgID ); } catch (InvocationTargetException e) { Throwable t = e.getCause(); throw t; @@ -73,5 +80,58 @@ public class JDKCallbackInvocationHandler extends JDKInvocationHandler { // allow the cloned wire to be reused by subsequent callbacks } } + + /** + * Invoke the chain + * @param chain - the chain + * @param args - arguments to the invocation as an array of Objects + * @param source - the Endpoint or EndpointReference to which the chain relates + * @param msgID - ID of the message to which this invovation is a callback - ID ends up in "RELATES_TO" header + * @return - the Response message from the invocation + * @throws Throwable - if any exception occurs during the invocation + */ + @Override + protected Object invoke(InvocationChain chain, Object[] args, Invocable source, String msgID) + throws Throwable { + Message msg = messageFactory.createMessage(); + if (source instanceof RuntimeEndpointReference) { + msg.setFrom((RuntimeEndpointReference)source); + } + if (target != null) { + msg.setTo(target); + } else { + if (source instanceof RuntimeEndpointReference) { + msg.setTo(((RuntimeEndpointReference)source).getTargetEndpoint()); + } + } + Invoker headInvoker = chain.getHeadInvoker(); + Operation operation = chain.getTargetOperation(); + msg.setOperation(operation); + msg.setBody(args); + + Message msgContext = ThreadMessageContext.getMessageContext(); + + // Deal with header information that needs to be copied from the message context to the new message... + transferMessageHeaders( msg, msgContext); + + ThreadMessageContext.setMessageContext(msg); + + // If there is a supplied message ID, place its value into the Message Header under "RELATES_TO" + if( msgID != null ){ + msg.getHeaders().put(Constants.RELATES_TO, msgID); + } // end if + + try { + // dispatch the source down the chain and get the response + Message resp = headInvoker.invoke(msg); + Object body = resp.getBody(); + if (resp.isFault()) { + throw (Throwable)body; + } + return body; + } finally { + ThreadMessageContext.setMessageContext(msgContext); + } + } // end method invoke } |