From 132aa8a77685ec92bc90c03f987650d275a7b639 Mon Sep 17 00:00:00 2001 From: lresende Date: Mon, 30 Sep 2013 06:59:11 +0000 Subject: 2.0.1 RC1 release tag git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1527464 13f79535-47bb-0310-9956-ffa450edef68 --- .../implementation/bpel/ode/EmbeddedODEServer.java | 563 +++++++++++++++++++++ 1 file changed, 563 insertions(+) create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java (limited to 'sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java') diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java b/sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java new file mode 100644 index 0000000000..939c33c21c --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.transaction.TransactionManager; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC; +import org.apache.ode.bpel.engine.BpelServerImpl; +import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy; +import org.apache.ode.bpel.evt.BpelEvent; +import org.apache.ode.bpel.evt.CorrelationMatchEvent; +import org.apache.ode.bpel.evt.NewProcessInstanceEvent; +import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent; +import org.apache.ode.bpel.iapi.BpelEventListener; +import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl; +import org.apache.ode.il.config.OdeConfigProperties; +import org.apache.ode.il.dbutil.Database; +import org.apache.ode.scheduler.simple.JdbcDelegate; +import org.apache.ode.scheduler.simple.SimpleScheduler; +import org.apache.ode.utils.GUID; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.eclipse.core.runtime.FileLocator; + + + +/** + * Embedded ODE process server + * + * @version $Rev$ $Date$ + */ +public class EmbeddedODEServer { + private static final String TUSCANY_IMPL_BPEL_DBLOCATION = "TUSCANY_IMPL_BPEL_DBLOCATION"; + + protected final Log __log = LogFactory.getLog(getClass()); + + private boolean _initialized; + + private OdeConfigProperties _config; + + private TransactionManager _txMgr; + + private Database _db; + + private File _workRoot; + + private BpelDAOConnectionFactoryJDBC _daoCF; + + private BpelServerImpl _bpelServer; + + private Scheduler _scheduler; + + protected ExecutorService _executorService; + + private Map tuscanyRuntimeComponents = new ConcurrentHashMap(); + + private Map mexToProcessMap = new ConcurrentHashMap(); + + private Map> callbackMap = new ConcurrentHashMap>(); + + private final Lock metadataLock = new ReentrantLock(); + private final Condition mexAdded = metadataLock.newCondition(); + private final Condition callbackAdded = metadataLock.newCondition(); + + public EmbeddedODEServer(TransactionManager txMgr) { + _txMgr = txMgr; + } + + public void init() throws ODEInitializationException { + + Properties p = System.getProperties(); + p.put("derby.system.home", "target"); + + Properties confProps = new Properties(); + confProps.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=false)"); + + _config = new OdeConfigProperties(confProps, "ode-sca"); + + // Setting work root as the directory containing our database + try { + _workRoot = getDatabaseLocationAsFile(); + } catch (URISyntaxException e) { + throw new ODEInitializationException(e); + } + + initTxMgr(); + initPersistence(); + initBpelServer(); + + try { + _bpelServer.start(); + } catch (Exception ex) { + String errmsg = "An error occured during the ODE BPEL server startup."; + __log.error(errmsg, ex); + throw new ODEInitializationException(errmsg, ex); + } + + // Start ODE scheduler + _scheduler.start(); + + __log.info("ODE BPEL server started."); + _initialized = true; + + } // end method init() + + /** + * Gets the location of the database used for the ODE BPEL engine as a File object for + * the directory containing the database + * @return + * @throws ODEInitializationException + * @throws URISyntaxException + */ + private File getDatabaseLocationAsFile() throws ODEInitializationException, URISyntaxException { + File locationFile = null; + URL dbLocation = null; + + // An environment variable to set the path to the DB + String dbFile = System.getenv(TUSCANY_IMPL_BPEL_DBLOCATION); + if( dbFile != null ) { + try { + locationFile = new File(dbFile).getParentFile(); + } catch (Exception e ) { + System.out.println("Environment variable " + TUSCANY_IMPL_BPEL_DBLOCATION + " has the wrong format: " + dbFile); + System.out.println("Exception is: " + e.getClass().toString() + " " + e.getMessage()); + } // end try + } else { + dbLocation = getClass().getClassLoader().getResource("jpadb"); + if (dbLocation == null) { + throw new ODEInitializationException("Couldn't find database in the classpath:" + + " try setting the " + TUSCANY_IMPL_BPEL_DBLOCATION + " environment variable"); + } + // Handle OSGI bundle case + if( dbLocation.getProtocol() == "bundleresource" ) { + try { + dbLocation = FileLocator.toFileURL( dbLocation ); + } catch (Exception ce ) { + throw new ODEInitializationException("Couldn't find database in the OSGi bundle"); + } // end try + } // end if + locationFile = new File(dbLocation.toURI()).getParentFile(); + } // end if + + return locationFile; + } // end method getDatabaseLocationAsFile + + private void initTxMgr() { + if(_txMgr == null) { + try { + GeronimoTxFactory txFactory = new GeronimoTxFactory(); + _txMgr = txFactory.getTransactionManager(); + } catch (Exception e) { + __log.fatal("Couldn't initialize a transaction manager using Geronimo's transaction factory.", e); + throw new ODEInitializationException("Couldn't initialize a transaction manager using " + "Geronimo's transaction factory.", e); + } + } + } + + private void initPersistence() { + _db = new Database(_config); + _db.setTransactionManager(_txMgr); + _db.setWorkRoot(_workRoot); + + try { + _db.start(); + _daoCF = _db.createDaoCF(); + } catch (Exception ex) { + String errmsg = "Error while configuring ODE persistence."; + __log.error(errmsg, ex); + throw new ODEInitializationException(errmsg, ex); + } + } + + private void initBpelServer() { + if (__log.isDebugEnabled()) { + __log.debug("ODE initializing"); + } + ThreadFactory threadFactory = new ThreadFactory() { + int threadNumber = 0; + public Thread newThread(Runnable r) { + threadNumber += 1; + Thread t = new Thread(r, "ODEServer-"+threadNumber); + t.setDaemon(true); + return t; + } + }; + + _executorService = Executors.newCachedThreadPool(threadFactory); + + // executor service for long running bulk transactions + ExecutorService _polledRunnableExecutorService = Executors.newCachedThreadPool(new ThreadFactory() { + int threadNumber = 0; + public Thread newThread(Runnable r) { + threadNumber += 1; + Thread t = new Thread(r, "PolledRunnable-"+threadNumber); + t.setDaemon(true); + return t; + } + }); + + _bpelServer = new BpelServerImpl(); + _scheduler = createScheduler(); + _scheduler.setJobProcessor(_bpelServer); + + BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor(); + polledRunnableProcessor.setPolledRunnableExecutorService(_polledRunnableExecutorService); + polledRunnableProcessor.setContexts(_bpelServer.getContexts()); + //_scheduler.setPolledRunnableProcesser(polledRunnableProcessor); + + _bpelServer.setDaoConnectionFactory(_daoCF); + _bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler)); + + _bpelServer.setEndpointReferenceContext( new ODEEprContext() ); + _bpelServer.setMessageExchangeContext(new ODEMessageExchangeContext(this)); + _bpelServer.setBindingContext(new ODEBindingContext()); + _bpelServer.setScheduler(_scheduler); + if (_config.isDehydrationEnabled()) { + CountLRUDehydrationPolicy dehy = new CountLRUDehydrationPolicy(); + dehy.setProcessMaxAge(_config.getDehydrationMaximumAge()); + dehy.setProcessMaxCount(_config.getDehydrationMaximumCount()); + _bpelServer.setDehydrationPolicy(dehy); + } + + _bpelServer.setConfigProperties(_config.getProperties()); + _bpelServer.init(); + _bpelServer.setInstanceThrottledMaximumCount(_config.getInstanceThrottledMaximumCount()); + _bpelServer.setProcessThrottledMaximumCount(_config.getProcessThrottledMaximumCount()); + _bpelServer.setProcessThrottledMaximumSize(_config.getProcessThrottledMaximumSize()); + _bpelServer.setHydrationLazy(_config.isHydrationLazy()); + _bpelServer.setHydrationLazyMinimumSize(_config.getHydrationLazyMinimumSize()); + + // Register event listener on the BPEL server + _bpelServer.registerBpelEventListener( new ODEEventListener( this, _bpelServer) ); + } // end method initBpelLServer + + public void stop() throws ODEShutdownException { + if(_bpelServer != null) { + try { + __log.debug("Stopping BPEL Embedded server"); + _bpelServer.shutdown(); + _bpelServer = null; + } catch (Exception ex) { + __log.debug("Error stopping BPEL server"); + } + } + + if(_scheduler != null) { + try { + __log.debug("Stopping scheduler"); + _scheduler.shutdown(); + _scheduler = null; + } catch (Exception ex) { + __log.debug("Error stopping scheduler"); + } + } + + if(_daoCF != null) { + try { + __log.debug("Stopping DAO"); + _daoCF.shutdown(); + _daoCF = null; + } catch (Exception ex) { + __log.debug("Error stopping DAO"); + } + } + + if(_db != null) { + try { + __log.debug("Stopping DB"); + _db.shutdown(); + _db = null; + } catch (Exception ex) { + __log.debug("Error stopping DB"); + } + } + + if(_txMgr != null) { + try { + __log.debug("Stopping Transaction Manager"); + _txMgr = null; + } catch (Exception ex) { + __log.debug("Error stopping Transaction Manager"); + } + } + } + + protected Scheduler createScheduler() { + Properties odeProperties = new Properties(); + // TODO Find correct values for these properties - MJE 22/06/2009 + odeProperties.put("ode.scheduler.queueLength", "100" ); + odeProperties.put("ode.scheduler.immediateInterval", "30000" ); + odeProperties.put("ode.scheduler.nearFutureInterval", "600000" ); + odeProperties.put("ode.scheduler.staleInterval", "100000" ); + + SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(), + new JdbcDelegate(_db.getDataSource()), + odeProperties ); + scheduler.setExecutorService(_executorService); + scheduler.setTransactionManager(_txMgr); + + return scheduler; + } + + public boolean isInitialized() { + return _initialized; + } + + public BpelServerImpl getBpelServer() { + return _bpelServer; + } + + public Scheduler getScheduler() { + return _scheduler; + } + + public ExecutorService getExecutor() { + return _executorService; + } + + /** + * Deploy the BPEL process implementation to the ODE Engine + * @param d - ODEDeployment structure + * @param implementation - the BPEL Implementation + * @param component - the SCA component which uses the implementation + */ + public void deploy(ODEDeployment d, BPELImplementation implementation, RuntimeComponent component ) { + try { + TuscanyProcessConfImpl processConf = new TuscanyProcessConfImpl( implementation, component ); + _bpelServer.register(processConf); + d.setProcessConf(processConf); + __log.debug("Completed calling new Process deployment code..."); + } catch (Exception ex) { + String errMsg = ">>> DEPLOY: Unexpected exception during deploy of BPEL. /n Component = " + + component.getName() + + " implementation = " + + implementation.getProcess() + + ex.getMessage(); + __log.debug(errMsg, ex); + throw new ODEDeploymentException(errMsg,ex); + } + } + + /** + * Undeploy the BPEL process implementation from the ODE Engine + * @param d - ODEDeployment structure + */ + public void undeploy(ODEDeployment d) { + TuscanyProcessConfImpl processConf = d.getProcessConf(); + if( processConf != null ) { + processConf.stop(); + } // end if + } // end method undeploy + + public void registerTuscanyRuntimeComponent(QName processName,RuntimeComponent componentContext) { + tuscanyRuntimeComponents.put(processName, componentContext); + } + + public RuntimeComponent getTuscanyRuntimeComponent(QName processName) { + return tuscanyRuntimeComponents.get(processName); + } + + /** + * Records a connection between a MessageExchange ID and a Process Instance ID + * @param mexID + * @param processID + */ + public void addMexToProcessIDLink( String mexID, Long processID ) { + //System.out.println("Add mapping Mex - ProcessID = " + mexID + " " + processID.toString()); + if( mexID == null ) { + //System.out.println("Mex ID is null !"); + return; + } // end if + metadataLock.lock(); + try { + mexToProcessMap.put(mexID, processID); + mexAdded.signalAll(); + return; + } catch (Exception e) { + return; + } finally { + metadataLock.unlock(); + } // end try + } // end method addMexToProcessIDLink( mexID, processID ) + + /** + * Connects from a MessageExchangeID to a Process Instance ID + * @param mexID - the MessageExchange ID + * @return - a Long which is the Process Instance ID + */ + public Long getProcessIDFromMex( String mexID ) { + //System.out.println("Get mapping for Mex: " + mexID); + metadataLock.lock(); + try { + Long processID = mexToProcessMap.get(mexID); + while( processID == null ) { + mexAdded.await(); + processID = mexToProcessMap.get(mexID); + } // end while + return processID; + } catch (Exception e) { + return null; + } finally { + metadataLock.unlock(); + } // end try + + } // end method getProcessIDFromMex + + /** + * Remove the connection between a Message Exchange ID and a Process Instance ID + * @param mexID - the Message Exchange ID + */ + public void removeMexToProcessIDLink( String mexID ) { + mexToProcessMap.remove(mexID); + } // end method removeMexToProcessIDLink + + /** + * Stores the metadata for a Callback + * @param processID - Process ID of the BPEL Process Instance for which this callback applies + * @param serviceName - the name of the service which has the callback + * @param callbackEndpoint - a Tuscany Endpoint which is the target of the callback + */ + public void saveCallbackMetadata( Long processID, String serviceName, EndpointReference callbackEPR ) { + //System.out.println("Save callback metadata: ProcessID " + processID.toString() + " service: " + serviceName); + metadataLock.lock(); + try { + Map processMap = callbackMap.get(processID); + if( processMap == null ) { + processMap = new ConcurrentHashMap(); + callbackMap.put(processID, processMap); + } // end if + // Put the mapping of service name to callback endpoint - note that this overwrites any + // previous mapping for the same service name + processMap.put(serviceName, callbackEPR); + callbackAdded.signalAll(); + } finally { + metadataLock.unlock(); + } // end try + } // end saveCallbackMetadata + + /** + * Get the metadata for a Callback, based on a BPEL Process Instance ID and a Service name + * @param processID - the BPEL Process Instance ID + * @param serviceName - the service name + * @return - and Endpoint which is the Callback endpoint for the service for this process instance. + * Returns null if there is no callback metadata for this service. + */ + public EndpointReference getCallbackMetadata( Long processID, String serviceName ) { + EndpointReference theEPR; + //System.out.println("Get callback metadata: ProcessID " + processID.toString() + " service: " + serviceName); + + metadataLock.lock(); + try { + while(true) { + Map processMap = callbackMap.get(processID); + theEPR = processMap.get(serviceName); + if( theEPR != null ) return theEPR; + callbackAdded.await(); + } // end while + } catch (Exception e) { + return null; + } finally { + metadataLock.unlock(); + } // end try + } // end method getCallbackMetadata + + /** + * Removes the metadata for a Callback + * @param processID - the Process Instance ID of the process instance to which the callback metadata applies + * @param serviceName - the service name for the service which has a callback - can be NULL, in which case ALL + * callback metadata for the process instance is removed + */ + public void removeCallbackMetadata( Long processID, String serviceName ) { + + if( serviceName == null ) { + callbackMap.remove(processID); + } else { + Map processMap = callbackMap.get(processID); + processMap.remove(serviceName); + } // end if + + } // end method removeCallbackMetadata + + private class ODEEventListener implements BpelEventListener { + + private EmbeddedODEServer ODEServer; + private BpelServerImpl bpelServer; + + ODEEventListener( EmbeddedODEServer ODEServer, BpelServerImpl bpelServer ) { + this.ODEServer = ODEServer; + this.bpelServer = bpelServer; + } // end constructor + + /** + * Method which receives events from the ODE Engine as processing proceeds + */ + public void onEvent(BpelEvent bpelEvent) { + if( bpelEvent instanceof ProcessMessageExchangeEvent || + bpelEvent instanceof NewProcessInstanceEvent || + bpelEvent instanceof CorrelationMatchEvent ) { + handleProcMexEvent( (ProcessMessageExchangeEvent) bpelEvent ); + return; + } // end if + + } // end method onEvent + + /** + * Handle a ProcessMessageExchangeEvent + * - the important aspect of this event is that it establishes a connection between a MessageExchange object + * and the BPEL Process instance to which it relates. + * @param bpelEvent - the ProcessMessageExchangeEvent + */ + private void handleProcMexEvent( ProcessMessageExchangeEvent bpelEvent) { + // Extract the message ID and the process instance ID - it is the connection between these + // that is vital to know + String mexID = bpelEvent.getMessageExchangeId(); + Long processID = bpelEvent.getProcessInstanceId(); + ODEServer.addMexToProcessIDLink( mexID, processID ); + } // end method handleProcMexEvent + + public void shutdown() { + // Intentionally left blank + } + + public void startup(Properties configProperties) { + // Intentionally left blank + } + + } // end Class BPELEventListener +} -- cgit v1.2.3