From cefeaaba712d4a50d7dee69508708773203379fb Mon Sep 17 00:00:00 2001 From: antelder Date: Tue, 21 Apr 2009 06:31:30 +0000 Subject: Move to 1.x-contrib git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@766998 13f79535-47bb-0310-9956-ffa450edef68 --- .../demos/workpool-distributed/LICENSE | 205 ++++++++ .../demos/workpool-distributed/NOTICE | 6 + .../demos/workpool-distributed/build.xml | 446 +++++++++++++++++ .../demos/workpool-distributed/pom.xml | 148 ++++++ .../src/main/java/node/DomainNode.java | 57 +++ .../src/main/java/node/DomainNodeDaemon.java | 79 +++ .../src/main/java/node/TestJob.java | 82 +++ .../src/main/java/node/WorkpoolDaemon.java | 271 ++++++++++ .../src/main/java/node/WorkpoolNode.java | 179 +++++++ .../src/main/java/node/workerRules1.drl | 13 + .../main/java/workpool/MetaComponentWorker.java | 85 ++++ .../src/main/java/workpool/MyWorker.java | 46 ++ .../src/main/java/workpool/NullJob.java | 43 ++ .../src/main/java/workpool/ResultJob.java | 54 ++ .../src/main/java/workpool/Trigger.java | 29 ++ .../src/main/java/workpool/WorkerManager.java | 31 ++ .../src/main/java/workpool/WorkerManagerImpl.java | 213 ++++++++ .../src/main/java/workpool/WorkerService.java | 56 +++ .../main/java/workpool/WorkerServiceCallback.java | 27 + .../src/main/java/workpool/WorkerServiceImpl.java | 171 +++++++ .../src/main/java/workpool/WorkpoolBean.java | 162 ++++++ .../main/java/workpool/WorkpoolBeanListener.java | 25 + .../src/main/java/workpool/WorkpoolEvent.java | 71 +++ .../src/main/java/workpool/WorkpoolManager.java | 48 ++ .../main/java/workpool/WorkpoolManagerImpl.java | 555 +++++++++++++++++++++ .../src/main/java/workpool/WorkpoolService.java | 91 ++++ .../main/java/workpool/WorkpoolServiceImpl.java | 416 +++++++++++++++ .../resources/nodeA/META-INF/sca-contribution.xml | 24 + .../src/main/resources/nodeA/Workpool.composite | 47 ++ .../resources/nodeB/META-INF/sca-contribution.xml | 24 + .../src/main/resources/nodeB/Workpool.composite | 38 ++ .../resources/nodeC/META-INF/sca-contribution.xml | 24 + .../src/main/resources/nodeC/Workpool.composite | 38 ++ .../resources/nodeD/META-INF/sca-contribution.xml | 24 + .../src/main/resources/nodeD/Workpool.composite | 38 ++ .../resources/nodeE/META-INF/sca-contribution.xml | 24 + .../src/main/resources/nodeE/Workpool.composite | 38 ++ .../src/test/java/workpool/AComponent.java | 25 + .../demos/workpool-distributed/workerRules.drl | 9 + .../demos/workpool-distributed/workerRules1.drl | 9 + .../demos/workpool-distributed/workerRules2.drl | 8 + .../demos/workpool-distributed/workerRules3.drl | 14 + 42 files changed, 3993 insertions(+) create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/LICENSE create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/NOTICE create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/build.xml create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/pom.xml create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeA/META-INF/sca-contribution.xml create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeA/Workpool.composite create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeB/META-INF/sca-contribution.xml create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeB/Workpool.composite create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeC/META-INF/sca-contribution.xml create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeC/Workpool.composite create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeD/META-INF/sca-contribution.xml create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeD/Workpool.composite create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeE/META-INF/sca-contribution.xml create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeE/Workpool.composite create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/src/test/java/workpool/AComponent.java create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/workerRules.drl create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/workerRules1.drl create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/workerRules2.drl create mode 100644 java/sca-1.x-contrib/demos/workpool-distributed/workerRules3.drl (limited to 'java/sca-1.x-contrib/demos') diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/LICENSE b/java/sca-1.x-contrib/demos/workpool-distributed/LICENSE new file mode 100644 index 0000000000..8aa906c321 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/LICENSE @@ -0,0 +1,205 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/NOTICE b/java/sca-1.x-contrib/demos/workpool-distributed/NOTICE new file mode 100644 index 0000000000..94481d6cfa --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/NOTICE @@ -0,0 +1,6 @@ +${pom.name} +Copyright (c) 2005 - 2007 The Apache Software Foundation + +This product includes software developed by +The Apache Software Foundation (http://www.apache.org/). + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/build.xml b/java/sca-1.x-contrib/demos/workpool-distributed/build.xml new file mode 100644 index 0000000000..9e10e8ec91 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/build.xml @@ -0,0 +1,446 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/pom.xml b/java/sca-1.x-contrib/demos/workpool-distributed/pom.xml new file mode 100644 index 0000000000..2f821d76a5 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/pom.xml @@ -0,0 +1,148 @@ + + + + 4.0.0 + + org.apache.tuscany.sca + tuscany-sca + 1.6-SNAPSHOT + ../../pom.xml + + demo-workpool-distributed + Apache Tuscany SCA Distributed Job Workpool Demo + + + + apache.incubator + + http://people.apache.org/repo/m2-incubating-repository + + + + org.drools + http://repository.jboss.com/maven2/org/drools/ + + + + + + org.apache.tuscany.sca + tuscany-host-embedded + 1.6-SNAPSHOT + + + + org.drools + drools + 4.0.4 + + + + antlr + antlr + 2.7.1 + + + commons-daemon + commons-daemon + 1.0.1 + + + + antlr + stringtemplate + 2.3b6 + + + + org.apache.tuscany.sca + tuscany-node-impl + 1.6-SNAPSHOT + + + + org.apache.tuscany.sca + tuscany-domain-impl + 1.6-SNAPSHOT + + + + org.apache.tuscany.sca + tuscany-binding-sca-axis2 + 1.6-SNAPSHOT + + + + org.apache.tuscany.sca + tuscany-binding-jsonrpc-runtime + 1.6-SNAPSHOT + + + + org.apache.tuscany.sca + tuscany-binding-http-runtime + 1.6-SNAPSHOT + runtime + + + + org.apache.tuscany.sca + tuscany-databinding-job + 1.6-SNAPSHOT + + + org.apache.tuscany.sca + tuscany-contribution-updater + 1.6-SNAPSHOT + + + + org.apache.tuscany.sca + tuscany-databinding-xstream + 1.6-SNAPSHOT + + + + org.apache.tuscany.sca + tuscany-implementation-resource-runtime + 1.6-SNAPSHOT + runtime + + + + org.apache.tuscany.sca + tuscany-host-tomcat + 1.6-SNAPSHOT + test + + + + junit + junit + 4.2 + test + + + + + + ${artifactId} + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java new file mode 100644 index 0000000000..a278499aae --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package node; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.domain.SCADomainFactory; + +/** + * This server program that loads a composite to provide simple registry + * function. This server can be replaced with any registry that is appropriate + * but the components in each node that talk to the registry should be replaced + * also. + */ +public class DomainNode { + + private static String DEFAULT_DOMAIN_URI = "http://u12:8877"; + private boolean stopped = true; + + public static void main(String[] args) { + + try { + + SCADomainFactory domainFactory = SCADomainFactory.newInstance(); + SCADomain domain = domainFactory + .createSCADomain(DEFAULT_DOMAIN_URI); + + System.out.println("Domain started (press enter to shutdown)"); + System.in.read(); + // waitForever(); + domain.destroy(); + } catch (Exception e) { + e.printStackTrace(); + } + + System.out.println("Domain stopped"); + } +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java new file mode 100644 index 0000000000..9d05761ad6 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package node; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.domain.SCADomainFactory; + +/** + * This server program that loads a composite to provide simple registry + * function. This server can be replaced with any registry that is appropriate + * but the components in each node that talk to the registry should be replaced + * also. + */ +public class DomainNodeDaemon implements Daemon { + + private SCADomain domain; + private static String DEFAULT_DOMAIN_URI = "http://u12:8877"; + private boolean stopped = true; + + private synchronized void waitForever() { + while (!stopped) { + try { + wait(); + } catch (InterruptedException ex) { + stopped = true; + return; + } + } + + } + + public void destroy() { + // TODO Auto-generated method stub + + } + + public void init(DaemonContext arg0) throws Exception { + // TODO Auto-generated method stub + + } + + public void start() throws Exception { + + SCADomainFactory domainFactory = SCADomainFactory.newInstance(); + domain = domainFactory.createSCADomain(DEFAULT_DOMAIN_URI); + + System.out.println("Domain started (press enter to shutdown)"); + waitForever(); + + } + + public void stop() throws Exception { + // TODO Auto-generated method stub + Thread.currentThread().interrupt(); + domain.destroy(); + } +} \ No newline at end of file diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java new file mode 100644 index 0000000000..f48e647bd6 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package node; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import org.apache.tuscany.sca.databinding.job.JobExecutionContext; +import org.apache.tuscany.sca.databinding.job.RemoteJob; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; + +public class TestJob extends RemoteJob implements java.io.Serializable { + private boolean EOS = false; + private Double value; + + public TestJob(Double x, long iterations, int[] items) { + JobDataMap map = new JobDataMap(); + map.addJobData("value", x); + map.addJobData("iterations", iterations); + map.addJobData("items", items); + context.setJobData(map); + } + + public TestJob(Double i, boolean eos) { + value = i; + this.EOS = eos; + } + + public TestJob(String jsonData) { + JobExecutionContext ctxt = new JobExecutionContext(); + ctxt.storeJSONData(jsonData); + } + + public int getType() { + return Job.REGULAR_JOB; + } + + public void setEOS() { + EOS = true; + } + + public boolean eos() { + return EOS; + } + + @Override + public Double compute(JobExecutionContext context) { + JobDataMap contextMap = context.getJobData(); + Long iterations = (Long) contextMap.getJobDataObject("iterations"); + Double value = (Double) contextMap.getJobDataObject("value"); + double x = value.doubleValue(); + System.out.println("Computing sinx for " + value + " for " + + iterations.intValue() + " times"); + long computing_start = System.currentTimeMillis(); + for (long i = 0; i < iterations.longValue(); ++i) { + x = Math.sin(x); + } + long computing_end = System.currentTimeMillis(); + System.out.println("Computing time= " + + (computing_end - computing_start)); + System.out.println("Send result = " + x); + return new Double(x); + } + +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java new file mode 100644 index 0000000000..1f2a4d1f9a --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package node; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + +import javax.xml.namespace.QName; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.commons.daemon.DaemonController; +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.contribution.Contribution; +import org.apache.tuscany.sca.contribution.DeployedArtifact; +import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.node.NodeException; +import org.apache.tuscany.sca.node.NodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.SCANodeFactory; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import org.apache.tuscany.sca.node.management.SCANodeManagerInitService; + +import java.net.URI; + +import workpool.WorkerManager; +import workpool.WorkerManagerImpl; +import workpool.WorkpoolManager; +import workpool.WorkpoolService; +import workpool.WorkpoolServiceImpl; + +/** + * This client program shows how to run a distributed SCA node. In this case a + * calculator node has been constructed specifically for running the calculator + * composite. Internally it creates a representation of a node and associates a + * distributed domain with the node. This separation is made different + * implementations of the distributed domain can be provided. + */ +public class WorkpoolDaemon implements Daemon, Runnable { + private String domainName; + private String nodeName; + private long iterations; + private long jobsNo; + private long workerNo; + private SCANode node; + private boolean stopped = false; + private DaemonController controller = null; + private Thread thread = null; + private String ruleFile = "workerRules.drl"; + + /* + * public static void main(String[] args) throws Exception { + * // Check that the correct arguments have been provided if (null == args || + * args.length < 4) { System.err.println("Usage: java WorkpoolNode + * domainname nodename iterTest workerNo"); System.exit(1); } + * + * try { String domainName = args[0]; String nodeName = args[1]; long + * iterations = Long.parseLong(args[2]); long jobsNo = + * Long.parseLong(args[3]); long workerNo = Long.parseLong(args[4]); + * ClassLoader cl = WorkpoolDaemon.class.getClassLoader(); + * + * SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); node = + * nodeFactory.createSCANode(null, domainName); + * node.addContribution(nodeName, cl.getResource(nodeName + "/")); + * node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); + * node.start(); // nodeA is the head node and runs some tests while all + * other nodes // simply listen for incoming messages + * + * FileReader rules = new FileReader("workerRules.drl"); StringBuffer buffer = + * new StringBuffer(); + * + * BufferedReader br = new BufferedReader(rules); String ruleString; do { + * ruleString = br.readLine(); if (ruleString!=null) { + * buffer.append(ruleString);} } while (ruleString!=null); + * + * if ( nodeName.equals("nodeA") ) { // do some application stuff + * WorkpoolService workpoolService = + * node.getDomain().getService(WorkpoolService.class, + * "WorkpoolServiceComponent"); workpoolService.start(); + * NodeManagerInitService nodeInit = + * node.getDomain().getService(NodeManagerInitService.class, + * "WorkpoolManagerComponent/NodeManagerInitService"); + * nodeInit.setNode(node); WorkpoolManager workpoolManager = + * node.getDomain().getService(WorkpoolManager.class, + * "WorkpoolManagerComponent/WorkpoolManager"); + * workpoolManager.setWorkpoolReference(node.getDomain().getServiceReference(WorkpoolService.class, + * "WorkpoolServiceComponent")); + * workpoolManager.acceptRules(buffer.toString()); workpoolManager.start(); + * int items[] = {3,4,5,6,3,6,3,5,9,5,6}; + * + * double x = 398349; for (int i = 0; i < jobsNo; ++i) + * workpoolService.submit(new TestJob(x,iterations,items)); + * + * TestJob j = new TestJob(-1.0,true); for (int i = 0; i < workerNo+1; ++i){ + * j.setEOS(); workpoolService.submit(j); } } try { if + * (nodeName.equals("nodeB")) { NodeManagerInitService serviceNodeB = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeBComponent/NodeManagerInitService"); + * serviceNodeB.setNode(node); } if (nodeName.equals("nodeC")) { + * NodeManagerInitService workerManagerC = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeCComponent/NodeManagerInitService"); + * workerManagerC.setNode(node); } if (nodeName.equals("nodeD")) { + * NodeManagerInitService workerManagerD = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeDComponent/NodeManagerInitService"); + * workerManagerD.setNode(node); } if (nodeName.equals("nodeE")) { + * NodeManagerInitService workerManagerE = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeEComponent/NodeManagerInitService"); + * workerManagerE.setNode(node); } + * + * System.out.println("Node started (press enter to shutdown)"); + * System.in.read(); } catch (IOException e) { e.printStackTrace(); } // + * stop the node and all the domains in it node.stop(); node.destroy(); + * System.exit(0); } catch(Exception ex) { System.err.println("Exception in + * node - " + ex.getMessage()); ex.printStackTrace(System.err); } } + */ + public void destroy() { + // TODO Auto-generated method stub + + } + + public void init(DaemonContext arg0) throws Exception { + String[] args = arg0.getArguments(); + domainName = args[0]; + nodeName = args[1]; + iterations = Long.parseLong(args[2]); + jobsNo = Long.parseLong(args[3]); + workerNo = Long.parseLong(args[4]); + if (args.length == 6) { + ruleFile = args[5]; + } + this.controller = arg0.getController(); + // this.thread=new Thread(this); + } + + public void start() throws Exception { + + ClassLoader cl = WorkpoolDaemon.class.getClassLoader(); + + SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); + node = nodeFactory.createSCANode(null, domainName); + node.addContribution(nodeName, cl.getResource(nodeName + "/")); + node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); + node.start(); + // nodeA is the head node and runs some tests while all other nodes + // simply listen for incoming messages + + FileReader rules = new FileReader(ruleFile); + StringBuffer buffer = new StringBuffer(); + + BufferedReader br = new BufferedReader(rules); + String ruleString; + do { + ruleString = br.readLine(); + if (ruleString != null) { + buffer.append(ruleString + "\n"); + } + } while (ruleString != null); + + if (nodeName.equals("nodeA")) { + // do some application stuff + WorkpoolService workpoolService = node.getDomain().getService( + WorkpoolService.class, "WorkpoolServiceComponent"); + workpoolService.start(); + SCANodeManagerInitService nodeInit = node.getDomain().getService( + SCANodeManagerInitService.class, + "WorkpoolManagerComponent/NodeManagerInitService"); + nodeInit.setNode(node); + WorkpoolManager workpoolManager = node.getDomain().getService( + WorkpoolManager.class, + "WorkpoolManagerComponent/WorkpoolManager"); + workpoolManager.setWorkpoolReference(node.getDomain() + .getServiceReference(WorkpoolService.class, + "WorkpoolServiceComponent")); + workpoolManager.acceptRules(buffer.toString()); + workpoolManager.start(); + + int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 }; + + double x = 398349; + for (int i = 0; i < jobsNo; ++i) { + workpoolService.submit(new TestJob(x, iterations, items)); + } + TestJob j = new TestJob(-1.0, true); + for (int i = 0; i < workerNo + 1; ++i) { + j.setEOS(); + workpoolService.submit(j); + } + + } + if (nodeName.equals("nodeB")) { + SCANodeManagerInitService workerManagerNodeB = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeBComponent/NodeManagerInitService"); + workerManagerNodeB.setNode(node); + } + + if (nodeName.equals("nodeC")) { + SCANodeManagerInitService workerManagerNodeC = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeCComponent/NodeManagerInitService"); + workerManagerNodeC.setNode(node); + } + + if (nodeName.equals("nodeD")) { + SCANodeManagerInitService workerManagerNodeD = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeDComponent/NodeManagerInitService"); + workerManagerNodeD.setNode(node); + } + + if (nodeName.equals("nodeE")) { + SCANodeManagerInitService workerManagerNodeE = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeEComponent/NodeManagerInitService"); + workerManagerNodeE.setNode(node); + } + + this.waitForever(); + // this.thread.start(); + } + + public void stop() throws Exception { + Thread.currentThread().interrupt(); + // thread.interrupt(); + node.stop(); + node.destroy(); + } + + private synchronized void waitForever() { + while (!stopped) { + try { + wait(); + } catch (InterruptedException ex) { + stopped = true; + return; + } + } + } + + public void run() { + waitForever(); + } +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java new file mode 100644 index 0000000000..86557548af --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package node; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + +import javax.xml.namespace.QName; + +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.contribution.Contribution; +import org.apache.tuscany.sca.contribution.DeployedArtifact; +import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.node.NodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.SCANodeFactory; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import java.net.URI; + +import workpool.WorkerManager; +import workpool.WorkerManagerImpl; +import workpool.WorkpoolManager; +import workpool.WorkpoolService; +import workpool.WorkpoolServiceImpl; + +/** + * This client program shows how to run a distributed SCA node. In this case a + * calculator node has been constructed specifically for running the calculator + * composite. Internally it creates a representation of a node and associates a + * distributed domain with the node. This separation is made different + * implementations of the distributed domain can be provided. + */ +public class WorkpoolNode { + + public static void main(String[] args) throws Exception { + + // Check that the correct arguments have been provided + if (null == args || args.length < 4) { + System.err + .println("Useage: java WorkpoolNode domainname nodename iterTest workerNo"); + System.exit(1); + } + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + String domainName = args[0]; + String nodeName = args[1]; + long iterations = Long.parseLong(args[2]); + long jobsNo = Long.parseLong(args[3]); + long workerNo = Long.parseLong(args[4]); + ClassLoader cl = WorkpoolNode.class.getClassLoader(); + + SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); + SCANode node = nodeFactory.createSCANode(null, domainName); + node.addContribution(nodeName, cl.getResource(nodeName + "/")); + node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); + node.start(); + // nodeA is the head node and runs some tests while all other nodes + // simply listen for incoming messages + + FileReader rules = new FileReader("workerRules.drl"); + StringBuffer buffer = new StringBuffer(); + + BufferedReader br = new BufferedReader(rules); + String ruleString; + do { + ruleString = br.readLine(); + if (ruleString != null) { + buffer.append(ruleString + "\n"); + } + } while (ruleString != null); + + if (nodeName.equals("nodeA")) { + // do some application stuff + WorkpoolService workpoolService = node.getDomain().getService( + WorkpoolService.class, "WorkpoolServiceComponent"); + workpoolService.start(); + NodeManagerInitService nodeInit = node.getDomain().getService( + NodeManagerInitService.class, + "WorkpoolManagerComponent/NodeManagerInitService"); + nodeInit.setNode(node); + WorkpoolManager workpoolManager = node.getDomain().getService( + WorkpoolManager.class, + "WorkpoolManagerComponent/WorkpoolManager"); + workpoolManager.setWorkpoolReference(node.getDomain() + .getServiceReference(WorkpoolService.class, + "WorkpoolServiceComponent")); + workpoolManager.setCycleTime(8000); + workpoolManager.acceptRules(buffer.toString()); + workpoolManager.start(); + int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 }; + + double x = 398349; + + for (int i = 0; i < jobsNo; ++i) + workpoolService.submit(new TestJob(x, iterations, items)); + + TestJob j = new TestJob(-1.0, true); + for (int i = 0; i < workerNo + 1; ++i) { + j.setEOS(); + workpoolService.submit(j); + } + + } + try { + if (nodeName.equals("nodeB")) { + NodeManagerInitService serviceNodeB = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeBComponent/NodeManagerInitService"); + serviceNodeB.setNode(node); + } + if (nodeName.equals("nodeC")) { + NodeManagerInitService workerManagerC = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeCComponent/NodeManagerInitService"); + workerManagerC.setNode(node); + } + if (nodeName.equals("nodeD")) { + NodeManagerInitService workerManagerD = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeDComponent/NodeManagerInitService"); + workerManagerD.setNode(node); + } + if (nodeName.equals("nodeE")) { + NodeManagerInitService workerManagerE = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeEComponent/NodeManagerInitService"); + workerManagerE.setNode(node); + } + + System.out.println("Node started (press enter to shutdown)"); + String buff; + for (;;) { + try { + buff = in.readLine(); + if (buff == null) + break; + System.out.print(in.readLine()); + } catch (IOException ex) { + break; // Exit thread. + } + } + // stop the node and all the domains in it + node.stop(); + node.destroy(); + System.exit(0); + } catch (Exception ex) { + System.err.println("Exception in node - " + ex.getMessage()); + ex.printStackTrace(System.err); + } + } +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl new file mode 100644 index 0000000000..9c5a5d1b7f --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl @@ -0,0 +1,13 @@ +package workpool +import workpool.*; +rule "WorkerAdder1" + when + $workerBean: WorkpoolBean(singleAction == false && (jobComputed > 500)) + then + $workerBean.setSingleAction() + $workerBean.addWorkerToNode("nodeB") + $workerBean.addWorkerToNode("nodeC") + $workerBean.addWorkerToNode("nodeD") + $workerBean.addWorkerToNode("nodeE") +end + \ No newline at end of file diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java new file mode 100644 index 0000000000..cdd0f30b34 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.io.StringReader; +import java.net.URI; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.logging.Logger; + +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamReader; + +import org.apache.tuscany.sca.assembly.MetaComponent; +import org.apache.tuscany.sca.assembly.impl.DefaultMetaComponent; + +public class MetaComponentWorker extends DefaultMetaComponent { + + private SecureRandom prng; + private String componentName; + private String scdl; + private String javaClass; + private boolean loadedFromString = false; + private Logger log = Logger.getLogger(MetaComponentWorker.class.getName()); + + public MetaComponentWorker() { + componentName = "WorkerComponent" + + java.util.UUID.randomUUID().toString(); + } + + public void setWorkerName(String componentName) { + this.componentName = componentName; + } + + public void setWorkerClass(String javaClass) { + this.javaClass = javaClass; + } + + private String generateSCDL() { + StringBuffer buffer = new StringBuffer(512); + buffer + .append("\n"); + buffer.append(""); + buffer.append(""); + buffer.append(this.componentName); + buffer.append("\n"); + return buffer.toString(); + } + + @Override + public XMLStreamReader build() throws Exception { + XMLInputFactory factory = XMLInputFactory.newInstance(); + if (!loadedFromString) + scdl = generateSCDL(); + return factory.createXMLStreamReader(new StringReader(scdl)); + + } + + public String getName() { + + return componentName; + } + +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java new file mode 100644 index 0000000000..c45696e3cf --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import org.apache.tuscany.sca.databinding.job.JobExecutionContext; +import org.apache.tuscany.sca.databinding.job.RemoteJob; +import org.osoa.sca.annotations.Scope; + +@Scope("COMPOSITE") +public class MyWorker extends WorkerServiceImpl { + private static int resultcount = 0; + + @Override + public ResultJob computeTask(Job job) { + + RemoteJob remoteJob = (RemoteJob) job; + System.out.println("Computing the job"); + JobExecutionContext context = remoteJob.getContext(); + ResultJob resultJob = new ResultJob(); + JobDataMap resultMap = new JobDataMap(); + resultMap.addJobData("result", remoteJob.compute(context)); + resultJob.setJobDataMap(resultMap); + System.out.println("Count result = " + (++resultcount)); + return resultJob; + } + +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java new file mode 100644 index 0000000000..fb930adf2e --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; + +public class NullJob implements Job, java.io.Serializable { + + public Object compute(Object arg0) { + // TODO Auto-generated method stub + return null; + } + + public JobDataMap getDataMap() { + return null; + } + + public boolean eos() { + return false; + } + + public int getType() { + return Job.NULL_JOB; + } + +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java new file mode 100644 index 0000000000..e04411668b --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import org.apache.tuscany.sca.databinding.job.JobExecutionContext; +import org.apache.tuscany.sca.databinding.job.RemoteJob; + +public class ResultJob extends RemoteJob implements + java.io.Serializable { + private JobDataMap map; + + public JobDataMap getDataMap() { + return map; + } + + public void setJobDataMap(JobDataMap map) { + this.map = map; + } + + public boolean eos() { + // TODO Auto-generated method stub + return true; + } + + public int getType() { + // TODO Auto-generated method stub + return Job.RESULT_JOB; + } + + @Override + public Object compute(JobExecutionContext v) { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java new file mode 100644 index 0000000000..469675b19b --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; +import org.osoa.sca.annotations.Remotable; + +@Remotable +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +public interface Trigger { + void handleEvent(T c); +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java new file mode 100644 index 0000000000..520203e190 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; +import org.osoa.sca.annotations.Remotable; +import org.osoa.sca.CallableReference; +@Remotable +public interface WorkerManager { + CallableReference addWorker(); + boolean removeWorker(String workerName); + boolean removeWorkers(int k); + boolean removeAllWorkers(); + double getNodeLoad(); + int activeWorkers(); + void start(); +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java new file mode 100644 index 0000000000..d4337cad2f --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.contribution.Contribution; +import org.apache.tuscany.sca.contribution.DeployedArtifact; +import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; +import org.osoa.sca.CallableReference; +import org.apache.tuscany.sca.node.management.SCANodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.osoa.sca.ComponentContext; +import org.osoa.sca.annotations.Context; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Scope; +import org.osoa.sca.annotations.Service; +import java.util.LinkedList; +import java.util.ArrayList; + +@Scope("COMPOSITE") +@Service(interfaces = { SCANodeManagerInitService.class, WorkerManager.class }) +public class WorkerManagerImpl implements WorkerManager, SCANodeManagerInitService { + private Logger log = Logger.getLogger(WorkerManagerImpl.class.getName()); + private LinkedList> activeWorkers = new LinkedList>(); + private List workerComponentNames = new ArrayList(); + private SCANodeImpl node; + @Property + protected String nodeName; + @Property + protected String compositeName; + @Property + protected String workerClass; + @Context + protected ComponentContext context; + private double loadAverage; + + /* This method is used to find a composite inside all deployed artifacts */ + private Composite findComposite(List artifacts) { + for (Composite fact : artifacts) { + log.info("Searching in a contribution deployed artifacts -" + + compositeName); + Composite augmented = (Composite) fact; + // found + if (augmented.getURI().equals(compositeName)) { + log.info("Found composite..." + compositeName); + return augmented; + } + } + } + return null; + } + + public CallableReference addWorker() { + log.info("Adding a new worker call.."); + long addWorkerStartTime = System.nanoTime(); + ContributionServiceImpl cServiceImpl = (ContributionServiceImpl) node.getContributionService(); + Contribution contribution = cServiceImpl.getContribution(nodeName); + List artifacts = contribution.getDeployables(); + CallableReference workerReference = null; + CallableReference ref = null; + log.info("Creating a MetaComponentWorker.."); + MetaComponentWorker mcw = new MetaComponentWorker(); + boolean found = false; + mcw.setWorkerClass(workerClass); + Composite augmented = findComposite(artifacts); + try { + if (augmented != null) { + long startCreation = System.nanoTime(); + node.addComponentToComposite(mcw, contribution.getURI(), + augmented.getURI()); + System.out.println("addComponentToComposite time = " + + (System.nanoTime() - startCreation)); + RuntimeComponent workerComponent = (RuntimeComponent) node + .getComponent(mcw.getName()); + if (workerComponent != null) { + ref = (CallableReference) workerComponent + .getComponentContext().createSelfReference( + WorkerService.class); + ref.getService().start(); + activeWorkers.addLast(ref); + workerComponentNames.add(mcw.getName()); + CallableReference manager = (CallableReference) context + .createSelfReference(WorkerManager.class, + "WorkerManager"); + ref.getService().registerManager(manager); + return ref; + } + } else { + log.info("Workpool composite not found!"); + } + } catch (Exception e) { + log.info("Exception activation"); + e.printStackTrace(); + } + ; + System.out.println("Component Creation Time =" + + (System.nanoTime() - addWorkerStartTime)); + return ref; + } + + public boolean removeAllWorkers() { + for (CallableReference callable : activeWorkers) { + callable.getService().stop(); + } + return true; + } + + public boolean removeWorker() { + CallableReference callable = activeWorkers + .removeLast(); + callable.getService().stop(); + return true; + } + + public boolean removeWorkers(int k) { + if (k >= activeWorkers.size()) + return false; + for (int i = 0; i < k; ++i) { + if (!removeWorker()) + return false; + } + return true; + } + + public void setNode(SCANode node) { + this.node = (SCANodeImpl) node; + + } + + public double getNodeLoad() { + /* + * FIXME [jo] this works only on Linux To be replaced with an JNI + * extension + */ + RandomAccessFile statfile; + + this.loadAverage = 1.0; + // load = 0; + int NoProcessors = 0; + String cpuLine = null; + try { + NoProcessors = Runtime.getRuntime().availableProcessors(); + if (NoProcessors > 1) + this.loadAverage = 1 / (1.0 * NoProcessors); + statfile = new RandomAccessFile("/proc/loadavg", "r"); + try { + statfile.seek(0); + cpuLine = statfile.readLine(); + + } catch (IOException e) { + // FIX ME: Better exception handling. + e.printStackTrace(); + } + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (NumberFormatException e) { + e.printStackTrace(); + } + double min1; + if (cpuLine != null) { + java.util.StringTokenizer st = new java.util.StringTokenizer( + cpuLine, " "); + min1 = Double.parseDouble(st.nextToken()); + } else + min1 = 0; + + return min1 * this.loadAverage; + } + + public int activeWorkers() { + return activeWorkers.size(); + } + + public boolean removeWorker(String workerName) { + RuntimeComponent workerComponent = (RuntimeComponent) node + .getComponent(workerName); + if (workerComponent != null) { + log.info("Removing component " + workerName); + node.removeComponentFromComposite(nodeName, "Workpool.composite", + workerName); + return true; + } + return false; + } + + public void start() { + // do nothing for now. + } +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java new file mode 100644 index 0000000000..37b7ea227a --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.osoa.sca.ServiceReference; +import org.osoa.sca.annotations.Callback; +import org.osoa.sca.annotations.Remotable; +import org.osoa.sca.annotations.OneWay; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; + +/** + * The interface for the multiply service + */ +@Remotable +@Callback(WorkerServiceCallback.class) +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +public interface WorkerService { + @OneWay + void compute(Job j); + + void start(); + + void stop(); + + // void addJobCompleteHandler(String triggerName, + // CallableReferenceImpl handle); + // void removeJobCompleteHandler(String triggerName); + /* The worker manager */ + void registerManager(CallableReferenceImpl wm); + + void registerSender(CallableReferenceImpl sender); + + // void init(Job nullJob); + @OneWay + void computeFirstTime(Job nullJob, + CallableReferenceImpl myReference); + +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java new file mode 100644 index 0000000000..6fb1278e5e --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.osoa.sca.annotations.Remotable; + +@Remotable +public interface WorkerServiceCallback { + void receiveResult(Job resultType, boolean reuse, String workerName); +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java new file mode 100644 index 0000000000..2c9bf5ea48 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.osoa.sca.ComponentContext; +import org.osoa.sca.RequestContext; +import org.osoa.sca.ServiceReference; +import org.osoa.sca.annotations.Callback; +import org.osoa.sca.annotations.Context; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Scope; +import org.osoa.sca.annotations.Service; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.*; + +/** + * An implementation of the worker service. + */ +@Service(WorkerService.class) +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +@Scope("COMPOSITE") +public abstract class WorkerServiceImpl implements WorkerService { + private Logger log = Logger.getLogger(this.getClass().getName()); + private WorkerServiceCallback workerServiceCallback; + @Context + protected ComponentContext workerContext; + @Context + protected RequestContext requestContext; + @Property + protected String workerName; + private CallableReferenceImpl managerReference = null; + + /* TODO add the triggers, but before ask */ + // protected Map triggers = new HashMap(); + public abstract ResultJob computeTask(Job job); + + private boolean stopped = false; + private CallableReferenceImpl serviceRef; + private CallableReferenceImpl senderService; + private WorkpoolService wp = null; + private WorkerManager manager = null; + + public void start() { + log.info("Starting worker..."); + stopped = false; + serviceRef = (CallableReferenceImpl) workerContext + .createSelfReference(WorkerService.class); + + } + + public void init(CallableReferenceImpl sender, Job nullJob) { + compute(nullJob); + } + + public void stop() { + stopped = true; + } + + @Callback + public void setWorkerServiceCallback( + WorkerServiceCallback workerServiceCallback) { + log.info("Setting worker callback"); + this.workerServiceCallback = workerServiceCallback; + } + + public void computeFirstTime(Job nullJob, + CallableReferenceImpl sender) { + senderService = sender; + wp = sender.getService(); + workWithCallable(nullJob); + } + + public void registerManager(CallableReferenceImpl wm) { + managerReference = wm; + manager = managerReference.getService(); + + } + + public void registerSender(CallableReferenceImpl sender) { + log.info("Registering sender.."); + senderService = sender; + wp = sender.getService(); + } + + private void workWithInjection(Job j) { + log.info("Worker has received job"); + if (stopped) { + workerServiceCallback + .receiveResult(j, true, workerContext.getURI()); + if (managerReference != null) + manager.removeWorker(workerContext.getURI()); + } else if (j.eos()) { + if (managerReference != null) + manager.removeWorker(workerContext.getURI()); + } + if (j instanceof NullJob) { + workerServiceCallback.receiveResult(j, false, workerContext + .getURI()); + } else { + workerServiceCallback.receiveResult(computeTask(j), false, + workerContext.getURI()); + } + } + + private void workWithCallable(Job j) { + log.info("Worker " + workerContext.getURI() + + " has received job with eos --> " + j.eos()); + if (stopped) { + wp.handleResult(j, true, workerContext.getURI(), serviceRef, false); + return; + } + if (j.eos()) { + log.info("Got poison token..."); + if (managerReference != null) { + log.info("Removing component " + workerContext.getURI()); + manager.removeWorker(workerContext.getURI()); + + } + return; + } + if (j.getType() != Job.NULL_JOB) { + wp.handleResult(computeTask(j), false, workerContext.getURI(), + serviceRef, false); + } else { + log.info("Got a null job"); + wp.handleResult(j, false, workerContext.getURI(), serviceRef, true); + } + } + + public void compute(Job j) { + + if (senderService != null) { + log.info("Computing job using callable reference method"); + workWithCallable(j); + + } else { + log.info("Computing job using reference injection method"); + workWithInjection(j); + + } + } + /* + * public void addJobCompleteHandler(String triggerName, + * CallableReferenceImpl handle) { if + * (!triggers.containsKey(triggerName)) { triggers.put(triggerName, + * handle.getService()); } } public void removeJobCompleteHandler(String + * triggerName) { if (!triggers.containsKey(triggerName)) { + * triggers.remove(triggerName); } } + */ +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java new file mode 100644 index 0000000000..80c093ff1c --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.beans.*; +import java.util.Vector; +import java.util.logging.*; + +public class WorkpoolBean { + private Vector listeners = new Vector(); + double loadAverage = 0; + int nodeNumbers = 0; + int workers = 0; + int estimedQueueSize = 0; + double averageServiceTime = 0; + double averageArrivalTime = 0; + double usageFactor = 0; + private final PropertyChangeSupport changes = new PropertyChangeSupport( + this); + long jobComputed = 0; + boolean singleAction = false; + private Logger log = Logger.getLogger(WorkpoolBean.class.getName()); + + public void setNodeNumbers(int n) { + this.nodeNumbers = n; + } + + public void setWorkers(int w) { + this.workers = w; + } + + public void setLoadAverage(double loadAverage) { + this.loadAverage = loadAverage; + } + + public void setAverageServiceTime(double service) { + this.averageServiceTime = service; + } + + public void setAverageArrivalTime(double service) { + this.averageArrivalTime = service; + } + + public double getAverageArrivalTime() { + return this.averageArrivalTime; + } + + public double getUtilizationFactor() { + return usageFactor; + } + + public void setUsageFactor() { + usageFactor = averageServiceTime / averageArrivalTime; + } + + public void setEstimedQueueSize(int size) { + estimedQueueSize = size; + } + + public int getEstimedQueueSize() { + return estimedQueueSize; + } + + public double getLoadAverage() { + return this.loadAverage; + } + + public int getWorkers() { + return this.workers; + } + + public int getNodeNumbers() { + return this.nodeNumbers; + } + + public double getAverageServiceTime() { + return this.averageServiceTime; + } + + public void addPropertyChangeListener(final PropertyChangeListener l) { + this.changes.addPropertyChangeListener(l); + } + + public void removePropertyChangeListener(final PropertyChangeListener l) { + this.changes.removePropertyChangeListener(l); + } + + private synchronized void fireWorkpoolEvent(WorkpoolEvent ev) { + for (WorkpoolBeanListener l : listeners) { + l.handleEvent(new WorkpoolEvent(ev)); + } + } + + public void addWorkersToNode(int k, String nodeName) { + log.info("Adding a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER, k, nodeName); + fireWorkpoolEvent(ev); + } + + public void addWorkerToNode(String nodeName) { + log.info("Adding a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.SINGLE_ADD_WORKER, 1, nodeName); + fireWorkpoolEvent(ev); + } + + public void removeWorkersToNode(int k, String nodeName) { + log.info("Removing a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER, k, nodeName); + fireWorkpoolEvent(ev); + } + + public void removeWorkerToNode(String nodeName) { + log.info("Removing a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.SINGLE_REMOVE_WORKER, 1, nodeName); + fireWorkpoolEvent(ev); + } + + public synchronized void addListener(WorkpoolBeanListener l) { + this.listeners.add(l); + } + + public synchronized void removeListener(WorkpoolBeanListener l) { + this.listeners.remove(l); + } + + public void setJobComputed(long jobComputed) { + this.jobComputed = jobComputed; + + } + + public void setSingleAction() { + singleAction = true; + } + + public boolean getSingleAction() { + return singleAction; + } + + public long getJobComputed() { + return this.jobComputed; + } +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java new file mode 100644 index 0000000000..0ecc223fed --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.util.EventListener; + +public interface WorkpoolBeanListener extends EventListener { + public void handleEvent(WorkpoolEvent ev); +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java new file mode 100644 index 0000000000..0bdc3671d5 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.util.EventObject; + +public class WorkpoolEvent extends EventObject { + + private static final long serialVersionUID = -1273928009411948768L; + + public WorkpoolEvent(Object source) { + super(source); + } + + public WorkpoolEvent(WorkpoolEvent ev) { + super(ev.source); + type = ev.type; + noWorker = ev.noWorker; + nodeName = ev.nodeName; + } + + public WorkpoolEvent(Object source, int typeEv, int worker) { + super(source); + type = typeEv; + noWorker = worker; + nodeName = ""; + } + + public WorkpoolEvent(Object source, int typeEv, int worker, String nodeName) { + super(source); + type = typeEv; + noWorker = worker; + this.nodeName = nodeName; + } + + public String getNodeName() { + return nodeName; + } + + public int getType() { + return type; + } + + public int workers() { + return noWorker; + } + + private int type; + private int noWorker; + private String nodeName; + public static final int EVENT_MULTIPLE_ADD_WORKER = 0; + public static final int EVENT_MULTIPLE_REMOVE_WORKER = 1; + public static final int SINGLE_REMOVE_WORKER = 2; + public static final int SINGLE_ADD_WORKER = 3; +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java new file mode 100644 index 0000000000..6520954bdd --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.osoa.sca.ServiceReference; +import org.osoa.sca.annotations.OneWay; +import org.osoa.sca.annotations.Remotable; + +@Remotable +public interface WorkpoolManager { + /* + * @param String rules This are the autonomic rules. The format is the Java + * Drools .drl file. You have to read it + */ + @OneWay + void acceptRules(String rules); + + @OneWay + void start(); + + @OneWay + void stopAutonomicCycle(); + + @OneWay + void startAutonomicCycle(); + + int activeWorkers(); + + void setCycleTime(long time); + + void setWorkpoolReference(ServiceReference serviceReference); +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java new file mode 100644 index 0000000000..f7c727ad04 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.osoa.sca.ComponentContext; +import org.osoa.sca.ServiceReference; +import java.util.Collections; +import java.util.Enumeration; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Iterator; +import java.util.Timer; +import java.util.TimerTask; +import java.util.logging.Logger; + +import javax.xml.stream.XMLStreamException; + +import node.TestJob; +import java.io.File; +import java.util.Vector; +import org.apache.axiom.om.OMElement; +import org.apache.tuscany.sca.contribution.service.ContributionResolveException; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.core.context.ServiceReferenceImpl; +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.node.NodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import org.osoa.sca.CallableReference; +import org.drools.FactHandle; +import org.drools.RuleBase; +import org.drools.RuleBaseFactory; +import org.drools.StatefulSession; +import org.drools.StatelessSession; +import org.drools.compiler.DroolsParserException; +import org.drools.compiler.PackageBuilder; +import org.drools.rule.Package; +import org.osoa.sca.annotations.Constructor; +import org.osoa.sca.annotations.Context; +import org.osoa.sca.annotations.Destroy; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Reference; +import org.osoa.sca.annotations.Scope; +import org.osoa.sca.annotations.Service; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +@Service(interfaces = { NodeManagerInitService.class, WorkpoolManager.class }) +@Scope("COMPOSITE") +/* + * This is the core manager of the workpool application. The Workpool Manager + * holds the reference to each remote node manager. Inside it we've a rule + * engine instance. + */ +public class WorkpoolManagerImpl implements WorkpoolManager, + NodeManagerInitService, WorkpoolBeanListener { + /* + * This inner class trigs the rule engine, at given times: 1. It checks the + * different loads for each nodes and sets the WorkpoolBean 2. It checks the + * Workpool AverageService Time and sets the WorkpoolBean 3. It checks how + * many jobs are already computed and sets the WorkpoolBean Then given the + * configured bean and the rules, run the Rule Engine for executing the + * business logic + */ + class RuleEngineTrigger extends TimerTask { + // private ReentrantLock triggerLock = new ReentrantLock(); + @Override + public void run() { + + System.out.println("Updating WorkpoolBean.."); + // checkActiveWorkers(); + // checkLoadInNodes(); + checkServiceTime(); + // checkEstimedQueueSize(); + // checkArrivalTime(); + getProcessedItem(); + // computeUsageFactor(); + doRun(bean); + } + + } + + private WorkerManager managerNodeB; + private WorkerManager managerNodeC; + private WorkerManager managerNodeD; + private WorkerManager managerNodeE; + + private SCANodeImpl node; + private WorkpoolBean bean = new WorkpoolBean(); + private ReentrantLock handleEventLock = new ReentrantLock(); + private ReentrantLock updateRuleLock = new ReentrantLock(); + + private ServiceReference reference; + private AtomicInteger activeWorkers = new AtomicInteger(0); + private Logger log = Logger.getLogger(WorkpoolManagerImpl.class.getName()); + @Property + protected String workers; + @Property + protected String nodes; + @Property + protected String injection; + @Context + protected ComponentContext workpoolManagerContext; + private CallableReferenceImpl myReference; + private String rules = null; + private boolean referenceInjection = false; + private ConcurrentHashMap workerManagerTable = new ConcurrentHashMap(); + private int workersNo; + private int nodesNo; + private Timer timer = new Timer(); + /* this handle facts */ + private RuleBase ruleBase = null; + private FactHandle handle = null; + private StatefulSession wm = null; + private long cycleTime = 5000; + + @Reference + public void setManagerNodeB(WorkerManager managerNodeB) { + this.managerNodeB = managerNodeB; + workerManagerTable.put("nodeB", managerNodeB); + } + + @Reference + public void setManagerNodeC(WorkerManager managerNodeC) { + this.managerNodeC = managerNodeC; + workerManagerTable.put("nodeC", managerNodeC); + } + + @Reference + public void setManagerNodeD(WorkerManager managerNodeD) { + this.managerNodeD = managerNodeD; + workerManagerTable.put("nodeD", managerNodeD); + } + + @Reference + public void setManagerNodeE(WorkerManager managerNodeE) { + this.managerNodeE = managerNodeE; + workerManagerTable.put("nodeE", managerNodeE); + } + + private void startNewComponents( + Vector> vector) { + log.info("Starting new components"); + WorkpoolService wp = reference.getService(); + // CallableReferenceImpl sink = + // (CallableReferenceImpl) reference; + Job j = new NullJob(); + for (CallableReferenceImpl item : vector) { + // WorkerService service = item.getService(); + // service.start(); + // service.computeFirstTime(j, sink); + log.info("Send PostWorkerReference..."); + wp.PostWorkerReference(item); + } + if (myReference != null) + wp.registerManager(myReference); + } + + public void setCycleTime(long cycle) { + this.cycleTime = cycle; + } + + @SuppressWarnings("unchecked") + /* + * This gets the number of workers workerNo and instantiates them + */ + public void start() { + this.myReference = (CallableReferenceImpl) workpoolManagerContext + .createSelfReference(WorkpoolManager.class, "WorkpoolManager"); + this.workersNo = Integer.parseInt(this.workers); + this.nodesNo = Integer.parseInt(this.nodes); + this.referenceInjection = (Integer.parseInt(this.injection) != 0); + log.info("Starting WorkpoolManager Component with #" + workersNo + + " workers and #" + nodes + " nodes"); + nodesNo = workerManagerTable.values().size(); + // Sets info in the bean. + bean.setWorkers(this.workersNo); + bean.setNodeNumbers(nodesNo); + Vector> workerRefs = new Vector>(); + int exactTimes = workersNo / nodesNo; + for (int i = 0; i < exactTimes; ++i) { + for (WorkerManager manager : workerManagerTable.values()) { + manager.start(); + if (manager != null) { + System.err.println("Actual load = " + + manager.getNodeLoad() + " for node "); + addNewComponent(manager, workerRefs); + } + } + } + + int module = (workersNo % nodesNo); + int n = 0; + if (module > 0) { + Vector v = new Vector(workerManagerTable.keySet()); + Collections.sort(v); + // Iterator iter = + // workerManagerTable.values().iterator(); + // Display (sorted) hashtable. + for (Enumeration e = v.elements(); (e.hasMoreElements() && n < module); ++n) { + String key = e.nextElement(); + WorkerManager m = workerManagerTable.get(key); + System.err.println("Module Actual load = " + m.getNodeLoad() + + " for node "); + addNewComponent(m, workerRefs); + } + } + startNewComponents(workerRefs); + bean.addListener(this); + TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger(); + timer.scheduleAtFixedRate(task, 3000, cycleTime); + } + + private void checkLoadInNodes() { + System.out.println("CheckLoadInNodes"); + int number = 1; + double loadAverage = 0; + for (WorkerManager manager : workerManagerTable.values()) { + loadAverage += manager.getNodeLoad(); + number++; + } + bean.setLoadAverage(loadAverage / number); + } + + private void computeUsageFactor() { + bean.setUsageFactor(); + } + + private void checkEstimedQueueSize() { + WorkpoolService wp = reference.getService(); + + if (wp != null) { + int size = wp.estimatedQueueSize(); + log.info("Estimed Queue Size =" + size); + bean.setEstimedQueueSize(size); + } + } + + private WorkerManager findMinLoad() { + double load = 0; + // workerManagerTable.values().iterator().next().getNodeLoad(); + WorkerManager toFind = null; + for (WorkerManager manager : workerManagerTable.values()) { + if (load == 0) { + load = manager.getNodeLoad(); + toFind = manager; + } else if (manager.getNodeLoad() < load) { + load = manager.getNodeLoad(); + toFind = manager; + } + } + return toFind; + } + + private void checkServiceTime() { + WorkpoolService wp = reference.getService(); + + if (wp != null) { + double time = wp.getServiceTime(); + log.info("Average System Service Time =" + time); + bean.setAverageServiceTime(time); + } + } + + private void checkArrivalTime() { + WorkpoolService wp = reference.getService(); + + if (wp != null) { + double time = wp.getArrivalTime(); + log.info("Average Arrival Service Time =" + time); + bean.setAverageArrivalTime(time); + } + } + + private void checkActiveWorkers() { + bean.setWorkers(this.activeWorkers()); + } + + private void getProcessedItem() { + WorkpoolService wp = reference.getService(); + if (wp != null) { + long computed = wp.getJobComputed(); + log.info("The system has already computed " + computed + " jobs"); + bean.setJobComputed(computed); + } + } + + private boolean removeComponent(WorkerManager manager, int k) { + manager.removeWorkers(k); + activeWorkers.decrementAndGet(); + return true; + } + + @SuppressWarnings("unchecked") + private boolean addNewComponent(WorkerManager manager, + Vector> workerRefs) { + CallableReferenceImpl workerReference = (CallableReferenceImpl) manager + .addWorker(); + + if (workerReference != null) { + /* if i'll decide to use dynamically generated references */ + if (referenceInjection) { + workerReference.getService(); + String uri = workerReference.getEndpointReference().getURI(); + int nameIndex = uri.indexOf("/"); + String componentName = uri.substring(0, nameIndex); + if (componentName.startsWith("/")) + componentName = uri.substring(1, uri.length()); + if (componentName.endsWith("/")) + componentName = uri.substring(0, uri.length() - 1); + // String componentName = uri.substring(0, nameIndex-1); + + log.info("Adding wire from WorkpoolComponentService to " + + componentName); + String referenceName = "ref" + componentName; + + /* + * I'm updating the WorkpoolServiceComponent with a new + * reference to a just created component I assume that the + * WorkpoolManagerService and the WorkpoolServiceComponent stay + * in the same JVM It's like in the scdl there were: With this then + * I've a wire WorkpoolService---> a new Worker + */ + try { + node.addComponentReferenceWire(referenceName, "nodeA", + "Workpool.composite", "workpool.WorkerServiceImpl", + WorkerService.class, "WorkpoolServiceComponent", + componentName); + } catch (Exception e) { + e.printStackTrace(); + return false; + } + log.info("Sending reference name " + referenceName + + " to WorkpoolService"); + // TODO: this was part of dynamic wiring, but it doesn't work. + // reference.getService().PostWorkerName(referenceName); + + } else { + // log.info("Sending callable reference to WorkpoolService + // placed at -->"+reference); + // reference.getService().PostWorkerReference(workerReference); + workerRefs.add(workerReference); + } + activeWorkers.incrementAndGet(); + return true; + } + return false; + } + + public int activeWorkers() { + + return activeWorkers.get(); + } + + private void doRun(WorkpoolBean bean) { + + long startTime = System.currentTimeMillis(); + updateRuleLock.lock(); + if (wm == null) + wm = ruleBase.newStatefulSession(); + if (this.handle == null) + handle = wm.insert(bean); + else { + wm.update(handle, bean); + } + wm.fireAllRules(); + updateRuleLock.unlock(); + + System.out.println("Engine rule overhead = " + + (System.currentTimeMillis() - startTime)); + } + + private RuleBase readRule(String rule) { + + PackageBuilder packBuilder = new PackageBuilder(); + try { + packBuilder.addPackageFromDrl(new StringReader(rule)); + } catch (DroolsParserException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + Package pkg = packBuilder.getPackage(); + RuleBase ruleBase = RuleBaseFactory.newRuleBase(); + try { + ruleBase.addPackage(pkg); + } catch (Exception e) { + e.printStackTrace(); + } + return ruleBase; + } + + public void acceptRules(String rules) { + this.rules = rules; + if (ruleBase == null) { + RuleBase base = readRule(rules); + if (base != null) { + ruleBase = base; + } + } else { + updateRuleLock.lock(); + // i have already a rule: updating + ruleBase = readRule(rules); + wm = ruleBase.newStatefulSession(); + handle = null; + updateRuleLock.unlock(); + } + + System.out.println("Accepted rules = " + rules); + } + + public String getRules() { + return rules; + } + + private WorkerManager findMaxLoadNode() { + double load = 0.0; + WorkerManager toFind = null; + for (WorkerManager manager : workerManagerTable.values()) { + if (manager.getNodeLoad() > load) { + load = manager.getNodeLoad(); + toFind = manager; + } + } + return toFind; + + } + + public void setWorkpoolReference( + ServiceReference serviceReference) { + reference = serviceReference; + } + + public void setNode(SCANode arg0) { + node = (SCANodeImpl) arg0; + } + + public void handleEvent(WorkpoolEvent ev) { + if (ev == null) + return; + + String nodeName = ev.getNodeName(); + + switch (ev.getType()) { + case WorkpoolEvent.SINGLE_ADD_WORKER: { + if (nodeName != null) { + Vector> workerRefs = new Vector>(); + + // in this case I have a nodeName + if (!nodeName.equals("") + && (workerManagerTable.containsKey(nodeName))) { + WorkerManager manager = workerManagerTable.get(nodeName); + addNewComponent(manager, workerRefs); + startNewComponents(workerRefs); + } else if (nodeName.equals("")) { + WorkerManager manager = findMinLoad(); + addNewComponent(manager, workerRefs); + startNewComponents(workerRefs); + } + } + break; + } + case WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER: { + Vector> workerRefs = new Vector>(); + + if (nodeName.equals("")) { + + WorkerManager manager = findMinLoad(); + int k = ev.workers(); + for (int h = 0; h < k; ++h) { + addNewComponent(manager, workerRefs); + } + } else { + WorkerManager manager = workerManagerTable + .get(ev.getNodeName()); + int k = ev.workers(); + for (int h = 0; h < k; ++h) { + addNewComponent(manager, workerRefs); + } + } + startNewComponents(workerRefs); + break; + } + case WorkpoolEvent.SINGLE_REMOVE_WORKER: { + if (nodeName != null) { + // in this case I have a nodeName + if (!nodeName.equals("") + && (workerManagerTable.containsKey(nodeName))) { + WorkerManager manager = workerManagerTable.get(nodeName); + removeComponent(manager, 1); + } else if (nodeName.equals("")) { + WorkerManager manager = findMaxLoadNode(); + removeComponent(manager, 1); + } + } + break; + } + case WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER: { + if (nodeName.equals("")) { + WorkerManager manager = findMaxLoadNode(); + removeComponent(manager, ev.workers()); + + } else { + WorkerManager manager = workerManagerTable.get(nodeName); + removeComponent(manager, ev.workers()); + } + break; + } + } + + } + + @Destroy + public void onExit() { + // do cleanup + this.timer.cancel(); + this.timer.purge(); + } + + public void stopAutonomicCycle() { + this.timer.cancel(); + this.timer.purge(); + this.timer = null; + } + + public void startAutonomicCycle() { + if (this.timer == null) { + this.timer = new Timer(); + TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger(); + timer.schedule(task, 3000, cycleTime); + } + } +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java new file mode 100644 index 0000000000..d84ae549d8 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; +import org.osoa.sca.annotations.OneWay; +import org.osoa.sca.annotations.Remotable; +import org.osoa.sca.ServiceReference; + +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +@Remotable +public interface WorkpoolService { + + /* this the functional part */ + void submit(Job i); + + /* the time between two subsequent worker invocations */ + double getServiceTime(); + + /* the number of ResultJob received */ + long getJobComputed(); + + /* the time elapsed between the stream has initiated and now */ + long getElapsedTime(); + + /* the size of the internal queue : it's not accurate */ + int estimatedQueueSize(); + + /* the average time between two consuecutive submit */ + double getArrivalTime(); + + void start(); + + void stop(); + + /* + * this is the part needed by management. May be in future i'll refactor it + * order to hide this part. + */ + @OneWay + void handleResult(Job j, boolean reuse, String string, + CallableReferenceImpl worker, boolean newJob); + + void addTrigger(CallableReferenceImpl reference); + + void removeTrigger(); + + void registerManager( + CallableReferenceImpl createSelfReference); + + /* + * This could placed in another interface definition - think about it These + * methods evict, and evictAll are needed when a worker finish to exist and + * it needs to be evicted by the WorkpoolManager. In the system I have two + * caches: 1) a domain cache, which holds the components URI 2) a + * workerReference cache (implemented by a ConcurrentHashMap), which holds a + * proxy to each worker. Every proxy gets built from the worker callable + * reference. I'm thinking for placing the workerReferenceCache in a local + * interface. Assuming that WorkpoolService and WorkpoolManager are in the + * same JVM. + */ + void evict(String workerURI); + + void evictAll(); + + /* + * these two are no longer needed. I leave it because if i'll have time to + * do dynamic wiring the first one is needed. void PostWorkerName(String + * referenceName); + */ + void PostWorkerReference(CallableReferenceImpl worker); + +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java new file mode 100644 index 0000000000..52b88af42c --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.osoa.sca.ComponentContext; +import org.osoa.sca.annotations.Context; +import org.osoa.sca.annotations.Scope; +import org.osoa.sca.annotations.Service; +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * An implementation of the Workpool service. + */ +@Service(WorkpoolService.class) +@Scope("COMPOSITE") +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +public class WorkpoolServiceImpl implements WorkpoolService, + WorkerServiceCallback { + + /* incoming job queue */ + private LinkedBlockingQueue queue = new LinkedBlockingQueue(5000); + private CallableReferenceImpl trigger = null; + private Trigger forwardResult = null; + /* counter for job's number fetched from the queue and sent to the Worker */ + private AtomicInteger jobSent = new AtomicInteger(0); + /* time for initHandleResult */ + private AtomicLong initHandleResult = new AtomicLong(0); + /* time for endHandleResult */ + private AtomicLong endHandleResult = new AtomicLong(0); + /* + * number of job computed, this will be exposed in order to be used to + * firing rules + */ + private long jobComputed = 0; + /* same as above */ + private AtomicLong elapsedTime = new AtomicLong(0); + /* this is for comuputing averageServiceTime */ + private long times = 1; + /* this is for computing averageArrivalTime */ + private long timesArrival = 1; + private ReentrantLock arrivalLock = new ReentrantLock(); + private long arrivalPrevious = -1; + // private AtomicBoolean processingStopped = new AtomicBoolean(false); + private boolean processingStopped = false; + // private LinkedBlockingQueue triggers = new + // LinkedBlockingQueue(); + @Context + protected ComponentContext workpoolContext; + private CallableReferenceImpl manager; + private long previousSubmitTime = -1; + private boolean firstTime = true; + private boolean first = true; + private long start = 0; + private long end = 0; + private double averageServiceTime = 0; + private double averageArrivalTime = 0; + private int workersNo = 0; + private final Job nullJob = new NullJob(); + /* This is useful for counting the start and end */ + private Logger log = Logger.getLogger(WorkpoolServiceImpl.class.getName()); + private ReentrantLock handleResultLock = new ReentrantLock(); + private ReentrantLock postWorkerReferenceLock = new ReentrantLock(); + private ConcurrentHashMap cacheReference = new ConcurrentHashMap(); + private CallableReferenceImpl myReference; + private String previuosURI = ""; + private long time = 0; + + private void computeAverageTime() { + long actualServiceTime = 0; + // if the processing is finished + if (processingStopped) + return; + + if (firstTime == true) { + this.previousSubmitTime = System.currentTimeMillis(); + this.averageServiceTime = 0; + firstTime = false; + } else { + actualServiceTime = System.currentTimeMillis() + - this.previousSubmitTime; + this.previousSubmitTime = System.currentTimeMillis(); + averageServiceTime = ((averageServiceTime * times) + actualServiceTime) + / (times + 1); + ++times; + } + } + + public void submit(Job j) { + try { + // log.info("Submit job in queue -->"+ j.getType()); + // processingStopped.set(false); + try { + arrivalLock.lock(); + if (this.arrivalPrevious == -1) { + arrivalPrevious = System.currentTimeMillis(); + averageArrivalTime = 0; + } + double actualArrivalTime = System.currentTimeMillis() + - arrivalPrevious; + averageArrivalTime = ((averageArrivalTime * timesArrival) + actualArrivalTime) + / (timesArrival + 1); + arrivalPrevious = System.currentTimeMillis(); + ++timesArrival; + } finally { + arrivalLock.unlock(); + } + queue.put(j); + } catch (Exception e) { + log.info("Exception in queue"); + queue.clear(); + e.printStackTrace(); + } + } + + public double getArrivalTime() { + return this.averageArrivalTime; + } + + public double getServiceTime() { + return this.averageServiceTime; + } + + public void receiveResult(Job resultType, boolean reuse, String workerURI) { + + if (reuse) { + queue.add(resultType); + return; + } + + computeAverageTime(); + Job job = null; + try { + job = queue.take(); + } catch (InterruptedException e) { + // TODO Better exception handling --> see Exception antipattern doc + e.printStackTrace(); + return; + } + + if ((job != null) && (job.eos() == false)) { + int nameIndex = workerURI.indexOf("/"); + String workerName = workerURI.substring(0, nameIndex - 1); + log.info("Sending job to worker --> " + workerName); + WorkerService worker = workpoolContext.getService( + WorkerService.class, workerName); + worker.compute(job); + } + + JobDataMap map = ((ResultJob) resultType).getDataMap(); + if (map != null) { + ++jobComputed; + Object obj = map.getJobDataObject("result"); + System.out.println("Result = " + ((Double) obj).doubleValue()); + } + + } + + public void start() { + log.info("WorkpoolServiceComponent started..."); + myReference = (CallableReferenceImpl) workpoolContext + .createSelfReference(WorkpoolService.class, "WorkpoolService"); + myReference.getService(); + } + + /* + * + * This method is called by WorkpoolManagerImpl, when it creates a new + * worker component in order to dispatch worker to the WorkpoolServiceImpl + * @param CallableReferenceImpl reference - a dynamically created reference + * from the Worker + */ + public void PostWorkerReference( + CallableReferenceImpl reference) { + + try { + long initPostWorkerReference; + long endPostWorkerReference; + this.postWorkerReferenceLock.lock(); + + initPostWorkerReference = System.currentTimeMillis(); + WorkerService worker; + worker = reference.getService(); + worker.start(); + + ++workersNo; + if (myReference != null) { + + // Job poison = new ResultJob(); + this.postWorkerReferenceLock.unlock(); + log.info("Sending null job to worker"); + worker.computeFirstTime(nullJob, myReference); + // queue.put(poison); + endPostWorkerReference = System.currentTimeMillis(); + System.out.println("Time PostWorker =" + + (endPostWorkerReference - initPostWorkerReference)); + } else { + log.info("myReference is null"); + + } + } catch (Exception e) { + postWorkerReferenceLock.unlock(); + } finally { + } + + } + + /* + * FIXME This method currently is not used because i've not yet ready + * dynamic wire injection + */ + + public void PostWorkerName(String referenceName) { + /* TODO Do something similar to PostWorkerReference */ + } + + private void printComputingTime(Job j) { + + if (first == true) { + first = false; + start = System.currentTimeMillis(); + end = System.currentTimeMillis(); + } else { + end = System.currentTimeMillis(); + System.out.println("Elapsed Time = " + (end - start)); + elapsedTime.set(end - start); + } + /* + * i could use reflection or instance of (but it's a penalty kick) , or + * an object as result, but i'd prefer a job so i've defined a + * RESULT_JOB There're in the system three kind of jobs: RESULT_JOB, + * NULL_JOB, DEFAULT_JOB + */ + if ((j != null) && (j.getType() == Job.RESULT_JOB)) { + jobComputed++; + ResultJob result = (ResultJob) j; + JobDataMap map = result.getDataMap(); + if (map != null) { + Double doubleValue = (Double) map.getJobDataObject("result"); + System.out + .println("ResultValue = " + doubleValue.doubleValue()); + } + + } + + } + + public void handleResult(Job resultType, boolean reuse, String workerURI, + CallableReferenceImpl worker, boolean newWorker) { + initHandleResult.set(System.nanoTime()); + if (reuse) { + log.info("Reusing a job.."); + queue.add(resultType); + return; + } + // init job variable + Job job; + if (newWorker) + System.out.println("newWorkerActivation= " + System.nanoTime()); + printComputingTime(resultType); + + try { + job = queue.take(); + } catch (Exception e) { + log.info("Exception during fetching the queue"); + e.printStackTrace(); + return; + } + + try { + // it needs to be locked because multiple threads could invoke this. + handleResultLock.lock(); + if (previuosURI.equals("")) { + time = System.currentTimeMillis(); + this.previuosURI = workerURI; + } else { + if (previuosURI.equals(workerURI)) + System.out.println("Complete ComputeTime for an item =" + + (time - System.currentTimeMillis())); + } + if (job.eos()) { + long endTime = System.currentTimeMillis(); + /* checking for EOS */ + if (processingStopped == false) { + processingStopped = true; + System.out.println("GOT EOS in time=" + (endTime - start)); + log.info("Stop autonomic cycle.."); + /* + * I'm doing this because i want that in the termination i + * would have more jobs with eos == true than workers. So + * i'm sure that every worker removes itself from its + * manager. I do it only one time. This is necessary because + * i have a variable number of workers. The number of + * workers in the system might change every time the rule + * engine cycle gets executed. + */ + ResultJob poison = new ResultJob(); + for (int i = 0; i < workersNo; ++i) { + try { + + queue.put(poison); + + } catch (Exception e) { + log.info("Cannot duplicate poison tokens"); + break; + } + + } + manager.getService().stopAutonomicCycle(); + } + } + computeAverageTime(); + System.out.println("AverageTime =" + averageServiceTime); + if (job != null) { + + WorkerService workerService; + /* + * the workpool has a high reuse, i always call the same + * component set or un superset or subset, so i cache it. When + * the WorkpoolManager will remove an item, it removes still + * this cache entry + */ + if (!cacheReference.containsKey(workerURI)) { + workerService = worker.getService(); + handleResultLock.unlock(); + cacheReference.put(workerURI, workerService); + } else { + handleResultLock.unlock(); + workerService = cacheReference.get(workerURI); + } + // it's still a penalty kick locking compute because it's going + // to be scheduled whereas it's async. + workerService.compute(job); + log.info("Sent job #" + jobSent.incrementAndGet() + + " Queue size " + queue.size()); + endHandleResult.set(System.nanoTime()); + System.out + .println("begin:handleResult ==> end:handleResult:compute = " + + (endHandleResult.addAndGet(-(initHandleResult + .get())) / 1000000)); + } + } catch (Exception e) { + handleResultLock.unlock(); + } + } + + public void evictAll() { + cacheReference.clear(); + } + + public void evict(String workerURI) { + if (cacheReference.containsKey(workerURI)) { + cacheReference.remove(workerURI); + } + + } + + public int estimatedQueueSize() { + return queue.size(); + } + + public long getElapsedTime() { + return elapsedTime.get(); + } + + public long getJobComputed() { + return jobComputed; + } + + public void registerManager( + CallableReferenceImpl createSelfReference) { + manager = createSelfReference; + + } + + public void stop() { + // TODO Auto-generated method stub + + } + + public void addTrigger(CallableReferenceImpl reference) { + this.trigger = reference; + this.forwardResult = reference.getService(); + + } + + public void removeTrigger() { + this.trigger = null; + this.forwardResult = null; + } +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeA/META-INF/sca-contribution.xml b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeA/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeA/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeA/Workpool.composite b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeA/Workpool.composite new file mode 100644 index 0000000000..9a00a8e839 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeA/Workpool.composite @@ -0,0 +1,47 @@ + + + + + + + + + + 4 + 4 + 0 + + + + + + + + + + + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeB/META-INF/sca-contribution.xml b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeB/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeB/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeB/Workpool.composite b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeB/Workpool.composite new file mode 100644 index 0000000000..a71dc9e08a --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeB/Workpool.composite @@ -0,0 +1,38 @@ + + + + + + nodeB + Workpool.composite + workpool.MyWorker + + + + + + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeC/META-INF/sca-contribution.xml b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeC/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeC/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeC/Workpool.composite b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeC/Workpool.composite new file mode 100644 index 0000000000..771db5370b --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeC/Workpool.composite @@ -0,0 +1,38 @@ + + + + + + nodeC + Workpool.composite + workpool.MyWorker + + + + + + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeD/META-INF/sca-contribution.xml b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeD/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeD/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeD/Workpool.composite b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeD/Workpool.composite new file mode 100644 index 0000000000..55fd48934f --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeD/Workpool.composite @@ -0,0 +1,38 @@ + + + + + + nodeD + Workpool.composite + workpool.MyWorker + + + + + + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeE/META-INF/sca-contribution.xml b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeE/META-INF/sca-contribution.xml new file mode 100644 index 0000000000..b3e2d16c05 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeE/META-INF/sca-contribution.xml @@ -0,0 +1,24 @@ + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeE/Workpool.composite b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeE/Workpool.composite new file mode 100644 index 0000000000..1dc9ccfcea --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/resources/nodeE/Workpool.composite @@ -0,0 +1,38 @@ + + + + + + nodeE + Workpool.composite + workpool.MyWorker + + + + + + + + + + diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/test/java/workpool/AComponent.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/test/java/workpool/AComponent.java new file mode 100644 index 0000000000..5c20537fab --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/src/test/java/workpool/AComponent.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +public interface AComponent { + public static final int RETURN_VALUE = 1; + + public void greet(); +} diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/workerRules.drl b/java/sca-1.x-contrib/demos/workpool-distributed/workerRules.drl new file mode 100644 index 0000000000..e7ebcad4c0 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/workerRules.drl @@ -0,0 +1,9 @@ +package workpool +import workpool.*; +rule "WorkerAdder" + when + $workerBean: WorkpoolBean(singleAction == false && (jobComputed > 500)) + then + $workerBean.addWorkerToNode("nodeB"); + $workerBean.setSingleAction(); +end diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/workerRules1.drl b/java/sca-1.x-contrib/demos/workpool-distributed/workerRules1.drl new file mode 100644 index 0000000000..6fa0b85ba2 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/workerRules1.drl @@ -0,0 +1,9 @@ +package workpool +import workpool.*; +rule "WorkerAdder" + when + $workerBean: WorkpoolBean(averageServiceTime > 250) + then + $workerBean.addWorkerToNode("nodeB"); + $workerBean.setSingleAction(); +end diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/workerRules2.drl b/java/sca-1.x-contrib/demos/workpool-distributed/workerRules2.drl new file mode 100644 index 0000000000..9d9551c3ea --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/workerRules2.drl @@ -0,0 +1,8 @@ +package workpool +import workpool.*; +rule "WorkerAverageService" + when + $workerBean: WorkpoolBean(jobComputed > 250) + then + $workerBean.addWorkerToNode("nodeB"); +end diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/workerRules3.drl b/java/sca-1.x-contrib/demos/workpool-distributed/workerRules3.drl new file mode 100644 index 0000000000..a67af910a4 --- /dev/null +++ b/java/sca-1.x-contrib/demos/workpool-distributed/workerRules3.drl @@ -0,0 +1,14 @@ +package workpool +import workpool.*; +rule "AdaptUsageFactor" + when + $workerBean: WorkpoolBean(usageFactor > 0.8) + then + $workerBean.addWorkerToNode(""); +end +rule "AdaptQueueFull" + when + $workerBean: WorkpoolBean((estimedQueueSize > 1900) && jobsComputed > 100) + then + $workerBean.addWorkerToNode("nodeB") +end \ No newline at end of file -- cgit v1.2.3