From a72fcfa3adf5be93bcf62e8ed26c08d10939b364 Mon Sep 17 00:00:00 2001 From: lresende Date: Thu, 11 Sep 2008 04:10:13 +0000 Subject: Branch before trunk cleanup git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@694106 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/workpool/MetaComponentWorker.java | 85 ++++ .../src/main/java/workpool/MyWorker.java | 46 ++ .../src/main/java/workpool/NullJob.java | 43 ++ .../src/main/java/workpool/ResultJob.java | 54 ++ .../src/main/java/workpool/Trigger.java | 29 ++ .../src/main/java/workpool/WorkerManager.java | 31 ++ .../src/main/java/workpool/WorkerManagerImpl.java | 213 ++++++++ .../src/main/java/workpool/WorkerService.java | 56 +++ .../main/java/workpool/WorkerServiceCallback.java | 27 + .../src/main/java/workpool/WorkerServiceImpl.java | 171 +++++++ .../src/main/java/workpool/WorkpoolBean.java | 162 ++++++ .../main/java/workpool/WorkpoolBeanListener.java | 25 + .../src/main/java/workpool/WorkpoolEvent.java | 71 +++ .../src/main/java/workpool/WorkpoolManager.java | 48 ++ .../main/java/workpool/WorkpoolManagerImpl.java | 555 +++++++++++++++++++++ .../src/main/java/workpool/WorkpoolService.java | 91 ++++ .../main/java/workpool/WorkpoolServiceImpl.java | 416 +++++++++++++++ 17 files changed, 2123 insertions(+) create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/MyWorker.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/NullJob.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/ResultJob.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/Trigger.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerService.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java create mode 100644 branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java (limited to 'branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool') diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java new file mode 100644 index 0000000000..cdd0f30b34 --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.io.StringReader; +import java.net.URI; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.logging.Logger; + +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamReader; + +import org.apache.tuscany.sca.assembly.MetaComponent; +import org.apache.tuscany.sca.assembly.impl.DefaultMetaComponent; + +public class MetaComponentWorker extends DefaultMetaComponent { + + private SecureRandom prng; + private String componentName; + private String scdl; + private String javaClass; + private boolean loadedFromString = false; + private Logger log = Logger.getLogger(MetaComponentWorker.class.getName()); + + public MetaComponentWorker() { + componentName = "WorkerComponent" + + java.util.UUID.randomUUID().toString(); + } + + public void setWorkerName(String componentName) { + this.componentName = componentName; + } + + public void setWorkerClass(String javaClass) { + this.javaClass = javaClass; + } + + private String generateSCDL() { + StringBuffer buffer = new StringBuffer(512); + buffer + .append("\n"); + buffer.append(""); + buffer.append(""); + buffer.append(this.componentName); + buffer.append("\n"); + return buffer.toString(); + } + + @Override + public XMLStreamReader build() throws Exception { + XMLInputFactory factory = XMLInputFactory.newInstance(); + if (!loadedFromString) + scdl = generateSCDL(); + return factory.createXMLStreamReader(new StringReader(scdl)); + + } + + public String getName() { + + return componentName; + } + +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/MyWorker.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/MyWorker.java new file mode 100644 index 0000000000..c45696e3cf --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/MyWorker.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import org.apache.tuscany.sca.databinding.job.JobExecutionContext; +import org.apache.tuscany.sca.databinding.job.RemoteJob; +import org.osoa.sca.annotations.Scope; + +@Scope("COMPOSITE") +public class MyWorker extends WorkerServiceImpl { + private static int resultcount = 0; + + @Override + public ResultJob computeTask(Job job) { + + RemoteJob remoteJob = (RemoteJob) job; + System.out.println("Computing the job"); + JobExecutionContext context = remoteJob.getContext(); + ResultJob resultJob = new ResultJob(); + JobDataMap resultMap = new JobDataMap(); + resultMap.addJobData("result", remoteJob.compute(context)); + resultJob.setJobDataMap(resultMap); + System.out.println("Count result = " + (++resultcount)); + return resultJob; + } + +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/NullJob.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/NullJob.java new file mode 100644 index 0000000000..fb930adf2e --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/NullJob.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; + +public class NullJob implements Job, java.io.Serializable { + + public Object compute(Object arg0) { + // TODO Auto-generated method stub + return null; + } + + public JobDataMap getDataMap() { + return null; + } + + public boolean eos() { + return false; + } + + public int getType() { + return Job.NULL_JOB; + } + +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/ResultJob.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/ResultJob.java new file mode 100644 index 0000000000..e04411668b --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/ResultJob.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import org.apache.tuscany.sca.databinding.job.JobExecutionContext; +import org.apache.tuscany.sca.databinding.job.RemoteJob; + +public class ResultJob extends RemoteJob implements + java.io.Serializable { + private JobDataMap map; + + public JobDataMap getDataMap() { + return map; + } + + public void setJobDataMap(JobDataMap map) { + this.map = map; + } + + public boolean eos() { + // TODO Auto-generated method stub + return true; + } + + public int getType() { + // TODO Auto-generated method stub + return Job.RESULT_JOB; + } + + @Override + public Object compute(JobExecutionContext v) { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/Trigger.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/Trigger.java new file mode 100644 index 0000000000..469675b19b --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/Trigger.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; +import org.osoa.sca.annotations.Remotable; + +@Remotable +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +public interface Trigger { + void handleEvent(T c); +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java new file mode 100644 index 0000000000..520203e190 --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; +import org.osoa.sca.annotations.Remotable; +import org.osoa.sca.CallableReference; +@Remotable +public interface WorkerManager { + CallableReference addWorker(); + boolean removeWorker(String workerName); + boolean removeWorkers(int k); + boolean removeAllWorkers(); + double getNodeLoad(); + int activeWorkers(); + void start(); +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java new file mode 100644 index 0000000000..d4337cad2f --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.contribution.Contribution; +import org.apache.tuscany.sca.contribution.DeployedArtifact; +import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl; +import org.osoa.sca.CallableReference; +import org.apache.tuscany.sca.node.management.SCANodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.osoa.sca.ComponentContext; +import org.osoa.sca.annotations.Context; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Scope; +import org.osoa.sca.annotations.Service; +import java.util.LinkedList; +import java.util.ArrayList; + +@Scope("COMPOSITE") +@Service(interfaces = { SCANodeManagerInitService.class, WorkerManager.class }) +public class WorkerManagerImpl implements WorkerManager, SCANodeManagerInitService { + private Logger log = Logger.getLogger(WorkerManagerImpl.class.getName()); + private LinkedList> activeWorkers = new LinkedList>(); + private List workerComponentNames = new ArrayList(); + private SCANodeImpl node; + @Property + protected String nodeName; + @Property + protected String compositeName; + @Property + protected String workerClass; + @Context + protected ComponentContext context; + private double loadAverage; + + /* This method is used to find a composite inside all deployed artifacts */ + private Composite findComposite(List artifacts) { + for (Composite fact : artifacts) { + log.info("Searching in a contribution deployed artifacts -" + + compositeName); + Composite augmented = (Composite) fact; + // found + if (augmented.getURI().equals(compositeName)) { + log.info("Found composite..." + compositeName); + return augmented; + } + } + } + return null; + } + + public CallableReference addWorker() { + log.info("Adding a new worker call.."); + long addWorkerStartTime = System.nanoTime(); + ContributionServiceImpl cServiceImpl = (ContributionServiceImpl) node.getContributionService(); + Contribution contribution = cServiceImpl.getContribution(nodeName); + List artifacts = contribution.getDeployables(); + CallableReference workerReference = null; + CallableReference ref = null; + log.info("Creating a MetaComponentWorker.."); + MetaComponentWorker mcw = new MetaComponentWorker(); + boolean found = false; + mcw.setWorkerClass(workerClass); + Composite augmented = findComposite(artifacts); + try { + if (augmented != null) { + long startCreation = System.nanoTime(); + node.addComponentToComposite(mcw, contribution.getURI(), + augmented.getURI()); + System.out.println("addComponentToComposite time = " + + (System.nanoTime() - startCreation)); + RuntimeComponent workerComponent = (RuntimeComponent) node + .getComponent(mcw.getName()); + if (workerComponent != null) { + ref = (CallableReference) workerComponent + .getComponentContext().createSelfReference( + WorkerService.class); + ref.getService().start(); + activeWorkers.addLast(ref); + workerComponentNames.add(mcw.getName()); + CallableReference manager = (CallableReference) context + .createSelfReference(WorkerManager.class, + "WorkerManager"); + ref.getService().registerManager(manager); + return ref; + } + } else { + log.info("Workpool composite not found!"); + } + } catch (Exception e) { + log.info("Exception activation"); + e.printStackTrace(); + } + ; + System.out.println("Component Creation Time =" + + (System.nanoTime() - addWorkerStartTime)); + return ref; + } + + public boolean removeAllWorkers() { + for (CallableReference callable : activeWorkers) { + callable.getService().stop(); + } + return true; + } + + public boolean removeWorker() { + CallableReference callable = activeWorkers + .removeLast(); + callable.getService().stop(); + return true; + } + + public boolean removeWorkers(int k) { + if (k >= activeWorkers.size()) + return false; + for (int i = 0; i < k; ++i) { + if (!removeWorker()) + return false; + } + return true; + } + + public void setNode(SCANode node) { + this.node = (SCANodeImpl) node; + + } + + public double getNodeLoad() { + /* + * FIXME [jo] this works only on Linux To be replaced with an JNI + * extension + */ + RandomAccessFile statfile; + + this.loadAverage = 1.0; + // load = 0; + int NoProcessors = 0; + String cpuLine = null; + try { + NoProcessors = Runtime.getRuntime().availableProcessors(); + if (NoProcessors > 1) + this.loadAverage = 1 / (1.0 * NoProcessors); + statfile = new RandomAccessFile("/proc/loadavg", "r"); + try { + statfile.seek(0); + cpuLine = statfile.readLine(); + + } catch (IOException e) { + // FIX ME: Better exception handling. + e.printStackTrace(); + } + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (NumberFormatException e) { + e.printStackTrace(); + } + double min1; + if (cpuLine != null) { + java.util.StringTokenizer st = new java.util.StringTokenizer( + cpuLine, " "); + min1 = Double.parseDouble(st.nextToken()); + } else + min1 = 0; + + return min1 * this.loadAverage; + } + + public int activeWorkers() { + return activeWorkers.size(); + } + + public boolean removeWorker(String workerName) { + RuntimeComponent workerComponent = (RuntimeComponent) node + .getComponent(workerName); + if (workerComponent != null) { + log.info("Removing component " + workerName); + node.removeComponentFromComposite(nodeName, "Workpool.composite", + workerName); + return true; + } + return false; + } + + public void start() { + // do nothing for now. + } +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerService.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerService.java new file mode 100644 index 0000000000..37b7ea227a --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerService.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.osoa.sca.ServiceReference; +import org.osoa.sca.annotations.Callback; +import org.osoa.sca.annotations.Remotable; +import org.osoa.sca.annotations.OneWay; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; + +/** + * The interface for the multiply service + */ +@Remotable +@Callback(WorkerServiceCallback.class) +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +public interface WorkerService { + @OneWay + void compute(Job j); + + void start(); + + void stop(); + + // void addJobCompleteHandler(String triggerName, + // CallableReferenceImpl handle); + // void removeJobCompleteHandler(String triggerName); + /* The worker manager */ + void registerManager(CallableReferenceImpl wm); + + void registerSender(CallableReferenceImpl sender); + + // void init(Job nullJob); + @OneWay + void computeFirstTime(Job nullJob, + CallableReferenceImpl myReference); + +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java new file mode 100644 index 0000000000..6fb1278e5e --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.databinding.job.Job; +import org.osoa.sca.annotations.Remotable; + +@Remotable +public interface WorkerServiceCallback { + void receiveResult(Job resultType, boolean reuse, String workerName); +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java new file mode 100644 index 0000000000..2c9bf5ea48 --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.osoa.sca.ComponentContext; +import org.osoa.sca.RequestContext; +import org.osoa.sca.ServiceReference; +import org.osoa.sca.annotations.Callback; +import org.osoa.sca.annotations.Context; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Scope; +import org.osoa.sca.annotations.Service; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.*; + +/** + * An implementation of the worker service. + */ +@Service(WorkerService.class) +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +@Scope("COMPOSITE") +public abstract class WorkerServiceImpl implements WorkerService { + private Logger log = Logger.getLogger(this.getClass().getName()); + private WorkerServiceCallback workerServiceCallback; + @Context + protected ComponentContext workerContext; + @Context + protected RequestContext requestContext; + @Property + protected String workerName; + private CallableReferenceImpl managerReference = null; + + /* TODO add the triggers, but before ask */ + // protected Map triggers = new HashMap(); + public abstract ResultJob computeTask(Job job); + + private boolean stopped = false; + private CallableReferenceImpl serviceRef; + private CallableReferenceImpl senderService; + private WorkpoolService wp = null; + private WorkerManager manager = null; + + public void start() { + log.info("Starting worker..."); + stopped = false; + serviceRef = (CallableReferenceImpl) workerContext + .createSelfReference(WorkerService.class); + + } + + public void init(CallableReferenceImpl sender, Job nullJob) { + compute(nullJob); + } + + public void stop() { + stopped = true; + } + + @Callback + public void setWorkerServiceCallback( + WorkerServiceCallback workerServiceCallback) { + log.info("Setting worker callback"); + this.workerServiceCallback = workerServiceCallback; + } + + public void computeFirstTime(Job nullJob, + CallableReferenceImpl sender) { + senderService = sender; + wp = sender.getService(); + workWithCallable(nullJob); + } + + public void registerManager(CallableReferenceImpl wm) { + managerReference = wm; + manager = managerReference.getService(); + + } + + public void registerSender(CallableReferenceImpl sender) { + log.info("Registering sender.."); + senderService = sender; + wp = sender.getService(); + } + + private void workWithInjection(Job j) { + log.info("Worker has received job"); + if (stopped) { + workerServiceCallback + .receiveResult(j, true, workerContext.getURI()); + if (managerReference != null) + manager.removeWorker(workerContext.getURI()); + } else if (j.eos()) { + if (managerReference != null) + manager.removeWorker(workerContext.getURI()); + } + if (j instanceof NullJob) { + workerServiceCallback.receiveResult(j, false, workerContext + .getURI()); + } else { + workerServiceCallback.receiveResult(computeTask(j), false, + workerContext.getURI()); + } + } + + private void workWithCallable(Job j) { + log.info("Worker " + workerContext.getURI() + + " has received job with eos --> " + j.eos()); + if (stopped) { + wp.handleResult(j, true, workerContext.getURI(), serviceRef, false); + return; + } + if (j.eos()) { + log.info("Got poison token..."); + if (managerReference != null) { + log.info("Removing component " + workerContext.getURI()); + manager.removeWorker(workerContext.getURI()); + + } + return; + } + if (j.getType() != Job.NULL_JOB) { + wp.handleResult(computeTask(j), false, workerContext.getURI(), + serviceRef, false); + } else { + log.info("Got a null job"); + wp.handleResult(j, false, workerContext.getURI(), serviceRef, true); + } + } + + public void compute(Job j) { + + if (senderService != null) { + log.info("Computing job using callable reference method"); + workWithCallable(j); + + } else { + log.info("Computing job using reference injection method"); + workWithInjection(j); + + } + } + /* + * public void addJobCompleteHandler(String triggerName, + * CallableReferenceImpl handle) { if + * (!triggers.containsKey(triggerName)) { triggers.put(triggerName, + * handle.getService()); } } public void removeJobCompleteHandler(String + * triggerName) { if (!triggers.containsKey(triggerName)) { + * triggers.remove(triggerName); } } + */ +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java new file mode 100644 index 0000000000..80c093ff1c --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.beans.*; +import java.util.Vector; +import java.util.logging.*; + +public class WorkpoolBean { + private Vector listeners = new Vector(); + double loadAverage = 0; + int nodeNumbers = 0; + int workers = 0; + int estimedQueueSize = 0; + double averageServiceTime = 0; + double averageArrivalTime = 0; + double usageFactor = 0; + private final PropertyChangeSupport changes = new PropertyChangeSupport( + this); + long jobComputed = 0; + boolean singleAction = false; + private Logger log = Logger.getLogger(WorkpoolBean.class.getName()); + + public void setNodeNumbers(int n) { + this.nodeNumbers = n; + } + + public void setWorkers(int w) { + this.workers = w; + } + + public void setLoadAverage(double loadAverage) { + this.loadAverage = loadAverage; + } + + public void setAverageServiceTime(double service) { + this.averageServiceTime = service; + } + + public void setAverageArrivalTime(double service) { + this.averageArrivalTime = service; + } + + public double getAverageArrivalTime() { + return this.averageArrivalTime; + } + + public double getUtilizationFactor() { + return usageFactor; + } + + public void setUsageFactor() { + usageFactor = averageServiceTime / averageArrivalTime; + } + + public void setEstimedQueueSize(int size) { + estimedQueueSize = size; + } + + public int getEstimedQueueSize() { + return estimedQueueSize; + } + + public double getLoadAverage() { + return this.loadAverage; + } + + public int getWorkers() { + return this.workers; + } + + public int getNodeNumbers() { + return this.nodeNumbers; + } + + public double getAverageServiceTime() { + return this.averageServiceTime; + } + + public void addPropertyChangeListener(final PropertyChangeListener l) { + this.changes.addPropertyChangeListener(l); + } + + public void removePropertyChangeListener(final PropertyChangeListener l) { + this.changes.removePropertyChangeListener(l); + } + + private synchronized void fireWorkpoolEvent(WorkpoolEvent ev) { + for (WorkpoolBeanListener l : listeners) { + l.handleEvent(new WorkpoolEvent(ev)); + } + } + + public void addWorkersToNode(int k, String nodeName) { + log.info("Adding a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER, k, nodeName); + fireWorkpoolEvent(ev); + } + + public void addWorkerToNode(String nodeName) { + log.info("Adding a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.SINGLE_ADD_WORKER, 1, nodeName); + fireWorkpoolEvent(ev); + } + + public void removeWorkersToNode(int k, String nodeName) { + log.info("Removing a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER, k, nodeName); + fireWorkpoolEvent(ev); + } + + public void removeWorkerToNode(String nodeName) { + log.info("Removing a worker to node " + nodeName); + WorkpoolEvent ev = new WorkpoolEvent(this, + WorkpoolEvent.SINGLE_REMOVE_WORKER, 1, nodeName); + fireWorkpoolEvent(ev); + } + + public synchronized void addListener(WorkpoolBeanListener l) { + this.listeners.add(l); + } + + public synchronized void removeListener(WorkpoolBeanListener l) { + this.listeners.remove(l); + } + + public void setJobComputed(long jobComputed) { + this.jobComputed = jobComputed; + + } + + public void setSingleAction() { + singleAction = true; + } + + public boolean getSingleAction() { + return singleAction; + } + + public long getJobComputed() { + return this.jobComputed; + } +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java new file mode 100644 index 0000000000..0ecc223fed --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.util.EventListener; + +public interface WorkpoolBeanListener extends EventListener { + public void handleEvent(WorkpoolEvent ev); +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java new file mode 100644 index 0000000000..0bdc3671d5 --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.util.EventObject; + +public class WorkpoolEvent extends EventObject { + + private static final long serialVersionUID = -1273928009411948768L; + + public WorkpoolEvent(Object source) { + super(source); + } + + public WorkpoolEvent(WorkpoolEvent ev) { + super(ev.source); + type = ev.type; + noWorker = ev.noWorker; + nodeName = ev.nodeName; + } + + public WorkpoolEvent(Object source, int typeEv, int worker) { + super(source); + type = typeEv; + noWorker = worker; + nodeName = ""; + } + + public WorkpoolEvent(Object source, int typeEv, int worker, String nodeName) { + super(source); + type = typeEv; + noWorker = worker; + this.nodeName = nodeName; + } + + public String getNodeName() { + return nodeName; + } + + public int getType() { + return type; + } + + public int workers() { + return noWorker; + } + + private int type; + private int noWorker; + private String nodeName; + public static final int EVENT_MULTIPLE_ADD_WORKER = 0; + public static final int EVENT_MULTIPLE_REMOVE_WORKER = 1; + public static final int SINGLE_REMOVE_WORKER = 2; + public static final int SINGLE_ADD_WORKER = 3; +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java new file mode 100644 index 0000000000..6520954bdd --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.osoa.sca.ServiceReference; +import org.osoa.sca.annotations.OneWay; +import org.osoa.sca.annotations.Remotable; + +@Remotable +public interface WorkpoolManager { + /* + * @param String rules This are the autonomic rules. The format is the Java + * Drools .drl file. You have to read it + */ + @OneWay + void acceptRules(String rules); + + @OneWay + void start(); + + @OneWay + void stopAutonomicCycle(); + + @OneWay + void startAutonomicCycle(); + + int activeWorkers(); + + void setCycleTime(long time); + + void setWorkpoolReference(ServiceReference serviceReference); +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java new file mode 100644 index 0000000000..f7c727ad04 --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.osoa.sca.ComponentContext; +import org.osoa.sca.ServiceReference; +import java.util.Collections; +import java.util.Enumeration; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.Collection; +import java.util.Iterator; +import java.util.Timer; +import java.util.TimerTask; +import java.util.logging.Logger; + +import javax.xml.stream.XMLStreamException; + +import node.TestJob; +import java.io.File; +import java.util.Vector; +import org.apache.axiom.om.OMElement; +import org.apache.tuscany.sca.contribution.service.ContributionResolveException; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.core.context.ServiceReferenceImpl; +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.node.NodeManagerInitService; +import org.apache.tuscany.sca.node.SCANode; +import org.apache.tuscany.sca.node.impl.SCANodeImpl; +import org.osoa.sca.CallableReference; +import org.drools.FactHandle; +import org.drools.RuleBase; +import org.drools.RuleBaseFactory; +import org.drools.StatefulSession; +import org.drools.StatelessSession; +import org.drools.compiler.DroolsParserException; +import org.drools.compiler.PackageBuilder; +import org.drools.rule.Package; +import org.osoa.sca.annotations.Constructor; +import org.osoa.sca.annotations.Context; +import org.osoa.sca.annotations.Destroy; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Reference; +import org.osoa.sca.annotations.Scope; +import org.osoa.sca.annotations.Service; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +@Service(interfaces = { NodeManagerInitService.class, WorkpoolManager.class }) +@Scope("COMPOSITE") +/* + * This is the core manager of the workpool application. The Workpool Manager + * holds the reference to each remote node manager. Inside it we've a rule + * engine instance. + */ +public class WorkpoolManagerImpl implements WorkpoolManager, + NodeManagerInitService, WorkpoolBeanListener { + /* + * This inner class trigs the rule engine, at given times: 1. It checks the + * different loads for each nodes and sets the WorkpoolBean 2. It checks the + * Workpool AverageService Time and sets the WorkpoolBean 3. It checks how + * many jobs are already computed and sets the WorkpoolBean Then given the + * configured bean and the rules, run the Rule Engine for executing the + * business logic + */ + class RuleEngineTrigger extends TimerTask { + // private ReentrantLock triggerLock = new ReentrantLock(); + @Override + public void run() { + + System.out.println("Updating WorkpoolBean.."); + // checkActiveWorkers(); + // checkLoadInNodes(); + checkServiceTime(); + // checkEstimedQueueSize(); + // checkArrivalTime(); + getProcessedItem(); + // computeUsageFactor(); + doRun(bean); + } + + } + + private WorkerManager managerNodeB; + private WorkerManager managerNodeC; + private WorkerManager managerNodeD; + private WorkerManager managerNodeE; + + private SCANodeImpl node; + private WorkpoolBean bean = new WorkpoolBean(); + private ReentrantLock handleEventLock = new ReentrantLock(); + private ReentrantLock updateRuleLock = new ReentrantLock(); + + private ServiceReference reference; + private AtomicInteger activeWorkers = new AtomicInteger(0); + private Logger log = Logger.getLogger(WorkpoolManagerImpl.class.getName()); + @Property + protected String workers; + @Property + protected String nodes; + @Property + protected String injection; + @Context + protected ComponentContext workpoolManagerContext; + private CallableReferenceImpl myReference; + private String rules = null; + private boolean referenceInjection = false; + private ConcurrentHashMap workerManagerTable = new ConcurrentHashMap(); + private int workersNo; + private int nodesNo; + private Timer timer = new Timer(); + /* this handle facts */ + private RuleBase ruleBase = null; + private FactHandle handle = null; + private StatefulSession wm = null; + private long cycleTime = 5000; + + @Reference + public void setManagerNodeB(WorkerManager managerNodeB) { + this.managerNodeB = managerNodeB; + workerManagerTable.put("nodeB", managerNodeB); + } + + @Reference + public void setManagerNodeC(WorkerManager managerNodeC) { + this.managerNodeC = managerNodeC; + workerManagerTable.put("nodeC", managerNodeC); + } + + @Reference + public void setManagerNodeD(WorkerManager managerNodeD) { + this.managerNodeD = managerNodeD; + workerManagerTable.put("nodeD", managerNodeD); + } + + @Reference + public void setManagerNodeE(WorkerManager managerNodeE) { + this.managerNodeE = managerNodeE; + workerManagerTable.put("nodeE", managerNodeE); + } + + private void startNewComponents( + Vector> vector) { + log.info("Starting new components"); + WorkpoolService wp = reference.getService(); + // CallableReferenceImpl sink = + // (CallableReferenceImpl) reference; + Job j = new NullJob(); + for (CallableReferenceImpl item : vector) { + // WorkerService service = item.getService(); + // service.start(); + // service.computeFirstTime(j, sink); + log.info("Send PostWorkerReference..."); + wp.PostWorkerReference(item); + } + if (myReference != null) + wp.registerManager(myReference); + } + + public void setCycleTime(long cycle) { + this.cycleTime = cycle; + } + + @SuppressWarnings("unchecked") + /* + * This gets the number of workers workerNo and instantiates them + */ + public void start() { + this.myReference = (CallableReferenceImpl) workpoolManagerContext + .createSelfReference(WorkpoolManager.class, "WorkpoolManager"); + this.workersNo = Integer.parseInt(this.workers); + this.nodesNo = Integer.parseInt(this.nodes); + this.referenceInjection = (Integer.parseInt(this.injection) != 0); + log.info("Starting WorkpoolManager Component with #" + workersNo + + " workers and #" + nodes + " nodes"); + nodesNo = workerManagerTable.values().size(); + // Sets info in the bean. + bean.setWorkers(this.workersNo); + bean.setNodeNumbers(nodesNo); + Vector> workerRefs = new Vector>(); + int exactTimes = workersNo / nodesNo; + for (int i = 0; i < exactTimes; ++i) { + for (WorkerManager manager : workerManagerTable.values()) { + manager.start(); + if (manager != null) { + System.err.println("Actual load = " + + manager.getNodeLoad() + " for node "); + addNewComponent(manager, workerRefs); + } + } + } + + int module = (workersNo % nodesNo); + int n = 0; + if (module > 0) { + Vector v = new Vector(workerManagerTable.keySet()); + Collections.sort(v); + // Iterator iter = + // workerManagerTable.values().iterator(); + // Display (sorted) hashtable. + for (Enumeration e = v.elements(); (e.hasMoreElements() && n < module); ++n) { + String key = e.nextElement(); + WorkerManager m = workerManagerTable.get(key); + System.err.println("Module Actual load = " + m.getNodeLoad() + + " for node "); + addNewComponent(m, workerRefs); + } + } + startNewComponents(workerRefs); + bean.addListener(this); + TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger(); + timer.scheduleAtFixedRate(task, 3000, cycleTime); + } + + private void checkLoadInNodes() { + System.out.println("CheckLoadInNodes"); + int number = 1; + double loadAverage = 0; + for (WorkerManager manager : workerManagerTable.values()) { + loadAverage += manager.getNodeLoad(); + number++; + } + bean.setLoadAverage(loadAverage / number); + } + + private void computeUsageFactor() { + bean.setUsageFactor(); + } + + private void checkEstimedQueueSize() { + WorkpoolService wp = reference.getService(); + + if (wp != null) { + int size = wp.estimatedQueueSize(); + log.info("Estimed Queue Size =" + size); + bean.setEstimedQueueSize(size); + } + } + + private WorkerManager findMinLoad() { + double load = 0; + // workerManagerTable.values().iterator().next().getNodeLoad(); + WorkerManager toFind = null; + for (WorkerManager manager : workerManagerTable.values()) { + if (load == 0) { + load = manager.getNodeLoad(); + toFind = manager; + } else if (manager.getNodeLoad() < load) { + load = manager.getNodeLoad(); + toFind = manager; + } + } + return toFind; + } + + private void checkServiceTime() { + WorkpoolService wp = reference.getService(); + + if (wp != null) { + double time = wp.getServiceTime(); + log.info("Average System Service Time =" + time); + bean.setAverageServiceTime(time); + } + } + + private void checkArrivalTime() { + WorkpoolService wp = reference.getService(); + + if (wp != null) { + double time = wp.getArrivalTime(); + log.info("Average Arrival Service Time =" + time); + bean.setAverageArrivalTime(time); + } + } + + private void checkActiveWorkers() { + bean.setWorkers(this.activeWorkers()); + } + + private void getProcessedItem() { + WorkpoolService wp = reference.getService(); + if (wp != null) { + long computed = wp.getJobComputed(); + log.info("The system has already computed " + computed + " jobs"); + bean.setJobComputed(computed); + } + } + + private boolean removeComponent(WorkerManager manager, int k) { + manager.removeWorkers(k); + activeWorkers.decrementAndGet(); + return true; + } + + @SuppressWarnings("unchecked") + private boolean addNewComponent(WorkerManager manager, + Vector> workerRefs) { + CallableReferenceImpl workerReference = (CallableReferenceImpl) manager + .addWorker(); + + if (workerReference != null) { + /* if i'll decide to use dynamically generated references */ + if (referenceInjection) { + workerReference.getService(); + String uri = workerReference.getEndpointReference().getURI(); + int nameIndex = uri.indexOf("/"); + String componentName = uri.substring(0, nameIndex); + if (componentName.startsWith("/")) + componentName = uri.substring(1, uri.length()); + if (componentName.endsWith("/")) + componentName = uri.substring(0, uri.length() - 1); + // String componentName = uri.substring(0, nameIndex-1); + + log.info("Adding wire from WorkpoolComponentService to " + + componentName); + String referenceName = "ref" + componentName; + + /* + * I'm updating the WorkpoolServiceComponent with a new + * reference to a just created component I assume that the + * WorkpoolManagerService and the WorkpoolServiceComponent stay + * in the same JVM It's like in the scdl there were: With this then + * I've a wire WorkpoolService---> a new Worker + */ + try { + node.addComponentReferenceWire(referenceName, "nodeA", + "Workpool.composite", "workpool.WorkerServiceImpl", + WorkerService.class, "WorkpoolServiceComponent", + componentName); + } catch (Exception e) { + e.printStackTrace(); + return false; + } + log.info("Sending reference name " + referenceName + + " to WorkpoolService"); + // TODO: this was part of dynamic wiring, but it doesn't work. + // reference.getService().PostWorkerName(referenceName); + + } else { + // log.info("Sending callable reference to WorkpoolService + // placed at -->"+reference); + // reference.getService().PostWorkerReference(workerReference); + workerRefs.add(workerReference); + } + activeWorkers.incrementAndGet(); + return true; + } + return false; + } + + public int activeWorkers() { + + return activeWorkers.get(); + } + + private void doRun(WorkpoolBean bean) { + + long startTime = System.currentTimeMillis(); + updateRuleLock.lock(); + if (wm == null) + wm = ruleBase.newStatefulSession(); + if (this.handle == null) + handle = wm.insert(bean); + else { + wm.update(handle, bean); + } + wm.fireAllRules(); + updateRuleLock.unlock(); + + System.out.println("Engine rule overhead = " + + (System.currentTimeMillis() - startTime)); + } + + private RuleBase readRule(String rule) { + + PackageBuilder packBuilder = new PackageBuilder(); + try { + packBuilder.addPackageFromDrl(new StringReader(rule)); + } catch (DroolsParserException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + Package pkg = packBuilder.getPackage(); + RuleBase ruleBase = RuleBaseFactory.newRuleBase(); + try { + ruleBase.addPackage(pkg); + } catch (Exception e) { + e.printStackTrace(); + } + return ruleBase; + } + + public void acceptRules(String rules) { + this.rules = rules; + if (ruleBase == null) { + RuleBase base = readRule(rules); + if (base != null) { + ruleBase = base; + } + } else { + updateRuleLock.lock(); + // i have already a rule: updating + ruleBase = readRule(rules); + wm = ruleBase.newStatefulSession(); + handle = null; + updateRuleLock.unlock(); + } + + System.out.println("Accepted rules = " + rules); + } + + public String getRules() { + return rules; + } + + private WorkerManager findMaxLoadNode() { + double load = 0.0; + WorkerManager toFind = null; + for (WorkerManager manager : workerManagerTable.values()) { + if (manager.getNodeLoad() > load) { + load = manager.getNodeLoad(); + toFind = manager; + } + } + return toFind; + + } + + public void setWorkpoolReference( + ServiceReference serviceReference) { + reference = serviceReference; + } + + public void setNode(SCANode arg0) { + node = (SCANodeImpl) arg0; + } + + public void handleEvent(WorkpoolEvent ev) { + if (ev == null) + return; + + String nodeName = ev.getNodeName(); + + switch (ev.getType()) { + case WorkpoolEvent.SINGLE_ADD_WORKER: { + if (nodeName != null) { + Vector> workerRefs = new Vector>(); + + // in this case I have a nodeName + if (!nodeName.equals("") + && (workerManagerTable.containsKey(nodeName))) { + WorkerManager manager = workerManagerTable.get(nodeName); + addNewComponent(manager, workerRefs); + startNewComponents(workerRefs); + } else if (nodeName.equals("")) { + WorkerManager manager = findMinLoad(); + addNewComponent(manager, workerRefs); + startNewComponents(workerRefs); + } + } + break; + } + case WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER: { + Vector> workerRefs = new Vector>(); + + if (nodeName.equals("")) { + + WorkerManager manager = findMinLoad(); + int k = ev.workers(); + for (int h = 0; h < k; ++h) { + addNewComponent(manager, workerRefs); + } + } else { + WorkerManager manager = workerManagerTable + .get(ev.getNodeName()); + int k = ev.workers(); + for (int h = 0; h < k; ++h) { + addNewComponent(manager, workerRefs); + } + } + startNewComponents(workerRefs); + break; + } + case WorkpoolEvent.SINGLE_REMOVE_WORKER: { + if (nodeName != null) { + // in this case I have a nodeName + if (!nodeName.equals("") + && (workerManagerTable.containsKey(nodeName))) { + WorkerManager manager = workerManagerTable.get(nodeName); + removeComponent(manager, 1); + } else if (nodeName.equals("")) { + WorkerManager manager = findMaxLoadNode(); + removeComponent(manager, 1); + } + } + break; + } + case WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER: { + if (nodeName.equals("")) { + WorkerManager manager = findMaxLoadNode(); + removeComponent(manager, ev.workers()); + + } else { + WorkerManager manager = workerManagerTable.get(nodeName); + removeComponent(manager, ev.workers()); + } + break; + } + } + + } + + @Destroy + public void onExit() { + // do cleanup + this.timer.cancel(); + this.timer.purge(); + } + + public void stopAutonomicCycle() { + this.timer.cancel(); + this.timer.purge(); + this.timer = null; + } + + public void startAutonomicCycle() { + if (this.timer == null) { + this.timer = new Timer(); + TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger(); + timer.schedule(task, 3000, cycleTime); + } + } +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java new file mode 100644 index 0000000000..d84ae549d8 --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.apache.tuscany.sca.databinding.job.Job; +import org.osoa.sca.annotations.OneWay; +import org.osoa.sca.annotations.Remotable; +import org.osoa.sca.ServiceReference; + +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +@Remotable +public interface WorkpoolService { + + /* this the functional part */ + void submit(Job i); + + /* the time between two subsequent worker invocations */ + double getServiceTime(); + + /* the number of ResultJob received */ + long getJobComputed(); + + /* the time elapsed between the stream has initiated and now */ + long getElapsedTime(); + + /* the size of the internal queue : it's not accurate */ + int estimatedQueueSize(); + + /* the average time between two consuecutive submit */ + double getArrivalTime(); + + void start(); + + void stop(); + + /* + * this is the part needed by management. May be in future i'll refactor it + * order to hide this part. + */ + @OneWay + void handleResult(Job j, boolean reuse, String string, + CallableReferenceImpl worker, boolean newJob); + + void addTrigger(CallableReferenceImpl reference); + + void removeTrigger(); + + void registerManager( + CallableReferenceImpl createSelfReference); + + /* + * This could placed in another interface definition - think about it These + * methods evict, and evictAll are needed when a worker finish to exist and + * it needs to be evicted by the WorkpoolManager. In the system I have two + * caches: 1) a domain cache, which holds the components URI 2) a + * workerReference cache (implemented by a ConcurrentHashMap), which holds a + * proxy to each worker. Every proxy gets built from the worker callable + * reference. I'm thinking for placing the workerReferenceCache in a local + * interface. Assuming that WorkpoolService and WorkpoolManager are in the + * same JVM. + */ + void evict(String workerURI); + + void evictAll(); + + /* + * these two are no longer needed. I leave it because if i'll have time to + * do dynamic wiring the first one is needed. void PostWorkerName(String + * referenceName); + */ + void PostWorkerReference(CallableReferenceImpl worker); + +} diff --git a/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java new file mode 100644 index 0000000000..52b88af42c --- /dev/null +++ b/branches/trunk-20080910/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package workpool; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; +import org.apache.tuscany.sca.core.context.CallableReferenceImpl; +import org.apache.tuscany.sca.databinding.annotation.DataBinding; +import org.osoa.sca.ComponentContext; +import org.osoa.sca.annotations.Context; +import org.osoa.sca.annotations.Scope; +import org.osoa.sca.annotations.Service; +import org.apache.tuscany.sca.databinding.job.Job; +import org.apache.tuscany.sca.databinding.job.JobDataMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * An implementation of the Workpool service. + */ +@Service(WorkpoolService.class) +@Scope("COMPOSITE") +@DataBinding("org.apache.tuscany.sca.databinding.job.Job") +public class WorkpoolServiceImpl implements WorkpoolService, + WorkerServiceCallback { + + /* incoming job queue */ + private LinkedBlockingQueue queue = new LinkedBlockingQueue(5000); + private CallableReferenceImpl trigger = null; + private Trigger forwardResult = null; + /* counter for job's number fetched from the queue and sent to the Worker */ + private AtomicInteger jobSent = new AtomicInteger(0); + /* time for initHandleResult */ + private AtomicLong initHandleResult = new AtomicLong(0); + /* time for endHandleResult */ + private AtomicLong endHandleResult = new AtomicLong(0); + /* + * number of job computed, this will be exposed in order to be used to + * firing rules + */ + private long jobComputed = 0; + /* same as above */ + private AtomicLong elapsedTime = new AtomicLong(0); + /* this is for comuputing averageServiceTime */ + private long times = 1; + /* this is for computing averageArrivalTime */ + private long timesArrival = 1; + private ReentrantLock arrivalLock = new ReentrantLock(); + private long arrivalPrevious = -1; + // private AtomicBoolean processingStopped = new AtomicBoolean(false); + private boolean processingStopped = false; + // private LinkedBlockingQueue triggers = new + // LinkedBlockingQueue(); + @Context + protected ComponentContext workpoolContext; + private CallableReferenceImpl manager; + private long previousSubmitTime = -1; + private boolean firstTime = true; + private boolean first = true; + private long start = 0; + private long end = 0; + private double averageServiceTime = 0; + private double averageArrivalTime = 0; + private int workersNo = 0; + private final Job nullJob = new NullJob(); + /* This is useful for counting the start and end */ + private Logger log = Logger.getLogger(WorkpoolServiceImpl.class.getName()); + private ReentrantLock handleResultLock = new ReentrantLock(); + private ReentrantLock postWorkerReferenceLock = new ReentrantLock(); + private ConcurrentHashMap cacheReference = new ConcurrentHashMap(); + private CallableReferenceImpl myReference; + private String previuosURI = ""; + private long time = 0; + + private void computeAverageTime() { + long actualServiceTime = 0; + // if the processing is finished + if (processingStopped) + return; + + if (firstTime == true) { + this.previousSubmitTime = System.currentTimeMillis(); + this.averageServiceTime = 0; + firstTime = false; + } else { + actualServiceTime = System.currentTimeMillis() + - this.previousSubmitTime; + this.previousSubmitTime = System.currentTimeMillis(); + averageServiceTime = ((averageServiceTime * times) + actualServiceTime) + / (times + 1); + ++times; + } + } + + public void submit(Job j) { + try { + // log.info("Submit job in queue -->"+ j.getType()); + // processingStopped.set(false); + try { + arrivalLock.lock(); + if (this.arrivalPrevious == -1) { + arrivalPrevious = System.currentTimeMillis(); + averageArrivalTime = 0; + } + double actualArrivalTime = System.currentTimeMillis() + - arrivalPrevious; + averageArrivalTime = ((averageArrivalTime * timesArrival) + actualArrivalTime) + / (timesArrival + 1); + arrivalPrevious = System.currentTimeMillis(); + ++timesArrival; + } finally { + arrivalLock.unlock(); + } + queue.put(j); + } catch (Exception e) { + log.info("Exception in queue"); + queue.clear(); + e.printStackTrace(); + } + } + + public double getArrivalTime() { + return this.averageArrivalTime; + } + + public double getServiceTime() { + return this.averageServiceTime; + } + + public void receiveResult(Job resultType, boolean reuse, String workerURI) { + + if (reuse) { + queue.add(resultType); + return; + } + + computeAverageTime(); + Job job = null; + try { + job = queue.take(); + } catch (InterruptedException e) { + // TODO Better exception handling --> see Exception antipattern doc + e.printStackTrace(); + return; + } + + if ((job != null) && (job.eos() == false)) { + int nameIndex = workerURI.indexOf("/"); + String workerName = workerURI.substring(0, nameIndex - 1); + log.info("Sending job to worker --> " + workerName); + WorkerService worker = workpoolContext.getService( + WorkerService.class, workerName); + worker.compute(job); + } + + JobDataMap map = ((ResultJob) resultType).getDataMap(); + if (map != null) { + ++jobComputed; + Object obj = map.getJobDataObject("result"); + System.out.println("Result = " + ((Double) obj).doubleValue()); + } + + } + + public void start() { + log.info("WorkpoolServiceComponent started..."); + myReference = (CallableReferenceImpl) workpoolContext + .createSelfReference(WorkpoolService.class, "WorkpoolService"); + myReference.getService(); + } + + /* + * + * This method is called by WorkpoolManagerImpl, when it creates a new + * worker component in order to dispatch worker to the WorkpoolServiceImpl + * @param CallableReferenceImpl reference - a dynamically created reference + * from the Worker + */ + public void PostWorkerReference( + CallableReferenceImpl reference) { + + try { + long initPostWorkerReference; + long endPostWorkerReference; + this.postWorkerReferenceLock.lock(); + + initPostWorkerReference = System.currentTimeMillis(); + WorkerService worker; + worker = reference.getService(); + worker.start(); + + ++workersNo; + if (myReference != null) { + + // Job poison = new ResultJob(); + this.postWorkerReferenceLock.unlock(); + log.info("Sending null job to worker"); + worker.computeFirstTime(nullJob, myReference); + // queue.put(poison); + endPostWorkerReference = System.currentTimeMillis(); + System.out.println("Time PostWorker =" + + (endPostWorkerReference - initPostWorkerReference)); + } else { + log.info("myReference is null"); + + } + } catch (Exception e) { + postWorkerReferenceLock.unlock(); + } finally { + } + + } + + /* + * FIXME This method currently is not used because i've not yet ready + * dynamic wire injection + */ + + public void PostWorkerName(String referenceName) { + /* TODO Do something similar to PostWorkerReference */ + } + + private void printComputingTime(Job j) { + + if (first == true) { + first = false; + start = System.currentTimeMillis(); + end = System.currentTimeMillis(); + } else { + end = System.currentTimeMillis(); + System.out.println("Elapsed Time = " + (end - start)); + elapsedTime.set(end - start); + } + /* + * i could use reflection or instance of (but it's a penalty kick) , or + * an object as result, but i'd prefer a job so i've defined a + * RESULT_JOB There're in the system three kind of jobs: RESULT_JOB, + * NULL_JOB, DEFAULT_JOB + */ + if ((j != null) && (j.getType() == Job.RESULT_JOB)) { + jobComputed++; + ResultJob result = (ResultJob) j; + JobDataMap map = result.getDataMap(); + if (map != null) { + Double doubleValue = (Double) map.getJobDataObject("result"); + System.out + .println("ResultValue = " + doubleValue.doubleValue()); + } + + } + + } + + public void handleResult(Job resultType, boolean reuse, String workerURI, + CallableReferenceImpl worker, boolean newWorker) { + initHandleResult.set(System.nanoTime()); + if (reuse) { + log.info("Reusing a job.."); + queue.add(resultType); + return; + } + // init job variable + Job job; + if (newWorker) + System.out.println("newWorkerActivation= " + System.nanoTime()); + printComputingTime(resultType); + + try { + job = queue.take(); + } catch (Exception e) { + log.info("Exception during fetching the queue"); + e.printStackTrace(); + return; + } + + try { + // it needs to be locked because multiple threads could invoke this. + handleResultLock.lock(); + if (previuosURI.equals("")) { + time = System.currentTimeMillis(); + this.previuosURI = workerURI; + } else { + if (previuosURI.equals(workerURI)) + System.out.println("Complete ComputeTime for an item =" + + (time - System.currentTimeMillis())); + } + if (job.eos()) { + long endTime = System.currentTimeMillis(); + /* checking for EOS */ + if (processingStopped == false) { + processingStopped = true; + System.out.println("GOT EOS in time=" + (endTime - start)); + log.info("Stop autonomic cycle.."); + /* + * I'm doing this because i want that in the termination i + * would have more jobs with eos == true than workers. So + * i'm sure that every worker removes itself from its + * manager. I do it only one time. This is necessary because + * i have a variable number of workers. The number of + * workers in the system might change every time the rule + * engine cycle gets executed. + */ + ResultJob poison = new ResultJob(); + for (int i = 0; i < workersNo; ++i) { + try { + + queue.put(poison); + + } catch (Exception e) { + log.info("Cannot duplicate poison tokens"); + break; + } + + } + manager.getService().stopAutonomicCycle(); + } + } + computeAverageTime(); + System.out.println("AverageTime =" + averageServiceTime); + if (job != null) { + + WorkerService workerService; + /* + * the workpool has a high reuse, i always call the same + * component set or un superset or subset, so i cache it. When + * the WorkpoolManager will remove an item, it removes still + * this cache entry + */ + if (!cacheReference.containsKey(workerURI)) { + workerService = worker.getService(); + handleResultLock.unlock(); + cacheReference.put(workerURI, workerService); + } else { + handleResultLock.unlock(); + workerService = cacheReference.get(workerURI); + } + // it's still a penalty kick locking compute because it's going + // to be scheduled whereas it's async. + workerService.compute(job); + log.info("Sent job #" + jobSent.incrementAndGet() + + " Queue size " + queue.size()); + endHandleResult.set(System.nanoTime()); + System.out + .println("begin:handleResult ==> end:handleResult:compute = " + + (endHandleResult.addAndGet(-(initHandleResult + .get())) / 1000000)); + } + } catch (Exception e) { + handleResultLock.unlock(); + } + } + + public void evictAll() { + cacheReference.clear(); + } + + public void evict(String workerURI) { + if (cacheReference.containsKey(workerURI)) { + cacheReference.remove(workerURI); + } + + } + + public int estimatedQueueSize() { + return queue.size(); + } + + public long getElapsedTime() { + return elapsedTime.get(); + } + + public long getJobComputed() { + return jobComputed; + } + + public void registerManager( + CallableReferenceImpl createSelfReference) { + manager = createSelfReference; + + } + + public void stop() { + // TODO Auto-generated method stub + + } + + public void addTrigger(CallableReferenceImpl reference) { + this.trigger = reference; + this.forwardResult = reference.getService(); + + } + + public void removeTrigger() { + this.trigger = null; + this.forwardResult = null; + } +} -- cgit v1.2.3