From ff7755834cd88fc3c501ee9be9cceca8db77385c Mon Sep 17 00:00:00 2001 From: edwardsmj Date: Thu, 25 Jun 2009 21:50:48 +0000 Subject: Changes to move implementation-bpel-runtime to use ODE 1.3.2 on Tuscany 2.x git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@788522 13f79535-47bb-0310-9956-ffa450edef68 --- .../implementation/bpel/ode/EmbeddedODEServer.java | 104 ++++++++++++++++++--- .../implementation/bpel/ode/ODEBindingContext.java | 12 +++ .../bpel/ode/TuscanyProcessConfImpl.java | 81 +++++++++++++--- .../ode/provider/BPELImplementationProvider.java | 19 +++- .../bpel/ode/provider/BPELInvoker.java | 6 +- 5 files changed, 189 insertions(+), 33 deletions(-) (limited to 'java/sca/modules/implementation-bpel-runtime/src/main') diff --git a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java index 619643eaa3..f5139b20d9 100644 --- a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java +++ b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java @@ -27,12 +27,17 @@ import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import javax.transaction.TransactionManager; import javax.xml.namespace.QName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +//import org.apache.ode.axis2.BindingContextImpl; +//import org.apache.ode.axis2.EndpointReferenceContextImpl; +//import org.apache.ode.axis2.MessageExchangeContextImpl; +//import org.apache.ode.axis2.EndpointReferenceContextImpl; import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC; import org.apache.ode.bpel.engine.BpelServerImpl; import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy; @@ -45,6 +50,9 @@ import org.apache.ode.scheduler.simple.SimpleScheduler; import org.apache.ode.utils.GUID; import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.eclipse.core.runtime.FileLocator; + + /** * Embedded ODE process server @@ -86,12 +94,10 @@ public class EmbeddedODEServer { confProps.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=false)"); _config = new OdeConfigProperties(confProps, "ode-sca"); - // Setting work root as the directory containing our database (wherever in the classpath) - URL dbLocation = getClass().getClassLoader().getResource("jpadb"); - if (dbLocation == null) - throw new ODEInitializationException("Couldn't find database in the classpath"); + // Setting work root as the directory containing our database try { - _workRoot = new File(dbLocation.toURI()).getParentFile(); + _workRoot = getDatabaseLocationAsFile(); + //_workRoot = new File(dbLocation.toURI()).getParentFile(); } catch (URISyntaxException e) { throw new ODEInitializationException(e); } @@ -107,10 +113,41 @@ public class EmbeddedODEServer { __log.error(errmsg, ex); throw new ODEInitializationException(errmsg, ex); } + + // Added MJE, 24/06/2009 - for 1.3.2 version of ODE + _scheduler.start(); + // end of addition __log.info("ODE BPEL server started."); _initialized = true; } + + /** + * Gets the location of the database used for the ODE BPEL engine as a File object for + * the directory containing the database + * @return + * @throws ODEInitializationException + * @throws URISyntaxException + */ + private File getDatabaseLocationAsFile() throws ODEInitializationException, + URISyntaxException { + File locationFile = null; + // TODO - provide a system property / environment variable to set the path to the DB + + URL dbLocation = getClass().getClassLoader().getResource("jpadb"); + // Handle OSGI bundle case + if( dbLocation.getProtocol() == "bundleresource" ) { + try { + dbLocation = FileLocator.toFileURL( dbLocation ); + } catch (Exception ce ) { + throw new ODEInitializationException("Couldn't find database in the OSGi bundle"); + } // end try + } // end if + if (dbLocation == null) + throw new ODEInitializationException("Couldn't find database in the classpath"); + locationFile = new File(dbLocation.toURI()).getParentFile(); + return locationFile; + } // end method getDatabaseLocationAsFile private void initTxMgr() { if(_txMgr == null) { @@ -138,32 +175,64 @@ public class EmbeddedODEServer { throw new ODEInitializationException(errmsg, ex); } } - + private void initBpelServer() { if (__log.isDebugEnabled()) { __log.debug("ODE initializing"); } + ThreadFactory threadFactory = new ThreadFactory() { + int threadNumber = 0; + public Thread newThread(Runnable r) { + threadNumber += 1; + Thread t = new Thread(r, "ODEServer-"+threadNumber); + t.setDaemon(true); + return t; + } + }; + + _executorService = Executors.newCachedThreadPool(threadFactory); - //FIXME: externalize the configuration for ThreadPoolMaxSize - _executorService = Executors.newCachedThreadPool(); - + // executor service for long running bulk transactions + ExecutorService _polledRunnableExecutorService = Executors.newCachedThreadPool(new ThreadFactory() { + int threadNumber = 0; + public Thread newThread(Runnable r) { + threadNumber += 1; + Thread t = new Thread(r, "PolledRunnable-"+threadNumber); + t.setDaemon(true); + return t; + } + }); + _bpelServer = new BpelServerImpl(); _scheduler = createScheduler(); _scheduler.setJobProcessor(_bpelServer); + + BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor(); + polledRunnableProcessor.setPolledRunnableExecutorService(_polledRunnableExecutorService); + polledRunnableProcessor.setContexts(_bpelServer.getContexts()); + _scheduler.setPolledRunnableProcesser(polledRunnableProcessor); _bpelServer.setDaoConnectionFactory(_daoCF); _bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler)); - // _bpelServer.setEndpointReferenceContext(new EndpointReferenceContextImpl(this)); + + //_bpelServer.setEndpointReferenceContext(eprContext); _bpelServer.setMessageExchangeContext(new ODEMessageExchangeContext(this)); _bpelServer.setBindingContext(new ODEBindingContext()); _bpelServer.setScheduler(_scheduler); if (_config.isDehydrationEnabled()) { CountLRUDehydrationPolicy dehy = new CountLRUDehydrationPolicy(); + dehy.setProcessMaxAge(_config.getDehydrationMaximumAge()); + dehy.setProcessMaxCount(_config.getDehydrationMaximumCount()); _bpelServer.setDehydrationPolicy(dehy); } - + _bpelServer.setConfigProperties(_config.getProperties()); _bpelServer.init(); - } // end InitBpelServer + _bpelServer.setInstanceThrottledMaximumCount(_config.getInstanceThrottledMaximumCount()); + _bpelServer.setProcessThrottledMaximumCount(_config.getProcessThrottledMaximumCount()); + _bpelServer.setProcessThrottledMaximumSize(_config.getProcessThrottledMaximumSize()); + _bpelServer.setHydrationLazy(_config.isHydrationLazy()); + _bpelServer.setHydrationLazyMinimumSize(_config.getHydrationLazyMinimumSize()); + } public void stop() throws ODEShutdownException { if(_bpelServer != null) { @@ -217,7 +286,16 @@ public class EmbeddedODEServer { } protected Scheduler createScheduler() { - SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_db.getDataSource())); + Properties odeProperties = new Properties(); + // TODO Find correct values for these properties - MJE 22/06/2009 + odeProperties.put("ode.scheduler.queueLength", "100" ); + odeProperties.put("ode.scheduler.immediateInterval", "30000" ); + odeProperties.put("ode.scheduler.nearFutureInterval", "600000" ); + odeProperties.put("ode.scheduler.staleInterval", "100000" ); + + SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(), + new JdbcDelegate(_db.getDataSource()), + odeProperties ); scheduler.setTransactionManager(_txMgr); return scheduler; diff --git a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java index 3f2db7b244..8e339812cd 100644 --- a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java +++ b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java @@ -76,4 +76,16 @@ public class ODEBindingContext implements BindingContext { String processName = pid.getLocalPart().substring(0, pid.getLocalPart().lastIndexOf("-")); return new QName(pid.getNamespaceURI(), processName); } + + /** + * Calculate the size of the service that this endpoint references. + * @param epr the endpoint reference for the service + * @returns the size of the service + */ + public long calculateSizeofService(EndpointReference epr) { + // TODO It is not at all clear from the ODE code what "size" means + // eg number of service operations? size of the process in bytes? + // So for the present, return a fixed random value... + return 10000; + } } diff --git a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java index a2d4645e8b..6a33f65a53 100644 --- a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java +++ b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java @@ -30,6 +30,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import javax.wsdl.Definition; import javax.xml.namespace.QName; @@ -39,6 +40,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.compiler.BpelC; import org.apache.ode.bpel.evt.BpelEvent.TYPE; import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.EndpointReference; import org.apache.ode.bpel.iapi.ProcessConf; import org.apache.ode.bpel.iapi.ProcessState; import org.apache.tuscany.sca.assembly.Reference; @@ -112,21 +114,10 @@ public class TuscanyProcessConfImpl implements ProcessConf { */ public InputStream getCBPInputStream() { //System.out.println("getCBPInputStream called"); - // Find the CBP file - it has the same name as the BPEL process and lives in the same - // directory as the process file - String cbpFileName = null; - try { - String fileName = getRelativePath( getDirectory(), getBPELFile() ); - cbpFileName = fileName.substring(0, fileName.lastIndexOf(".")) + ".cbp"; - } catch (Exception e ) { - // IOException trying to fetch the BPEL file name - if(__log.isDebugEnabled()) { - __log.debug("Unable to calculate the file name for BPEL process: " + - implementation.getProcessDefinition().getName(), e); - return null; - } // end if - } // end try - File cbpFile = new File( getDirectory(), cbpFileName ); + + File cbpFile = getCBPFile(); + if( cbpFile == null ) return null; + if( cbpFile.exists() ) { // Create an InputStream from the cbp file... try { @@ -147,6 +138,29 @@ public class TuscanyProcessConfImpl implements ProcessConf { // TODO - need better exception handling if we can't open the cbp file for any reason return null; } // end getCBPInputStream + + /** + * Gets the File object for the CBP file for this BPEL Process + * @return - the File object for the CBP file + */ + private File getCBPFile() { + // Find the CBP file - it has the same name as the BPEL process and lives in the same + // directory as the process file + String cbpFileName = null; + try { + String fileName = getRelativePath( getDirectory(), getBPELFile() ); + cbpFileName = fileName.substring(0, fileName.lastIndexOf(".")) + ".cbp"; + } catch (Exception e ) { + // IOException trying to fetch the BPEL file name + if(__log.isDebugEnabled()) { + __log.debug("Unable to calculate the file name for BPEL process: " + + implementation.getProcessDefinition().getName(), e); + return null; + } // end if + } // end try + File cbpFile = new File( getDirectory(), cbpFileName ); + return cbpFile; + } // end getCBPFile /** * Return the WSDL Definition for a given PortType @@ -439,4 +453,41 @@ public class TuscanyProcessConfImpl implements ProcessConf { // end of other public APIs //----------------------------------------------------------------------------- + /** + * Get the size in bytes of the CBP file + * @return - this size in bytes of the CBP file, 0 if the file cannot be found + */ + public long getCBPFileSize() { + File cbpFile = getCBPFile(); + if( cbpFile == null ) return 0; + + return cbpFile.length(); + } // end getCBPFileSize + + public Set getCleanupCategories(boolean instanceSucceeded) { + // TODO Is null acceptable or is an empty Set the correct response? + return null; + } + + public Map getEndpointProperties(EndpointReference epr) { + // TODO Is null acceptable or is an empty Map the correct response? + return null; + } + + public Map getProcessProperties() { + // TODO Is null acceptable or is an empty Map the correct response? + return null; + } + + public boolean isCleanupCategoryEnabled(boolean instanceSucceeded, + CLEANUP_CATEGORY category) { + // TODO Currently returns false - should this be changed for some categories? + return false; + } + + public boolean isSharedService(QName serviceName) { + // Tuscany does not share the service + return false; + } + } // end class TuscanyProcessConfImpl diff --git a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java index c500f001dc..2a3f75a575 100644 --- a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java +++ b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java @@ -25,6 +25,8 @@ import javax.transaction.TransactionManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.assembly.Endpoint2; +import org.apache.tuscany.sca.assembly.EndpointReference2; import org.apache.tuscany.sca.assembly.Reference; import org.apache.tuscany.sca.assembly.Service; import org.apache.tuscany.sca.databinding.xml.DOMDataBinding; @@ -66,12 +68,21 @@ public class BPELImplementationProvider implements ImplementationProvider { // Configure the service and reference interfaces to use a DOM databinding // as it's what ODE expects - for (Service service: implementation.getServices()) { + for (Service service: component.getServices()) { + //TODO - MJE, 06/06/2009 - we can eventually remove the reset of the service interface + // contract and leave it to the Endpoints only service.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); - } - for (Reference reference: implementation.getReferences()) { + for( Endpoint2 endpoint : service.getEndpoints() ) { + endpoint.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } // end for + } // end for + + for (Reference reference: component.getReferences()) { reference.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); - } + /* for( EndpointReference2 epr : reference.getEndpointReferences() ) { + epr.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } // end for */ + } // end for } diff --git a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java index e07fdbc302..b28669d049 100644 --- a/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java +++ b/java/sca/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java @@ -237,6 +237,10 @@ public class BPELInvoker implements Invoker { * @return */ private Element processResponse(Element response) { - return (Element) DOMUtils.findChildByName(response, new QName("",bpelOperationOutputPart.getName())); + return (Element) DOMUtils.findChildByName(response, new QName("",bpelOperationOutputPart.getName())).getFirstChild(); + + // MJE, 12/06/2009 - changed to return the message without the PART wrapper element, since this element is not + // transmitted in the SOAP messages on the wire + //return (Element) DOMUtils.findChildByName(response, new QName("",bpelOperationOutputPart.getName())); } } -- cgit v1.2.3