summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/tags/2.0-Beta2-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'sca-java-2.x/tags/2.0-Beta2-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java')
-rw-r--r--sca-java-2.x/tags/2.0-Beta2-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java754
1 files changed, 0 insertions, 754 deletions
diff --git a/sca-java-2.x/tags/2.0-Beta2-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/sca-java-2.x/tags/2.0-Beta2-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
deleted file mode 100644
index a36ee8d2f8..0000000000
--- a/sca-java-2.x/tags/2.0-Beta2-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java
+++ /dev/null
@@ -1,754 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.core.invocation.impl;
-
-import java.beans.Introspector;
-import java.beans.PropertyDescriptor;
-import java.io.StringReader;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import javax.xml.namespace.QName;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.ws.AsyncHandler;
-import javax.xml.ws.Response;
-
-import org.apache.tuscany.sca.assembly.AssemblyFactory;
-import org.apache.tuscany.sca.assembly.Binding;
-import org.apache.tuscany.sca.assembly.ComponentService;
-import org.apache.tuscany.sca.assembly.Endpoint;
-import org.apache.tuscany.sca.assembly.Implementation;
-import org.apache.tuscany.sca.assembly.Service;
-import org.apache.tuscany.sca.assembly.builder.BindingBuilder;
-import org.apache.tuscany.sca.assembly.builder.BuilderContext;
-import org.apache.tuscany.sca.assembly.builder.BuilderExtensionPoint;
-import org.apache.tuscany.sca.assembly.xml.Constants;
-import org.apache.tuscany.sca.context.CompositeContext;
-import org.apache.tuscany.sca.context.ThreadMessageContext;
-import org.apache.tuscany.sca.contribution.processor.ContributionReadException;
-import org.apache.tuscany.sca.contribution.processor.ProcessorContext;
-import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor;
-import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint;
-import org.apache.tuscany.sca.contribution.processor.ValidatingXMLInputFactory;
-import org.apache.tuscany.sca.core.ExtensionPointRegistry;
-import org.apache.tuscany.sca.core.FactoryExtensionPoint;
-import org.apache.tuscany.sca.core.UtilityExtensionPoint;
-import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
-import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
-import org.apache.tuscany.sca.core.invocation.AsyncResponseException;
-import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
-import org.apache.tuscany.sca.core.invocation.AsyncResponseService;
-import org.apache.tuscany.sca.core.invocation.JDKAsyncResponseInvoker;
-import org.apache.tuscany.sca.interfacedef.InvalidInterfaceException;
-import org.apache.tuscany.sca.interfacedef.Operation;
-import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract;
-import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceFactory;
-import org.apache.tuscany.sca.interfacedef.util.FaultException;
-import org.apache.tuscany.sca.interfacedef.util.WrapperInfo;
-import org.apache.tuscany.sca.invocation.InterceptorAsync;
-import org.apache.tuscany.sca.invocation.InvocationChain;
-import org.apache.tuscany.sca.invocation.Invoker;
-import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
-import org.apache.tuscany.sca.invocation.Message;
-import org.apache.tuscany.sca.invocation.MessageFactory;
-import org.apache.tuscany.sca.policy.Intent;
-import org.apache.tuscany.sca.provider.EndpointReferenceAsyncProvider;
-import org.apache.tuscany.sca.provider.PolicyProvider;
-import org.apache.tuscany.sca.provider.ReferenceBindingProvider;
-import org.apache.tuscany.sca.provider.ServiceBindingProvider;
-import org.apache.tuscany.sca.runtime.Invocable;
-import org.apache.tuscany.sca.runtime.RuntimeComponent;
-import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
-import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
-import org.apache.tuscany.sca.work.WorkScheduler;
-import org.oasisopen.sca.ServiceReference;
-import org.oasisopen.sca.ServiceRuntimeException;
-
-/**
- * An InvocationHandler which deals with JAXWS-defined asynchronous client Java API method calls
- *
- * 2 asynchronous mappings exist for any given synchronous service operation, as shown in this example:
- * public interface StockQuote {
- * float getPrice(String ticker);
- * Response<Float> getPriceAsync(String ticker);
- * Future<?> getPriceAsync(String ticker, AsyncHandler<Float> handler);
- * }
- *
- * - the second method is called the "polling method", since the returned Response<?> object permits
- * the client to poll to see if the async call has completed
- * - the third method is called the "async callback method", since in this case the client application can specify
- * a callback operation that is automatically called when the async call completes
- */
-public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
-
- private static final long serialVersionUID = 1L;
-
- private static int invocationCount = 10; // # of threads to use
- private static long maxWaitTime = 30; // Max wait time for completion = 30sec
-
- // Run the async service invocations using a ThreadPoolExecutor
- private ExecutorService theExecutor;
-
- public AsyncJDKInvocationHandler(ExtensionPointRegistry registry,
- MessageFactory messageFactory,
- ServiceReference<?> callableReference ) {
- super(messageFactory, callableReference);
- initExecutorService(registry);
- }
-
- public AsyncJDKInvocationHandler(ExtensionPointRegistry registry,
- MessageFactory messageFactory,
- Class<?> businessInterface,
- Invocable source ) {
- super(messageFactory, businessInterface, source);
- initExecutorService(registry);
- }
-
- private final void initExecutorService(ExtensionPointRegistry registry) {
- UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class);
- WorkScheduler scheduler = utilities.getUtility(WorkScheduler.class);
- theExecutor = scheduler.getExecutorService();
-
- } // end method initExecutorService
-
- /**
- * Perform the invocation of the operation
- * - provides support for all 3 forms of client method: synchronous, polling and async callback
- */
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-
- // force the bind of the reference so that we can look at the
- // target contract to see if it's asynchronous
- source.getInvocationChains();
-
- if (isAsyncCallback(method)) {
- return doInvokeAsyncCallback(proxy, method, args);
- } else if (isAsyncPoll(method)) {
- return doInvokeAsyncPoll(proxy, method, args);
- } else {
- // Regular synchronous method call
- return doInvokeSync(proxy, method, args);
- }
- }
-
- /**
- * Indicates if a supplied method has the form of an async callback method
- * @param method - the method
- * @return - true if the method has the form of an async callback
- */
- protected boolean isAsyncCallback(Method method) {
- if (method.getName().endsWith("Async") && (method.getReturnType() == Future.class)) {
- if (method.getParameterTypes().length > 0) {
- return method.getParameterTypes()[method.getParameterTypes().length - 1] == AsyncHandler.class;
- }
- }
- return false;
- }
-
- /**
- * Indicates is a supplied method has the form of an async polling method
- * @param method - the method
- * @return - true if the method has the form of an async polling method
- */
- protected boolean isAsyncPoll(Method method) {
- return method.getName().endsWith("Async") && (method.getReturnType() == Response.class);
- }
-
- /**
- * Invoke an async polling method
- * @param proxy - the reference proxy
- * @param asyncMethod - the async method to invoke
- * @param args - array of input arguments to the method
- * @return - the Response<?> object that is returned to the client application, typed by the
- * type of the response
- */
- @SuppressWarnings("unchecked")
- protected Response doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) {
- Method method = getNonAsyncMethod(asyncMethod);
- Class<?> returnType = method.getReturnType();
- // Allocate the Future<?> / Response<?> object - note: Response<?> is a subclass of Future<?>
- AsyncInvocationFutureImpl future = AsyncInvocationFutureImpl.newInstance(returnType, getInterfaceClassloader());
- try {
- invokeAsync(proxy, method, args, future, asyncMethod);
- } catch (Exception e) {
- future.setWrappedFault(new AsyncFaultWrapper(e));
- } catch (Throwable t) {
- Exception e =
- new ServiceRuntimeException("Received Throwable: " + t.getClass().getName()
- + " when invoking: "
- + asyncMethod.getName(), t);
- future.setWrappedFault(new AsyncFaultWrapper(e));
- } // end try
- return future;
- } // end method doInvokeAsyncPoll
-
- /**
- * Provide a synchronous invocation of a service operation that is either synchronous or asynchronous
- * @return
- */
- protected Object doInvokeSync(Object proxy, Method method, Object[] args) throws Throwable {
- if (isAsyncInvocation(source)) {
- // Target service is asynchronous
- Class<?> returnType = method.getReturnType();
- AsyncInvocationFutureImpl future =
- AsyncInvocationFutureImpl.newInstance(returnType, getInterfaceClassloader());
- invokeAsync(proxy, method, args, future, method);
- // Wait for some maximum time for the result - 1000 seconds here
- // Really, if the service is async, the client should use async client methods to invoke the service
- // - and be prepared to wait a *really* long time
- Object response = null;
- try {
- response = future.get(1000, TimeUnit.SECONDS);
- } catch (ExecutionException ex) {
- throw ex.getCause();
- }
- return response;
- } else {
- // Target service is not asynchronous, so perform sync invocation
- return super.invoke(proxy, method, args);
- } // end if
- } // end method doInvokeSync
-
- /**
- * Invoke an async callback method - note that this form of the async client API has as its final parameter
- * an AsyncHandler method, used for callbacks to the client code
- * @param proxy - the reference proxy
- * @param asyncMethod - the async method to invoke
- * @param args - array of input arguments to the method
- * @return - the Future<?> object that is returned to the client application, typed by the type of
- * the response
- */
- @SuppressWarnings("unchecked")
- private Object doInvokeAsyncCallback(final Object proxy, final Method asyncMethod, final Object[] args)
- throws Exception {
- Future<Response> future = theExecutor.submit(new Callable<Response>() {
-
- @Override
- public Response call() {
- AsyncHandler handler = (AsyncHandler)args[args.length - 1];
- Response response = doInvokeAsyncPoll(proxy, asyncMethod, Arrays.copyOf(args, args.length - 1));
- // Invoke the callback handler, if present
- if (handler != null) {
- handler.handleResponse(response);
- } // end if
- return response;
- }
- });
- return future.get();
-
- } // end method doInvokeAsyncCallback
-
- /**
- * Invoke the target (synchronous) method asynchronously
- * @param proxy - the reference proxy object
- * @param method - the method to invoke
- * @param args - arguments for the call
- * @param future - Future for handling the response
- * @return - returns the response from the invocation
- * @throws Throwable - if an exception is thrown during the invocation
- */
- @SuppressWarnings("unchecked")
- private void invokeAsync(Object proxy,
- Method method,
- Object[] args,
- AsyncInvocationFutureImpl<?> future,
- Method asyncMethod) throws Throwable {
- if (source == null) {
- throw new ServiceRuntimeException("No runtime source is available");
- }
-
- if (source instanceof RuntimeEndpointReference) {
- RuntimeEndpointReference epr = (RuntimeEndpointReference)source;
- if (epr.isOutOfDate()) {
- epr.rebuild();
- chains.clear();
- }
- } // end if
-
- InvocationChain chain = getInvocationChain(method, source);
-
- if (chain == null) {
- throw new IllegalArgumentException("No matching operation is found: " + method);
- }
-
- // Organize for an async service
- RuntimeEndpoint theEndpoint = getAsyncCallback(source);
- boolean isAsyncService = false;
- if (theEndpoint != null) {
- // ... the service is asynchronous but binding does not support async natively ...
- attachFuture(theEndpoint, future);
- } // end if
-
- if( isAsyncInvocation((RuntimeEndpointReference)source ) ) {
- isAsyncService = true;
- // Get hold of the JavaAsyncResponseHandler from the chain dealing with the async response
- Invoker theInvoker = chain.getHeadInvoker();
- if( theInvoker instanceof InterceptorAsync ) {
- InvokerAsyncResponse responseInvoker = ((InterceptorAsync)theInvoker).getPrevious();
- if( responseInvoker instanceof JDKAsyncResponseInvoker ) {
- // Register the future as the response object with its ID
- ((JDKAsyncResponseInvoker)responseInvoker).registerAsyncResponse(future.getUniqueID(), future);
- } // end if
- } // end if
- } // end if
-
- // Perform the invocations on separate thread...
- theExecutor.submit(new separateThreadInvoker(chain, args, source, future, asyncMethod, isAsyncService));
-
- return;
- } // end method invokeAsync
-
- /**
- * An inner class which acts as a runnable task for invoking services asynchronously on threads that are separate from
- * those used to execute operations of components
- *
- * This supports both synchronous services and asynchronous services
- */
- private class separateThreadInvoker implements Runnable {
-
- private AsyncInvocationFutureImpl future;
- private Method asyncMethod;
- private InvocationChain chain;
- private Object[] args;
- private Invocable invocable;
- private boolean isAsyncService;
-
- public separateThreadInvoker(InvocationChain chain,
- Object[] args,
- Invocable invocable,
- AsyncInvocationFutureImpl future,
- Method asyncMethod,
- boolean isAsyncService) {
- super();
- this.chain = chain;
- this.asyncMethod = asyncMethod;
- this.args = args;
- this.invocable = invocable;
- this.future = future;
- this.isAsyncService = isAsyncService;
- } // end constructor
-
- public void run() {
- Object result;
-
- try {
- if (isAsyncService) {
- if( supportsNativeAsync(invocable) ) {
- // Binding supports native async invocations
- invokeAsync(chain, args, invocable, future.getUniqueID());
- } else {
- // Binding does not support native async invocations
- invoke(chain, args, invocable, future.getUniqueID());
- } // end if
- // The result is returned asynchronously via the future...
- } else {
- // ... the service is synchronous ...
- result = invoke(chain, args, invocable);
- Type type = null;
- if (asyncMethod.getReturnType() == Future.class) {
- // For callback async method, where a Future is returned
- Type[] types = asyncMethod.getGenericParameterTypes();
- if (types.length > 0 && asyncMethod.getParameterTypes()[types.length - 1] == AsyncHandler.class) {
- // Last parameter is AsyncHandler<T>
- type = types[types.length - 1];
- } // end if
- } else if (asyncMethod.getReturnType() == Response.class) {
- // For the polling method, Response<T>
- type = asyncMethod.getGenericReturnType();
- } // end if
- if (type instanceof ParameterizedType) {
- // Check if the parameterized type of Response<T> is a doc-lit-wrapper class
- Class<?> wrapperClass = (Class<?>)((ParameterizedType)type).getActualTypeArguments()[0];
- WrapperInfo wrapperInfo = chain.getSourceOperation().getWrapper();
- if (wrapperInfo != null && wrapperInfo.getOutputWrapperClass() == wrapperClass) {
- Object wrapper = wrapperClass.newInstance();
- // Find the 1st matching property
- for (PropertyDescriptor p : Introspector.getBeanInfo(wrapperClass).getPropertyDescriptors()) {
- if (p.getWriteMethod() == null) {
- // There is a "class" property ...
- continue;
- } // end if
- if (p.getWriteMethod().getParameterTypes()[0].isInstance(result)) {
- p.getWriteMethod().invoke(wrapper, result);
- result = wrapper;
- break;
- } // end if
- } // end for
- } // end if
- } // end if
- future.setResponse(result);
- } // end if
- } catch (ServiceRuntimeException s) {
- Throwable e = s.getCause();
- if (e != null && e instanceof FaultException) {
- if ("AsyncResponse".equals(e.getMessage())) {
- // Do nothing...
- } else {
- future.setWrappedFault(new AsyncFaultWrapper(s));
- } // end if
- } // end if
- } catch (AsyncResponseException ar) {
- // This exception is received in the case where the Binding does not support async invocation
- // natively - the initial invocation is effectively synchronous with this exception thrown to
- // indicate that the service received the request but will send the response separately - do nothing
- } catch (Throwable t) {
- //System.out.println("Async invoke got exception: " + t.toString());
- future.setWrappedFault(new AsyncFaultWrapper(t));
- } // end try
-
- } // end method run
-
- } // end class separateThreadInvoker
-
- /**
- * Attaches a future to the callback endpoint - so that the Future is triggered when a response is
- * received from the asynchronous service invocation associated with the Future
- * @param endpoint - the async callback endpoint
- * @param future - the async invocation future to attach
- */
- private void attachFuture(RuntimeEndpoint endpoint, AsyncInvocationFutureImpl<?> future) {
- Implementation impl = endpoint.getComponent().getImplementation();
- AsyncResponseHandlerImpl<?> asyncHandler = (AsyncResponseHandlerImpl<?>)impl;
- asyncHandler.addFuture(future);
- } // end method attachFuture
-
- /**
- * Perform an async invocation on the reference
- * @param chain - the chain
- * @param args - parameters for the invocation
- * @param invocable - the reference
- * @param msgID - a message ID
- */
- public void invokeAsync(InvocationChain chain, Object[] args, Invocable invocable, String msgID) {
- Message msg = messageFactory.createMessage();
- if (invocable instanceof RuntimeEndpointReference) {
- msg.setFrom((RuntimeEndpointReference)invocable);
- } // end if
- if (target != null) {
- msg.setTo(target);
- } else if (source instanceof RuntimeEndpointReference) {
- msg.setTo(((RuntimeEndpointReference)invocable).getTargetEndpoint());
- } // end if
-
- Operation operation = chain.getTargetOperation();
- msg.setOperation(operation);
- msg.setBody(args);
-
- Message msgContext = ThreadMessageContext.getMessageContext();
-
- // Deal with header information that needs to be copied from the message context to the new message...
- transferMessageHeaders( msg, msgContext);
-
- ThreadMessageContext.setMessageContext(msg);
-
- // If there is a supplied message ID, place its value into the Message Header under "MESSAGE_ID"
- if( msgID != null ){
- msg.getHeaders().put("MESSAGE_ID", msgID);
- } // end if
-
- try {
- // Invoke the reference
- invocable.invokeAsync(msg);
- return;
- } finally {
- ThreadMessageContext.setMessageContext(msgContext);
- } // end try
- } // end method invokeAsync
-
- /**
- * Get the async callback endpoint - if not already created, create and start it
- * @param source - the RuntimeEndpointReference which needs an async callback endpoint
- * @param future
- * @return - the RuntimeEndpoint of the async callback
- */
- private RuntimeEndpoint getAsyncCallback(Invocable source) {
- if (!(source instanceof RuntimeEndpointReference))
- return null;
- RuntimeEndpointReference epr = (RuntimeEndpointReference)source;
- if (!isAsyncInvocation(epr)) return null;
-
- // Check to see if the binding supports async invocation natively
- ReferenceBindingProvider eprProvider = epr.getBindingProvider();
- if( eprProvider instanceof EndpointReferenceAsyncProvider) {
- if( ((EndpointReferenceAsyncProvider)eprProvider).supportsNativeAsync() ) return null;
- } // end if
-
- RuntimeEndpoint endpoint;
- synchronized (epr) {
- endpoint = (RuntimeEndpoint)epr.getCallbackEndpoint();
- // If the async callback endpoint is already created, return it...
- if (endpoint != null)
- return endpoint;
- // Create the endpoint for the async callback
- endpoint = createAsyncCallbackEndpoint(epr);
- epr.setCallbackEndpoint(endpoint);
- } // end synchronized
-
- // Activate the new callback endpoint
- startEndpoint(epr.getCompositeContext(), endpoint);
- endpoint.getInvocationChains();
-
- return endpoint;
- } // end method setupAsyncCallback
-
- /**
- * Start the callback endpoint
- * @param compositeContext - the composite context
- * @param ep - the endpoint to start
- */
- private void startEndpoint(CompositeContext compositeContext, RuntimeEndpoint ep) {
- for (PolicyProvider policyProvider : ep.getPolicyProviders()) {
- policyProvider.start();
- } // end for
-
- final ServiceBindingProvider bindingProvider = ep.getBindingProvider();
- if (bindingProvider != null) {
- // Allow bindings to add shutdown hooks. Requires RuntimePermission shutdownHooks in policy.
- AccessController.doPrivileged(new PrivilegedAction<Object>() {
- public Object run() {
- bindingProvider.start();
- return null;
- }
- });
- compositeContext.getEndpointRegistry().addEndpoint(ep);
- }
- } // end method startEndpoint
-
- /**
- * Create the async callback endpoint for a reference that is going to invoke an asyncInvocation service
- * @param epr - the RuntimeEndpointReference for which the callback is created
- * @return - a RuntimeEndpoint representing the callback endpoint
- */
- private RuntimeEndpoint createAsyncCallbackEndpoint(RuntimeEndpointReference epr) {
- CompositeContext compositeContext = epr.getCompositeContext();
- RuntimeAssemblyFactory assemblyFactory = getAssemblyFactory(compositeContext);
- RuntimeEndpoint endpoint = (RuntimeEndpoint)assemblyFactory.createEndpoint();
- endpoint.bind(compositeContext);
-
- // Create a pseudo-component and pseudo-service
- // - need to end with a chain with an invoker into the AsyncCallbackHandler class
- RuntimeComponent fakeComponent = null;
- try {
- fakeComponent = (RuntimeComponent)epr.getComponent().clone();
- applyImplementation(fakeComponent);
- } catch (CloneNotSupportedException e2) {
- // will not happen
- } // end try
- endpoint.setComponent(fakeComponent);
-
- // Create pseudo-service
- ComponentService service = assemblyFactory.createComponentService();
- ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry();
- FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
- JavaInterfaceFactory javaInterfaceFactory =
- (JavaInterfaceFactory)modelFactories.getFactory(JavaInterfaceFactory.class);
- JavaInterfaceContract interfaceContract = javaInterfaceFactory.createJavaInterfaceContract();
- try {
- interfaceContract.setInterface(javaInterfaceFactory.createJavaInterface(AsyncResponseService.class));
- } catch (InvalidInterfaceException e1) {
- // Nothing to do here - will not happen
- } // end try
- service.setInterfaceContract(interfaceContract);
- String serviceName = epr.getReference().getName() + "_asyncCallback";
- service.setName(serviceName);
- // MJE 06/12/2010 - fixup for JMS binding code which looks at the implementation service
- // as well as the component service...
- // Create a pseudo implementation service...
- Service implService = assemblyFactory.createService();
- implService.setName(serviceName);
- implService.setInterfaceContract(interfaceContract);
- service.setService(implService);
- //
- endpoint.setService(service);
- // Set pseudo-service onto the pseudo-component
- List<ComponentService> services = fakeComponent.getServices();
- services.clear();
- services.add(service);
-
- // Create a binding
- Binding binding = createMatchingBinding(epr.getBinding(), fakeComponent, service, registry);
- endpoint.setBinding(binding);
-
- // Need to establish policies here (binding has some...)
- endpoint.getRequiredIntents().addAll(epr.getRequiredIntents());
- endpoint.getPolicySets().addAll(epr.getPolicySets());
- String epURI = epr.getComponent().getName() + "#service-binding(" + serviceName + "/" + serviceName + ")";
- endpoint.setURI(epURI);
- endpoint.setUnresolved(false);
- return endpoint;
- }
-
- /**
- * Create a matching binding to a supplied binding
- * - the matching binding has the same binding type, but is for the supplied component and service
- * @param matchBinding - the binding to match
- * @param component - the component
- * @param service - the service
- * @param registry - registry for extensions
- * @return - the matching binding, or null if it could not be created
- */
- @SuppressWarnings("unchecked")
- private Binding createMatchingBinding(Binding matchBinding,
- RuntimeComponent component,
- ComponentService service,
- ExtensionPointRegistry registry) {
- // Since there is no simple way to obtain a Factory for a binding where the type is not known ahead of
- // time, the process followed here is to generate the <binding.xxx/> XML element from the binding type QName
- // and then read the XML using the processor for that XML...
- QName bindingName = matchBinding.getType();
- String bindingXML =
- "<ns1:" + bindingName.getLocalPart() + " xmlns:ns1='" + bindingName.getNamespaceURI() + "'/>";
-
- StAXArtifactProcessorExtensionPoint processors =
- registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class);
- StAXArtifactProcessor<?> processor = (StAXArtifactProcessor<?>)processors.getProcessor(bindingName);
-
- FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
- ValidatingXMLInputFactory inputFactory = modelFactories.getFactory(ValidatingXMLInputFactory.class);
- StreamSource source = new StreamSource(new StringReader(bindingXML));
-
- ProcessorContext context = new ProcessorContext();
- try {
- XMLStreamReader reader = inputFactory.createXMLStreamReader(source);
- reader.next();
- Binding newBinding = (Binding)processor.read(reader, context);
-
- // Create a URI address for the callback based on the Component_Name/Reference_Name pattern
- String callbackURI = "/" + component.getName() + "/" + service.getName();
- newBinding.setURI(callbackURI);
-
- BuilderExtensionPoint builders = registry.getExtensionPoint(BuilderExtensionPoint.class);
- BindingBuilder builder = builders.getBindingBuilder(newBinding.getType());
- if (builder != null) {
- org.apache.tuscany.sca.assembly.builder.BuilderContext builderContext = new BuilderContext(registry);
- builder.build(component, service, newBinding, builderContext, true);
- } // end if
-
- return newBinding;
- } catch (ContributionReadException e) {
- e.printStackTrace();
- } catch (XMLStreamException e) {
- e.printStackTrace();
- }
-
- return null;
- } // end method createMatchingBinding
-
- /**
- * Gets a RuntimeAssemblyFactory from the CompositeContext
- * @param compositeContext
- * @return the RuntimeAssemblyFactory
- */
- private RuntimeAssemblyFactory getAssemblyFactory(CompositeContext compositeContext) {
- ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry();
- FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
- return (RuntimeAssemblyFactory)modelFactories.getFactory(AssemblyFactory.class);
- } // end method RuntimeAssemblyFactory
-
- /**
- * Applies an AsyncResponseHandlerImpl as the implementation of a RuntimeComponent
- * - the AsyncResponseHandlerImpl acts as both the implementation class and the implementation provider...
- * @param component - the component
- */
- private void applyImplementation(RuntimeComponent component) {
- AsyncResponseHandlerImpl<?> asyncHandler = new AsyncResponseHandlerImpl<Object>();
- component.setImplementation(asyncHandler);
- component.setImplementationProvider(asyncHandler);
- return;
- } // end method getImplementationProvider
-
- private static QName ASYNC_INVOKE = new QName(Constants.SCA11_NS, "asyncInvocation");
-
- /**
- * Determines if the service invocation is asynchronous
- * @param source - the EPR involved in the invocation
- * @return - true if the invocation is async
- */
- private boolean isAsyncInvocation(Invocable source) {
- if (!(source instanceof RuntimeEndpointReference))
- return false;
- RuntimeEndpointReference epr = (RuntimeEndpointReference)source;
- // First check is to see if the EPR itself has the asyncInvocation intent marked
- for (Intent intent : epr.getRequiredIntents()) {
- if (intent.getName().equals(ASYNC_INVOKE))
- return true;
- } // end for
-
- // Second check is to see if the target service has the asyncInvocation intent marked
- Endpoint ep = epr.getTargetEndpoint();
- for (Intent intent : ep.getRequiredIntents()) {
- if (intent.getName().equals(ASYNC_INVOKE))
- return true;
- } // end for
- return false;
- } // end isAsyncInvocation
-
- private boolean supportsNativeAsync(Invocable source) {
- if (!(source instanceof RuntimeEndpointReference))
- return false;
- RuntimeEndpointReference epr = (RuntimeEndpointReference)source;
-
- // TODO - need to update this once BindingProvider interface is refactored to contain
- // supportsNativeAsync directly...
- ReferenceBindingProvider provider = epr.getBindingProvider();
- if( provider instanceof EndpointReferenceAsyncProvider ) {
- return ((EndpointReferenceAsyncProvider)provider).supportsNativeAsync();
- } else {
- return false;
- } // end if
- } // end method supportsNativeAsync
-
- /**
- * Return the synchronous method that is the equivalent of an async method
- * @param asyncMethod - the async method
- * @return - the equivalent synchronous method
- */
- protected Method getNonAsyncMethod(Method asyncMethod) {
- String methodName = asyncMethod.getName().substring(0, asyncMethod.getName().length() - 5);
- for (Method m : businessInterface.getMethods()) {
- if (methodName.equals(m.getName())) {
- return m;
- }
- }
- throw new IllegalStateException("No synchronous method matching async method " + asyncMethod.getName());
- } // end method getNonAsyncMethod
-
- /**
- * Gets the classloader of the business interface
- * @return
- */
- private ClassLoader getInterfaceClassloader() {
- return businessInterface.getClassLoader();
- }
-}