From e5133b4dd0a5990194d41ba691af68c828a30b01 Mon Sep 17 00:00:00 2001 From: edwardsmj Date: Mon, 17 Aug 2009 11:13:17 +0000 Subject: Adding support for SCA callbacks, as described in TUSCANY 3216 git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@804937 13f79535-47bb-0310-9956-ffa450edef68 --- .../implementation/bpel/ode/EmbeddedODEServer.java | 191 ++++++++++++++++++++- .../bpel/ode/ODEExternalService.java | 134 ++++++++++++--- .../sca/implementation/bpel/ode/TuscanyPRC.java | 2 + .../bpel/ode/TuscanyProcessConfImpl.java | 27 +-- 4 files changed, 318 insertions(+), 36 deletions(-) (limited to 'java/sca/modules/implementation-bpel-runtime/src/main') diff --git a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java index b2522326f8..d812a64e1c 100644 --- a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java +++ b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java @@ -28,6 +28,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.transaction.TransactionManager; import javax.xml.namespace.QName; @@ -41,6 +44,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC; import org.apache.ode.bpel.engine.BpelServerImpl; import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy; +import org.apache.ode.bpel.evt.BpelEvent; +import org.apache.ode.bpel.evt.CorrelationMatchEvent; +import org.apache.ode.bpel.evt.NewProcessInstanceEvent; +import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent; +import org.apache.ode.bpel.iapi.BpelEventListener; import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl; import org.apache.ode.il.config.OdeConfigProperties; @@ -48,6 +56,8 @@ import org.apache.ode.il.dbutil.Database; import org.apache.ode.scheduler.simple.JdbcDelegate; import org.apache.ode.scheduler.simple.SimpleScheduler; import org.apache.ode.utils.GUID; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.eclipse.core.runtime.FileLocator; @@ -81,6 +91,14 @@ public class EmbeddedODEServer { protected ExecutorService _executorService; private Map tuscanyRuntimeComponents = new ConcurrentHashMap(); + + private Map mexToProcessMap = new ConcurrentHashMap(); + + private Map> callbackMap = new ConcurrentHashMap>(); + + private final Lock metadataLock = new ReentrantLock(); + private final Condition mexAdded = metadataLock.newCondition(); + private final Condition callbackAdded = metadataLock.newCondition(); public EmbeddedODEServer(TransactionManager txMgr) { _txMgr = txMgr; @@ -245,7 +263,10 @@ public class EmbeddedODEServer { _bpelServer.setProcessThrottledMaximumSize(_config.getProcessThrottledMaximumSize()); _bpelServer.setHydrationLazy(_config.isHydrationLazy()); _bpelServer.setHydrationLazyMinimumSize(_config.getHydrationLazyMinimumSize()); - } + + // Register event listener on the BPEL server + _bpelServer.registerBpelEventListener( new ODEEventListener( this, _bpelServer) ); + } // end method initBpelLServer public void stop() throws ODEShutdownException { if(_bpelServer != null) { @@ -371,4 +392,172 @@ public class EmbeddedODEServer { public RuntimeComponent getTuscanyRuntimeComponent(QName processName) { return tuscanyRuntimeComponents.get(processName); } + + /** + * Records a connection between a MessageExchange ID and a Process Instance ID + * @param mexID + * @param processID + */ + public void addMexToProcessIDLink( String mexID, Long processID ) { + System.out.println("Add mapping Mex - ProcessID = " + mexID + " " + processID.toString()); + if( mexID == null ) { + System.out.println("Mex ID is null !"); + return; + } // end if + metadataLock.lock(); + try { + mexToProcessMap.put(mexID, processID); + mexAdded.signalAll(); + return; + } catch (Exception e) { + return; + } finally { + metadataLock.unlock(); + } // end try + } // end method addMexToProcessIDLink( mexID, processID ) + + /** + * Connects from a MessageExchangeID to a Process Instance ID + * @param mexID - the MessageExchange ID + * @return - a Long which is the Process Instance ID + */ + public Long getProcessIDFromMex( String mexID ) { + System.out.println("Get mapping for Mex: " + mexID); + metadataLock.lock(); + try { + Long processID = mexToProcessMap.get(mexID); + while( processID == null ) { + mexAdded.await(); + processID = mexToProcessMap.get(mexID); + } // end while + return processID; + } catch (Exception e) { + return null; + } finally { + metadataLock.unlock(); + } // end try + + } // end method getProcessIDFromMex + + /** + * Remove the connection between a Message Exchange ID and a Process Instance ID + * @param mexID - the Message Exchange ID + */ + public void removeMexToProcessIDLink( String mexID ) { + mexToProcessMap.remove(mexID); + } // end method removeMexToProcessIDLink + + /** + * Stores the metadata for a Callback + * @param processID - Process ID of the BPEL Process Instance for which this callback applies + * @param serviceName - the name of the service which has the callback + * @param callbackEndpoint - a Tuscany Endpoint which is the target of the callback + */ + public void saveCallbackMetadata( Long processID, String serviceName, EndpointReference callbackEPR ) { + System.out.println("Save callback metadata: ProcessID " + processID.toString() + " service: " + serviceName); + metadataLock.lock(); + try { + Map processMap = callbackMap.get(processID); + if( processMap == null ) { + processMap = new ConcurrentHashMap(); + callbackMap.put(processID, processMap); + } // end if + // Put the mapping of service name to callback endpoint - note that this overwrites any + // previous mapping for the same service name + processMap.put(serviceName, callbackEPR); + callbackAdded.signalAll(); + } finally { + metadataLock.unlock(); + } // end try + } // end saveCallbackMetadata + + /** + * Get the metadata for a Callback, based on a BPEL Process Instance ID and a Service name + * @param processID - the BPEL Process Instance ID + * @param serviceName - the service name + * @return - and Endpoint which is the Callback endpoint for the service for this process instance. + * Returns null if there is no callback metadata for this service. + */ + public EndpointReference getCallbackMetadata( Long processID, String serviceName ) { + EndpointReference theEPR; + System.out.println("Get callback metadata: ProcessID " + processID.toString() + " service: " + serviceName); + + metadataLock.lock(); + try { + while(true) { + Map processMap = callbackMap.get(processID); + theEPR = processMap.get(serviceName); + if( theEPR != null ) return theEPR; + callbackAdded.await(); + } // end while + } catch (Exception e) { + return null; + } finally { + metadataLock.unlock(); + } // end try + } // end method getCallbackMetadata + + /** + * Removes the metadata for a Callback + * @param processID - the Process Instance ID of the process instance to which the callback metadata applies + * @param serviceName - the service name for the service which has a callback - can be NULL, in which case ALL + * callback metadata for the process instance is removed + */ + public void removeCallbackMetadata( Long processID, String serviceName ) { + + if( serviceName == null ) { + callbackMap.remove(processID); + } else { + Map processMap = callbackMap.get(processID); + processMap.remove(serviceName); + } // end if + + } // end method removeCallbackMetadata + + private class ODEEventListener implements BpelEventListener { + + private EmbeddedODEServer ODEServer; + private BpelServerImpl bpelServer; + + ODEEventListener( EmbeddedODEServer ODEServer, BpelServerImpl bpelServer ) { + this.ODEServer = ODEServer; + this.bpelServer = bpelServer; + } // end constructor + + /** + * Method which receives events from the ODE Engine as processing proceeds + */ + public void onEvent(BpelEvent bpelEvent) { + if( bpelEvent instanceof ProcessMessageExchangeEvent || + bpelEvent instanceof NewProcessInstanceEvent || + bpelEvent instanceof CorrelationMatchEvent ) { + handleProcMexEvent( (ProcessMessageExchangeEvent) bpelEvent ); + return; + } // end if + + } // end method onEvent + + /** + * Handle a ProcessMessageExchangeEvent + * - the important aspect of this event is that it establishes a connection between a MessageExchange object + * and the BPEL Process instance to which it relates. + * @param bpelEvent - the ProcessMessageExchangeEvent + */ + private void handleProcMexEvent( ProcessMessageExchangeEvent bpelEvent) { + // Extract the message ID and the process instance ID - it is the connection between these + // that is vital to know + String mexID = bpelEvent.getMessageExchangeId(); + Long processID = bpelEvent.getProcessInstanceId(); + ODEServer.addMexToProcessIDLink( mexID, processID ); + } // end method handleProcMexEvent + + public void shutdown() { + // Intentionally left blank + } + + public void startup(Properties configProperties) { + // Intentionally left blank + } + + } // end Class BPELEventListener } diff --git a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java index c24d703638..55bde26c08 100644 --- a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java +++ b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java @@ -26,11 +26,14 @@ import javax.xml.namespace.QName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.Endpoint; import org.apache.ode.bpel.iapi.Message; import org.apache.ode.bpel.iapi.MessageExchange; import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.utils.DOMUtils; +import org.apache.tuscany.sca.assembly.ComponentReference; +import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; import org.apache.tuscany.sca.runtime.RuntimeComponent; @@ -55,7 +58,6 @@ public class ODEExternalService { this._sched = _server.getScheduler(); } - public void invoke(final PartnerRoleMessageExchange partnerRoleMessageExchange) { boolean isTwoWay = partnerRoleMessageExchange.getMessageExchangePattern() == org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE; @@ -69,8 +71,7 @@ public class ODEExternalService { public void afterCompletion(boolean success) { // If the TX is rolled back, then we don't send the request. - if (!success) - return; + if (!success) return; // The invocation must happen in a separate thread, holding on the afterCompletion // blocks other operations that could have been listed there as well. @@ -85,17 +86,14 @@ public class ODEExternalService { TuscanyPRC channel = (TuscanyPRC) partnerRoleMessageExchange.getChannel(); RuntimeComponent tuscanyRuntimeComponent = _server.getTuscanyRuntimeComponent(channel.getProcessName()); - // MJE 17/07/2009 - the get(0) here is totally bogus - if the component has >1 reference, this will fail - // miserably. We should be fetching the reference BY NAME - and this name must be stored in the PRC - RuntimeComponentReference runtimeComponentReference = - (RuntimeComponentReference)tuscanyRuntimeComponent.getReferences().get(0); - RuntimeWire runtimeWire = - runtimeComponentReference.getRuntimeWire(runtimeComponentReference.getEndpointReferences().get(0)); + // Fetching the reference based on the data held in the PRC / Endpoint + String refName = channel.getEndpoint().serviceName.getLocalPart(); + RuntimeComponentReference runtimeComponentReference = getReferenceByName( tuscanyRuntimeComponent, refName ); + RuntimeWire runtimeWire = getRuntimeWire( runtimeComponentReference, partnerRoleMessageExchange ); // convert operations Operation operation = findOperation(partnerRoleMessageExchange.getOperation().getName(), runtimeComponentReference); - /* This is how a request looks like (payload is wrapped with extra info) @@ -109,16 +107,14 @@ public class ODEExternalService { */ Element msg = partnerRoleMessageExchange.getRequest().getMessage(); if (msg != null) { - String xml = DOMUtils.domToString(msg); - - String payload = - DOMUtils.domToString(getPayload(partnerRoleMessageExchange.getRequest())); - + if(__log.isDebugEnabled()) { + String xml = DOMUtils.domToString(msg); + String payload = DOMUtils.domToString(getPayload(partnerRoleMessageExchange.getRequest())); __log.debug("Starting invocation of SCA Reference"); __log.debug(">>> Original message: " + xml); __log.debug(">>> Payload: " + payload); - } + } // end if Object[] args = new Object[] {getPayload(partnerRoleMessageExchange.getRequest())}; @@ -129,32 +125,29 @@ public class ODEExternalService { result = runtimeWire.invoke(operation, args); success = true; } catch (Exception e) { + e.printStackTrace(); partnerRoleMessageExchange.replyWithFailure(MessageExchange.FailureType.OTHER, e.getMessage(), null); - } - + } // end try if(__log.isDebugEnabled()) { __log.debug("SCA Reference invocation finished"); __log.debug(">>> Result : " + DOMUtils.domToString((Element)result)); - } + } // end if - if (!success) { - return null; - } + if (!success) { return null; } // two way invocation // process results based on type of message invocation replyTwoWayInvocation(partnerRoleMessageExchange.getMessageExchangeId(), operation, (Element)result); - } + } // end if } catch (Throwable t) { // some error - String errmsg = - "Error sending message (mex=" + partnerRoleMessageExchange + "): " + t.getMessage(); + String errmsg = "Error sending message (mex=" + partnerRoleMessageExchange + "): " + t.getMessage(); __log.error(errmsg, t); /*replyWithFailure(partnerRoleMessageExchange.getMessageExchangeId(), MessageExchange.FailureType.COMMUNICATION_ERROR, @@ -182,6 +175,97 @@ public class ODEExternalService { } } + /** + * Gets a RuntimeComponentReference of a supplied RuntimeComponent by name + * @param tuscanyRuntimeComponent - the runtime component + * @param name - the name of the reference + * @return - the RuntimeComponentReference with the supplied name - null if there is no reference with that name + */ + private RuntimeComponentReference getReferenceByName( RuntimeComponent tuscanyRuntimeComponent, String name ) { + if( name == null ) return null; + for( ComponentReference reference : tuscanyRuntimeComponent.getReferences() ) { + if( name.equals(reference.getName()) ) return (RuntimeComponentReference)reference; + } // end for + return null; + } // end method getReferenceByName + + /** + * Get the Runtime Wire for the supplied reference + * @param componentReference - the reference + * @return - the RuntimeWire - null if it cannot be found + */ + private RuntimeWire getRuntimeWire( RuntimeComponentReference componentReference, + PartnerRoleMessageExchange mex) { + if( componentReference.isForCallback() ) { + // Where there is a callback, it is necessary to create a specialized wire, based on callback information + // present on the forward call + + // Get the callbackEPR for the callback using the BPEL Process ID and the Reference name + // - which is the same name as the service name for a callback + Long processID = _server.getProcessIDFromMex(mex.getMessageExchangeId()); + org.apache.tuscany.sca.assembly.EndpointReference callbackEPR = + _server.getCallbackMetadata(processID, componentReference.getName()); + RuntimeWire wire = selectCallbackWire( callbackEPR.getTargetEndpoint(), componentReference ); + wire = clone_bind( componentReference, callbackEPR.getCallbackEndpoint() ); + return wire; + } else { + // No callback case... + //TODO - fix the x..n multiplicity case, which needs to select the correct ONE of multiple + // EndpointReferences here + return componentReference.getRuntimeWire(componentReference.getEndpointReferences().get(0)); + } // end if + } // end method getRuntimeWire + + private RuntimeWire selectCallbackWire( org.apache.tuscany.sca.assembly.Endpoint endpoint, + RuntimeComponentReference componentReference) { + // Look for callback binding with same name as service binding + if (endpoint == null) { + throw new RuntimeException("Destination for forward call is not available"); + } + + for (RuntimeWire wire : componentReference.getRuntimeWires()) { + if (wire.getEndpointReference().getBinding().getName().equals(endpoint.getBinding().getName())) { + return wire; + } + } // end for + + // if no match, look for callback binding with same type as service binding + for (RuntimeWire wire : componentReference.getRuntimeWires()) { + if (wire.getEndpointReference().getBinding().getClass() == endpoint.getBinding().getClass()) { + return wire; + } + } // end for + + // no suitable callback wire was found + return null; + } // end method selectCallbackWire + + private RuntimeWire clone_bind(RuntimeComponentReference reference, + org.apache.tuscany.sca.assembly.Endpoint callbackEndpoint) { + + try { + // clone the callback reference ready to configure it for this callback endpoint + RuntimeComponentReference ref = (RuntimeComponentReference)reference.clone(); + ref.getTargets().clear(); + ref.getBindings().clear(); + ref.getEndpointReferences().clear(); + + // clone epr + EndpointReference callbackEndpointReference = (EndpointReference)reference.getEndpointReferences().get(0).clone(); + callbackEndpointReference.setReference(ref); + callbackEndpointReference.setTargetEndpoint(callbackEndpoint); + callbackEndpointReference.setUnresolved(true); + + // The callback endpoint will be resolved when the wire chains are created + ref.getEndpointReferences().add(callbackEndpointReference); + RuntimeWire wire = ref.getRuntimeWires().get(0); + + return wire; + } catch ( CloneNotSupportedException e ) { + return null; + } // end try clone_bind + + } // end method /** * Find the SCA Reference operation diff --git a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java index 0de474a553..aaa00069f5 100644 --- a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java +++ b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java @@ -33,10 +33,12 @@ import org.apache.ode.bpel.iapi.PartnerRoleChannel; */ public class TuscanyPRC implements PartnerRoleChannel { private final QName processName; + private final QName pid; private final Endpoint endpoint; public TuscanyPRC(QName processName, QName pid, PortType portType, Endpoint endpoint){ this.processName = processName; + this.pid = pid; this.endpoint = endpoint; } diff --git a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java index aaa42261e2..19e7d3e913 100644 --- a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java +++ b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java @@ -59,6 +59,7 @@ import org.apache.ode.bpel.iapi.ProcessState; import org.apache.tuscany.sca.assembly.Base; import org.apache.tuscany.sca.assembly.ComponentProperty; import org.apache.tuscany.sca.assembly.ComponentReference; +import org.apache.tuscany.sca.assembly.ComponentService; import org.apache.tuscany.sca.assembly.Reference; import org.apache.tuscany.sca.assembly.Service; import org.apache.tuscany.sca.databinding.SimpleTypeMapper; @@ -279,7 +280,8 @@ public class TuscanyProcessConfImpl implements ProcessConf { public Map getInvokeEndpoints() { if( invokeEndpoints == null ) { invokeEndpoints = new HashMap(); - // Get a collection of the references + // Get a collection of the component references - note that this includes "pseudo-references" for any + // services that have a callback interface List theReferences = component.getReferences(); //List theReferences = implementation.getReferences(); // Create an endpoint for each reference, using the reference name as the "service" @@ -338,11 +340,13 @@ public class TuscanyProcessConfImpl implements ProcessConf { if( provideEndpoints == null ) { provideEndpoints = new HashMap(); String componentURI = component.getURI(); - // Get a collection of the references - List theServices = implementation.getServices(); + // Get a collection of the services - note that the Component services include additional + // "pseudo-services" for each reference that has a callback... + //List theServices = implementation.getServices(); + List theServices = component.getServices(); // Create an endpoint for each reference, using the reference name as the "service" // name, combined with http://tuscany.apache.org to make a QName - for( Service service : theServices ) { + for( ComponentService service : theServices ) { // MJE 14/07/2009 - added componentURI to the service name to get unique service name provideEndpoints.put( service.getName(), new Endpoint( new QName( TUSCANY_NAMESPACE, componentURI + service.getName() ), @@ -382,15 +386,19 @@ public class TuscanyProcessConfImpl implements ProcessConf { * List. These events are "ODE Execution Events" and there is a definition of them on this * page: http://ode.apache.org/user-guide.html#UserGuide-ProcessDeployment * - * For the present Tuscany does not support manipulating the event enablement and always - * returns that the event is not enabled + * Tuscany currently uses: + * - instanceLifecycle events in order to establish the relationship of MessageExchange objects + * to the BPEL Process instances * @param scopeNames - list of BPEL process Scope names * @param type - the event type */ public boolean isEventEnabled(List scopeNames, TYPE type) { - //System.out.println("isEventEnabled called with scopeNames: " + - // scopeNames + " and type: " + type ); - return false; + if( type == TYPE.dataHandling ) return false; + if( type == TYPE.activityLifecycle ) return false; + if( type == TYPE.scopeHandling ) return true; + if( type == TYPE.instanceLifecycle ) return true; + if( type == TYPE.correlation ) return true; + return false; } // end isEventEnabled /** @@ -400,7 +408,6 @@ public class TuscanyProcessConfImpl implements ProcessConf { * - returning true causes problems in communicating with the BPEL process */ public boolean isTransient() { - //System.out.println("isTransient called"); return false; } // end isTransient -- cgit v1.2.3