summaryrefslogtreecommitdiffstats
path: root/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node
diff options
context:
space:
mode:
Diffstat (limited to 'sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node')
-rw-r--r--sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/DomainNode.java57
-rw-r--r--sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java79
-rw-r--r--sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/TestJob.java82
-rw-r--r--sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java271
-rw-r--r--sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java179
-rw-r--r--sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/workerRules1.drl13
6 files changed, 681 insertions, 0 deletions
diff --git a/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/DomainNode.java b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/DomainNode.java
new file mode 100644
index 0000000000..a278499aae
--- /dev/null
+++ b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/DomainNode.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package node;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.tuscany.sca.domain.SCADomain;
+import org.apache.tuscany.sca.domain.SCADomainFactory;
+
+/**
+ * This server program that loads a composite to provide simple registry
+ * function. This server can be replaced with any registry that is appropriate
+ * but the components in each node that talk to the registry should be replaced
+ * also.
+ */
+public class DomainNode {
+
+ private static String DEFAULT_DOMAIN_URI = "http://u12:8877";
+ private boolean stopped = true;
+
+ public static void main(String[] args) {
+
+ try {
+
+ SCADomainFactory domainFactory = SCADomainFactory.newInstance();
+ SCADomain domain = domainFactory
+ .createSCADomain(DEFAULT_DOMAIN_URI);
+
+ System.out.println("Domain started (press enter to shutdown)");
+ System.in.read();
+ // waitForever();
+ domain.destroy();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ System.out.println("Domain stopped");
+ }
+}
diff --git a/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java
new file mode 100644
index 0000000000..9d05761ad6
--- /dev/null
+++ b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package node;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.daemon.Daemon;
+import org.apache.commons.daemon.DaemonContext;
+import org.apache.tuscany.sca.domain.SCADomain;
+import org.apache.tuscany.sca.domain.SCADomainFactory;
+
+/**
+ * This server program that loads a composite to provide simple registry
+ * function. This server can be replaced with any registry that is appropriate
+ * but the components in each node that talk to the registry should be replaced
+ * also.
+ */
+public class DomainNodeDaemon implements Daemon {
+
+ private SCADomain domain;
+ private static String DEFAULT_DOMAIN_URI = "http://u12:8877";
+ private boolean stopped = true;
+
+ private synchronized void waitForever() {
+ while (!stopped) {
+ try {
+ wait();
+ } catch (InterruptedException ex) {
+ stopped = true;
+ return;
+ }
+ }
+
+ }
+
+ public void destroy() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void init(DaemonContext arg0) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void start() throws Exception {
+
+ SCADomainFactory domainFactory = SCADomainFactory.newInstance();
+ domain = domainFactory.createSCADomain(DEFAULT_DOMAIN_URI);
+
+ System.out.println("Domain started (press enter to shutdown)");
+ waitForever();
+
+ }
+
+ public void stop() throws Exception {
+ // TODO Auto-generated method stub
+ Thread.currentThread().interrupt();
+ domain.destroy();
+ }
+} \ No newline at end of file
diff --git a/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/TestJob.java b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/TestJob.java
new file mode 100644
index 0000000000..f48e647bd6
--- /dev/null
+++ b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/TestJob.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package node;
+
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.apache.tuscany.sca.databinding.job.JobDataMap;
+import org.apache.tuscany.sca.databinding.job.JobExecutionContext;
+import org.apache.tuscany.sca.databinding.job.RemoteJob;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
+
+public class TestJob extends RemoteJob<Double> implements java.io.Serializable {
+ private boolean EOS = false;
+ private Double value;
+
+ public TestJob(Double x, long iterations, int[] items) {
+ JobDataMap map = new JobDataMap();
+ map.addJobData("value", x);
+ map.addJobData("iterations", iterations);
+ map.addJobData("items", items);
+ context.setJobData(map);
+ }
+
+ public TestJob(Double i, boolean eos) {
+ value = i;
+ this.EOS = eos;
+ }
+
+ public TestJob(String jsonData) {
+ JobExecutionContext ctxt = new JobExecutionContext();
+ ctxt.storeJSONData(jsonData);
+ }
+
+ public int getType() {
+ return Job.REGULAR_JOB;
+ }
+
+ public void setEOS() {
+ EOS = true;
+ }
+
+ public boolean eos() {
+ return EOS;
+ }
+
+ @Override
+ public Double compute(JobExecutionContext context) {
+ JobDataMap contextMap = context.getJobData();
+ Long iterations = (Long) contextMap.getJobDataObject("iterations");
+ Double value = (Double) contextMap.getJobDataObject("value");
+ double x = value.doubleValue();
+ System.out.println("Computing sinx for " + value + " for "
+ + iterations.intValue() + " times");
+ long computing_start = System.currentTimeMillis();
+ for (long i = 0; i < iterations.longValue(); ++i) {
+ x = Math.sin(x);
+ }
+ long computing_end = System.currentTimeMillis();
+ System.out.println("Computing time= "
+ + (computing_end - computing_start));
+ System.out.println("Send result = " + x);
+ return new Double(x);
+ }
+
+}
diff --git a/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java
new file mode 100644
index 0000000000..1f2a4d1f9a
--- /dev/null
+++ b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package node;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.daemon.Daemon;
+import org.apache.commons.daemon.DaemonContext;
+import org.apache.commons.daemon.DaemonController;
+import org.apache.tuscany.sca.assembly.Composite;
+import org.apache.tuscany.sca.assembly.Service;
+import org.apache.tuscany.sca.contribution.Contribution;
+import org.apache.tuscany.sca.contribution.DeployedArtifact;
+import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl;
+import org.apache.tuscany.sca.domain.SCADomain;
+import org.apache.tuscany.sca.node.NodeException;
+import org.apache.tuscany.sca.node.NodeManagerInitService;
+import org.apache.tuscany.sca.node.SCANode;
+import org.apache.tuscany.sca.node.SCANodeFactory;
+import org.apache.tuscany.sca.node.impl.SCANodeImpl;
+import org.apache.tuscany.sca.node.management.SCANodeManagerInitService;
+
+import java.net.URI;
+
+import workpool.WorkerManager;
+import workpool.WorkerManagerImpl;
+import workpool.WorkpoolManager;
+import workpool.WorkpoolService;
+import workpool.WorkpoolServiceImpl;
+
+/**
+ * This client program shows how to run a distributed SCA node. In this case a
+ * calculator node has been constructed specifically for running the calculator
+ * composite. Internally it creates a representation of a node and associates a
+ * distributed domain with the node. This separation is made different
+ * implementations of the distributed domain can be provided.
+ */
+public class WorkpoolDaemon implements Daemon, Runnable {
+ private String domainName;
+ private String nodeName;
+ private long iterations;
+ private long jobsNo;
+ private long workerNo;
+ private SCANode node;
+ private boolean stopped = false;
+ private DaemonController controller = null;
+ private Thread thread = null;
+ private String ruleFile = "workerRules.drl";
+
+ /*
+ * public static void main(String[] args) throws Exception {
+ * // Check that the correct arguments have been provided if (null == args ||
+ * args.length < 4) { System.err.println("Usage: java WorkpoolNode
+ * domainname nodename iterTest workerNo"); System.exit(1); }
+ *
+ * try { String domainName = args[0]; String nodeName = args[1]; long
+ * iterations = Long.parseLong(args[2]); long jobsNo =
+ * Long.parseLong(args[3]); long workerNo = Long.parseLong(args[4]);
+ * ClassLoader cl = WorkpoolDaemon.class.getClassLoader();
+ *
+ * SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); node =
+ * nodeFactory.createSCANode(null, domainName);
+ * node.addContribution(nodeName, cl.getResource(nodeName + "/"));
+ * node.addToDomainLevelComposite(new QName("http://sample", "Workpool"));
+ * node.start(); // nodeA is the head node and runs some tests while all
+ * other nodes // simply listen for incoming messages
+ *
+ * FileReader rules = new FileReader("workerRules.drl"); StringBuffer buffer =
+ * new StringBuffer();
+ *
+ * BufferedReader br = new BufferedReader(rules); String ruleString; do {
+ * ruleString = br.readLine(); if (ruleString!=null) {
+ * buffer.append(ruleString);} } while (ruleString!=null);
+ *
+ * if ( nodeName.equals("nodeA") ) { // do some application stuff
+ * WorkpoolService workpoolService =
+ * node.getDomain().getService(WorkpoolService.class,
+ * "WorkpoolServiceComponent"); workpoolService.start();
+ * NodeManagerInitService nodeInit =
+ * node.getDomain().getService(NodeManagerInitService.class,
+ * "WorkpoolManagerComponent/NodeManagerInitService");
+ * nodeInit.setNode(node); WorkpoolManager workpoolManager =
+ * node.getDomain().getService(WorkpoolManager.class,
+ * "WorkpoolManagerComponent/WorkpoolManager");
+ * workpoolManager.setWorkpoolReference(node.getDomain().getServiceReference(WorkpoolService.class,
+ * "WorkpoolServiceComponent"));
+ * workpoolManager.acceptRules(buffer.toString()); workpoolManager.start();
+ * int items[] = {3,4,5,6,3,6,3,5,9,5,6};
+ *
+ * double x = 398349; for (int i = 0; i < jobsNo; ++i)
+ * workpoolService.submit(new TestJob(x,iterations,items));
+ *
+ * TestJob j = new TestJob(-1.0,true); for (int i = 0; i < workerNo+1; ++i){
+ * j.setEOS(); workpoolService.submit(j); } } try { if
+ * (nodeName.equals("nodeB")) { NodeManagerInitService serviceNodeB =
+ * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeBComponent/NodeManagerInitService");
+ * serviceNodeB.setNode(node); } if (nodeName.equals("nodeC")) {
+ * NodeManagerInitService workerManagerC =
+ * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeCComponent/NodeManagerInitService");
+ * workerManagerC.setNode(node); } if (nodeName.equals("nodeD")) {
+ * NodeManagerInitService workerManagerD =
+ * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeDComponent/NodeManagerInitService");
+ * workerManagerD.setNode(node); } if (nodeName.equals("nodeE")) {
+ * NodeManagerInitService workerManagerE =
+ * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeEComponent/NodeManagerInitService");
+ * workerManagerE.setNode(node); }
+ *
+ * System.out.println("Node started (press enter to shutdown)");
+ * System.in.read(); } catch (IOException e) { e.printStackTrace(); } //
+ * stop the node and all the domains in it node.stop(); node.destroy();
+ * System.exit(0); } catch(Exception ex) { System.err.println("Exception in
+ * node - " + ex.getMessage()); ex.printStackTrace(System.err); } }
+ */
+ public void destroy() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void init(DaemonContext arg0) throws Exception {
+ String[] args = arg0.getArguments();
+ domainName = args[0];
+ nodeName = args[1];
+ iterations = Long.parseLong(args[2]);
+ jobsNo = Long.parseLong(args[3]);
+ workerNo = Long.parseLong(args[4]);
+ if (args.length == 6) {
+ ruleFile = args[5];
+ }
+ this.controller = arg0.getController();
+ // this.thread=new Thread(this);
+ }
+
+ public void start() throws Exception {
+
+ ClassLoader cl = WorkpoolDaemon.class.getClassLoader();
+
+ SCANodeFactory nodeFactory = SCANodeFactory.newInstance();
+ node = nodeFactory.createSCANode(null, domainName);
+ node.addContribution(nodeName, cl.getResource(nodeName + "/"));
+ node.addToDomainLevelComposite(new QName("http://sample", "Workpool"));
+ node.start();
+ // nodeA is the head node and runs some tests while all other nodes
+ // simply listen for incoming messages
+
+ FileReader rules = new FileReader(ruleFile);
+ StringBuffer buffer = new StringBuffer();
+
+ BufferedReader br = new BufferedReader(rules);
+ String ruleString;
+ do {
+ ruleString = br.readLine();
+ if (ruleString != null) {
+ buffer.append(ruleString + "\n");
+ }
+ } while (ruleString != null);
+
+ if (nodeName.equals("nodeA")) {
+ // do some application stuff
+ WorkpoolService workpoolService = node.getDomain().getService(
+ WorkpoolService.class, "WorkpoolServiceComponent");
+ workpoolService.start();
+ SCANodeManagerInitService nodeInit = node.getDomain().getService(
+ SCANodeManagerInitService.class,
+ "WorkpoolManagerComponent/NodeManagerInitService");
+ nodeInit.setNode(node);
+ WorkpoolManager workpoolManager = node.getDomain().getService(
+ WorkpoolManager.class,
+ "WorkpoolManagerComponent/WorkpoolManager");
+ workpoolManager.setWorkpoolReference(node.getDomain()
+ .getServiceReference(WorkpoolService.class,
+ "WorkpoolServiceComponent"));
+ workpoolManager.acceptRules(buffer.toString());
+ workpoolManager.start();
+
+ int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 };
+
+ double x = 398349;
+ for (int i = 0; i < jobsNo; ++i) {
+ workpoolService.submit(new TestJob(x, iterations, items));
+ }
+ TestJob j = new TestJob(-1.0, true);
+ for (int i = 0; i < workerNo + 1; ++i) {
+ j.setEOS();
+ workpoolService.submit(j);
+ }
+
+ }
+ if (nodeName.equals("nodeB")) {
+ SCANodeManagerInitService workerManagerNodeB = node
+ .getDomain()
+ .getService(SCANodeManagerInitService.class,
+ "WorkerManagerNodeBComponent/NodeManagerInitService");
+ workerManagerNodeB.setNode(node);
+ }
+
+ if (nodeName.equals("nodeC")) {
+ SCANodeManagerInitService workerManagerNodeC = node
+ .getDomain()
+ .getService(SCANodeManagerInitService.class,
+ "WorkerManagerNodeCComponent/NodeManagerInitService");
+ workerManagerNodeC.setNode(node);
+ }
+
+ if (nodeName.equals("nodeD")) {
+ SCANodeManagerInitService workerManagerNodeD = node
+ .getDomain()
+ .getService(SCANodeManagerInitService.class,
+ "WorkerManagerNodeDComponent/NodeManagerInitService");
+ workerManagerNodeD.setNode(node);
+ }
+
+ if (nodeName.equals("nodeE")) {
+ SCANodeManagerInitService workerManagerNodeE = node
+ .getDomain()
+ .getService(SCANodeManagerInitService.class,
+ "WorkerManagerNodeEComponent/NodeManagerInitService");
+ workerManagerNodeE.setNode(node);
+ }
+
+ this.waitForever();
+ // this.thread.start();
+ }
+
+ public void stop() throws Exception {
+ Thread.currentThread().interrupt();
+ // thread.interrupt();
+ node.stop();
+ node.destroy();
+ }
+
+ private synchronized void waitForever() {
+ while (!stopped) {
+ try {
+ wait();
+ } catch (InterruptedException ex) {
+ stopped = true;
+ return;
+ }
+ }
+ }
+
+ public void run() {
+ waitForever();
+ }
+}
diff --git a/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java
new file mode 100644
index 0000000000..86557548af
--- /dev/null
+++ b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package node;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.tuscany.sca.assembly.Composite;
+import org.apache.tuscany.sca.assembly.Service;
+import org.apache.tuscany.sca.contribution.Contribution;
+import org.apache.tuscany.sca.contribution.DeployedArtifact;
+import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl;
+import org.apache.tuscany.sca.domain.SCADomain;
+import org.apache.tuscany.sca.node.NodeManagerInitService;
+import org.apache.tuscany.sca.node.SCANode;
+import org.apache.tuscany.sca.node.SCANodeFactory;
+import org.apache.tuscany.sca.node.impl.SCANodeImpl;
+import java.net.URI;
+
+import workpool.WorkerManager;
+import workpool.WorkerManagerImpl;
+import workpool.WorkpoolManager;
+import workpool.WorkpoolService;
+import workpool.WorkpoolServiceImpl;
+
+/**
+ * This client program shows how to run a distributed SCA node. In this case a
+ * calculator node has been constructed specifically for running the calculator
+ * composite. Internally it creates a representation of a node and associates a
+ * distributed domain with the node. This separation is made different
+ * implementations of the distributed domain can be provided.
+ */
+public class WorkpoolNode {
+
+ public static void main(String[] args) throws Exception {
+
+ // Check that the correct arguments have been provided
+ if (null == args || args.length < 4) {
+ System.err
+ .println("Useage: java WorkpoolNode domainname nodename iterTest workerNo");
+ System.exit(1);
+ }
+ BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+ String domainName = args[0];
+ String nodeName = args[1];
+ long iterations = Long.parseLong(args[2]);
+ long jobsNo = Long.parseLong(args[3]);
+ long workerNo = Long.parseLong(args[4]);
+ ClassLoader cl = WorkpoolNode.class.getClassLoader();
+
+ SCANodeFactory nodeFactory = SCANodeFactory.newInstance();
+ SCANode node = nodeFactory.createSCANode(null, domainName);
+ node.addContribution(nodeName, cl.getResource(nodeName + "/"));
+ node.addToDomainLevelComposite(new QName("http://sample", "Workpool"));
+ node.start();
+ // nodeA is the head node and runs some tests while all other nodes
+ // simply listen for incoming messages
+
+ FileReader rules = new FileReader("workerRules.drl");
+ StringBuffer buffer = new StringBuffer();
+
+ BufferedReader br = new BufferedReader(rules);
+ String ruleString;
+ do {
+ ruleString = br.readLine();
+ if (ruleString != null) {
+ buffer.append(ruleString + "\n");
+ }
+ } while (ruleString != null);
+
+ if (nodeName.equals("nodeA")) {
+ // do some application stuff
+ WorkpoolService workpoolService = node.getDomain().getService(
+ WorkpoolService.class, "WorkpoolServiceComponent");
+ workpoolService.start();
+ NodeManagerInitService nodeInit = node.getDomain().getService(
+ NodeManagerInitService.class,
+ "WorkpoolManagerComponent/NodeManagerInitService");
+ nodeInit.setNode(node);
+ WorkpoolManager workpoolManager = node.getDomain().getService(
+ WorkpoolManager.class,
+ "WorkpoolManagerComponent/WorkpoolManager");
+ workpoolManager.setWorkpoolReference(node.getDomain()
+ .getServiceReference(WorkpoolService.class,
+ "WorkpoolServiceComponent"));
+ workpoolManager.setCycleTime(8000);
+ workpoolManager.acceptRules(buffer.toString());
+ workpoolManager.start();
+ int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 };
+
+ double x = 398349;
+
+ for (int i = 0; i < jobsNo; ++i)
+ workpoolService.submit(new TestJob(x, iterations, items));
+
+ TestJob j = new TestJob(-1.0, true);
+ for (int i = 0; i < workerNo + 1; ++i) {
+ j.setEOS();
+ workpoolService.submit(j);
+ }
+
+ }
+ try {
+ if (nodeName.equals("nodeB")) {
+ NodeManagerInitService serviceNodeB = node
+ .getDomain()
+ .getService(NodeManagerInitService.class,
+ "WorkerManagerNodeBComponent/NodeManagerInitService");
+ serviceNodeB.setNode(node);
+ }
+ if (nodeName.equals("nodeC")) {
+ NodeManagerInitService workerManagerC = node
+ .getDomain()
+ .getService(NodeManagerInitService.class,
+ "WorkerManagerNodeCComponent/NodeManagerInitService");
+ workerManagerC.setNode(node);
+ }
+ if (nodeName.equals("nodeD")) {
+ NodeManagerInitService workerManagerD = node
+ .getDomain()
+ .getService(NodeManagerInitService.class,
+ "WorkerManagerNodeDComponent/NodeManagerInitService");
+ workerManagerD.setNode(node);
+ }
+ if (nodeName.equals("nodeE")) {
+ NodeManagerInitService workerManagerE = node
+ .getDomain()
+ .getService(NodeManagerInitService.class,
+ "WorkerManagerNodeEComponent/NodeManagerInitService");
+ workerManagerE.setNode(node);
+ }
+
+ System.out.println("Node started (press enter to shutdown)");
+ String buff;
+ for (;;) {
+ try {
+ buff = in.readLine();
+ if (buff == null)
+ break;
+ System.out.print(in.readLine());
+ } catch (IOException ex) {
+ break; // Exit thread.
+ }
+ }
+ // stop the node and all the domains in it
+ node.stop();
+ node.destroy();
+ System.exit(0);
+ } catch (Exception ex) {
+ System.err.println("Exception in node - " + ex.getMessage());
+ ex.printStackTrace(System.err);
+ }
+ }
+}
diff --git a/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/workerRules1.drl b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/workerRules1.drl
new file mode 100644
index 0000000000..9c5a5d1b7f
--- /dev/null
+++ b/sandbox/sebastien/java/sca-node/demos/workpool-distributed/src/main/java/node/workerRules1.drl
@@ -0,0 +1,13 @@
+package workpool
+import workpool.*;
+rule "WorkerAdder1"
+ when
+ $workerBean: WorkpoolBean(singleAction == false && (jobComputed > 500))
+ then
+ $workerBean.setSingleAction()
+ $workerBean.addWorkerToNode("nodeB")
+ $workerBean.addWorkerToNode("nodeC")
+ $workerBean.addWorkerToNode("nodeD")
+ $workerBean.addWorkerToNode("nodeE")
+end
+ \ No newline at end of file