diff options
Diffstat (limited to '')
16 files changed, 2148 insertions, 0 deletions
diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/BPELODEDeployFile.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/BPELODEDeployFile.java new file mode 100644 index 0000000000..dc2242a189 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/BPELODEDeployFile.java @@ -0,0 +1,233 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.implementation.bpel.ode;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tuscany.sca.assembly.ComponentType;
+import org.apache.tuscany.sca.assembly.Reference;
+import org.apache.tuscany.sca.assembly.Service;
+import org.apache.tuscany.sca.implementation.bpel.BPELImplementation;
+
+/**
+ * A class that handles the deploy.xml file required for each BPEL process by the ODE runtime
+ * @author Mike Edwards
+ *
+ * An explanation of the structure of the ODE deploy file:
+ *
+ * <deploy xmlns="http://www.apache.org/ode/schemas/dd/2007/03"
+ * xmlns:tns="http://helloworld"
+ * xmlns:tus="http://tuscany.apache.org">
+ *
+ * <process name="tns:HelloWorld">
+ * <active>true</active>
+ * <provide partnerLink="helloPartnerLink">
+ * <service name="tus:helloPartnerLink" port="HelloWorld"/>
+ * </provide>
+ * <invoke partnerLink="greetingsPartnerLink">
+ * <service name="tus:greetingsPartnerLink" port="Greetings"/>
+ * </invoke>
+ * </process>
+ * </deploy>
+ *
+ * For SCA purposes:
+ *
+ * a) Each partner link in the BPEL process is declared using either a <provide.../>
+ * (for a service) or using a <invoke.../> (for a reference).
+ *
+ * b) Each <provide/> and <invoke/> must use the partnerLink name, as declared in the
+ * BPEL process.
+ *
+ * c) The <provide/> and <invoke/> elements each have a single child <service/> element.
+ * The <service/> elements have name and port attributes. The NAME attribute MUST be set
+ * to the same name as the partnerLink and MUST be prefixed by a prefix which references
+ * the namespace "http://tuscany.apache.org" ("tus" in the example above).
+ * The port attribute can be set to any name (it must be present but it is not actually
+ * used for anything significant).
+ *
+ * When SCA loads a BPEL process to the ODE server, this file is read by the ODE server to
+ * characterize the process. When SCA interacts with ODE at later points - either when a
+ * service is being invoked or the process invokes a reference - it is the service @name
+ * attribute that identifies the service or reference involved.
+ *
+ * @version
+ */
+public class BPELODEDeployFile {
+ private final Log __log = LogFactory.getLog(getClass());
+
+ static final String DEPLOY_ELEMENT_START = "<deploy xmlns=\"http://www.apache.org/ode/schemas/dd/2007/03\"";
+ static final String DEPLOY_ENDELEMENT = "</deploy>";
+ static final String PROCESS_NAMESPACE_DECL = "xmlns:tns=";
+ static final String SERVICE_NAMESPACE = "xmlns:tus=\"http://tuscany.apache.org\"";
+ static final String PROCESS_ELEMENT_START = "<process name=\"tns:";
+ static final String PROCESS_ELEMENT_END = "\">";
+ static final String PROCESS_ENDELEMENT = "</process>";
+ static final String ACTIVE_ELEMENT = "<active>true</active>";
+ static final String PROVIDE_ELEMENT_START = "<provide partnerLink=\"";
+ static final String PROVIDE_ELEMENT_END = "\">";
+ static final String PROVIDE_ENDELEMENT = "</provide>";
+ static final String SERVICE_ELEMENT_START = "<service name=\"tus:";
+ static final String SERVICE_ELEMENT_PORT = "\" port=\"";
+ static final String SERVICE_ELEMENT_END = "Port\"/>";
+ static final String INVOKE_ELEMENT_START = "<invoke partnerLink=\"";
+ static final String INVOKE_ELEMENT_END = "\">";
+ static final String INVOKE_ENDELEMENT = "</invoke>";
+
+ static final String DEPLOY_FILENAME = "deploy.xml";
+
+ private BPELImplementation implementation;
+
+ /**
+ * Constructor - requires a BPELImplementation as a parameter
+ * The ODE deploy.xml file is for this supplied BPELImplementation
+ * @param theImplementation
+ */
+ public BPELODEDeployFile( BPELImplementation theImplementation ) {
+
+ implementation = theImplementation;
+
+ } // end BPELODEDeployFile constructor
+
+ /**
+ * Writes the deploy file into the same directory as the BPEL process file, with the name
+ * "deploy.xml"
+ */
+ public void writeDeployfile() throws IOException {
+
+ File theDirectory = getDirectory();
+
+ File deployFile = new File( theDirectory, DEPLOY_FILENAME );
+ new FileOutputStream( deployFile );
+ //if( !deployFile.canWrite() ) throw new IOException( "Unable to write to deploy file" +
+ // deployFile.getPath() );
+
+ // Create a stream for the data and write the data to the file
+ PrintStream theStream = new PrintStream( new FileOutputStream( deployFile ) );
+ try {
+ constructDeployXML( theStream );
+ if( theStream.checkError() ) throw new IOException();
+ } catch (Exception e) {
+ throw new IOException( "Unable to write data to deploy file" +
+ deployFile.getPath() );
+ } finally {
+ theStream.close();
+ } // end try
+
+ } // end writeDeployFile
+
+ /**
+ * Creates the deploy.xml data and writes it to a supplied PrintStream
+ * @param stream
+ */
+ public void constructDeployXML( PrintStream stream ) {
+
+ // <deploy + namespace...
+ stream.println( DEPLOY_ELEMENT_START );
+ // namespace of the BPEL process
+ QName process = implementation.getProcess();
+ String processNamespace = process.getNamespaceURI();
+ stream.println( PROCESS_NAMESPACE_DECL + "\"" + processNamespace + "\"" );
+ // namespace for the service name elements
+ stream.println( SERVICE_NAMESPACE + ">" );
+
+ // <process> element
+ stream.println( PROCESS_ELEMENT_START + process.getLocalPart() +
+ PROCESS_ELEMENT_END );
+
+ // <active/> element
+ stream.println( ACTIVE_ELEMENT );
+
+ ComponentType componentType = implementation.getComponentType();
+ List<Service> theServices = componentType.getServices();
+ // Loop over the <provide/> elements - one per service
+ for ( Service service : theServices ) {
+ String serviceName = service.getName();
+ // Provide element...
+ stream.println( PROVIDE_ELEMENT_START + serviceName + PROVIDE_ELEMENT_END );
+ // Child service element...
+ stream.println( SERVICE_ELEMENT_START + serviceName +
+ SERVICE_ELEMENT_PORT + serviceName + SERVICE_ELEMENT_END );
+ stream.println( PROVIDE_ENDELEMENT );
+ } // end for
+
+ // Loop over the <invoke/> elements - one per reference
+ List<Reference> theReferences = componentType.getReferences();
+ for ( Reference reference : theReferences ) {
+ String referenceName = reference.getName();
+ stream.println( INVOKE_ELEMENT_START + referenceName + INVOKE_ELEMENT_END );
+ // Child service element...
+ stream.println( SERVICE_ELEMENT_START + referenceName +
+ SERVICE_ELEMENT_PORT + referenceName + SERVICE_ELEMENT_END );
+ stream.println( INVOKE_ENDELEMENT );
+
+ } // end for
+
+ // </process> element
+ stream.println( PROCESS_ENDELEMENT );
+
+ // </deploy>
+ stream.println( DEPLOY_ENDELEMENT );
+
+ } // end constructDeployXML
+
+ /**
+ * Gets the directory containing the BPEL process
+ * @return
+ */
+ private File getDirectory() {
+ File theDir = getBPELFile().getParentFile();
+ return theDir;
+ } // end getDirectory
+
+ /**
+ * Gets the File containing the BPEL process definition
+ * @return - the File object containing the BPEL process
+ */
+ private File getBPELFile() {
+ try {
+ String location = this.implementation.getProcessDefinition().getLocation();
+ URI locationURI;
+ if (location.indexOf('%') != -1) {
+ locationURI = URI.create(location);
+ } else {
+ locationURI = new URI(null, location, null);
+ }
+
+ File theProcess = new File(locationURI);
+ return theProcess;
+ } catch( Exception e ) {
+ if(__log.isDebugEnabled()) {
+ __log.debug("Exception converting BPEL file URL to an URI: " + e );
+ }
+ } // end try
+ return null;
+ } // end getBPELFile
+
+
+
+} // end class BPELODEDeployFile
diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java new file mode 100644 index 0000000000..619643eaa3 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/EmbeddedODEServer.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.transaction.TransactionManager; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC; +import org.apache.ode.bpel.engine.BpelServerImpl; +import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy; +import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl; +import org.apache.ode.il.config.OdeConfigProperties; +import org.apache.ode.il.dbutil.Database; +import org.apache.ode.scheduler.simple.JdbcDelegate; +import org.apache.ode.scheduler.simple.SimpleScheduler; +import org.apache.ode.utils.GUID; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.runtime.RuntimeComponent; + +/** + * Embedded ODE process server + * + * @version $Rev$ $Date$ + */ +public class EmbeddedODEServer { + protected final Log __log = LogFactory.getLog(getClass()); + + private boolean _initialized; + + private OdeConfigProperties _config; + + private TransactionManager _txMgr; + + private Database _db; + + private File _workRoot; + + private BpelDAOConnectionFactoryJDBC _daoCF; + + private BpelServerImpl _bpelServer; + + private Scheduler _scheduler; + + protected ExecutorService _executorService; + + private Map<QName, RuntimeComponent> tuscanyRuntimeComponents = new ConcurrentHashMap<QName, RuntimeComponent>(); + + public EmbeddedODEServer(TransactionManager txMgr) { + _txMgr = txMgr; + } + + public void init() throws ODEInitializationException { + Properties p = System.getProperties(); + p.put("derby.system.home", "target"); + + Properties confProps = new Properties(); + confProps.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=false)"); + _config = new OdeConfigProperties(confProps, "ode-sca"); + + // Setting work root as the directory containing our database (wherever in the classpath) + URL dbLocation = getClass().getClassLoader().getResource("jpadb"); + if (dbLocation == null) + throw new ODEInitializationException("Couldn't find database in the classpath"); + try { + _workRoot = new File(dbLocation.toURI()).getParentFile(); + } catch (URISyntaxException e) { + throw new ODEInitializationException(e); + } + + initTxMgr(); + initPersistence(); + initBpelServer(); + + try { + _bpelServer.start(); + } catch (Exception ex) { + String errmsg = "An error occured during the ODE BPEL server startup."; + __log.error(errmsg, ex); + throw new ODEInitializationException(errmsg, ex); + } + + __log.info("ODE BPEL server started."); + _initialized = true; + } + + private void initTxMgr() { + if(_txMgr == null) { + try { + GeronimoTxFactory txFactory = new GeronimoTxFactory(); + _txMgr = txFactory.getTransactionManager(); + } catch (Exception e) { + __log.fatal("Couldn't initialize a transaction manager using Geronimo's transaction factory.", e); + throw new ODEInitializationException("Couldn't initialize a transaction manager using " + "Geronimo's transaction factory.", e); + } + } + } + + private void initPersistence() { + _db = new Database(_config); + _db.setTransactionManager(_txMgr); + _db.setWorkRoot(_workRoot); + + try { + _db.start(); + _daoCF = _db.createDaoCF(); + } catch (Exception ex) { + String errmsg = "Error while configuring ODE persistence."; + __log.error(errmsg, ex); + throw new ODEInitializationException(errmsg, ex); + } + } + + private void initBpelServer() { + if (__log.isDebugEnabled()) { + __log.debug("ODE initializing"); + } + + //FIXME: externalize the configuration for ThreadPoolMaxSize + _executorService = Executors.newCachedThreadPool(); + + _bpelServer = new BpelServerImpl(); + _scheduler = createScheduler(); + _scheduler.setJobProcessor(_bpelServer); + + _bpelServer.setDaoConnectionFactory(_daoCF); + _bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler)); + // _bpelServer.setEndpointReferenceContext(new EndpointReferenceContextImpl(this)); + _bpelServer.setMessageExchangeContext(new ODEMessageExchangeContext(this)); + _bpelServer.setBindingContext(new ODEBindingContext()); + _bpelServer.setScheduler(_scheduler); + if (_config.isDehydrationEnabled()) { + CountLRUDehydrationPolicy dehy = new CountLRUDehydrationPolicy(); + _bpelServer.setDehydrationPolicy(dehy); + } + + _bpelServer.init(); + } // end InitBpelServer + + public void stop() throws ODEShutdownException { + if(_bpelServer != null) { + try { + __log.debug("Stopping BPEL Embedded server"); + _bpelServer.shutdown(); + _bpelServer = null; + } catch (Exception ex) { + __log.debug("Error stopping BPEL server"); + } + } + + if(_scheduler != null) { + try { + __log.debug("Stopping scheduler"); + _scheduler.shutdown(); + _scheduler = null; + } catch (Exception ex) { + __log.debug("Error stopping scheduler"); + } + } + + if(_daoCF != null) { + try { + __log.debug("Stopping DAO"); + _daoCF.shutdown(); + _daoCF = null; + } catch (Exception ex) { + __log.debug("Error stopping DAO"); + } + } + + if(_db != null) { + try { + __log.debug("Stopping DB"); + _db.shutdown(); + _db = null; + } catch (Exception ex) { + __log.debug("Error stopping DB"); + } + } + + if(_txMgr != null) { + try { + __log.debug("Stopping Transaction Manager"); + _txMgr = null; + } catch (Exception ex) { + __log.debug("Error stopping Transaction Manager"); + } + } + } + + protected Scheduler createScheduler() { + SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_db.getDataSource())); + scheduler.setTransactionManager(_txMgr); + + return scheduler; + } + + public boolean isInitialized() { + return _initialized; + } + + public BpelServerImpl getBpelServer() { + return _bpelServer; + } + + public Scheduler getScheduler() { + return _scheduler; + } + + public ExecutorService getExecutor() { + return _executorService; + } + + // Updated by Mike Edwards, 23/05/2008 + public void deploy(ODEDeployment d, BPELImplementation implementation) { + try { + TuscanyProcessConfImpl processConf = new TuscanyProcessConfImpl( implementation ); + _bpelServer.register(processConf); + __log.debug("Completed calling new Process deployment code..."); + } catch (Exception ex) { + String errMsg = ">>> DEPLOY: Unexpected exception: " + ex.getMessage(); + __log.debug(errMsg, ex); + throw new ODEDeploymentException(errMsg,ex); + } + } + + public void undeploy(ODEDeployment d) { + //TODO + } + + public void registerTuscanyRuntimeComponent(QName processName,RuntimeComponent componentContext) { + tuscanyRuntimeComponents.put(processName, componentContext); + } + + public RuntimeComponent getTuscanyRuntimeComponent(QName processName) { + return tuscanyRuntimeComponents.get(processName); + } +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/GeronimoTxFactory.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/GeronimoTxFactory.java new file mode 100644 index 0000000000..11af0f8b50 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/GeronimoTxFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import javax.transaction.TransactionManager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Geronimo transaction factory + * + * @version $Rev$ $Date$ + */ +public class GeronimoTxFactory { + private static final Log __log = LogFactory.getLog(GeronimoTxFactory.class); + + /* Public no-arg constructor is required */ + public GeronimoTxFactory() { + } + + public TransactionManager getTransactionManager() { + __log.info("Using embedded Geronimo transaction manager"); + try { + Object obj = new org.apache.geronimo.transaction.manager.GeronimoTransactionManager(); + return (TransactionManager) obj; + } catch (Exception except) { + throw new IllegalStateException("Unable to instantiate Geronimo Transaction Manager", except); + } + } +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java new file mode 100644 index 0000000000..3f2db7b244 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEBindingContext.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import javax.wsdl.PortType; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.BindingContext; +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.PartnerRoleChannel; + +/** + * Binding Context information + * + * @version $Rev$ $Date$ + */ +public class ODEBindingContext implements BindingContext { + protected final Log __log = LogFactory.getLog(getClass()); + + public ODEBindingContext() { + + } + + public EndpointReference activateMyRoleEndpoint(QName pid, Endpoint endpoint) { + // This will be needed when we support callBacks + if (__log.isDebugEnabled()) { + __log.debug("Activating MyRole Endpoint : " + pid + " - " + endpoint.serviceName); + } + + QName processName = getProcessName(pid); + + return new TuscanyEPR(processName, endpoint); + } + + public void deactivateMyRoleEndpoint(Endpoint endpoint) { + if (__log.isDebugEnabled()) { + __log.debug("Deactivate MyRole Endpoint : " + endpoint.serviceName); + } + + } + + public PartnerRoleChannel createPartnerRoleChannel(QName pid, PortType portType, Endpoint endpoint) { + if (__log.isDebugEnabled()) { + __log.debug("Create PartnerRole channel : " + pid + " - " + portType.getQName() + " - "+ endpoint.serviceName); + } + + QName processName = getProcessName(pid); + return new TuscanyPRC(processName, pid, portType, endpoint); + } + + /** + * Helper method to retrieve the BPEL process name from a processID (where processID have version concatenated to it) + * @param pid + * @return QName the BPEL process name + */ + private static QName getProcessName(QName pid) { + String processName = pid.getLocalPart().substring(0, pid.getLocalPart().lastIndexOf("-")); + return new QName(pid.getNamespaceURI(), processName); + } +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeployment.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeployment.java new file mode 100644 index 0000000000..d57471e215 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeployment.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.io.File; + +/** + * Deployment information + * + * @version $Rev$ $Date$ + */ +public class ODEDeployment { + /** The directory containing the deploy.xml and artifacts. */ + public File deployDir; + + /** If non-null the type of exception we expect to get when we deploy. */ + public Class<?> expectedException = null; + + public ODEDeployment(File deployDir) { + this.deployDir = deployDir; + } + + @Override + public String toString() { + return "Deployment#" + deployDir; + } +}
\ No newline at end of file diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeploymentException.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeploymentException.java new file mode 100644 index 0000000000..b03f69d9aa --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEDeploymentException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +/** + * Thrown when ODE failed to shutdown. + * + * @version $Rev$ $Date$ + */ +public class ODEDeploymentException extends RuntimeException { + private static final long serialVersionUID = -2869674556330744215L; + + public ODEDeploymentException(Throwable cause) { + super(cause); + } + + public ODEDeploymentException(String message) { + super(message); + } + + public ODEDeploymentException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java new file mode 100644 index 0000000000..15e9229601 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEExternalService.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.util.concurrent.Callable; + +import javax.wsdl.Part; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.Message; +import org.apache.ode.bpel.iapi.MessageExchange; +import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; +import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.utils.DOMUtils; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeWire; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Helper Class to handle invocation to Tuscany Component References + * + * @version $Rev$ $Date$ + */ +public class ODEExternalService { + protected final Log __log = LogFactory.getLog(getClass()); + + private EmbeddedODEServer _server; + private Scheduler _sched; + + public ODEExternalService(EmbeddedODEServer server) { + this._server = server; + this._sched = _server.getScheduler(); + } + + + public void invoke(final PartnerRoleMessageExchange partnerRoleMessageExchange) { + boolean isTwoWay = + partnerRoleMessageExchange.getMessageExchangePattern() == org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE; + + if (isTwoWay) { + // Defer the invoke until the transaction commits. + _sched.registerSynchronizer(new Scheduler.Synchronizer() { + public void beforeCompletion() { + + } + + public void afterCompletion(boolean success) { + // If the TX is rolled back, then we don't send the request. + if (!success) + return; + + // The invocation must happen in a separate thread, holding + // on the afterCompletion + // blocks other operations that could have been listed there + // as well. + _server.getExecutor().submit(new Callable<Object>() { + public Object call() throws Exception { + try { + // do execution + if(! (partnerRoleMessageExchange.getChannel() instanceof TuscanyPRC)) { + throw new IllegalArgumentException("Channel should be an instance of TuscanyPRC"); + } + + TuscanyPRC channel = (TuscanyPRC) partnerRoleMessageExchange.getChannel(); + RuntimeComponent tuscanyRuntimeComponent = _server.getTuscanyRuntimeComponent(channel.getProcessName()); + + RuntimeComponentReference runtimeComponentReference = + (RuntimeComponentReference)tuscanyRuntimeComponent.getReferences().get(0); + RuntimeWire runtimeWire = + runtimeComponentReference.getRuntimeWire(runtimeComponentReference.getBindings().get(0)); + + // convert operations + Operation operation = + findOperation(partnerRoleMessageExchange.getOperation().getName(), runtimeComponentReference); + + + /* + This is how a request looks like (payload is wrapped with extra info) + <?xml version="1.0" encoding="UTF-8"?> + <message> + <parameters> + <getGreetings xmlns="http://greetings"> + <message xmlns="http://helloworld">Luciano</message> + </getGreetings> + </parameters> + </message> + */ + Element msg = partnerRoleMessageExchange.getRequest().getMessage(); + if (msg != null) { + String xml = DOMUtils.domToString(msg); + + String payload = + DOMUtils.domToString(getPayload(partnerRoleMessageExchange.getRequest())); + + if(__log.isDebugEnabled()) { + __log.debug("Starting invocation of SCA Reference"); + __log.debug(">>> Original message: " + xml); + __log.debug(">>> Payload: " + payload); + } + + Object[] args = new Object[] {getPayload(partnerRoleMessageExchange.getRequest())}; + + Object result = null; + boolean success = false; + + try { + result = runtimeWire.invoke(operation, args); + success = true; + } catch (Exception e) { + partnerRoleMessageExchange.replyWithFailure(MessageExchange.FailureType.OTHER, + e.getMessage(), + null); + } + + + if(__log.isDebugEnabled()) { + __log.debug("SCA Reference invocation finished"); + __log.debug(">>> Result : " + DOMUtils.domToString((Element)result)); + } + + if (!success) { + return null; + } + + // two way invocation + // process results based on type of message + // invocation + + // Message response = + // createResponseMessage(partnerRoleMessageExchange, + // (Element) result); + // partnerRoleMessageExchange.reply(response); + replyTwoWayInvocation(partnerRoleMessageExchange.getMessageExchangeId(), + operation, + (Element)result); + } + + } catch (Throwable t) { + // some error + String errmsg = + "Error sending message (mex=" + partnerRoleMessageExchange + "): " + t.getMessage(); + __log.error(errmsg, t); + /*replyWithFailure(partnerRoleMessageExchange.getMessageExchangeId(), + MessageExchange.FailureType.COMMUNICATION_ERROR, + errmsg, + null);*/ + } + return null; + } + }); + + } + }); + partnerRoleMessageExchange.replyAsync(); + + } else { + /** one-way case * */ + _server.getExecutor().submit(new Callable<Object>() { + public Object call() throws Exception { + // do reply + // operationClient.execute(false); + return null; + } + }); + partnerRoleMessageExchange.replyOneWayOk(); + } + } + + + /** + * Find the SCA Reference operation + * + * @param operationName + * @param runtimeComponentReference + * @return + */ + private Operation findOperation(String operationName, RuntimeComponentReference runtimeComponentReference) { + Operation reseultOperation = null; + + for(Operation operation : runtimeComponentReference.getInterfaceContract().getInterface().getOperations()) { + if (operationName.equalsIgnoreCase(operation.getName())) { + reseultOperation = operation; + break; + } + } + return reseultOperation; + } + + /** + * Get paylod from a given ODEMessage + * @param odeMessage + * @return + */ + private Element getPayload(Message odeMessage) { + Element payload = null; + Element parameters = odeMessage.getPart("parameters"); + + if (parameters != null && parameters.hasChildNodes()) { + payload = (Element)parameters.getFirstChild(); + } + + return payload; + } + + + private void replyTwoWayInvocation(final String odeMexId, final Operation operation, final Element result) { + // ODE MEX needs to be invoked in a TX. + try { + _server.getScheduler().execIsolatedTransaction(new Callable<Void>() { + public Void call() throws Exception { + PartnerRoleMessageExchange odeMex = null; + try { + odeMex = (PartnerRoleMessageExchange)_server.getBpelServer().getEngine().getMessageExchange(odeMexId); + if (odeMex != null) { + Message response = createResponseMessage(odeMex, operation, (Element)result); + odeMex.reply(response); + } + } catch (Exception ex) { + String errmsg = "Unable to process response: " + ex.getMessage(); + if (odeMex != null) { + odeMex.replyWithFailure(MessageExchange.FailureType.OTHER, errmsg, null); + } + } + + return null; + } + }); + } catch (Exception ex) { + ex.printStackTrace(); + } + + } + + private Message createResponseMessage(PartnerRoleMessageExchange partnerRoleMessageExchange, + Operation operation, + Element invocationResult) { + Document dom = DOMUtils.newDocument(); + + String operationName = operation.getName(); + Part bpelOperationOutputPart = + (Part)((WSDLInterface)operation.getInterface()).getPortType().getOperation(operationName, null, null) + .getOutput().getMessage().getParts().values().iterator().next(); + + Element contentMessage = dom.createElement("message"); + Element contentPart = dom.createElement(bpelOperationOutputPart.getName()); + + contentPart.appendChild(dom.importNode(invocationResult, true)); + contentMessage.appendChild(contentPart); + dom.appendChild(contentMessage); + + if(__log.isDebugEnabled()) { + __log.debug("Creating result message:"); + __log.debug(">>>" + DOMUtils.domToString(dom.getDocumentElement())); + } + + QName id = partnerRoleMessageExchange.getOperation().getOutput().getMessage().getQName(); + Message response = partnerRoleMessageExchange.createMessage(id); + response.setMessage(dom.getDocumentElement()); + + return response; + } + +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEInitializationException.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEInitializationException.java new file mode 100644 index 0000000000..2fa91e4e86 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEInitializationException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +/** + * Thrown when ODE failed to initialize one if its needed resources. + * + * @version $Rev$ $Date$ + */ +public class ODEInitializationException extends RuntimeException { + private static final long serialVersionUID = -2869674556330744215L; + + public ODEInitializationException(Throwable cause) { + super(cause); + } + + public ODEInitializationException(String message) { + super(message); + } + + public ODEInitializationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEMessageExchangeContext.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEMessageExchangeContext.java new file mode 100644 index 0000000000..1ec82390cf --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEMessageExchangeContext.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.BpelEngineException; +import org.apache.ode.bpel.iapi.ContextException; +import org.apache.ode.bpel.iapi.MessageExchangeContext; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; +import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; + +/** + * Message Exchange Context information + * + * @version $Rev$ $Date$ + */ +public class ODEMessageExchangeContext implements MessageExchangeContext { + private static final Log __log = LogFactory.getLog(ODEMessageExchangeContext.class); + + private EmbeddedODEServer _server; + + public ODEMessageExchangeContext(EmbeddedODEServer _server) { + this._server = _server; + } + + public void invokePartner(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException { + if (__log.isDebugEnabled()) { + __log.debug("Invoking a partner operation: " + partnerRoleMessageExchange.getOperationName()); + } + + ODEExternalService scaService = new ODEExternalService(_server); + scaService.invoke(partnerRoleMessageExchange); + } + + public void onAsyncReply(MyRoleMessageExchange myRoleMessageExchange) throws BpelEngineException { + if (__log.isDebugEnabled()) { + __log.debug("Processing an async reply from service " + myRoleMessageExchange.getServiceName()); + } + } + } diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEShutdownException.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEShutdownException.java new file mode 100644 index 0000000000..a928379ba9 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/ODEShutdownException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +/** + * Thrown when ODE failed to shutdown. + * + * @version $Rev$ $Date$ + */ +public class ODEShutdownException extends RuntimeException { + private static final long serialVersionUID = -2869674556330744215L; + + public ODEShutdownException(Throwable cause) { + super(cause); + } + + public ODEShutdownException(String message) { + super(message); + } + + public ODEShutdownException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyEPR.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyEPR.java new file mode 100644 index 0000000000..deaeaec040 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyEPR.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import javax.xml.namespace.QName; + +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.utils.DOMUtils; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * This should hold something that makes sense for Tuscany so that the + * process has an address that makes sense from the outside world perspective + * + * @version $Rev$ $Date$ + */ +public class TuscanyEPR implements EndpointReference { + private final Document doc = DOMUtils.newDocument(); + + public TuscanyEPR(QName processName, Endpoint endpoint) { + Element serviceref = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(), + EndpointReference.SERVICE_REF_QNAME.getLocalPart()); + serviceref.setNodeValue(endpoint.serviceName + ":" + endpoint.portName); + doc.appendChild(serviceref); + } + + public Document toXML() { + return doc; + } +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java new file mode 100644 index 0000000000..b0539970ec --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyPRC.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode; + +import javax.wsdl.PortType; +import javax.xml.namespace.QName; + +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.PartnerRoleChannel; +import org.apache.ode.utils.DOMUtils; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Tuscany Partner Role Channel for ODE Integration + * + * @version $Rev$ $Date$ + */ +public class TuscanyPRC implements PartnerRoleChannel { + private final QName processName; + + public TuscanyPRC(QName processName, QName pid, PortType portType, Endpoint endpoint){ + this.processName = processName; + } + + public QName getProcessName() { + return this.processName; + } + + public void close() { + + } + + public EndpointReference getInitialEndpointReference() { + final Document doc = DOMUtils.newDocument(); + Element serviceref = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(), + EndpointReference.SERVICE_REF_QNAME.getLocalPart()); + doc.appendChild(serviceref); + + return new EndpointReference() { + public Document toXML() { + return doc; + } + }; + } + +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java new file mode 100644 index 0000000000..a2d4645e8b --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/TuscanyProcessConfImpl.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.wsdl.Definition; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.compiler.BpelC; +import org.apache.ode.bpel.evt.BpelEvent.TYPE; +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.ProcessConf; +import org.apache.ode.bpel.iapi.ProcessState; +import org.apache.tuscany.sca.assembly.Reference; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +/** + * A Tuscany implementation of the ODE Process Conf + * + * @version $Rev$ $Date$ + */ +public class TuscanyProcessConfImpl implements ProcessConf { + private final Log __log = LogFactory.getLog(getClass()); + + private BPELImplementation implementation; + private Map<String, Endpoint> invokeEndpoints = null; + private Map<String, Endpoint> provideEndpoints = null; + private Map<QName, Node> properties = null; + private ProcessState processState; + private Date deployDate; + + private final String TUSCANY_NAMESPACE = "http://tuscany.apache.org"; + + /** + * Constructor for the ProcessConf implementation + * @param theImplementation the BPEL implementation for which this is the ProcessConf + */ + public TuscanyProcessConfImpl( BPELImplementation theImplementation ) { + //System.out.println("New TuscanyProcessConfImpl..."); + this.implementation = theImplementation; + + processState = ProcessState.ACTIVE; + deployDate = new Date(); + + // Compile the process + compile( getBPELFile() ); + } // end TuscanyProcessConfImpl constructor + + /** + * Returns the URI for the directory containing the BPEL process + */ + public URI getBaseURI() { + //System.out.println("getBaseURI called"); + File theDir = getDirectory(); + return theDir.toURI(); + } + + /** + * Returns a String containing the (local) name of the file containing the BPEL process + */ + public String getBpelDocument() { + //System.out.println("getBPELDocument called"); + try { + String location = this.implementation.getProcessDefinition().getLocation(); + URI locationURI = new URI(null, location, null); + File processFile = new File(locationURI); + return getRelativePath( getDirectory(), processFile); + } catch (Exception e) { + if(__log.isWarnEnabled()) { + __log.warn("Unable to resolve relative path of BPEL process" + implementation.getProcessDefinition().getLocation(), e ); + } + return null; + } // end try + } // end getBpelDocument + + /** + * Returns an InputStream containing the Compiled BPEL Process (CBP) + */ + public InputStream getCBPInputStream() { + //System.out.println("getCBPInputStream called"); + // Find the CBP file - it has the same name as the BPEL process and lives in the same + // directory as the process file + String cbpFileName = null; + try { + String fileName = getRelativePath( getDirectory(), getBPELFile() ); + cbpFileName = fileName.substring(0, fileName.lastIndexOf(".")) + ".cbp"; + } catch (Exception e ) { + // IOException trying to fetch the BPEL file name + if(__log.isDebugEnabled()) { + __log.debug("Unable to calculate the file name for BPEL process: " + + implementation.getProcessDefinition().getName(), e); + return null; + } // end if + } // end try + File cbpFile = new File( getDirectory(), cbpFileName ); + if( cbpFile.exists() ) { + // Create an InputStream from the cbp file... + try { + return new FileInputStream( cbpFile ); + } catch ( Exception e ) { + if(__log.isDebugEnabled()) { + __log.debug("Unable to open the cbp file for BPEL process: " + + implementation.getProcessDefinition().getName(), e); + } + } // end try + } else { + // Cannot find the cbp file + if(__log.isWarnEnabled()){ + __log.warn("Cannot find the cbp file for process: " + + implementation.getProcessDefinition().getName()); + } + } // end if + // TODO - need better exception handling if we can't open the cbp file for any reason + return null; + } // end getCBPInputStream + + /** + * Return the WSDL Definition for a given PortType + * @param portTypeName - the QName of the PortType + */ + public Definition getDefinitionForPortType( QName portTypeName ) { + //System.out.println("getDefinitionForPortType called for portType: " + portTypeName ); + // Find the named PortType in the list of WSDL interfaces associated with this BPEL Process + Collection<WSDLInterface> theInterfaces = implementation.getProcessDefinition().getInterfaces(); + for( WSDLInterface wsdlInterface : theInterfaces ) { + if ( wsdlInterface.getPortType().getQName().equals( portTypeName ) ) { + // Extract and return the Definition associated with the WSDLDefinition... + return wsdlInterface.getWsdlDefinition().getDefinition(); + } // end if + } // end for + return null; + } // end getDefinitionforPortType + + /** + * Returns a WSDL Definition for a given Service QName + * + * 22/05/2008 - it is very unclear what this service QName is really meant to be. + * From the handling of the deploy.xml file by the current ODE code, it seems that the key link + * is from the Service QName to the PartnerLink name (done in the deploy.xml file). + * + * The curious part about this is that the QName for the service is ONLY defined in deploy.xml file + * and does not appear to relate to anything else, except for the QName of the PartnerLink + * + * The PartnerLink name is the same as the name of the SCA service (or reference) which in turn points + * at the PartnerLinkType which in turn points at an (WSDL) interface definition. + */ + public Definition getDefinitionForService(QName serviceQName ) { + //System.out.println("getDefinitionForService called for Service: " + serviceQName ); + if(__log.isDebugEnabled()){ + __log.debug("getDefinitionforService called for service: " + serviceQName ); + } + // TODO Auto-generated method stub + return null; + } + + /** + * Returns the date of deployment of the process + * - for SCA returns the date at which this object was created + */ + public Date getDeployDate() { + //System.out.println("getDeployDate called"); + return deployDate; + } + + /** + * Returns userid of deployer + * - always "SCA Tuscany" for Tuscany... + */ + public String getDeployer() { + //System.out.println("getDeployer called"); + return "SCA Tuscany"; + } // end getDeployer + + /** + * Returns a list of the files in the directory containing the BPEL Process + */ + public List<File> getFiles() { + //System.out.println("getFiles called"); + File theDir = getDirectory(); + List<File> theFiles = Arrays.asList( (File[]) theDir.listFiles() ); + // TODO recurse into subdirectories + return theFiles; + } // end getFiles + + /** + * Returns a Map containing all the "invoke endpoints" - for which read "SCA references" + * The map is keyed by partnerLink name and holds Endpoint objects + * + * TODO deal with service callbacks on bidirectional services + */ + public Map<String, Endpoint> getInvokeEndpoints() { + //System.out.println("getInvokeEndpoints called"); + if( invokeEndpoints == null ) { + invokeEndpoints = new HashMap<String, Endpoint>(); + // Get a collection of the references + List<Reference> theReferences = implementation.getReferences(); + // Create an endpoint for each reference, using the reference name as the "service" + // name, combined with http://tuscany.apache.org to make a QName + for( Reference reference : theReferences ) { + invokeEndpoints.put( reference.getName(), + new Endpoint( new QName( TUSCANY_NAMESPACE, reference.getName() ), "ReferencePort")); + } // end for + } // end if + return invokeEndpoints; + } // end getInvokeEndpoints + + /** + * Returns the name of the directory containing the BPEL files + */ + public String getPackage() { + //System.out.println("getPackage called"); + File theDir = getDirectory(); + return theDir.getName(); + } // end getPackage + + /** + * Return the BPEL Process ID - which is the Process QName appended "-versionnumber" + */ + public QName getProcessId() { + //System.out.println("getProcessId called"); + QName processType = getType(); + QName processID = new QName( processType.getNamespaceURI(), + processType.getLocalPart() + "-" + getVersion() ); + return processID; + } // end getProcessID + + /** + * TODO - What are properties? + */ + public Map<QName, Node> getProperties() { + //System.out.println("getProperties called"); + if ( properties == null ) { + properties = new HashMap<QName, Node>(); + } // end if + return properties; + } // end getProperties + + /** + * Returns a Map containing all the "provide endpoints" - for which read "SCA services" + * The map is keyed by partnerLink name and holds Endpoint objects + * + * TODO deal with reference callbacks on bidirectional references + */ + public Map<String, Endpoint> getProvideEndpoints() { + //System.out.println("getProvideEndpoints called"); + if( provideEndpoints == null ) { + provideEndpoints = new HashMap<String, Endpoint>(); + // Get a collection of the references + List<Service> theServices = implementation.getServices(); + // Create an endpoint for each reference, using the reference name as the "service" + // name, combined with http://tuscany.apache.org to make a QName + for( Service service : theServices ) { + provideEndpoints.put( service.getName(), + new Endpoint( new QName( TUSCANY_NAMESPACE, service.getName() ), "ServicePort")); + } // end for + } // end if + return provideEndpoints; + } // end getProvideEndpoints + + /** + * Return the process state + */ + public ProcessState getState() { + //System.out.println("getState called"); + return processState; + } + + /** + * Returns the QName of the BPEL process + */ + public QName getType() { + //System.out.println("getType called"); + return implementation.getProcess(); + } + + /** + * Gets the process Version number + * - current code does not have versions for BPEL processes and always returns "1" + */ + public long getVersion() { + //System.out.println("getVersion called"); + return 1; + } + + /** + * Returns true if the supplied event type is enabled for any of the scopes in the provided + * List. These events are "ODE Execution Events" and there is a definition of them on this + * page: http://ode.apache.org/user-guide.html#UserGuide-ProcessDeployment + * + * For the present Tuscany does not support manipulating the event enablement and always + * returns that the event is not enabled + * @param scopeNames - list of BPEL process Scope names + * @param type - the event type + */ + public boolean isEventEnabled(List<String> scopeNames, TYPE type) { + //System.out.println("isEventEnabled called with scopeNames: " + + // scopeNames + " and type: " + type ); + return false; + } // end isEventEnabled + + /** + * Returns whether the process is persisted in the store + * + * Returns false for SCA configuration + * - returning true causes problems in communicating with the BPEL process + */ + public boolean isTransient() { + //System.out.println("isTransient called"); + return false; + } // end isTransient + + /** + * Compiles a BPEL process file into a compiled form CBP file in the main directory + * (ie same directory as the BPEL process file) + * @param bpelFile - the BPEL process file + */ + private void compile( File bpelFile ) { + // Set up the compiler + BpelC compiler = BpelC.newBpelCompiler(); + // Provide a null set of initial properties for now + Map<QName, Node> processProps = new HashMap<QName, Node>(); + Map<String, Object> compileProps = new HashMap<String, Object>(); + compileProps.put( BpelC.PROCESS_CUSTOM_PROPERTIES, processProps ); + compiler.setCompileProperties( compileProps ); + compiler.setBaseDirectory( getDirectory() ); + + // Run the compiler and generate the CBP file into the given directory + try { + compiler.compile( bpelFile ); + } catch (IOException e) { + if(__log.isDebugEnabled()) { + __log.debug("Compile error in " + bpelFile, e); + } + // TODO - need better exception handling here + } // end try + } // end compile + + /** + * Gets the directory containing the BPEL process + * @return + */ + private File getDirectory() { + File theDir = getBPELFile().getParentFile(); + return theDir; + } // end getDirectory + + /** + * Gets the File containing the BPEL process definition + * @return - the File object containing the BPEL process + */ + private File getBPELFile() { + try { + String location = this.implementation.getProcessDefinition().getLocation(); + URI locationURI; + if (location.indexOf('%') != -1) { + locationURI = URI.create(location); + } else { + locationURI = new URI(null, location, null); + } + File theProcess = new File(locationURI); + return theProcess; + } catch( Exception e ) { + if(__log.isDebugEnabled()) { + __log.debug("Exception converting BPEL file URL to an URI: " + e ); + } + } // end try + return null; + } // end getBPELFile + + /** + * Gets the relative path of a file against a directory in its hierarchy + * @param base - the base directory + * @param path - the file + * @return + * @throws IOException + */ + private String getRelativePath(File base, File path) throws IOException { + String basePath = base.getCanonicalPath(); + String filePath = path.getCanonicalPath(); + if (!filePath.startsWith(basePath)) { + throw new IOException("Invalid relative path: base=" + base + " path=" + path); + } + String relative = filePath.substring(basePath.length()); + if (relative.startsWith(File.separator)) { + relative = relative.substring(1); + } + return relative; + } // end getRelativePath + + //----------------------------------------------------------------------------- + // other public APIs which ProcessConfImpl displays which are not in ProcessConf interface + + public List<String> getMexInterceptors(QName processId) { +// System.out.println("getMexInterceptors for processID: " + processId ); + return null; + } + + public void setTransient(boolean t) { +// System.out.println("setTransient called with boolean: " + t ); + } + + public List<Element> getExtensionElement(QName arg0) { + return Collections.emptyList(); + } + // end of other public APIs + //----------------------------------------------------------------------------- + +} // end class TuscanyProcessConfImpl diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java new file mode 100644 index 0000000000..c500f001dc --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +import java.io.File; +import java.net.URI; + +import javax.transaction.TransactionManager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tuscany.sca.assembly.Reference; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.databinding.xml.DOMDataBinding; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.implementation.bpel.ode.EmbeddedODEServer; +import org.apache.tuscany.sca.implementation.bpel.ode.ODEDeployment; +import org.apache.tuscany.sca.implementation.bpel.ode.ODEInitializationException; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.provider.ImplementationProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + +/** + * BPEL Implementation provider + * + * @version $Rev$ $Date$ + */ +public class BPELImplementationProvider implements ImplementationProvider { + private final Log __log = LogFactory.getLog(getClass()); + + private RuntimeComponent component; + private BPELImplementation implementation; + + private EmbeddedODEServer odeServer; + private TransactionManager txMgr; + + /** + * Constructs a new BPEL Implementation. + */ + public BPELImplementationProvider(RuntimeComponent component, + BPELImplementation implementation, + EmbeddedODEServer odeServer, + TransactionManager txMgr) { + this.component = component; + this.implementation = implementation; + this.odeServer = odeServer; + this.txMgr = txMgr; + + // Configure the service and reference interfaces to use a DOM databinding + // as it's what ODE expects + for (Service service: implementation.getServices()) { + service.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } + for (Reference reference: implementation.getReferences()) { + reference.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } + + } + + public Invoker createInvoker(RuntimeComponentService service, Operation operation) { + BPELInvoker invoker = new BPELInvoker(component, service, operation, odeServer, txMgr); + return invoker; + } + + public boolean supportsOneWayInvocation() { + return false; + } + + public void start() { + if(__log.isInfoEnabled()) { + __log.info("Starting " + component.getName()); + } + + try { + if (!odeServer.isInitialized()) { + // start ode server + odeServer.init(); + } + + String location = this.implementation.getProcessDefinition().getLocation(); + URI deployURI = new URI(null, location, null); + + File deploymentDir = new File(deployURI).getParentFile(); + + if(__log.isInfoEnabled()) { + __log.info(">>> Deploying : " + deploymentDir.toString()); + } + + // deploy the process + if (odeServer.isInitialized()) { + try { + //txMgr.begin(); + odeServer.registerTuscanyRuntimeComponent(implementation.getProcess(), component); + // Replaced by Mike Edwards 23/05/2008 + //odeServer.deploy(new ODEDeployment(deploymentDir)); + odeServer.deploy(new ODEDeployment(deploymentDir), implementation ); + //txMgr.commit(); + } catch (Exception e) { + e.printStackTrace(); + //txMgr.rollback(); + } + } + + } catch (ODEInitializationException inite) { + throw new RuntimeException("BPEL Component Type Implementation : Error initializing embedded ODE server " + inite.getMessage(), inite); + } catch(Exception e) { + throw new RuntimeException("BPEl Component Type Implementation initialization failure : " + e.getMessage(), e); + } + } + + public void stop() { + if(__log.isInfoEnabled()) { + __log.info("Stopping " + component.getName()); + } + + if (odeServer.isInitialized()) { + // start ode server + odeServer.stop(); + } + + txMgr = null; + + if(__log.isInfoEnabled()) { + __log.info("Stopped !!!"); + } + } + +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java new file mode 100644 index 0000000000..ed327e237b --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +import javax.transaction.TransactionManager; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.implementation.bpel.ode.EmbeddedODEServer; +import org.apache.tuscany.sca.implementation.bpel.ode.GeronimoTxFactory; +import org.apache.tuscany.sca.provider.ImplementationProvider; +import org.apache.tuscany.sca.provider.ImplementationProviderFactory; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.oasisopen.sca.annotation.Destroy; + +/** + * BPEL Implementation provider factory + * + * We use the provider factory to instantiate a ODE server that is going to be injected in all BPEL components + * + * @version $Rev$ $Date$ + */ +public class BPELImplementationProviderFactory implements ImplementationProviderFactory<BPELImplementation> { + + private EmbeddedODEServer odeServer; + private TransactionManager txMgr; + + /** + * Default constructor receiving an extension point + * @param extensionPoints + */ + public BPELImplementationProviderFactory(ExtensionPointRegistry extensionPoints) { + GeronimoTxFactory txFactory = new GeronimoTxFactory(); + txMgr = txFactory.getTransactionManager(); + this.odeServer = new EmbeddedODEServer(txMgr); + } + + /** + * Creates a new BPEL Implementation and inject the EmbeddedODEServer + */ + public ImplementationProvider createImplementationProvider(RuntimeComponent component, BPELImplementation implementation) { + return new BPELImplementationProvider(component, implementation, odeServer, txMgr); + } + + public Class<BPELImplementation> getModelType() { + return BPELImplementation.class; + } + + @Destroy + public void destroy() { + txMgr = null; + } +} diff --git a/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java new file mode 100644 index 0000000000..e07fdbc302 --- /dev/null +++ b/branches/sca-java-2.0-M3/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.Future; + +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; +import javax.wsdl.Part; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; +import org.apache.ode.bpel.iapi.MessageExchange.Status; +import org.apache.ode.utils.DOMUtils; +import org.apache.ode.utils.GUID; +import org.apache.tuscany.sca.implementation.bpel.ode.EmbeddedODEServer; +import org.apache.tuscany.sca.interfacedef.Interface; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Implements a target invoker for BPEL component implementations. + * + * The target invoker is responsible for dispatching invocations to the particular + * component implementation logic. In this example we are simply delegating the + * CRUD operation invocations to the corresponding methods on our fake + * resource manager. + * + * @version $Rev$ $Date$ + */ +public class BPELInvoker implements Invoker { + protected final Log __log = LogFactory.getLog(getClass()); + + private EmbeddedODEServer odeServer; + private TransactionManager txMgr; + + private RuntimeComponentService service; + private Operation operation; + private QName bpelServiceName; + private String bpelOperationName; + private Part bpelOperationInputPart; + private Part bpelOperationOutputPart; + + public BPELInvoker(RuntimeComponent component, RuntimeComponentService service, Operation operation, EmbeddedODEServer odeServer, TransactionManager txMgr) { + this.service = service; + this.operation = operation; + this.bpelOperationName = operation.getName(); + this.odeServer = odeServer; + this.txMgr = txMgr; + + initializeInvocation(); + } + + + private void initializeInvocation() { + + __log.debug("Initializing BPELInvoker"); + + Interface interfaze = operation.getInterface(); + if(interfaze instanceof WSDLInterface){ + WSDLInterface wsdlInterface = null; + wsdlInterface = (WSDLInterface) interfaze; + + // The following commented out code is bogus and is replaced by what follows - Mike Edwards + // Service serviceDefinition = (Service) wsdlInterface.getWsdlDefinition().getDefinition().getAllServices().values().iterator().next(); + // bpelServiceName = serviceDefinition.getQName(); + // + // Fetch the service name from the service object + bpelServiceName = new QName( "http://tuscany.apache.org", service.getName() ); + + bpelOperationInputPart = (Part) wsdlInterface.getPortType().getOperation(bpelOperationName,null,null).getInput().getMessage().getParts().values().iterator().next(); + bpelOperationOutputPart = (Part) wsdlInterface.getPortType().getOperation(bpelOperationName,null,null).getOutput().getMessage().getParts().values().iterator().next(); + } + } + + public Message invoke(Message msg) { + try { + Object[] args = msg.getBody(); + Object resp = doTheWork(args); + msg.setBody(resp); + } catch (InvocationTargetException e) { + msg.setFaultBody(e.getCause()); + } + return msg; + } + + public Object doTheWork(Object[] args) throws InvocationTargetException { + Element response = null; + + if(! (operation.getInterface() instanceof WSDLInterface)) { + throw new InvocationTargetException(null,"Unsupported service contract"); + } + + org.apache.ode.bpel.iapi.MyRoleMessageExchange mex = null; + Future<?> onhold = null; + + //Process the BPEL process invocation + try { + txMgr.begin(); + mex = odeServer.getBpelServer().getEngine().createMessageExchange(new GUID().toString(), + bpelServiceName, + bpelOperationName); + + onhold = mex.invoke(createInvocationMessage(mex, args)); + + txMgr.commit(); + } catch (Exception e) { + try { + txMgr.rollback(); + } catch (SystemException se) { + + } + throw new InvocationTargetException(e, "Error invoking BPEL process : " + e.getMessage()); + } + + + // Waiting until the reply is ready in case the engine needs to continue in a different thread + if (onhold != null) { + try { + onhold.get(); + } catch (Exception e) { + throw new InvocationTargetException(e,"Error invoking BPEL process : " + e.getMessage()); + } + } + + //Process the BPEL invocation response + try { + txMgr.begin(); + // Reloading the mex in the current transaction, otherwise we can't + // be sure we have the "freshest" one. + mex = (MyRoleMessageExchange)odeServer.getBpelServer().getEngine().getMessageExchange(mex.getMessageExchangeId()); + + if (__log.isDebugEnabled()) { + Status status = mex.getStatus(); + Element invocationResponse = mex.getResponse().getMessage(); + __log.debug(">>>Invocation status:" + status.name()); + __log.debug(">>>Response:\n" + DOMUtils.domToString(invocationResponse)); + __log.debug(">>>Response:\n" + DOMUtils.domToString(invocationResponse)); + } + + //process the method invocation result + response = processResponse(mex.getResponse().getMessage()); + + txMgr.commit(); + // end of transaction two + } catch (Exception e) { + try { + txMgr.rollback(); + } catch (SystemException se) { + + } + throw new InvocationTargetException(e, "Error retrieving BPEL process invocation status : " + e.getMessage()); + } + + + return response; + } + + /** + * Create BPEL Invocation message + * + * BPEL invocation message like : + * <message> + * <TestPart> + * <hello xmlns="http://tuscany.apache.org/implementation/bpel/example/helloworld.wsdl">Hello</hello> + * </TestPart> + * </message> + * @param args + * @return + */ + private org.apache.ode.bpel.iapi.Message createInvocationMessage(org.apache.ode.bpel.iapi.MyRoleMessageExchange mex, Object[] args) { + Document dom = DOMUtils.newDocument(); + + Element contentMessage = dom.createElement("message"); + Element contentPart = dom.createElement(bpelOperationInputPart.getName()); + Element payload = null; + + //TUSCANY-2321 - Properly handling Document or Element types + if(args[0] instanceof Document) { + payload = (Element) ((Document) args[0]).getFirstChild(); + } else { + payload = (Element) args[0]; + } + + contentPart.appendChild(dom.importNode(payload, true)); + contentMessage.appendChild(contentPart); + dom.appendChild(contentMessage); + + if (__log.isDebugEnabled()) { + __log.debug("Creating invocation message:"); + __log.debug(">> args.....: " + DOMUtils.domToString(payload)); + __log.debug(">> message..:" + DOMUtils.domToString(dom.getDocumentElement())); + } + + org.apache.ode.bpel.iapi.Message request = mex.createMessage(new QName("", "")); + request.setMessage(dom.getDocumentElement()); + + return request; + } + + /** + * Process BPEL response + * + * <message> + * <TestPart> + * <hello xmlns="http://tuscany.apache.org/implementation/bpel/example/helloworld.wsdl">World</hello> + * </TestPart> + * </message> + * + * @param response + * @return + */ + private Element processResponse(Element response) { + return (Element) DOMUtils.findChildByName(response, new QName("",bpelOperationOutputPart.getName())); + } +} |