diff options
Diffstat (limited to 'java/sca-contrib/demos/workpool-distributed')
42 files changed, 3993 insertions, 0 deletions
diff --git a/java/sca-contrib/demos/workpool-distributed/LICENSE b/java/sca-contrib/demos/workpool-distributed/LICENSE new file mode 100644 index 0000000000..8aa906c321 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/LICENSE @@ -0,0 +1,205 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + + diff --git a/java/sca-contrib/demos/workpool-distributed/NOTICE b/java/sca-contrib/demos/workpool-distributed/NOTICE new file mode 100644 index 0000000000..25bb89c9b2 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/NOTICE @@ -0,0 +1,6 @@ +${pom.name} +Copyright (c) 2005 - 2009 The Apache Software Foundation + +This product includes software developed by +The Apache Software Foundation (http://www.apache.org/). + diff --git a/java/sca-contrib/demos/workpool-distributed/build.xml b/java/sca-contrib/demos/workpool-distributed/build.xml new file mode 100644 index 0000000000..9e10e8ec91 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/build.xml @@ -0,0 +1,446 @@ +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<project name="workpool" default="runDomainNode"> + <property name="node.class" value="node.WorkpoolNode" /> + <property name="domain.class" value="node.DomainNode" /> + <property name="test.jar" value="sample-workpool-distributed-dynamic.jar" /> + + <target name="init"> + <mkdir dir="target/classes"/> + </target> + + <target name="compile" depends="init"> + <javac srcdir="src/main/java" + destdir="target/classes" + debug="on" + source="1.5" + target="1.5"> + <classpath> + <fileset dir="./rule-engine" id="drools.jar"> + <include name="*.jar"/> + </fileset> + <pathelement location="../../lib/tuscany-sca-manifest.jar"/> + </classpath> + </javac> + <copy todir="target/classes"> + <fileset dir="src/main/resources"/> + </copy> + <jar destfile="target/${test.jar}" basedir="target/classes"> + <manifest> + <attribute name="Main-Class" value="${node.class}" /> + </manifest> + </jar> + </target> + + <target name="runDomainNode"> + <java classname="${domain.class}" + fork="true"> + <classpath> + <fileset dir="./rule-engine" id="drools.jar"> + <include name="*.jar"/> + </fileset> + + + <pathelement path="src/main/resources"/> + <pathelement path="target/classes"/> + <pathelement path="target/${test.jar}"/> + <pathelement location="../../lib/tuscany-sca-manifest.jar"/> + </classpath> + </java> + </target> + + <target name="runNodeA"> + <java classname="${node.class}" + fork="true"> + <classpath> +<fileset dir="./rule-engine" id="drools.jar"> + <include name="*.jar"/> + </fileset> + + + + <pathelement path="src/main/resources"/> + <pathelement path="target/classes"/> + + <pathelement path="target/${test.jar}"/> + <pathelement location="../../lib/tuscany-sca-manifest.jar"/> + </classpath> + <arg value="http://u12:8877/"/> + <arg value="nodeA"/> + <!-- iterations --> + <arg value="10"/> + <!-- jobs --> + <arg value="1024"/> + <!-- workers --> + <arg value="4" /> + </java> + </target> + + <target name="runNodeB"> + <java classname="${node.class}" + fork="true"> + <classpath> +<fileset dir="./rule-engine" id="drools.jar"> + <include name="*.jar"/> + </fileset> + + + <pathelement path="src/main/resources"/> + <pathelement path="target/classes"/> + <pathelement path="target/${test.jar}"/> + <pathelement location="../../lib/tuscany-sca-manifest.jar"/> + + </classpath> + <arg value="http://u12:8877/"/> + <arg value="nodeB"/> + <!-- iterations --> + <arg value="1000000"/> + <!-- jobs --> + <arg value="1024"/> + <!-- workers --> + <arg value="4" /> + </java> + </target> + + <target name="runNodeC"> + <java classname="${node.class}" + fork="true"> + <classpath> + <fileset dir="./rule-engine" id="drools.jar"> + <include name="*.jar"/> + </fileset> + + <pathelement location="${drools.jar}"/> + <pathelement path="src/main/resources"/> + <pathelement path="target/classes"/> + <pathelement path="target/${test.jar}"/> + <pathelement location="../../lib/tuscany-sca-manifest.jar"/> + </classpath> + <arg value="http://u12:8877/"/> + <arg value="nodeC"/> + <!-- iterations --> + <arg value="1000000"/> + <!-- jobs --> + <arg value="1024"/> + <!-- workers --> + <arg value="4" /> + </java> + </target> + <target name="runNodeD"> + <java classname="${node.class}" + fork="true"> + <classpath> + <fileset dir="./rule-engine" id="drools.jar"> + <include name="*.jar"/> + </fileset> + + <pathelement location="${drools.jar}"/> + <pathelement path="src/main/resources"/> + <pathelement path="target/classes"/> + <pathelement path="target/${test.jar}"/> + <pathelement location="../../lib/tuscany-sca-manifest.jar"/> + </classpath> + <arg value="http://u12:8877/"/> + <arg value="nodeD"/> + <!-- iterations --> + <arg value="1000000"/> + <!-- jobs --> + <arg value="1024"/> + <!-- workers --> + <arg value="4" /> + </java> + </target> + <target name="runNodeE"> + <java classname="${node.class}" + fork="true"> + <classpath> + <fileset dir="./rule-engine" id="drools.jar"> + <include name="*.jar"/> + </fileset> + + <pathelement location="${drools.jar}"/> + <pathelement path="src/main/resources"/> + <pathelement path="target/classes"/> + <pathelement path="target/${test.jar}"/> + <pathelement location="../../lib/tuscany-sca-manifest.jar"/> + </classpath> + <arg value="http://u12:8877/"/> + <arg value="nodeE"/> + <!-- iterations --> + <arg value="1000000"/> + <!-- jobs --> + <arg value="1024"/> + <!-- workers --> + <arg value="4" /> + </java> + </target> + + <target name="clean"> + <delete quiet="true" includeemptydirs="true"> + <fileset dir="target"/> + </delete> + </target> + + <!-- If you want to test without building the distribution jars set the --> + <!-- localtion value of the m2.repo on your machine and use the targets below --> + <property name="m2.repo" value="/var/cache/maven/repo" /> + <path id="repo.classpath"> + <pathelement location="target/classes"/> + <pathelement location="/var/cache/maven/repo/antlr/antlr/2.7.6/antlr-2.7.6.jar"/> + <pathelement location="/var/cache/maven/repo/antlr/stringtemplate/2.3b6/stringtemplate-2.3b6.jar"/> + <pathelement location="${m2.repo}\xerces\xercesImpl\2.8.1\xercesImpl-2.8.1.jar"/> + <pathelement location="${m2.repo}\org\apache\geronimo\specs\geronimo-jms_1.1_spec\1.1\geronimo-jms_1.1_spec-1.1.jar"/> + <pathelement location="${m2.repo}\wsdl4j\wsdl4j\1.6.2\wsdl4j-1.6.2.jar"/> + <pathelement location="${m2.repo}\commons-codec\commons-codec\1.3\commons-codec-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-sca\1.0-incubating\tuscany-binding-sca-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\xml-apis\xml-apis\1.3.03\xml-apis-1.3.03.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\security\wss4j\1.5.3\wss4j-1.5.3.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-assembly-xml\1.0-incubating\tuscany-assembly-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\rampart\rampart-trust\1.3\rampart-trust-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\commons\axiom\axiom-api\1.2.5\axiom-api-1.2.5.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-assembly\1.0-incubating\tuscany-assembly-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface-wsdl\1.0-incubating\tuscany-interface-wsdl-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-kernel\1.3\axis2-kernel-1.3.jar"/> + <pathelement location="${m2.repo}\cglib\cglib-nodep\2.1_3\cglib-nodep-2.1_3.jar"/> + <pathelement location="${m2.repo}\backport-util-concurrent\backport-util-concurrent\2.2\backport-util-concurrent-2.2.jar"/> + <pathelement location="${m2.repo}\org\apache\httpcomponents\httpcore\4.0-alpha5\httpcore-4.0-alpha5.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface\1.0-incubating\tuscany-interface-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-policy-xml\1.0-incubating\tuscany-policy-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\commons-logging\commons-logging\1.1\commons-logging-1.1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-domain-api\1.0-incubating\tuscany-domain-api-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-extension-helper\1.0-incubating\tuscany-extension-helper-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\geronimo\specs\geronimo-activation_1.1_spec\1.0-M1\geronimo-activation_1.1_spec-1.0-M1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-jsonrpc\1.0-incubating\tuscany-binding-jsonrpc-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\logkit\logkit\1.0.1\logkit-1.0.1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface-java-xml\1.0-incubating\tuscany-interface-java-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\bouncycastle\bcprov-jdk15\136\bcprov-jdk15-136.jar"/> + <pathelement location="${m2.repo}\commons-fileupload\commons-fileupload\1.1.1\commons-fileupload-1.1.1.jar"/> + <pathelement location="${m2.repo}\annogen\annogen\0.1.0\annogen-0.1.0.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-node\1.0-incubating\tuscany-node-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-adb\1.3\axis2-adb-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-mtompolicy\1.3\axis2-mtompolicy-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-definitions\1.0-incubating\tuscany-definitions-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-host-embedded\1.0-incubating\tuscany-host-embedded-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\commons-httpclient\commons-httpclient\3.0.1\commons-httpclient-3.0.1.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-java2wsdl\1.3\axis2-java2wsdl-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\httpcomponents\httpcore-nio\4.0-alpha5\httpcore-nio-4.0-alpha5.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-ws-xml\1.0-incubating\tuscany-binding-ws-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-contribution-java\1.0-incubating\tuscany-contribution-java-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-domain-impl\1.0-incubating\tuscany-domain-impl-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\xalan\xalan\2.7.0\xalan-2.7.0.jar"/> + <pathelement location="${m2.repo}\org\apache\rampart\rampart-core\1.3\rampart-core-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\geronimo\specs\geronimo-commonj_1.1_spec\1.0\geronimo-commonj_1.1_spec-1.0.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-policy-security\1.0-incubating\tuscany-policy-security-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-core-spi\1.0-incubating\tuscany-core-spi-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-sca-axis2\1.0-incubating\tuscany-binding-sca-axis2-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\ant\ant-launcher\1.7.0\ant-launcher-1.7.0.jar"/> + <pathelement location="${m2.repo}\jaxen\jaxen\1.1-beta-9\jaxen-1.1-beta-9.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-sca-api\1.0-incubating\tuscany-sca-api-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\rampart\rampart-policy\1.3\rampart-policy-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\httpcomponents\httpcore-niossl\4.0-alpha5\httpcore-niossl-4.0-alpha5.jar"/> + <pathelement location="${m2.repo}\com\metaparadigm\json-rpc\1.0\json-rpc-1.0.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\commons\axiom\axiom-impl\1.2.5\axiom-impl-1.2.5.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-ws\1.0-incubating\tuscany-binding-ws-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-contribution-namespace\1.0-incubating\tuscany-contribution-namespace-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-policy\1.0-incubating\tuscany-policy-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\neethi\neethi\2.0.2\neethi-2.0.2.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface-wsdl-xml\1.0-incubating\tuscany-interface-wsdl-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\javax\activation\activation\1.1\activation-1.1.jar"/> + <pathelement location="${m2.repo}\org\apache\ant\ant\1.7.0\ant-1.7.0.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-core\1.0-incubating\tuscany-core-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-domain\1.0-incubating\tuscany-domain-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\geronimo\specs\geronimo-javamail_1.4_spec\1.0-M1\geronimo-javamail_1.4_spec-1.0-M1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-contribution\1.0-incubating\tuscany-contribution-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\avalon-framework\avalon-framework\4.1.3\avalon-framework-4.1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-node-impl\1.0-incubating\tuscany-node-impl-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\santuario\xmlsec\1.4.0\xmlsec-1.4.0.jar"/> + <pathelement location="${m2.repo}\commons-io\commons-io\1.2\commons-io-1.2.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-databinding\1.0-incubating\tuscany-databinding-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\commons\axiom\axiom-dom\1.2.5\axiom-dom-1.2.5.jar"/> + <pathelement location="${m2.repo}\log4j\log4j\1.2.12\log4j-1.2.12.jar"/> + <pathelement location="${m2.repo}\javax\mail\mail\1.4\mail-1.4.jar"/> + <pathelement location="${m2.repo}\org\codehaus\woodstox\wstx-asl\3.2.1\wstx-asl-3.2.1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-ws-axis2\1.0-incubating\tuscany-binding-ws-axis2-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-host-http\1.0-incubating\tuscany-host-http-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-databinding-axiom\1.0-incubating\tuscany-databinding-axiom-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-contribution-impl\1.0-incubating\tuscany-contribution-impl-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface-java\1.0-incubating\tuscany-interface-java-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\stax\stax-api\1.0.1\stax-api-1.0.1.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\commons\schema\XmlSchema\1.3.2\XmlSchema-1.3.2.jar"/> + <pathelement location="${m2.repo}\org\apache\woden\woden\1.0-incubating-M7b\woden-1.0-incubating-M7b.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-sca-xml\1.0-incubating\tuscany-binding-sca-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-core-databinding\1.0-incubating\tuscany-core-databinding-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\wsdl4j\wsdl4j\1.6.2\wsdl4j-1.6.2.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-sca\1.0-incubating\tuscany-binding-sca-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-implementation-resource\1.0-incubating\tuscany-implementation-resource-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\rampart\rampart-trust\1.3\rampart-trust-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\commons\axiom\axiom-api\1.2.5\axiom-api-1.2.5.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-assembly\1.0-incubating\tuscany-assembly-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\cglib\cglib-nodep\2.1_3\cglib-nodep-2.1_3.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-policy-xml\1.0-incubating\tuscany-policy-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-extension-helper\1.0-incubating\tuscany-extension-helper-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-node-api\1.0-incubating\tuscany-node-api-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\geronimo\specs\geronimo-activation_1.1_spec\1.0-M1\geronimo-activation_1.1_spec-1.0-M1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-host-tomcat\1.0-incubating\tuscany-host-tomcat-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\opensaml\opensaml\1.1\opensaml-1.1.jar"/> + <pathelement location="${m2.repo}\logkit\logkit\1.0.1\logkit-1.0.1.jar"/> + <pathelement location="${m2.repo}\bouncycastle\bcprov-jdk15\136\bcprov-jdk15-136.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-assembly-xsd\1.0-incubating\tuscany-assembly-xsd-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\annogen\annogen\0.1.0\annogen-0.1.0.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-xmlbeans\1.3\axis2-xmlbeans-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-adb\1.3\axis2-adb-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-mtompolicy\1.3\axis2-mtompolicy-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-http\1.0-incubating\tuscany-binding-http-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-java2wsdl\1.3\axis2-java2wsdl-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\httpcomponents\httpcore-nio\4.0-alpha5\httpcore-nio-4.0-alpha5.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-contribution-java\1.0-incubating\tuscany-contribution-java-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-domain-impl\1.0-incubating\tuscany-domain-impl-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\rampart\rampart-core\1.3\rampart-core-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\geronimo\specs\geronimo-commonj_1.1_spec\1.0\geronimo-commonj_1.1_spec-1.0.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-policy-security\1.0-incubating\tuscany-policy-security-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tomcat\catalina\6.0.10\catalina-6.0.10.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-core-spi\1.0-incubating\tuscany-core-spi-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-sca-axis2\1.0-incubating\tuscany-binding-sca-axis2-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\jaxen\jaxen\1.1-beta-9\jaxen-1.1-beta-9.jar"/> + <pathelement location="${m2.repo}\org\apache\rampart\rampart-policy\1.3\rampart-policy-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\httpcomponents\httpcore-niossl\4.0-alpha5\httpcore-niossl-4.0-alpha5.jar"/> + <pathelement location="${m2.repo}\junit\junit\4.2\junit-4.2.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-contribution-namespace\1.0-incubating\tuscany-contribution-namespace-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-implementation-java-xml\1.0-incubating\tuscany-implementation-java-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface-wsdl-xml\1.0-incubating\tuscany-interface-wsdl-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\javax\activation\activation\1.1\activation-1.1.jar"/> + <pathelement location="${m2.repo}\org\apache\ant\ant\1.7.0\ant-1.7.0.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-core\1.0-incubating\tuscany-core-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-domain\1.0-incubating\tuscany-domain-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\geronimo\specs\geronimo-javamail_1.4_spec\1.0-M1\geronimo-javamail_1.4_spec-1.0-M1.jar"/> + <pathelement location="${m2.repo}\avalon-framework\avalon-framework\4.1.3\avalon-framework-4.1.3.jar"/> + <pathelement location="${m2.repo}\commons-io\commons-io\1.2\commons-io-1.2.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\commons\axiom\axiom-dom\1.2.5\axiom-dom-1.2.5.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-host-http\1.0-incubating\tuscany-host-http-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface-java\1.0-incubating\tuscany-interface-java-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-contribution-impl\1.0-incubating\tuscany-contribution-impl-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\commons\schema\XmlSchema\1.3.2\XmlSchema-1.3.2.jar"/> + <pathelement location="${m2.repo}\org\apache\woden\woden\1.0-incubating-M7b\woden-1.0-incubating-M7b.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-sca-xml\1.0-incubating\tuscany-binding-sca-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\xerces\xercesImpl\2.8.1\xercesImpl-2.8.1.jar"/> + <pathelement location="${m2.repo}\org\apache\geronimo\specs\geronimo-jms_1.1_spec\1.1\geronimo-jms_1.1_spec-1.1.jar"/> + <pathelement location="${m2.repo}\commons-codec\commons-codec\1.3\commons-codec-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\security\wss4j\1.5.3\wss4j-1.5.3.jar"/> + <pathelement location="${m2.repo}\xml-apis\xml-apis\1.3.03\xml-apis-1.3.03.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-assembly-xml\1.0-incubating\tuscany-assembly-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface-wsdl\1.0-incubating\tuscany-interface-wsdl-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-kernel\1.3\axis2-kernel-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\httpcomponents\httpcore\4.0-alpha5\httpcore-4.0-alpha5.jar"/> + <pathelement location="${m2.repo}\backport-util-concurrent\backport-util-concurrent\2.2\backport-util-concurrent-2.2.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface\1.0-incubating\tuscany-interface-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\commons-logging\commons-logging\1.1\commons-logging-1.1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-jsonrpc\1.0-incubating\tuscany-binding-jsonrpc-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-interface-java-xml\1.0-incubating\tuscany-interface-java-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\commons-fileupload\commons-fileupload\1.1.1\commons-fileupload-1.1.1.jar"/> + <pathelement location="${m2.repo}\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-node\1.0-incubating\tuscany-node-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-definitions\1.0-incubating\tuscany-definitions-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\commons-httpclient\commons-httpclient\3.0.1\commons-httpclient-3.0.1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-host-embedded\1.0-incubating\tuscany-host-embedded-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\commons-collections\commons-collections\3.1\commons-collections-3.1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-ws-xml\1.0-incubating\tuscany-binding-ws-xml-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-implementation-java-runtime\1.0-incubating\tuscany-implementation-java-runtime-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\xalan\xalan\2.7.0\xalan-2.7.0.jar"/> + <pathelement location="${m2.repo}\org\apache\ant\ant-launcher\1.7.0\ant-launcher-1.7.0.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-adb-codegen\1.3\axis2-adb-codegen-1.3.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-sca-api\1.0-incubating\tuscany-sca-api-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tomcat\coyote\6.0.10\coyote-6.0.10.jar"/> + <pathelement location="${m2.repo}\com\metaparadigm\json-rpc\1.0\json-rpc-1.0.jar"/> + <pathelement location="${m2.repo}\org\apache\ws\commons\axiom\axiom-impl\1.2.5\axiom-impl-1.2.5.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-ws\1.0-incubating\tuscany-binding-ws-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\commons-discovery\commons-discovery\0.2\commons-discovery-0.2.jar"/> + <pathelement location="${m2.repo}\org\apache\neethi\neethi\2.0.2\neethi-2.0.2.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-policy\1.0-incubating\tuscany-policy-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-implementation-java\1.0-incubating\tuscany-implementation-java-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-contribution\1.0-incubating\tuscany-contribution-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tomcat\annotations-api\6.0.10\annotations-api-6.0.10.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-node-impl\1.0-incubating\tuscany-node-impl-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\santuario\xmlsec\1.4.0\xmlsec-1.4.0.jar"/> + <pathelement location="${m2.repo}\org\apache\tomcat\juli\6.0.10\juli-6.0.10.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-databinding\1.0-incubating\tuscany-databinding-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\log4j\log4j\1.2.12\log4j-1.2.12.jar"/> + <pathelement location="${m2.repo}\javax\mail\mail\1.4\mail-1.4.jar"/> + <pathelement location="${m2.repo}\org\apache\axis2\axis2-codegen\1.3\axis2-codegen-1.3.jar"/> + <pathelement location="${m2.repo}\org\codehaus\woodstox\wstx-asl\3.2.1\wstx-asl-3.2.1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-binding-ws-axis2\1.0-incubating\tuscany-binding-ws-axis2-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-databinding-axiom\1.0-incubating\tuscany-databinding-axiom-1.0-incubating.jar"/> + <pathelement location="${m2.repo}\stax\stax-api\1.0.1\stax-api-1.0.1.jar"/> + <pathelement location="${m2.repo}\org\apache\tuscany\sca\tuscany-core-databinding\1.0-incubating\tuscany-core-databinding-1.0-incubating.jar"/> + </path> + + <target name="runDomainNodeRepo"> + <java classname="${domain.class}" + fork="true"> + <classpath> + <path refid="repo.classpath"/> + </classpath> + </java> + </target> + + <target name="runNodeARepo"> + <java classname="${node.class}" + fork="true"> + <classpath> + <path refid="repo.classpath"/> + </classpath> + <arg value="http://u12:8877/"/> + <arg value="nodeA"/> + </java> + </target> + + <target name="runNodeARepoLoop"> + <java classname="${node.class}" + fork="true"> + <classpath> + <path refid="repo.classpath"/> + </classpath> + <arg value="http://u12:8877/"/> + <arg value="nodeA"/> + <arg value="loop"/> + </java> + </target> + + <target name="runNodeBRepo"> + <java classname="${node.class}" + fork="true"> + <classpath> + <path refid="repo.classpath"/> + </classpath> + <arg value="http://u12:8877/"/> + <arg value="nodeB"/> + </java> + </target> + + <target name="runNodeCRepo"> + <java classname="${node.class}" + fork="true"> + <classpath> + <path refid="repo.classpath"/> + </classpath> + <arg value="http://u12:8877/"/> + <arg value="nodeC"/> + </java> + </target> + +</project> diff --git a/java/sca-contrib/demos/workpool-distributed/pom.xml b/java/sca-contrib/demos/workpool-distributed/pom.xml new file mode 100644 index 0000000000..6d441bdfaf --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/pom.xml @@ -0,0 +1,148 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<project> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-sca</artifactId> + <version>2.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <artifactId>demo-workpool-distributed</artifactId> + <name>Apache Tuscany SCA Distributed Job Workpool Demo</name> + + <repositories> + <repository> + <id>apache.incubator</id> + <url> + http://people.apache.org/repo/m2-incubating-repository + </url> + </repository> + <repository> + <id>org.drools</id> + <url>http://repository.jboss.com/maven2/org/drools/</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-host-embedded</artifactId> + <version>2.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.drools</groupId> + <artifactId>drools</artifactId> + <version>4.0.4</version> + </dependency> + + <dependency> + <groupId>antlr</groupId> + <artifactId>antlr</artifactId> + <version>2.7.1</version> + </dependency> + <dependency> + <groupId>commons-daemon</groupId> + <artifactId>commons-daemon</artifactId> + <version>1.0.1</version> + </dependency> + + <dependency> + <groupId>antlr</groupId> + <artifactId>stringtemplate</artifactId> + <version>2.3b6</version> + </dependency> + + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-node-impl</artifactId> + <version>2.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-domain-impl</artifactId> + <version>2.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-binding-sca-axis2</artifactId> + <version>2.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-binding-jsonrpc-runtime</artifactId> + <version>2.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-binding-http-runtime</artifactId> + <version>2.0-SNAPSHOT</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-databinding-job</artifactId> + <version>2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-contribution-updater</artifactId> + <version>2.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-databinding-xstream</artifactId> + <version>2.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-implementation-resource-runtime</artifactId> + <version>2.0-SNAPSHOT</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.apache.tuscany.sca</groupId> + <artifactId>tuscany-host-tomcat</artifactId> + <version>2.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.2</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <finalName>${artifactId}</finalName> + </build> +</project> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java new file mode 100644 index 0000000000..a278499aae --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package node; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.domain.SCADomainFactory; + +/** + * This server program that loads a composite to provide simple registry + * function. This server can be replaced with any registry that is appropriate + * but the components in each node that talk to the registry should be replaced + * also. + */ +public class DomainNode { + + private static String DEFAULT_DOMAIN_URI = "http://u12:8877"; + private boolean stopped = true; + + public static void main(String[] args) { + + try { + + SCADomainFactory domainFactory = SCADomainFactory.newInstance(); + SCADomain domain = domainFactory + .createSCADomain(DEFAULT_DOMAIN_URI); + + System.out.println("Domain started (press enter to shutdown)"); + System.in.read(); + // waitForever(); + domain.destroy(); + } catch (Exception e) { + e.printStackTrace(); + } + + System.out.println("Domain stopped"); + } +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java new file mode 100644 index 0000000000..9d05761ad6 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package node; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.domain.SCADomainFactory; + +/** + * This server program that loads a composite to provide simple registry + * function. This server can be replaced with any registry that is appropriate + * but the components in each node that talk to the registry should be replaced + * also. + */ +public class DomainNodeDaemon implements Daemon { + + private SCADomain domain; + private static String DEFAULT_DOMAIN_URI = "http://u12:8877"; + private boolean stopped = true; + + private synchronized void waitForever() { + while (!stopped) { + try { + wait(); + } catch (InterruptedException ex) { + stopped = true; + return; + } + } + + } + + public void destroy() { + // TODO Auto-generated method stub + + } + + public void init(DaemonContext arg0) throws Exception { + // TODO Auto-generated method stub + + } + + public void start() throws Exception { + + SCADomainFactory domainFactory = SCADomainFactory.newInstance(); + domain = domainFactory.createSCADomain(DEFAULT_DOMAIN_URI); + + System.out.println("Domain started (press enter to shutdown)"); + waitForever(); + + } + + public void stop() throws Exception { + // TODO Auto-generated method stub + Thread.currentThread().interrupt(); + domain.destroy(); + } +}
\ No newline at end of file diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java new file mode 100644 index 0000000000..f48e647bd6 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package node; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import org.apache.tuscany.sca.databinding.job.JobExecutionContext; +import org.apache.tuscany.sca.databinding.job.RemoteJob; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; + +public class TestJob extends RemoteJob<Double> implements java.io.Serializable { + private boolean EOS = false; + private Double value; + + public TestJob(Double x, long iterations, int[] items) { + JobDataMap map = new JobDataMap(); + map.addJobData("value", x); + map.addJobData("iterations", iterations); + map.addJobData("items", items); + context.setJobData(map); + } + + public TestJob(Double i, boolean eos) { + value = i; + this.EOS = eos; + } + + public TestJob(String jsonData) { + JobExecutionContext ctxt = new JobExecutionContext(); + ctxt.storeJSONData(jsonData); + } + + public int getType() { + return Job.REGULAR_JOB; + } + + public void setEOS() { + EOS = true; + } + + public boolean eos() { + return EOS; + } + + @Override + public Double compute(JobExecutionContext context) { + JobDataMap contextMap = context.getJobData(); + Long iterations = (Long) contextMap.getJobDataObject("iterations"); + Double value = (Double) contextMap.getJobDataObject("value"); + double x = value.doubleValue(); + System.out.println("Computing sinx for " + value + " for " + + iterations.intValue() + " times"); + long computing_start = System.currentTimeMillis(); + for (long i = 0; i < iterations.longValue(); ++i) { + x = Math.sin(x); + } + long computing_end = System.currentTimeMillis(); + System.out.println("Computing time= " + + (computing_end - computing_start)); + System.out.println("Send result = " + x); + return new Double(x); + } + +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java new file mode 100644 index 0000000000..1f2a4d1f9a --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package node; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + +import javax.xml.namespace.QName; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.commons.daemon.DaemonController; +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.contribution.Contribution; +import org.apache.tuscany.sca.contribution.DeployedArtifact; +import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.node.NodeException; +import org.apache.tuscany.sca.node.NodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.SCANodeFactory; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import org.apache.tuscany.sca.node.management.SCANodeManagerInitService; + +import java.net.URI; + +import workpool.WorkerManager; +import workpool.WorkerManagerImpl; +import workpool.WorkpoolManager; +import workpool.WorkpoolService; +import workpool.WorkpoolServiceImpl; + +/** + * This client program shows how to run a distributed SCA node. In this case a + * calculator node has been constructed specifically for running the calculator + * composite. Internally it creates a representation of a node and associates a + * distributed domain with the node. This separation is made different + * implementations of the distributed domain can be provided. + */ +public class WorkpoolDaemon implements Daemon, Runnable { + private String domainName; + private String nodeName; + private long iterations; + private long jobsNo; + private long workerNo; + private SCANode node; + private boolean stopped = false; + private DaemonController controller = null; + private Thread thread = null; + private String ruleFile = "workerRules.drl"; + + /* + * public static void main(String[] args) throws Exception { + * // Check that the correct arguments have been provided if (null == args || + * args.length < 4) { System.err.println("Usage: java WorkpoolNode + * domainname nodename iterTest workerNo"); System.exit(1); } + * + * try { String domainName = args[0]; String nodeName = args[1]; long + * iterations = Long.parseLong(args[2]); long jobsNo = + * Long.parseLong(args[3]); long workerNo = Long.parseLong(args[4]); + * ClassLoader cl = WorkpoolDaemon.class.getClassLoader(); + * + * SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); node = + * nodeFactory.createSCANode(null, domainName); + * node.addContribution(nodeName, cl.getResource(nodeName + "/")); + * node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); + * node.start(); // nodeA is the head node and runs some tests while all + * other nodes // simply listen for incoming messages + * + * FileReader rules = new FileReader("workerRules.drl"); StringBuffer buffer = + * new StringBuffer(); + * + * BufferedReader br = new BufferedReader(rules); String ruleString; do { + * ruleString = br.readLine(); if (ruleString!=null) { + * buffer.append(ruleString);} } while (ruleString!=null); + * + * if ( nodeName.equals("nodeA") ) { // do some application stuff + * WorkpoolService workpoolService = + * node.getDomain().getService(WorkpoolService.class, + * "WorkpoolServiceComponent"); workpoolService.start(); + * NodeManagerInitService nodeInit = + * node.getDomain().getService(NodeManagerInitService.class, + * "WorkpoolManagerComponent/NodeManagerInitService"); + * nodeInit.setNode(node); WorkpoolManager workpoolManager = + * node.getDomain().getService(WorkpoolManager.class, + * "WorkpoolManagerComponent/WorkpoolManager"); + * workpoolManager.setWorkpoolReference(node.getDomain().getServiceReference(WorkpoolService.class, + * "WorkpoolServiceComponent")); + * workpoolManager.acceptRules(buffer.toString()); workpoolManager.start(); + * int items[] = {3,4,5,6,3,6,3,5,9,5,6}; + * + * double x = 398349; for (int i = 0; i < jobsNo; ++i) + * workpoolService.submit(new TestJob(x,iterations,items)); + * + * TestJob j = new TestJob(-1.0,true); for (int i = 0; i < workerNo+1; ++i){ + * j.setEOS(); workpoolService.submit(j); } } try { if + * (nodeName.equals("nodeB")) { NodeManagerInitService serviceNodeB = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeBComponent/NodeManagerInitService"); + * serviceNodeB.setNode(node); } if (nodeName.equals("nodeC")) { + * NodeManagerInitService workerManagerC = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeCComponent/NodeManagerInitService"); + * workerManagerC.setNode(node); } if (nodeName.equals("nodeD")) { + * NodeManagerInitService workerManagerD = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeDComponent/NodeManagerInitService"); + * workerManagerD.setNode(node); } if (nodeName.equals("nodeE")) { + * NodeManagerInitService workerManagerE = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeEComponent/NodeManagerInitService"); + * workerManagerE.setNode(node); } + * + * System.out.println("Node started (press enter to shutdown)"); + * System.in.read(); } catch (IOException e) { e.printStackTrace(); } // + * stop the node and all the domains in it node.stop(); node.destroy(); + * System.exit(0); } catch(Exception ex) { System.err.println("Exception in + * node - " + ex.getMessage()); ex.printStackTrace(System.err); } } + */ + public void destroy() { + // TODO Auto-generated method stub + + } + + public void init(DaemonContext arg0) throws Exception { + String[] args = arg0.getArguments(); + domainName = args[0]; + nodeName = args[1]; + iterations = Long.parseLong(args[2]); + jobsNo = Long.parseLong(args[3]); + workerNo = Long.parseLong(args[4]); + if (args.length == 6) { + ruleFile = args[5]; + } + this.controller = arg0.getController(); + // this.thread=new Thread(this); + } + + public void start() throws Exception { + + ClassLoader cl = WorkpoolDaemon.class.getClassLoader(); + + SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); + node = nodeFactory.createSCANode(null, domainName); + node.addContribution(nodeName, cl.getResource(nodeName + "/")); + node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); + node.start(); + // nodeA is the head node and runs some tests while all other nodes + // simply listen for incoming messages + + FileReader rules = new FileReader(ruleFile); + StringBuffer buffer = new StringBuffer(); + + BufferedReader br = new BufferedReader(rules); + String ruleString; + do { + ruleString = br.readLine(); + if (ruleString != null) { + buffer.append(ruleString + "\n"); + } + } while (ruleString != null); + + if (nodeName.equals("nodeA")) { + // do some application stuff + WorkpoolService workpoolService = node.getDomain().getService( + WorkpoolService.class, "WorkpoolServiceComponent"); + workpoolService.start(); + SCANodeManagerInitService nodeInit = node.getDomain().getService( + SCANodeManagerInitService.class, + "WorkpoolManagerComponent/NodeManagerInitService"); + nodeInit.setNode(node); + WorkpoolManager workpoolManager = node.getDomain().getService( + WorkpoolManager.class, + "WorkpoolManagerComponent/WorkpoolManager"); + workpoolManager.setWorkpoolReference(node.getDomain() + .getServiceReference(WorkpoolService.class, + "WorkpoolServiceComponent")); + workpoolManager.acceptRules(buffer.toString()); + workpoolManager.start(); + + int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 }; + + double x = 398349; + for (int i = 0; i < jobsNo; ++i) { + workpoolService.submit(new TestJob(x, iterations, items)); + } + TestJob j = new TestJob(-1.0, true); + for (int i = 0; i < workerNo + 1; ++i) { + j.setEOS(); + workpoolService.submit(j); + } + + } + if (nodeName.equals("nodeB")) { + SCANodeManagerInitService workerManagerNodeB = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeBComponent/NodeManagerInitService"); + workerManagerNodeB.setNode(node); + } + + if (nodeName.equals("nodeC")) { + SCANodeManagerInitService workerManagerNodeC = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeCComponent/NodeManagerInitService"); + workerManagerNodeC.setNode(node); + } + + if (nodeName.equals("nodeD")) { + SCANodeManagerInitService workerManagerNodeD = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeDComponent/NodeManagerInitService"); + workerManagerNodeD.setNode(node); + } + + if (nodeName.equals("nodeE")) { + SCANodeManagerInitService workerManagerNodeE = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeEComponent/NodeManagerInitService"); + workerManagerNodeE.setNode(node); + } + + this.waitForever(); + // this.thread.start(); + } + + public void stop() throws Exception { + Thread.currentThread().interrupt(); + // thread.interrupt(); + node.stop(); + node.destroy(); + } + + private synchronized void waitForever() { + while (!stopped) { + try { + wait(); + } catch (InterruptedException ex) { + stopped = true; + return; + } + } + } + + public void run() { + waitForever(); + } +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java new file mode 100644 index 0000000000..86557548af --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package node; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + +import javax.xml.namespace.QName; + +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.contribution.Contribution; +import org.apache.tuscany.sca.contribution.DeployedArtifact; +import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.node.NodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.SCANodeFactory; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import java.net.URI; + +import workpool.WorkerManager; +import workpool.WorkerManagerImpl; +import workpool.WorkpoolManager; +import workpool.WorkpoolService; +import workpool.WorkpoolServiceImpl; + +/** + * This client program shows how to run a distributed SCA node. In this case a + * calculator node has been constructed specifically for running the calculator + * composite. Internally it creates a representation of a node and associates a + * distributed domain with the node. This separation is made different + * implementations of the distributed domain can be provided. + */ +public class WorkpoolNode { + + public static void main(String[] args) throws Exception { + + // Check that the correct arguments have been provided + if (null == args || args.length < 4) { + System.err + .println("Useage: java WorkpoolNode domainname nodename iterTest workerNo"); + System.exit(1); + } + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + String domainName = args[0]; + String nodeName = args[1]; + long iterations = Long.parseLong(args[2]); + long jobsNo = Long.parseLong(args[3]); + long workerNo = Long.parseLong(args[4]); + ClassLoader cl = WorkpoolNode.class.getClassLoader(); + + SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); + SCANode node = nodeFactory.createSCANode(null, domainName); + node.addContribution(nodeName, cl.getResource(nodeName + "/")); + node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); + node.start(); + // nodeA is the head node and runs some tests while all other nodes + // simply listen for incoming messages + + FileReader rules = new FileReader("workerRules.drl"); + StringBuffer buffer = new StringBuffer(); + + BufferedReader br = new BufferedReader(rules); + String ruleString; + do { + ruleString = br.readLine(); + if (ruleString != null) { + buffer.append(ruleString + "\n"); + } + } while (ruleString != null); + + if (nodeName.equals("nodeA")) { + // do some application stuff + WorkpoolService workpoolService = node.getDomain().getService( + WorkpoolService.class, "WorkpoolServiceComponent"); + workpoolService.start(); + NodeManagerInitService nodeInit = node.getDomain().getService( + NodeManagerInitService.class, + "WorkpoolManagerComponent/NodeManagerInitService"); + nodeInit.setNode(node); + WorkpoolManager workpoolManager = node.getDomain().getService( + WorkpoolManager.class, + "WorkpoolManagerComponent/WorkpoolManager"); + workpoolManager.setWorkpoolReference(node.getDomain() + .getServiceReference(WorkpoolService.class, + "WorkpoolServiceComponent")); + workpoolManager.setCycleTime(8000); + workpoolManager.acceptRules(buffer.toString()); + workpoolManager.start(); + int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 }; + + double x = 398349; + + for (int i = 0; i < jobsNo; ++i) + workpoolService.submit(new TestJob(x, iterations, items)); + + TestJob j = new TestJob(-1.0, true); + for (int i = 0; i < workerNo + 1; ++i) { + j.setEOS(); + workpoolService.submit(j); + } + + } + try { + if (nodeName.equals("nodeB")) { + NodeManagerInitService serviceNodeB = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeBComponent/NodeManagerInitService"); + serviceNodeB.setNode(node); + } + if (nodeName.equals("nodeC")) { + NodeManagerInitService workerManagerC = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeCComponent/NodeManagerInitService"); + workerManagerC.setNode(node); + } + if (nodeName.equals("nodeD")) { + NodeManagerInitService workerManagerD = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeDComponent/NodeManagerInitService"); + workerManagerD.setNode(node); + } + if (nodeName.equals("nodeE")) { + NodeManagerInitService workerManagerE = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeEComponent/NodeManagerInitService"); + workerManagerE.setNode(node); + } + + System.out.println("Node started (press enter to shutdown)"); + String buff; + for (;;) { + try { + buff = in.readLine(); + if (buff == null) + break; + System.out.print(in.readLine()); + } catch (IOException ex) { + break; // Exit thread. + } + } + // stop the node and all the domains in it + node.stop(); + node.destroy(); + System.exit(0); + } catch (Exception ex) { + System.err.println("Exception in node - " + ex.getMessage()); + ex.printStackTrace(System.err); + } + } +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl new file mode 100644 index 0000000000..9c5a5d1b7f --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl @@ -0,0 +1,13 @@ +package workpool +import workpool.*; +rule "WorkerAdder1" + when + $workerBean: WorkpoolBean(singleAction == false && (jobComputed > 500)) + then + $workerBean.setSingleAction() + $workerBean.addWorkerToNode("nodeB") + $workerBean.addWorkerToNode("nodeC") + $workerBean.addWorkerToNode("nodeD") + $workerBean.addWorkerToNode("nodeE") +end +
\ No newline at end of file diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java new file mode 100644 index 0000000000..cdd0f30b34 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.io.StringReader; +import java.net.URI; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.logging.Logger; + +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamReader; + +import org.apache.tuscany.sca.assembly.MetaComponent; +import org.apache.tuscany.sca.assembly.impl.DefaultMetaComponent; + +public class MetaComponentWorker extends DefaultMetaComponent { + + private SecureRandom prng; + private String componentName; + private String scdl; + private String javaClass; + private boolean loadedFromString = false; + private Logger log = Logger.getLogger(MetaComponentWorker.class.getName()); + + public MetaComponentWorker() { + componentName = "WorkerComponent" + + java.util.UUID.randomUUID().toString(); + } + + public void setWorkerName(String componentName) { + this.componentName = componentName; + } + + public void setWorkerClass(String javaClass) { + this.javaClass = javaClass; + } + + private String generateSCDL() { + StringBuffer buffer = new StringBuffer(512); + buffer + .append("<component xmlns=\"http://www.osoa.org/xmlns/sca/1.0\" name=\""); + buffer.append(this.componentName); + buffer.append("\">\n"); + buffer.append("<implementation.java class=\""); + buffer.append(this.javaClass); + buffer.append("\"/>"); + buffer.append("<property name=\"workerName\">"); + buffer.append(this.componentName); + buffer.append("</property>\n</component>"); + return buffer.toString(); + } + + @Override + public XMLStreamReader build() throws Exception { + XMLInputFactory factory = XMLInputFactory.newInstance(); + if (!loadedFromString) + scdl = generateSCDL(); + return factory.createXMLStreamReader(new StringReader(scdl)); + + } + + public String getName() { + + return componentName; + } + +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java new file mode 100644 index 0000000000..3d4df38c95 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import org.apache.tuscany.sca.databinding.job.JobExecutionContext; +import org.apache.tuscany.sca.databinding.job.RemoteJob; +import org.oasisopen.sca.annotation.Scope; + +@Scope("COMPOSITE") +public class MyWorker extends WorkerServiceImpl<Object, Double> { + private static int resultcount = 0; + + @Override + public ResultJob computeTask(Job<Object, Double> job) { + + RemoteJob remoteJob = (RemoteJob) job; + System.out.println("Computing the job"); + JobExecutionContext context = remoteJob.getContext(); + ResultJob resultJob = new ResultJob(); + JobDataMap resultMap = new JobDataMap(); + resultMap.addJobData("result", remoteJob.compute(context)); + resultJob.setJobDataMap(resultMap); + System.out.println("Count result = " + (++resultcount)); + return resultJob; + } + +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java new file mode 100644 index 0000000000..fb930adf2e --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; + +public class NullJob implements Job, java.io.Serializable { + + public Object compute(Object arg0) { + // TODO Auto-generated method stub + return null; + } + + public JobDataMap getDataMap() { + return null; + } + + public boolean eos() { + return false; + } + + public int getType() { + return Job.NULL_JOB; + } + +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java new file mode 100644 index 0000000000..e04411668b --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import org.apache.tuscany.sca.databinding.job.JobExecutionContext; +import org.apache.tuscany.sca.databinding.job.RemoteJob; + +public class ResultJob extends RemoteJob<Object> implements + java.io.Serializable { + private JobDataMap map; + + public JobDataMap getDataMap() { + return map; + } + + public void setJobDataMap(JobDataMap map) { + this.map = map; + } + + public boolean eos() { + // TODO Auto-generated method stub + return true; + } + + public int getType() { + // TODO Auto-generated method stub + return Job.RESULT_JOB; + } + + @Override + public Object compute(JobExecutionContext v) { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java new file mode 100644 index 0000000000..52703d6954 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; +import org.oasisopen.sca.annotation.Remotable; + +@Remotable +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +public interface Trigger<T> { + void handleEvent(T c); +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java new file mode 100644 index 0000000000..d2b306cbd5 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; +import org.oasisopen.sca.annotation.Remotable; +import org.oasisopen.sca.CallableReference; +@Remotable +public interface WorkerManager { + CallableReference<WorkerService> addWorker(); + boolean removeWorker(String workerName); + boolean removeWorkers(int k); + boolean removeAllWorkers(); + double getNodeLoad(); + int activeWorkers(); + void start(); +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java new file mode 100644 index 0000000000..9660a6c2b2 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.contribution.Contribution; +import org.apache.tuscany.sca.contribution.DeployedArtifact; +import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; +import org.oasisopen.sca.CallableReference; +import org.apache.tuscany.sca.node.management.SCANodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.oasisopen.sca.ComponentContext; +import org.oasisopen.sca.annotation.Context; +import org.oasisopen.sca.annotation.Property; +import org.oasisopen.sca.annotation.Scope; +import org.oasisopen.sca.annotation.Service; +import java.util.LinkedList; +import java.util.ArrayList; + +@Scope("COMPOSITE") +@Service(interfaces = { SCANodeManagerInitService.class, WorkerManager.class }) +public class WorkerManagerImpl implements WorkerManager, SCANodeManagerInitService { + private Logger log = Logger.getLogger(WorkerManagerImpl.class.getName()); + private LinkedList<CallableReference<WorkerService>> activeWorkers = new LinkedList<CallableReference<WorkerService>>(); + private List<String> workerComponentNames = new ArrayList<String>(); + private SCANodeImpl node; + @Property + protected String nodeName; + @Property + protected String compositeName; + @Property + protected String workerClass; + @Context + protected ComponentContext context; + private double loadAverage; + + /* This method is used to find a composite inside all deployed artifacts */ + private Composite findComposite(List<Composite> artifacts) { + for (Composite fact : artifacts) { + log.info("Searching in a contribution deployed artifacts -" + + compositeName); + Composite augmented = (Composite) fact; + // found + if (augmented.getURI().equals(compositeName)) { + log.info("Found composite..." + compositeName); + return augmented; + } + } + } + return null; + } + + public CallableReference<WorkerService> addWorker() { + log.info("Adding a new worker call.."); + long addWorkerStartTime = System.nanoTime(); + ContributionServiceImpl cServiceImpl = (ContributionServiceImpl) node.getContributionService(); + Contribution contribution = cServiceImpl.getContribution(nodeName); + List<Composite> artifacts = contribution.getDeployables(); + CallableReference<WorkerService> workerReference = null; + CallableReference<WorkerService> ref = null; + log.info("Creating a MetaComponentWorker.."); + MetaComponentWorker mcw = new MetaComponentWorker(); + boolean found = false; + mcw.setWorkerClass(workerClass); + Composite augmented = findComposite(artifacts); + try { + if (augmented != null) { + long startCreation = System.nanoTime(); + node.addComponentToComposite(mcw, contribution.getURI(), + augmented.getURI()); + System.out.println("addComponentToComposite time = " + + (System.nanoTime() - startCreation)); + RuntimeComponent workerComponent = (RuntimeComponent) node + .getComponent(mcw.getName()); + if (workerComponent != null) { + ref = (CallableReference<WorkerService>) workerComponent + .getComponentContext().createSelfReference( + WorkerService.class); + ref.getService().start(); + activeWorkers.addLast(ref); + workerComponentNames.add(mcw.getName()); + CallableReference<WorkerManager> manager = (CallableReference) context + .createSelfReference(WorkerManager.class, + "WorkerManager"); + ref.getService().registerManager(manager); + return ref; + } + } else { + log.info("Workpool composite not found!"); + } + } catch (Exception e) { + log.info("Exception activation"); + e.printStackTrace(); + } + ; + System.out.println("Component Creation Time =" + + (System.nanoTime() - addWorkerStartTime)); + return ref; + } + + public boolean removeAllWorkers() { + for (CallableReference<WorkerService> callable : activeWorkers) { + callable.getService().stop(); + } + return true; + } + + public boolean removeWorker() { + CallableReference<WorkerService> callable = activeWorkers + .removeLast(); + callable.getService().stop(); + return true; + } + + public boolean removeWorkers(int k) { + if (k >= activeWorkers.size()) + return false; + for (int i = 0; i < k; ++i) { + if (!removeWorker()) + return false; + } + return true; + } + + public void setNode(SCANode node) { + this.node = (SCANodeImpl) node; + + } + + public double getNodeLoad() { + /* + * FIXME [jo] this works only on Linux To be replaced with an JNI + * extension + */ + RandomAccessFile statfile; + + this.loadAverage = 1.0; + // load = 0; + int NoProcessors = 0; + String cpuLine = null; + try { + NoProcessors = Runtime.getRuntime().availableProcessors(); + if (NoProcessors > 1) + this.loadAverage = 1 / (1.0 * NoProcessors); + statfile = new RandomAccessFile("/proc/loadavg", "r"); + try { + statfile.seek(0); + cpuLine = statfile.readLine(); + + } catch (IOException e) { + // FIX ME: Better exception handling. + e.printStackTrace(); + } + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (NumberFormatException e) { + e.printStackTrace(); + } + double min1; + if (cpuLine != null) { + java.util.StringTokenizer st = new java.util.StringTokenizer( + cpuLine, " "); + min1 = Double.parseDouble(st.nextToken()); + } else + min1 = 0; + + return min1 * this.loadAverage; + } + + public int activeWorkers() { + return activeWorkers.size(); + } + + public boolean removeWorker(String workerName) { + RuntimeComponent workerComponent = (RuntimeComponent) node + .getComponent(workerName); + if (workerComponent != null) { + log.info("Removing component " + workerName); + node.removeComponentFromComposite(nodeName, "Workpool.composite", + workerName); + return true; + } + return false; + } + + public void start() { + // do nothing for now. + } +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java new file mode 100644 index 0000000000..3f702db369 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.annotation.Callback; +import org.oasisopen.sca.annotation.Remotable; +import org.oasisopen.sca.annotation.OneWay; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; + +/** + * The interface for the multiply service + */ +@Remotable +@Callback(WorkerServiceCallback.class) +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +public interface WorkerService<T, E> { + @OneWay + void compute(Job<T, E> j); + + void start(); + + void stop(); + + // void addJobCompleteHandler(String triggerName, + // CallableReferenceImpl<Trigger> handle); + // void removeJobCompleteHandler(String triggerName); + /* The worker manager */ + void registerManager(CallableReferenceImpl<WorkerManager> wm); + + void registerSender(CallableReferenceImpl<WorkpoolService> sender); + + // void init(Job nullJob); + @OneWay + void computeFirstTime(Job nullJob, + CallableReferenceImpl<WorkpoolService> myReference); + +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java new file mode 100644 index 0000000000..22cfc38ff4 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.oasisopen.sca.annotation.Remotable; + +@Remotable +public interface WorkerServiceCallback { + void receiveResult(Job resultType, boolean reuse, String workerName); +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java new file mode 100644 index 0000000000..c38b27a6cf --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.oasisopen.sca.ComponentContext; +import org.oasisopen.sca.RequestContext; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.annotation.Callback; +import org.oasisopen.sca.annotation.Context; +import org.oasisopen.sca.annotation.Property; +import org.oasisopen.sca.annotation.Scope; +import org.oasisopen.sca.annotation.Service; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.*; + +/** + * An implementation of the worker service. + */ +@Service(WorkerService.class) +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +@Scope("COMPOSITE") +public abstract class WorkerServiceImpl<T, E> implements WorkerService<T, E> { + private Logger log = Logger.getLogger(this.getClass().getName()); + private WorkerServiceCallback workerServiceCallback; + @Context + protected ComponentContext workerContext; + @Context + protected RequestContext requestContext; + @Property + protected String workerName; + private CallableReferenceImpl<WorkerManager> managerReference = null; + + /* TODO add the triggers, but before ask */ + // protected Map<String,Trigger> triggers = new HashMap<String,Trigger>(); + public abstract ResultJob computeTask(Job<T, E> job); + + private boolean stopped = false; + private CallableReferenceImpl<WorkerService> serviceRef; + private CallableReferenceImpl<WorkpoolService> senderService; + private WorkpoolService wp = null; + private WorkerManager manager = null; + + public void start() { + log.info("Starting worker..."); + stopped = false; + serviceRef = (CallableReferenceImpl) workerContext + .createSelfReference(WorkerService.class); + + } + + public void init(CallableReferenceImpl<WorkpoolService> sender, Job nullJob) { + compute(nullJob); + } + + public void stop() { + stopped = true; + } + + @Callback + public void setWorkerServiceCallback( + WorkerServiceCallback workerServiceCallback) { + log.info("Setting worker callback"); + this.workerServiceCallback = workerServiceCallback; + } + + public void computeFirstTime(Job nullJob, + CallableReferenceImpl<WorkpoolService> sender) { + senderService = sender; + wp = sender.getService(); + workWithCallable(nullJob); + } + + public void registerManager(CallableReferenceImpl<WorkerManager> wm) { + managerReference = wm; + manager = managerReference.getService(); + + } + + public void registerSender(CallableReferenceImpl<WorkpoolService> sender) { + log.info("Registering sender.."); + senderService = sender; + wp = sender.getService(); + } + + private void workWithInjection(Job j) { + log.info("Worker has received job"); + if (stopped) { + workerServiceCallback + .receiveResult(j, true, workerContext.getURI()); + if (managerReference != null) + manager.removeWorker(workerContext.getURI()); + } else if (j.eos()) { + if (managerReference != null) + manager.removeWorker(workerContext.getURI()); + } + if (j instanceof NullJob) { + workerServiceCallback.receiveResult(j, false, workerContext + .getURI()); + } else { + workerServiceCallback.receiveResult(computeTask(j), false, + workerContext.getURI()); + } + } + + private void workWithCallable(Job j) { + log.info("Worker " + workerContext.getURI() + + " has received job with eos --> " + j.eos()); + if (stopped) { + wp.handleResult(j, true, workerContext.getURI(), serviceRef, false); + return; + } + if (j.eos()) { + log.info("Got poison token..."); + if (managerReference != null) { + log.info("Removing component " + workerContext.getURI()); + manager.removeWorker(workerContext.getURI()); + + } + return; + } + if (j.getType() != Job.NULL_JOB) { + wp.handleResult(computeTask(j), false, workerContext.getURI(), + serviceRef, false); + } else { + log.info("Got a null job"); + wp.handleResult(j, false, workerContext.getURI(), serviceRef, true); + } + } + + public void compute(Job<T, E> j) { + + if (senderService != null) { + log.info("Computing job using callable reference method"); + workWithCallable(j); + + } else { + log.info("Computing job using reference injection method"); + workWithInjection(j); + + } + } + /* + * public void addJobCompleteHandler(String triggerName, + * CallableReferenceImpl<Trigger> handle) { if + * (!triggers.containsKey(triggerName)) { triggers.put(triggerName, + * handle.getService()); } } public void removeJobCompleteHandler(String + * triggerName) { if (!triggers.containsKey(triggerName)) { + * triggers.remove(triggerName); } } + */ +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java new file mode 100644 index 0000000000..80c093ff1c --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.beans.*; +import java.util.Vector; +import java.util.logging.*; + +public class WorkpoolBean { + private Vector<WorkpoolBeanListener> listeners = new Vector<WorkpoolBeanListener>(); + double loadAverage = 0; + int nodeNumbers = 0; + int workers = 0; + int estimedQueueSize = 0; + double averageServiceTime = 0; + double averageArrivalTime = 0; + double usageFactor = 0; + private final PropertyChangeSupport changes = new PropertyChangeSupport( + this); + long jobComputed = 0; + boolean singleAction = false; + private Logger log = Logger.getLogger(WorkpoolBean.class.getName()); + + public void setNodeNumbers(int n) { + this.nodeNumbers = n; + } + + public void setWorkers(int w) { + this.workers = w; + } + + public void setLoadAverage(double loadAverage) { + this.loadAverage = loadAverage; + } + + public void setAverageServiceTime(double service) { + this.averageServiceTime = service; + } + + public void setAverageArrivalTime(double service) { + this.averageArrivalTime = service; + } + + public double getAverageArrivalTime() { + return this.averageArrivalTime; + } + + public double getUtilizationFactor() { + return usageFactor; + } + + public void setUsageFactor() { + usageFactor = averageServiceTime / averageArrivalTime; + } + + public void setEstimedQueueSize(int size) { + estimedQueueSize = size; + } + + public int getEstimedQueueSize() { + return estimedQueueSize; + } + + public double getLoadAverage() { + return this.loadAverage; + } + + public int getWorkers() { + return this.workers; + } + + public int getNodeNumbers() { + return this.nodeNumbers; + } + + public double getAverageServiceTime() { + return this.averageServiceTime; + } + + public void addPropertyChangeListener(final PropertyChangeListener l) { + this.changes.addPropertyChangeListener(l); + } + + public void removePropertyChangeListener(final PropertyChangeListener l) { + this.changes.removePropertyChangeListener(l); + } + + private synchronized void fireWorkpoolEvent(WorkpoolEvent ev) { + for (WorkpoolBeanListener l : listeners) { + l.handleEvent(new WorkpoolEvent(ev)); + } + } + + public void addWorkersToNode(int k, String nodeName) { + log.info("Adding a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER, k, nodeName); + fireWorkpoolEvent(ev); + } + + public void addWorkerToNode(String nodeName) { + log.info("Adding a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.SINGLE_ADD_WORKER, 1, nodeName); + fireWorkpoolEvent(ev); + } + + public void removeWorkersToNode(int k, String nodeName) { + log.info("Removing a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER, k, nodeName); + fireWorkpoolEvent(ev); + } + + public void removeWorkerToNode(String nodeName) { + log.info("Removing a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.SINGLE_REMOVE_WORKER, 1, nodeName); + fireWorkpoolEvent(ev); + } + + public synchronized void addListener(WorkpoolBeanListener l) { + this.listeners.add(l); + } + + public synchronized void removeListener(WorkpoolBeanListener l) { + this.listeners.remove(l); + } + + public void setJobComputed(long jobComputed) { + this.jobComputed = jobComputed; + + } + + public void setSingleAction() { + singleAction = true; + } + + public boolean getSingleAction() { + return singleAction; + } + + public long getJobComputed() { + return this.jobComputed; + } +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java new file mode 100644 index 0000000000..0ecc223fed --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.util.EventListener; + +public interface WorkpoolBeanListener extends EventListener { + public void handleEvent(WorkpoolEvent ev); +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java new file mode 100644 index 0000000000..0bdc3671d5 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.util.EventObject; + +public class WorkpoolEvent extends EventObject { + + private static final long serialVersionUID = -1273928009411948768L; + + public WorkpoolEvent(Object source) { + super(source); + } + + public WorkpoolEvent(WorkpoolEvent ev) { + super(ev.source); + type = ev.type; + noWorker = ev.noWorker; + nodeName = ev.nodeName; + } + + public WorkpoolEvent(Object source, int typeEv, int worker) { + super(source); + type = typeEv; + noWorker = worker; + nodeName = ""; + } + + public WorkpoolEvent(Object source, int typeEv, int worker, String nodeName) { + super(source); + type = typeEv; + noWorker = worker; + this.nodeName = nodeName; + } + + public String getNodeName() { + return nodeName; + } + + public int getType() { + return type; + } + + public int workers() { + return noWorker; + } + + private int type; + private int noWorker; + private String nodeName; + public static final int EVENT_MULTIPLE_ADD_WORKER = 0; + public static final int EVENT_MULTIPLE_REMOVE_WORKER = 1; + public static final int SINGLE_REMOVE_WORKER = 2; + public static final int SINGLE_ADD_WORKER = 3; +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java new file mode 100644 index 0000000000..46fbda7998 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.annotation.OneWay; +import org.oasisopen.sca.annotation.Remotable; + +@Remotable +public interface WorkpoolManager { + /* + * @param String rules This are the autonomic rules. The format is the Java + * Drools .drl file. You have to read it + */ + @OneWay + void acceptRules(String rules); + + @OneWay + void start(); + + @OneWay + void stopAutonomicCycle(); + + @OneWay + void startAutonomicCycle(); + + int activeWorkers(); + + void setCycleTime(long time); + + void setWorkpoolReference(ServiceReference<WorkpoolService> serviceReference); +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java new file mode 100644 index 0000000000..cf353fb3fe --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.oasisopen.sca.ComponentContext; +import org.oasisopen.sca.ServiceReference; +import java.util.Collections; +import java.util.Enumeration; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Iterator; +import java.util.Timer; +import java.util.TimerTask; +import java.util.logging.Logger; + +import javax.xml.stream.XMLStreamException; + +import node.TestJob; +import java.io.File; +import java.util.Vector; +import org.apache.axiom.om.OMElement; +import org.apache.tuscany.sca.contribution.service.ContributionResolveException; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.core.context.ServiceReferenceImpl; +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.node.NodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import org.oasisopen.sca.CallableReference; +import org.drools.FactHandle; +import org.drools.RuleBase; +import org.drools.RuleBaseFactory; +import org.drools.StatefulSession; +import org.drools.StatelessSession; +import org.drools.compiler.DroolsParserException; +import org.drools.compiler.PackageBuilder; +import org.drools.rule.Package; +import org.oasisopen.sca.annotation.Constructor; +import org.oasisopen.sca.annotation.Context; +import org.oasisopen.sca.annotation.Destroy; +import org.oasisopen.sca.annotation.Property; +import org.oasisopen.sca.annotation.Reference; +import org.oasisopen.sca.annotation.Scope; +import org.oasisopen.sca.annotation.Service; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +@Service(interfaces = { NodeManagerInitService.class, WorkpoolManager.class }) +@Scope("COMPOSITE") +/* + * This is the core manager of the workpool application. The Workpool Manager + * holds the reference to each remote node manager. Inside it we've a rule + * engine instance. + */ +public class WorkpoolManagerImpl implements WorkpoolManager, + NodeManagerInitService, WorkpoolBeanListener { + /* + * This inner class trigs the rule engine, at given times: 1. It checks the + * different loads for each nodes and sets the WorkpoolBean 2. It checks the + * Workpool AverageService Time and sets the WorkpoolBean 3. It checks how + * many jobs are already computed and sets the WorkpoolBean Then given the + * configured bean and the rules, run the Rule Engine for executing the + * business logic + */ + class RuleEngineTrigger extends TimerTask { + // private ReentrantLock triggerLock = new ReentrantLock(); + @Override + public void run() { + + System.out.println("Updating WorkpoolBean.."); + // checkActiveWorkers(); + // checkLoadInNodes(); + checkServiceTime(); + // checkEstimedQueueSize(); + // checkArrivalTime(); + getProcessedItem(); + // computeUsageFactor(); + doRun(bean); + } + + } + + private WorkerManager managerNodeB; + private WorkerManager managerNodeC; + private WorkerManager managerNodeD; + private WorkerManager managerNodeE; + + private SCANodeImpl node; + private WorkpoolBean bean = new WorkpoolBean(); + private ReentrantLock handleEventLock = new ReentrantLock(); + private ReentrantLock updateRuleLock = new ReentrantLock(); + + private ServiceReference<WorkpoolService> reference; + private AtomicInteger activeWorkers = new AtomicInteger(0); + private Logger log = Logger.getLogger(WorkpoolManagerImpl.class.getName()); + @Property + protected String workers; + @Property + protected String nodes; + @Property + protected String injection; + @Context + protected ComponentContext workpoolManagerContext; + private CallableReferenceImpl<WorkpoolManager> myReference; + private String rules = null; + private boolean referenceInjection = false; + private ConcurrentHashMap<String, WorkerManager> workerManagerTable = new ConcurrentHashMap<String, WorkerManager>(); + private int workersNo; + private int nodesNo; + private Timer timer = new Timer(); + /* this handle facts */ + private RuleBase ruleBase = null; + private FactHandle handle = null; + private StatefulSession wm = null; + private long cycleTime = 5000; + + @Reference + public void setManagerNodeB(WorkerManager managerNodeB) { + this.managerNodeB = managerNodeB; + workerManagerTable.put("nodeB", managerNodeB); + } + + @Reference + public void setManagerNodeC(WorkerManager managerNodeC) { + this.managerNodeC = managerNodeC; + workerManagerTable.put("nodeC", managerNodeC); + } + + @Reference + public void setManagerNodeD(WorkerManager managerNodeD) { + this.managerNodeD = managerNodeD; + workerManagerTable.put("nodeD", managerNodeD); + } + + @Reference + public void setManagerNodeE(WorkerManager managerNodeE) { + this.managerNodeE = managerNodeE; + workerManagerTable.put("nodeE", managerNodeE); + } + + private void startNewComponents( + Vector<CallableReferenceImpl<WorkerService>> vector) { + log.info("Starting new components"); + WorkpoolService wp = reference.getService(); + // CallableReferenceImpl<WorkpoolService> sink = + // (CallableReferenceImpl<WorkpoolService>) reference; + Job j = new NullJob(); + for (CallableReferenceImpl<WorkerService> item : vector) { + // WorkerService service = item.getService(); + // service.start(); + // service.computeFirstTime(j, sink); + log.info("Send PostWorkerReference..."); + wp.PostWorkerReference(item); + } + if (myReference != null) + wp.registerManager(myReference); + } + + public void setCycleTime(long cycle) { + this.cycleTime = cycle; + } + + @SuppressWarnings("unchecked") + /* + * This gets the number of workers workerNo and instantiates them + */ + public void start() { + this.myReference = (CallableReferenceImpl<WorkpoolManager>) workpoolManagerContext + .createSelfReference(WorkpoolManager.class, "WorkpoolManager"); + this.workersNo = Integer.parseInt(this.workers); + this.nodesNo = Integer.parseInt(this.nodes); + this.referenceInjection = (Integer.parseInt(this.injection) != 0); + log.info("Starting WorkpoolManager Component with #" + workersNo + + " workers and #" + nodes + " nodes"); + nodesNo = workerManagerTable.values().size(); + // Sets info in the bean. + bean.setWorkers(this.workersNo); + bean.setNodeNumbers(nodesNo); + Vector<CallableReferenceImpl<WorkerService>> workerRefs = new Vector<CallableReferenceImpl<WorkerService>>(); + int exactTimes = workersNo / nodesNo; + for (int i = 0; i < exactTimes; ++i) { + for (WorkerManager manager : workerManagerTable.values()) { + manager.start(); + if (manager != null) { + System.err.println("Actual load = " + + manager.getNodeLoad() + " for node "); + addNewComponent(manager, workerRefs); + } + } + } + + int module = (workersNo % nodesNo); + int n = 0; + if (module > 0) { + Vector<String> v = new Vector(workerManagerTable.keySet()); + Collections.sort(v); + // Iterator<WorkerManager> iter = + // workerManagerTable.values().iterator(); + // Display (sorted) hashtable. + for (Enumeration<String> e = v.elements(); (e.hasMoreElements() && n < module); ++n) { + String key = e.nextElement(); + WorkerManager m = workerManagerTable.get(key); + System.err.println("Module Actual load = " + m.getNodeLoad() + + " for node "); + addNewComponent(m, workerRefs); + } + } + startNewComponents(workerRefs); + bean.addListener(this); + TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger(); + timer.scheduleAtFixedRate(task, 3000, cycleTime); + } + + private void checkLoadInNodes() { + System.out.println("CheckLoadInNodes"); + int number = 1; + double loadAverage = 0; + for (WorkerManager manager : workerManagerTable.values()) { + loadAverage += manager.getNodeLoad(); + number++; + } + bean.setLoadAverage(loadAverage / number); + } + + private void computeUsageFactor() { + bean.setUsageFactor(); + } + + private void checkEstimedQueueSize() { + WorkpoolService wp = reference.getService(); + + if (wp != null) { + int size = wp.estimatedQueueSize(); + log.info("Estimed Queue Size =" + size); + bean.setEstimedQueueSize(size); + } + } + + private WorkerManager findMinLoad() { + double load = 0; + // workerManagerTable.values().iterator().next().getNodeLoad(); + WorkerManager toFind = null; + for (WorkerManager manager : workerManagerTable.values()) { + if (load == 0) { + load = manager.getNodeLoad(); + toFind = manager; + } else if (manager.getNodeLoad() < load) { + load = manager.getNodeLoad(); + toFind = manager; + } + } + return toFind; + } + + private void checkServiceTime() { + WorkpoolService wp = reference.getService(); + + if (wp != null) { + double time = wp.getServiceTime(); + log.info("Average System Service Time =" + time); + bean.setAverageServiceTime(time); + } + } + + private void checkArrivalTime() { + WorkpoolService wp = reference.getService(); + + if (wp != null) { + double time = wp.getArrivalTime(); + log.info("Average Arrival Service Time =" + time); + bean.setAverageArrivalTime(time); + } + } + + private void checkActiveWorkers() { + bean.setWorkers(this.activeWorkers()); + } + + private void getProcessedItem() { + WorkpoolService wp = reference.getService(); + if (wp != null) { + long computed = wp.getJobComputed(); + log.info("The system has already computed " + computed + " jobs"); + bean.setJobComputed(computed); + } + } + + private boolean removeComponent(WorkerManager manager, int k) { + manager.removeWorkers(k); + activeWorkers.decrementAndGet(); + return true; + } + + @SuppressWarnings("unchecked") + private boolean addNewComponent(WorkerManager manager, + Vector<CallableReferenceImpl<WorkerService>> workerRefs) { + CallableReferenceImpl<WorkerService> workerReference = (CallableReferenceImpl<WorkerService>) manager + .addWorker(); + + if (workerReference != null) { + /* if i'll decide to use dynamically generated references */ + if (referenceInjection) { + workerReference.getService(); + String uri = workerReference.getEndpointReference().getURI(); + int nameIndex = uri.indexOf("/"); + String componentName = uri.substring(0, nameIndex); + if (componentName.startsWith("/")) + componentName = uri.substring(1, uri.length()); + if (componentName.endsWith("/")) + componentName = uri.substring(0, uri.length() - 1); + // String componentName = uri.substring(0, nameIndex-1); + + log.info("Adding wire from WorkpoolComponentService to " + + componentName); + String referenceName = "ref" + componentName; + + /* + * I'm updating the WorkpoolServiceComponent with a new + * reference to a just created component I assume that the + * WorkpoolManagerService and the WorkpoolServiceComponent stay + * in the same JVM It's like in the scdl there were: <reference + * name=referenceName target="componentName"/> With this then + * I've a wire WorkpoolService---> a new Worker + */ + try { + node.addComponentReferenceWire(referenceName, "nodeA", + "Workpool.composite", "workpool.WorkerServiceImpl", + WorkerService.class, "WorkpoolServiceComponent", + componentName); + } catch (Exception e) { + e.printStackTrace(); + return false; + } + log.info("Sending reference name " + referenceName + + " to WorkpoolService"); + // TODO: this was part of dynamic wiring, but it doesn't work. + // reference.getService().PostWorkerName(referenceName); + + } else { + // log.info("Sending callable reference to WorkpoolService + // placed at -->"+reference); + // reference.getService().PostWorkerReference(workerReference); + workerRefs.add(workerReference); + } + activeWorkers.incrementAndGet(); + return true; + } + return false; + } + + public int activeWorkers() { + + return activeWorkers.get(); + } + + private void doRun(WorkpoolBean bean) { + + long startTime = System.currentTimeMillis(); + updateRuleLock.lock(); + if (wm == null) + wm = ruleBase.newStatefulSession(); + if (this.handle == null) + handle = wm.insert(bean); + else { + wm.update(handle, bean); + } + wm.fireAllRules(); + updateRuleLock.unlock(); + + System.out.println("Engine rule overhead = " + + (System.currentTimeMillis() - startTime)); + } + + private RuleBase readRule(String rule) { + + PackageBuilder packBuilder = new PackageBuilder(); + try { + packBuilder.addPackageFromDrl(new StringReader(rule)); + } catch (DroolsParserException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + Package pkg = packBuilder.getPackage(); + RuleBase ruleBase = RuleBaseFactory.newRuleBase(); + try { + ruleBase.addPackage(pkg); + } catch (Exception e) { + e.printStackTrace(); + } + return ruleBase; + } + + public void acceptRules(String rules) { + this.rules = rules; + if (ruleBase == null) { + RuleBase base = readRule(rules); + if (base != null) { + ruleBase = base; + } + } else { + updateRuleLock.lock(); + // i have already a rule: updating + ruleBase = readRule(rules); + wm = ruleBase.newStatefulSession(); + handle = null; + updateRuleLock.unlock(); + } + + System.out.println("Accepted rules = " + rules); + } + + public String getRules() { + return rules; + } + + private WorkerManager findMaxLoadNode() { + double load = 0.0; + WorkerManager toFind = null; + for (WorkerManager manager : workerManagerTable.values()) { + if (manager.getNodeLoad() > load) { + load = manager.getNodeLoad(); + toFind = manager; + } + } + return toFind; + + } + + public void setWorkpoolReference( + ServiceReference<WorkpoolService> serviceReference) { + reference = serviceReference; + } + + public void setNode(SCANode arg0) { + node = (SCANodeImpl) arg0; + } + + public void handleEvent(WorkpoolEvent ev) { + if (ev == null) + return; + + String nodeName = ev.getNodeName(); + + switch (ev.getType()) { + case WorkpoolEvent.SINGLE_ADD_WORKER: { + if (nodeName != null) { + Vector<CallableReferenceImpl<WorkerService>> workerRefs = new Vector<CallableReferenceImpl<WorkerService>>(); + + // in this case I have a nodeName + if (!nodeName.equals("") + && (workerManagerTable.containsKey(nodeName))) { + WorkerManager manager = workerManagerTable.get(nodeName); + addNewComponent(manager, workerRefs); + startNewComponents(workerRefs); + } else if (nodeName.equals("")) { + WorkerManager manager = findMinLoad(); + addNewComponent(manager, workerRefs); + startNewComponents(workerRefs); + } + } + break; + } + case WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER: { + Vector<CallableReferenceImpl<WorkerService>> workerRefs = new Vector<CallableReferenceImpl<WorkerService>>(); + + if (nodeName.equals("")) { + + WorkerManager manager = findMinLoad(); + int k = ev.workers(); + for (int h = 0; h < k; ++h) { + addNewComponent(manager, workerRefs); + } + } else { + WorkerManager manager = workerManagerTable + .get(ev.getNodeName()); + int k = ev.workers(); + for (int h = 0; h < k; ++h) { + addNewComponent(manager, workerRefs); + } + } + startNewComponents(workerRefs); + break; + } + case WorkpoolEvent.SINGLE_REMOVE_WORKER: { + if (nodeName != null) { + // in this case I have a nodeName + if (!nodeName.equals("") + && (workerManagerTable.containsKey(nodeName))) { + WorkerManager manager = workerManagerTable.get(nodeName); + removeComponent(manager, 1); + } else if (nodeName.equals("")) { + WorkerManager manager = findMaxLoadNode(); + removeComponent(manager, 1); + } + } + break; + } + case WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER: { + if (nodeName.equals("")) { + WorkerManager manager = findMaxLoadNode(); + removeComponent(manager, ev.workers()); + + } else { + WorkerManager manager = workerManagerTable.get(nodeName); + removeComponent(manager, ev.workers()); + } + break; + } + } + + } + + @Destroy + public void onExit() { + // do cleanup + this.timer.cancel(); + this.timer.purge(); + } + + public void stopAutonomicCycle() { + this.timer.cancel(); + this.timer.purge(); + this.timer = null; + } + + public void startAutonomicCycle() { + if (this.timer == null) { + this.timer = new Timer(); + TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger(); + timer.schedule(task, 3000, cycleTime); + } + } +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java new file mode 100644 index 0000000000..239e5d145d --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; +import org.oasisopen.sca.annotation.OneWay; +import org.oasisopen.sca.annotation.Remotable; +import org.oasisopen.sca.ServiceReference; + +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +@Remotable +public interface WorkpoolService { + + /* this the functional part */ + void submit(Job i); + + /* the time between two subsequent worker invocations */ + double getServiceTime(); + + /* the number of ResultJob received */ + long getJobComputed(); + + /* the time elapsed between the stream has initiated and now */ + long getElapsedTime(); + + /* the size of the internal queue : it's not accurate */ + int estimatedQueueSize(); + + /* the average time between two consuecutive submit */ + double getArrivalTime(); + + void start(); + + void stop(); + + /* + * this is the part needed by management. May be in future i'll refactor it + * order to hide this part. + */ + @OneWay + void handleResult(Job j, boolean reuse, String string, + CallableReferenceImpl<WorkerService> worker, boolean newJob); + + void addTrigger(CallableReferenceImpl<Trigger> reference); + + void removeTrigger(); + + void registerManager( + CallableReferenceImpl<WorkpoolManager> createSelfReference); + + /* + * This could placed in another interface definition - think about it These + * methods evict, and evictAll are needed when a worker finish to exist and + * it needs to be evicted by the WorkpoolManager. In the system I have two + * caches: 1) a domain cache, which holds the components URI 2) a + * workerReference cache (implemented by a ConcurrentHashMap), which holds a + * proxy to each worker. Every proxy gets built from the worker callable + * reference. I'm thinking for placing the workerReferenceCache in a local + * interface. Assuming that WorkpoolService and WorkpoolManager are in the + * same JVM. + */ + void evict(String workerURI); + + void evictAll(); + + /* + * these two are no longer needed. I leave it because if i'll have time to + * do dynamic wiring the first one is needed. void PostWorkerName(String + * referenceName); + */ + void PostWorkerReference(CallableReferenceImpl<WorkerService> worker); + +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java new file mode 100644 index 0000000000..268edae530 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.oasisopen.sca.ComponentContext; +import org.oasisopen.sca.annotation.Context; +import org.oasisopen.sca.annotation.Scope; +import org.oasisopen.sca.annotation.Service; +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * An implementation of the Workpool service. + */ +@Service(WorkpoolService.class) +@Scope("COMPOSITE") +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +public class WorkpoolServiceImpl implements WorkpoolService, + WorkerServiceCallback { + + /* incoming job queue */ + private LinkedBlockingQueue<Job> queue = new LinkedBlockingQueue<Job>(5000); + private CallableReferenceImpl<Trigger> trigger = null; + private Trigger forwardResult = null; + /* counter for job's number fetched from the queue and sent to the Worker */ + private AtomicInteger jobSent = new AtomicInteger(0); + /* time for initHandleResult */ + private AtomicLong initHandleResult = new AtomicLong(0); + /* time for endHandleResult */ + private AtomicLong endHandleResult = new AtomicLong(0); + /* + * number of job computed, this will be exposed in order to be used to + * firing rules + */ + private long jobComputed = 0; + /* same as above */ + private AtomicLong elapsedTime = new AtomicLong(0); + /* this is for comuputing averageServiceTime */ + private long times = 1; + /* this is for computing averageArrivalTime */ + private long timesArrival = 1; + private ReentrantLock arrivalLock = new ReentrantLock(); + private long arrivalPrevious = -1; + // private AtomicBoolean processingStopped = new AtomicBoolean(false); + private boolean processingStopped = false; + // private LinkedBlockingQueue<Trigger> triggers = new + // LinkedBlockingQueue<Trigger>(); + @Context + protected ComponentContext workpoolContext; + private CallableReferenceImpl<WorkpoolManager> manager; + private long previousSubmitTime = -1; + private boolean firstTime = true; + private boolean first = true; + private long start = 0; + private long end = 0; + private double averageServiceTime = 0; + private double averageArrivalTime = 0; + private int workersNo = 0; + private final Job nullJob = new NullJob(); + /* This is useful for counting the start and end */ + private Logger log = Logger.getLogger(WorkpoolServiceImpl.class.getName()); + private ReentrantLock handleResultLock = new ReentrantLock(); + private ReentrantLock postWorkerReferenceLock = new ReentrantLock(); + private ConcurrentHashMap<String, WorkerService> cacheReference = new ConcurrentHashMap<String, WorkerService>(); + private CallableReferenceImpl<WorkpoolService> myReference; + private String previuosURI = ""; + private long time = 0; + + private void computeAverageTime() { + long actualServiceTime = 0; + // if the processing is finished + if (processingStopped) + return; + + if (firstTime == true) { + this.previousSubmitTime = System.currentTimeMillis(); + this.averageServiceTime = 0; + firstTime = false; + } else { + actualServiceTime = System.currentTimeMillis() + - this.previousSubmitTime; + this.previousSubmitTime = System.currentTimeMillis(); + averageServiceTime = ((averageServiceTime * times) + actualServiceTime) + / (times + 1); + ++times; + } + } + + public void submit(Job j) { + try { + // log.info("Submit job in queue -->"+ j.getType()); + // processingStopped.set(false); + try { + arrivalLock.lock(); + if (this.arrivalPrevious == -1) { + arrivalPrevious = System.currentTimeMillis(); + averageArrivalTime = 0; + } + double actualArrivalTime = System.currentTimeMillis() + - arrivalPrevious; + averageArrivalTime = ((averageArrivalTime * timesArrival) + actualArrivalTime) + / (timesArrival + 1); + arrivalPrevious = System.currentTimeMillis(); + ++timesArrival; + } finally { + arrivalLock.unlock(); + } + queue.put(j); + } catch (Exception e) { + log.info("Exception in queue"); + queue.clear(); + e.printStackTrace(); + } + } + + public double getArrivalTime() { + return this.averageArrivalTime; + } + + public double getServiceTime() { + return this.averageServiceTime; + } + + public void receiveResult(Job resultType, boolean reuse, String workerURI) { + + if (reuse) { + queue.add(resultType); + return; + } + + computeAverageTime(); + Job job = null; + try { + job = queue.take(); + } catch (InterruptedException e) { + // TODO Better exception handling --> see Exception antipattern doc + e.printStackTrace(); + return; + } + + if ((job != null) && (job.eos() == false)) { + int nameIndex = workerURI.indexOf("/"); + String workerName = workerURI.substring(0, nameIndex - 1); + log.info("Sending job to worker --> " + workerName); + WorkerService worker = workpoolContext.getService( + WorkerService.class, workerName); + worker.compute(job); + } + + JobDataMap map = ((ResultJob) resultType).getDataMap(); + if (map != null) { + ++jobComputed; + Object obj = map.getJobDataObject("result"); + System.out.println("Result = " + ((Double) obj).doubleValue()); + } + + } + + public void start() { + log.info("WorkpoolServiceComponent started..."); + myReference = (CallableReferenceImpl) workpoolContext + .createSelfReference(WorkpoolService.class, "WorkpoolService"); + myReference.getService(); + } + + /* + * + * This method is called by WorkpoolManagerImpl, when it creates a new + * worker component in order to dispatch worker to the WorkpoolServiceImpl + * @param CallableReferenceImpl reference - a dynamically created reference + * from the Worker + */ + public void PostWorkerReference( + CallableReferenceImpl<WorkerService> reference) { + + try { + long initPostWorkerReference; + long endPostWorkerReference; + this.postWorkerReferenceLock.lock(); + + initPostWorkerReference = System.currentTimeMillis(); + WorkerService worker; + worker = reference.getService(); + worker.start(); + + ++workersNo; + if (myReference != null) { + + // Job poison = new ResultJob(); + this.postWorkerReferenceLock.unlock(); + log.info("Sending null job to worker"); + worker.computeFirstTime(nullJob, myReference); + // queue.put(poison); + endPostWorkerReference = System.currentTimeMillis(); + System.out.println("Time PostWorker =" + + (endPostWorkerReference - initPostWorkerReference)); + } else { + log.info("myReference is null"); + + } + } catch (Exception e) { + postWorkerReferenceLock.unlock(); + } finally { + } + + } + + /* + * FIXME This method currently is not used because i've not yet ready + * dynamic wire injection + */ + + public void PostWorkerName(String referenceName) { + /* TODO Do something similar to PostWorkerReference */ + } + + private void printComputingTime(Job j) { + + if (first == true) { + first = false; + start = System.currentTimeMillis(); + end = System.currentTimeMillis(); + } else { + end = System.currentTimeMillis(); + System.out.println("Elapsed Time = " + (end - start)); + elapsedTime.set(end - start); + } + /* + * i could use reflection or instance of (but it's a penalty kick) , or + * an object as result, but i'd prefer a job so i've defined a + * RESULT_JOB There're in the system three kind of jobs: RESULT_JOB, + * NULL_JOB, DEFAULT_JOB + */ + if ((j != null) && (j.getType() == Job.RESULT_JOB)) { + jobComputed++; + ResultJob result = (ResultJob) j; + JobDataMap map = result.getDataMap(); + if (map != null) { + Double doubleValue = (Double) map.getJobDataObject("result"); + System.out + .println("ResultValue = " + doubleValue.doubleValue()); + } + + } + + } + + public void handleResult(Job resultType, boolean reuse, String workerURI, + CallableReferenceImpl<WorkerService> worker, boolean newWorker) { + initHandleResult.set(System.nanoTime()); + if (reuse) { + log.info("Reusing a job.."); + queue.add(resultType); + return; + } + // init job variable + Job job; + if (newWorker) + System.out.println("newWorkerActivation= " + System.nanoTime()); + printComputingTime(resultType); + + try { + job = queue.take(); + } catch (Exception e) { + log.info("Exception during fetching the queue"); + e.printStackTrace(); + return; + } + + try { + // it needs to be locked because multiple threads could invoke this. + handleResultLock.lock(); + if (previuosURI.equals("")) { + time = System.currentTimeMillis(); + this.previuosURI = workerURI; + } else { + if (previuosURI.equals(workerURI)) + System.out.println("Complete ComputeTime for an item =" + + (time - System.currentTimeMillis())); + } + if (job.eos()) { + long endTime = System.currentTimeMillis(); + /* checking for EOS */ + if (processingStopped == false) { + processingStopped = true; + System.out.println("GOT EOS in time=" + (endTime - start)); + log.info("Stop autonomic cycle.."); + /* + * I'm doing this because i want that in the termination i + * would have more jobs with eos == true than workers. So + * i'm sure that every worker removes itself from its + * manager. I do it only one time. This is necessary because + * i have a variable number of workers. The number of + * workers in the system might change every time the rule + * engine cycle gets executed. + */ + ResultJob poison = new ResultJob(); + for (int i = 0; i < workersNo; ++i) { + try { + + queue.put(poison); + + } catch (Exception e) { + log.info("Cannot duplicate poison tokens"); + break; + } + + } + manager.getService().stopAutonomicCycle(); + } + } + computeAverageTime(); + System.out.println("AverageTime =" + averageServiceTime); + if (job != null) { + + WorkerService workerService; + /* + * the workpool has a high reuse, i always call the same + * component set or un superset or subset, so i cache it. When + * the WorkpoolManager will remove an item, it removes still + * this cache entry + */ + if (!cacheReference.containsKey(workerURI)) { + workerService = worker.getService(); + handleResultLock.unlock(); + cacheReference.put(workerURI, workerService); + } else { + handleResultLock.unlock(); + workerService = cacheReference.get(workerURI); + } + // it's still a penalty kick locking compute because it's going + // to be scheduled whereas it's async. + workerService.compute(job); + log.info("Sent job #" + jobSent.incrementAndGet() + + " Queue size " + queue.size()); + endHandleResult.set(System.nanoTime()); + System.out + .println("begin:handleResult ==> end:handleResult:compute = " + + (endHandleResult.addAndGet(-(initHandleResult + .get())) / 1000000)); + } + } catch (Exception e) { + handleResultLock.unlock(); + } + } + + public void evictAll() { + cacheReference.clear(); + } + + public void evict(String workerURI) { + if (cacheReference.containsKey(workerURI)) { + cacheReference.remove(workerURI); + } + + } + + public int estimatedQueueSize() { + return queue.size(); + } + + public long getElapsedTime() { + return elapsedTime.get(); + } + + public long getJobComputed() { + return jobComputed; + } + + public void registerManager( + CallableReferenceImpl<WorkpoolManager> createSelfReference) { + manager = createSelfReference; + + } + + public void stop() { + // TODO Auto-generated method stub + + } + + public void addTrigger(CallableReferenceImpl<Trigger> reference) { + this.trigger = reference; + this.forwardResult = reference.getService(); + + } + + public void removeTrigger() { + this.trigger = null; + this.forwardResult = null; + } +} diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeA/META-INF/sca-contribution.xml b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeA/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeA/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<contribution xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample"> + <deployable composite="sample:Workpool"/> +</contribution> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeA/Workpool.composite b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeA/Workpool.composite new file mode 100644 index 0000000000..9a00a8e839 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeA/Workpool.composite @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<composite xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample" + name="Workpool"> + + <component name="WorkpoolServiceComponent"> + <implementation.java class="workpool.WorkpoolServiceImpl"/> + </component> + <component name="WorkpoolManagerComponent"> + <implementation.java class="workpool.WorkpoolManagerImpl"/> + <property name="workers">4</property> + <property name="nodes">4</property> + <property name="injection">0</property> + <reference name="managerNodeB" target="WorkerManagerNodeBComponent" /> + + <reference name="managerNodeC" target="WorkerManagerNodeCComponent" /> + + <reference name="managerNodeD" target="WorkerManagerNodeDComponent" /> + + <reference name="managerNodeE" target="WorkerManagerNodeEComponent" /> + + + <service name="WorkpoolManagerInitService"> + <interface.java interface="org.apache.tuscany.sca.node.NodeManagerInitService"/> + <binding.sca/> + </service> + </component> + </composite> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeB/META-INF/sca-contribution.xml b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeB/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeB/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<contribution xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample"> + <deployable composite="sample:Workpool"/> +</contribution> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeB/Workpool.composite b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeB/Workpool.composite new file mode 100644 index 0000000000..a71dc9e08a --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeB/Workpool.composite @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<composite xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample" + name="Workpool"> + <component name="WorkerManagerNodeBComponent"> + <implementation.java class="workpool.WorkerManagerImpl"/> + <property name="nodeName">nodeB</property> + <property name="compositeName">Workpool.composite</property> + <property name="workerClass">workpool.MyWorker</property> + <service name="WorkerManagerInitService"> + <interface.java interface="org.apache.tuscany.sca.node.NodeManagerInitService"/> + <binding.sca/> + </service> + <service name="WorkerManager"> + <binding.sca uri="http://u13:13001/WorkerManagerNodeBComponent"/> + </service> + </component> + +</composite> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeC/META-INF/sca-contribution.xml b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeC/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeC/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<contribution xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample"> + <deployable composite="sample:Workpool"/> +</contribution> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeC/Workpool.composite b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeC/Workpool.composite new file mode 100644 index 0000000000..771db5370b --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeC/Workpool.composite @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<composite xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample" + name="Workpool"> + <component name="WorkerManagerNodeCComponent"> + <implementation.java class="workpool.WorkerManagerImpl"/> + <property name="nodeName">nodeC</property> + <property name="compositeName">Workpool.composite</property> + <property name="workerClass">workpool.MyWorker</property> + <service name="WorkerManagerInitService"> + <interface.java interface="org.apache.tuscany.sca.node.NodeManagerInitService"/> + <binding.sca/> + </service> + <service name="WorkerManager"> + <binding.sca uri="http://u14:13002/WorkerManagerNodeCComponent"/> + </service> + </component> + +</composite> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeD/META-INF/sca-contribution.xml b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeD/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeD/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<contribution xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample"> + <deployable composite="sample:Workpool"/> +</contribution> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeD/Workpool.composite b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeD/Workpool.composite new file mode 100644 index 0000000000..55fd48934f --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeD/Workpool.composite @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<composite xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample" + name="Workpool"> + <component name="WorkerManagerNodeDComponent"> + <implementation.java class="workpool.WorkerManagerImpl"/> + <property name="nodeName">nodeD</property> + <property name="compositeName">Workpool.composite</property> + <property name="workerClass">workpool.MyWorker</property> + <service name="WorkerManagerInitService"> + <interface.java interface="org.apache.tuscany.sca.node.NodeManagerInitService"/> + <binding.sca/> + </service> + <service name="WorkerManager"> + <binding.sca uri="http://u15:13003/WorkerManagerNodeDComponent"/> + </service> + </component> + +</composite> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeE/META-INF/sca-contribution.xml b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeE/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeE/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<contribution xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample"> + <deployable composite="sample:Workpool"/> +</contribution> diff --git a/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeE/Workpool.composite b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeE/Workpool.composite new file mode 100644 index 0000000000..1dc9ccfcea --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/main/resources/nodeE/Workpool.composite @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<composite xmlns="http://www.osoa.org/xmlns/sca/1.0" + targetNamespace="http://sample" + xmlns:sample="http://sample" + name="Workpool"> + <component name="WorkerManagerNodeEComponent"> + <implementation.java class="workpool.WorkerManagerImpl"/> + <property name="nodeName">nodeE</property> + <property name="compositeName">Workpool.composite</property> + <property name="workerClass">workpool.MyWorker</property> + <service name="WorkerManagerInitService"> + <interface.java interface="org.apache.tuscany.sca.node.NodeManagerInitService"/> + <binding.sca/> + </service> + <service name="WorkerManager"> + <binding.sca uri="http://u16:13004/WorkerManagerNodeEComponent"/> + </service> + </component> + +</composite> diff --git a/java/sca-contrib/demos/workpool-distributed/src/test/java/workpool/AComponent.java b/java/sca-contrib/demos/workpool-distributed/src/test/java/workpool/AComponent.java new file mode 100644 index 0000000000..5c20537fab --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/src/test/java/workpool/AComponent.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +public interface AComponent { + public static final int RETURN_VALUE = 1; + + public void greet(); +} diff --git a/java/sca-contrib/demos/workpool-distributed/workerRules.drl b/java/sca-contrib/demos/workpool-distributed/workerRules.drl new file mode 100644 index 0000000000..e7ebcad4c0 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/workerRules.drl @@ -0,0 +1,9 @@ +package workpool +import workpool.*; +rule "WorkerAdder" + when + $workerBean: WorkpoolBean(singleAction == false && (jobComputed > 500)) + then + $workerBean.addWorkerToNode("nodeB"); + $workerBean.setSingleAction(); +end diff --git a/java/sca-contrib/demos/workpool-distributed/workerRules1.drl b/java/sca-contrib/demos/workpool-distributed/workerRules1.drl new file mode 100644 index 0000000000..6fa0b85ba2 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/workerRules1.drl @@ -0,0 +1,9 @@ +package workpool +import workpool.*; +rule "WorkerAdder" + when + $workerBean: WorkpoolBean(averageServiceTime > 250) + then + $workerBean.addWorkerToNode("nodeB"); + $workerBean.setSingleAction(); +end diff --git a/java/sca-contrib/demos/workpool-distributed/workerRules2.drl b/java/sca-contrib/demos/workpool-distributed/workerRules2.drl new file mode 100644 index 0000000000..9d9551c3ea --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/workerRules2.drl @@ -0,0 +1,8 @@ +package workpool +import workpool.*; +rule "WorkerAverageService" + when + $workerBean: WorkpoolBean(jobComputed > 250) + then + $workerBean.addWorkerToNode("nodeB"); +end diff --git a/java/sca-contrib/demos/workpool-distributed/workerRules3.drl b/java/sca-contrib/demos/workpool-distributed/workerRules3.drl new file mode 100644 index 0000000000..a67af910a4 --- /dev/null +++ b/java/sca-contrib/demos/workpool-distributed/workerRules3.drl @@ -0,0 +1,14 @@ +package workpool +import workpool.*; +rule "AdaptUsageFactor" + when + $workerBean: WorkpoolBean(usageFactor > 0.8) + then + $workerBean.addWorkerToNode(""); +end +rule "AdaptQueueFull" + when + $workerBean: WorkpoolBean((estimedQueueSize > 1900) && jobsComputed > 100) + then + $workerBean.addWorkerToNode("nodeB") +end
\ No newline at end of file |