From e5b7380c874745c989d1816b8f552504f038e1bc Mon Sep 17 00:00:00 2001 From: lresende Date: Thu, 26 Sep 2013 20:33:20 +0000 Subject: 2.0 branch for possible maintenance release git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1526672 13f79535-47bb-0310-9956-ffa450edef68 --- .../ode/provider/BPELImplementationProvider.java | 187 ++++++++++++++ .../BPELImplementationProviderFactory.java | 69 +++++ .../bpel/ode/provider/BPELInvoker.java | 286 +++++++++++++++++++++ .../bpel/ode/provider/ODEInvocationException.java | 33 +++ 4 files changed, 575 insertions(+) create mode 100644 sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java create mode 100644 sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java create mode 100644 sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java create mode 100644 sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/ODEInvocationException.java (limited to 'sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider') diff --git a/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java b/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java new file mode 100644 index 0000000000..b3824bffe1 --- /dev/null +++ b/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProvider.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +import java.io.File; +import java.net.URI; + +import javax.transaction.TransactionManager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.dao.jpa.ProcessDAOImpl; +import org.apache.openjpa.persistence.PersistenceProviderImpl; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.assembly.Reference; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.databinding.xml.DOMDataBinding; +import org.apache.tuscany.sca.extensibility.ClassLoaderContext; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.implementation.bpel.ode.EmbeddedODEServer; +import org.apache.tuscany.sca.implementation.bpel.ode.ODEDeployment; +import org.apache.tuscany.sca.implementation.bpel.ode.ODEInitializationException; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.provider.ImplementationProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeEndpointReference; + +/** + * BPEL Implementation provider + * + * @version $Rev$ $Date$ + */ +public class BPELImplementationProvider implements ImplementationProvider { + private final Log __log = LogFactory.getLog(getClass()); + + private RuntimeComponent component; + private BPELImplementation implementation; + + private EmbeddedODEServer odeServer; + private TransactionManager txMgr; + + private ODEDeployment deployment; + + /** + * Constructs a new BPEL Implementation. + */ + public BPELImplementationProvider(RuntimeComponent component, + BPELImplementation implementation, + EmbeddedODEServer odeServer, + TransactionManager txMgr) { + this.component = component; + this.implementation = implementation; + this.odeServer = odeServer; + this.txMgr = txMgr; + + // Configure the service and reference interfaces to use a DOM databinding + // as it's what ODE expects + for(Service service: implementation.getServices() ){ + service.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } // end for + + for(Reference reference: implementation.getReferences() ) { + reference.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } // end for + + for (Service service: component.getServices()) { + //TODO - MJE, 06/06/2009 - we can eventually remove the reset of the service interface + // contract and leave it to the Endpoints only + service.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + for( Endpoint endpoint : service.getEndpoints() ) { + RuntimeEndpoint ep = (RuntimeEndpoint) endpoint; + if (ep.getComponentTypeServiceInterfaceContract() != null) { + ep.getComponentTypeServiceInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } + } // end for + } // end for + + for (Reference reference : component.getReferences()) { + reference.getInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + for (EndpointReference endpointReference : reference.getEndpointReferences()) { + RuntimeEndpointReference epr = (RuntimeEndpointReference)endpointReference; + if (epr.getComponentTypeReferenceInterfaceContract() != null) { + epr.getComponentTypeReferenceInterfaceContract().getInterface().resetDataBinding(DOMDataBinding.NAME); + } + } // end for */ + } // end for + + } + + public Invoker createInvoker(RuntimeComponentService service, Operation operation) { + BPELInvoker invoker = new BPELInvoker(component, service, operation, odeServer, txMgr); + return invoker; + } + + public boolean supportsOneWayInvocation() { + return false; + } + + public void start() { + if(__log.isInfoEnabled()) { + __log.info("Starting " + component.getName()); + } // end if + + // Switch TCCL - use a classloader that can find classes related to the non-OSGi services + // referenced from the implementation-bpel module which include the Persistence provider (OpenJPA) and + // the JPA DAO implementation contained in the ODE project + ClassLoader tccl = ClassLoaderContext.setContextClassLoader(EmbeddedODEServer.class.getClassLoader(), + PersistenceProviderImpl.class.getClassLoader(), + ProcessDAOImpl.class.getClassLoader() ); + + try { + if (!odeServer.isInitialized()) { + // start ode server + odeServer.init(); + } + + String location = this.implementation.getProcessDefinition().getLocation(); + URI deployURI = new URI(null, location, null); + + File deploymentDir = new File(deployURI).getParentFile(); + + if(__log.isInfoEnabled()) { + __log.info(">>> Deploying : " + deploymentDir.toString()); + } + + // Deploy the BPEL process + if (odeServer.isInitialized()) { + deployment = new ODEDeployment( deploymentDir ); + try { + odeServer.registerTuscanyRuntimeComponent(implementation.getProcess(), component); + + odeServer.deploy(deployment, implementation, component ); + } catch (Exception e) { + e.printStackTrace(); + } + } + + } catch (ODEInitializationException inite) { + throw new RuntimeException("BPEL Component Type Implementation : Error initializing embedded ODE server " + inite.getMessage(), inite); + } catch(Exception e) { + throw new RuntimeException("BPEL Component Type Implementation initialization failure : " + e.getMessage(), e); + } finally { + // Restore the TCCL if we changed it + if( tccl != null ) Thread.currentThread().setContextClassLoader(tccl); + } // end try + } // end method start() + + public void stop() { + if(__log.isInfoEnabled()) { + __log.info("Stopping " + component.getName()); + } + + odeServer.undeploy(deployment); + + if (odeServer.isInitialized()) { + // stop ode server + odeServer.stop(); + } + + txMgr = null; + + if(__log.isInfoEnabled()) { + __log.info("Stopped !!!"); + } + } // end method stop() + +} diff --git a/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java b/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java new file mode 100644 index 0000000000..ed327e237b --- /dev/null +++ b/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELImplementationProviderFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +import javax.transaction.TransactionManager; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.implementation.bpel.BPELImplementation; +import org.apache.tuscany.sca.implementation.bpel.ode.EmbeddedODEServer; +import org.apache.tuscany.sca.implementation.bpel.ode.GeronimoTxFactory; +import org.apache.tuscany.sca.provider.ImplementationProvider; +import org.apache.tuscany.sca.provider.ImplementationProviderFactory; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.oasisopen.sca.annotation.Destroy; + +/** + * BPEL Implementation provider factory + * + * We use the provider factory to instantiate a ODE server that is going to be injected in all BPEL components + * + * @version $Rev$ $Date$ + */ +public class BPELImplementationProviderFactory implements ImplementationProviderFactory { + + private EmbeddedODEServer odeServer; + private TransactionManager txMgr; + + /** + * Default constructor receiving an extension point + * @param extensionPoints + */ + public BPELImplementationProviderFactory(ExtensionPointRegistry extensionPoints) { + GeronimoTxFactory txFactory = new GeronimoTxFactory(); + txMgr = txFactory.getTransactionManager(); + this.odeServer = new EmbeddedODEServer(txMgr); + } + + /** + * Creates a new BPEL Implementation and inject the EmbeddedODEServer + */ + public ImplementationProvider createImplementationProvider(RuntimeComponent component, BPELImplementation implementation) { + return new BPELImplementationProvider(component, implementation, odeServer, txMgr); + } + + public Class getModelType() { + return BPELImplementation.class; + } + + @Destroy + public void destroy() { + txMgr = null; + } +} diff --git a/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java b/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java new file mode 100644 index 0000000000..4a99fe705c --- /dev/null +++ b/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/BPELInvoker.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; +import javax.wsdl.Part; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; +import org.apache.ode.bpel.iapi.MessageExchange.Status; +import org.apache.ode.utils.DOMUtils; +import org.apache.ode.utils.GUID; +import org.apache.tuscany.sca.assembly.Base; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.implementation.bpel.ode.EmbeddedODEServer; +import org.apache.tuscany.sca.interfacedef.Interface; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Implements a target invoker for BPEL component implementations. + * + * The target invoker is responsible for dispatching invocations to the particular + * component implementation logic. In this example we are simply delegating the + * CRUD operation invocations to the corresponding methods on our fake + * resource manager. + * + * @version $Rev$ $Date$ + */ +public class BPELInvoker implements Invoker { + private final static long TIME_OUT = 10000L; + + protected final Log __log = LogFactory.getLog(getClass()); + + private EmbeddedODEServer odeServer; + private TransactionManager txMgr; + + private RuntimeComponentService service; + private Operation operation; + private QName bpelServiceName; + private String bpelOperationName; + private Part bpelOperationInputPart; + private Part bpelOperationOutputPart; + private RuntimeComponent component; + // Marks if this service has a callback interface + private Boolean isCallback = false; + private EndpointReference callbackEPR; + + public BPELInvoker(RuntimeComponent component, RuntimeComponentService service, Operation operation, + EmbeddedODEServer odeServer, TransactionManager txMgr) { + this.service = service; + this.component = component; + this.operation = operation; + this.bpelOperationName = operation.getName(); + this.odeServer = odeServer; + this.txMgr = txMgr; + this.isCallback = serviceHasCallback( service ); + + initializeInvocation(); + } // end method BPELInvoker + + private boolean serviceHasCallback( RuntimeComponentService service ) { + if(service.getInterfaceContract().getCallbackInterface() != null) return true; + return false; + } // end method serviceHasCallback + + private void initializeInvocation() { + + __log.debug("Initializing BPELInvoker"); + + Interface interfaze = operation.getInterface(); + if(interfaze instanceof WSDLInterface){ + WSDLInterface wsdlInterface = null; + wsdlInterface = (WSDLInterface) interfaze; + + // Fetch the service name from the service object - including the componentURI guarantees a unique service name + String componentURI = component.getURI(); + bpelServiceName = new QName( Base.SCA11_TUSCANY_NS, componentURI + service.getName() ); + + bpelOperationInputPart = (Part) wsdlInterface.getPortType().getOperation(bpelOperationName,null,null).getInput().getMessage().getParts().values().iterator().next(); + bpelOperationOutputPart = (Part) wsdlInterface.getPortType().getOperation(bpelOperationName,null,null).getOutput().getMessage().getParts().values().iterator().next(); + } + } // end method initializeInvocation + + public Message invoke(Message msg) { + try { + if( isCallback ) { + // Extract the callback endpoint metadata + callbackEPR = msg.getFrom(); + } // end if + Object[] args = msg.getBody(); + Object resp = doTheWork(args); + msg.setBody(resp); + } catch (InvocationTargetException e) { + msg.setFaultBody(e.getCause()); + } + return msg; + } + + public Object doTheWork(Object[] args) throws InvocationTargetException { + Element response = null; + + if(! (operation.getInterface() instanceof WSDLInterface)) { + throw new InvocationTargetException(null,"Unsupported service contract"); + } + + org.apache.ode.bpel.iapi.MyRoleMessageExchange mex = null; + Future onhold = null; + + //Process the BPEL process invocation + Long processID = 0L; + try { + txMgr.begin(); + mex = odeServer.getBpelServer().getEngine().createMessageExchange(new GUID().toString(), + bpelServiceName, + bpelOperationName); + //TODO - this will not be true for OneWay operations - need to handle those + mex.setProperty("isTwoWay", "true"); + onhold = mex.invoke(createInvocationMessage(mex, args)); + + txMgr.commit(); + // Deal with callback cases - store the callback metadata by process instance ID + if( isCallback ) { + processID = odeServer.getProcessIDFromMex(mex.getMessageExchangeId()); + // Store the callback metadata for this invocation + odeServer.saveCallbackMetadata( processID, service.getName(), callbackEPR ); + } // end if + } catch (Exception e) { + try { + txMgr.rollback(); + } catch (SystemException se) { + + } + throw new InvocationTargetException(e, "Error invoking BPEL process : " + e.getMessage()); + } // end try + + // Waiting until the reply is ready in case the engine needs to continue in a different thread + if (onhold != null) { + try { + //add timeout to avoid blocking when there is a exception/failure + onhold.get(TIME_OUT, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new InvocationTargetException(e,"Error invoking BPEL process : " + e.getMessage()); + } // end try + } // end if + + //Process the BPEL invocation response + try { + txMgr.begin(); + // Reloading the mex in the current transaction, otherwise we can't + // be sure we have the "freshest" one. + mex = (MyRoleMessageExchange)odeServer.getBpelServer().getEngine().getMessageExchange(mex.getMessageExchangeId()); + + Status status = mex.getStatus(); + + switch (status) { + case FAULT: + if (__log.isDebugEnabled()) + __log.debug("Fault response message: " + mex.getFault()); + throw new ODEInvocationException("FAULT received from BPEL process : " + mex.getFault() + + " " + + mex.getFaultExplanation()); + case ASYNC: + case RESPONSE: + //process the method invocation result + response = processResponse(mex.getResponse().getMessage()); + if (__log.isDebugEnabled()) + __log.debug("Response message " + response); + break; + case FAILURE: + if (__log.isDebugEnabled()) + __log.debug("Failure response message: " + mex.getFault()); + break; + default: + throw new ODEInvocationException("FAILURE received from BPEL process : " + mex.getStatus() + " - " + mex.getFault()); + } // end switch + + txMgr.commit(); + // end of transaction two + } catch (Exception e) { + try { + txMgr.rollback(); + } catch (SystemException se) { + + } + throw new InvocationTargetException(e, "Error retrieving BPEL process invocation status : " + e.getMessage()); + } // end try + + // Cleanup the ODE MessageExchange object + //mex.release(); + + return response; + } + + /** + * Create BPEL Invocation message + * + * BPEL invocation message like : + * + * + * Hello + * + * + * @param args + * @return + */ + private org.apache.ode.bpel.iapi.Message createInvocationMessage(org.apache.ode.bpel.iapi.MyRoleMessageExchange mex, Object[] args) { + Document dom = DOMUtils.newDocument(); + + Element contentMessage = dom.createElement("message"); + Element contentPart = dom.createElement(bpelOperationInputPart.getName()); + Element payload = null; + + // TODO handle WSDL input messages with multiple Parts... + //TUSCANY-2321 - Properly handling Document or Element types + if(args[0] instanceof Document) { + payload = (Element) ((Document) args[0]).getFirstChild(); + } else { + payload = (Element) args[0]; + } + + contentPart.appendChild(dom.importNode(payload, true)); + contentMessage.appendChild(contentPart); + dom.appendChild(contentMessage); + + if (__log.isDebugEnabled()) { + __log.debug("Creating invocation message:"); + __log.debug(">> args.....: " + DOMUtils.domToString(payload)); + __log.debug(">> message..:" + DOMUtils.domToString(dom.getDocumentElement())); + } + + org.apache.ode.bpel.iapi.Message request = mex.createMessage(new QName("", "")); + request.setMessage(dom.getDocumentElement()); + + return request; + } + + /** + * Process BPEL response + * + * + * + * World + * + * + * + * @param response + * @return + */ + private Element processResponse(Element response) { + return (Element) DOMUtils.findChildByName(response, new QName("",bpelOperationOutputPart.getName())).getFirstChild(); + + // MJE, 12/06/2009 - changed to return the message without the PART wrapper element, since this element is not + // transmitted in the SOAP messages on the wire + } // end method processResponse +} // end class BPELInvoker diff --git a/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/ODEInvocationException.java b/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/ODEInvocationException.java new file mode 100644 index 0000000000..7b6f9ceafa --- /dev/null +++ b/sca-java-2.x/branches/2.0/modules/implementation-bpel-runtime/src/main/java/org/apache/tuscany/sca/implementation/bpel/ode/provider/ODEInvocationException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.implementation.bpel.ode.provider; + +public class ODEInvocationException extends Exception { + + /** + * Thrown when the result of the invocation of a BPEL Process operation + * returns a Fault or Failure code + */ + private static final long serialVersionUID = 5096941965798566018L; + + public ODEInvocationException(String message) { + super(message); + } + +} -- cgit v1.2.3