diff options
Diffstat (limited to 'sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main')
21 files changed, 3473 insertions, 0 deletions
diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/BPELODEDeployFile.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/BPELODEDeployFile.java new file mode 100644 index 0000000000..fe1e2d620b --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/BPELODEDeployFile.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URI; +import java.util.List; + +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.assembly.ComponentType; +import org.apache.tuscany.sca.assembly.Reference; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; + +/** + * A class that handles the deploy.xml file required for each BPEL process by the ODE runtime + * @author Mike Edwards + * + * An explanation of the structure of the ODE deploy file: + * + * <deploy xmlns="http://www.apache.org/ode/schemas/dd/2007/03" + * xmlns:tns="http://helloworld" + * xmlns:tus="http://tuscany.apache.org/xmlns/sca/1.1"> + * + * <process name="tns:HelloWorld"> + * <active>true</active> + * <provide partnerLink="helloPartnerLink"> + * <service name="tus:helloPartnerLink" port="HelloWorld"/> + * </provide> + * <invoke partnerLink="greetingsPartnerLink"> + * <service name="tus:greetingsPartnerLink" port="Greetings"/> + * </invoke> + * </process> + * </deploy> + * + * For SCA purposes: + * + * a) Each partner link in the BPEL process is declared using either a <provide.../> + * (for a service) or using a <invoke.../> (for a reference). + * + * b) Each <provide/> and <invoke/> must use the partnerLink name, as declared in the + * BPEL process. + * + * c) The <provide/> and <invoke/> elements each have a single child <service/> element. + * The <service/> elements have name and port attributes. The NAME attribute MUST be set + * to the same name as the partnerLink and MUST be prefixed by a prefix which references + * the namespace "http://tuscany.apache.org" ("tus" in the example above). + * The port attribute can be set to any name (it must be present but it is not actually + * used for anything significant). + * + * When SCA loads a BPEL process to the ODE server, this file is read by the ODE server to + * characterize the process. When SCA interacts with ODE at later points - either when a + * service is being invoked or the process invokes a reference - it is the service @name + * attribute that identifies the service or reference involved. + * + * @version + */ +public class BPELODEDeployFile { + private final Log __log = LogFactory.getLog(getClass()); + + static final String DEPLOY_ELEMENT_START = "<deploy xmlns=\"http://www.apache.org/ode/schemas/dd/2007/03\""; + static final String DEPLOY_ENDELEMENT = "</deploy>"; + static final String PROCESS_NAMESPACE_DECL = "xmlns:tns="; + static final String SERVICE_NAMESPACE = "xmlns:tus=\"http://tuscany.apache.org\""; + static final String PROCESS_ELEMENT_START = "<process name=\"tns:"; + static final String PROCESS_ELEMENT_END = "\">"; + static final String PROCESS_ENDELEMENT = "</process>"; + static final String ACTIVE_ELEMENT = "<active>true</active>"; + static final String PROVIDE_ELEMENT_START = "<provide partnerLink=\""; + static final String PROVIDE_ELEMENT_END = "\">"; + static final String PROVIDE_ENDELEMENT = "</provide>"; + static final String SERVICE_ELEMENT_START = "<service name=\"tus:"; + static final String SERVICE_ELEMENT_PORT = "\" port=\""; + static final String SERVICE_ELEMENT_END = "Port\"/>"; + static final String INVOKE_ELEMENT_START = "<invoke partnerLink=\""; + static final String INVOKE_ELEMENT_END = "\">"; + static final String INVOKE_ENDELEMENT = "</invoke>"; + + static final String DEPLOY_FILENAME = "deploy.xml"; + + private BPELImplementation implementation; + + /** + * Constructor - requires a BPELImplementation as a parameter + * The ODE deploy.xml file is for this supplied BPELImplementation + * @param theImplementation + */ + public BPELODEDeployFile( BPELImplementation theImplementation ) { + + implementation = theImplementation; + + } // end BPELODEDeployFile constructor + + /** + * Writes the deploy file into the same directory as the BPEL process file, with the name + * "deploy.xml" + */ + public void writeDeployfile() throws IOException { + + File theDirectory = getDirectory(); + + File deployFile = new File( theDirectory, DEPLOY_FILENAME ); + new FileOutputStream( deployFile ); + //if( !deployFile.canWrite() ) throw new IOException( "Unable to write to deploy file" + + // deployFile.getPath() ); + + // Create a stream for the data and write the data to the file + PrintStream theStream = new PrintStream( new FileOutputStream( deployFile ) ); + try { + constructDeployXML( theStream ); + if( theStream.checkError() ) throw new IOException(); + } catch (Exception e) { + throw new IOException( "Unable to write data to deploy file" + + deployFile.getPath() ); + } finally { + theStream.close(); + } // end try + + } // end writeDeployFile + + /** + * Creates the deploy.xml data and writes it to a supplied PrintStream + * @param stream + */ + public void constructDeployXML( PrintStream stream ) { + + // <deploy + namespace... + stream.println( DEPLOY_ELEMENT_START ); + // namespace of the BPEL process + QName process = implementation.getProcess(); + String processNamespace = process.getNamespaceURI(); + stream.println( PROCESS_NAMESPACE_DECL + "\"" + processNamespace + "\"" ); + // namespace for the service name elements + stream.println( SERVICE_NAMESPACE + ">" ); + + // <process> element + stream.println( PROCESS_ELEMENT_START + process.getLocalPart() + + PROCESS_ELEMENT_END ); + + // <active/> element + stream.println( ACTIVE_ELEMENT ); + + ComponentType componentType = implementation.getComponentType(); + List<Service> theServices = componentType.getServices(); + // Loop over the <provide/> elements - one per service + for ( Service service : theServices ) { + String serviceName = service.getName(); + // Provide element... + stream.println( PROVIDE_ELEMENT_START + serviceName + PROVIDE_ELEMENT_END ); + // Child service element... + stream.println( SERVICE_ELEMENT_START + serviceName + + SERVICE_ELEMENT_PORT + serviceName + SERVICE_ELEMENT_END ); + stream.println( PROVIDE_ENDELEMENT ); + } // end for + + // Loop over the <invoke/> elements - one per reference + List<Reference> theReferences = componentType.getReferences(); + for ( Reference reference : theReferences ) { + String referenceName = reference.getName(); + stream.println( INVOKE_ELEMENT_START + referenceName + INVOKE_ELEMENT_END ); + // Child service element... + stream.println( SERVICE_ELEMENT_START + referenceName + + SERVICE_ELEMENT_PORT + referenceName + SERVICE_ELEMENT_END ); + stream.println( INVOKE_ENDELEMENT ); + + } // end for + + // </process> element + stream.println( PROCESS_ENDELEMENT ); + + // </deploy> + stream.println( DEPLOY_ENDELEMENT ); + + } // end constructDeployXML + + /** + * Gets the directory containing the BPEL process + * @return + */ + private File getDirectory() { + File theDir = getBPELFile().getParentFile(); + return theDir; + } // end getDirectory + + /** + * Gets the File containing the BPEL process definition + * @return - the File object containing the BPEL process + */ + private File getBPELFile() { + try { + String location = this.implementation.getProcessDefinition().getLocation(); + URI locationURI; + if (location.indexOf('%') != -1) { + locationURI = URI.create(location); + } else { + locationURI = new URI(null, location, null); + } + + File theProcess = new File(locationURI); + return theProcess; + } catch( Exception e ) { + if(__log.isDebugEnabled()) { + __log.debug("Exception converting BPEL file URL to an URI: " + e ); + } + } // end try + return null; + } // end getBPELFile + + + +} // end class BPELODEDeployFile diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/DeploymentWorkspace.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/DeploymentWorkspace.java new file mode 100644 index 0000000000..8553574aa9 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/DeploymentWorkspace.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; + +/** + * + */ +public class DeploymentWorkspace { + private static final Logger logger = Logger.getLogger(DeploymentWorkspace.class.getName()); + + static final String DEPLOY_FILENAME = "deploy.xml"; + + private BPELImplementation implementation; + private File workingDir; + private File bpelFile; + + /** + * @param implementation + */ + public DeploymentWorkspace(BPELImplementation implementation) { + super(); + this.implementation = implementation; + this.workingDir = createWorkingDirectory(); + } + + /** + * @param implementation + */ + public DeploymentWorkspace(BPELImplementation implementation, File workingDir) { + super(); + this.implementation = implementation; + this.workingDir = workingDir; + if (this.workingDir == null) { + this.workingDir = createWorkingDirectory(); + } + } + + public File getCBPFile() throws IOException { + String name = getBPELFile().getName(); + int index = name.lastIndexOf('.'); + if (index != -1) { + name = name.substring(0, index); + } + return new File(workingDir, name + ".cbp"); + } + + public synchronized File getBPELFile() throws IOException { + if (bpelFile != null) { + String location = implementation.getProcessDefinition().getLocation(); + String fileName = implementation.getProcessDefinition().getURI(); + File file = new File(workingDir, fileName); + if (file.isFile()) { + bpelFile = file; + return file; + } + URL url = new URL(location); + this.bpelFile = copy(url, workingDir, fileName); + } + return bpelFile; + } + + /** + * Escape the space in URL string + * @param uri + * @return + */ + public static URI createURI(String uri) { + if (uri == null) { + return null; + } + if (uri.indexOf('%') != -1) { + // Avoid double-escaping + return URI.create(uri); + } + int index = uri.indexOf(':'); + String scheme = null; + String ssp = uri; + if (index != -1) { + scheme = uri.substring(0, index); + ssp = uri.substring(index + 1); + } + try { + return new URI(scheme, ssp, null); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Gets the File containing the BPEL process definition + * @return - the File object containing the BPEL process + */ + private static File getContainer(String location) { + try { + File theProcess = null; + URI locationURI = createURI(location); + String protocol = locationURI.getScheme(); + if ("file".equals(protocol)) { + theProcess = new File(locationURI); + } else if ("jar".equals(protocol) || "wsjar".equals(protocol) || "zip".equals(protocol)) { + String uri = locationURI.toString(); + // jar contribution + uri = uri.substring(protocol.length() + 1, uri.lastIndexOf("!/")); + locationURI = createURI(uri); + if ("file".equals(locationURI.getScheme())) { + theProcess = new File(locationURI); + } + } + return theProcess; + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception converting BPEL file URL to an URI: " + location, e); + } // end try + return null; + } // end getBPELFile + + /** + * Gets the directory containing the BPEL process + * @return + */ + static File getDirectory(String location) { + File file = getContainer(location); + if (file == null) { + return null; + } + File theDir = file.getParentFile(); + return theDir; + } // end getDirectory + + public File getDeployFile() { + return new File(workingDir, DEPLOY_FILENAME); + } + + private static String getSystemProperty(final String name) { + return AccessController.doPrivileged(new PrivilegedAction<String>() { + public String run() { + return System.getProperty(name); + } + }); + } + + private File createWorkingDirectory() { + String tmpDir = getSystemProperty("java.io.tmpdir"); + File root = new File(tmpDir); + // Add user name as the prefix. For multiple users on the same Lunix, + // there will be permission issue if one user creates the .tuscany folder + // first under /tmp with no write permission for others. + String userName = getSystemProperty("user.name"); + if (userName != null) { + root = new File(root, userName); + } + root = new File(root, ".tuscany/bpel/" + UUID.randomUUID().toString()); + if (logger.isLoggable(Level.FINE)) { + logger.fine("BPEL working directory: " + root); + } + return root; + } + + public static File copy(URL url, File directory, String fileName) throws IOException { + File file = new File(directory, fileName); + file.getParentFile().mkdirs(); + FileOutputStream os = new FileOutputStream(file); + URLConnection connection = url.openConnection(); + connection.setUseCaches(false); + InputStream is = connection.getInputStream(); + byte[] buf = new byte[8192]; + while (true) { + int size = is.read(buf); + if (size < 0) + break; + os.write(buf, 0, size); + } + is.close(); + os.close(); + return file; + } + + private static boolean deleteFiles(File file) { + boolean result = true; + if (file.isFile()) { + if (!file.delete()) { + result = false; + } + } else if (file.isDirectory()) { + for (File f : file.listFiles()) { + if (!deleteFiles(f)) { + result = false; + } + } + if (!file.delete()) { + result = false; + } + } + return result; + } + + public boolean delete() { + return deleteFiles(workingDir); + } + +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java new file mode 100644 index 0000000000..939c33c21c --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.transaction.TransactionManager; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC; +import org.apache.ode.bpel.engine.BpelServerImpl; +import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy; +import org.apache.ode.bpel.evt.BpelEvent; +import org.apache.ode.bpel.evt.CorrelationMatchEvent; +import org.apache.ode.bpel.evt.NewProcessInstanceEvent; +import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent; +import org.apache.ode.bpel.iapi.BpelEventListener; +import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl; +import org.apache.ode.il.config.OdeConfigProperties; +import org.apache.ode.il.dbutil.Database; +import org.apache.ode.scheduler.simple.JdbcDelegate; +import org.apache.ode.scheduler.simple.SimpleScheduler; +import org.apache.ode.utils.GUID; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.eclipse.core.runtime.FileLocator; + + + +/** + * Embedded ODE process server + * + * @version $Rev$ $Date$ + */ +public class EmbeddedODEServer { + private static final String TUSCANY_IMPL_BPEL_DBLOCATION = "TUSCANY_IMPL_BPEL_DBLOCATION"; + + protected final Log __log = LogFactory.getLog(getClass()); + + private boolean _initialized; + + private OdeConfigProperties _config; + + private TransactionManager _txMgr; + + private Database _db; + + private File _workRoot; + + private BpelDAOConnectionFactoryJDBC _daoCF; + + private BpelServerImpl _bpelServer; + + private Scheduler _scheduler; + + protected ExecutorService _executorService; + + private Map<QName, RuntimeComponent> tuscanyRuntimeComponents = new ConcurrentHashMap<QName, RuntimeComponent>(); + + private Map<String, Long> mexToProcessMap = new ConcurrentHashMap<String, Long>(); + + private Map<Long, Map<String, EndpointReference>> callbackMap = new ConcurrentHashMap<Long, Map<String, EndpointReference>>(); + + private final Lock metadataLock = new ReentrantLock(); + private final Condition mexAdded = metadataLock.newCondition(); + private final Condition callbackAdded = metadataLock.newCondition(); + + public EmbeddedODEServer(TransactionManager txMgr) { + _txMgr = txMgr; + } + + public void init() throws ODEInitializationException { + + Properties p = System.getProperties(); + p.put("derby.system.home", "target"); + + Properties confProps = new Properties(); + confProps.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=false)"); + + _config = new OdeConfigProperties(confProps, "ode-sca"); + + // Setting work root as the directory containing our database + try { + _workRoot = getDatabaseLocationAsFile(); + } catch (URISyntaxException e) { + throw new ODEInitializationException(e); + } + + initTxMgr(); + initPersistence(); + initBpelServer(); + + try { + _bpelServer.start(); + } catch (Exception ex) { + String errmsg = "An error occured during the ODE BPEL server startup."; + __log.error(errmsg, ex); + throw new ODEInitializationException(errmsg, ex); + } + + // Start ODE scheduler + _scheduler.start(); + + __log.info("ODE BPEL server started."); + _initialized = true; + + } // end method init() + + /** + * Gets the location of the database used for the ODE BPEL engine as a File object for + * the directory containing the database + * @return + * @throws ODEInitializationException + * @throws URISyntaxException + */ + private File getDatabaseLocationAsFile() throws ODEInitializationException, URISyntaxException { + File locationFile = null; + URL dbLocation = null; + + // An environment variable to set the path to the DB + String dbFile = System.getenv(TUSCANY_IMPL_BPEL_DBLOCATION); + if( dbFile != null ) { + try { + locationFile = new File(dbFile).getParentFile(); + } catch (Exception e ) { + System.out.println("Environment variable " + TUSCANY_IMPL_BPEL_DBLOCATION + " has the wrong format: " + dbFile); + System.out.println("Exception is: " + e.getClass().toString() + " " + e.getMessage()); + } // end try + } else { + dbLocation = getClass().getClassLoader().getResource("jpadb"); + if (dbLocation == null) { + throw new ODEInitializationException("Couldn't find database in the classpath:" + + " try setting the " + TUSCANY_IMPL_BPEL_DBLOCATION + " environment variable"); + } + // Handle OSGI bundle case + if( dbLocation.getProtocol() == "bundleresource" ) { + try { + dbLocation = FileLocator.toFileURL( dbLocation ); + } catch (Exception ce ) { + throw new ODEInitializationException("Couldn't find database in the OSGi bundle"); + } // end try + } // end if + locationFile = new File(dbLocation.toURI()).getParentFile(); + } // end if + + return locationFile; + } // end method getDatabaseLocationAsFile + + private void initTxMgr() { + if(_txMgr == null) { + try { + GeronimoTxFactory txFactory = new GeronimoTxFactory(); + _txMgr = txFactory.getTransactionManager(); + } catch (Exception e) { + __log.fatal("Couldn't initialize a transaction manager using Geronimo's transaction factory.", e); + throw new ODEInitializationException("Couldn't initialize a transaction manager using " + "Geronimo's transaction factory.", e); + } + } + } + + private void initPersistence() { + _db = new Database(_config); + _db.setTransactionManager(_txMgr); + _db.setWorkRoot(_workRoot); + + try { + _db.start(); + _daoCF = _db.createDaoCF(); + } catch (Exception ex) { + String errmsg = "Error while configuring ODE persistence."; + __log.error(errmsg, ex); + throw new ODEInitializationException(errmsg, ex); + } + } + + private void initBpelServer() { + if (__log.isDebugEnabled()) { + __log.debug("ODE initializing"); + } + ThreadFactory threadFactory = new ThreadFactory() { + int threadNumber = 0; + public Thread newThread(Runnable r) { + threadNumber += 1; + Thread t = new Thread(r, "ODEServer-"+threadNumber); + t.setDaemon(true); + return t; + } + }; + + _executorService = Executors.newCachedThreadPool(threadFactory); + + // executor service for long running bulk transactions + ExecutorService _polledRunnableExecutorService = Executors.newCachedThreadPool(new ThreadFactory() { + int threadNumber = 0; + public Thread newThread(Runnable r) { + threadNumber += 1; + Thread t = new Thread(r, "PolledRunnable-"+threadNumber); + t.setDaemon(true); + return t; + } + }); + + _bpelServer = new BpelServerImpl(); + _scheduler = createScheduler(); + _scheduler.setJobProcessor(_bpelServer); + + BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor(); + polledRunnableProcessor.setPolledRunnableExecutorService(_polledRunnableExecutorService); + polledRunnableProcessor.setContexts(_bpelServer.getContexts()); + //_scheduler.setPolledRunnableProcesser(polledRunnableProcessor); + + _bpelServer.setDaoConnectionFactory(_daoCF); + _bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler)); + + _bpelServer.setEndpointReferenceContext( new ODEEprContext() ); + _bpelServer.setMessageExchangeContext(new ODEMessageExchangeContext(this)); + _bpelServer.setBindingContext(new ODEBindingContext()); + _bpelServer.setScheduler(_scheduler); + if (_config.isDehydrationEnabled()) { + CountLRUDehydrationPolicy dehy = new CountLRUDehydrationPolicy(); + dehy.setProcessMaxAge(_config.getDehydrationMaximumAge()); + dehy.setProcessMaxCount(_config.getDehydrationMaximumCount()); + _bpelServer.setDehydrationPolicy(dehy); + } + + _bpelServer.setConfigProperties(_config.getProperties()); + _bpelServer.init(); + _bpelServer.setInstanceThrottledMaximumCount(_config.getInstanceThrottledMaximumCount()); + _bpelServer.setProcessThrottledMaximumCount(_config.getProcessThrottledMaximumCount()); + _bpelServer.setProcessThrottledMaximumSize(_config.getProcessThrottledMaximumSize()); + _bpelServer.setHydrationLazy(_config.isHydrationLazy()); + _bpelServer.setHydrationLazyMinimumSize(_config.getHydrationLazyMinimumSize()); + + // Register event listener on the BPEL server + _bpelServer.registerBpelEventListener( new ODEEventListener( this, _bpelServer) ); + } // end method initBpelLServer + + public void stop() throws ODEShutdownException { + if(_bpelServer != null) { + try { + __log.debug("Stopping BPEL Embedded server"); + _bpelServer.shutdown(); + _bpelServer = null; + } catch (Exception ex) { + __log.debug("Error stopping BPEL server"); + } + } + + if(_scheduler != null) { + try { + __log.debug("Stopping scheduler"); + _scheduler.shutdown(); + _scheduler = null; + } catch (Exception ex) { + __log.debug("Error stopping scheduler"); + } + } + + if(_daoCF != null) { + try { + __log.debug("Stopping DAO"); + _daoCF.shutdown(); + _daoCF = null; + } catch (Exception ex) { + __log.debug("Error stopping DAO"); + } + } + + if(_db != null) { + try { + __log.debug("Stopping DB"); + _db.shutdown(); + _db = null; + } catch (Exception ex) { + __log.debug("Error stopping DB"); + } + } + + if(_txMgr != null) { + try { + __log.debug("Stopping Transaction Manager"); + _txMgr = null; + } catch (Exception ex) { + __log.debug("Error stopping Transaction Manager"); + } + } + } + + protected Scheduler createScheduler() { + Properties odeProperties = new Properties(); + // TODO Find correct values for these properties - MJE 22/06/2009 + odeProperties.put("ode.scheduler.queueLength", "100" ); + odeProperties.put("ode.scheduler.immediateInterval", "30000" ); + odeProperties.put("ode.scheduler.nearFutureInterval", "600000" ); + odeProperties.put("ode.scheduler.staleInterval", "100000" ); + + SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(), + new JdbcDelegate(_db.getDataSource()), + odeProperties ); + scheduler.setExecutorService(_executorService); + scheduler.setTransactionManager(_txMgr); + + return scheduler; + } + + public boolean isInitialized() { + return _initialized; + } + + public BpelServerImpl getBpelServer() { + return _bpelServer; + } + + public Scheduler getScheduler() { + return _scheduler; + } + + public ExecutorService getExecutor() { + return _executorService; + } + + /** + * Deploy the BPEL process implementation to the ODE Engine + * @param d - ODEDeployment structure + * @param implementation - the BPEL Implementation + * @param component - the SCA component which uses the implementation + */ + public void deploy(ODEDeployment d, BPELImplementation implementation, RuntimeComponent component ) { + try { + TuscanyProcessConfImpl processConf = new TuscanyProcessConfImpl( implementation, component ); + _bpelServer.register(processConf); + d.setProcessConf(processConf); + __log.debug("Completed calling new Process deployment code..."); + } catch (Exception ex) { + String errMsg = ">>> DEPLOY: Unexpected exception during deploy of BPEL. /n Component = " + + component.getName() + + " implementation = " + + implementation.getProcess() + + ex.getMessage(); + __log.debug(errMsg, ex); + throw new ODEDeploymentException(errMsg,ex); + } + } + + /** + * Undeploy the BPEL process implementation from the ODE Engine + * @param d - ODEDeployment structure + */ + public void undeploy(ODEDeployment d) { + TuscanyProcessConfImpl processConf = d.getProcessConf(); + if( processConf != null ) { + processConf.stop(); + } // end if + } // end method undeploy + + public void registerTuscanyRuntimeComponent(QName processName,RuntimeComponent componentContext) { + tuscanyRuntimeComponents.put(processName, componentContext); + } + + public RuntimeComponent getTuscanyRuntimeComponent(QName processName) { + return tuscanyRuntimeComponents.get(processName); + } + + /** + * Records a connection between a MessageExchange ID and a Process Instance ID + * @param mexID + * @param processID + */ + public void addMexToProcessIDLink( String mexID, Long processID ) { + //System.out.println("Add mapping Mex - ProcessID = " + mexID + " " + processID.toString()); + if( mexID == null ) { + //System.out.println("Mex ID is null !"); + return; + } // end if + metadataLock.lock(); + try { + mexToProcessMap.put(mexID, processID); + mexAdded.signalAll(); + return; + } catch (Exception e) { + return; + } finally { + metadataLock.unlock(); + } // end try + } // end method addMexToProcessIDLink( mexID, processID ) + + /** + * Connects from a MessageExchangeID to a Process Instance ID + * @param mexID - the MessageExchange ID + * @return - a Long which is the Process Instance ID + */ + public Long getProcessIDFromMex( String mexID ) { + //System.out.println("Get mapping for Mex: " + mexID); + metadataLock.lock(); + try { + Long processID = mexToProcessMap.get(mexID); + while( processID == null ) { + mexAdded.await(); + processID = mexToProcessMap.get(mexID); + } // end while + return processID; + } catch (Exception e) { + return null; + } finally { + metadataLock.unlock(); + } // end try + + } // end method getProcessIDFromMex + + /** + * Remove the connection between a Message Exchange ID and a Process Instance ID + * @param mexID - the Message Exchange ID + */ + public void removeMexToProcessIDLink( String mexID ) { + mexToProcessMap.remove(mexID); + } // end method removeMexToProcessIDLink + + /** + * Stores the metadata for a Callback + * @param processID - Process ID of the BPEL Process Instance for which this callback applies + * @param serviceName - the name of the service which has the callback + * @param callbackEndpoint - a Tuscany Endpoint which is the target of the callback + */ + public void saveCallbackMetadata( Long processID, String serviceName, EndpointReference callbackEPR ) { + //System.out.println("Save callback metadata: ProcessID " + processID.toString() + " service: " + serviceName); + metadataLock.lock(); + try { + Map<String, EndpointReference> processMap = callbackMap.get(processID); + if( processMap == null ) { + processMap = new ConcurrentHashMap<String, EndpointReference>(); + callbackMap.put(processID, processMap); + } // end if + // Put the mapping of service name to callback endpoint - note that this overwrites any + // previous mapping for the same service name + processMap.put(serviceName, callbackEPR); + callbackAdded.signalAll(); + } finally { + metadataLock.unlock(); + } // end try + } // end saveCallbackMetadata + + /** + * Get the metadata for a Callback, based on a BPEL Process Instance ID and a Service name + * @param processID - the BPEL Process Instance ID + * @param serviceName - the service name + * @return - and Endpoint which is the Callback endpoint for the service for this process instance. + * Returns null if there is no callback metadata for this service. + */ + public EndpointReference getCallbackMetadata( Long processID, String serviceName ) { + EndpointReference theEPR; + //System.out.println("Get callback metadata: ProcessID " + processID.toString() + " service: " + serviceName); + + metadataLock.lock(); + try { + while(true) { + Map<String, EndpointReference> processMap = callbackMap.get(processID); + theEPR = processMap.get(serviceName); + if( theEPR != null ) return theEPR; + callbackAdded.await(); + } // end while + } catch (Exception e) { + return null; + } finally { + metadataLock.unlock(); + } // end try + } // end method getCallbackMetadata + + /** + * Removes the metadata for a Callback + * @param processID - the Process Instance ID of the process instance to which the callback metadata applies + * @param serviceName - the service name for the service which has a callback - can be NULL, in which case ALL + * callback metadata for the process instance is removed + */ + public void removeCallbackMetadata( Long processID, String serviceName ) { + + if( serviceName == null ) { + callbackMap.remove(processID); + } else { + Map<String, EndpointReference> processMap = callbackMap.get(processID); + processMap.remove(serviceName); + } // end if + + } // end method removeCallbackMetadata + + private class ODEEventListener implements BpelEventListener { + + private EmbeddedODEServer ODEServer; + private BpelServerImpl bpelServer; + + ODEEventListener( EmbeddedODEServer ODEServer, BpelServerImpl bpelServer ) { + this.ODEServer = ODEServer; + this.bpelServer = bpelServer; + } // end constructor + + /** + * Method which receives events from the ODE Engine as processing proceeds + */ + public void onEvent(BpelEvent bpelEvent) { + if( bpelEvent instanceof ProcessMessageExchangeEvent || + bpelEvent instanceof NewProcessInstanceEvent || + bpelEvent instanceof CorrelationMatchEvent ) { + handleProcMexEvent( (ProcessMessageExchangeEvent) bpelEvent ); + return; + } // end if + + } // end method onEvent + + /** + * Handle a ProcessMessageExchangeEvent + * - the important aspect of this event is that it establishes a connection between a MessageExchange object + * and the BPEL Process instance to which it relates. + * @param bpelEvent - the ProcessMessageExchangeEvent + */ + private void handleProcMexEvent( ProcessMessageExchangeEvent bpelEvent) { + // Extract the message ID and the process instance ID - it is the connection between these + // that is vital to know + String mexID = bpelEvent.getMessageExchangeId(); + Long processID = bpelEvent.getProcessInstanceId(); + ODEServer.addMexToProcessIDLink( mexID, processID ); + } // end method handleProcMexEvent + + public void shutdown() { + // Intentionally left blank + } + + public void startup(Properties configProperties) { + // Intentionally left blank + } + + } // end Class BPELEventListener +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/GeronimoTxFactory.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/GeronimoTxFactory.java new file mode 100644 index 0000000000..11af0f8b50 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/GeronimoTxFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import javax.transaction.TransactionManager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Geronimo transaction factory + * + * @version $Rev$ $Date$ + */ +public class GeronimoTxFactory { + private static final Log __log = LogFactory.getLog(GeronimoTxFactory.class); + + /* Public no-arg constructor is required */ + public GeronimoTxFactory() { + } + + public TransactionManager getTransactionManager() { + __log.info("Using embedded Geronimo transaction manager"); + try { + Object obj = new org.apache.geronimo.transaction.manager.GeronimoTransactionManager(); + return (TransactionManager) obj; + } catch (Exception except) { + throw new IllegalStateException("Unable to instantiate Geronimo Transaction Manager", except); + } + } +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java new file mode 100644 index 0000000000..8e339812cd --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import javax.wsdl.PortType; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.BindingContext; +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.PartnerRoleChannel; + +/** + * Binding Context information + * + * @version $Rev$ $Date$ + */ +public class ODEBindingContext implements BindingContext { + protected final Log __log = LogFactory.getLog(getClass()); + + public ODEBindingContext() { + + } + + public EndpointReference activateMyRoleEndpoint(QName pid, Endpoint endpoint) { + // This will be needed when we support callBacks + if (__log.isDebugEnabled()) { + __log.debug("Activating MyRole Endpoint : " + pid + " - " + endpoint.serviceName); + } + + QName processName = getProcessName(pid); + + return new TuscanyEPR(processName, endpoint); + } + + public void deactivateMyRoleEndpoint(Endpoint endpoint) { + if (__log.isDebugEnabled()) { + __log.debug("Deactivate MyRole Endpoint : " + endpoint.serviceName); + } + + } + + public PartnerRoleChannel createPartnerRoleChannel(QName pid, PortType portType, Endpoint endpoint) { + if (__log.isDebugEnabled()) { + __log.debug("Create PartnerRole channel : " + pid + " - " + portType.getQName() + " - "+ endpoint.serviceName); + } + + QName processName = getProcessName(pid); + return new TuscanyPRC(processName, pid, portType, endpoint); + } + + /** + * Helper method to retrieve the BPEL process name from a processID (where processID have version concatenated to it) + * @param pid + * @return QName the BPEL process name + */ + private static QName getProcessName(QName pid) { + String processName = pid.getLocalPart().substring(0, pid.getLocalPart().lastIndexOf("-")); + return new QName(pid.getNamespaceURI(), processName); + } + + /** + * Calculate the size of the service that this endpoint references. + * @param epr the endpoint reference for the service + * @returns the size of the service + */ + public long calculateSizeofService(EndpointReference epr) { + // TODO It is not at all clear from the ODE code what "size" means + // eg number of service operations? size of the process in bytes? + // So for the present, return a fixed random value... + return 10000; + } +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeployment.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeployment.java new file mode 100644 index 0000000000..0eedd5d0e5 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeployment.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.io.File; + +/** + * Deployment information + * + * @version $Rev$ $Date$ + */ +public class ODEDeployment { + /** The directory containing the deploy.xml and artifacts. */ + public File deployDir; + + // The Tuscany version of the ODE Process Conf implementation + private TuscanyProcessConfImpl conf; + + /** If non-null the type of exception we expect to get when we deploy. */ + public Class<?> expectedException = null; + + public ODEDeployment(File deployDir) { + this.deployDir = deployDir; + } + + @Override + public String toString() { + return "Deployment#" + deployDir; + } + + public void setProcessConf( TuscanyProcessConfImpl conf ) { + this.conf = conf; + } + + public TuscanyProcessConfImpl getProcessConf() { + return conf; + } +}
\ No newline at end of file diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeploymentException.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeploymentException.java new file mode 100644 index 0000000000..b03f69d9aa --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeploymentException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +/** + * Thrown when ODE failed to shutdown. + * + * @version $Rev$ $Date$ + */ +public class ODEDeploymentException extends RuntimeException { + private static final long serialVersionUID = -2869674556330744215L; + + public ODEDeploymentException(Throwable cause) { + super(cause); + } + + public ODEDeploymentException(String message) { + super(message); + } + + public ODEDeploymentException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEEndpointReference.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEEndpointReference.java new file mode 100644 index 0000000000..700443d9b7 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEEndpointReference.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.utils.DOMUtils; +import org.apache.tuscany.sca.assembly.Base; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Tuscany implementation of the ODE EndpointReference interface + * + */ +public class ODEEndpointReference implements EndpointReference { + + + private Document doc = DOMUtils.newDocument(); + private Element serviceref; + + /** + * Private constructor for the EndpointReference + */ + private ODEEndpointReference() { + super(); + } // end ODEEndpointReference() + + /** + * Add a new <service-ref/> element to the EndpointReference + */ + private void addServiceRef() { + serviceref = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(), + EndpointReference.SERVICE_REF_QNAME.getLocalPart()); + doc.appendChild(serviceref); + } // end method addServiceRef() + + /** + * Create an EndpointReference from an Endpoint object + * @param anEndpoint - the endpoint object + */ + public ODEEndpointReference( Endpoint anEndpoint ) { + this(); + addServiceRef(); + // If there is an endpoint for this reference (ie the reference is wired) + // then add an element to indicate this + String eprCount = anEndpoint.portName; + if( !"0".equals(eprCount) ) { + Element eprElement = doc.createElementNS( Base.SCA11_TUSCANY_NS, "EPR"); + serviceref.appendChild(eprElement); + } // end if + return; + } // end + + /** + * Create a new EndpointReference from an existing endpointElement, which is assumed + * to be of the form: + * <sref:service-ref> + * <EPR/> + * </sref:service-ref> + * + * @param endpointElement the endpointElement + */ + public ODEEndpointReference( Element endpointElement ) { + this(); + if( endpointElement != null ) { + // import the service-ref element into this EndpointReference, if the + // root element is a <sref:service-ref/> + if( endpointElement.getLocalName().equals("service-ref") ) { + doc.appendChild( doc.importNode(endpointElement, true) ); + } // end if + } // end if + return; + } // end + + public Document toXML() { + return doc; + } // end toXML() + +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEEprContext.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEEprContext.java new file mode 100644 index 0000000000..972f77ad65 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEEprContext.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.util.Map; + +import javax.xml.namespace.QName; + +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.EndpointReferenceContext; +import org.w3c.dom.Element; + +/** + * Implementation of the ODE EndpointReferenceContext interface, used by the ODE BPEL Engine + * to handle conversions of EndpointReferences at runtime. + * + * An ODE Endpoint reference relates to SCA Reference EndpointReferences (pointers to target + * services) and to BPEL PartnerLink elements and any associated BPEL process variables with + * type set to element="sref:service-ref" + * + */ +public class ODEEprContext implements EndpointReferenceContext { + + /** + * Converts an endpoint reference from its XML representation to another + * type of endpoint reference. + * + * @param targetType + * @param sourceEndpoint + * @return converted EndpointReference, being of targetType + */ + public EndpointReference convertEndpoint( QName targetType, + Element sourceEndpoint) { + // For the present, Tuscany only has one type of EndpointReference, so that the + // targetType parameter is of no significance. + return new ODEEndpointReference( sourceEndpoint ); + } // end method convertEndpoint + + public Map getConfigLookup(EndpointReference epr) { + // TODO Auto-generated method stub + return null; + } + + /** + * Resolve an end-point reference from its XML representation. The + * nature of the representation is determined by the integration + * layer. The BPEL engine uses this method to reconstruct + * {@link EndpointReference} objects that have been persisted in the + * database via {@link EndpointReference#toXML(javax.xml.transform.Result)} + * method. + * + * @param XML representation of the EPR + * @return reconstituted EPR object {@link EndpointReference} + */ + public EndpointReference resolveEndpointReference(Element epr) { + return new ODEEndpointReference( epr ); + } // end method resolveEndpointReference + +} // end class ODEEprContext diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java new file mode 100644 index 0000000000..d2dbd4880d --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.util.List; +import java.util.concurrent.Callable; + +import javax.wsdl.Part; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.Message; +import org.apache.ode.bpel.iapi.MessageExchange; +import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; +import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.utils.DOMUtils; +import org.apache.tuscany.sca.assembly.ComponentReference; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Helper Class to handle invocation to Tuscany Component References + * + * @version $Rev$ $Date$ + */ +public class ODEExternalService { + protected final Log __log = LogFactory.getLog(getClass()); + + private EmbeddedODEServer _server; + private Scheduler _sched; + + public ODEExternalService(EmbeddedODEServer server) { + this._server = server; + this._sched = _server.getScheduler(); + } + + public void invoke(final PartnerRoleMessageExchange partnerRoleMessageExchange) { + boolean isTwoWay = + partnerRoleMessageExchange.getMessageExchangePattern() == org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE; + + if (isTwoWay) { + // Defer the invoke until the transaction commits. + _sched.registerSynchronizer(new Scheduler.Synchronizer() { + public void beforeCompletion() { + + } + + public void afterCompletion(boolean success) { + // If the TX is rolled back, then we don't send the request. + if (!success) return; + + // The invocation must happen in a separate thread, holding on the afterCompletion + // blocks other operations that could have been listed there as well. + _server.getExecutor().submit(new Callable<Object>() { + public Object call() throws Exception { + try { + // do execution + if(! (partnerRoleMessageExchange.getChannel() instanceof TuscanyPRC)) { + throw new IllegalArgumentException("Channel should be an instance of TuscanyPRC"); + } + + TuscanyPRC channel = (TuscanyPRC) partnerRoleMessageExchange.getChannel(); + RuntimeComponent tuscanyRuntimeComponent = _server.getTuscanyRuntimeComponent(channel.getProcessName()); + + // Fetching the reference based on the data held in the PRC / Endpoint + String refName = channel.getEndpoint().serviceName.getLocalPart(); + RuntimeComponentReference runtimeComponentReference = getReferenceByName( tuscanyRuntimeComponent, refName ); + RuntimeEndpointReference epr = getEndpointReference( runtimeComponentReference, partnerRoleMessageExchange ); + // convert operations + Operation operation = + findOperation(partnerRoleMessageExchange.getOperation().getName(), epr); + + /* + This is how a request looks like (payload is wrapped with extra info) + <?xml version="1.0" encoding="UTF-8"?> + <message> + <parameters> + <getGreetings xmlns="http://greetings"> + <message xmlns="http://helloworld">Luciano</message> + </getGreetings> + </parameters> + </message> + */ + Element msg = partnerRoleMessageExchange.getRequest().getMessage(); + if (msg != null) { + + if(__log.isDebugEnabled()) { + String xml = DOMUtils.domToString(msg); + String payload = DOMUtils.domToString(getPayload(partnerRoleMessageExchange.getRequest())); + __log.debug("Starting invocation of SCA Reference"); + __log.debug(">>> Original message: " + xml); + __log.debug(">>> Payload: " + payload); + } // end if + + Object[] args = new Object[] {getPayload(partnerRoleMessageExchange.getRequest())}; + + Object result = null; + boolean success = false; + + try { + result = epr.invoke(operation, args); + success = true; + } catch (Exception e) { + e.printStackTrace(); + partnerRoleMessageExchange.replyWithFailure(MessageExchange.FailureType.OTHER, + e.getMessage(), + null); + } // end try + + if(__log.isDebugEnabled()) { + __log.debug("SCA Reference invocation finished"); + __log.debug(">>> Result : " + DOMUtils.domToString((Element)result)); + } // end if + + if (!success) { return null; } + + // two way invocation + // process results based on type of message invocation + replyTwoWayInvocation(partnerRoleMessageExchange.getMessageExchangeId(), + operation, + (Element)result); + } // end if + + } catch (Throwable t) { + // some error + String errmsg = "Error sending message (mex=" + partnerRoleMessageExchange + "): " + t.getMessage(); + __log.error(errmsg, t); + /*replyWithFailure(partnerRoleMessageExchange.getMessageExchangeId(), + MessageExchange.FailureType.COMMUNICATION_ERROR, + errmsg, + null);*/ + } + return null; + } + }); + + } + }); + partnerRoleMessageExchange.replyAsync(); + + } else { + /** one-way case * */ + _server.getExecutor().submit(new Callable<Object>() { + public Object call() throws Exception { + // do reply + // operationClient.execute(false); + return null; + } + }); + partnerRoleMessageExchange.replyOneWayOk(); + } + } + + /** + * Gets a RuntimeComponentReference of a supplied RuntimeComponent by name + * @param tuscanyRuntimeComponent - the runtime component + * @param name - the name of the reference + * @return - the RuntimeComponentReference with the supplied name - null if there is no reference with that name + */ + private RuntimeComponentReference getReferenceByName( RuntimeComponent tuscanyRuntimeComponent, String name ) { + if( name == null ) return null; + for( ComponentReference reference : tuscanyRuntimeComponent.getReferences() ) { + if( name.equals(reference.getName()) ) return (RuntimeComponentReference)reference; + } // end for + return null; + } // end method getReferenceByName + + /** + * Get the Runtime Wire for the supplied reference + * @param componentReference - the reference + * @return - the RuntimeWire - null if it cannot be found + */ + private RuntimeEndpointReference getEndpointReference( RuntimeComponentReference componentReference, + PartnerRoleMessageExchange mex) { + if( componentReference.isForCallback() ) { + // Where there is a callback, it is necessary to create a specialized wire, based on callback information + // present on the forward call + + // Get the callbackEPR for the callback using the BPEL Process ID and the Reference name + // - which is the same name as the service name for a callback + Long processID = _server.getProcessIDFromMex(mex.getMessageExchangeId()); + org.apache.tuscany.sca.assembly.EndpointReference callbackEPR = + _server.getCallbackMetadata(processID, componentReference.getName()); + RuntimeEndpointReference wire = selectCallbackWire( callbackEPR.getTargetEndpoint(), componentReference ); + wire = clone_bind( componentReference, callbackEPR.getCallbackEndpoint() ); + return wire; + } else { + // No callback case... + //TODO - fix the x..n multiplicity case, which needs to select the correct ONE of multiple + // EndpointReferences here + return (RuntimeEndpointReference) componentReference.getEndpointReferences().get(0); + } // end if + } // end method getEndpointReference + + private RuntimeEndpointReference selectCallbackWire( org.apache.tuscany.sca.assembly.Endpoint endpoint, + RuntimeComponentReference componentReference) { + // Look for callback binding with same name as service binding + if (endpoint == null) { + throw new RuntimeException("Destination for forward call is not available"); + } + + for (EndpointReference epr : componentReference.getEndpointReferences()) { + if (epr.getBinding().getName().equals(endpoint.getBinding().getName())) { + return (RuntimeEndpointReference) epr; + } + } + + // if no match, look for callback binding with same type as service binding + for (EndpointReference epr : componentReference.getEndpointReferences()) { + if (epr.getBinding().getType().equals(endpoint.getBinding().getType())) { + return (RuntimeEndpointReference) epr; + } + } + + // no suitable callback wire was found + return null; + } // end method selectCallbackWire + + private RuntimeEndpointReference clone_bind(RuntimeComponentReference reference, + org.apache.tuscany.sca.assembly.Endpoint callbackEndpoint) { + + try { + // clone the callback reference ready to configure it for this callback endpoint + RuntimeComponentReference ref = (RuntimeComponentReference)reference.clone(); + ref.getTargets().clear(); + ref.getBindings().clear(); + ref.getEndpointReferences().clear(); + + // clone epr + EndpointReference callbackEndpointReference = (EndpointReference)reference.getEndpointReferences().get(0).clone(); + callbackEndpointReference.setReference(ref); + callbackEndpointReference.setTargetEndpoint(callbackEndpoint); + callbackEndpointReference.setUnresolved(true); + + // The callback endpoint will be resolved when the wire chains are created + ref.getEndpointReferences().add(callbackEndpointReference); + return (RuntimeEndpointReference) ref.getEndpointReferences().get(0); + } catch ( CloneNotSupportedException e ) { + return null; + } // end try clone_bind + + } // end method + + /** + * Find the SCA Reference operation + * + * @param operationName + * @param runtimeComponentReference + * @return + */ + private Operation findOperation(String operationName, RuntimeEndpointReference epr) { + Operation reseultOperation = null; + + for (Operation operation : epr.getComponentTypeReferenceInterfaceContract().getInterface().getOperations()) { + if (operationName.equalsIgnoreCase(operation.getName())) { + reseultOperation = operation; + break; + } + } + return reseultOperation; + } + + /** + * Get payload from a given ODEMessage + * @param odeMessage - the ODE message + * @return the payload of the Message, as a DOM Element + */ + private Element getPayload(Message odeMessage) { + Element payload = null; + + // Get the message parts - these correspond to the message parts for the invocation + // as defined in the WSDL for the service operation being invoked + List<String> parts = odeMessage.getParts(); + if( parts.size() == 0 ) return null; + + // For the present, just deal with the ** FIRST ** part + // TODO Deal with operations that have messages with multiple parts + // - that will require returning an array of Elements, one for each part + Element part = odeMessage.getPart(parts.get(0)); + + // Get the payload which is the First child + if (part != null && part.hasChildNodes()) { + payload = (Element)part.getFirstChild(); + } + + return payload; + } // end getPayload + + + private void replyTwoWayInvocation(final String odeMexId, final Operation operation, final Element result) { + // ODE MEX needs to be invoked in a TX. + try { + _server.getScheduler().execIsolatedTransaction(new Callable<Void>() { + public Void call() throws Exception { + PartnerRoleMessageExchange odeMex = null; + try { + odeMex = (PartnerRoleMessageExchange)_server.getBpelServer().getEngine().getMessageExchange(odeMexId); + if (odeMex != null) { + Message response = createResponseMessage(odeMex, operation, (Element)result); + odeMex.reply(response); + } + } catch (Exception ex) { + String errmsg = "Unable to process response: " + ex.getMessage(); + if (odeMex != null) { + odeMex.replyWithFailure(MessageExchange.FailureType.OTHER, errmsg, null); + } + } + + return null; + } + }); + } catch (Exception ex) { + ex.printStackTrace(); + } + + } + + private Message createResponseMessage(PartnerRoleMessageExchange partnerRoleMessageExchange, + Operation operation, + Element invocationResult) { + Document dom = DOMUtils.newDocument(); + + String operationName = operation.getName(); + Part bpelOperationOutputPart = + (Part)((WSDLInterface)operation.getInterface()).getPortType().getOperation(operationName, null, null) + .getOutput().getMessage().getParts().values().iterator().next(); + + Element contentMessage = dom.createElement("message"); + Element contentPart = dom.createElement(bpelOperationOutputPart.getName()); + + contentPart.appendChild(dom.importNode(invocationResult, true)); + contentMessage.appendChild(contentPart); + dom.appendChild(contentMessage); + + if(__log.isDebugEnabled()) { + __log.debug("Creating result message:"); + __log.debug(">>>" + DOMUtils.domToString(dom.getDocumentElement())); + } + + QName id = partnerRoleMessageExchange.getOperation().getOutput().getMessage().getQName(); + Message response = partnerRoleMessageExchange.createMessage(id); + response.setMessage(dom.getDocumentElement()); + + return response; + } + +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEInitializationException.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEInitializationException.java new file mode 100644 index 0000000000..2fa91e4e86 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEInitializationException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +/** + * Thrown when ODE failed to initialize one if its needed resources. + * + * @version $Rev$ $Date$ + */ +public class ODEInitializationException extends RuntimeException { + private static final long serialVersionUID = -2869674556330744215L; + + public ODEInitializationException(Throwable cause) { + super(cause); + } + + public ODEInitializationException(String message) { + super(message); + } + + public ODEInitializationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEMessageExchangeContext.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEMessageExchangeContext.java new file mode 100644 index 0000000000..1ec82390cf --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEMessageExchangeContext.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.BpelEngineException; +import org.apache.ode.bpel.iapi.ContextException; +import org.apache.ode.bpel.iapi.MessageExchangeContext; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; +import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; + +/** + * Message Exchange Context information + * + * @version $Rev$ $Date$ + */ +public class ODEMessageExchangeContext implements MessageExchangeContext { + private static final Log __log = LogFactory.getLog(ODEMessageExchangeContext.class); + + private EmbeddedODEServer _server; + + public ODEMessageExchangeContext(EmbeddedODEServer _server) { + this._server = _server; + } + + public void invokePartner(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException { + if (__log.isDebugEnabled()) { + __log.debug("Invoking a partner operation: " + partnerRoleMessageExchange.getOperationName()); + } + + ODEExternalService scaService = new ODEExternalService(_server); + scaService.invoke(partnerRoleMessageExchange); + } + + public void onAsyncReply(MyRoleMessageExchange myRoleMessageExchange) throws BpelEngineException { + if (__log.isDebugEnabled()) { + __log.debug("Processing an async reply from service " + myRoleMessageExchange.getServiceName()); + } + } + } diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEShutdownException.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEShutdownException.java new file mode 100644 index 0000000000..a928379ba9 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEShutdownException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +/** + * Thrown when ODE failed to shutdown. + * + * @version $Rev$ $Date$ + */ +public class ODEShutdownException extends RuntimeException { + private static final long serialVersionUID = -2869674556330744215L; + + public ODEShutdownException(Throwable cause) { + super(cause); + } + + public ODEShutdownException(String message) { + super(message); + } + + public ODEShutdownException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyEPR.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyEPR.java new file mode 100644 index 0000000000..deaeaec040 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyEPR.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import javax.xml.namespace.QName; + +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.utils.DOMUtils; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * This should hold something that makes sense for Tuscany so that the + * process has an address that makes sense from the outside world perspective + * + * @version $Rev$ $Date$ + */ +public class TuscanyEPR implements EndpointReference { + private final Document doc = DOMUtils.newDocument(); + + public TuscanyEPR(QName processName, Endpoint endpoint) { + Element serviceref = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(), + EndpointReference.SERVICE_REF_QNAME.getLocalPart()); + serviceref.setNodeValue(endpoint.serviceName + ":" + endpoint.portName); + doc.appendChild(serviceref); + } + + public Document toXML() { + return doc; + } +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java new file mode 100644 index 0000000000..aaa00069f5 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import javax.wsdl.PortType; +import javax.xml.namespace.QName; + +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.PartnerRoleChannel; + +/** + * Tuscany Partner Role Channel for ODE Integration + * + * @version $Rev$ $Date$ + */ +public class TuscanyPRC implements PartnerRoleChannel { + private final QName processName; + private final QName pid; + private final Endpoint endpoint; + + public TuscanyPRC(QName processName, QName pid, PortType portType, Endpoint endpoint){ + this.processName = processName; + this.pid = pid; + this.endpoint = endpoint; + } + + public QName getProcessName() { + return this.processName; + } + + public Endpoint getEndpoint() { + return this.endpoint; + } + + public void close() { + + } + + public EndpointReference getInitialEndpointReference() { + + return new ODEEndpointReference( this.endpoint ); + } // end method getInitialEndpointReference + +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java new file mode 100644 index 0000000000..2c904a37f8 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java @@ -0,0 +1,823 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.wsdl.Definition; +import javax.xml.namespace.QName; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.Result; +import javax.xml.transform.Source; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerConfigurationException; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.compiler.BpelC; +import org.apache.ode.bpel.evt.BpelEvent.TYPE; +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.ProcessConf; +import org.apache.ode.bpel.iapi.ProcessState; +import org.apache.tuscany.sca.assembly.Base; +import org.apache.tuscany.sca.assembly.ComponentProperty; +import org.apache.tuscany.sca.assembly.ComponentReference; +import org.apache.tuscany.sca.assembly.ComponentService; +import org.apache.tuscany.sca.assembly.Reference; +import org.apache.tuscany.sca.databinding.SimpleTypeMapper; +import org.apache.tuscany.sca.databinding.impl.SimpleTypeMapperImpl; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.w3c.dom.Attr; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +/** + * A Tuscany implementation of the ODE Process Conf + * + * @version $Rev$ $Date$ + */ +public class TuscanyProcessConfImpl implements ProcessConf { + private final Log __log = LogFactory.getLog(getClass()); + + private BPELImplementation implementation; + private RuntimeComponent component; + private Map<String, Endpoint> invokeEndpoints = null; + private Map<String, Endpoint> provideEndpoints = null; + private Map<QName, Node> properties = null; + private ProcessState processState; + private Date deployDate; + + private File theBPELFile; + // Marks whether the BPEL file was rewritten (eg for initializer statements) + private boolean rewritten = false; + + private final SimpleTypeMapper mapper = new SimpleTypeMapperImpl(); + private final String TUSCANY_NAMESPACE = Base.SCA11_TUSCANY_NS; + + /** + * Constructor for the ProcessConf implementation + * @param theImplementation the BPEL implementation for which this is the ProcessConf + * @param component - the SCA component which uses the implementation + */ + public TuscanyProcessConfImpl( BPELImplementation theImplementation, RuntimeComponent component ) { + //System.out.println("New TuscanyProcessConfImpl..."); + this.implementation = theImplementation; + this.component = component; + + processState = ProcessState.ACTIVE; + deployDate = new Date(); + + // Compile the process + compile( getBPELFile() ); + } // end TuscanyProcessConfImpl constructor + + public void stop() { + // If the BPEL file was rewritten, destroy the rewritten version of it so that + // it is not used again. Also delete the related compiled cbp file + if( rewritten ) { + try { + String cbpName = theBPELFile.getCanonicalPath(); + // Remove the "bpel_tmp" suffix and add "cbp" + if ( cbpName.endsWith("bpel_tmp") ) { + cbpName = cbpName.substring( 0, cbpName.length() - 8) + "cbp"; + File cbpFile = new File( cbpName ); + if ( cbpFile.exists() ) cbpFile.delete(); + } // end if + } catch (Exception e ) { + // Do nothing with an exception + } // end try + theBPELFile.delete(); + } // end if + + } // end method stop + + /** + * Returns the URI for the directory containing the BPEL process + */ + public URI getBaseURI() { + //System.out.println("getBaseURI called"); + File theDir = getDirectory(); + return theDir.toURI(); + } + + /** + * Returns a String containing the (local) name of the file containing the BPEL process + */ + public String getBpelDocument() { + //System.out.println("getBPELDocument called"); + try { + String location = this.implementation.getProcessDefinition().getLocation(); + URI locationURI = new URI(null, location, null); + File processFile = new File(locationURI); + return getRelativePath( getDirectory(), processFile); + } catch (Exception e) { + if(__log.isWarnEnabled()) { + __log.warn("Unable to resolve relative path of BPEL process" + implementation.getProcessDefinition().getLocation(), e ); + } + return null; + } // end try + } // end getBpelDocument + + /** + * Returns an InputStream containing the Compiled BPEL Process (CBP) + */ + public InputStream getCBPInputStream() { + //System.out.println("getCBPInputStream called"); + + File cbpFile = getCBPFile(); + if( cbpFile == null ) return null; + + if( cbpFile.exists() ) { + // Create an InputStream from the cbp file... + try { + return new FileInputStream( cbpFile ); + } catch ( Exception e ) { + if(__log.isDebugEnabled()) { + __log.debug("Unable to open the cbp file for BPEL process: " + + implementation.getProcessDefinition().getName(), e); + } + } // end try + } else { + // Cannot find the cbp file + if(__log.isWarnEnabled()){ + __log.warn("Cannot find the cbp file for process: " + + implementation.getProcessDefinition().getName()); + } + } // end if + // TODO - need better exception handling if we can't open the cbp file for any reason + return null; + } // end getCBPInputStream + + /** + * Gets the File object for the CBP file for this BPEL Process + * @return - the File object for the CBP file + */ + private File getCBPFile() { + // Find the CBP file - it has the same name as the BPEL process and lives in the same + // directory as the process file + String cbpFileName = null; + try { + String fileName = getRelativePath( getDirectory(), getBPELFile() ); + cbpFileName = fileName.substring(0, fileName.lastIndexOf(".")) + ".cbp"; + } catch (Exception e ) { + // IOException trying to fetch the BPEL file name + if(__log.isDebugEnabled()) { + __log.debug("Unable to calculate the file name for BPEL process: " + + implementation.getProcessDefinition().getName(), e); + return null; + } // end if + } // end try + File cbpFile = new File( getDirectory(), cbpFileName ); + return cbpFile; + } // end getCBPFile + + /** + * Return the WSDL Definition for a given PortType + * @param portTypeName - the QName of the PortType + */ + public Definition getDefinitionForPortType( QName portTypeName ) { + //System.out.println("getDefinitionForPortType called for portType: " + portTypeName ); + // Find the named PortType in the list of WSDL interfaces associated with this BPEL Process + Collection<WSDLInterface> theInterfaces = implementation.getProcessDefinition().getInterfaces(); + for( WSDLInterface wsdlInterface : theInterfaces ) { + if ( wsdlInterface.getPortType().getQName().equals( portTypeName ) ) { + // Extract and return the Definition associated with the WSDLDefinition... + return wsdlInterface.getWsdlDefinition().getDefinition(); + } // end if + } // end for + return null; + } // end getDefinitionforPortType + + /** + * Returns a WSDL Definition for a given Service QName + * + * 22/05/2008 - it is very unclear what this service QName is really meant to be. + * From the handling of the deploy.xml file by the current ODE code, it seems that the key link + * is from the Service QName to the PartnerLink name (done in the deploy.xml file). + * + * The curious part about this is that the QName for the service is ONLY defined in deploy.xml file + * and does not appear to relate to anything else, except for the QName of the PartnerLink + * + * The PartnerLink name is the same as the name of the SCA service (or reference) which in turn points + * at the PartnerLinkType which in turn points at an (WSDL) interface definition. + */ + public Definition getDefinitionForService(QName serviceQName ) { + //System.out.println("getDefinitionForService called for Service: " + serviceQName ); + if(__log.isDebugEnabled()){ + __log.debug("getDefinitionforService called for service: " + serviceQName ); + } + // TODO Auto-generated method stub + return null; + } + + /** + * Returns the date of deployment of the process + * - for SCA returns the date at which this object was created + */ + public Date getDeployDate() { + //System.out.println("getDeployDate called"); + return deployDate; + } + + /** + * Returns userid of deployer + * - always "SCA Tuscany" for Tuscany... + */ + public String getDeployer() { + //System.out.println("getDeployer called"); + return "SCA Tuscany"; + } // end getDeployer + + /** + * Returns a list of the files in the directory containing the BPEL Process + */ + public List<File> getFiles() { + //System.out.println("getFiles called"); + File theDir = getDirectory(); + List<File> theFiles = Arrays.asList( (File[]) theDir.listFiles() ); + // TODO recurse into subdirectories + return theFiles; + } // end getFiles + + /** + * Returns a Map containing all the "invoke endpoints" - for which read "SCA references" + * The map is keyed by partnerLink name and holds Endpoint objects + * 0..1 multiplicity references are not included in the returned Map (it is as if the reference is not there...) + * TODO deal with multiplicity 0..n and 1..n + * TODO deal with service callbacks on bidirectional services + */ + public Map<String, Endpoint> getInvokeEndpoints() { + if( invokeEndpoints == null ) { + invokeEndpoints = new HashMap<String, Endpoint>(); + // Get a collection of the component references - note that this includes "pseudo-references" for any + // services that have a callback interface + List<ComponentReference> theReferences = component.getReferences(); + //List<Reference> theReferences = implementation.getReferences(); + // Create an endpoint for each reference, using the reference name combined with + // http://tuscany.apache.org to make a QName + // Note that the key used for this put operation MUST be the name of one of the partnerLinks of the + // BPEL process. The SCA reference MAY have an alias for the name (can be given using the sca-bpel:reference + // element, if present) and this alias must not be used + for( Reference reference : theReferences ) { + String partnerlinkName = implementation.getReferencePartnerlinkName( reference.getName() ); + // Check that there is at least 1 configured SCA endpointReference for the reference, since it is + // possible for 0..1 multiplicity references to have no SCA endpointReferences configured + List<org.apache.tuscany.sca.assembly.EndpointReference> eprs = reference.getEndpointReferences(); + String eprCount = Integer.toString( eprs.size() ); + invokeEndpoints.put( partnerlinkName, + new Endpoint( new QName( TUSCANY_NAMESPACE, reference.getName() ), eprCount)); + } // end for + } // end if + return invokeEndpoints; + } // end getInvokeEndpoints + + /** + * Returns the name of the directory containing the BPEL files + */ + public String getPackage() { + //System.out.println("getPackage called"); + File theDir = getDirectory(); + return theDir.getName(); + } // end getPackage + + /** + * Return the BPEL Process ID - which is the Process QName appended "-versionnumber" + */ + public QName getProcessId() { + //System.out.println("getProcessId called"); + QName processType = getType(); + QName processID = new QName( processType.getNamespaceURI(), + processType.getLocalPart() + "-" + getVersion() ); + return processID; + } // end getProcessID + + /** + * TODO - What are properties? + */ + public Map<QName, Node> getProperties() { + //System.out.println("getProperties called"); + if ( properties == null ) { + properties = new HashMap<QName, Node>(); + } // end if + return properties; + } // end getProperties + + /** + * Returns a Map containing all the "provide endpoints" - for which read "SCA services" + * The map is keyed by partnerLink name and holds Endpoint objects + * + * TODO deal with reference callbacks on bidirectional references + */ + public Map<String, Endpoint> getProvideEndpoints() { + //System.out.println("getProvideEndpoints called"); + if( provideEndpoints == null ) { + provideEndpoints = new HashMap<String, Endpoint>(); + String componentURI = component.getURI(); + // Get a collection of the services - note that the Component services include additional + // "pseudo-services" for each reference that has a callback... + + List<ComponentService> theServices = component.getServices(); + // Create an endpoint for each service, using the service name combined with + // http://tuscany.apache.org to make a QName + // Note that the key used for this put operation MUST be the name of one of the partnerLinks of the + // BPEL process. The SCA service MAY have an alias for the name (can be given using the sca-bpel:service + // element, if present) and this alias must not be used + for( ComponentService service : theServices ) { + String partnerlinkName = implementation.getServicePartnerlinkName( service.getName() ); + // MJE 14/07/2009 - added componentURI to the service name to get unique service name + provideEndpoints.put( partnerlinkName, + new Endpoint( new QName( TUSCANY_NAMESPACE, componentURI + service.getName() ), + "ServicePort")); + } // end for + } // end if + return provideEndpoints; + } // end getProvideEndpoints + + /** + * Return the process state + */ + public ProcessState getState() { + //System.out.println("getState called"); + return processState; + } + + /** + * Returns the QName of the BPEL process + */ + public QName getType() { + //System.out.println("getType called"); + return implementation.getProcess(); + } + + /** + * Gets the process Version number + * - current code does not have versions for BPEL processes and always returns "1" + */ + public long getVersion() { + //System.out.println("getVersion called"); + return 1; + } + + /** + * Returns true if the supplied event type is enabled for any of the scopes in the provided + * List. These events are "ODE Execution Events" and there is a definition of them on this + * page: http://ode.apache.org/user-guide.html#UserGuide-ProcessDeployment + * + * Tuscany currently uses: + * - instanceLifecycle events in order to establish the relationship of MessageExchange objects + * to the BPEL Process instances + * @param scopeNames - list of BPEL process Scope names + * @param type - the event type + */ + public boolean isEventEnabled(List<String> scopeNames, TYPE type) { + if( type == TYPE.dataHandling ) return false; + if( type == TYPE.activityLifecycle ) return false; + if( type == TYPE.scopeHandling ) return true; + if( type == TYPE.instanceLifecycle ) return true; + if( type == TYPE.correlation ) return true; + return false; + } // end isEventEnabled + + /** + * Returns whether the process is persisted in the store + * + * Returns false for SCA configuration + * - returning true causes problems in communicating with the BPEL process + */ + public boolean isTransient() { + return false; + } // end isTransient + + /** + * Compiles a BPEL process file into a compiled form CBP file in the main directory + * (ie same directory as the BPEL process file) + * @param bpelFile - the BPEL process file + */ + private void compile( File bpelFile ) { + // Set up the compiler + BpelC compiler = BpelC.newBpelCompiler(); + // Provide a null set of initial properties for now + Map<QName, Node> processProps = new HashMap<QName, Node>(); + Map<String, Object> compileProps = new HashMap<String, Object>(); + compileProps.put( BpelC.PROCESS_CUSTOM_PROPERTIES, processProps ); + compiler.setCompileProperties( compileProps ); + compiler.setBaseDirectory( getDirectory() ); + + // Inject any property values + bpelFile = injectPropertyValues( bpelFile ); + + // Run the compiler and generate the CBP file into the given directory + try { + compiler.compile( bpelFile ); + } catch (IOException e) { + if(__log.isDebugEnabled()) { + __log.debug("Compile error in " + bpelFile, e); + } + // TODO - need better exception handling here + } // end try + } // end compile + + /** + * Adds the values for SCA declared properties to the BPEL process. + * The values for the properties are held in the SCA RuntimeComponent supplied to this + * TuscanyProcessConfImpl. + * The properties map to <variable/> declarations in the BPEL process that are specifically + * marked with @sca-bpel:property="yes" + * @param bpelFile the file containing the BPEL process + * @return the (updated) file containing the BPEL process + */ + private File injectPropertyValues( File bpelFile ) { + // Get the properties + List<ComponentProperty> properties = component.getProperties(); + + // If there are no properties, we're done! + if( properties.size() == 0 ) return bpelFile; + + Document bpelDOM = readDOMFromProcess( bpelFile ); + + for( ComponentProperty property : properties ) { + //System.out.println("BPEL: Property - name = " + property.getName() ); + insertSCAPropertyInitializer( bpelDOM, property ); + } // end for + + File bpelFile2 = writeProcessFromDOM( bpelDOM, + getTransformedBPELFile( bpelFile) ); + if( bpelFile2 != null ) { + theBPELFile = bpelFile2; + rewritten = true; + return bpelFile2; + } // end if + + return bpelFile; + } // end injectPropertyValues + + /** + * Insert an initializer which supplies the value of an SCA property as specified by the + * SCA Component using the BPEL process + * @param bpelDOM - a DOM model representation of the BPEL process + * @param property - an SCA ComponentProperty element for the property + * This DOM model is updated, with an initializer being added for the BPEL variable + * corresponding to the SCA property + */ + private void insertSCAPropertyInitializer( Document bpelDOM, ComponentProperty property ) { + // Only insert a Property initializer where there is a value for the Property + if( property.getValue() == null ) return; + + Element insertionElement = findInitializerInsertionPoint( bpelDOM ); + if( insertionElement == null ) return; + + Element initializer = getInitializerSequence( bpelDOM, property ); + if( initializer == null ) return; + + // Insert the initializer sequence as the next sibling element of the insertion point + Element parent = (Element)insertionElement.getParentNode(); + // Get the next sibling element, if there is one + Node sibling = insertionElement.getNextSibling(); + while( sibling != null && sibling.getNodeType() != Node.ELEMENT_NODE ) { + sibling = sibling.getNextSibling(); + } // end while + // Either insert at the end or before the next element + if ( sibling == null ) { + parent.appendChild( initializer ); + } else { + parent.insertBefore( initializer, sibling ); + } // end if + + } // end insertSCAPropertyInitializer + + /** + * Gets the variable initializer DOM sequence for a given property, in the context of a supplied + * DOM model of the BPEL process + * @param bpelDOM - DOM representation of the BPEL process + * @param property - SCA Property which relates to one of the variables in the BPEL process + * @return - a DOM model representation of the XML statements required to initialize the + * BPEL variable with the value of the SCA property. + */ + private Element getInitializerSequence( Document bpelDOM, ComponentProperty property ) { + // For an XML simple type (string, int, etc), the BPEL initializer sequence is: + // <assign><copy><from><literal>value</literal></from><to variable="variableName"/></copy></assign> + QName type = property.getXSDType(); + if( type != null ) { + if( mapper.isSimpleXSDType( type ) ) { + // Simple types + String NS_URI = bpelDOM.getDocumentElement().getNamespaceURI(); + String valueText = getPropertyValueText( property.getValue() ); + Element literalElement = bpelDOM.createElementNS(NS_URI, "literal"); + literalElement.setTextContent(valueText); + Element fromElement = bpelDOM.createElementNS(NS_URI, "from"); + fromElement.appendChild(literalElement); + Element toElement = bpelDOM.createElementNS(NS_URI, "to"); + Attr variableAttribute = bpelDOM.createAttribute("variable"); + variableAttribute.setValue( property.getName() ); + toElement.setAttributeNode( variableAttribute ); + Element copyElement = bpelDOM.createElementNS(NS_URI, "copy"); + copyElement.appendChild(fromElement); + copyElement.appendChild(toElement); + Element assignElement = bpelDOM.createElementNS(NS_URI, "assign"); + assignElement.appendChild(copyElement); + return assignElement; + } // end if + // TODO Deal with Properties which have a non-simple type + } else { + // TODO Deal with Properties which have an element as the type + } // end if + + return null; + } // end method getInitializerSequence + + /** + * Gets the text value of a property that is a simple type + * @param propValue - the SCA Property value + * @return - the text content of the Property value, as a String + */ + private String getPropertyValueText( Object propValue ) { + String text = null; + if( propValue instanceof Document ) { + Element docElement = ((Document)propValue).getDocumentElement(); + if( docElement != null ){ + Element valueElement = (Element)docElement.getFirstChild(); + if( valueElement != null ) { + text = valueElement.getTextContent(); + } // end if + } // end if + } // end if + + return text; + } // end method getPropertyValueText + + private Element findInitializerInsertionPoint( Document bpelDOM ) { + // The concept is to find the first Activity child element of the BPEL process document + Element docElement = bpelDOM.getDocumentElement(); + NodeList elements = docElement.getElementsByTagName("*"); + + Element element; + for ( int i = 0 ; i < elements.getLength() ; i++ ) { + element = (Element)elements.item(i); + if( isInsertableActivityElement( element ) ) { + return element; + } // end if + } // end for + + return null; + } // end method findInitializerInsertionPoint + + /** + * A WS-BPEL activity can be any of the following: + * <receive> + * <reply> + * <invoke> + * <assign> + * <throw> + * <exit> + * <wait> + * <empty> + * <sequence> + * <if> + * <while> + * <repeatUntil> + * <forEach> + * <pick> + * <flow> + * <scope> + * <compensate> + * <compensateScope> + * <rethrow> + * <validate> + * <extensionActivity> + * A WS-BPEL start activity is a <receive> or <pick> with @create_instance="yes" + */ + private static String SEQUENCE_ELEMENT = "sequence"; + private static String REPLY_ELEMENT = "reply"; + private static String INVOKE_ELEMENT = "invoke"; + private static String ASSIGN_ELEMENT = "assign"; + private static String PICK_ELEMENT = "pick"; + private static String RECEIVE_ELEMENT = "receive"; + private static String FLOW_ELEMENT = "flow"; + private static String SCOPE_ELEMENT = "scope"; + /** + * Determine if an Element is a BPEL start activity element which can have an Assign + * inserted following it + * @param element - a DOM Element containing the BPEL activity + * @return - true if the Element is a BPEL Activity element, false otherwise + */ + private boolean isInsertableActivityElement( Element element ) { + String name = element.getTagName(); + // For the present, only <receive/> and <pick/> elements with create_instance="yes" count + // if( SEQUENCE_ELEMENT.equalsIgnoreCase(name) ) return true; + String start = element.getAttribute("createInstance"); + if( start == null ) return false; + if( !"yes".equals(start) ) return false; + if( RECEIVE_ELEMENT.equalsIgnoreCase(name) ) return true; + if( PICK_ELEMENT.equalsIgnoreCase(name) ) return true; + return false; + } // end method isActivityElement + + /** + * Reads a BPEL Process file into a DOM Document structure + * @param bpelFile - a File object referencing the BPEL process document + * @return - a DOM Document structure representing the same BPEL process + */ + private Document readDOMFromProcess( File bpelFile ) { + try { + DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + docFactory.setNamespaceAware(true); + docFactory.setXIncludeAware(true); + DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); + + Document bpelDOM = docBuilder.parse( bpelFile ); + return bpelDOM; + } catch (Exception e) { + return null; + } // end try + } // end method + + /** + * Writes a BPEL Process file from a DOM Document structure representing the Process + * @param bpelDOM - the DOM Document representation of the BPEL process + * @param file - a File object to which the BPEL Process is to be written + * @return + */ + private File writeProcessFromDOM( Document bpelDOM, File file ) { + try { + // Prepare the DOM document for writing + Source source = new DOMSource( bpelDOM ); + + // Prepare the output file + Result result = new StreamResult(file); + + // Write the DOM document to the file + Transformer xformer = TransformerFactory.newInstance().newTransformer(); + xformer.transform(source, result); + } catch (TransformerConfigurationException e) { + } catch (TransformerException e) { + return null; + } + return file; + } // end writeProcessFromDOM + + private File getTransformedBPELFile( File bpelFile ) { + String name = bpelFile.getName(); + File parent = bpelFile.getParentFile(); + File bpelFile2 = null; + try { + bpelFile2 = File.createTempFile(name, ".bpel_tmp", parent); + } catch (Exception e ){ + + } // end try + return bpelFile2; + } // end getTransformedBPELFile + + /** + * Gets the directory containing the BPEL process + * @return + */ + private File getDirectory() { + File theDir = getBPELFile().getParentFile(); + return theDir; + } // end getDirectory + + /** + * Gets the File containing the BPEL process definition + * @return - the File object containing the BPEL process + */ + private File getBPELFile() { + if( theBPELFile != null ) return theBPELFile; + try { + String location = this.implementation.getProcessDefinition().getLocation(); + URI locationURI; + if (location.indexOf('%') != -1) { + locationURI = URI.create(location); + } else { + locationURI = new URI(null, location, null); + } + File theProcess = new File(locationURI); + theBPELFile = theProcess; + return theProcess; + } catch( Exception e ) { + if(__log.isDebugEnabled()) { + __log.debug("Exception converting BPEL file URL to an URI: " + e ); + } + } // end try + return null; + } // end getBPELFile + + /** + * Gets the relative path of a file against a directory in its hierarchy + * @param base - the base directory + * @param path - the file + * @return + * @throws IOException + */ + private String getRelativePath(File base, File path) throws IOException { + String basePath = base.getCanonicalPath(); + String filePath = path.getCanonicalPath(); + if (!filePath.startsWith(basePath)) { + throw new IOException("Invalid relative path: base=" + base + " path=" + path); + } + String relative = filePath.substring(basePath.length()); + if (relative.startsWith(File.separator)) { + relative = relative.substring(1); + } + return relative; + } // end getRelativePath + + //----------------------------------------------------------------------------- + // other public APIs which ProcessConfImpl displays which are not in ProcessConf interface + + public List<String> getMexInterceptors(QName processId) { +// System.out.println("getMexInterceptors for processID: " + processId ); + return null; + } + + public void setTransient(boolean t) { +// System.out.println("setTransient called with boolean: " + t ); + } + + public List<Element> getExtensionElement(QName arg0) { + return Collections.emptyList(); + } + // end of other public APIs + //----------------------------------------------------------------------------- + + /** + * Get the size in bytes of the CBP file + * @return - this size in bytes of the CBP file, 0 if the file cannot be found + */ + public long getCBPFileSize() { + File cbpFile = getCBPFile(); + if( cbpFile == null ) return 0; + + return cbpFile.length(); + } // end getCBPFileSize + + private final Set<CLEANUP_CATEGORY> successCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class); + private final Set<CLEANUP_CATEGORY> failureCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class); + + public Set<CLEANUP_CATEGORY> getCleanupCategories(boolean instanceSucceeded) { + if( instanceSucceeded ) return successCategories; + else return failureCategories; + } + + private final Map<String, String> emptyPropertyMap = new Hashtable<String, String>(); + public Map<String, String> getEndpointProperties(EndpointReference epr) { + return emptyPropertyMap; + } + + private final Map<QName, Node> emptyProcessProperties = new Hashtable<QName, Node>(); + public Map<QName, Node> getProcessProperties() { + return emptyProcessProperties; + } + + public boolean isCleanupCategoryEnabled(boolean instanceSucceeded, + CLEANUP_CATEGORY category) { + // TODO Currently returns false - should this be changed for some categories? + return false; + } + + public boolean isSharedService(QName serviceName) { + // Tuscany does not share the service + return false; + } + +} // end class TuscanyProcessConfImpl diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java new file mode 100644 index 0000000000..b3824bffe1 --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +import java.io.File; +import java.net.URI; + +import javax.transaction.TransactionManager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.dao.jpa.ProcessDAOImpl; +import org.apache.openjpa.persistence.PersistenceProviderImpl; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.assembly.Reference; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.databinding.xml.DOMDataBinding; +import org.apache.tuscany.sca.extensibility.ClassLoaderContext; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.implementation.bpel.ode.EmbeddedODEServer; +import org.apache.tuscany.sca.implementation.bpel.ode.ODEDeployment; +import org.apache.tuscany.sca.implementation.bpel.ode.ODEInitializationException; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.provider.ImplementationProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; + +/** + * BPEL Implementation provider + * + * @version $Rev$ $Date$ + */ +public class BPELImplementationProvider implements ImplementationProvider { + private final Log __log = LogFactory.getLog(getClass()); + + private RuntimeComponent component; + private BPELImplementation implementation; + + private EmbeddedODEServer odeServer; + private TransactionManager txMgr; + + private ODEDeployment deployment; + + /** + * Constructs a new BPEL Implementation. + */ + public BPELImplementationProvider(RuntimeComponent component, + BPELImplementation implementation, + EmbeddedODEServer odeServer, + TransactionManager txMgr) { + this.component = component; + this.implementation = implementation; + this.odeServer = odeServer; + this.txMgr = txMgr; + + // Configure the service and reference interfaces to use a DOM databinding + // as it's what ODE expects + for(Service service: implementation.getServices() ){ + service.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } // end for + + for(Reference reference: implementation.getReferences() ) { + reference.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } // end for + + for (Service service: component.getServices()) { + //TODO - MJE, 06/06/2009 - we can eventually remove the reset of the service interface + // contract and leave it to the Endpoints only + service.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + for( Endpoint endpoint : service.getEndpoints() ) { + RuntimeEndpoint ep = (RuntimeEndpoint) endpoint; + if (ep.getComponentTypeServiceInterfaceContract() != null) { + ep.getComponentTypeServiceInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } + } // end for + } // end for + + for (Reference reference : component.getReferences()) { + reference.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + for (EndpointReference endpointReference : reference.getEndpointReferences()) { + RuntimeEndpointReference epr = (RuntimeEndpointReference)endpointReference; + if (epr.getComponentTypeReferenceInterfaceContract() != null) { + epr.getComponentTypeReferenceInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } + } // end for */ + } // end for + + } + + public Invoker createInvoker(RuntimeComponentService service, Operation operation) { + BPELInvoker invoker = new BPELInvoker(component, service, operation, odeServer, txMgr); + return invoker; + } + + public boolean supportsOneWayInvocation() { + return false; + } + + public void start() { + if(__log.isInfoEnabled()) { + __log.info("Starting " + component.getName()); + } // end if + + // Switch TCCL - use a classloader that can find classes related to the non-OSGi services + // referenced from the implementation-bpel module which include the Persistence provider (OpenJPA) and + // the JPA DAO implementation contained in the ODE project + ClassLoader tccl = ClassLoaderContext.setContextClassLoader(EmbeddedODEServer.class.getClassLoader(), + PersistenceProviderImpl.class.getClassLoader(), + ProcessDAOImpl.class.getClassLoader() ); + + try { + if (!odeServer.isInitialized()) { + // start ode server + odeServer.init(); + } + + String location = this.implementation.getProcessDefinition().getLocation(); + URI deployURI = new URI(null, location, null); + + File deploymentDir = new File(deployURI).getParentFile(); + + if(__log.isInfoEnabled()) { + __log.info(">>> Deploying : " + deploymentDir.toString()); + } + + // Deploy the BPEL process + if (odeServer.isInitialized()) { + deployment = new ODEDeployment( deploymentDir ); + try { + odeServer.registerTuscanyRuntimeComponent(implementation.getProcess(), component); + + odeServer.deploy(deployment, implementation, component ); + } catch (Exception e) { + e.printStackTrace(); + } + } + + } catch (ODEInitializationException inite) { + throw new RuntimeException("BPEL Component Type Implementation : Error initializing embedded ODE server " + inite.getMessage(), inite); + } catch(Exception e) { + throw new RuntimeException("BPEL Component Type Implementation initialization failure : " + e.getMessage(), e); + } finally { + // Restore the TCCL if we changed it + if( tccl != null ) Thread.currentThread().setContextClassLoader(tccl); + } // end try + } // end method start() + + public void stop() { + if(__log.isInfoEnabled()) { + __log.info("Stopping " + component.getName()); + } + + odeServer.undeploy(deployment); + + if (odeServer.isInitialized()) { + // stop ode server + odeServer.stop(); + } + + txMgr = null; + + if(__log.isInfoEnabled()) { + __log.info("Stopped !!!"); + } + } // end method stop() + +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java new file mode 100644 index 0000000000..ed327e237b --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +import javax.transaction.TransactionManager; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.implementation.bpel.ode.EmbeddedODEServer; +import org.apache.tuscany.sca.implementation.bpel.ode.GeronimoTxFactory; +import org.apache.tuscany.sca.provider.ImplementationProvider; +import org.apache.tuscany.sca.provider.ImplementationProviderFactory; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.oasisopen.sca.annotation.Destroy; + +/** + * BPEL Implementation provider factory + * + * We use the provider factory to instantiate a ODE server that is going to be injected in all BPEL components + * + * @version $Rev$ $Date$ + */ +public class BPELImplementationProviderFactory implements ImplementationProviderFactory<BPELImplementation> { + + private EmbeddedODEServer odeServer; + private TransactionManager txMgr; + + /** + * Default constructor receiving an extension point + * @param extensionPoints + */ + public BPELImplementationProviderFactory(ExtensionPointRegistry extensionPoints) { + GeronimoTxFactory txFactory = new GeronimoTxFactory(); + txMgr = txFactory.getTransactionManager(); + this.odeServer = new EmbeddedODEServer(txMgr); + } + + /** + * Creates a new BPEL Implementation and inject the EmbeddedODEServer + */ + public ImplementationProvider createImplementationProvider(RuntimeComponent component, BPELImplementation implementation) { + return new BPELImplementationProvider(component, implementation, odeServer, txMgr); + } + + public Class<BPELImplementation> getModelType() { + return BPELImplementation.class; + } + + @Destroy + public void destroy() { + txMgr = null; + } +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java new file mode 100644 index 0000000000..4a99fe705c --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; +import javax.wsdl.Part; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; +import org.apache.ode.bpel.iapi.MessageExchange.Status; +import org.apache.ode.utils.DOMUtils; +import org.apache.ode.utils.GUID; +import org.apache.tuscany.sca.assembly.Base; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.implementation.bpel.ode.EmbeddedODEServer; +import org.apache.tuscany.sca.interfacedef.Interface; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Implements a target invoker for BPEL component implementations. + * + * The target invoker is responsible for dispatching invocations to the particular + * component implementation logic. In this example we are simply delegating the + * CRUD operation invocations to the corresponding methods on our fake + * resource manager. + * + * @version $Rev$ $Date$ + */ +public class BPELInvoker implements Invoker { + private final static long TIME_OUT = 10000L; + + protected final Log __log = LogFactory.getLog(getClass()); + + private EmbeddedODEServer odeServer; + private TransactionManager txMgr; + + private RuntimeComponentService service; + private Operation operation; + private QName bpelServiceName; + private String bpelOperationName; + private Part bpelOperationInputPart; + private Part bpelOperationOutputPart; + private RuntimeComponent component; + // Marks if this service has a callback interface + private Boolean isCallback = false; + private EndpointReference callbackEPR; + + public BPELInvoker(RuntimeComponent component, RuntimeComponentService service, Operation operation, + EmbeddedODEServer odeServer, TransactionManager txMgr) { + this.service = service; + this.component = component; + this.operation = operation; + this.bpelOperationName = operation.getName(); + this.odeServer = odeServer; + this.txMgr = txMgr; + this.isCallback = serviceHasCallback( service ); + + initializeInvocation(); + } // end method BPELInvoker + + private boolean serviceHasCallback( RuntimeComponentService service ) { + if(service.getInterfaceContract().getCallbackInterface() != null) return true; + return false; + } // end method serviceHasCallback + + private void initializeInvocation() { + + __log.debug("Initializing BPELInvoker"); + + Interface interfaze = operation.getInterface(); + if(interfaze instanceof WSDLInterface){ + WSDLInterface wsdlInterface = null; + wsdlInterface = (WSDLInterface) interfaze; + + // Fetch the service name from the service object - including the componentURI guarantees a unique service name + String componentURI = component.getURI(); + bpelServiceName = new QName( Base.SCA11_TUSCANY_NS, componentURI + service.getName() ); + + bpelOperationInputPart = (Part) wsdlInterface.getPortType().getOperation(bpelOperationName,null,null).getInput().getMessage().getParts().values().iterator().next(); + bpelOperationOutputPart = (Part) wsdlInterface.getPortType().getOperation(bpelOperationName,null,null).getOutput().getMessage().getParts().values().iterator().next(); + } + } // end method initializeInvocation + + public Message invoke(Message msg) { + try { + if( isCallback ) { + // Extract the callback endpoint metadata + callbackEPR = msg.getFrom(); + } // end if + Object[] args = msg.getBody(); + Object resp = doTheWork(args); + msg.setBody(resp); + } catch (InvocationTargetException e) { + msg.setFaultBody(e.getCause()); + } + return msg; + } + + public Object doTheWork(Object[] args) throws InvocationTargetException { + Element response = null; + + if(! (operation.getInterface() instanceof WSDLInterface)) { + throw new InvocationTargetException(null,"Unsupported service contract"); + } + + org.apache.ode.bpel.iapi.MyRoleMessageExchange mex = null; + Future<?> onhold = null; + + //Process the BPEL process invocation + Long processID = 0L; + try { + txMgr.begin(); + mex = odeServer.getBpelServer().getEngine().createMessageExchange(new GUID().toString(), + bpelServiceName, + bpelOperationName); + //TODO - this will not be true for OneWay operations - need to handle those + mex.setProperty("isTwoWay", "true"); + onhold = mex.invoke(createInvocationMessage(mex, args)); + + txMgr.commit(); + // Deal with callback cases - store the callback metadata by process instance ID + if( isCallback ) { + processID = odeServer.getProcessIDFromMex(mex.getMessageExchangeId()); + // Store the callback metadata for this invocation + odeServer.saveCallbackMetadata( processID, service.getName(), callbackEPR ); + } // end if + } catch (Exception e) { + try { + txMgr.rollback(); + } catch (SystemException se) { + + } + throw new InvocationTargetException(e, "Error invoking BPEL process : " + e.getMessage()); + } // end try + + // Waiting until the reply is ready in case the engine needs to continue in a different thread + if (onhold != null) { + try { + //add timeout to avoid blocking when there is a exception/failure + onhold.get(TIME_OUT, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new InvocationTargetException(e,"Error invoking BPEL process : " + e.getMessage()); + } // end try + } // end if + + //Process the BPEL invocation response + try { + txMgr.begin(); + // Reloading the mex in the current transaction, otherwise we can't + // be sure we have the "freshest" one. + mex = (MyRoleMessageExchange)odeServer.getBpelServer().getEngine().getMessageExchange(mex.getMessageExchangeId()); + + Status status = mex.getStatus(); + + switch (status) { + case FAULT: + if (__log.isDebugEnabled()) + __log.debug("Fault response message: " + mex.getFault()); + throw new ODEInvocationException("FAULT received from BPEL process : " + mex.getFault() + + " " + + mex.getFaultExplanation()); + case ASYNC: + case RESPONSE: + //process the method invocation result + response = processResponse(mex.getResponse().getMessage()); + if (__log.isDebugEnabled()) + __log.debug("Response message " + response); + break; + case FAILURE: + if (__log.isDebugEnabled()) + __log.debug("Failure response message: " + mex.getFault()); + break; + default: + throw new ODEInvocationException("FAILURE received from BPEL process : " + mex.getStatus() + " - " + mex.getFault()); + } // end switch + + txMgr.commit(); + // end of transaction two + } catch (Exception e) { + try { + txMgr.rollback(); + } catch (SystemException se) { + + } + throw new InvocationTargetException(e, "Error retrieving BPEL process invocation status : " + e.getMessage()); + } // end try + + // Cleanup the ODE MessageExchange object + //mex.release(); + + return response; + } + + /** + * Create BPEL Invocation message + * + * BPEL invocation message like : + * <message> + * <TestPart> + * <hello xmlns="http://tuscany.apache.org/implementation/bpel/example/helloworld.wsdl">Hello</hello> + * </TestPart> + * </message> + * @param args + * @return + */ + private org.apache.ode.bpel.iapi.Message createInvocationMessage(org.apache.ode.bpel.iapi.MyRoleMessageExchange mex, Object[] args) { + Document dom = DOMUtils.newDocument(); + + Element contentMessage = dom.createElement("message"); + Element contentPart = dom.createElement(bpelOperationInputPart.getName()); + Element payload = null; + + // TODO handle WSDL input messages with multiple Parts... + //TUSCANY-2321 - Properly handling Document or Element types + if(args[0] instanceof Document) { + payload = (Element) ((Document) args[0]).getFirstChild(); + } else { + payload = (Element) args[0]; + } + + contentPart.appendChild(dom.importNode(payload, true)); + contentMessage.appendChild(contentPart); + dom.appendChild(contentMessage); + + if (__log.isDebugEnabled()) { + __log.debug("Creating invocation message:"); + __log.debug(">> args.....: " + DOMUtils.domToString(payload)); + __log.debug(">> message..:" + DOMUtils.domToString(dom.getDocumentElement())); + } + + org.apache.ode.bpel.iapi.Message request = mex.createMessage(new QName("", "")); + request.setMessage(dom.getDocumentElement()); + + return request; + } + + /** + * Process BPEL response + * + * <message> + * <TestPart> + * <hello xmlns="http://tuscany.apache.org/implementation/bpel/example/helloworld.wsdl">World</hello> + * </TestPart> + * </message> + * + * @param response + * @return + */ + private Element processResponse(Element response) { + return (Element) DOMUtils.findChildByName(response, new QName("",bpelOperationOutputPart.getName())).getFirstChild(); + + // MJE, 12/06/2009 - changed to return the message without the PART wrapper element, since this element is not + // transmitted in the SOAP messages on the wire + } // end method processResponse +} // end class BPELInvoker diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/ODEInvocationException.java b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/ODEInvocationException.java new file mode 100644 index 0000000000..7b6f9ceafa --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/ODEInvocationException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +public class ODEInvocationException extends Exception { + + /** + * Thrown when the result of the invocation of a BPEL Process operation + * returns a Fault or Failure code + */ + private static final long serialVersionUID = 5096941965798566018L; + + public ODEInvocationException(String message) { + super(message); + } + +} diff --git a/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/resources/META-INF/services/org.apache.tuscany.sca.provider.ImplementationProviderFactory b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/resources/META-INF/services/org.apache.tuscany.sca.provider.ImplementationProviderFactory new file mode 100644 index 0000000000..763f8a601b --- /dev/null +++ b/sandbox/sebastien/java/wrapped/modules/implementation-bpel-runtime/src/main/resources/META-INF/services/org.apache.tuscany.sca.provider.ImplementationProviderFactory @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Implementation class for the implementation extension
+org.apache.tuscany.sca.implementation.bpel.ode.provider.BPELImplementationProviderFactory;model=org.apache.tuscany.sca.implementation.bpel.BPELImplementation
+
|