From e5b7380c874745c989d1816b8f552504f038e1bc Mon Sep 17 00:00:00 2001 From: lresende Date: Thu, 26 Sep 2013 20:33:20 +0000 Subject: 2.0 branch for possible maintenance release git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1526672 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/invocation/ResponseDispatchImpl.java | 251 +++++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 sca-java-2.x/branches/2.0/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java (limited to 'sca-java-2.x/branches/2.0/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java') diff --git a/sca-java-2.x/branches/2.0/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java b/sca-java-2.x/branches/2.0/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java new file mode 100644 index 0000000000..343682d8b1 --- /dev/null +++ b/sca-java-2.x/branches/2.0/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.java.invocation; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.context.CompositeContext; +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.factory.ObjectFactory; +import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper; +import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler; +import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker; +import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory; +import org.apache.tuscany.sca.core.invocation.Constants; +import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory; +import org.apache.tuscany.sca.core.invocation.ProxyFactory; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.oasisopen.sca.ResponseDispatch; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * Implementation of the ResponseDispatch interface of the OASIS SCA Java API + * + * This is used for invocations of asynchronous services, where it is passed as a parameter on async service operations + * and it provides the path by which the service implementation returns the response to the request, or a Fault + * + * Note that this class is serializable and can be serialized, stored and deserialized by the service implementation + * + * @param - type of the response message + */ +public class ResponseDispatchImpl implements ResponseDispatch, Serializable { + + /** + * Generated serialVersionUID value + */ + private static final long serialVersionUID = 300158355992568592L; + + // A latch used to ensure that the sendResponse() and sendFault() operations are used at most once + // The latch is initialized with the value "false" + private AtomicBoolean latch = new AtomicBoolean(); + + private final Lock lock = new ReentrantLock(); + private final Condition completed = lock.newCondition(); + + // The result + private transient volatile T response = null; + private transient volatile Throwable fault = null; + + private transient ExtensionPointRegistry registry; + private MessageFactory msgFactory; + + // Service Reference used for the callback + private volatile ServiceReference> callbackRef; + private AsyncResponseInvoker respInvoker; + private String messageID; + + /** + * No-arg constructor for serialization purposes + */ + public ResponseDispatchImpl() { + super(); + } // end constructor + + public ResponseDispatchImpl( Message msg ) { + super(); + + respInvoker = (AsyncResponseInvoker)msg.getHeaders().get(Constants.ASYNC_RESPONSE_INVOKER); + //if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker"); + + if( respInvoker == null ) { + callbackRef = getAsyncCallbackRef( msg ); + } // end if + + messageID = (String) msg.getHeaders().get(Constants.MESSAGE_ID); + + } // end constructor + + public static ResponseDispatchImpl newInstance( Class type, Message msg ) { + return new ResponseDispatchImpl( msg ); + } + + /** + * Provide Context data for this ResponseDispatch that the service implementation can use + */ + public Map getContext() { + return null; + } + + /** + * Send a Fault. Must only be invoked once for this ResponseDispatch object + * @param e - the Fault to send + * @throws IllegalStateException if either the sendResponse method or the sendFault method have been called previously + */ + public void sendFault(Throwable e) { + if( sendOK() ) { + lock.lock(); + try { + fault = e; + completed.signalAll(); + } finally { + lock.unlock(); + } // end try + } else { + throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); + } // end if + + // Use response invoker if present + if( respInvoker != null ) { + //respInvoker.invokeAsyncResponse(new AsyncFaultWrapper(e)); + respInvoker.invokeAsyncResponse(e, null); + return; + } // end if + + // Now dispatch the response to the callback... + AsyncResponseHandler handler = (AsyncResponseHandler) callbackRef.getService(); + setResponseHeaders(); + handler.setWrappedFault(new AsyncFaultWrapper(e)); + } // end method sendFault + + /** + * Send the response message. Must only be invoked once for this ResponseDispatch object + * @throws IllegalStateException if either the sendResponse method or the sendFault method have been called previously + * @param res - the response message, which is of type T + */ + @SuppressWarnings("unchecked") + public void sendResponse(T res) { + if( sendOK() ) { + lock.lock(); + try { + response = res; + completed.signalAll(); + } finally { + lock.unlock(); + } // end try + } else { + throw new IllegalStateException("sendResponse() or sendFault() has been called previously"); + } // end if + + // Now dispatch the response to the callback, if present... + if( callbackRef != null ) { + AsyncResponseHandler handler = (AsyncResponseHandler) callbackRef.getService(); + setResponseHeaders(); + handler.setResponse(res); + } // end if + + // Use response invoker if present + if( respInvoker != null ) { + respInvoker.invokeAsyncResponse(res, null); + return; + } // end if + } // end method sendResponse + + public T get(long timeout, TimeUnit unit) throws Throwable { + lock.lock(); + try { + // wait for result to be available + if( response == null && fault == null ) completed.await( timeout, unit); + if( response != null ) return response; + if( fault != null ) throw fault; + } finally { + lock.unlock(); + } // end try + + return null; + } // end method get + + /** + * Indicates that sending a response is OK - this is a transactional + * query in that it also updates the state of this ResponseDispatch, so + * that it will return true once and once only + * @return - true if it is OK to send the response, false otherwise + */ + private boolean sendOK() { + return latch.compareAndSet(false, true); + } + + /** + * Creates a service reference for the async callback, based on information contained in the supplied message + * @param msg - the incoming message + * @return - a CallBackServiceReference + */ + @SuppressWarnings("unchecked") + private ServiceReference> getAsyncCallbackRef( Message msg ) { + RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get(Constants.ASYNC_CALLBACK); + if( callbackEPR == null ) return null; + + CompositeContext compositeContext = callbackEPR.getCompositeContext(); + registry = compositeContext.getExtensionPointRegistry(); + ProxyFactory proxyFactory = ExtensibleProxyFactory.getInstance(registry); + msgFactory = getMessageFactory(); + List eprList = new ArrayList(); + eprList.add(callbackEPR); + ObjectFactory factory = new CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, eprList); + + return (ServiceReference>) factory.getInstance(); + + } // end method getAsyncCallbackEPR + + /** + * Sets the values of various headers in the response message + */ + private void setResponseHeaders() { + // Is there an existing message context? + Message msgContext = ThreadMessageContext.getMessageContext(); + if( msgContext == null ) { + // Create a message context + msgContext = msgFactory.createMessage(); + } // end if + + // Add in the header for the RelatesTo Message ID + msgContext.getHeaders().put(Constants.RELATES_TO, messageID); + + ThreadMessageContext.setMessageContext(msgContext); + } // end method setResponseHeaders + + private MessageFactory getMessageFactory() { + FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class); + return modelFactories.getFactory(MessageFactory.class); + } // end method getMessageFactory +} -- cgit v1.2.3