summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java')
-rw-r--r--sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java563
1 files changed, 563 insertions, 0 deletions
diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java b/sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java
new file mode 100644
index 0000000000..939c33c21c
--- /dev/null
+++ b/sca-java-2.x/tags/2.0.1-RC1/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.implementation.bpel.ode;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC;
+import org.apache.ode.bpel.engine.BpelServerImpl;
+import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
+import org.apache.ode.bpel.evt.BpelEvent;
+import org.apache.ode.bpel.evt.CorrelationMatchEvent;
+import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
+import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent;
+import org.apache.ode.bpel.iapi.BpelEventListener;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
+import org.apache.ode.il.config.OdeConfigProperties;
+import org.apache.ode.il.dbutil.Database;
+import org.apache.ode.scheduler.simple.JdbcDelegate;
+import org.apache.ode.scheduler.simple.SimpleScheduler;
+import org.apache.ode.utils.GUID;
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.implementation.bpel.BPELImplementation;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.eclipse.core.runtime.FileLocator;
+
+
+
+/**
+ * Embedded ODE process server
+ *
+ * @version $Rev$ $Date$
+ */
+public class EmbeddedODEServer {
+ private static final String TUSCANY_IMPL_BPEL_DBLOCATION = "TUSCANY_IMPL_BPEL_DBLOCATION";
+
+ protected final Log __log = LogFactory.getLog(getClass());
+
+ private boolean _initialized;
+
+ private OdeConfigProperties _config;
+
+ private TransactionManager _txMgr;
+
+ private Database _db;
+
+ private File _workRoot;
+
+ private BpelDAOConnectionFactoryJDBC _daoCF;
+
+ private BpelServerImpl _bpelServer;
+
+ private Scheduler _scheduler;
+
+ protected ExecutorService _executorService;
+
+ private Map<QName, RuntimeComponent> tuscanyRuntimeComponents = new ConcurrentHashMap<QName, RuntimeComponent>();
+
+ private Map<String, Long> mexToProcessMap = new ConcurrentHashMap<String, Long>();
+
+ private Map<Long, Map<String, EndpointReference>> callbackMap = new ConcurrentHashMap<Long, Map<String, EndpointReference>>();
+
+ private final Lock metadataLock = new ReentrantLock();
+ private final Condition mexAdded = metadataLock.newCondition();
+ private final Condition callbackAdded = metadataLock.newCondition();
+
+ public EmbeddedODEServer(TransactionManager txMgr) {
+ _txMgr = txMgr;
+ }
+
+ public void init() throws ODEInitializationException {
+
+ Properties p = System.getProperties();
+ p.put("derby.system.home", "target");
+
+ Properties confProps = new Properties();
+ confProps.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=false)");
+
+ _config = new OdeConfigProperties(confProps, "ode-sca");
+
+ // Setting work root as the directory containing our database
+ try {
+ _workRoot = getDatabaseLocationAsFile();
+ } catch (URISyntaxException e) {
+ throw new ODEInitializationException(e);
+ }
+
+ initTxMgr();
+ initPersistence();
+ initBpelServer();
+
+ try {
+ _bpelServer.start();
+ } catch (Exception ex) {
+ String errmsg = "An error occured during the ODE BPEL server startup.";
+ __log.error(errmsg, ex);
+ throw new ODEInitializationException(errmsg, ex);
+ }
+
+ // Start ODE scheduler
+ _scheduler.start();
+
+ __log.info("ODE BPEL server started.");
+ _initialized = true;
+
+ } // end method init()
+
+ /**
+ * Gets the location of the database used for the ODE BPEL engine as a File object for
+ * the directory containing the database
+ * @return
+ * @throws ODEInitializationException
+ * @throws URISyntaxException
+ */
+ private File getDatabaseLocationAsFile() throws ODEInitializationException, URISyntaxException {
+ File locationFile = null;
+ URL dbLocation = null;
+
+ // An environment variable to set the path to the DB
+ String dbFile = System.getenv(TUSCANY_IMPL_BPEL_DBLOCATION);
+ if( dbFile != null ) {
+ try {
+ locationFile = new File(dbFile).getParentFile();
+ } catch (Exception e ) {
+ System.out.println("Environment variable " + TUSCANY_IMPL_BPEL_DBLOCATION + " has the wrong format: " + dbFile);
+ System.out.println("Exception is: " + e.getClass().toString() + " " + e.getMessage());
+ } // end try
+ } else {
+ dbLocation = getClass().getClassLoader().getResource("jpadb");
+ if (dbLocation == null) {
+ throw new ODEInitializationException("Couldn't find database in the classpath:" +
+ " try setting the " + TUSCANY_IMPL_BPEL_DBLOCATION + " environment variable");
+ }
+ // Handle OSGI bundle case
+ if( dbLocation.getProtocol() == "bundleresource" ) {
+ try {
+ dbLocation = FileLocator.toFileURL( dbLocation );
+ } catch (Exception ce ) {
+ throw new ODEInitializationException("Couldn't find database in the OSGi bundle");
+ } // end try
+ } // end if
+ locationFile = new File(dbLocation.toURI()).getParentFile();
+ } // end if
+
+ return locationFile;
+ } // end method getDatabaseLocationAsFile
+
+ private void initTxMgr() {
+ if(_txMgr == null) {
+ try {
+ GeronimoTxFactory txFactory = new GeronimoTxFactory();
+ _txMgr = txFactory.getTransactionManager();
+ } catch (Exception e) {
+ __log.fatal("Couldn't initialize a transaction manager using Geronimo's transaction factory.", e);
+ throw new ODEInitializationException("Couldn't initialize a transaction manager using " + "Geronimo's transaction factory.", e);
+ }
+ }
+ }
+
+ private void initPersistence() {
+ _db = new Database(_config);
+ _db.setTransactionManager(_txMgr);
+ _db.setWorkRoot(_workRoot);
+
+ try {
+ _db.start();
+ _daoCF = _db.createDaoCF();
+ } catch (Exception ex) {
+ String errmsg = "Error while configuring ODE persistence.";
+ __log.error(errmsg, ex);
+ throw new ODEInitializationException(errmsg, ex);
+ }
+ }
+
+ private void initBpelServer() {
+ if (__log.isDebugEnabled()) {
+ __log.debug("ODE initializing");
+ }
+ ThreadFactory threadFactory = new ThreadFactory() {
+ int threadNumber = 0;
+ public Thread newThread(Runnable r) {
+ threadNumber += 1;
+ Thread t = new Thread(r, "ODEServer-"+threadNumber);
+ t.setDaemon(true);
+ return t;
+ }
+ };
+
+ _executorService = Executors.newCachedThreadPool(threadFactory);
+
+ // executor service for long running bulk transactions
+ ExecutorService _polledRunnableExecutorService = Executors.newCachedThreadPool(new ThreadFactory() {
+ int threadNumber = 0;
+ public Thread newThread(Runnable r) {
+ threadNumber += 1;
+ Thread t = new Thread(r, "PolledRunnable-"+threadNumber);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+
+ _bpelServer = new BpelServerImpl();
+ _scheduler = createScheduler();
+ _scheduler.setJobProcessor(_bpelServer);
+
+ BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor();
+ polledRunnableProcessor.setPolledRunnableExecutorService(_polledRunnableExecutorService);
+ polledRunnableProcessor.setContexts(_bpelServer.getContexts());
+ //_scheduler.setPolledRunnableProcesser(polledRunnableProcessor);
+
+ _bpelServer.setDaoConnectionFactory(_daoCF);
+ _bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler));
+
+ _bpelServer.setEndpointReferenceContext( new ODEEprContext() );
+ _bpelServer.setMessageExchangeContext(new ODEMessageExchangeContext(this));
+ _bpelServer.setBindingContext(new ODEBindingContext());
+ _bpelServer.setScheduler(_scheduler);
+ if (_config.isDehydrationEnabled()) {
+ CountLRUDehydrationPolicy dehy = new CountLRUDehydrationPolicy();
+ dehy.setProcessMaxAge(_config.getDehydrationMaximumAge());
+ dehy.setProcessMaxCount(_config.getDehydrationMaximumCount());
+ _bpelServer.setDehydrationPolicy(dehy);
+ }
+
+ _bpelServer.setConfigProperties(_config.getProperties());
+ _bpelServer.init();
+ _bpelServer.setInstanceThrottledMaximumCount(_config.getInstanceThrottledMaximumCount());
+ _bpelServer.setProcessThrottledMaximumCount(_config.getProcessThrottledMaximumCount());
+ _bpelServer.setProcessThrottledMaximumSize(_config.getProcessThrottledMaximumSize());
+ _bpelServer.setHydrationLazy(_config.isHydrationLazy());
+ _bpelServer.setHydrationLazyMinimumSize(_config.getHydrationLazyMinimumSize());
+
+ // Register event listener on the BPEL server
+ _bpelServer.registerBpelEventListener( new ODEEventListener( this, _bpelServer) );
+ } // end method initBpelLServer
+
+ public void stop() throws ODEShutdownException {
+ if(_bpelServer != null) {
+ try {
+ __log.debug("Stopping BPEL Embedded server");
+ _bpelServer.shutdown();
+ _bpelServer = null;
+ } catch (Exception ex) {
+ __log.debug("Error stopping BPEL server");
+ }
+ }
+
+ if(_scheduler != null) {
+ try {
+ __log.debug("Stopping scheduler");
+ _scheduler.shutdown();
+ _scheduler = null;
+ } catch (Exception ex) {
+ __log.debug("Error stopping scheduler");
+ }
+ }
+
+ if(_daoCF != null) {
+ try {
+ __log.debug("Stopping DAO");
+ _daoCF.shutdown();
+ _daoCF = null;
+ } catch (Exception ex) {
+ __log.debug("Error stopping DAO");
+ }
+ }
+
+ if(_db != null) {
+ try {
+ __log.debug("Stopping DB");
+ _db.shutdown();
+ _db = null;
+ } catch (Exception ex) {
+ __log.debug("Error stopping DB");
+ }
+ }
+
+ if(_txMgr != null) {
+ try {
+ __log.debug("Stopping Transaction Manager");
+ _txMgr = null;
+ } catch (Exception ex) {
+ __log.debug("Error stopping Transaction Manager");
+ }
+ }
+ }
+
+ protected Scheduler createScheduler() {
+ Properties odeProperties = new Properties();
+ // TODO Find correct values for these properties - MJE 22/06/2009
+ odeProperties.put("ode.scheduler.queueLength", "100" );
+ odeProperties.put("ode.scheduler.immediateInterval", "30000" );
+ odeProperties.put("ode.scheduler.nearFutureInterval", "600000" );
+ odeProperties.put("ode.scheduler.staleInterval", "100000" );
+
+ SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),
+ new JdbcDelegate(_db.getDataSource()),
+ odeProperties );
+ scheduler.setExecutorService(_executorService);
+ scheduler.setTransactionManager(_txMgr);
+
+ return scheduler;
+ }
+
+ public boolean isInitialized() {
+ return _initialized;
+ }
+
+ public BpelServerImpl getBpelServer() {
+ return _bpelServer;
+ }
+
+ public Scheduler getScheduler() {
+ return _scheduler;
+ }
+
+ public ExecutorService getExecutor() {
+ return _executorService;
+ }
+
+ /**
+ * Deploy the BPEL process implementation to the ODE Engine
+ * @param d - ODEDeployment structure
+ * @param implementation - the BPEL Implementation
+ * @param component - the SCA component which uses the implementation
+ */
+ public void deploy(ODEDeployment d, BPELImplementation implementation, RuntimeComponent component ) {
+ try {
+ TuscanyProcessConfImpl processConf = new TuscanyProcessConfImpl( implementation, component );
+ _bpelServer.register(processConf);
+ d.setProcessConf(processConf);
+ __log.debug("Completed calling new Process deployment code...");
+ } catch (Exception ex) {
+ String errMsg = ">>> DEPLOY: Unexpected exception during deploy of BPEL. /n Component = "
+ + component.getName()
+ + " implementation = "
+ + implementation.getProcess()
+ + ex.getMessage();
+ __log.debug(errMsg, ex);
+ throw new ODEDeploymentException(errMsg,ex);
+ }
+ }
+
+ /**
+ * Undeploy the BPEL process implementation from the ODE Engine
+ * @param d - ODEDeployment structure
+ */
+ public void undeploy(ODEDeployment d) {
+ TuscanyProcessConfImpl processConf = d.getProcessConf();
+ if( processConf != null ) {
+ processConf.stop();
+ } // end if
+ } // end method undeploy
+
+ public void registerTuscanyRuntimeComponent(QName processName,RuntimeComponent componentContext) {
+ tuscanyRuntimeComponents.put(processName, componentContext);
+ }
+
+ public RuntimeComponent getTuscanyRuntimeComponent(QName processName) {
+ return tuscanyRuntimeComponents.get(processName);
+ }
+
+ /**
+ * Records a connection between a MessageExchange ID and a Process Instance ID
+ * @param mexID
+ * @param processID
+ */
+ public void addMexToProcessIDLink( String mexID, Long processID ) {
+ //System.out.println("Add mapping Mex - ProcessID = " + mexID + " " + processID.toString());
+ if( mexID == null ) {
+ //System.out.println("Mex ID is null !");
+ return;
+ } // end if
+ metadataLock.lock();
+ try {
+ mexToProcessMap.put(mexID, processID);
+ mexAdded.signalAll();
+ return;
+ } catch (Exception e) {
+ return;
+ } finally {
+ metadataLock.unlock();
+ } // end try
+ } // end method addMexToProcessIDLink( mexID, processID )
+
+ /**
+ * Connects from a MessageExchangeID to a Process Instance ID
+ * @param mexID - the MessageExchange ID
+ * @return - a Long which is the Process Instance ID
+ */
+ public Long getProcessIDFromMex( String mexID ) {
+ //System.out.println("Get mapping for Mex: " + mexID);
+ metadataLock.lock();
+ try {
+ Long processID = mexToProcessMap.get(mexID);
+ while( processID == null ) {
+ mexAdded.await();
+ processID = mexToProcessMap.get(mexID);
+ } // end while
+ return processID;
+ } catch (Exception e) {
+ return null;
+ } finally {
+ metadataLock.unlock();
+ } // end try
+
+ } // end method getProcessIDFromMex
+
+ /**
+ * Remove the connection between a Message Exchange ID and a Process Instance ID
+ * @param mexID - the Message Exchange ID
+ */
+ public void removeMexToProcessIDLink( String mexID ) {
+ mexToProcessMap.remove(mexID);
+ } // end method removeMexToProcessIDLink
+
+ /**
+ * Stores the metadata for a Callback
+ * @param processID - Process ID of the BPEL Process Instance for which this callback applies
+ * @param serviceName - the name of the service which has the callback
+ * @param callbackEndpoint - a Tuscany Endpoint which is the target of the callback
+ */
+ public void saveCallbackMetadata( Long processID, String serviceName, EndpointReference callbackEPR ) {
+ //System.out.println("Save callback metadata: ProcessID " + processID.toString() + " service: " + serviceName);
+ metadataLock.lock();
+ try {
+ Map<String, EndpointReference> processMap = callbackMap.get(processID);
+ if( processMap == null ) {
+ processMap = new ConcurrentHashMap<String, EndpointReference>();
+ callbackMap.put(processID, processMap);
+ } // end if
+ // Put the mapping of service name to callback endpoint - note that this overwrites any
+ // previous mapping for the same service name
+ processMap.put(serviceName, callbackEPR);
+ callbackAdded.signalAll();
+ } finally {
+ metadataLock.unlock();
+ } // end try
+ } // end saveCallbackMetadata
+
+ /**
+ * Get the metadata for a Callback, based on a BPEL Process Instance ID and a Service name
+ * @param processID - the BPEL Process Instance ID
+ * @param serviceName - the service name
+ * @return - and Endpoint which is the Callback endpoint for the service for this process instance.
+ * Returns null if there is no callback metadata for this service.
+ */
+ public EndpointReference getCallbackMetadata( Long processID, String serviceName ) {
+ EndpointReference theEPR;
+ //System.out.println("Get callback metadata: ProcessID " + processID.toString() + " service: " + serviceName);
+
+ metadataLock.lock();
+ try {
+ while(true) {
+ Map<String, EndpointReference> processMap = callbackMap.get(processID);
+ theEPR = processMap.get(serviceName);
+ if( theEPR != null ) return theEPR;
+ callbackAdded.await();
+ } // end while
+ } catch (Exception e) {
+ return null;
+ } finally {
+ metadataLock.unlock();
+ } // end try
+ } // end method getCallbackMetadata
+
+ /**
+ * Removes the metadata for a Callback
+ * @param processID - the Process Instance ID of the process instance to which the callback metadata applies
+ * @param serviceName - the service name for the service which has a callback - can be NULL, in which case ALL
+ * callback metadata for the process instance is removed
+ */
+ public void removeCallbackMetadata( Long processID, String serviceName ) {
+
+ if( serviceName == null ) {
+ callbackMap.remove(processID);
+ } else {
+ Map<String, EndpointReference> processMap = callbackMap.get(processID);
+ processMap.remove(serviceName);
+ } // end if
+
+ } // end method removeCallbackMetadata
+
+ private class ODEEventListener implements BpelEventListener {
+
+ private EmbeddedODEServer ODEServer;
+ private BpelServerImpl bpelServer;
+
+ ODEEventListener( EmbeddedODEServer ODEServer, BpelServerImpl bpelServer ) {
+ this.ODEServer = ODEServer;
+ this.bpelServer = bpelServer;
+ } // end constructor
+
+ /**
+ * Method which receives events from the ODE Engine as processing proceeds
+ */
+ public void onEvent(BpelEvent bpelEvent) {
+ if( bpelEvent instanceof ProcessMessageExchangeEvent ||
+ bpelEvent instanceof NewProcessInstanceEvent ||
+ bpelEvent instanceof CorrelationMatchEvent ) {
+ handleProcMexEvent( (ProcessMessageExchangeEvent) bpelEvent );
+ return;
+ } // end if
+
+ } // end method onEvent
+
+ /**
+ * Handle a ProcessMessageExchangeEvent
+ * - the important aspect of this event is that it establishes a connection between a MessageExchange object
+ * and the BPEL Process instance to which it relates.
+ * @param bpelEvent - the ProcessMessageExchangeEvent
+ */
+ private void handleProcMexEvent( ProcessMessageExchangeEvent bpelEvent) {
+ // Extract the message ID and the process instance ID - it is the connection between these
+ // that is vital to know
+ String mexID = bpelEvent.getMessageExchangeId();
+ Long processID = bpelEvent.getProcessInstanceId();
+ ODEServer.addMexToProcessIDLink( mexID, processID );
+ } // end method handleProcMexEvent
+
+ public void shutdown() {
+ // Intentionally left blank
+ }
+
+ public void startup(Properties configProperties) {
+ // Intentionally left blank
+ }
+
+ } // end Class BPELEventListener
+}