summaryrefslogtreecommitdiffstats
path: root/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'java/sca-1.x-contrib/demos/workpool-distributed/src/main/java')
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java57
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java79
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java82
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java271
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java179
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl13
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java85
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java46
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java43
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java54
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java29
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java31
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java213
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java56
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java27
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java171
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java162
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java25
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java71
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java48
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java555
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java91
-rw-r--r--java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java416
23 files changed, 0 insertions, 2804 deletions
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java
deleted file mode 100644
index a278499aae..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNode.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package node;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.tuscany.sca.domain.SCADomain;
-import org.apache.tuscany.sca.domain.SCADomainFactory;
-
-/**
- * This server program that loads a composite to provide simple registry
- * function. This server can be replaced with any registry that is appropriate
- * but the components in each node that talk to the registry should be replaced
- * also.
- */
-public class DomainNode {
-
- private static String DEFAULT_DOMAIN_URI = "http://u12:8877";
- private boolean stopped = true;
-
- public static void main(String[] args) {
-
- try {
-
- SCADomainFactory domainFactory = SCADomainFactory.newInstance();
- SCADomain domain = domainFactory
- .createSCADomain(DEFAULT_DOMAIN_URI);
-
- System.out.println("Domain started (press enter to shutdown)");
- System.in.read();
- // waitForever();
- domain.destroy();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- System.out.println("Domain stopped");
- }
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java
deleted file mode 100644
index 9d05761ad6..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package node;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.daemon.Daemon;
-import org.apache.commons.daemon.DaemonContext;
-import org.apache.tuscany.sca.domain.SCADomain;
-import org.apache.tuscany.sca.domain.SCADomainFactory;
-
-/**
- * This server program that loads a composite to provide simple registry
- * function. This server can be replaced with any registry that is appropriate
- * but the components in each node that talk to the registry should be replaced
- * also.
- */
-public class DomainNodeDaemon implements Daemon {
-
- private SCADomain domain;
- private static String DEFAULT_DOMAIN_URI = "http://u12:8877";
- private boolean stopped = true;
-
- private synchronized void waitForever() {
- while (!stopped) {
- try {
- wait();
- } catch (InterruptedException ex) {
- stopped = true;
- return;
- }
- }
-
- }
-
- public void destroy() {
- // TODO Auto-generated method stub
-
- }
-
- public void init(DaemonContext arg0) throws Exception {
- // TODO Auto-generated method stub
-
- }
-
- public void start() throws Exception {
-
- SCADomainFactory domainFactory = SCADomainFactory.newInstance();
- domain = domainFactory.createSCADomain(DEFAULT_DOMAIN_URI);
-
- System.out.println("Domain started (press enter to shutdown)");
- waitForever();
-
- }
-
- public void stop() throws Exception {
- // TODO Auto-generated method stub
- Thread.currentThread().interrupt();
- domain.destroy();
- }
-} \ No newline at end of file
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java
deleted file mode 100644
index f48e647bd6..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/TestJob.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package node;
-
-import org.apache.tuscany.sca.databinding.job.Job;
-import org.apache.tuscany.sca.databinding.job.JobDataMap;
-import org.apache.tuscany.sca.databinding.job.JobExecutionContext;
-import org.apache.tuscany.sca.databinding.job.RemoteJob;
-
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
-
-public class TestJob extends RemoteJob<Double> implements java.io.Serializable {
- private boolean EOS = false;
- private Double value;
-
- public TestJob(Double x, long iterations, int[] items) {
- JobDataMap map = new JobDataMap();
- map.addJobData("value", x);
- map.addJobData("iterations", iterations);
- map.addJobData("items", items);
- context.setJobData(map);
- }
-
- public TestJob(Double i, boolean eos) {
- value = i;
- this.EOS = eos;
- }
-
- public TestJob(String jsonData) {
- JobExecutionContext ctxt = new JobExecutionContext();
- ctxt.storeJSONData(jsonData);
- }
-
- public int getType() {
- return Job.REGULAR_JOB;
- }
-
- public void setEOS() {
- EOS = true;
- }
-
- public boolean eos() {
- return EOS;
- }
-
- @Override
- public Double compute(JobExecutionContext context) {
- JobDataMap contextMap = context.getJobData();
- Long iterations = (Long) contextMap.getJobDataObject("iterations");
- Double value = (Double) contextMap.getJobDataObject("value");
- double x = value.doubleValue();
- System.out.println("Computing sinx for " + value + " for "
- + iterations.intValue() + " times");
- long computing_start = System.currentTimeMillis();
- for (long i = 0; i < iterations.longValue(); ++i) {
- x = Math.sin(x);
- }
- long computing_end = System.currentTimeMillis();
- System.out.println("Computing time= "
- + (computing_end - computing_start));
- System.out.println("Send result = " + x);
- return new Double(x);
- }
-
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java
deleted file mode 100644
index 1f2a4d1f9a..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package node;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-
-import javax.xml.namespace.QName;
-
-import org.apache.commons.daemon.Daemon;
-import org.apache.commons.daemon.DaemonContext;
-import org.apache.commons.daemon.DaemonController;
-import org.apache.tuscany.sca.assembly.Composite;
-import org.apache.tuscany.sca.assembly.Service;
-import org.apache.tuscany.sca.contribution.Contribution;
-import org.apache.tuscany.sca.contribution.DeployedArtifact;
-import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl;
-import org.apache.tuscany.sca.domain.SCADomain;
-import org.apache.tuscany.sca.node.NodeException;
-import org.apache.tuscany.sca.node.NodeManagerInitService;
-import org.apache.tuscany.sca.node.SCANode;
-import org.apache.tuscany.sca.node.SCANodeFactory;
-import org.apache.tuscany.sca.node.impl.SCANodeImpl;
-import org.apache.tuscany.sca.node.management.SCANodeManagerInitService;
-
-import java.net.URI;
-
-import workpool.WorkerManager;
-import workpool.WorkerManagerImpl;
-import workpool.WorkpoolManager;
-import workpool.WorkpoolService;
-import workpool.WorkpoolServiceImpl;
-
-/**
- * This client program shows how to run a distributed SCA node. In this case a
- * calculator node has been constructed specifically for running the calculator
- * composite. Internally it creates a representation of a node and associates a
- * distributed domain with the node. This separation is made different
- * implementations of the distributed domain can be provided.
- */
-public class WorkpoolDaemon implements Daemon, Runnable {
- private String domainName;
- private String nodeName;
- private long iterations;
- private long jobsNo;
- private long workerNo;
- private SCANode node;
- private boolean stopped = false;
- private DaemonController controller = null;
- private Thread thread = null;
- private String ruleFile = "workerRules.drl";
-
- /*
- * public static void main(String[] args) throws Exception {
- * // Check that the correct arguments have been provided if (null == args ||
- * args.length < 4) { System.err.println("Usage: java WorkpoolNode
- * domainname nodename iterTest workerNo"); System.exit(1); }
- *
- * try { String domainName = args[0]; String nodeName = args[1]; long
- * iterations = Long.parseLong(args[2]); long jobsNo =
- * Long.parseLong(args[3]); long workerNo = Long.parseLong(args[4]);
- * ClassLoader cl = WorkpoolDaemon.class.getClassLoader();
- *
- * SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); node =
- * nodeFactory.createSCANode(null, domainName);
- * node.addContribution(nodeName, cl.getResource(nodeName + "/"));
- * node.addToDomainLevelComposite(new QName("http://sample", "Workpool"));
- * node.start(); // nodeA is the head node and runs some tests while all
- * other nodes // simply listen for incoming messages
- *
- * FileReader rules = new FileReader("workerRules.drl"); StringBuffer buffer =
- * new StringBuffer();
- *
- * BufferedReader br = new BufferedReader(rules); String ruleString; do {
- * ruleString = br.readLine(); if (ruleString!=null) {
- * buffer.append(ruleString);} } while (ruleString!=null);
- *
- * if ( nodeName.equals("nodeA") ) { // do some application stuff
- * WorkpoolService workpoolService =
- * node.getDomain().getService(WorkpoolService.class,
- * "WorkpoolServiceComponent"); workpoolService.start();
- * NodeManagerInitService nodeInit =
- * node.getDomain().getService(NodeManagerInitService.class,
- * "WorkpoolManagerComponent/NodeManagerInitService");
- * nodeInit.setNode(node); WorkpoolManager workpoolManager =
- * node.getDomain().getService(WorkpoolManager.class,
- * "WorkpoolManagerComponent/WorkpoolManager");
- * workpoolManager.setWorkpoolReference(node.getDomain().getServiceReference(WorkpoolService.class,
- * "WorkpoolServiceComponent"));
- * workpoolManager.acceptRules(buffer.toString()); workpoolManager.start();
- * int items[] = {3,4,5,6,3,6,3,5,9,5,6};
- *
- * double x = 398349; for (int i = 0; i < jobsNo; ++i)
- * workpoolService.submit(new TestJob(x,iterations,items));
- *
- * TestJob j = new TestJob(-1.0,true); for (int i = 0; i < workerNo+1; ++i){
- * j.setEOS(); workpoolService.submit(j); } } try { if
- * (nodeName.equals("nodeB")) { NodeManagerInitService serviceNodeB =
- * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeBComponent/NodeManagerInitService");
- * serviceNodeB.setNode(node); } if (nodeName.equals("nodeC")) {
- * NodeManagerInitService workerManagerC =
- * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeCComponent/NodeManagerInitService");
- * workerManagerC.setNode(node); } if (nodeName.equals("nodeD")) {
- * NodeManagerInitService workerManagerD =
- * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeDComponent/NodeManagerInitService");
- * workerManagerD.setNode(node); } if (nodeName.equals("nodeE")) {
- * NodeManagerInitService workerManagerE =
- * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeEComponent/NodeManagerInitService");
- * workerManagerE.setNode(node); }
- *
- * System.out.println("Node started (press enter to shutdown)");
- * System.in.read(); } catch (IOException e) { e.printStackTrace(); } //
- * stop the node and all the domains in it node.stop(); node.destroy();
- * System.exit(0); } catch(Exception ex) { System.err.println("Exception in
- * node - " + ex.getMessage()); ex.printStackTrace(System.err); } }
- */
- public void destroy() {
- // TODO Auto-generated method stub
-
- }
-
- public void init(DaemonContext arg0) throws Exception {
- String[] args = arg0.getArguments();
- domainName = args[0];
- nodeName = args[1];
- iterations = Long.parseLong(args[2]);
- jobsNo = Long.parseLong(args[3]);
- workerNo = Long.parseLong(args[4]);
- if (args.length == 6) {
- ruleFile = args[5];
- }
- this.controller = arg0.getController();
- // this.thread=new Thread(this);
- }
-
- public void start() throws Exception {
-
- ClassLoader cl = WorkpoolDaemon.class.getClassLoader();
-
- SCANodeFactory nodeFactory = SCANodeFactory.newInstance();
- node = nodeFactory.createSCANode(null, domainName);
- node.addContribution(nodeName, cl.getResource(nodeName + "/"));
- node.addToDomainLevelComposite(new QName("http://sample", "Workpool"));
- node.start();
- // nodeA is the head node and runs some tests while all other nodes
- // simply listen for incoming messages
-
- FileReader rules = new FileReader(ruleFile);
- StringBuffer buffer = new StringBuffer();
-
- BufferedReader br = new BufferedReader(rules);
- String ruleString;
- do {
- ruleString = br.readLine();
- if (ruleString != null) {
- buffer.append(ruleString + "\n");
- }
- } while (ruleString != null);
-
- if (nodeName.equals("nodeA")) {
- // do some application stuff
- WorkpoolService workpoolService = node.getDomain().getService(
- WorkpoolService.class, "WorkpoolServiceComponent");
- workpoolService.start();
- SCANodeManagerInitService nodeInit = node.getDomain().getService(
- SCANodeManagerInitService.class,
- "WorkpoolManagerComponent/NodeManagerInitService");
- nodeInit.setNode(node);
- WorkpoolManager workpoolManager = node.getDomain().getService(
- WorkpoolManager.class,
- "WorkpoolManagerComponent/WorkpoolManager");
- workpoolManager.setWorkpoolReference(node.getDomain()
- .getServiceReference(WorkpoolService.class,
- "WorkpoolServiceComponent"));
- workpoolManager.acceptRules(buffer.toString());
- workpoolManager.start();
-
- int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 };
-
- double x = 398349;
- for (int i = 0; i < jobsNo; ++i) {
- workpoolService.submit(new TestJob(x, iterations, items));
- }
- TestJob j = new TestJob(-1.0, true);
- for (int i = 0; i < workerNo + 1; ++i) {
- j.setEOS();
- workpoolService.submit(j);
- }
-
- }
- if (nodeName.equals("nodeB")) {
- SCANodeManagerInitService workerManagerNodeB = node
- .getDomain()
- .getService(SCANodeManagerInitService.class,
- "WorkerManagerNodeBComponent/NodeManagerInitService");
- workerManagerNodeB.setNode(node);
- }
-
- if (nodeName.equals("nodeC")) {
- SCANodeManagerInitService workerManagerNodeC = node
- .getDomain()
- .getService(SCANodeManagerInitService.class,
- "WorkerManagerNodeCComponent/NodeManagerInitService");
- workerManagerNodeC.setNode(node);
- }
-
- if (nodeName.equals("nodeD")) {
- SCANodeManagerInitService workerManagerNodeD = node
- .getDomain()
- .getService(SCANodeManagerInitService.class,
- "WorkerManagerNodeDComponent/NodeManagerInitService");
- workerManagerNodeD.setNode(node);
- }
-
- if (nodeName.equals("nodeE")) {
- SCANodeManagerInitService workerManagerNodeE = node
- .getDomain()
- .getService(SCANodeManagerInitService.class,
- "WorkerManagerNodeEComponent/NodeManagerInitService");
- workerManagerNodeE.setNode(node);
- }
-
- this.waitForever();
- // this.thread.start();
- }
-
- public void stop() throws Exception {
- Thread.currentThread().interrupt();
- // thread.interrupt();
- node.stop();
- node.destroy();
- }
-
- private synchronized void waitForever() {
- while (!stopped) {
- try {
- wait();
- } catch (InterruptedException ex) {
- stopped = true;
- return;
- }
- }
- }
-
- public void run() {
- waitForever();
- }
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java
deleted file mode 100644
index 86557548af..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package node;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-
-import javax.xml.namespace.QName;
-
-import org.apache.tuscany.sca.assembly.Composite;
-import org.apache.tuscany.sca.assembly.Service;
-import org.apache.tuscany.sca.contribution.Contribution;
-import org.apache.tuscany.sca.contribution.DeployedArtifact;
-import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl;
-import org.apache.tuscany.sca.domain.SCADomain;
-import org.apache.tuscany.sca.node.NodeManagerInitService;
-import org.apache.tuscany.sca.node.SCANode;
-import org.apache.tuscany.sca.node.SCANodeFactory;
-import org.apache.tuscany.sca.node.impl.SCANodeImpl;
-import java.net.URI;
-
-import workpool.WorkerManager;
-import workpool.WorkerManagerImpl;
-import workpool.WorkpoolManager;
-import workpool.WorkpoolService;
-import workpool.WorkpoolServiceImpl;
-
-/**
- * This client program shows how to run a distributed SCA node. In this case a
- * calculator node has been constructed specifically for running the calculator
- * composite. Internally it creates a representation of a node and associates a
- * distributed domain with the node. This separation is made different
- * implementations of the distributed domain can be provided.
- */
-public class WorkpoolNode {
-
- public static void main(String[] args) throws Exception {
-
- // Check that the correct arguments have been provided
- if (null == args || args.length < 4) {
- System.err
- .println("Useage: java WorkpoolNode domainname nodename iterTest workerNo");
- System.exit(1);
- }
- BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
- String domainName = args[0];
- String nodeName = args[1];
- long iterations = Long.parseLong(args[2]);
- long jobsNo = Long.parseLong(args[3]);
- long workerNo = Long.parseLong(args[4]);
- ClassLoader cl = WorkpoolNode.class.getClassLoader();
-
- SCANodeFactory nodeFactory = SCANodeFactory.newInstance();
- SCANode node = nodeFactory.createSCANode(null, domainName);
- node.addContribution(nodeName, cl.getResource(nodeName + "/"));
- node.addToDomainLevelComposite(new QName("http://sample", "Workpool"));
- node.start();
- // nodeA is the head node and runs some tests while all other nodes
- // simply listen for incoming messages
-
- FileReader rules = new FileReader("workerRules.drl");
- StringBuffer buffer = new StringBuffer();
-
- BufferedReader br = new BufferedReader(rules);
- String ruleString;
- do {
- ruleString = br.readLine();
- if (ruleString != null) {
- buffer.append(ruleString + "\n");
- }
- } while (ruleString != null);
-
- if (nodeName.equals("nodeA")) {
- // do some application stuff
- WorkpoolService workpoolService = node.getDomain().getService(
- WorkpoolService.class, "WorkpoolServiceComponent");
- workpoolService.start();
- NodeManagerInitService nodeInit = node.getDomain().getService(
- NodeManagerInitService.class,
- "WorkpoolManagerComponent/NodeManagerInitService");
- nodeInit.setNode(node);
- WorkpoolManager workpoolManager = node.getDomain().getService(
- WorkpoolManager.class,
- "WorkpoolManagerComponent/WorkpoolManager");
- workpoolManager.setWorkpoolReference(node.getDomain()
- .getServiceReference(WorkpoolService.class,
- "WorkpoolServiceComponent"));
- workpoolManager.setCycleTime(8000);
- workpoolManager.acceptRules(buffer.toString());
- workpoolManager.start();
- int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 };
-
- double x = 398349;
-
- for (int i = 0; i < jobsNo; ++i)
- workpoolService.submit(new TestJob(x, iterations, items));
-
- TestJob j = new TestJob(-1.0, true);
- for (int i = 0; i < workerNo + 1; ++i) {
- j.setEOS();
- workpoolService.submit(j);
- }
-
- }
- try {
- if (nodeName.equals("nodeB")) {
- NodeManagerInitService serviceNodeB = node
- .getDomain()
- .getService(NodeManagerInitService.class,
- "WorkerManagerNodeBComponent/NodeManagerInitService");
- serviceNodeB.setNode(node);
- }
- if (nodeName.equals("nodeC")) {
- NodeManagerInitService workerManagerC = node
- .getDomain()
- .getService(NodeManagerInitService.class,
- "WorkerManagerNodeCComponent/NodeManagerInitService");
- workerManagerC.setNode(node);
- }
- if (nodeName.equals("nodeD")) {
- NodeManagerInitService workerManagerD = node
- .getDomain()
- .getService(NodeManagerInitService.class,
- "WorkerManagerNodeDComponent/NodeManagerInitService");
- workerManagerD.setNode(node);
- }
- if (nodeName.equals("nodeE")) {
- NodeManagerInitService workerManagerE = node
- .getDomain()
- .getService(NodeManagerInitService.class,
- "WorkerManagerNodeEComponent/NodeManagerInitService");
- workerManagerE.setNode(node);
- }
-
- System.out.println("Node started (press enter to shutdown)");
- String buff;
- for (;;) {
- try {
- buff = in.readLine();
- if (buff == null)
- break;
- System.out.print(in.readLine());
- } catch (IOException ex) {
- break; // Exit thread.
- }
- }
- // stop the node and all the domains in it
- node.stop();
- node.destroy();
- System.exit(0);
- } catch (Exception ex) {
- System.err.println("Exception in node - " + ex.getMessage());
- ex.printStackTrace(System.err);
- }
- }
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl
deleted file mode 100644
index 9c5a5d1b7f..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/node/workerRules1.drl
+++ /dev/null
@@ -1,13 +0,0 @@
-package workpool
-import workpool.*;
-rule "WorkerAdder1"
- when
- $workerBean: WorkpoolBean(singleAction == false && (jobComputed > 500))
- then
- $workerBean.setSingleAction()
- $workerBean.addWorkerToNode("nodeB")
- $workerBean.addWorkerToNode("nodeC")
- $workerBean.addWorkerToNode("nodeD")
- $workerBean.addWorkerToNode("nodeE")
-end
- \ No newline at end of file
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java
deleted file mode 100644
index cdd0f30b34..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import java.io.StringReader;
-import java.net.URI;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.util.logging.Logger;
-
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamReader;
-
-import org.apache.tuscany.sca.assembly.MetaComponent;
-import org.apache.tuscany.sca.assembly.impl.DefaultMetaComponent;
-
-public class MetaComponentWorker extends DefaultMetaComponent {
-
- private SecureRandom prng;
- private String componentName;
- private String scdl;
- private String javaClass;
- private boolean loadedFromString = false;
- private Logger log = Logger.getLogger(MetaComponentWorker.class.getName());
-
- public MetaComponentWorker() {
- componentName = "WorkerComponent"
- + java.util.UUID.randomUUID().toString();
- }
-
- public void setWorkerName(String componentName) {
- this.componentName = componentName;
- }
-
- public void setWorkerClass(String javaClass) {
- this.javaClass = javaClass;
- }
-
- private String generateSCDL() {
- StringBuffer buffer = new StringBuffer(512);
- buffer
- .append("<component xmlns=\"http://www.osoa.org/xmlns/sca/1.0\" name=\"");
- buffer.append(this.componentName);
- buffer.append("\">\n");
- buffer.append("<implementation.java class=\"");
- buffer.append(this.javaClass);
- buffer.append("\"/>");
- buffer.append("<property name=\"workerName\">");
- buffer.append(this.componentName);
- buffer.append("</property>\n</component>");
- return buffer.toString();
- }
-
- @Override
- public XMLStreamReader build() throws Exception {
- XMLInputFactory factory = XMLInputFactory.newInstance();
- if (!loadedFromString)
- scdl = generateSCDL();
- return factory.createXMLStreamReader(new StringReader(scdl));
-
- }
-
- public String getName() {
-
- return componentName;
- }
-
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java
deleted file mode 100644
index c45696e3cf..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
-import org.apache.tuscany.sca.databinding.job.Job;
-import org.apache.tuscany.sca.databinding.job.JobDataMap;
-import org.apache.tuscany.sca.databinding.job.JobExecutionContext;
-import org.apache.tuscany.sca.databinding.job.RemoteJob;
-import org.osoa.sca.annotations.Scope;
-
-@Scope("COMPOSITE")
-public class MyWorker extends WorkerServiceImpl<Object, Double> {
- private static int resultcount = 0;
-
- @Override
- public ResultJob computeTask(Job<Object, Double> job) {
-
- RemoteJob remoteJob = (RemoteJob) job;
- System.out.println("Computing the job");
- JobExecutionContext context = remoteJob.getContext();
- ResultJob resultJob = new ResultJob();
- JobDataMap resultMap = new JobDataMap();
- resultMap.addJobData("result", remoteJob.compute(context));
- resultJob.setJobDataMap(resultMap);
- System.out.println("Count result = " + (++resultcount));
- return resultJob;
- }
-
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java
deleted file mode 100644
index fb930adf2e..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.apache.tuscany.sca.databinding.job.Job;
-import org.apache.tuscany.sca.databinding.job.JobDataMap;
-
-public class NullJob implements Job, java.io.Serializable {
-
- public Object compute(Object arg0) {
- // TODO Auto-generated method stub
- return null;
- }
-
- public JobDataMap getDataMap() {
- return null;
- }
-
- public boolean eos() {
- return false;
- }
-
- public int getType() {
- return Job.NULL_JOB;
- }
-
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java
deleted file mode 100644
index e04411668b..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.apache.tuscany.sca.databinding.job.Job;
-import org.apache.tuscany.sca.databinding.job.JobDataMap;
-import org.apache.tuscany.sca.databinding.job.JobExecutionContext;
-import org.apache.tuscany.sca.databinding.job.RemoteJob;
-
-public class ResultJob extends RemoteJob<Object> implements
- java.io.Serializable {
- private JobDataMap map;
-
- public JobDataMap getDataMap() {
- return map;
- }
-
- public void setJobDataMap(JobDataMap map) {
- this.map = map;
- }
-
- public boolean eos() {
- // TODO Auto-generated method stub
- return true;
- }
-
- public int getType() {
- // TODO Auto-generated method stub
- return Job.RESULT_JOB;
- }
-
- @Override
- public Object compute(JobExecutionContext v) {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java
deleted file mode 100644
index 469675b19b..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.apache.tuscany.sca.databinding.annotation.DataBinding;
-import org.apache.tuscany.sca.databinding.job.Job;
-import org.osoa.sca.annotations.Remotable;
-
-@Remotable
-@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
-public interface Trigger<T> {
- void handleEvent(T c);
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java
deleted file mode 100644
index 520203e190..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-import org.osoa.sca.annotations.Remotable;
-import org.osoa.sca.CallableReference;
-@Remotable
-public interface WorkerManager {
- CallableReference<WorkerService> addWorker();
- boolean removeWorker(String workerName);
- boolean removeWorkers(int k);
- boolean removeAllWorkers();
- double getNodeLoad();
- int activeWorkers();
- void start();
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java
deleted file mode 100644
index d4337cad2f..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.List;
-import java.util.logging.Logger;
-
-import org.apache.tuscany.sca.assembly.Composite;
-import org.apache.tuscany.sca.contribution.Contribution;
-import org.apache.tuscany.sca.contribution.DeployedArtifact;
-import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl;
-import org.osoa.sca.CallableReference;
-import org.apache.tuscany.sca.node.management.SCANodeManagerInitService;
-import org.apache.tuscany.sca.node.SCANode;
-import org.apache.tuscany.sca.node.impl.SCANodeImpl;
-import org.apache.tuscany.sca.runtime.RuntimeComponent;
-import org.osoa.sca.ComponentContext;
-import org.osoa.sca.annotations.Context;
-import org.osoa.sca.annotations.Property;
-import org.osoa.sca.annotations.Scope;
-import org.osoa.sca.annotations.Service;
-import java.util.LinkedList;
-import java.util.ArrayList;
-
-@Scope("COMPOSITE")
-@Service(interfaces = { SCANodeManagerInitService.class, WorkerManager.class })
-public class WorkerManagerImpl implements WorkerManager, SCANodeManagerInitService {
- private Logger log = Logger.getLogger(WorkerManagerImpl.class.getName());
- private LinkedList<CallableReference<WorkerService>> activeWorkers = new LinkedList<CallableReference<WorkerService>>();
- private List<String> workerComponentNames = new ArrayList<String>();
- private SCANodeImpl node;
- @Property
- protected String nodeName;
- @Property
- protected String compositeName;
- @Property
- protected String workerClass;
- @Context
- protected ComponentContext context;
- private double loadAverage;
-
- /* This method is used to find a composite inside all deployed artifacts */
- private Composite findComposite(List<Composite> artifacts) {
- for (Composite fact : artifacts) {
- log.info("Searching in a contribution deployed artifacts -"
- + compositeName);
- Composite augmented = (Composite) fact;
- // found
- if (augmented.getURI().equals(compositeName)) {
- log.info("Found composite..." + compositeName);
- return augmented;
- }
- }
- }
- return null;
- }
-
- public CallableReference<WorkerService> addWorker() {
- log.info("Adding a new worker call..");
- long addWorkerStartTime = System.nanoTime();
- ContributionServiceImpl cServiceImpl = (ContributionServiceImpl) node.getContributionService();
- Contribution contribution = cServiceImpl.getContribution(nodeName);
- List<Composite> artifacts = contribution.getDeployables();
- CallableReference<WorkerService> workerReference = null;
- CallableReference<WorkerService> ref = null;
- log.info("Creating a MetaComponentWorker..");
- MetaComponentWorker mcw = new MetaComponentWorker();
- boolean found = false;
- mcw.setWorkerClass(workerClass);
- Composite augmented = findComposite(artifacts);
- try {
- if (augmented != null) {
- long startCreation = System.nanoTime();
- node.addComponentToComposite(mcw, contribution.getURI(),
- augmented.getURI());
- System.out.println("addComponentToComposite time = "
- + (System.nanoTime() - startCreation));
- RuntimeComponent workerComponent = (RuntimeComponent) node
- .getComponent(mcw.getName());
- if (workerComponent != null) {
- ref = (CallableReference<WorkerService>) workerComponent
- .getComponentContext().createSelfReference(
- WorkerService.class);
- ref.getService().start();
- activeWorkers.addLast(ref);
- workerComponentNames.add(mcw.getName());
- CallableReference<WorkerManager> manager = (CallableReference) context
- .createSelfReference(WorkerManager.class,
- "WorkerManager");
- ref.getService().registerManager(manager);
- return ref;
- }
- } else {
- log.info("Workpool composite not found!");
- }
- } catch (Exception e) {
- log.info("Exception activation");
- e.printStackTrace();
- }
- ;
- System.out.println("Component Creation Time ="
- + (System.nanoTime() - addWorkerStartTime));
- return ref;
- }
-
- public boolean removeAllWorkers() {
- for (CallableReference<WorkerService> callable : activeWorkers) {
- callable.getService().stop();
- }
- return true;
- }
-
- public boolean removeWorker() {
- CallableReference<WorkerService> callable = activeWorkers
- .removeLast();
- callable.getService().stop();
- return true;
- }
-
- public boolean removeWorkers(int k) {
- if (k >= activeWorkers.size())
- return false;
- for (int i = 0; i < k; ++i) {
- if (!removeWorker())
- return false;
- }
- return true;
- }
-
- public void setNode(SCANode node) {
- this.node = (SCANodeImpl) node;
-
- }
-
- public double getNodeLoad() {
- /*
- * FIXME [jo] this works only on Linux To be replaced with an JNI
- * extension
- */
- RandomAccessFile statfile;
-
- this.loadAverage = 1.0;
- // load = 0;
- int NoProcessors = 0;
- String cpuLine = null;
- try {
- NoProcessors = Runtime.getRuntime().availableProcessors();
- if (NoProcessors > 1)
- this.loadAverage = 1 / (1.0 * NoProcessors);
- statfile = new RandomAccessFile("/proc/loadavg", "r");
- try {
- statfile.seek(0);
- cpuLine = statfile.readLine();
-
- } catch (IOException e) {
- // FIX ME: Better exception handling.
- e.printStackTrace();
- }
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (NumberFormatException e) {
- e.printStackTrace();
- }
- double min1;
- if (cpuLine != null) {
- java.util.StringTokenizer st = new java.util.StringTokenizer(
- cpuLine, " ");
- min1 = Double.parseDouble(st.nextToken());
- } else
- min1 = 0;
-
- return min1 * this.loadAverage;
- }
-
- public int activeWorkers() {
- return activeWorkers.size();
- }
-
- public boolean removeWorker(String workerName) {
- RuntimeComponent workerComponent = (RuntimeComponent) node
- .getComponent(workerName);
- if (workerComponent != null) {
- log.info("Removing component " + workerName);
- node.removeComponentFromComposite(nodeName, "Workpool.composite",
- workerName);
- return true;
- }
- return false;
- }
-
- public void start() {
- // do nothing for now.
- }
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java
deleted file mode 100644
index 37b7ea227a..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.osoa.sca.ServiceReference;
-import org.osoa.sca.annotations.Callback;
-import org.osoa.sca.annotations.Remotable;
-import org.osoa.sca.annotations.OneWay;
-import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
-import org.apache.tuscany.sca.databinding.annotation.DataBinding;
-import org.apache.tuscany.sca.databinding.job.Job;
-
-/**
- * The interface for the multiply service
- */
-@Remotable
-@Callback(WorkerServiceCallback.class)
-@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
-public interface WorkerService<T, E> {
- @OneWay
- void compute(Job<T, E> j);
-
- void start();
-
- void stop();
-
- // void addJobCompleteHandler(String triggerName,
- // CallableReferenceImpl<Trigger> handle);
- // void removeJobCompleteHandler(String triggerName);
- /* The worker manager */
- void registerManager(CallableReferenceImpl<WorkerManager> wm);
-
- void registerSender(CallableReferenceImpl<WorkpoolService> sender);
-
- // void init(Job nullJob);
- @OneWay
- void computeFirstTime(Job nullJob,
- CallableReferenceImpl<WorkpoolService> myReference);
-
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java
deleted file mode 100644
index 6fb1278e5e..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.apache.tuscany.sca.databinding.job.Job;
-import org.osoa.sca.annotations.Remotable;
-
-@Remotable
-public interface WorkerServiceCallback {
- void receiveResult(Job resultType, boolean reuse, String workerName);
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java
deleted file mode 100644
index 2c9bf5ea48..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.osoa.sca.ComponentContext;
-import org.osoa.sca.RequestContext;
-import org.osoa.sca.ServiceReference;
-import org.osoa.sca.annotations.Callback;
-import org.osoa.sca.annotations.Context;
-import org.osoa.sca.annotations.Property;
-import org.osoa.sca.annotations.Scope;
-import org.osoa.sca.annotations.Service;
-import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
-import org.apache.tuscany.sca.databinding.annotation.DataBinding;
-import org.apache.tuscany.sca.databinding.job.Job;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.logging.*;
-
-/**
- * An implementation of the worker service.
- */
-@Service(WorkerService.class)
-@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
-@Scope("COMPOSITE")
-public abstract class WorkerServiceImpl<T, E> implements WorkerService<T, E> {
- private Logger log = Logger.getLogger(this.getClass().getName());
- private WorkerServiceCallback workerServiceCallback;
- @Context
- protected ComponentContext workerContext;
- @Context
- protected RequestContext requestContext;
- @Property
- protected String workerName;
- private CallableReferenceImpl<WorkerManager> managerReference = null;
-
- /* TODO add the triggers, but before ask */
- // protected Map<String,Trigger> triggers = new HashMap<String,Trigger>();
- public abstract ResultJob computeTask(Job<T, E> job);
-
- private boolean stopped = false;
- private CallableReferenceImpl<WorkerService> serviceRef;
- private CallableReferenceImpl<WorkpoolService> senderService;
- private WorkpoolService wp = null;
- private WorkerManager manager = null;
-
- public void start() {
- log.info("Starting worker...");
- stopped = false;
- serviceRef = (CallableReferenceImpl) workerContext
- .createSelfReference(WorkerService.class);
-
- }
-
- public void init(CallableReferenceImpl<WorkpoolService> sender, Job nullJob) {
- compute(nullJob);
- }
-
- public void stop() {
- stopped = true;
- }
-
- @Callback
- public void setWorkerServiceCallback(
- WorkerServiceCallback workerServiceCallback) {
- log.info("Setting worker callback");
- this.workerServiceCallback = workerServiceCallback;
- }
-
- public void computeFirstTime(Job nullJob,
- CallableReferenceImpl<WorkpoolService> sender) {
- senderService = sender;
- wp = sender.getService();
- workWithCallable(nullJob);
- }
-
- public void registerManager(CallableReferenceImpl<WorkerManager> wm) {
- managerReference = wm;
- manager = managerReference.getService();
-
- }
-
- public void registerSender(CallableReferenceImpl<WorkpoolService> sender) {
- log.info("Registering sender..");
- senderService = sender;
- wp = sender.getService();
- }
-
- private void workWithInjection(Job j) {
- log.info("Worker has received job");
- if (stopped) {
- workerServiceCallback
- .receiveResult(j, true, workerContext.getURI());
- if (managerReference != null)
- manager.removeWorker(workerContext.getURI());
- } else if (j.eos()) {
- if (managerReference != null)
- manager.removeWorker(workerContext.getURI());
- }
- if (j instanceof NullJob) {
- workerServiceCallback.receiveResult(j, false, workerContext
- .getURI());
- } else {
- workerServiceCallback.receiveResult(computeTask(j), false,
- workerContext.getURI());
- }
- }
-
- private void workWithCallable(Job j) {
- log.info("Worker " + workerContext.getURI()
- + " has received job with eos --> " + j.eos());
- if (stopped) {
- wp.handleResult(j, true, workerContext.getURI(), serviceRef, false);
- return;
- }
- if (j.eos()) {
- log.info("Got poison token...");
- if (managerReference != null) {
- log.info("Removing component " + workerContext.getURI());
- manager.removeWorker(workerContext.getURI());
-
- }
- return;
- }
- if (j.getType() != Job.NULL_JOB) {
- wp.handleResult(computeTask(j), false, workerContext.getURI(),
- serviceRef, false);
- } else {
- log.info("Got a null job");
- wp.handleResult(j, false, workerContext.getURI(), serviceRef, true);
- }
- }
-
- public void compute(Job<T, E> j) {
-
- if (senderService != null) {
- log.info("Computing job using callable reference method");
- workWithCallable(j);
-
- } else {
- log.info("Computing job using reference injection method");
- workWithInjection(j);
-
- }
- }
- /*
- * public void addJobCompleteHandler(String triggerName,
- * CallableReferenceImpl<Trigger> handle) { if
- * (!triggers.containsKey(triggerName)) { triggers.put(triggerName,
- * handle.getService()); } } public void removeJobCompleteHandler(String
- * triggerName) { if (!triggers.containsKey(triggerName)) {
- * triggers.remove(triggerName); } }
- */
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java
deleted file mode 100644
index 80c093ff1c..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import java.beans.*;
-import java.util.Vector;
-import java.util.logging.*;
-
-public class WorkpoolBean {
- private Vector<WorkpoolBeanListener> listeners = new Vector<WorkpoolBeanListener>();
- double loadAverage = 0;
- int nodeNumbers = 0;
- int workers = 0;
- int estimedQueueSize = 0;
- double averageServiceTime = 0;
- double averageArrivalTime = 0;
- double usageFactor = 0;
- private final PropertyChangeSupport changes = new PropertyChangeSupport(
- this);
- long jobComputed = 0;
- boolean singleAction = false;
- private Logger log = Logger.getLogger(WorkpoolBean.class.getName());
-
- public void setNodeNumbers(int n) {
- this.nodeNumbers = n;
- }
-
- public void setWorkers(int w) {
- this.workers = w;
- }
-
- public void setLoadAverage(double loadAverage) {
- this.loadAverage = loadAverage;
- }
-
- public void setAverageServiceTime(double service) {
- this.averageServiceTime = service;
- }
-
- public void setAverageArrivalTime(double service) {
- this.averageArrivalTime = service;
- }
-
- public double getAverageArrivalTime() {
- return this.averageArrivalTime;
- }
-
- public double getUtilizationFactor() {
- return usageFactor;
- }
-
- public void setUsageFactor() {
- usageFactor = averageServiceTime / averageArrivalTime;
- }
-
- public void setEstimedQueueSize(int size) {
- estimedQueueSize = size;
- }
-
- public int getEstimedQueueSize() {
- return estimedQueueSize;
- }
-
- public double getLoadAverage() {
- return this.loadAverage;
- }
-
- public int getWorkers() {
- return this.workers;
- }
-
- public int getNodeNumbers() {
- return this.nodeNumbers;
- }
-
- public double getAverageServiceTime() {
- return this.averageServiceTime;
- }
-
- public void addPropertyChangeListener(final PropertyChangeListener l) {
- this.changes.addPropertyChangeListener(l);
- }
-
- public void removePropertyChangeListener(final PropertyChangeListener l) {
- this.changes.removePropertyChangeListener(l);
- }
-
- private synchronized void fireWorkpoolEvent(WorkpoolEvent ev) {
- for (WorkpoolBeanListener l : listeners) {
- l.handleEvent(new WorkpoolEvent(ev));
- }
- }
-
- public void addWorkersToNode(int k, String nodeName) {
- log.info("Adding a worker to node " + nodeName);
- WorkpoolEvent ev = new WorkpoolEvent(this,
- WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER, k, nodeName);
- fireWorkpoolEvent(ev);
- }
-
- public void addWorkerToNode(String nodeName) {
- log.info("Adding a worker to node " + nodeName);
- WorkpoolEvent ev = new WorkpoolEvent(this,
- WorkpoolEvent.SINGLE_ADD_WORKER, 1, nodeName);
- fireWorkpoolEvent(ev);
- }
-
- public void removeWorkersToNode(int k, String nodeName) {
- log.info("Removing a worker to node " + nodeName);
- WorkpoolEvent ev = new WorkpoolEvent(this,
- WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER, k, nodeName);
- fireWorkpoolEvent(ev);
- }
-
- public void removeWorkerToNode(String nodeName) {
- log.info("Removing a worker to node " + nodeName);
- WorkpoolEvent ev = new WorkpoolEvent(this,
- WorkpoolEvent.SINGLE_REMOVE_WORKER, 1, nodeName);
- fireWorkpoolEvent(ev);
- }
-
- public synchronized void addListener(WorkpoolBeanListener l) {
- this.listeners.add(l);
- }
-
- public synchronized void removeListener(WorkpoolBeanListener l) {
- this.listeners.remove(l);
- }
-
- public void setJobComputed(long jobComputed) {
- this.jobComputed = jobComputed;
-
- }
-
- public void setSingleAction() {
- singleAction = true;
- }
-
- public boolean getSingleAction() {
- return singleAction;
- }
-
- public long getJobComputed() {
- return this.jobComputed;
- }
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java
deleted file mode 100644
index 0ecc223fed..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import java.util.EventListener;
-
-public interface WorkpoolBeanListener extends EventListener {
- public void handleEvent(WorkpoolEvent ev);
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java
deleted file mode 100644
index 0bdc3671d5..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import java.util.EventObject;
-
-public class WorkpoolEvent extends EventObject {
-
- private static final long serialVersionUID = -1273928009411948768L;
-
- public WorkpoolEvent(Object source) {
- super(source);
- }
-
- public WorkpoolEvent(WorkpoolEvent ev) {
- super(ev.source);
- type = ev.type;
- noWorker = ev.noWorker;
- nodeName = ev.nodeName;
- }
-
- public WorkpoolEvent(Object source, int typeEv, int worker) {
- super(source);
- type = typeEv;
- noWorker = worker;
- nodeName = "";
- }
-
- public WorkpoolEvent(Object source, int typeEv, int worker, String nodeName) {
- super(source);
- type = typeEv;
- noWorker = worker;
- this.nodeName = nodeName;
- }
-
- public String getNodeName() {
- return nodeName;
- }
-
- public int getType() {
- return type;
- }
-
- public int workers() {
- return noWorker;
- }
-
- private int type;
- private int noWorker;
- private String nodeName;
- public static final int EVENT_MULTIPLE_ADD_WORKER = 0;
- public static final int EVENT_MULTIPLE_REMOVE_WORKER = 1;
- public static final int SINGLE_REMOVE_WORKER = 2;
- public static final int SINGLE_ADD_WORKER = 3;
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java
deleted file mode 100644
index 6520954bdd..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.osoa.sca.ServiceReference;
-import org.osoa.sca.annotations.OneWay;
-import org.osoa.sca.annotations.Remotable;
-
-@Remotable
-public interface WorkpoolManager {
- /*
- * @param String rules This are the autonomic rules. The format is the Java
- * Drools .drl file. You have to read it
- */
- @OneWay
- void acceptRules(String rules);
-
- @OneWay
- void start();
-
- @OneWay
- void stopAutonomicCycle();
-
- @OneWay
- void startAutonomicCycle();
-
- int activeWorkers();
-
- void setCycleTime(long time);
-
- void setWorkpoolReference(ServiceReference<WorkpoolService> serviceReference);
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java
deleted file mode 100644
index f7c727ad04..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.osoa.sca.ComponentContext;
-import org.osoa.sca.ServiceReference;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.logging.Logger;
-
-import javax.xml.stream.XMLStreamException;
-
-import node.TestJob;
-import java.io.File;
-import java.util.Vector;
-import org.apache.axiom.om.OMElement;
-import org.apache.tuscany.sca.contribution.service.ContributionResolveException;
-import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
-import org.apache.tuscany.sca.core.context.ServiceReferenceImpl;
-import org.apache.tuscany.sca.databinding.job.Job;
-import org.apache.tuscany.sca.node.NodeManagerInitService;
-import org.apache.tuscany.sca.node.SCANode;
-import org.apache.tuscany.sca.node.impl.SCANodeImpl;
-import org.osoa.sca.CallableReference;
-import org.drools.FactHandle;
-import org.drools.RuleBase;
-import org.drools.RuleBaseFactory;
-import org.drools.StatefulSession;
-import org.drools.StatelessSession;
-import org.drools.compiler.DroolsParserException;
-import org.drools.compiler.PackageBuilder;
-import org.drools.rule.Package;
-import org.osoa.sca.annotations.Constructor;
-import org.osoa.sca.annotations.Context;
-import org.osoa.sca.annotations.Destroy;
-import org.osoa.sca.annotations.Property;
-import org.osoa.sca.annotations.Reference;
-import org.osoa.sca.annotations.Scope;
-import org.osoa.sca.annotations.Service;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-
-@Service(interfaces = { NodeManagerInitService.class, WorkpoolManager.class })
-@Scope("COMPOSITE")
-/*
- * This is the core manager of the workpool application. The Workpool Manager
- * holds the reference to each remote node manager. Inside it we've a rule
- * engine instance.
- */
-public class WorkpoolManagerImpl implements WorkpoolManager,
- NodeManagerInitService, WorkpoolBeanListener {
- /*
- * This inner class trigs the rule engine, at given times: 1. It checks the
- * different loads for each nodes and sets the WorkpoolBean 2. It checks the
- * Workpool AverageService Time and sets the WorkpoolBean 3. It checks how
- * many jobs are already computed and sets the WorkpoolBean Then given the
- * configured bean and the rules, run the Rule Engine for executing the
- * business logic
- */
- class RuleEngineTrigger extends TimerTask {
- // private ReentrantLock triggerLock = new ReentrantLock();
- @Override
- public void run() {
-
- System.out.println("Updating WorkpoolBean..");
- // checkActiveWorkers();
- // checkLoadInNodes();
- checkServiceTime();
- // checkEstimedQueueSize();
- // checkArrivalTime();
- getProcessedItem();
- // computeUsageFactor();
- doRun(bean);
- }
-
- }
-
- private WorkerManager managerNodeB;
- private WorkerManager managerNodeC;
- private WorkerManager managerNodeD;
- private WorkerManager managerNodeE;
-
- private SCANodeImpl node;
- private WorkpoolBean bean = new WorkpoolBean();
- private ReentrantLock handleEventLock = new ReentrantLock();
- private ReentrantLock updateRuleLock = new ReentrantLock();
-
- private ServiceReference<WorkpoolService> reference;
- private AtomicInteger activeWorkers = new AtomicInteger(0);
- private Logger log = Logger.getLogger(WorkpoolManagerImpl.class.getName());
- @Property
- protected String workers;
- @Property
- protected String nodes;
- @Property
- protected String injection;
- @Context
- protected ComponentContext workpoolManagerContext;
- private CallableReferenceImpl<WorkpoolManager> myReference;
- private String rules = null;
- private boolean referenceInjection = false;
- private ConcurrentHashMap<String, WorkerManager> workerManagerTable = new ConcurrentHashMap<String, WorkerManager>();
- private int workersNo;
- private int nodesNo;
- private Timer timer = new Timer();
- /* this handle facts */
- private RuleBase ruleBase = null;
- private FactHandle handle = null;
- private StatefulSession wm = null;
- private long cycleTime = 5000;
-
- @Reference
- public void setManagerNodeB(WorkerManager managerNodeB) {
- this.managerNodeB = managerNodeB;
- workerManagerTable.put("nodeB", managerNodeB);
- }
-
- @Reference
- public void setManagerNodeC(WorkerManager managerNodeC) {
- this.managerNodeC = managerNodeC;
- workerManagerTable.put("nodeC", managerNodeC);
- }
-
- @Reference
- public void setManagerNodeD(WorkerManager managerNodeD) {
- this.managerNodeD = managerNodeD;
- workerManagerTable.put("nodeD", managerNodeD);
- }
-
- @Reference
- public void setManagerNodeE(WorkerManager managerNodeE) {
- this.managerNodeE = managerNodeE;
- workerManagerTable.put("nodeE", managerNodeE);
- }
-
- private void startNewComponents(
- Vector<CallableReferenceImpl<WorkerService>> vector) {
- log.info("Starting new components");
- WorkpoolService wp = reference.getService();
- // CallableReferenceImpl<WorkpoolService> sink =
- // (CallableReferenceImpl<WorkpoolService>) reference;
- Job j = new NullJob();
- for (CallableReferenceImpl<WorkerService> item : vector) {
- // WorkerService service = item.getService();
- // service.start();
- // service.computeFirstTime(j, sink);
- log.info("Send PostWorkerReference...");
- wp.PostWorkerReference(item);
- }
- if (myReference != null)
- wp.registerManager(myReference);
- }
-
- public void setCycleTime(long cycle) {
- this.cycleTime = cycle;
- }
-
- @SuppressWarnings("unchecked")
- /*
- * This gets the number of workers workerNo and instantiates them
- */
- public void start() {
- this.myReference = (CallableReferenceImpl<WorkpoolManager>) workpoolManagerContext
- .createSelfReference(WorkpoolManager.class, "WorkpoolManager");
- this.workersNo = Integer.parseInt(this.workers);
- this.nodesNo = Integer.parseInt(this.nodes);
- this.referenceInjection = (Integer.parseInt(this.injection) != 0);
- log.info("Starting WorkpoolManager Component with #" + workersNo
- + " workers and #" + nodes + " nodes");
- nodesNo = workerManagerTable.values().size();
- // Sets info in the bean.
- bean.setWorkers(this.workersNo);
- bean.setNodeNumbers(nodesNo);
- Vector<CallableReferenceImpl<WorkerService>> workerRefs = new Vector<CallableReferenceImpl<WorkerService>>();
- int exactTimes = workersNo / nodesNo;
- for (int i = 0; i < exactTimes; ++i) {
- for (WorkerManager manager : workerManagerTable.values()) {
- manager.start();
- if (manager != null) {
- System.err.println("Actual load = "
- + manager.getNodeLoad() + " for node ");
- addNewComponent(manager, workerRefs);
- }
- }
- }
-
- int module = (workersNo % nodesNo);
- int n = 0;
- if (module > 0) {
- Vector<String> v = new Vector(workerManagerTable.keySet());
- Collections.sort(v);
- // Iterator<WorkerManager> iter =
- // workerManagerTable.values().iterator();
- // Display (sorted) hashtable.
- for (Enumeration<String> e = v.elements(); (e.hasMoreElements() && n < module); ++n) {
- String key = e.nextElement();
- WorkerManager m = workerManagerTable.get(key);
- System.err.println("Module Actual load = " + m.getNodeLoad()
- + " for node ");
- addNewComponent(m, workerRefs);
- }
- }
- startNewComponents(workerRefs);
- bean.addListener(this);
- TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger();
- timer.scheduleAtFixedRate(task, 3000, cycleTime);
- }
-
- private void checkLoadInNodes() {
- System.out.println("CheckLoadInNodes");
- int number = 1;
- double loadAverage = 0;
- for (WorkerManager manager : workerManagerTable.values()) {
- loadAverage += manager.getNodeLoad();
- number++;
- }
- bean.setLoadAverage(loadAverage / number);
- }
-
- private void computeUsageFactor() {
- bean.setUsageFactor();
- }
-
- private void checkEstimedQueueSize() {
- WorkpoolService wp = reference.getService();
-
- if (wp != null) {
- int size = wp.estimatedQueueSize();
- log.info("Estimed Queue Size =" + size);
- bean.setEstimedQueueSize(size);
- }
- }
-
- private WorkerManager findMinLoad() {
- double load = 0;
- // workerManagerTable.values().iterator().next().getNodeLoad();
- WorkerManager toFind = null;
- for (WorkerManager manager : workerManagerTable.values()) {
- if (load == 0) {
- load = manager.getNodeLoad();
- toFind = manager;
- } else if (manager.getNodeLoad() < load) {
- load = manager.getNodeLoad();
- toFind = manager;
- }
- }
- return toFind;
- }
-
- private void checkServiceTime() {
- WorkpoolService wp = reference.getService();
-
- if (wp != null) {
- double time = wp.getServiceTime();
- log.info("Average System Service Time =" + time);
- bean.setAverageServiceTime(time);
- }
- }
-
- private void checkArrivalTime() {
- WorkpoolService wp = reference.getService();
-
- if (wp != null) {
- double time = wp.getArrivalTime();
- log.info("Average Arrival Service Time =" + time);
- bean.setAverageArrivalTime(time);
- }
- }
-
- private void checkActiveWorkers() {
- bean.setWorkers(this.activeWorkers());
- }
-
- private void getProcessedItem() {
- WorkpoolService wp = reference.getService();
- if (wp != null) {
- long computed = wp.getJobComputed();
- log.info("The system has already computed " + computed + " jobs");
- bean.setJobComputed(computed);
- }
- }
-
- private boolean removeComponent(WorkerManager manager, int k) {
- manager.removeWorkers(k);
- activeWorkers.decrementAndGet();
- return true;
- }
-
- @SuppressWarnings("unchecked")
- private boolean addNewComponent(WorkerManager manager,
- Vector<CallableReferenceImpl<WorkerService>> workerRefs) {
- CallableReferenceImpl<WorkerService> workerReference = (CallableReferenceImpl<WorkerService>) manager
- .addWorker();
-
- if (workerReference != null) {
- /* if i'll decide to use dynamically generated references */
- if (referenceInjection) {
- workerReference.getService();
- String uri = workerReference.getEndpointReference().getURI();
- int nameIndex = uri.indexOf("/");
- String componentName = uri.substring(0, nameIndex);
- if (componentName.startsWith("/"))
- componentName = uri.substring(1, uri.length());
- if (componentName.endsWith("/"))
- componentName = uri.substring(0, uri.length() - 1);
- // String componentName = uri.substring(0, nameIndex-1);
-
- log.info("Adding wire from WorkpoolComponentService to "
- + componentName);
- String referenceName = "ref" + componentName;
-
- /*
- * I'm updating the WorkpoolServiceComponent with a new
- * reference to a just created component I assume that the
- * WorkpoolManagerService and the WorkpoolServiceComponent stay
- * in the same JVM It's like in the scdl there were: <reference
- * name=referenceName target="componentName"/> With this then
- * I've a wire WorkpoolService---> a new Worker
- */
- try {
- node.addComponentReferenceWire(referenceName, "nodeA",
- "Workpool.composite", "workpool.WorkerServiceImpl",
- WorkerService.class, "WorkpoolServiceComponent",
- componentName);
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- log.info("Sending reference name " + referenceName
- + " to WorkpoolService");
- // TODO: this was part of dynamic wiring, but it doesn't work.
- // reference.getService().PostWorkerName(referenceName);
-
- } else {
- // log.info("Sending callable reference to WorkpoolService
- // placed at -->"+reference);
- // reference.getService().PostWorkerReference(workerReference);
- workerRefs.add(workerReference);
- }
- activeWorkers.incrementAndGet();
- return true;
- }
- return false;
- }
-
- public int activeWorkers() {
-
- return activeWorkers.get();
- }
-
- private void doRun(WorkpoolBean bean) {
-
- long startTime = System.currentTimeMillis();
- updateRuleLock.lock();
- if (wm == null)
- wm = ruleBase.newStatefulSession();
- if (this.handle == null)
- handle = wm.insert(bean);
- else {
- wm.update(handle, bean);
- }
- wm.fireAllRules();
- updateRuleLock.unlock();
-
- System.out.println("Engine rule overhead = "
- + (System.currentTimeMillis() - startTime));
- }
-
- private RuleBase readRule(String rule) {
-
- PackageBuilder packBuilder = new PackageBuilder();
- try {
- packBuilder.addPackageFromDrl(new StringReader(rule));
- } catch (DroolsParserException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- Package pkg = packBuilder.getPackage();
- RuleBase ruleBase = RuleBaseFactory.newRuleBase();
- try {
- ruleBase.addPackage(pkg);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return ruleBase;
- }
-
- public void acceptRules(String rules) {
- this.rules = rules;
- if (ruleBase == null) {
- RuleBase base = readRule(rules);
- if (base != null) {
- ruleBase = base;
- }
- } else {
- updateRuleLock.lock();
- // i have already a rule: updating
- ruleBase = readRule(rules);
- wm = ruleBase.newStatefulSession();
- handle = null;
- updateRuleLock.unlock();
- }
-
- System.out.println("Accepted rules = " + rules);
- }
-
- public String getRules() {
- return rules;
- }
-
- private WorkerManager findMaxLoadNode() {
- double load = 0.0;
- WorkerManager toFind = null;
- for (WorkerManager manager : workerManagerTable.values()) {
- if (manager.getNodeLoad() > load) {
- load = manager.getNodeLoad();
- toFind = manager;
- }
- }
- return toFind;
-
- }
-
- public void setWorkpoolReference(
- ServiceReference<WorkpoolService> serviceReference) {
- reference = serviceReference;
- }
-
- public void setNode(SCANode arg0) {
- node = (SCANodeImpl) arg0;
- }
-
- public void handleEvent(WorkpoolEvent ev) {
- if (ev == null)
- return;
-
- String nodeName = ev.getNodeName();
-
- switch (ev.getType()) {
- case WorkpoolEvent.SINGLE_ADD_WORKER: {
- if (nodeName != null) {
- Vector<CallableReferenceImpl<WorkerService>> workerRefs = new Vector<CallableReferenceImpl<WorkerService>>();
-
- // in this case I have a nodeName
- if (!nodeName.equals("")
- && (workerManagerTable.containsKey(nodeName))) {
- WorkerManager manager = workerManagerTable.get(nodeName);
- addNewComponent(manager, workerRefs);
- startNewComponents(workerRefs);
- } else if (nodeName.equals("")) {
- WorkerManager manager = findMinLoad();
- addNewComponent(manager, workerRefs);
- startNewComponents(workerRefs);
- }
- }
- break;
- }
- case WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER: {
- Vector<CallableReferenceImpl<WorkerService>> workerRefs = new Vector<CallableReferenceImpl<WorkerService>>();
-
- if (nodeName.equals("")) {
-
- WorkerManager manager = findMinLoad();
- int k = ev.workers();
- for (int h = 0; h < k; ++h) {
- addNewComponent(manager, workerRefs);
- }
- } else {
- WorkerManager manager = workerManagerTable
- .get(ev.getNodeName());
- int k = ev.workers();
- for (int h = 0; h < k; ++h) {
- addNewComponent(manager, workerRefs);
- }
- }
- startNewComponents(workerRefs);
- break;
- }
- case WorkpoolEvent.SINGLE_REMOVE_WORKER: {
- if (nodeName != null) {
- // in this case I have a nodeName
- if (!nodeName.equals("")
- && (workerManagerTable.containsKey(nodeName))) {
- WorkerManager manager = workerManagerTable.get(nodeName);
- removeComponent(manager, 1);
- } else if (nodeName.equals("")) {
- WorkerManager manager = findMaxLoadNode();
- removeComponent(manager, 1);
- }
- }
- break;
- }
- case WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER: {
- if (nodeName.equals("")) {
- WorkerManager manager = findMaxLoadNode();
- removeComponent(manager, ev.workers());
-
- } else {
- WorkerManager manager = workerManagerTable.get(nodeName);
- removeComponent(manager, ev.workers());
- }
- break;
- }
- }
-
- }
-
- @Destroy
- public void onExit() {
- // do cleanup
- this.timer.cancel();
- this.timer.purge();
- }
-
- public void stopAutonomicCycle() {
- this.timer.cancel();
- this.timer.purge();
- this.timer = null;
- }
-
- public void startAutonomicCycle() {
- if (this.timer == null) {
- this.timer = new Timer();
- TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger();
- timer.schedule(task, 3000, cycleTime);
- }
- }
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java
deleted file mode 100644
index d84ae549d8..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
-import org.apache.tuscany.sca.databinding.annotation.DataBinding;
-import org.apache.tuscany.sca.databinding.job.Job;
-import org.osoa.sca.annotations.OneWay;
-import org.osoa.sca.annotations.Remotable;
-import org.osoa.sca.ServiceReference;
-
-@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
-@Remotable
-public interface WorkpoolService {
-
- /* this the functional part */
- void submit(Job i);
-
- /* the time between two subsequent worker invocations */
- double getServiceTime();
-
- /* the number of ResultJob received */
- long getJobComputed();
-
- /* the time elapsed between the stream has initiated and now */
- long getElapsedTime();
-
- /* the size of the internal queue : it's not accurate */
- int estimatedQueueSize();
-
- /* the average time between two consuecutive submit */
- double getArrivalTime();
-
- void start();
-
- void stop();
-
- /*
- * this is the part needed by management. May be in future i'll refactor it
- * order to hide this part.
- */
- @OneWay
- void handleResult(Job j, boolean reuse, String string,
- CallableReferenceImpl<WorkerService> worker, boolean newJob);
-
- void addTrigger(CallableReferenceImpl<Trigger> reference);
-
- void removeTrigger();
-
- void registerManager(
- CallableReferenceImpl<WorkpoolManager> createSelfReference);
-
- /*
- * This could placed in another interface definition - think about it These
- * methods evict, and evictAll are needed when a worker finish to exist and
- * it needs to be evicted by the WorkpoolManager. In the system I have two
- * caches: 1) a domain cache, which holds the components URI 2) a
- * workerReference cache (implemented by a ConcurrentHashMap), which holds a
- * proxy to each worker. Every proxy gets built from the worker callable
- * reference. I'm thinking for placing the workerReferenceCache in a local
- * interface. Assuming that WorkpoolService and WorkpoolManager are in the
- * same JVM.
- */
- void evict(String workerURI);
-
- void evictAll();
-
- /*
- * these two are no longer needed. I leave it because if i'll have time to
- * do dynamic wiring the first one is needed. void PostWorkerName(String
- * referenceName);
- */
- void PostWorkerReference(CallableReferenceImpl<WorkerService> worker);
-
-}
diff --git a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java b/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java
deleted file mode 100644
index 52b88af42c..0000000000
--- a/java/sca-1.x-contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package workpool;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Logger;
-import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
-import org.apache.tuscany.sca.databinding.annotation.DataBinding;
-import org.osoa.sca.ComponentContext;
-import org.osoa.sca.annotations.Context;
-import org.osoa.sca.annotations.Scope;
-import org.osoa.sca.annotations.Service;
-import org.apache.tuscany.sca.databinding.job.Job;
-import org.apache.tuscany.sca.databinding.job.JobDataMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * An implementation of the Workpool service.
- */
-@Service(WorkpoolService.class)
-@Scope("COMPOSITE")
-@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
-public class WorkpoolServiceImpl implements WorkpoolService,
- WorkerServiceCallback {
-
- /* incoming job queue */
- private LinkedBlockingQueue<Job> queue = new LinkedBlockingQueue<Job>(5000);
- private CallableReferenceImpl<Trigger> trigger = null;
- private Trigger forwardResult = null;
- /* counter for job's number fetched from the queue and sent to the Worker */
- private AtomicInteger jobSent = new AtomicInteger(0);
- /* time for initHandleResult */
- private AtomicLong initHandleResult = new AtomicLong(0);
- /* time for endHandleResult */
- private AtomicLong endHandleResult = new AtomicLong(0);
- /*
- * number of job computed, this will be exposed in order to be used to
- * firing rules
- */
- private long jobComputed = 0;
- /* same as above */
- private AtomicLong elapsedTime = new AtomicLong(0);
- /* this is for comuputing averageServiceTime */
- private long times = 1;
- /* this is for computing averageArrivalTime */
- private long timesArrival = 1;
- private ReentrantLock arrivalLock = new ReentrantLock();
- private long arrivalPrevious = -1;
- // private AtomicBoolean processingStopped = new AtomicBoolean(false);
- private boolean processingStopped = false;
- // private LinkedBlockingQueue<Trigger> triggers = new
- // LinkedBlockingQueue<Trigger>();
- @Context
- protected ComponentContext workpoolContext;
- private CallableReferenceImpl<WorkpoolManager> manager;
- private long previousSubmitTime = -1;
- private boolean firstTime = true;
- private boolean first = true;
- private long start = 0;
- private long end = 0;
- private double averageServiceTime = 0;
- private double averageArrivalTime = 0;
- private int workersNo = 0;
- private final Job nullJob = new NullJob();
- /* This is useful for counting the start and end */
- private Logger log = Logger.getLogger(WorkpoolServiceImpl.class.getName());
- private ReentrantLock handleResultLock = new ReentrantLock();
- private ReentrantLock postWorkerReferenceLock = new ReentrantLock();
- private ConcurrentHashMap<String, WorkerService> cacheReference = new ConcurrentHashMap<String, WorkerService>();
- private CallableReferenceImpl<WorkpoolService> myReference;
- private String previuosURI = "";
- private long time = 0;
-
- private void computeAverageTime() {
- long actualServiceTime = 0;
- // if the processing is finished
- if (processingStopped)
- return;
-
- if (firstTime == true) {
- this.previousSubmitTime = System.currentTimeMillis();
- this.averageServiceTime = 0;
- firstTime = false;
- } else {
- actualServiceTime = System.currentTimeMillis()
- - this.previousSubmitTime;
- this.previousSubmitTime = System.currentTimeMillis();
- averageServiceTime = ((averageServiceTime * times) + actualServiceTime)
- / (times + 1);
- ++times;
- }
- }
-
- public void submit(Job j) {
- try {
- // log.info("Submit job in queue -->"+ j.getType());
- // processingStopped.set(false);
- try {
- arrivalLock.lock();
- if (this.arrivalPrevious == -1) {
- arrivalPrevious = System.currentTimeMillis();
- averageArrivalTime = 0;
- }
- double actualArrivalTime = System.currentTimeMillis()
- - arrivalPrevious;
- averageArrivalTime = ((averageArrivalTime * timesArrival) + actualArrivalTime)
- / (timesArrival + 1);
- arrivalPrevious = System.currentTimeMillis();
- ++timesArrival;
- } finally {
- arrivalLock.unlock();
- }
- queue.put(j);
- } catch (Exception e) {
- log.info("Exception in queue");
- queue.clear();
- e.printStackTrace();
- }
- }
-
- public double getArrivalTime() {
- return this.averageArrivalTime;
- }
-
- public double getServiceTime() {
- return this.averageServiceTime;
- }
-
- public void receiveResult(Job resultType, boolean reuse, String workerURI) {
-
- if (reuse) {
- queue.add(resultType);
- return;
- }
-
- computeAverageTime();
- Job job = null;
- try {
- job = queue.take();
- } catch (InterruptedException e) {
- // TODO Better exception handling --> see Exception antipattern doc
- e.printStackTrace();
- return;
- }
-
- if ((job != null) && (job.eos() == false)) {
- int nameIndex = workerURI.indexOf("/");
- String workerName = workerURI.substring(0, nameIndex - 1);
- log.info("Sending job to worker --> " + workerName);
- WorkerService worker = workpoolContext.getService(
- WorkerService.class, workerName);
- worker.compute(job);
- }
-
- JobDataMap map = ((ResultJob) resultType).getDataMap();
- if (map != null) {
- ++jobComputed;
- Object obj = map.getJobDataObject("result");
- System.out.println("Result = " + ((Double) obj).doubleValue());
- }
-
- }
-
- public void start() {
- log.info("WorkpoolServiceComponent started...");
- myReference = (CallableReferenceImpl) workpoolContext
- .createSelfReference(WorkpoolService.class, "WorkpoolService");
- myReference.getService();
- }
-
- /*
- *
- * This method is called by WorkpoolManagerImpl, when it creates a new
- * worker component in order to dispatch worker to the WorkpoolServiceImpl
- * @param CallableReferenceImpl reference - a dynamically created reference
- * from the Worker
- */
- public void PostWorkerReference(
- CallableReferenceImpl<WorkerService> reference) {
-
- try {
- long initPostWorkerReference;
- long endPostWorkerReference;
- this.postWorkerReferenceLock.lock();
-
- initPostWorkerReference = System.currentTimeMillis();
- WorkerService worker;
- worker = reference.getService();
- worker.start();
-
- ++workersNo;
- if (myReference != null) {
-
- // Job poison = new ResultJob();
- this.postWorkerReferenceLock.unlock();
- log.info("Sending null job to worker");
- worker.computeFirstTime(nullJob, myReference);
- // queue.put(poison);
- endPostWorkerReference = System.currentTimeMillis();
- System.out.println("Time PostWorker ="
- + (endPostWorkerReference - initPostWorkerReference));
- } else {
- log.info("myReference is null");
-
- }
- } catch (Exception e) {
- postWorkerReferenceLock.unlock();
- } finally {
- }
-
- }
-
- /*
- * FIXME This method currently is not used because i've not yet ready
- * dynamic wire injection
- */
-
- public void PostWorkerName(String referenceName) {
- /* TODO Do something similar to PostWorkerReference */
- }
-
- private void printComputingTime(Job j) {
-
- if (first == true) {
- first = false;
- start = System.currentTimeMillis();
- end = System.currentTimeMillis();
- } else {
- end = System.currentTimeMillis();
- System.out.println("Elapsed Time = " + (end - start));
- elapsedTime.set(end - start);
- }
- /*
- * i could use reflection or instance of (but it's a penalty kick) , or
- * an object as result, but i'd prefer a job so i've defined a
- * RESULT_JOB There're in the system three kind of jobs: RESULT_JOB,
- * NULL_JOB, DEFAULT_JOB
- */
- if ((j != null) && (j.getType() == Job.RESULT_JOB)) {
- jobComputed++;
- ResultJob result = (ResultJob) j;
- JobDataMap map = result.getDataMap();
- if (map != null) {
- Double doubleValue = (Double) map.getJobDataObject("result");
- System.out
- .println("ResultValue = " + doubleValue.doubleValue());
- }
-
- }
-
- }
-
- public void handleResult(Job resultType, boolean reuse, String workerURI,
- CallableReferenceImpl<WorkerService> worker, boolean newWorker) {
- initHandleResult.set(System.nanoTime());
- if (reuse) {
- log.info("Reusing a job..");
- queue.add(resultType);
- return;
- }
- // init job variable
- Job job;
- if (newWorker)
- System.out.println("newWorkerActivation= " + System.nanoTime());
- printComputingTime(resultType);
-
- try {
- job = queue.take();
- } catch (Exception e) {
- log.info("Exception during fetching the queue");
- e.printStackTrace();
- return;
- }
-
- try {
- // it needs to be locked because multiple threads could invoke this.
- handleResultLock.lock();
- if (previuosURI.equals("")) {
- time = System.currentTimeMillis();
- this.previuosURI = workerURI;
- } else {
- if (previuosURI.equals(workerURI))
- System.out.println("Complete ComputeTime for an item ="
- + (time - System.currentTimeMillis()));
- }
- if (job.eos()) {
- long endTime = System.currentTimeMillis();
- /* checking for EOS */
- if (processingStopped == false) {
- processingStopped = true;
- System.out.println("GOT EOS in time=" + (endTime - start));
- log.info("Stop autonomic cycle..");
- /*
- * I'm doing this because i want that in the termination i
- * would have more jobs with eos == true than workers. So
- * i'm sure that every worker removes itself from its
- * manager. I do it only one time. This is necessary because
- * i have a variable number of workers. The number of
- * workers in the system might change every time the rule
- * engine cycle gets executed.
- */
- ResultJob poison = new ResultJob();
- for (int i = 0; i < workersNo; ++i) {
- try {
-
- queue.put(poison);
-
- } catch (Exception e) {
- log.info("Cannot duplicate poison tokens");
- break;
- }
-
- }
- manager.getService().stopAutonomicCycle();
- }
- }
- computeAverageTime();
- System.out.println("AverageTime =" + averageServiceTime);
- if (job != null) {
-
- WorkerService workerService;
- /*
- * the workpool has a high reuse, i always call the same
- * component set or un superset or subset, so i cache it. When
- * the WorkpoolManager will remove an item, it removes still
- * this cache entry
- */
- if (!cacheReference.containsKey(workerURI)) {
- workerService = worker.getService();
- handleResultLock.unlock();
- cacheReference.put(workerURI, workerService);
- } else {
- handleResultLock.unlock();
- workerService = cacheReference.get(workerURI);
- }
- // it's still a penalty kick locking compute because it's going
- // to be scheduled whereas it's async.
- workerService.compute(job);
- log.info("Sent job #" + jobSent.incrementAndGet()
- + " Queue size " + queue.size());
- endHandleResult.set(System.nanoTime());
- System.out
- .println("begin:handleResult ==> end:handleResult:compute = "
- + (endHandleResult.addAndGet(-(initHandleResult
- .get())) / 1000000));
- }
- } catch (Exception e) {
- handleResultLock.unlock();
- }
- }
-
- public void evictAll() {
- cacheReference.clear();
- }
-
- public void evict(String workerURI) {
- if (cacheReference.containsKey(workerURI)) {
- cacheReference.remove(workerURI);
- }
-
- }
-
- public int estimatedQueueSize() {
- return queue.size();
- }
-
- public long getElapsedTime() {
- return elapsedTime.get();
- }
-
- public long getJobComputed() {
- return jobComputed;
- }
-
- public void registerManager(
- CallableReferenceImpl<WorkpoolManager> createSelfReference) {
- manager = createSelfReference;
-
- }
-
- public void stop() {
- // TODO Auto-generated method stub
-
- }
-
- public void addTrigger(CallableReferenceImpl<Trigger> reference) {
- this.trigger = reference;
- this.forwardResult = reference.getService();
-
- }
-
- public void removeTrigger() {
- this.trigger = null;
- this.forwardResult = null;
- }
-}