diff options
Diffstat (limited to 'branches/sca-java-1.5/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java')
-rw-r--r-- | branches/sca-java-1.5/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/branches/sca-java-1.5/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java b/branches/sca-java-1.5/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java new file mode 100644 index 0000000000..1f2a4d1f9a --- /dev/null +++ b/branches/sca-java-1.5/demos/workpool-distributed/src/main/java/node/WorkpoolDaemon.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package node; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + +import javax.xml.namespace.QName; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.commons.daemon.DaemonController; +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.contribution.Contribution; +import org.apache.tuscany.sca.contribution.DeployedArtifact; +import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.node.NodeException; +import org.apache.tuscany.sca.node.NodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.SCANodeFactory; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import org.apache.tuscany.sca.node.management.SCANodeManagerInitService; + +import java.net.URI; + +import workpool.WorkerManager; +import workpool.WorkerManagerImpl; +import workpool.WorkpoolManager; +import workpool.WorkpoolService; +import workpool.WorkpoolServiceImpl; + +/** + * This client program shows how to run a distributed SCA node. In this case a + * calculator node has been constructed specifically for running the calculator + * composite. Internally it creates a representation of a node and associates a + * distributed domain with the node. This separation is made different + * implementations of the distributed domain can be provided. + */ +public class WorkpoolDaemon implements Daemon, Runnable { + private String domainName; + private String nodeName; + private long iterations; + private long jobsNo; + private long workerNo; + private SCANode node; + private boolean stopped = false; + private DaemonController controller = null; + private Thread thread = null; + private String ruleFile = "workerRules.drl"; + + /* + * public static void main(String[] args) throws Exception { + * // Check that the correct arguments have been provided if (null == args || + * args.length < 4) { System.err.println("Usage: java WorkpoolNode + * domainname nodename iterTest workerNo"); System.exit(1); } + * + * try { String domainName = args[0]; String nodeName = args[1]; long + * iterations = Long.parseLong(args[2]); long jobsNo = + * Long.parseLong(args[3]); long workerNo = Long.parseLong(args[4]); + * ClassLoader cl = WorkpoolDaemon.class.getClassLoader(); + * + * SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); node = + * nodeFactory.createSCANode(null, domainName); + * node.addContribution(nodeName, cl.getResource(nodeName + "/")); + * node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); + * node.start(); // nodeA is the head node and runs some tests while all + * other nodes // simply listen for incoming messages + * + * FileReader rules = new FileReader("workerRules.drl"); StringBuffer buffer = + * new StringBuffer(); + * + * BufferedReader br = new BufferedReader(rules); String ruleString; do { + * ruleString = br.readLine(); if (ruleString!=null) { + * buffer.append(ruleString);} } while (ruleString!=null); + * + * if ( nodeName.equals("nodeA") ) { // do some application stuff + * WorkpoolService workpoolService = + * node.getDomain().getService(WorkpoolService.class, + * "WorkpoolServiceComponent"); workpoolService.start(); + * NodeManagerInitService nodeInit = + * node.getDomain().getService(NodeManagerInitService.class, + * "WorkpoolManagerComponent/NodeManagerInitService"); + * nodeInit.setNode(node); WorkpoolManager workpoolManager = + * node.getDomain().getService(WorkpoolManager.class, + * "WorkpoolManagerComponent/WorkpoolManager"); + * workpoolManager.setWorkpoolReference(node.getDomain().getServiceReference(WorkpoolService.class, + * "WorkpoolServiceComponent")); + * workpoolManager.acceptRules(buffer.toString()); workpoolManager.start(); + * int items[] = {3,4,5,6,3,6,3,5,9,5,6}; + * + * double x = 398349; for (int i = 0; i < jobsNo; ++i) + * workpoolService.submit(new TestJob(x,iterations,items)); + * + * TestJob j = new TestJob(-1.0,true); for (int i = 0; i < workerNo+1; ++i){ + * j.setEOS(); workpoolService.submit(j); } } try { if + * (nodeName.equals("nodeB")) { NodeManagerInitService serviceNodeB = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeBComponent/NodeManagerInitService"); + * serviceNodeB.setNode(node); } if (nodeName.equals("nodeC")) { + * NodeManagerInitService workerManagerC = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeCComponent/NodeManagerInitService"); + * workerManagerC.setNode(node); } if (nodeName.equals("nodeD")) { + * NodeManagerInitService workerManagerD = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeDComponent/NodeManagerInitService"); + * workerManagerD.setNode(node); } if (nodeName.equals("nodeE")) { + * NodeManagerInitService workerManagerE = + * node.getDomain().getService(NodeManagerInitService.class,"WorkerManagerNodeEComponent/NodeManagerInitService"); + * workerManagerE.setNode(node); } + * + * System.out.println("Node started (press enter to shutdown)"); + * System.in.read(); } catch (IOException e) { e.printStackTrace(); } // + * stop the node and all the domains in it node.stop(); node.destroy(); + * System.exit(0); } catch(Exception ex) { System.err.println("Exception in + * node - " + ex.getMessage()); ex.printStackTrace(System.err); } } + */ + public void destroy() { + // TODO Auto-generated method stub + + } + + public void init(DaemonContext arg0) throws Exception { + String[] args = arg0.getArguments(); + domainName = args[0]; + nodeName = args[1]; + iterations = Long.parseLong(args[2]); + jobsNo = Long.parseLong(args[3]); + workerNo = Long.parseLong(args[4]); + if (args.length == 6) { + ruleFile = args[5]; + } + this.controller = arg0.getController(); + // this.thread=new Thread(this); + } + + public void start() throws Exception { + + ClassLoader cl = WorkpoolDaemon.class.getClassLoader(); + + SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); + node = nodeFactory.createSCANode(null, domainName); + node.addContribution(nodeName, cl.getResource(nodeName + "/")); + node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); + node.start(); + // nodeA is the head node and runs some tests while all other nodes + // simply listen for incoming messages + + FileReader rules = new FileReader(ruleFile); + StringBuffer buffer = new StringBuffer(); + + BufferedReader br = new BufferedReader(rules); + String ruleString; + do { + ruleString = br.readLine(); + if (ruleString != null) { + buffer.append(ruleString + "\n"); + } + } while (ruleString != null); + + if (nodeName.equals("nodeA")) { + // do some application stuff + WorkpoolService workpoolService = node.getDomain().getService( + WorkpoolService.class, "WorkpoolServiceComponent"); + workpoolService.start(); + SCANodeManagerInitService nodeInit = node.getDomain().getService( + SCANodeManagerInitService.class, + "WorkpoolManagerComponent/NodeManagerInitService"); + nodeInit.setNode(node); + WorkpoolManager workpoolManager = node.getDomain().getService( + WorkpoolManager.class, + "WorkpoolManagerComponent/WorkpoolManager"); + workpoolManager.setWorkpoolReference(node.getDomain() + .getServiceReference(WorkpoolService.class, + "WorkpoolServiceComponent")); + workpoolManager.acceptRules(buffer.toString()); + workpoolManager.start(); + + int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 }; + + double x = 398349; + for (int i = 0; i < jobsNo; ++i) { + workpoolService.submit(new TestJob(x, iterations, items)); + } + TestJob j = new TestJob(-1.0, true); + for (int i = 0; i < workerNo + 1; ++i) { + j.setEOS(); + workpoolService.submit(j); + } + + } + if (nodeName.equals("nodeB")) { + SCANodeManagerInitService workerManagerNodeB = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeBComponent/NodeManagerInitService"); + workerManagerNodeB.setNode(node); + } + + if (nodeName.equals("nodeC")) { + SCANodeManagerInitService workerManagerNodeC = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeCComponent/NodeManagerInitService"); + workerManagerNodeC.setNode(node); + } + + if (nodeName.equals("nodeD")) { + SCANodeManagerInitService workerManagerNodeD = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeDComponent/NodeManagerInitService"); + workerManagerNodeD.setNode(node); + } + + if (nodeName.equals("nodeE")) { + SCANodeManagerInitService workerManagerNodeE = node + .getDomain() + .getService(SCANodeManagerInitService.class, + "WorkerManagerNodeEComponent/NodeManagerInitService"); + workerManagerNodeE.setNode(node); + } + + this.waitForever(); + // this.thread.start(); + } + + public void stop() throws Exception { + Thread.currentThread().interrupt(); + // thread.interrupt(); + node.stop(); + node.destroy(); + } + + private synchronized void waitForever() { + while (!stopped) { + try { + wait(); + } catch (InterruptedException ex) { + stopped = true; + return; + } + } + } + + public void run() { + waitForever(); + } +} |