diff options
Diffstat (limited to '')
-rw-r--r-- | branches/sca-android/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java | 179 |
1 files changed, 179 insertions, 0 deletions
diff --git a/branches/sca-android/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java b/branches/sca-android/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java new file mode 100644 index 0000000000..86557548af --- /dev/null +++ b/branches/sca-android/demos/workpool-distributed/src/main/java/node/WorkpoolNode.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package node; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + +import javax.xml.namespace.QName; + +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.assembly.Service; +import org.apache.tuscany.sca.contribution.Contribution; +import org.apache.tuscany.sca.contribution.DeployedArtifact; +import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; +import org.apache.tuscany.sca.domain.SCADomain; +import org.apache.tuscany.sca.node.NodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.SCANodeFactory; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import java.net.URI; + +import workpool.WorkerManager; +import workpool.WorkerManagerImpl; +import workpool.WorkpoolManager; +import workpool.WorkpoolService; +import workpool.WorkpoolServiceImpl; + +/** + * This client program shows how to run a distributed SCA node. In this case a + * calculator node has been constructed specifically for running the calculator + * composite. Internally it creates a representation of a node and associates a + * distributed domain with the node. This separation is made different + * implementations of the distributed domain can be provided. + */ +public class WorkpoolNode { + + public static void main(String[] args) throws Exception { + + // Check that the correct arguments have been provided + if (null == args || args.length < 4) { + System.err + .println("Useage: java WorkpoolNode domainname nodename iterTest workerNo"); + System.exit(1); + } + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + String domainName = args[0]; + String nodeName = args[1]; + long iterations = Long.parseLong(args[2]); + long jobsNo = Long.parseLong(args[3]); + long workerNo = Long.parseLong(args[4]); + ClassLoader cl = WorkpoolNode.class.getClassLoader(); + + SCANodeFactory nodeFactory = SCANodeFactory.newInstance(); + SCANode node = nodeFactory.createSCANode(null, domainName); + node.addContribution(nodeName, cl.getResource(nodeName + "/")); + node.addToDomainLevelComposite(new QName("http://sample", "Workpool")); + node.start(); + // nodeA is the head node and runs some tests while all other nodes + // simply listen for incoming messages + + FileReader rules = new FileReader("workerRules.drl"); + StringBuffer buffer = new StringBuffer(); + + BufferedReader br = new BufferedReader(rules); + String ruleString; + do { + ruleString = br.readLine(); + if (ruleString != null) { + buffer.append(ruleString + "\n"); + } + } while (ruleString != null); + + if (nodeName.equals("nodeA")) { + // do some application stuff + WorkpoolService workpoolService = node.getDomain().getService( + WorkpoolService.class, "WorkpoolServiceComponent"); + workpoolService.start(); + NodeManagerInitService nodeInit = node.getDomain().getService( + NodeManagerInitService.class, + "WorkpoolManagerComponent/NodeManagerInitService"); + nodeInit.setNode(node); + WorkpoolManager workpoolManager = node.getDomain().getService( + WorkpoolManager.class, + "WorkpoolManagerComponent/WorkpoolManager"); + workpoolManager.setWorkpoolReference(node.getDomain() + .getServiceReference(WorkpoolService.class, + "WorkpoolServiceComponent")); + workpoolManager.setCycleTime(8000); + workpoolManager.acceptRules(buffer.toString()); + workpoolManager.start(); + int items[] = { 3, 4, 5, 6, 3, 6, 3, 5, 9, 5, 6 }; + + double x = 398349; + + for (int i = 0; i < jobsNo; ++i) + workpoolService.submit(new TestJob(x, iterations, items)); + + TestJob j = new TestJob(-1.0, true); + for (int i = 0; i < workerNo + 1; ++i) { + j.setEOS(); + workpoolService.submit(j); + } + + } + try { + if (nodeName.equals("nodeB")) { + NodeManagerInitService serviceNodeB = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeBComponent/NodeManagerInitService"); + serviceNodeB.setNode(node); + } + if (nodeName.equals("nodeC")) { + NodeManagerInitService workerManagerC = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeCComponent/NodeManagerInitService"); + workerManagerC.setNode(node); + } + if (nodeName.equals("nodeD")) { + NodeManagerInitService workerManagerD = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeDComponent/NodeManagerInitService"); + workerManagerD.setNode(node); + } + if (nodeName.equals("nodeE")) { + NodeManagerInitService workerManagerE = node + .getDomain() + .getService(NodeManagerInitService.class, + "WorkerManagerNodeEComponent/NodeManagerInitService"); + workerManagerE.setNode(node); + } + + System.out.println("Node started (press enter to shutdown)"); + String buff; + for (;;) { + try { + buff = in.readLine(); + if (buff == null) + break; + System.out.print(in.readLine()); + } catch (IOException ex) { + break; // Exit thread. + } + } + // stop the node and all the domains in it + node.stop(); + node.destroy(); + System.exit(0); + } catch (Exception ex) { + System.err.println("Exception in node - " + ex.getMessage()); + ex.printStackTrace(System.err); + } + } +} |