From cefeaaba712d4a50d7dee69508708773203379fb Mon Sep 17 00:00:00 2001 From: antelder Date: Tue, 21 Apr 2009 06:31:30 +0000 Subject: Move to 1.x-contrib git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@766998 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/node/DomainNode.java | 57 --- .../src/main/java/node/DomainNodeDaemon.java | 79 --- .../src/main/java/node/TestJob.java | 82 --- .../src/main/java/node/WorkpoolDaemon.java | 271 ---------- .../src/main/java/node/WorkpoolNode.java | 179 ------- .../src/main/java/node/workerRules1.drl | 13 - .../main/java/workpool/MetaComponentWorker.java | 85 ---- .../src/main/java/workpool/MyWorker.java | 46 -- .../src/main/java/workpool/NullJob.java | 43 -- .../src/main/java/workpool/ResultJob.java | 54 -- .../src/main/java/workpool/Trigger.java | 29 -- .../src/main/java/workpool/WorkerManager.java | 31 -- .../src/main/java/workpool/WorkerManagerImpl.java | 213 -------- .../src/main/java/workpool/WorkerService.java | 56 --- .../main/java/workpool/WorkerServiceCallback.java | 27 - .../src/main/java/workpool/WorkerServiceImpl.java | 171 ------- .../src/main/java/workpool/WorkpoolBean.java | 162 ------ .../main/java/workpool/WorkpoolBeanListener.java | 25 - .../src/main/java/workpool/WorkpoolEvent.java | 71 --- .../src/main/java/workpool/WorkpoolManager.java | 48 -- .../main/java/workpool/WorkpoolManagerImpl.java | 555 --------------------- .../src/main/java/workpool/WorkpoolService.java | 91 ---- .../main/java/workpool/WorkpoolServiceImpl.java | 416 --------------- 23 files changed, 2804 deletions(-) delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/DomainNode.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/TestJob.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/workerRules1.drl delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/MyWorker.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/NullJob.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/ResultJob.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/Trigger.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerService.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java delete mode 100644 branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java (limited to 'branches/sca-java-1.x/demos/workpool-distributed/src/main/java') diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/DomainNode.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/DomainNode.java deleted file mode 100644 index a278499aae..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/DomainNode.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package node; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -import org.apache.tuscany.sca.domain.SCADomain; -import org.apache.tuscany.sca.domain.SCADomainFactory; - -/** - * This server program that loads a composite to provide simple registry - * function. This server can be replaced with any registry that is appropriate - * but the components in each node that talk to the registry should be replaced - * also. - */ -public class DomainNode { - - private static String DEFAULT_DOMAIN_URI = "http://u12:8877"; - private boolean stopped = true; - - public static void main(String[] args) { - - try { - - SCADomainFactory domainFactory = SCADomainFactory.newInstance(); - SCADomain domain = domainFactory - .createSCADomain(DEFAULT_DOMAIN_URI); - - System.out.println("Domain started (press enter to shutdown)"); - System.in.read(); - // waitForever(); - domain.destroy(); - } catch (Exception e) { - e.printStackTrace(); - } - - System.out.println("Domain stopped"); - } -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java deleted file mode 100644 index 9d05761ad6..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/DomainNodeDaemon.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package node; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -import org.apache.commons.daemon.Daemon; -import org.apache.commons.daemon.DaemonContext; -import org.apache.tuscany.sca.domain.SCADomain; -import org.apache.tuscany.sca.domain.SCADomainFactory; - -/** - * This server program that loads a composite to provide simple registry - * function. This server can be replaced with any registry that is appropriate - * but the components in each node that talk to the registry should be replaced - * also. - */ -public class DomainNodeDaemon implements Daemon { - - private SCADomain domain; - private static String DEFAULT_DOMAIN_URI = "http://u12:8877"; - private boolean stopped = true; - - private synchronized void waitForever() { - while (!stopped) { - try { - wait(); - } catch (InterruptedException ex) { - stopped = true; - return; - } - } - - } - - public void destroy() { - // TODO Auto-generated method stub - - } - - public void init(DaemonContext arg0) throws Exception { - // TODO Auto-generated method stub - - } - - public void start() throws Exception { - - SCADomainFactory domainFactory = SCADomainFactory.newInstance(); - domain = domainFactory.createSCADomain(DEFAULT_DOMAIN_URI); - - System.out.println("Domain started (press enter to shutdown)"); - waitForever(); - - } - - public void stop() throws Exception { - // TODO Auto-generated method stub - Thread.currentThread().interrupt(); - domain.destroy(); - } -} \ No newline at end of file diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/TestJob.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/TestJob.java deleted file mode 100644 index f48e647bd6..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/TestJob.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package node; - -import org.apache.tuscany.sca.databinding.job.Job; -import org.apache.tuscany.sca.databinding.job.JobDataMap; -import org.apache.tuscany.sca.databinding.job.JobExecutionContext; -import org.apache.tuscany.sca.databinding.job.RemoteJob; - -import com.thoughtworks.xstream.XStream; -import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; - -public class TestJob extends RemoteJob implements java.io.Serializable { - private boolean EOS = false; - private Double value; - - public TestJob(Double x, long iterations, int[] items) { - JobDataMap map = new JobDataMap(); - map.addJobData("value", x); - map.addJobData("iterations", iterations); - map.addJobData("items", items); - context.setJobData(map); - } - - public TestJob(Double i, boolean eos) { - value = i; - this.EOS = eos; - } - - public TestJob(String jsonData) { - JobExecutionContext ctxt = new JobExecutionContext(); - ctxt.storeJSONData(jsonData); - } - - public int getType() { - return Job.REGULAR_JOB; - } - - public void setEOS() { - EOS = true; - } - - public boolean eos() { - return EOS; - } - - @Override - public Double compute(JobExecutionContext context) { - JobDataMap contextMap = context.getJobData(); - Long iterations = (Long) contextMap.getJobDataObject("iterations"); - Double value = (Double) contextMap.getJobDataObject("value"); - double x = value.doubleValue(); - System.out.println("Computing sinx for " + value + " for " - + iterations.intValue() + " times"); - long computing_start = System.currentTimeMillis(); - for (long i = 0; i < iterations.longValue(); ++i) { - x = Math.sin(x); - } - long computing_end = System.currentTimeMillis(); - System.out.println("Computing time= " - + (computing_end - computing_start)); - System.out.println("Send result = " + x); - return new Double(x); - } - -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java deleted file mode 100644 index 1f2a4d1f9a..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package node; - -import java.io.BufferedInputStream; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.List; - -import javax.xml.namespace.QName; - -import org.apache.commons.daemon.Daemon; -import org.apache.commons.daemon.DaemonContext; -import org.apache.commons.daemon.DaemonController; -import org.apache.tuscany.sca.assembly.Composite; -import org.apache.tuscany.sca.assembly.Service; -import org.apache.tuscany.sca.contribution.Contribution; -import org.apache.tuscany.sca.contribution.DeployedArtifact; -import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; -import org.apache.tuscany.sca.domain.SCADomain; -import org.apache.tuscany.sca.node.NodeException; -import org.apache.tuscany.sca.node.NodeManagerInitService; -import org.apache.tuscany.sca.node.SCANode; -import org.apache.tuscany.sca.node.SCANodeFactory; -import org.apache.tuscany.sca.node.impl.SCANodeImpl; -import org.apache.tuscany.sca.node.management.SCANodeManagerInitService; - -import java.net.URI; - -import workpool.WorkerManager; -import workpool.WorkerManagerImpl; -import workpool.WorkpoolManager; -import workpool.WorkpoolService; -import workpool.WorkpoolServiceImpl; - -/** - * This client program shows how to run a distributed SCA node. In this case a - * calculator node has been constructed specifically for running the calculator - * composite. Internally it creates a representation of a node and associates a - * distributed domain with the node. This separation is made different - * implementations of the distributed domain can be provided. - */ -public class WorkpoolDaemon implements Daemon, Runnable { - private String domainName; - private String nodeName; - private long iterations; - private long jobsNo; - private long workerNo; - private SCANode node; - private boolean stopped = false; - private DaemonController controller = null; - private Thread thread = null; - private String ruleFile = "workerRules.drl"; - - /* - * public static void main(String[] args) throws Exception { - * // Check that the correct arguments have been provided if (null == args || - * args.length < 4) { System.err.println("Usage: java WorkpoolNode - * domainname nodename iterTest workerNo"); System.exit(1); } - * - * try { String domainName = args[0]; String nodeName = args[1]; long - * iterations = Long.parseLong(args[2]); long jobsNo = - * Long.parseLong(args[3]); long workerNo = Long.parseLong(args[4]); - * ClassLoader cl = WorkpoolDaemon.class.getClassLoader(); - * - * SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); node = - * nodeFactory.createSCANode(null, domainName); - * node.addContribution(nodeName, cl.getResource(nodeName + "/")); - * node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); - * node.start(); // nodeA is the head node and runs some tests while all - * other nodes // simply listen for incoming messages - * - * FileReader rules = new FileReader("workerRules.drl"); StringBuffer buffer = - * new StringBuffer(); - * - * BufferedReader br = new BufferedReader(rules); String ruleString; do { - * ruleString = br.readLine(); if (ruleString!=null) { - * buffer.append(ruleString);} } while (ruleString!=null); - * - * if ( nodeName.equals("nodeA") ) { // do some application stuff - * WorkpoolService workpoolService = - * node.getDomain().getService(WorkpoolService.class, - * "WorkpoolServiceComponent"); workpoolService.start(); - * NodeManagerInitService nodeInit = - * node.getDomain().getService(NodeManagerInitService.class, - * "WorkpoolManagerComponent/NodeManagerInitService"); - * nodeInit.setNode(node); WorkpoolManager workpoolManager = - * node.getDomain().getService(WorkpoolManager.class, - * "WorkpoolManagerComponent/WorkpoolManager"); - * workpoolManager.setWorkpoolReference(node.getDomain().getServiceReference(WorkpoolService.class, - * "WorkpoolServiceComponent")); - * workpoolManager.acceptRules(buffer.toString()); workpoolManager.start(); - * int items[] = {3,4,5,6,3,6,3,5,9,5,6}; - * - * double x = 398349; for (int i = 0; i < jobsNo; ++i) - * workpoolService.submit(new TestJob(x,iterations,items)); - * - * TestJob j = new TestJob(-1.0,true); for (int i = 0; i < workerNo+1; ++i){ - * j.setEOS(); workpoolService.submit(j); } } try { if - * (nodeName.equals("nodeB")) { NodeManagerInitService serviceNodeB = - * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeBComponent/NodeManagerInitService"); - * serviceNodeB.setNode(node); } if (nodeName.equals("nodeC")) { - * NodeManagerInitService workerManagerC = - * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeCComponent/NodeManagerInitService"); - * workerManagerC.setNode(node); } if (nodeName.equals("nodeD")) { - * NodeManagerInitService workerManagerD = - * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeDComponent/NodeManagerInitService"); - * workerManagerD.setNode(node); } if (nodeName.equals("nodeE")) { - * NodeManagerInitService workerManagerE = - * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeEComponent/NodeManagerInitService"); - * workerManagerE.setNode(node); } - * - * System.out.println("Node started (press enter to shutdown)"); - * System.in.read(); } catch (IOException e) { e.printStackTrace(); } // - * stop the node and all the domains in it node.stop(); node.destroy(); - * System.exit(0); } catch(Exception ex) { System.err.println("Exception in - * node - " + ex.getMessage()); ex.printStackTrace(System.err); } } - */ - public void destroy() { - // TODO Auto-generated method stub - - } - - public void init(DaemonContext arg0) throws Exception { - String[] args = arg0.getArguments(); - domainName = args[0]; - nodeName = args[1]; - iterations = Long.parseLong(args[2]); - jobsNo = Long.parseLong(args[3]); - workerNo = Long.parseLong(args[4]); - if (args.length == 6) { - ruleFile = args[5]; - } - this.controller = arg0.getController(); - // this.thread=new Thread(this); - } - - public void start() throws Exception { - - ClassLoader cl = WorkpoolDaemon.class.getClassLoader(); - - SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); - node = nodeFactory.createSCANode(null, domainName); - node.addContribution(nodeName, cl.getResource(nodeName + "/")); - node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); - node.start(); - // nodeA is the head node and runs some tests while all other nodes - // simply listen for incoming messages - - FileReader rules = new FileReader(ruleFile); - StringBuffer buffer = new StringBuffer(); - - BufferedReader br = new BufferedReader(rules); - String ruleString; - do { - ruleString = br.readLine(); - if (ruleString != null) { - buffer.append(ruleString + "\n"); - } - } while (ruleString != null); - - if (nodeName.equals("nodeA")) { - // do some application stuff - WorkpoolService workpoolService = node.getDomain().getService( - WorkpoolService.class, "WorkpoolServiceComponent"); - workpoolService.start(); - SCANodeManagerInitService nodeInit = node.getDomain().getService( - SCANodeManagerInitService.class, - "WorkpoolManagerComponent/NodeManagerInitService"); - nodeInit.setNode(node); - WorkpoolManager workpoolManager = node.getDomain().getService( - WorkpoolManager.class, - "WorkpoolManagerComponent/WorkpoolManager"); - workpoolManager.setWorkpoolReference(node.getDomain() - .getServiceReference(WorkpoolService.class, - "WorkpoolServiceComponent")); - workpoolManager.acceptRules(buffer.toString()); - workpoolManager.start(); - - int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 }; - - double x = 398349; - for (int i = 0; i < jobsNo; ++i) { - workpoolService.submit(new TestJob(x, iterations, items)); - } - TestJob j = new TestJob(-1.0, true); - for (int i = 0; i < workerNo + 1; ++i) { - j.setEOS(); - workpoolService.submit(j); - } - - } - if (nodeName.equals("nodeB")) { - SCANodeManagerInitService workerManagerNodeB = node - .getDomain() - .getService(SCANodeManagerInitService.class, - "WorkerManagerNodeBComponent/NodeManagerInitService"); - workerManagerNodeB.setNode(node); - } - - if (nodeName.equals("nodeC")) { - SCANodeManagerInitService workerManagerNodeC = node - .getDomain() - .getService(SCANodeManagerInitService.class, - "WorkerManagerNodeCComponent/NodeManagerInitService"); - workerManagerNodeC.setNode(node); - } - - if (nodeName.equals("nodeD")) { - SCANodeManagerInitService workerManagerNodeD = node - .getDomain() - .getService(SCANodeManagerInitService.class, - "WorkerManagerNodeDComponent/NodeManagerInitService"); - workerManagerNodeD.setNode(node); - } - - if (nodeName.equals("nodeE")) { - SCANodeManagerInitService workerManagerNodeE = node - .getDomain() - .getService(SCANodeManagerInitService.class, - "WorkerManagerNodeEComponent/NodeManagerInitService"); - workerManagerNodeE.setNode(node); - } - - this.waitForever(); - // this.thread.start(); - } - - public void stop() throws Exception { - Thread.currentThread().interrupt(); - // thread.interrupt(); - node.stop(); - node.destroy(); - } - - private synchronized void waitForever() { - while (!stopped) { - try { - wait(); - } catch (InterruptedException ex) { - stopped = true; - return; - } - } - } - - public void run() { - waitForever(); - } -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java deleted file mode 100644 index 86557548af..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package node; - -import java.io.BufferedInputStream; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.List; - -import javax.xml.namespace.QName; - -import org.apache.tuscany.sca.assembly.Composite; -import org.apache.tuscany.sca.assembly.Service; -import org.apache.tuscany.sca.contribution.Contribution; -import org.apache.tuscany.sca.contribution.DeployedArtifact; -import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; -import org.apache.tuscany.sca.domain.SCADomain; -import org.apache.tuscany.sca.node.NodeManagerInitService; -import org.apache.tuscany.sca.node.SCANode; -import org.apache.tuscany.sca.node.SCANodeFactory; -import org.apache.tuscany.sca.node.impl.SCANodeImpl; -import java.net.URI; - -import workpool.WorkerManager; -import workpool.WorkerManagerImpl; -import workpool.WorkpoolManager; -import workpool.WorkpoolService; -import workpool.WorkpoolServiceImpl; - -/** - * This client program shows how to run a distributed SCA node. In this case a - * calculator node has been constructed specifically for running the calculator - * composite. Internally it creates a representation of a node and associates a - * distributed domain with the node. This separation is made different - * implementations of the distributed domain can be provided. - */ -public class WorkpoolNode { - - public static void main(String[] args) throws Exception { - - // Check that the correct arguments have been provided - if (null == args || args.length < 4) { - System.err - .println("Useage: java WorkpoolNode domainname nodename iterTest workerNo"); - System.exit(1); - } - BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); - String domainName = args[0]; - String nodeName = args[1]; - long iterations = Long.parseLong(args[2]); - long jobsNo = Long.parseLong(args[3]); - long workerNo = Long.parseLong(args[4]); - ClassLoader cl = WorkpoolNode.class.getClassLoader(); - - SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); - SCANode node = nodeFactory.createSCANode(null, domainName); - node.addContribution(nodeName, cl.getResource(nodeName + "/")); - node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); - node.start(); - // nodeA is the head node and runs some tests while all other nodes - // simply listen for incoming messages - - FileReader rules = new FileReader("workerRules.drl"); - StringBuffer buffer = new StringBuffer(); - - BufferedReader br = new BufferedReader(rules); - String ruleString; - do { - ruleString = br.readLine(); - if (ruleString != null) { - buffer.append(ruleString + "\n"); - } - } while (ruleString != null); - - if (nodeName.equals("nodeA")) { - // do some application stuff - WorkpoolService workpoolService = node.getDomain().getService( - WorkpoolService.class, "WorkpoolServiceComponent"); - workpoolService.start(); - NodeManagerInitService nodeInit = node.getDomain().getService( - NodeManagerInitService.class, - "WorkpoolManagerComponent/NodeManagerInitService"); - nodeInit.setNode(node); - WorkpoolManager workpoolManager = node.getDomain().getService( - WorkpoolManager.class, - "WorkpoolManagerComponent/WorkpoolManager"); - workpoolManager.setWorkpoolReference(node.getDomain() - .getServiceReference(WorkpoolService.class, - "WorkpoolServiceComponent")); - workpoolManager.setCycleTime(8000); - workpoolManager.acceptRules(buffer.toString()); - workpoolManager.start(); - int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 }; - - double x = 398349; - - for (int i = 0; i < jobsNo; ++i) - workpoolService.submit(new TestJob(x, iterations, items)); - - TestJob j = new TestJob(-1.0, true); - for (int i = 0; i < workerNo + 1; ++i) { - j.setEOS(); - workpoolService.submit(j); - } - - } - try { - if (nodeName.equals("nodeB")) { - NodeManagerInitService serviceNodeB = node - .getDomain() - .getService(NodeManagerInitService.class, - "WorkerManagerNodeBComponent/NodeManagerInitService"); - serviceNodeB.setNode(node); - } - if (nodeName.equals("nodeC")) { - NodeManagerInitService workerManagerC = node - .getDomain() - .getService(NodeManagerInitService.class, - "WorkerManagerNodeCComponent/NodeManagerInitService"); - workerManagerC.setNode(node); - } - if (nodeName.equals("nodeD")) { - NodeManagerInitService workerManagerD = node - .getDomain() - .getService(NodeManagerInitService.class, - "WorkerManagerNodeDComponent/NodeManagerInitService"); - workerManagerD.setNode(node); - } - if (nodeName.equals("nodeE")) { - NodeManagerInitService workerManagerE = node - .getDomain() - .getService(NodeManagerInitService.class, - "WorkerManagerNodeEComponent/NodeManagerInitService"); - workerManagerE.setNode(node); - } - - System.out.println("Node started (press enter to shutdown)"); - String buff; - for (;;) { - try { - buff = in.readLine(); - if (buff == null) - break; - System.out.print(in.readLine()); - } catch (IOException ex) { - break; // Exit thread. - } - } - // stop the node and all the domains in it - node.stop(); - node.destroy(); - System.exit(0); - } catch (Exception ex) { - System.err.println("Exception in node - " + ex.getMessage()); - ex.printStackTrace(System.err); - } - } -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/workerRules1.drl b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/workerRules1.drl deleted file mode 100644 index 9c5a5d1b7f..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/node/workerRules1.drl +++ /dev/null @@ -1,13 +0,0 @@ -package workpool -import workpool.*; -rule "WorkerAdder1" - when - $workerBean: WorkpoolBean(singleAction == false && (jobComputed > 500)) - then - $workerBean.setSingleAction() - $workerBean.addWorkerToNode("nodeB") - $workerBean.addWorkerToNode("nodeC") - $workerBean.addWorkerToNode("nodeD") - $workerBean.addWorkerToNode("nodeE") -end - \ No newline at end of file diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java deleted file mode 100644 index cdd0f30b34..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import java.io.StringReader; -import java.net.URI; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.util.logging.Logger; - -import javax.xml.stream.XMLInputFactory; -import javax.xml.stream.XMLStreamReader; - -import org.apache.tuscany.sca.assembly.MetaComponent; -import org.apache.tuscany.sca.assembly.impl.DefaultMetaComponent; - -public class MetaComponentWorker extends DefaultMetaComponent { - - private SecureRandom prng; - private String componentName; - private String scdl; - private String javaClass; - private boolean loadedFromString = false; - private Logger log = Logger.getLogger(MetaComponentWorker.class.getName()); - - public MetaComponentWorker() { - componentName = "WorkerComponent" - + java.util.UUID.randomUUID().toString(); - } - - public void setWorkerName(String componentName) { - this.componentName = componentName; - } - - public void setWorkerClass(String javaClass) { - this.javaClass = javaClass; - } - - private String generateSCDL() { - StringBuffer buffer = new StringBuffer(512); - buffer - .append("\n"); - buffer.append(""); - buffer.append(""); - buffer.append(this.componentName); - buffer.append("\n"); - return buffer.toString(); - } - - @Override - public XMLStreamReader build() throws Exception { - XMLInputFactory factory = XMLInputFactory.newInstance(); - if (!loadedFromString) - scdl = generateSCDL(); - return factory.createXMLStreamReader(new StringReader(scdl)); - - } - - public String getName() { - - return componentName; - } - -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/MyWorker.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/MyWorker.java deleted file mode 100644 index c45696e3cf..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/MyWorker.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.apache.tuscany.sca.core.context.CallableReferenceImpl; -import org.apache.tuscany.sca.databinding.job.Job; -import org.apache.tuscany.sca.databinding.job.JobDataMap; -import org.apache.tuscany.sca.databinding.job.JobExecutionContext; -import org.apache.tuscany.sca.databinding.job.RemoteJob; -import org.osoa.sca.annotations.Scope; - -@Scope("COMPOSITE") -public class MyWorker extends WorkerServiceImpl { - private static int resultcount = 0; - - @Override - public ResultJob computeTask(Job job) { - - RemoteJob remoteJob = (RemoteJob) job; - System.out.println("Computing the job"); - JobExecutionContext context = remoteJob.getContext(); - ResultJob resultJob = new ResultJob(); - JobDataMap resultMap = new JobDataMap(); - resultMap.addJobData("result", remoteJob.compute(context)); - resultJob.setJobDataMap(resultMap); - System.out.println("Count result = " + (++resultcount)); - return resultJob; - } - -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/NullJob.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/NullJob.java deleted file mode 100644 index fb930adf2e..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/NullJob.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.apache.tuscany.sca.databinding.job.Job; -import org.apache.tuscany.sca.databinding.job.JobDataMap; - -public class NullJob implements Job, java.io.Serializable { - - public Object compute(Object arg0) { - // TODO Auto-generated method stub - return null; - } - - public JobDataMap getDataMap() { - return null; - } - - public boolean eos() { - return false; - } - - public int getType() { - return Job.NULL_JOB; - } - -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/ResultJob.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/ResultJob.java deleted file mode 100644 index e04411668b..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/ResultJob.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.apache.tuscany.sca.databinding.job.Job; -import org.apache.tuscany.sca.databinding.job.JobDataMap; -import org.apache.tuscany.sca.databinding.job.JobExecutionContext; -import org.apache.tuscany.sca.databinding.job.RemoteJob; - -public class ResultJob extends RemoteJob implements - java.io.Serializable { - private JobDataMap map; - - public JobDataMap getDataMap() { - return map; - } - - public void setJobDataMap(JobDataMap map) { - this.map = map; - } - - public boolean eos() { - // TODO Auto-generated method stub - return true; - } - - public int getType() { - // TODO Auto-generated method stub - return Job.RESULT_JOB; - } - - @Override - public Object compute(JobExecutionContext v) { - // TODO Auto-generated method stub - return null; - } - -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/Trigger.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/Trigger.java deleted file mode 100644 index 469675b19b..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/Trigger.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.apache.tuscany.sca.databinding.annotation.DataBinding; -import org.apache.tuscany.sca.databinding.job.Job; -import org.osoa.sca.annotations.Remotable; - -@Remotable -@DataBinding("org.apache.tuscany.sca.databinding.job.Job") -public interface Trigger { - void handleEvent(T c); -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java deleted file mode 100644 index 520203e190..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; -import org.osoa.sca.annotations.Remotable; -import org.osoa.sca.CallableReference; -@Remotable -public interface WorkerManager { - CallableReference addWorker(); - boolean removeWorker(String workerName); - boolean removeWorkers(int k); - boolean removeAllWorkers(); - double getNodeLoad(); - int activeWorkers(); - void start(); -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java deleted file mode 100644 index d4337cad2f..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.List; -import java.util.logging.Logger; - -import org.apache.tuscany.sca.assembly.Composite; -import org.apache.tuscany.sca.contribution.Contribution; -import org.apache.tuscany.sca.contribution.DeployedArtifact; -import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; -import org.osoa.sca.CallableReference; -import org.apache.tuscany.sca.node.management.SCANodeManagerInitService; -import org.apache.tuscany.sca.node.SCANode; -import org.apache.tuscany.sca.node.impl.SCANodeImpl; -import org.apache.tuscany.sca.runtime.RuntimeComponent; -import org.osoa.sca.ComponentContext; -import org.osoa.sca.annotations.Context; -import org.osoa.sca.annotations.Property; -import org.osoa.sca.annotations.Scope; -import org.osoa.sca.annotations.Service; -import java.util.LinkedList; -import java.util.ArrayList; - -@Scope("COMPOSITE") -@Service(interfaces = { SCANodeManagerInitService.class, WorkerManager.class }) -public class WorkerManagerImpl implements WorkerManager, SCANodeManagerInitService { - private Logger log = Logger.getLogger(WorkerManagerImpl.class.getName()); - private LinkedList> activeWorkers = new LinkedList>(); - private List workerComponentNames = new ArrayList(); - private SCANodeImpl node; - @Property - protected String nodeName; - @Property - protected String compositeName; - @Property - protected String workerClass; - @Context - protected ComponentContext context; - private double loadAverage; - - /* This method is used to find a composite inside all deployed artifacts */ - private Composite findComposite(List artifacts) { - for (Composite fact : artifacts) { - log.info("Searching in a contribution deployed artifacts -" - + compositeName); - Composite augmented = (Composite) fact; - // found - if (augmented.getURI().equals(compositeName)) { - log.info("Found composite..." + compositeName); - return augmented; - } - } - } - return null; - } - - public CallableReference addWorker() { - log.info("Adding a new worker call.."); - long addWorkerStartTime = System.nanoTime(); - ContributionServiceImpl cServiceImpl = (ContributionServiceImpl) node.getContributionService(); - Contribution contribution = cServiceImpl.getContribution(nodeName); - List artifacts = contribution.getDeployables(); - CallableReference workerReference = null; - CallableReference ref = null; - log.info("Creating a MetaComponentWorker.."); - MetaComponentWorker mcw = new MetaComponentWorker(); - boolean found = false; - mcw.setWorkerClass(workerClass); - Composite augmented = findComposite(artifacts); - try { - if (augmented != null) { - long startCreation = System.nanoTime(); - node.addComponentToComposite(mcw, contribution.getURI(), - augmented.getURI()); - System.out.println("addComponentToComposite time = " - + (System.nanoTime() - startCreation)); - RuntimeComponent workerComponent = (RuntimeComponent) node - .getComponent(mcw.getName()); - if (workerComponent != null) { - ref = (CallableReference) workerComponent - .getComponentContext().createSelfReference( - WorkerService.class); - ref.getService().start(); - activeWorkers.addLast(ref); - workerComponentNames.add(mcw.getName()); - CallableReference manager = (CallableReference) context - .createSelfReference(WorkerManager.class, - "WorkerManager"); - ref.getService().registerManager(manager); - return ref; - } - } else { - log.info("Workpool composite not found!"); - } - } catch (Exception e) { - log.info("Exception activation"); - e.printStackTrace(); - } - ; - System.out.println("Component Creation Time =" - + (System.nanoTime() - addWorkerStartTime)); - return ref; - } - - public boolean removeAllWorkers() { - for (CallableReference callable : activeWorkers) { - callable.getService().stop(); - } - return true; - } - - public boolean removeWorker() { - CallableReference callable = activeWorkers - .removeLast(); - callable.getService().stop(); - return true; - } - - public boolean removeWorkers(int k) { - if (k >= activeWorkers.size()) - return false; - for (int i = 0; i < k; ++i) { - if (!removeWorker()) - return false; - } - return true; - } - - public void setNode(SCANode node) { - this.node = (SCANodeImpl) node; - - } - - public double getNodeLoad() { - /* - * FIXME [jo] this works only on Linux To be replaced with an JNI - * extension - */ - RandomAccessFile statfile; - - this.loadAverage = 1.0; - // load = 0; - int NoProcessors = 0; - String cpuLine = null; - try { - NoProcessors = Runtime.getRuntime().availableProcessors(); - if (NoProcessors > 1) - this.loadAverage = 1 / (1.0 * NoProcessors); - statfile = new RandomAccessFile("/proc/loadavg", "r"); - try { - statfile.seek(0); - cpuLine = statfile.readLine(); - - } catch (IOException e) { - // FIX ME: Better exception handling. - e.printStackTrace(); - } - } catch (FileNotFoundException e) { - e.printStackTrace(); - } catch (NumberFormatException e) { - e.printStackTrace(); - } - double min1; - if (cpuLine != null) { - java.util.StringTokenizer st = new java.util.StringTokenizer( - cpuLine, " "); - min1 = Double.parseDouble(st.nextToken()); - } else - min1 = 0; - - return min1 * this.loadAverage; - } - - public int activeWorkers() { - return activeWorkers.size(); - } - - public boolean removeWorker(String workerName) { - RuntimeComponent workerComponent = (RuntimeComponent) node - .getComponent(workerName); - if (workerComponent != null) { - log.info("Removing component " + workerName); - node.removeComponentFromComposite(nodeName, "Workpool.composite", - workerName); - return true; - } - return false; - } - - public void start() { - // do nothing for now. - } -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerService.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerService.java deleted file mode 100644 index 37b7ea227a..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerService.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.osoa.sca.ServiceReference; -import org.osoa.sca.annotations.Callback; -import org.osoa.sca.annotations.Remotable; -import org.osoa.sca.annotations.OneWay; -import org.apache.tuscany.sca.core.context.CallableReferenceImpl; -import org.apache.tuscany.sca.databinding.annotation.DataBinding; -import org.apache.tuscany.sca.databinding.job.Job; - -/** - * The interface for the multiply service - */ -@Remotable -@Callback(WorkerServiceCallback.class) -@DataBinding("org.apache.tuscany.sca.databinding.job.Job") -public interface WorkerService { - @OneWay - void compute(Job j); - - void start(); - - void stop(); - - // void addJobCompleteHandler(String triggerName, - // CallableReferenceImpl handle); - // void removeJobCompleteHandler(String triggerName); - /* The worker manager */ - void registerManager(CallableReferenceImpl wm); - - void registerSender(CallableReferenceImpl sender); - - // void init(Job nullJob); - @OneWay - void computeFirstTime(Job nullJob, - CallableReferenceImpl myReference); - -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java deleted file mode 100644 index 6fb1278e5e..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.apache.tuscany.sca.databinding.job.Job; -import org.osoa.sca.annotations.Remotable; - -@Remotable -public interface WorkerServiceCallback { - void receiveResult(Job resultType, boolean reuse, String workerName); -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java deleted file mode 100644 index 2c9bf5ea48..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.osoa.sca.ComponentContext; -import org.osoa.sca.RequestContext; -import org.osoa.sca.ServiceReference; -import org.osoa.sca.annotations.Callback; -import org.osoa.sca.annotations.Context; -import org.osoa.sca.annotations.Property; -import org.osoa.sca.annotations.Scope; -import org.osoa.sca.annotations.Service; -import org.apache.tuscany.sca.core.context.CallableReferenceImpl; -import org.apache.tuscany.sca.databinding.annotation.DataBinding; -import org.apache.tuscany.sca.databinding.job.Job; - -import java.util.HashMap; -import java.util.Map; -import java.util.logging.*; - -/** - * An implementation of the worker service. - */ -@Service(WorkerService.class) -@DataBinding("org.apache.tuscany.sca.databinding.job.Job") -@Scope("COMPOSITE") -public abstract class WorkerServiceImpl implements WorkerService { - private Logger log = Logger.getLogger(this.getClass().getName()); - private WorkerServiceCallback workerServiceCallback; - @Context - protected ComponentContext workerContext; - @Context - protected RequestContext requestContext; - @Property - protected String workerName; - private CallableReferenceImpl managerReference = null; - - /* TODO add the triggers, but before ask */ - // protected Map triggers = new HashMap(); - public abstract ResultJob computeTask(Job job); - - private boolean stopped = false; - private CallableReferenceImpl serviceRef; - private CallableReferenceImpl senderService; - private WorkpoolService wp = null; - private WorkerManager manager = null; - - public void start() { - log.info("Starting worker..."); - stopped = false; - serviceRef = (CallableReferenceImpl) workerContext - .createSelfReference(WorkerService.class); - - } - - public void init(CallableReferenceImpl sender, Job nullJob) { - compute(nullJob); - } - - public void stop() { - stopped = true; - } - - @Callback - public void setWorkerServiceCallback( - WorkerServiceCallback workerServiceCallback) { - log.info("Setting worker callback"); - this.workerServiceCallback = workerServiceCallback; - } - - public void computeFirstTime(Job nullJob, - CallableReferenceImpl sender) { - senderService = sender; - wp = sender.getService(); - workWithCallable(nullJob); - } - - public void registerManager(CallableReferenceImpl wm) { - managerReference = wm; - manager = managerReference.getService(); - - } - - public void registerSender(CallableReferenceImpl sender) { - log.info("Registering sender.."); - senderService = sender; - wp = sender.getService(); - } - - private void workWithInjection(Job j) { - log.info("Worker has received job"); - if (stopped) { - workerServiceCallback - .receiveResult(j, true, workerContext.getURI()); - if (managerReference != null) - manager.removeWorker(workerContext.getURI()); - } else if (j.eos()) { - if (managerReference != null) - manager.removeWorker(workerContext.getURI()); - } - if (j instanceof NullJob) { - workerServiceCallback.receiveResult(j, false, workerContext - .getURI()); - } else { - workerServiceCallback.receiveResult(computeTask(j), false, - workerContext.getURI()); - } - } - - private void workWithCallable(Job j) { - log.info("Worker " + workerContext.getURI() - + " has received job with eos --> " + j.eos()); - if (stopped) { - wp.handleResult(j, true, workerContext.getURI(), serviceRef, false); - return; - } - if (j.eos()) { - log.info("Got poison token..."); - if (managerReference != null) { - log.info("Removing component " + workerContext.getURI()); - manager.removeWorker(workerContext.getURI()); - - } - return; - } - if (j.getType() != Job.NULL_JOB) { - wp.handleResult(computeTask(j), false, workerContext.getURI(), - serviceRef, false); - } else { - log.info("Got a null job"); - wp.handleResult(j, false, workerContext.getURI(), serviceRef, true); - } - } - - public void compute(Job j) { - - if (senderService != null) { - log.info("Computing job using callable reference method"); - workWithCallable(j); - - } else { - log.info("Computing job using reference injection method"); - workWithInjection(j); - - } - } - /* - * public void addJobCompleteHandler(String triggerName, - * CallableReferenceImpl handle) { if - * (!triggers.containsKey(triggerName)) { triggers.put(triggerName, - * handle.getService()); } } public void removeJobCompleteHandler(String - * triggerName) { if (!triggers.containsKey(triggerName)) { - * triggers.remove(triggerName); } } - */ -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java deleted file mode 100644 index 80c093ff1c..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import java.beans.*; -import java.util.Vector; -import java.util.logging.*; - -public class WorkpoolBean { - private Vector listeners = new Vector(); - double loadAverage = 0; - int nodeNumbers = 0; - int workers = 0; - int estimedQueueSize = 0; - double averageServiceTime = 0; - double averageArrivalTime = 0; - double usageFactor = 0; - private final PropertyChangeSupport changes = new PropertyChangeSupport( - this); - long jobComputed = 0; - boolean singleAction = false; - private Logger log = Logger.getLogger(WorkpoolBean.class.getName()); - - public void setNodeNumbers(int n) { - this.nodeNumbers = n; - } - - public void setWorkers(int w) { - this.workers = w; - } - - public void setLoadAverage(double loadAverage) { - this.loadAverage = loadAverage; - } - - public void setAverageServiceTime(double service) { - this.averageServiceTime = service; - } - - public void setAverageArrivalTime(double service) { - this.averageArrivalTime = service; - } - - public double getAverageArrivalTime() { - return this.averageArrivalTime; - } - - public double getUtilizationFactor() { - return usageFactor; - } - - public void setUsageFactor() { - usageFactor = averageServiceTime / averageArrivalTime; - } - - public void setEstimedQueueSize(int size) { - estimedQueueSize = size; - } - - public int getEstimedQueueSize() { - return estimedQueueSize; - } - - public double getLoadAverage() { - return this.loadAverage; - } - - public int getWorkers() { - return this.workers; - } - - public int getNodeNumbers() { - return this.nodeNumbers; - } - - public double getAverageServiceTime() { - return this.averageServiceTime; - } - - public void addPropertyChangeListener(final PropertyChangeListener l) { - this.changes.addPropertyChangeListener(l); - } - - public void removePropertyChangeListener(final PropertyChangeListener l) { - this.changes.removePropertyChangeListener(l); - } - - private synchronized void fireWorkpoolEvent(WorkpoolEvent ev) { - for (WorkpoolBeanListener l : listeners) { - l.handleEvent(new WorkpoolEvent(ev)); - } - } - - public void addWorkersToNode(int k, String nodeName) { - log.info("Adding a worker to node " + nodeName); - WorkpoolEvent ev = new WorkpoolEvent(this, - WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER, k, nodeName); - fireWorkpoolEvent(ev); - } - - public void addWorkerToNode(String nodeName) { - log.info("Adding a worker to node " + nodeName); - WorkpoolEvent ev = new WorkpoolEvent(this, - WorkpoolEvent.SINGLE_ADD_WORKER, 1, nodeName); - fireWorkpoolEvent(ev); - } - - public void removeWorkersToNode(int k, String nodeName) { - log.info("Removing a worker to node " + nodeName); - WorkpoolEvent ev = new WorkpoolEvent(this, - WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER, k, nodeName); - fireWorkpoolEvent(ev); - } - - public void removeWorkerToNode(String nodeName) { - log.info("Removing a worker to node " + nodeName); - WorkpoolEvent ev = new WorkpoolEvent(this, - WorkpoolEvent.SINGLE_REMOVE_WORKER, 1, nodeName); - fireWorkpoolEvent(ev); - } - - public synchronized void addListener(WorkpoolBeanListener l) { - this.listeners.add(l); - } - - public synchronized void removeListener(WorkpoolBeanListener l) { - this.listeners.remove(l); - } - - public void setJobComputed(long jobComputed) { - this.jobComputed = jobComputed; - - } - - public void setSingleAction() { - singleAction = true; - } - - public boolean getSingleAction() { - return singleAction; - } - - public long getJobComputed() { - return this.jobComputed; - } -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java deleted file mode 100644 index 0ecc223fed..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import java.util.EventListener; - -public interface WorkpoolBeanListener extends EventListener { - public void handleEvent(WorkpoolEvent ev); -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java deleted file mode 100644 index 0bdc3671d5..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import java.util.EventObject; - -public class WorkpoolEvent extends EventObject { - - private static final long serialVersionUID = -1273928009411948768L; - - public WorkpoolEvent(Object source) { - super(source); - } - - public WorkpoolEvent(WorkpoolEvent ev) { - super(ev.source); - type = ev.type; - noWorker = ev.noWorker; - nodeName = ev.nodeName; - } - - public WorkpoolEvent(Object source, int typeEv, int worker) { - super(source); - type = typeEv; - noWorker = worker; - nodeName = ""; - } - - public WorkpoolEvent(Object source, int typeEv, int worker, String nodeName) { - super(source); - type = typeEv; - noWorker = worker; - this.nodeName = nodeName; - } - - public String getNodeName() { - return nodeName; - } - - public int getType() { - return type; - } - - public int workers() { - return noWorker; - } - - private int type; - private int noWorker; - private String nodeName; - public static final int EVENT_MULTIPLE_ADD_WORKER = 0; - public static final int EVENT_MULTIPLE_REMOVE_WORKER = 1; - public static final int SINGLE_REMOVE_WORKER = 2; - public static final int SINGLE_ADD_WORKER = 3; -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java deleted file mode 100644 index 6520954bdd..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.osoa.sca.ServiceReference; -import org.osoa.sca.annotations.OneWay; -import org.osoa.sca.annotations.Remotable; - -@Remotable -public interface WorkpoolManager { - /* - * @param String rules This are the autonomic rules. The format is the Java - * Drools .drl file. You have to read it - */ - @OneWay - void acceptRules(String rules); - - @OneWay - void start(); - - @OneWay - void stopAutonomicCycle(); - - @OneWay - void startAutonomicCycle(); - - int activeWorkers(); - - void setCycleTime(long time); - - void setWorkpoolReference(ServiceReference serviceReference); -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java deleted file mode 100644 index f7c727ad04..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java +++ /dev/null @@ -1,555 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.osoa.sca.ComponentContext; -import org.osoa.sca.ServiceReference; -import java.util.Collections; -import java.util.Enumeration; -import java.io.IOException; -import java.io.StringReader; -import java.io.StringWriter; -import java.util.Collection; -import java.util.Iterator; -import java.util.Timer; -import java.util.TimerTask; -import java.util.logging.Logger; - -import javax.xml.stream.XMLStreamException; - -import node.TestJob; -import java.io.File; -import java.util.Vector; -import org.apache.axiom.om.OMElement; -import org.apache.tuscany.sca.contribution.service.ContributionResolveException; -import org.apache.tuscany.sca.core.context.CallableReferenceImpl; -import org.apache.tuscany.sca.core.context.ServiceReferenceImpl; -import org.apache.tuscany.sca.databinding.job.Job; -import org.apache.tuscany.sca.node.NodeManagerInitService; -import org.apache.tuscany.sca.node.SCANode; -import org.apache.tuscany.sca.node.impl.SCANodeImpl; -import org.osoa.sca.CallableReference; -import org.drools.FactHandle; -import org.drools.RuleBase; -import org.drools.RuleBaseFactory; -import org.drools.StatefulSession; -import org.drools.StatelessSession; -import org.drools.compiler.DroolsParserException; -import org.drools.compiler.PackageBuilder; -import org.drools.rule.Package; -import org.osoa.sca.annotations.Constructor; -import org.osoa.sca.annotations.Context; -import org.osoa.sca.annotations.Destroy; -import org.osoa.sca.annotations.Property; -import org.osoa.sca.annotations.Reference; -import org.osoa.sca.annotations.Scope; -import org.osoa.sca.annotations.Service; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; - -@Service(interfaces = { NodeManagerInitService.class, WorkpoolManager.class }) -@Scope("COMPOSITE") -/* - * This is the core manager of the workpool application. The Workpool Manager - * holds the reference to each remote node manager. Inside it we've a rule - * engine instance. - */ -public class WorkpoolManagerImpl implements WorkpoolManager, - NodeManagerInitService, WorkpoolBeanListener { - /* - * This inner class trigs the rule engine, at given times: 1. It checks the - * different loads for each nodes and sets the WorkpoolBean 2. It checks the - * Workpool AverageService Time and sets the WorkpoolBean 3. It checks how - * many jobs are already computed and sets the WorkpoolBean Then given the - * configured bean and the rules, run the Rule Engine for executing the - * business logic - */ - class RuleEngineTrigger extends TimerTask { - // private ReentrantLock triggerLock = new ReentrantLock(); - @Override - public void run() { - - System.out.println("Updating WorkpoolBean.."); - // checkActiveWorkers(); - // checkLoadInNodes(); - checkServiceTime(); - // checkEstimedQueueSize(); - // checkArrivalTime(); - getProcessedItem(); - // computeUsageFactor(); - doRun(bean); - } - - } - - private WorkerManager managerNodeB; - private WorkerManager managerNodeC; - private WorkerManager managerNodeD; - private WorkerManager managerNodeE; - - private SCANodeImpl node; - private WorkpoolBean bean = new WorkpoolBean(); - private ReentrantLock handleEventLock = new ReentrantLock(); - private ReentrantLock updateRuleLock = new ReentrantLock(); - - private ServiceReference reference; - private AtomicInteger activeWorkers = new AtomicInteger(0); - private Logger log = Logger.getLogger(WorkpoolManagerImpl.class.getName()); - @Property - protected String workers; - @Property - protected String nodes; - @Property - protected String injection; - @Context - protected ComponentContext workpoolManagerContext; - private CallableReferenceImpl myReference; - private String rules = null; - private boolean referenceInjection = false; - private ConcurrentHashMap workerManagerTable = new ConcurrentHashMap(); - private int workersNo; - private int nodesNo; - private Timer timer = new Timer(); - /* this handle facts */ - private RuleBase ruleBase = null; - private FactHandle handle = null; - private StatefulSession wm = null; - private long cycleTime = 5000; - - @Reference - public void setManagerNodeB(WorkerManager managerNodeB) { - this.managerNodeB = managerNodeB; - workerManagerTable.put("nodeB", managerNodeB); - } - - @Reference - public void setManagerNodeC(WorkerManager managerNodeC) { - this.managerNodeC = managerNodeC; - workerManagerTable.put("nodeC", managerNodeC); - } - - @Reference - public void setManagerNodeD(WorkerManager managerNodeD) { - this.managerNodeD = managerNodeD; - workerManagerTable.put("nodeD", managerNodeD); - } - - @Reference - public void setManagerNodeE(WorkerManager managerNodeE) { - this.managerNodeE = managerNodeE; - workerManagerTable.put("nodeE", managerNodeE); - } - - private void startNewComponents( - Vector> vector) { - log.info("Starting new components"); - WorkpoolService wp = reference.getService(); - // CallableReferenceImpl sink = - // (CallableReferenceImpl) reference; - Job j = new NullJob(); - for (CallableReferenceImpl item : vector) { - // WorkerService service = item.getService(); - // service.start(); - // service.computeFirstTime(j, sink); - log.info("Send PostWorkerReference..."); - wp.PostWorkerReference(item); - } - if (myReference != null) - wp.registerManager(myReference); - } - - public void setCycleTime(long cycle) { - this.cycleTime = cycle; - } - - @SuppressWarnings("unchecked") - /* - * This gets the number of workers workerNo and instantiates them - */ - public void start() { - this.myReference = (CallableReferenceImpl) workpoolManagerContext - .createSelfReference(WorkpoolManager.class, "WorkpoolManager"); - this.workersNo = Integer.parseInt(this.workers); - this.nodesNo = Integer.parseInt(this.nodes); - this.referenceInjection = (Integer.parseInt(this.injection) != 0); - log.info("Starting WorkpoolManager Component with #" + workersNo - + " workers and #" + nodes + " nodes"); - nodesNo = workerManagerTable.values().size(); - // Sets info in the bean. - bean.setWorkers(this.workersNo); - bean.setNodeNumbers(nodesNo); - Vector> workerRefs = new Vector>(); - int exactTimes = workersNo / nodesNo; - for (int i = 0; i < exactTimes; ++i) { - for (WorkerManager manager : workerManagerTable.values()) { - manager.start(); - if (manager != null) { - System.err.println("Actual load = " - + manager.getNodeLoad() + " for node "); - addNewComponent(manager, workerRefs); - } - } - } - - int module = (workersNo % nodesNo); - int n = 0; - if (module > 0) { - Vector v = new Vector(workerManagerTable.keySet()); - Collections.sort(v); - // Iterator iter = - // workerManagerTable.values().iterator(); - // Display (sorted) hashtable. - for (Enumeration e = v.elements(); (e.hasMoreElements() && n < module); ++n) { - String key = e.nextElement(); - WorkerManager m = workerManagerTable.get(key); - System.err.println("Module Actual load = " + m.getNodeLoad() - + " for node "); - addNewComponent(m, workerRefs); - } - } - startNewComponents(workerRefs); - bean.addListener(this); - TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger(); - timer.scheduleAtFixedRate(task, 3000, cycleTime); - } - - private void checkLoadInNodes() { - System.out.println("CheckLoadInNodes"); - int number = 1; - double loadAverage = 0; - for (WorkerManager manager : workerManagerTable.values()) { - loadAverage += manager.getNodeLoad(); - number++; - } - bean.setLoadAverage(loadAverage / number); - } - - private void computeUsageFactor() { - bean.setUsageFactor(); - } - - private void checkEstimedQueueSize() { - WorkpoolService wp = reference.getService(); - - if (wp != null) { - int size = wp.estimatedQueueSize(); - log.info("Estimed Queue Size =" + size); - bean.setEstimedQueueSize(size); - } - } - - private WorkerManager findMinLoad() { - double load = 0; - // workerManagerTable.values().iterator().next().getNodeLoad(); - WorkerManager toFind = null; - for (WorkerManager manager : workerManagerTable.values()) { - if (load == 0) { - load = manager.getNodeLoad(); - toFind = manager; - } else if (manager.getNodeLoad() < load) { - load = manager.getNodeLoad(); - toFind = manager; - } - } - return toFind; - } - - private void checkServiceTime() { - WorkpoolService wp = reference.getService(); - - if (wp != null) { - double time = wp.getServiceTime(); - log.info("Average System Service Time =" + time); - bean.setAverageServiceTime(time); - } - } - - private void checkArrivalTime() { - WorkpoolService wp = reference.getService(); - - if (wp != null) { - double time = wp.getArrivalTime(); - log.info("Average Arrival Service Time =" + time); - bean.setAverageArrivalTime(time); - } - } - - private void checkActiveWorkers() { - bean.setWorkers(this.activeWorkers()); - } - - private void getProcessedItem() { - WorkpoolService wp = reference.getService(); - if (wp != null) { - long computed = wp.getJobComputed(); - log.info("The system has already computed " + computed + " jobs"); - bean.setJobComputed(computed); - } - } - - private boolean removeComponent(WorkerManager manager, int k) { - manager.removeWorkers(k); - activeWorkers.decrementAndGet(); - return true; - } - - @SuppressWarnings("unchecked") - private boolean addNewComponent(WorkerManager manager, - Vector> workerRefs) { - CallableReferenceImpl workerReference = (CallableReferenceImpl) manager - .addWorker(); - - if (workerReference != null) { - /* if i'll decide to use dynamically generated references */ - if (referenceInjection) { - workerReference.getService(); - String uri = workerReference.getEndpointReference().getURI(); - int nameIndex = uri.indexOf("/"); - String componentName = uri.substring(0, nameIndex); - if (componentName.startsWith("/")) - componentName = uri.substring(1, uri.length()); - if (componentName.endsWith("/")) - componentName = uri.substring(0, uri.length() - 1); - // String componentName = uri.substring(0, nameIndex-1); - - log.info("Adding wire from WorkpoolComponentService to " - + componentName); - String referenceName = "ref" + componentName; - - /* - * I'm updating the WorkpoolServiceComponent with a new - * reference to a just created component I assume that the - * WorkpoolManagerService and the WorkpoolServiceComponent stay - * in the same JVM It's like in the scdl there were: With this then - * I've a wire WorkpoolService---> a new Worker - */ - try { - node.addComponentReferenceWire(referenceName, "nodeA", - "Workpool.composite", "workpool.WorkerServiceImpl", - WorkerService.class, "WorkpoolServiceComponent", - componentName); - } catch (Exception e) { - e.printStackTrace(); - return false; - } - log.info("Sending reference name " + referenceName - + " to WorkpoolService"); - // TODO: this was part of dynamic wiring, but it doesn't work. - // reference.getService().PostWorkerName(referenceName); - - } else { - // log.info("Sending callable reference to WorkpoolService - // placed at -->"+reference); - // reference.getService().PostWorkerReference(workerReference); - workerRefs.add(workerReference); - } - activeWorkers.incrementAndGet(); - return true; - } - return false; - } - - public int activeWorkers() { - - return activeWorkers.get(); - } - - private void doRun(WorkpoolBean bean) { - - long startTime = System.currentTimeMillis(); - updateRuleLock.lock(); - if (wm == null) - wm = ruleBase.newStatefulSession(); - if (this.handle == null) - handle = wm.insert(bean); - else { - wm.update(handle, bean); - } - wm.fireAllRules(); - updateRuleLock.unlock(); - - System.out.println("Engine rule overhead = " - + (System.currentTimeMillis() - startTime)); - } - - private RuleBase readRule(String rule) { - - PackageBuilder packBuilder = new PackageBuilder(); - try { - packBuilder.addPackageFromDrl(new StringReader(rule)); - } catch (DroolsParserException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - Package pkg = packBuilder.getPackage(); - RuleBase ruleBase = RuleBaseFactory.newRuleBase(); - try { - ruleBase.addPackage(pkg); - } catch (Exception e) { - e.printStackTrace(); - } - return ruleBase; - } - - public void acceptRules(String rules) { - this.rules = rules; - if (ruleBase == null) { - RuleBase base = readRule(rules); - if (base != null) { - ruleBase = base; - } - } else { - updateRuleLock.lock(); - // i have already a rule: updating - ruleBase = readRule(rules); - wm = ruleBase.newStatefulSession(); - handle = null; - updateRuleLock.unlock(); - } - - System.out.println("Accepted rules = " + rules); - } - - public String getRules() { - return rules; - } - - private WorkerManager findMaxLoadNode() { - double load = 0.0; - WorkerManager toFind = null; - for (WorkerManager manager : workerManagerTable.values()) { - if (manager.getNodeLoad() > load) { - load = manager.getNodeLoad(); - toFind = manager; - } - } - return toFind; - - } - - public void setWorkpoolReference( - ServiceReference serviceReference) { - reference = serviceReference; - } - - public void setNode(SCANode arg0) { - node = (SCANodeImpl) arg0; - } - - public void handleEvent(WorkpoolEvent ev) { - if (ev == null) - return; - - String nodeName = ev.getNodeName(); - - switch (ev.getType()) { - case WorkpoolEvent.SINGLE_ADD_WORKER: { - if (nodeName != null) { - Vector> workerRefs = new Vector>(); - - // in this case I have a nodeName - if (!nodeName.equals("") - && (workerManagerTable.containsKey(nodeName))) { - WorkerManager manager = workerManagerTable.get(nodeName); - addNewComponent(manager, workerRefs); - startNewComponents(workerRefs); - } else if (nodeName.equals("")) { - WorkerManager manager = findMinLoad(); - addNewComponent(manager, workerRefs); - startNewComponents(workerRefs); - } - } - break; - } - case WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER: { - Vector> workerRefs = new Vector>(); - - if (nodeName.equals("")) { - - WorkerManager manager = findMinLoad(); - int k = ev.workers(); - for (int h = 0; h < k; ++h) { - addNewComponent(manager, workerRefs); - } - } else { - WorkerManager manager = workerManagerTable - .get(ev.getNodeName()); - int k = ev.workers(); - for (int h = 0; h < k; ++h) { - addNewComponent(manager, workerRefs); - } - } - startNewComponents(workerRefs); - break; - } - case WorkpoolEvent.SINGLE_REMOVE_WORKER: { - if (nodeName != null) { - // in this case I have a nodeName - if (!nodeName.equals("") - && (workerManagerTable.containsKey(nodeName))) { - WorkerManager manager = workerManagerTable.get(nodeName); - removeComponent(manager, 1); - } else if (nodeName.equals("")) { - WorkerManager manager = findMaxLoadNode(); - removeComponent(manager, 1); - } - } - break; - } - case WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER: { - if (nodeName.equals("")) { - WorkerManager manager = findMaxLoadNode(); - removeComponent(manager, ev.workers()); - - } else { - WorkerManager manager = workerManagerTable.get(nodeName); - removeComponent(manager, ev.workers()); - } - break; - } - } - - } - - @Destroy - public void onExit() { - // do cleanup - this.timer.cancel(); - this.timer.purge(); - } - - public void stopAutonomicCycle() { - this.timer.cancel(); - this.timer.purge(); - this.timer = null; - } - - public void startAutonomicCycle() { - if (this.timer == null) { - this.timer = new Timer(); - TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger(); - timer.schedule(task, 3000, cycleTime); - } - } -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java deleted file mode 100644 index d84ae549d8..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import org.apache.tuscany.sca.core.context.CallableReferenceImpl; -import org.apache.tuscany.sca.databinding.annotation.DataBinding; -import org.apache.tuscany.sca.databinding.job.Job; -import org.osoa.sca.annotations.OneWay; -import org.osoa.sca.annotations.Remotable; -import org.osoa.sca.ServiceReference; - -@DataBinding("org.apache.tuscany.sca.databinding.job.Job") -@Remotable -public interface WorkpoolService { - - /* this the functional part */ - void submit(Job i); - - /* the time between two subsequent worker invocations */ - double getServiceTime(); - - /* the number of ResultJob received */ - long getJobComputed(); - - /* the time elapsed between the stream has initiated and now */ - long getElapsedTime(); - - /* the size of the internal queue : it's not accurate */ - int estimatedQueueSize(); - - /* the average time between two consuecutive submit */ - double getArrivalTime(); - - void start(); - - void stop(); - - /* - * this is the part needed by management. May be in future i'll refactor it - * order to hide this part. - */ - @OneWay - void handleResult(Job j, boolean reuse, String string, - CallableReferenceImpl worker, boolean newJob); - - void addTrigger(CallableReferenceImpl reference); - - void removeTrigger(); - - void registerManager( - CallableReferenceImpl createSelfReference); - - /* - * This could placed in another interface definition - think about it These - * methods evict, and evictAll are needed when a worker finish to exist and - * it needs to be evicted by the WorkpoolManager. In the system I have two - * caches: 1) a domain cache, which holds the components URI 2) a - * workerReference cache (implemented by a ConcurrentHashMap), which holds a - * proxy to each worker. Every proxy gets built from the worker callable - * reference. I'm thinking for placing the workerReferenceCache in a local - * interface. Assuming that WorkpoolService and WorkpoolManager are in the - * same JVM. - */ - void evict(String workerURI); - - void evictAll(); - - /* - * these two are no longer needed. I leave it because if i'll have time to - * do dynamic wiring the first one is needed. void PostWorkerName(String - * referenceName); - */ - void PostWorkerReference(CallableReferenceImpl worker); - -} diff --git a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java b/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java deleted file mode 100644 index 52b88af42c..0000000000 --- a/branches/sca-java-1.x/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java +++ /dev/null @@ -1,416 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package workpool; - -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Logger; -import org.apache.tuscany.sca.core.context.CallableReferenceImpl; -import org.apache.tuscany.sca.databinding.annotation.DataBinding; -import org.osoa.sca.ComponentContext; -import org.osoa.sca.annotations.Context; -import org.osoa.sca.annotations.Scope; -import org.osoa.sca.annotations.Service; -import org.apache.tuscany.sca.databinding.job.Job; -import org.apache.tuscany.sca.databinding.job.JobDataMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * An implementation of the Workpool service. - */ -@Service(WorkpoolService.class) -@Scope("COMPOSITE") -@DataBinding("org.apache.tuscany.sca.databinding.job.Job") -public class WorkpoolServiceImpl implements WorkpoolService, - WorkerServiceCallback { - - /* incoming job queue */ - private LinkedBlockingQueue queue = new LinkedBlockingQueue(5000); - private CallableReferenceImpl trigger = null; - private Trigger forwardResult = null; - /* counter for job's number fetched from the queue and sent to the Worker */ - private AtomicInteger jobSent = new AtomicInteger(0); - /* time for initHandleResult */ - private AtomicLong initHandleResult = new AtomicLong(0); - /* time for endHandleResult */ - private AtomicLong endHandleResult = new AtomicLong(0); - /* - * number of job computed, this will be exposed in order to be used to - * firing rules - */ - private long jobComputed = 0; - /* same as above */ - private AtomicLong elapsedTime = new AtomicLong(0); - /* this is for comuputing averageServiceTime */ - private long times = 1; - /* this is for computing averageArrivalTime */ - private long timesArrival = 1; - private ReentrantLock arrivalLock = new ReentrantLock(); - private long arrivalPrevious = -1; - // private AtomicBoolean processingStopped = new AtomicBoolean(false); - private boolean processingStopped = false; - // private LinkedBlockingQueue triggers = new - // LinkedBlockingQueue(); - @Context - protected ComponentContext workpoolContext; - private CallableReferenceImpl manager; - private long previousSubmitTime = -1; - private boolean firstTime = true; - private boolean first = true; - private long start = 0; - private long end = 0; - private double averageServiceTime = 0; - private double averageArrivalTime = 0; - private int workersNo = 0; - private final Job nullJob = new NullJob(); - /* This is useful for counting the start and end */ - private Logger log = Logger.getLogger(WorkpoolServiceImpl.class.getName()); - private ReentrantLock handleResultLock = new ReentrantLock(); - private ReentrantLock postWorkerReferenceLock = new ReentrantLock(); - private ConcurrentHashMap cacheReference = new ConcurrentHashMap(); - private CallableReferenceImpl myReference; - private String previuosURI = ""; - private long time = 0; - - private void computeAverageTime() { - long actualServiceTime = 0; - // if the processing is finished - if (processingStopped) - return; - - if (firstTime == true) { - this.previousSubmitTime = System.currentTimeMillis(); - this.averageServiceTime = 0; - firstTime = false; - } else { - actualServiceTime = System.currentTimeMillis() - - this.previousSubmitTime; - this.previousSubmitTime = System.currentTimeMillis(); - averageServiceTime = ((averageServiceTime * times) + actualServiceTime) - / (times + 1); - ++times; - } - } - - public void submit(Job j) { - try { - // log.info("Submit job in queue -->"+ j.getType()); - // processingStopped.set(false); - try { - arrivalLock.lock(); - if (this.arrivalPrevious == -1) { - arrivalPrevious = System.currentTimeMillis(); - averageArrivalTime = 0; - } - double actualArrivalTime = System.currentTimeMillis() - - arrivalPrevious; - averageArrivalTime = ((averageArrivalTime * timesArrival) + actualArrivalTime) - / (timesArrival + 1); - arrivalPrevious = System.currentTimeMillis(); - ++timesArrival; - } finally { - arrivalLock.unlock(); - } - queue.put(j); - } catch (Exception e) { - log.info("Exception in queue"); - queue.clear(); - e.printStackTrace(); - } - } - - public double getArrivalTime() { - return this.averageArrivalTime; - } - - public double getServiceTime() { - return this.averageServiceTime; - } - - public void receiveResult(Job resultType, boolean reuse, String workerURI) { - - if (reuse) { - queue.add(resultType); - return; - } - - computeAverageTime(); - Job job = null; - try { - job = queue.take(); - } catch (InterruptedException e) { - // TODO Better exception handling --> see Exception antipattern doc - e.printStackTrace(); - return; - } - - if ((job != null) && (job.eos() == false)) { - int nameIndex = workerURI.indexOf("/"); - String workerName = workerURI.substring(0, nameIndex - 1); - log.info("Sending job to worker --> " + workerName); - WorkerService worker = workpoolContext.getService( - WorkerService.class, workerName); - worker.compute(job); - } - - JobDataMap map = ((ResultJob) resultType).getDataMap(); - if (map != null) { - ++jobComputed; - Object obj = map.getJobDataObject("result"); - System.out.println("Result = " + ((Double) obj).doubleValue()); - } - - } - - public void start() { - log.info("WorkpoolServiceComponent started..."); - myReference = (CallableReferenceImpl) workpoolContext - .createSelfReference(WorkpoolService.class, "WorkpoolService"); - myReference.getService(); - } - - /* - * - * This method is called by WorkpoolManagerImpl, when it creates a new - * worker component in order to dispatch worker to the WorkpoolServiceImpl - * @param CallableReferenceImpl reference - a dynamically created reference - * from the Worker - */ - public void PostWorkerReference( - CallableReferenceImpl reference) { - - try { - long initPostWorkerReference; - long endPostWorkerReference; - this.postWorkerReferenceLock.lock(); - - initPostWorkerReference = System.currentTimeMillis(); - WorkerService worker; - worker = reference.getService(); - worker.start(); - - ++workersNo; - if (myReference != null) { - - // Job poison = new ResultJob(); - this.postWorkerReferenceLock.unlock(); - log.info("Sending null job to worker"); - worker.computeFirstTime(nullJob, myReference); - // queue.put(poison); - endPostWorkerReference = System.currentTimeMillis(); - System.out.println("Time PostWorker =" - + (endPostWorkerReference - initPostWorkerReference)); - } else { - log.info("myReference is null"); - - } - } catch (Exception e) { - postWorkerReferenceLock.unlock(); - } finally { - } - - } - - /* - * FIXME This method currently is not used because i've not yet ready - * dynamic wire injection - */ - - public void PostWorkerName(String referenceName) { - /* TODO Do something similar to PostWorkerReference */ - } - - private void printComputingTime(Job j) { - - if (first == true) { - first = false; - start = System.currentTimeMillis(); - end = System.currentTimeMillis(); - } else { - end = System.currentTimeMillis(); - System.out.println("Elapsed Time = " + (end - start)); - elapsedTime.set(end - start); - } - /* - * i could use reflection or instance of (but it's a penalty kick) , or - * an object as result, but i'd prefer a job so i've defined a - * RESULT_JOB There're in the system three kind of jobs: RESULT_JOB, - * NULL_JOB, DEFAULT_JOB - */ - if ((j != null) && (j.getType() == Job.RESULT_JOB)) { - jobComputed++; - ResultJob result = (ResultJob) j; - JobDataMap map = result.getDataMap(); - if (map != null) { - Double doubleValue = (Double) map.getJobDataObject("result"); - System.out - .println("ResultValue = " + doubleValue.doubleValue()); - } - - } - - } - - public void handleResult(Job resultType, boolean reuse, String workerURI, - CallableReferenceImpl worker, boolean newWorker) { - initHandleResult.set(System.nanoTime()); - if (reuse) { - log.info("Reusing a job.."); - queue.add(resultType); - return; - } - // init job variable - Job job; - if (newWorker) - System.out.println("newWorkerActivation= " + System.nanoTime()); - printComputingTime(resultType); - - try { - job = queue.take(); - } catch (Exception e) { - log.info("Exception during fetching the queue"); - e.printStackTrace(); - return; - } - - try { - // it needs to be locked because multiple threads could invoke this. - handleResultLock.lock(); - if (previuosURI.equals("")) { - time = System.currentTimeMillis(); - this.previuosURI = workerURI; - } else { - if (previuosURI.equals(workerURI)) - System.out.println("Complete ComputeTime for an item =" - + (time - System.currentTimeMillis())); - } - if (job.eos()) { - long endTime = System.currentTimeMillis(); - /* checking for EOS */ - if (processingStopped == false) { - processingStopped = true; - System.out.println("GOT EOS in time=" + (endTime - start)); - log.info("Stop autonomic cycle.."); - /* - * I'm doing this because i want that in the termination i - * would have more jobs with eos == true than workers. So - * i'm sure that every worker removes itself from its - * manager. I do it only one time. This is necessary because - * i have a variable number of workers. The number of - * workers in the system might change every time the rule - * engine cycle gets executed. - */ - ResultJob poison = new ResultJob(); - for (int i = 0; i < workersNo; ++i) { - try { - - queue.put(poison); - - } catch (Exception e) { - log.info("Cannot duplicate poison tokens"); - break; - } - - } - manager.getService().stopAutonomicCycle(); - } - } - computeAverageTime(); - System.out.println("AverageTime =" + averageServiceTime); - if (job != null) { - - WorkerService workerService; - /* - * the workpool has a high reuse, i always call the same - * component set or un superset or subset, so i cache it. When - * the WorkpoolManager will remove an item, it removes still - * this cache entry - */ - if (!cacheReference.containsKey(workerURI)) { - workerService = worker.getService(); - handleResultLock.unlock(); - cacheReference.put(workerURI, workerService); - } else { - handleResultLock.unlock(); - workerService = cacheReference.get(workerURI); - } - // it's still a penalty kick locking compute because it's going - // to be scheduled whereas it's async. - workerService.compute(job); - log.info("Sent job #" + jobSent.incrementAndGet() - + " Queue size " + queue.size()); - endHandleResult.set(System.nanoTime()); - System.out - .println("begin:handleResult ==> end:handleResult:compute = " - + (endHandleResult.addAndGet(-(initHandleResult - .get())) / 1000000)); - } - } catch (Exception e) { - handleResultLock.unlock(); - } - } - - public void evictAll() { - cacheReference.clear(); - } - - public void evict(String workerURI) { - if (cacheReference.containsKey(workerURI)) { - cacheReference.remove(workerURI); - } - - } - - public int estimatedQueueSize() { - return queue.size(); - } - - public long getElapsedTime() { - return elapsedTime.get(); - } - - public long getJobComputed() { - return jobComputed; - } - - public void registerManager( - CallableReferenceImpl createSelfReference) { - manager = createSelfReference; - - } - - public void stop() { - // TODO Auto-generated method stub - - } - - public void addTrigger(CallableReferenceImpl reference) { - this.trigger = reference; - this.forwardResult = reference.getService(); - - } - - public void removeTrigger() { - this.trigger = null; - this.forwardResult = null; - } -} -- cgit v1.2.3