summaryrefslogtreecommitdiffstats
path: root/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool
diff options
context:
space:
mode:
Diffstat (limited to 'sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool')
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java85
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java46
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java43
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java54
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java29
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java31
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java213
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java56
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java27
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java171
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java162
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java25
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java71
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java48
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java555
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java91
-rw-r--r--sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java416
17 files changed, 2123 insertions, 0 deletions
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java
new file mode 100644
index 0000000000..cdd0f30b34
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/MetaComponentWorker.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.io.StringReader;
+import java.net.URI;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.logging.Logger;
+
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.tuscany.sca.assembly.MetaComponent;
+import org.apache.tuscany.sca.assembly.impl.DefaultMetaComponent;
+
+public class MetaComponentWorker extends DefaultMetaComponent {
+
+ private SecureRandom prng;
+ private String componentName;
+ private String scdl;
+ private String javaClass;
+ private boolean loadedFromString = false;
+ private Logger log = Logger.getLogger(MetaComponentWorker.class.getName());
+
+ public MetaComponentWorker() {
+ componentName = "WorkerComponent"
+ + java.util.UUID.randomUUID().toString();
+ }
+
+ public void setWorkerName(String componentName) {
+ this.componentName = componentName;
+ }
+
+ public void setWorkerClass(String javaClass) {
+ this.javaClass = javaClass;
+ }
+
+ private String generateSCDL() {
+ StringBuffer buffer = new StringBuffer(512);
+ buffer
+ .append("<component xmlns=\"http://www.osoa.org/xmlns/sca/1.0\" name=\"");
+ buffer.append(this.componentName);
+ buffer.append("\">\n");
+ buffer.append("<implementation.java class=\"");
+ buffer.append(this.javaClass);
+ buffer.append("\"/>");
+ buffer.append("<property name=\"workerName\">");
+ buffer.append(this.componentName);
+ buffer.append("</property>\n</component>");
+ return buffer.toString();
+ }
+
+ @Override
+ public XMLStreamReader build() throws Exception {
+ XMLInputFactory factory = XMLInputFactory.newInstance();
+ if (!loadedFromString)
+ scdl = generateSCDL();
+ return factory.createXMLStreamReader(new StringReader(scdl));
+
+ }
+
+ public String getName() {
+
+ return componentName;
+ }
+
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java
new file mode 100644
index 0000000000..c45696e3cf
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/MyWorker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.apache.tuscany.sca.databinding.job.JobDataMap;
+import org.apache.tuscany.sca.databinding.job.JobExecutionContext;
+import org.apache.tuscany.sca.databinding.job.RemoteJob;
+import org.osoa.sca.annotations.Scope;
+
+@Scope("COMPOSITE")
+public class MyWorker extends WorkerServiceImpl<Object, Double> {
+ private static int resultcount = 0;
+
+ @Override
+ public ResultJob computeTask(Job<Object, Double> job) {
+
+ RemoteJob remoteJob = (RemoteJob) job;
+ System.out.println("Computing the job");
+ JobExecutionContext context = remoteJob.getContext();
+ ResultJob resultJob = new ResultJob();
+ JobDataMap resultMap = new JobDataMap();
+ resultMap.addJobData("result", remoteJob.compute(context));
+ resultJob.setJobDataMap(resultMap);
+ System.out.println("Count result = " + (++resultcount));
+ return resultJob;
+ }
+
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java
new file mode 100644
index 0000000000..fb930adf2e
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/NullJob.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.apache.tuscany.sca.databinding.job.JobDataMap;
+
+public class NullJob implements Job, java.io.Serializable {
+
+ public Object compute(Object arg0) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public JobDataMap getDataMap() {
+ return null;
+ }
+
+ public boolean eos() {
+ return false;
+ }
+
+ public int getType() {
+ return Job.NULL_JOB;
+ }
+
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java
new file mode 100644
index 0000000000..e04411668b
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/ResultJob.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.apache.tuscany.sca.databinding.job.JobDataMap;
+import org.apache.tuscany.sca.databinding.job.JobExecutionContext;
+import org.apache.tuscany.sca.databinding.job.RemoteJob;
+
+public class ResultJob extends RemoteJob<Object> implements
+ java.io.Serializable {
+ private JobDataMap map;
+
+ public JobDataMap getDataMap() {
+ return map;
+ }
+
+ public void setJobDataMap(JobDataMap map) {
+ this.map = map;
+ }
+
+ public boolean eos() {
+ // TODO Auto-generated method stub
+ return true;
+ }
+
+ public int getType() {
+ // TODO Auto-generated method stub
+ return Job.RESULT_JOB;
+ }
+
+ @Override
+ public Object compute(JobExecutionContext v) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java
new file mode 100644
index 0000000000..469675b19b
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/Trigger.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.databinding.annotation.DataBinding;
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.osoa.sca.annotations.Remotable;
+
+@Remotable
+@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
+public interface Trigger<T> {
+ void handleEvent(T c);
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java
new file mode 100644
index 0000000000..520203e190
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+import org.osoa.sca.annotations.Remotable;
+import org.osoa.sca.CallableReference;
+@Remotable
+public interface WorkerManager {
+ CallableReference<WorkerService> addWorker();
+ boolean removeWorker(String workerName);
+ boolean removeWorkers(int k);
+ boolean removeAllWorkers();
+ double getNodeLoad();
+ int activeWorkers();
+ void start();
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java
new file mode 100644
index 0000000000..d4337cad2f
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerManagerImpl.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.tuscany.sca.assembly.Composite;
+import org.apache.tuscany.sca.contribution.Contribution;
+import org.apache.tuscany.sca.contribution.DeployedArtifact;
+import org.apache.tuscany.sca.contribution.service.impl.ContributionServiceImpl;
+import org.osoa.sca.CallableReference;
+import org.apache.tuscany.sca.node.management.SCANodeManagerInitService;
+import org.apache.tuscany.sca.node.SCANode;
+import org.apache.tuscany.sca.node.impl.SCANodeImpl;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.osoa.sca.ComponentContext;
+import org.osoa.sca.annotations.Context;
+import org.osoa.sca.annotations.Property;
+import org.osoa.sca.annotations.Scope;
+import org.osoa.sca.annotations.Service;
+import java.util.LinkedList;
+import java.util.ArrayList;
+
+@Scope("COMPOSITE")
+@Service(interfaces = { SCANodeManagerInitService.class, WorkerManager.class })
+public class WorkerManagerImpl implements WorkerManager, SCANodeManagerInitService {
+ private Logger log = Logger.getLogger(WorkerManagerImpl.class.getName());
+ private LinkedList<CallableReference<WorkerService>> activeWorkers = new LinkedList<CallableReference<WorkerService>>();
+ private List<String> workerComponentNames = new ArrayList<String>();
+ private SCANodeImpl node;
+ @Property
+ protected String nodeName;
+ @Property
+ protected String compositeName;
+ @Property
+ protected String workerClass;
+ @Context
+ protected ComponentContext context;
+ private double loadAverage;
+
+ /* This method is used to find a composite inside all deployed artifacts */
+ private Composite findComposite(List<Composite> artifacts) {
+ for (Composite fact : artifacts) {
+ log.info("Searching in a contribution deployed artifacts -"
+ + compositeName);
+ Composite augmented = (Composite) fact;
+ // found
+ if (augmented.getURI().equals(compositeName)) {
+ log.info("Found composite..." + compositeName);
+ return augmented;
+ }
+ }
+ }
+ return null;
+ }
+
+ public CallableReference<WorkerService> addWorker() {
+ log.info("Adding a new worker call..");
+ long addWorkerStartTime = System.nanoTime();
+ ContributionServiceImpl cServiceImpl = (ContributionServiceImpl) node.getContributionService();
+ Contribution contribution = cServiceImpl.getContribution(nodeName);
+ List<Composite> artifacts = contribution.getDeployables();
+ CallableReference<WorkerService> workerReference = null;
+ CallableReference<WorkerService> ref = null;
+ log.info("Creating a MetaComponentWorker..");
+ MetaComponentWorker mcw = new MetaComponentWorker();
+ boolean found = false;
+ mcw.setWorkerClass(workerClass);
+ Composite augmented = findComposite(artifacts);
+ try {
+ if (augmented != null) {
+ long startCreation = System.nanoTime();
+ node.addComponentToComposite(mcw, contribution.getURI(),
+ augmented.getURI());
+ System.out.println("addComponentToComposite time = "
+ + (System.nanoTime() - startCreation));
+ RuntimeComponent workerComponent = (RuntimeComponent) node
+ .getComponent(mcw.getName());
+ if (workerComponent != null) {
+ ref = (CallableReference<WorkerService>) workerComponent
+ .getComponentContext().createSelfReference(
+ WorkerService.class);
+ ref.getService().start();
+ activeWorkers.addLast(ref);
+ workerComponentNames.add(mcw.getName());
+ CallableReference<WorkerManager> manager = (CallableReference) context
+ .createSelfReference(WorkerManager.class,
+ "WorkerManager");
+ ref.getService().registerManager(manager);
+ return ref;
+ }
+ } else {
+ log.info("Workpool composite not found!");
+ }
+ } catch (Exception e) {
+ log.info("Exception activation");
+ e.printStackTrace();
+ }
+ ;
+ System.out.println("Component Creation Time ="
+ + (System.nanoTime() - addWorkerStartTime));
+ return ref;
+ }
+
+ public boolean removeAllWorkers() {
+ for (CallableReference<WorkerService> callable : activeWorkers) {
+ callable.getService().stop();
+ }
+ return true;
+ }
+
+ public boolean removeWorker() {
+ CallableReference<WorkerService> callable = activeWorkers
+ .removeLast();
+ callable.getService().stop();
+ return true;
+ }
+
+ public boolean removeWorkers(int k) {
+ if (k >= activeWorkers.size())
+ return false;
+ for (int i = 0; i < k; ++i) {
+ if (!removeWorker())
+ return false;
+ }
+ return true;
+ }
+
+ public void setNode(SCANode node) {
+ this.node = (SCANodeImpl) node;
+
+ }
+
+ public double getNodeLoad() {
+ /*
+ * FIXME [jo] this works only on Linux To be replaced with an JNI
+ * extension
+ */
+ RandomAccessFile statfile;
+
+ this.loadAverage = 1.0;
+ // load = 0;
+ int NoProcessors = 0;
+ String cpuLine = null;
+ try {
+ NoProcessors = Runtime.getRuntime().availableProcessors();
+ if (NoProcessors > 1)
+ this.loadAverage = 1 / (1.0 * NoProcessors);
+ statfile = new RandomAccessFile("/proc/loadavg", "r");
+ try {
+ statfile.seek(0);
+ cpuLine = statfile.readLine();
+
+ } catch (IOException e) {
+ // FIX ME: Better exception handling.
+ e.printStackTrace();
+ }
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (NumberFormatException e) {
+ e.printStackTrace();
+ }
+ double min1;
+ if (cpuLine != null) {
+ java.util.StringTokenizer st = new java.util.StringTokenizer(
+ cpuLine, " ");
+ min1 = Double.parseDouble(st.nextToken());
+ } else
+ min1 = 0;
+
+ return min1 * this.loadAverage;
+ }
+
+ public int activeWorkers() {
+ return activeWorkers.size();
+ }
+
+ public boolean removeWorker(String workerName) {
+ RuntimeComponent workerComponent = (RuntimeComponent) node
+ .getComponent(workerName);
+ if (workerComponent != null) {
+ log.info("Removing component " + workerName);
+ node.removeComponentFromComposite(nodeName, "Workpool.composite",
+ workerName);
+ return true;
+ }
+ return false;
+ }
+
+ public void start() {
+ // do nothing for now.
+ }
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java
new file mode 100644
index 0000000000..37b7ea227a
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerService.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.osoa.sca.ServiceReference;
+import org.osoa.sca.annotations.Callback;
+import org.osoa.sca.annotations.Remotable;
+import org.osoa.sca.annotations.OneWay;
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.databinding.annotation.DataBinding;
+import org.apache.tuscany.sca.databinding.job.Job;
+
+/**
+ * The interface for the multiply service
+ */
+@Remotable
+@Callback(WorkerServiceCallback.class)
+@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
+public interface WorkerService<T, E> {
+ @OneWay
+ void compute(Job<T, E> j);
+
+ void start();
+
+ void stop();
+
+ // void addJobCompleteHandler(String triggerName,
+ // CallableReferenceImpl<Trigger> handle);
+ // void removeJobCompleteHandler(String triggerName);
+ /* The worker manager */
+ void registerManager(CallableReferenceImpl<WorkerManager> wm);
+
+ void registerSender(CallableReferenceImpl<WorkpoolService> sender);
+
+ // void init(Job nullJob);
+ @OneWay
+ void computeFirstTime(Job nullJob,
+ CallableReferenceImpl<WorkpoolService> myReference);
+
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java
new file mode 100644
index 0000000000..6fb1278e5e
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceCallback.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.osoa.sca.annotations.Remotable;
+
+@Remotable
+public interface WorkerServiceCallback {
+ void receiveResult(Job resultType, boolean reuse, String workerName);
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java
new file mode 100644
index 0000000000..2c9bf5ea48
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkerServiceImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.osoa.sca.ComponentContext;
+import org.osoa.sca.RequestContext;
+import org.osoa.sca.ServiceReference;
+import org.osoa.sca.annotations.Callback;
+import org.osoa.sca.annotations.Context;
+import org.osoa.sca.annotations.Property;
+import org.osoa.sca.annotations.Scope;
+import org.osoa.sca.annotations.Service;
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.databinding.annotation.DataBinding;
+import org.apache.tuscany.sca.databinding.job.Job;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.*;
+
+/**
+ * An implementation of the worker service.
+ */
+@Service(WorkerService.class)
+@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
+@Scope("COMPOSITE")
+public abstract class WorkerServiceImpl<T, E> implements WorkerService<T, E> {
+ private Logger log = Logger.getLogger(this.getClass().getName());
+ private WorkerServiceCallback workerServiceCallback;
+ @Context
+ protected ComponentContext workerContext;
+ @Context
+ protected RequestContext requestContext;
+ @Property
+ protected String workerName;
+ private CallableReferenceImpl<WorkerManager> managerReference = null;
+
+ /* TODO add the triggers, but before ask */
+ // protected Map<String,Trigger> triggers = new HashMap<String,Trigger>();
+ public abstract ResultJob computeTask(Job<T, E> job);
+
+ private boolean stopped = false;
+ private CallableReferenceImpl<WorkerService> serviceRef;
+ private CallableReferenceImpl<WorkpoolService> senderService;
+ private WorkpoolService wp = null;
+ private WorkerManager manager = null;
+
+ public void start() {
+ log.info("Starting worker...");
+ stopped = false;
+ serviceRef = (CallableReferenceImpl) workerContext
+ .createSelfReference(WorkerService.class);
+
+ }
+
+ public void init(CallableReferenceImpl<WorkpoolService> sender, Job nullJob) {
+ compute(nullJob);
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+
+ @Callback
+ public void setWorkerServiceCallback(
+ WorkerServiceCallback workerServiceCallback) {
+ log.info("Setting worker callback");
+ this.workerServiceCallback = workerServiceCallback;
+ }
+
+ public void computeFirstTime(Job nullJob,
+ CallableReferenceImpl<WorkpoolService> sender) {
+ senderService = sender;
+ wp = sender.getService();
+ workWithCallable(nullJob);
+ }
+
+ public void registerManager(CallableReferenceImpl<WorkerManager> wm) {
+ managerReference = wm;
+ manager = managerReference.getService();
+
+ }
+
+ public void registerSender(CallableReferenceImpl<WorkpoolService> sender) {
+ log.info("Registering sender..");
+ senderService = sender;
+ wp = sender.getService();
+ }
+
+ private void workWithInjection(Job j) {
+ log.info("Worker has received job");
+ if (stopped) {
+ workerServiceCallback
+ .receiveResult(j, true, workerContext.getURI());
+ if (managerReference != null)
+ manager.removeWorker(workerContext.getURI());
+ } else if (j.eos()) {
+ if (managerReference != null)
+ manager.removeWorker(workerContext.getURI());
+ }
+ if (j instanceof NullJob) {
+ workerServiceCallback.receiveResult(j, false, workerContext
+ .getURI());
+ } else {
+ workerServiceCallback.receiveResult(computeTask(j), false,
+ workerContext.getURI());
+ }
+ }
+
+ private void workWithCallable(Job j) {
+ log.info("Worker " + workerContext.getURI()
+ + " has received job with eos --> " + j.eos());
+ if (stopped) {
+ wp.handleResult(j, true, workerContext.getURI(), serviceRef, false);
+ return;
+ }
+ if (j.eos()) {
+ log.info("Got poison token...");
+ if (managerReference != null) {
+ log.info("Removing component " + workerContext.getURI());
+ manager.removeWorker(workerContext.getURI());
+
+ }
+ return;
+ }
+ if (j.getType() != Job.NULL_JOB) {
+ wp.handleResult(computeTask(j), false, workerContext.getURI(),
+ serviceRef, false);
+ } else {
+ log.info("Got a null job");
+ wp.handleResult(j, false, workerContext.getURI(), serviceRef, true);
+ }
+ }
+
+ public void compute(Job<T, E> j) {
+
+ if (senderService != null) {
+ log.info("Computing job using callable reference method");
+ workWithCallable(j);
+
+ } else {
+ log.info("Computing job using reference injection method");
+ workWithInjection(j);
+
+ }
+ }
+ /*
+ * public void addJobCompleteHandler(String triggerName,
+ * CallableReferenceImpl<Trigger> handle) { if
+ * (!triggers.containsKey(triggerName)) { triggers.put(triggerName,
+ * handle.getService()); } } public void removeJobCompleteHandler(String
+ * triggerName) { if (!triggers.containsKey(triggerName)) {
+ * triggers.remove(triggerName); } }
+ */
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java
new file mode 100644
index 0000000000..80c093ff1c
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBean.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.beans.*;
+import java.util.Vector;
+import java.util.logging.*;
+
+public class WorkpoolBean {
+ private Vector<WorkpoolBeanListener> listeners = new Vector<WorkpoolBeanListener>();
+ double loadAverage = 0;
+ int nodeNumbers = 0;
+ int workers = 0;
+ int estimedQueueSize = 0;
+ double averageServiceTime = 0;
+ double averageArrivalTime = 0;
+ double usageFactor = 0;
+ private final PropertyChangeSupport changes = new PropertyChangeSupport(
+ this);
+ long jobComputed = 0;
+ boolean singleAction = false;
+ private Logger log = Logger.getLogger(WorkpoolBean.class.getName());
+
+ public void setNodeNumbers(int n) {
+ this.nodeNumbers = n;
+ }
+
+ public void setWorkers(int w) {
+ this.workers = w;
+ }
+
+ public void setLoadAverage(double loadAverage) {
+ this.loadAverage = loadAverage;
+ }
+
+ public void setAverageServiceTime(double service) {
+ this.averageServiceTime = service;
+ }
+
+ public void setAverageArrivalTime(double service) {
+ this.averageArrivalTime = service;
+ }
+
+ public double getAverageArrivalTime() {
+ return this.averageArrivalTime;
+ }
+
+ public double getUtilizationFactor() {
+ return usageFactor;
+ }
+
+ public void setUsageFactor() {
+ usageFactor = averageServiceTime / averageArrivalTime;
+ }
+
+ public void setEstimedQueueSize(int size) {
+ estimedQueueSize = size;
+ }
+
+ public int getEstimedQueueSize() {
+ return estimedQueueSize;
+ }
+
+ public double getLoadAverage() {
+ return this.loadAverage;
+ }
+
+ public int getWorkers() {
+ return this.workers;
+ }
+
+ public int getNodeNumbers() {
+ return this.nodeNumbers;
+ }
+
+ public double getAverageServiceTime() {
+ return this.averageServiceTime;
+ }
+
+ public void addPropertyChangeListener(final PropertyChangeListener l) {
+ this.changes.addPropertyChangeListener(l);
+ }
+
+ public void removePropertyChangeListener(final PropertyChangeListener l) {
+ this.changes.removePropertyChangeListener(l);
+ }
+
+ private synchronized void fireWorkpoolEvent(WorkpoolEvent ev) {
+ for (WorkpoolBeanListener l : listeners) {
+ l.handleEvent(new WorkpoolEvent(ev));
+ }
+ }
+
+ public void addWorkersToNode(int k, String nodeName) {
+ log.info("Adding a worker to node " + nodeName);
+ WorkpoolEvent ev = new WorkpoolEvent(this,
+ WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER, k, nodeName);
+ fireWorkpoolEvent(ev);
+ }
+
+ public void addWorkerToNode(String nodeName) {
+ log.info("Adding a worker to node " + nodeName);
+ WorkpoolEvent ev = new WorkpoolEvent(this,
+ WorkpoolEvent.SINGLE_ADD_WORKER, 1, nodeName);
+ fireWorkpoolEvent(ev);
+ }
+
+ public void removeWorkersToNode(int k, String nodeName) {
+ log.info("Removing a worker to node " + nodeName);
+ WorkpoolEvent ev = new WorkpoolEvent(this,
+ WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER, k, nodeName);
+ fireWorkpoolEvent(ev);
+ }
+
+ public void removeWorkerToNode(String nodeName) {
+ log.info("Removing a worker to node " + nodeName);
+ WorkpoolEvent ev = new WorkpoolEvent(this,
+ WorkpoolEvent.SINGLE_REMOVE_WORKER, 1, nodeName);
+ fireWorkpoolEvent(ev);
+ }
+
+ public synchronized void addListener(WorkpoolBeanListener l) {
+ this.listeners.add(l);
+ }
+
+ public synchronized void removeListener(WorkpoolBeanListener l) {
+ this.listeners.remove(l);
+ }
+
+ public void setJobComputed(long jobComputed) {
+ this.jobComputed = jobComputed;
+
+ }
+
+ public void setSingleAction() {
+ singleAction = true;
+ }
+
+ public boolean getSingleAction() {
+ return singleAction;
+ }
+
+ public long getJobComputed() {
+ return this.jobComputed;
+ }
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java
new file mode 100644
index 0000000000..0ecc223fed
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolBeanListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.util.EventListener;
+
+public interface WorkpoolBeanListener extends EventListener {
+ public void handleEvent(WorkpoolEvent ev);
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java
new file mode 100644
index 0000000000..0bdc3671d5
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolEvent.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.util.EventObject;
+
+public class WorkpoolEvent extends EventObject {
+
+ private static final long serialVersionUID = -1273928009411948768L;
+
+ public WorkpoolEvent(Object source) {
+ super(source);
+ }
+
+ public WorkpoolEvent(WorkpoolEvent ev) {
+ super(ev.source);
+ type = ev.type;
+ noWorker = ev.noWorker;
+ nodeName = ev.nodeName;
+ }
+
+ public WorkpoolEvent(Object source, int typeEv, int worker) {
+ super(source);
+ type = typeEv;
+ noWorker = worker;
+ nodeName = "";
+ }
+
+ public WorkpoolEvent(Object source, int typeEv, int worker, String nodeName) {
+ super(source);
+ type = typeEv;
+ noWorker = worker;
+ this.nodeName = nodeName;
+ }
+
+ public String getNodeName() {
+ return nodeName;
+ }
+
+ public int getType() {
+ return type;
+ }
+
+ public int workers() {
+ return noWorker;
+ }
+
+ private int type;
+ private int noWorker;
+ private String nodeName;
+ public static final int EVENT_MULTIPLE_ADD_WORKER = 0;
+ public static final int EVENT_MULTIPLE_REMOVE_WORKER = 1;
+ public static final int SINGLE_REMOVE_WORKER = 2;
+ public static final int SINGLE_ADD_WORKER = 3;
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java
new file mode 100644
index 0000000000..6520954bdd
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.osoa.sca.ServiceReference;
+import org.osoa.sca.annotations.OneWay;
+import org.osoa.sca.annotations.Remotable;
+
+@Remotable
+public interface WorkpoolManager {
+ /*
+ * @param String rules This are the autonomic rules. The format is the Java
+ * Drools .drl file. You have to read it
+ */
+ @OneWay
+ void acceptRules(String rules);
+
+ @OneWay
+ void start();
+
+ @OneWay
+ void stopAutonomicCycle();
+
+ @OneWay
+ void startAutonomicCycle();
+
+ int activeWorkers();
+
+ void setCycleTime(long time);
+
+ void setWorkpoolReference(ServiceReference<WorkpoolService> serviceReference);
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java
new file mode 100644
index 0000000000..f7c727ad04
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolManagerImpl.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.osoa.sca.ComponentContext;
+import org.osoa.sca.ServiceReference;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Logger;
+
+import javax.xml.stream.XMLStreamException;
+
+import node.TestJob;
+import java.io.File;
+import java.util.Vector;
+import org.apache.axiom.om.OMElement;
+import org.apache.tuscany.sca.contribution.service.ContributionResolveException;
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.core.context.ServiceReferenceImpl;
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.apache.tuscany.sca.node.NodeManagerInitService;
+import org.apache.tuscany.sca.node.SCANode;
+import org.apache.tuscany.sca.node.impl.SCANodeImpl;
+import org.osoa.sca.CallableReference;
+import org.drools.FactHandle;
+import org.drools.RuleBase;
+import org.drools.RuleBaseFactory;
+import org.drools.StatefulSession;
+import org.drools.StatelessSession;
+import org.drools.compiler.DroolsParserException;
+import org.drools.compiler.PackageBuilder;
+import org.drools.rule.Package;
+import org.osoa.sca.annotations.Constructor;
+import org.osoa.sca.annotations.Context;
+import org.osoa.sca.annotations.Destroy;
+import org.osoa.sca.annotations.Property;
+import org.osoa.sca.annotations.Reference;
+import org.osoa.sca.annotations.Scope;
+import org.osoa.sca.annotations.Service;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Service(interfaces = { NodeManagerInitService.class, WorkpoolManager.class })
+@Scope("COMPOSITE")
+/*
+ * This is the core manager of the workpool application. The Workpool Manager
+ * holds the reference to each remote node manager. Inside it we've a rule
+ * engine instance.
+ */
+public class WorkpoolManagerImpl implements WorkpoolManager,
+ NodeManagerInitService, WorkpoolBeanListener {
+ /*
+ * This inner class trigs the rule engine, at given times: 1. It checks the
+ * different loads for each nodes and sets the WorkpoolBean 2. It checks the
+ * Workpool AverageService Time and sets the WorkpoolBean 3. It checks how
+ * many jobs are already computed and sets the WorkpoolBean Then given the
+ * configured bean and the rules, run the Rule Engine for executing the
+ * business logic
+ */
+ class RuleEngineTrigger extends TimerTask {
+ // private ReentrantLock triggerLock = new ReentrantLock();
+ @Override
+ public void run() {
+
+ System.out.println("Updating WorkpoolBean..");
+ // checkActiveWorkers();
+ // checkLoadInNodes();
+ checkServiceTime();
+ // checkEstimedQueueSize();
+ // checkArrivalTime();
+ getProcessedItem();
+ // computeUsageFactor();
+ doRun(bean);
+ }
+
+ }
+
+ private WorkerManager managerNodeB;
+ private WorkerManager managerNodeC;
+ private WorkerManager managerNodeD;
+ private WorkerManager managerNodeE;
+
+ private SCANodeImpl node;
+ private WorkpoolBean bean = new WorkpoolBean();
+ private ReentrantLock handleEventLock = new ReentrantLock();
+ private ReentrantLock updateRuleLock = new ReentrantLock();
+
+ private ServiceReference<WorkpoolService> reference;
+ private AtomicInteger activeWorkers = new AtomicInteger(0);
+ private Logger log = Logger.getLogger(WorkpoolManagerImpl.class.getName());
+ @Property
+ protected String workers;
+ @Property
+ protected String nodes;
+ @Property
+ protected String injection;
+ @Context
+ protected ComponentContext workpoolManagerContext;
+ private CallableReferenceImpl<WorkpoolManager> myReference;
+ private String rules = null;
+ private boolean referenceInjection = false;
+ private ConcurrentHashMap<String, WorkerManager> workerManagerTable = new ConcurrentHashMap<String, WorkerManager>();
+ private int workersNo;
+ private int nodesNo;
+ private Timer timer = new Timer();
+ /* this handle facts */
+ private RuleBase ruleBase = null;
+ private FactHandle handle = null;
+ private StatefulSession wm = null;
+ private long cycleTime = 5000;
+
+ @Reference
+ public void setManagerNodeB(WorkerManager managerNodeB) {
+ this.managerNodeB = managerNodeB;
+ workerManagerTable.put("nodeB", managerNodeB);
+ }
+
+ @Reference
+ public void setManagerNodeC(WorkerManager managerNodeC) {
+ this.managerNodeC = managerNodeC;
+ workerManagerTable.put("nodeC", managerNodeC);
+ }
+
+ @Reference
+ public void setManagerNodeD(WorkerManager managerNodeD) {
+ this.managerNodeD = managerNodeD;
+ workerManagerTable.put("nodeD", managerNodeD);
+ }
+
+ @Reference
+ public void setManagerNodeE(WorkerManager managerNodeE) {
+ this.managerNodeE = managerNodeE;
+ workerManagerTable.put("nodeE", managerNodeE);
+ }
+
+ private void startNewComponents(
+ Vector<CallableReferenceImpl<WorkerService>> vector) {
+ log.info("Starting new components");
+ WorkpoolService wp = reference.getService();
+ // CallableReferenceImpl<WorkpoolService> sink =
+ // (CallableReferenceImpl<WorkpoolService>) reference;
+ Job j = new NullJob();
+ for (CallableReferenceImpl<WorkerService> item : vector) {
+ // WorkerService service = item.getService();
+ // service.start();
+ // service.computeFirstTime(j, sink);
+ log.info("Send PostWorkerReference...");
+ wp.PostWorkerReference(item);
+ }
+ if (myReference != null)
+ wp.registerManager(myReference);
+ }
+
+ public void setCycleTime(long cycle) {
+ this.cycleTime = cycle;
+ }
+
+ @SuppressWarnings("unchecked")
+ /*
+ * This gets the number of workers workerNo and instantiates them
+ */
+ public void start() {
+ this.myReference = (CallableReferenceImpl<WorkpoolManager>) workpoolManagerContext
+ .createSelfReference(WorkpoolManager.class, "WorkpoolManager");
+ this.workersNo = Integer.parseInt(this.workers);
+ this.nodesNo = Integer.parseInt(this.nodes);
+ this.referenceInjection = (Integer.parseInt(this.injection) != 0);
+ log.info("Starting WorkpoolManager Component with #" + workersNo
+ + " workers and #" + nodes + " nodes");
+ nodesNo = workerManagerTable.values().size();
+ // Sets info in the bean.
+ bean.setWorkers(this.workersNo);
+ bean.setNodeNumbers(nodesNo);
+ Vector<CallableReferenceImpl<WorkerService>> workerRefs = new Vector<CallableReferenceImpl<WorkerService>>();
+ int exactTimes = workersNo / nodesNo;
+ for (int i = 0; i < exactTimes; ++i) {
+ for (WorkerManager manager : workerManagerTable.values()) {
+ manager.start();
+ if (manager != null) {
+ System.err.println("Actual load = "
+ + manager.getNodeLoad() + " for node ");
+ addNewComponent(manager, workerRefs);
+ }
+ }
+ }
+
+ int module = (workersNo % nodesNo);
+ int n = 0;
+ if (module > 0) {
+ Vector<String> v = new Vector(workerManagerTable.keySet());
+ Collections.sort(v);
+ // Iterator<WorkerManager> iter =
+ // workerManagerTable.values().iterator();
+ // Display (sorted) hashtable.
+ for (Enumeration<String> e = v.elements(); (e.hasMoreElements() && n < module); ++n) {
+ String key = e.nextElement();
+ WorkerManager m = workerManagerTable.get(key);
+ System.err.println("Module Actual load = " + m.getNodeLoad()
+ + " for node ");
+ addNewComponent(m, workerRefs);
+ }
+ }
+ startNewComponents(workerRefs);
+ bean.addListener(this);
+ TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger();
+ timer.scheduleAtFixedRate(task, 3000, cycleTime);
+ }
+
+ private void checkLoadInNodes() {
+ System.out.println("CheckLoadInNodes");
+ int number = 1;
+ double loadAverage = 0;
+ for (WorkerManager manager : workerManagerTable.values()) {
+ loadAverage += manager.getNodeLoad();
+ number++;
+ }
+ bean.setLoadAverage(loadAverage / number);
+ }
+
+ private void computeUsageFactor() {
+ bean.setUsageFactor();
+ }
+
+ private void checkEstimedQueueSize() {
+ WorkpoolService wp = reference.getService();
+
+ if (wp != null) {
+ int size = wp.estimatedQueueSize();
+ log.info("Estimed Queue Size =" + size);
+ bean.setEstimedQueueSize(size);
+ }
+ }
+
+ private WorkerManager findMinLoad() {
+ double load = 0;
+ // workerManagerTable.values().iterator().next().getNodeLoad();
+ WorkerManager toFind = null;
+ for (WorkerManager manager : workerManagerTable.values()) {
+ if (load == 0) {
+ load = manager.getNodeLoad();
+ toFind = manager;
+ } else if (manager.getNodeLoad() < load) {
+ load = manager.getNodeLoad();
+ toFind = manager;
+ }
+ }
+ return toFind;
+ }
+
+ private void checkServiceTime() {
+ WorkpoolService wp = reference.getService();
+
+ if (wp != null) {
+ double time = wp.getServiceTime();
+ log.info("Average System Service Time =" + time);
+ bean.setAverageServiceTime(time);
+ }
+ }
+
+ private void checkArrivalTime() {
+ WorkpoolService wp = reference.getService();
+
+ if (wp != null) {
+ double time = wp.getArrivalTime();
+ log.info("Average Arrival Service Time =" + time);
+ bean.setAverageArrivalTime(time);
+ }
+ }
+
+ private void checkActiveWorkers() {
+ bean.setWorkers(this.activeWorkers());
+ }
+
+ private void getProcessedItem() {
+ WorkpoolService wp = reference.getService();
+ if (wp != null) {
+ long computed = wp.getJobComputed();
+ log.info("The system has already computed " + computed + " jobs");
+ bean.setJobComputed(computed);
+ }
+ }
+
+ private boolean removeComponent(WorkerManager manager, int k) {
+ manager.removeWorkers(k);
+ activeWorkers.decrementAndGet();
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ private boolean addNewComponent(WorkerManager manager,
+ Vector<CallableReferenceImpl<WorkerService>> workerRefs) {
+ CallableReferenceImpl<WorkerService> workerReference = (CallableReferenceImpl<WorkerService>) manager
+ .addWorker();
+
+ if (workerReference != null) {
+ /* if i'll decide to use dynamically generated references */
+ if (referenceInjection) {
+ workerReference.getService();
+ String uri = workerReference.getEndpointReference().getURI();
+ int nameIndex = uri.indexOf("/");
+ String componentName = uri.substring(0, nameIndex);
+ if (componentName.startsWith("/"))
+ componentName = uri.substring(1, uri.length());
+ if (componentName.endsWith("/"))
+ componentName = uri.substring(0, uri.length() - 1);
+ // String componentName = uri.substring(0, nameIndex-1);
+
+ log.info("Adding wire from WorkpoolComponentService to "
+ + componentName);
+ String referenceName = "ref" + componentName;
+
+ /*
+ * I'm updating the WorkpoolServiceComponent with a new
+ * reference to a just created component I assume that the
+ * WorkpoolManagerService and the WorkpoolServiceComponent stay
+ * in the same JVM It's like in the scdl there were: <reference
+ * name=referenceName target="componentName"/> With this then
+ * I've a wire WorkpoolService---> a new Worker
+ */
+ try {
+ node.addComponentReferenceWire(referenceName, "nodeA",
+ "Workpool.composite", "workpool.WorkerServiceImpl",
+ WorkerService.class, "WorkpoolServiceComponent",
+ componentName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ log.info("Sending reference name " + referenceName
+ + " to WorkpoolService");
+ // TODO: this was part of dynamic wiring, but it doesn't work.
+ // reference.getService().PostWorkerName(referenceName);
+
+ } else {
+ // log.info("Sending callable reference to WorkpoolService
+ // placed at -->"+reference);
+ // reference.getService().PostWorkerReference(workerReference);
+ workerRefs.add(workerReference);
+ }
+ activeWorkers.incrementAndGet();
+ return true;
+ }
+ return false;
+ }
+
+ public int activeWorkers() {
+
+ return activeWorkers.get();
+ }
+
+ private void doRun(WorkpoolBean bean) {
+
+ long startTime = System.currentTimeMillis();
+ updateRuleLock.lock();
+ if (wm == null)
+ wm = ruleBase.newStatefulSession();
+ if (this.handle == null)
+ handle = wm.insert(bean);
+ else {
+ wm.update(handle, bean);
+ }
+ wm.fireAllRules();
+ updateRuleLock.unlock();
+
+ System.out.println("Engine rule overhead = "
+ + (System.currentTimeMillis() - startTime));
+ }
+
+ private RuleBase readRule(String rule) {
+
+ PackageBuilder packBuilder = new PackageBuilder();
+ try {
+ packBuilder.addPackageFromDrl(new StringReader(rule));
+ } catch (DroolsParserException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ Package pkg = packBuilder.getPackage();
+ RuleBase ruleBase = RuleBaseFactory.newRuleBase();
+ try {
+ ruleBase.addPackage(pkg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return ruleBase;
+ }
+
+ public void acceptRules(String rules) {
+ this.rules = rules;
+ if (ruleBase == null) {
+ RuleBase base = readRule(rules);
+ if (base != null) {
+ ruleBase = base;
+ }
+ } else {
+ updateRuleLock.lock();
+ // i have already a rule: updating
+ ruleBase = readRule(rules);
+ wm = ruleBase.newStatefulSession();
+ handle = null;
+ updateRuleLock.unlock();
+ }
+
+ System.out.println("Accepted rules = " + rules);
+ }
+
+ public String getRules() {
+ return rules;
+ }
+
+ private WorkerManager findMaxLoadNode() {
+ double load = 0.0;
+ WorkerManager toFind = null;
+ for (WorkerManager manager : workerManagerTable.values()) {
+ if (manager.getNodeLoad() > load) {
+ load = manager.getNodeLoad();
+ toFind = manager;
+ }
+ }
+ return toFind;
+
+ }
+
+ public void setWorkpoolReference(
+ ServiceReference<WorkpoolService> serviceReference) {
+ reference = serviceReference;
+ }
+
+ public void setNode(SCANode arg0) {
+ node = (SCANodeImpl) arg0;
+ }
+
+ public void handleEvent(WorkpoolEvent ev) {
+ if (ev == null)
+ return;
+
+ String nodeName = ev.getNodeName();
+
+ switch (ev.getType()) {
+ case WorkpoolEvent.SINGLE_ADD_WORKER: {
+ if (nodeName != null) {
+ Vector<CallableReferenceImpl<WorkerService>> workerRefs = new Vector<CallableReferenceImpl<WorkerService>>();
+
+ // in this case I have a nodeName
+ if (!nodeName.equals("")
+ && (workerManagerTable.containsKey(nodeName))) {
+ WorkerManager manager = workerManagerTable.get(nodeName);
+ addNewComponent(manager, workerRefs);
+ startNewComponents(workerRefs);
+ } else if (nodeName.equals("")) {
+ WorkerManager manager = findMinLoad();
+ addNewComponent(manager, workerRefs);
+ startNewComponents(workerRefs);
+ }
+ }
+ break;
+ }
+ case WorkpoolEvent.EVENT_MULTIPLE_ADD_WORKER: {
+ Vector<CallableReferenceImpl<WorkerService>> workerRefs = new Vector<CallableReferenceImpl<WorkerService>>();
+
+ if (nodeName.equals("")) {
+
+ WorkerManager manager = findMinLoad();
+ int k = ev.workers();
+ for (int h = 0; h < k; ++h) {
+ addNewComponent(manager, workerRefs);
+ }
+ } else {
+ WorkerManager manager = workerManagerTable
+ .get(ev.getNodeName());
+ int k = ev.workers();
+ for (int h = 0; h < k; ++h) {
+ addNewComponent(manager, workerRefs);
+ }
+ }
+ startNewComponents(workerRefs);
+ break;
+ }
+ case WorkpoolEvent.SINGLE_REMOVE_WORKER: {
+ if (nodeName != null) {
+ // in this case I have a nodeName
+ if (!nodeName.equals("")
+ && (workerManagerTable.containsKey(nodeName))) {
+ WorkerManager manager = workerManagerTable.get(nodeName);
+ removeComponent(manager, 1);
+ } else if (nodeName.equals("")) {
+ WorkerManager manager = findMaxLoadNode();
+ removeComponent(manager, 1);
+ }
+ }
+ break;
+ }
+ case WorkpoolEvent.EVENT_MULTIPLE_REMOVE_WORKER: {
+ if (nodeName.equals("")) {
+ WorkerManager manager = findMaxLoadNode();
+ removeComponent(manager, ev.workers());
+
+ } else {
+ WorkerManager manager = workerManagerTable.get(nodeName);
+ removeComponent(manager, ev.workers());
+ }
+ break;
+ }
+ }
+
+ }
+
+ @Destroy
+ public void onExit() {
+ // do cleanup
+ this.timer.cancel();
+ this.timer.purge();
+ }
+
+ public void stopAutonomicCycle() {
+ this.timer.cancel();
+ this.timer.purge();
+ this.timer = null;
+ }
+
+ public void startAutonomicCycle() {
+ if (this.timer == null) {
+ this.timer = new Timer();
+ TimerTask task = new WorkpoolManagerImpl.RuleEngineTrigger();
+ timer.schedule(task, 3000, cycleTime);
+ }
+ }
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java
new file mode 100644
index 0000000000..d84ae549d8
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolService.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.databinding.annotation.DataBinding;
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.osoa.sca.annotations.OneWay;
+import org.osoa.sca.annotations.Remotable;
+import org.osoa.sca.ServiceReference;
+
+@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
+@Remotable
+public interface WorkpoolService {
+
+ /* this the functional part */
+ void submit(Job i);
+
+ /* the time between two subsequent worker invocations */
+ double getServiceTime();
+
+ /* the number of ResultJob received */
+ long getJobComputed();
+
+ /* the time elapsed between the stream has initiated and now */
+ long getElapsedTime();
+
+ /* the size of the internal queue : it's not accurate */
+ int estimatedQueueSize();
+
+ /* the average time between two consuecutive submit */
+ double getArrivalTime();
+
+ void start();
+
+ void stop();
+
+ /*
+ * this is the part needed by management. May be in future i'll refactor it
+ * order to hide this part.
+ */
+ @OneWay
+ void handleResult(Job j, boolean reuse, String string,
+ CallableReferenceImpl<WorkerService> worker, boolean newJob);
+
+ void addTrigger(CallableReferenceImpl<Trigger> reference);
+
+ void removeTrigger();
+
+ void registerManager(
+ CallableReferenceImpl<WorkpoolManager> createSelfReference);
+
+ /*
+ * This could placed in another interface definition - think about it These
+ * methods evict, and evictAll are needed when a worker finish to exist and
+ * it needs to be evicted by the WorkpoolManager. In the system I have two
+ * caches: 1) a domain cache, which holds the components URI 2) a
+ * workerReference cache (implemented by a ConcurrentHashMap), which holds a
+ * proxy to each worker. Every proxy gets built from the worker callable
+ * reference. I'm thinking for placing the workerReferenceCache in a local
+ * interface. Assuming that WorkpoolService and WorkpoolManager are in the
+ * same JVM.
+ */
+ void evict(String workerURI);
+
+ void evictAll();
+
+ /*
+ * these two are no longer needed. I leave it because if i'll have time to
+ * do dynamic wiring the first one is needed. void PostWorkerName(String
+ * referenceName);
+ */
+ void PostWorkerReference(CallableReferenceImpl<WorkerService> worker);
+
+}
diff --git a/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java
new file mode 100644
index 0000000000..52b88af42c
--- /dev/null
+++ b/sca-java-1.x/contrib/demos/workpool-distributed/src/main/java/workpool/WorkpoolServiceImpl.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package workpool;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Logger;
+import org.apache.tuscany.sca.core.context.CallableReferenceImpl;
+import org.apache.tuscany.sca.databinding.annotation.DataBinding;
+import org.osoa.sca.ComponentContext;
+import org.osoa.sca.annotations.Context;
+import org.osoa.sca.annotations.Scope;
+import org.osoa.sca.annotations.Service;
+import org.apache.tuscany.sca.databinding.job.Job;
+import org.apache.tuscany.sca.databinding.job.JobDataMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * An implementation of the Workpool service.
+ */
+@Service(WorkpoolService.class)
+@Scope("COMPOSITE")
+@DataBinding("org.apache.tuscany.sca.databinding.job.Job")
+public class WorkpoolServiceImpl implements WorkpoolService,
+ WorkerServiceCallback {
+
+ /* incoming job queue */
+ private LinkedBlockingQueue<Job> queue = new LinkedBlockingQueue<Job>(5000);
+ private CallableReferenceImpl<Trigger> trigger = null;
+ private Trigger forwardResult = null;
+ /* counter for job's number fetched from the queue and sent to the Worker */
+ private AtomicInteger jobSent = new AtomicInteger(0);
+ /* time for initHandleResult */
+ private AtomicLong initHandleResult = new AtomicLong(0);
+ /* time for endHandleResult */
+ private AtomicLong endHandleResult = new AtomicLong(0);
+ /*
+ * number of job computed, this will be exposed in order to be used to
+ * firing rules
+ */
+ private long jobComputed = 0;
+ /* same as above */
+ private AtomicLong elapsedTime = new AtomicLong(0);
+ /* this is for comuputing averageServiceTime */
+ private long times = 1;
+ /* this is for computing averageArrivalTime */
+ private long timesArrival = 1;
+ private ReentrantLock arrivalLock = new ReentrantLock();
+ private long arrivalPrevious = -1;
+ // private AtomicBoolean processingStopped = new AtomicBoolean(false);
+ private boolean processingStopped = false;
+ // private LinkedBlockingQueue<Trigger> triggers = new
+ // LinkedBlockingQueue<Trigger>();
+ @Context
+ protected ComponentContext workpoolContext;
+ private CallableReferenceImpl<WorkpoolManager> manager;
+ private long previousSubmitTime = -1;
+ private boolean firstTime = true;
+ private boolean first = true;
+ private long start = 0;
+ private long end = 0;
+ private double averageServiceTime = 0;
+ private double averageArrivalTime = 0;
+ private int workersNo = 0;
+ private final Job nullJob = new NullJob();
+ /* This is useful for counting the start and end */
+ private Logger log = Logger.getLogger(WorkpoolServiceImpl.class.getName());
+ private ReentrantLock handleResultLock = new ReentrantLock();
+ private ReentrantLock postWorkerReferenceLock = new ReentrantLock();
+ private ConcurrentHashMap<String, WorkerService> cacheReference = new ConcurrentHashMap<String, WorkerService>();
+ private CallableReferenceImpl<WorkpoolService> myReference;
+ private String previuosURI = "";
+ private long time = 0;
+
+ private void computeAverageTime() {
+ long actualServiceTime = 0;
+ // if the processing is finished
+ if (processingStopped)
+ return;
+
+ if (firstTime == true) {
+ this.previousSubmitTime = System.currentTimeMillis();
+ this.averageServiceTime = 0;
+ firstTime = false;
+ } else {
+ actualServiceTime = System.currentTimeMillis()
+ - this.previousSubmitTime;
+ this.previousSubmitTime = System.currentTimeMillis();
+ averageServiceTime = ((averageServiceTime * times) + actualServiceTime)
+ / (times + 1);
+ ++times;
+ }
+ }
+
+ public void submit(Job j) {
+ try {
+ // log.info("Submit job in queue -->"+ j.getType());
+ // processingStopped.set(false);
+ try {
+ arrivalLock.lock();
+ if (this.arrivalPrevious == -1) {
+ arrivalPrevious = System.currentTimeMillis();
+ averageArrivalTime = 0;
+ }
+ double actualArrivalTime = System.currentTimeMillis()
+ - arrivalPrevious;
+ averageArrivalTime = ((averageArrivalTime * timesArrival) + actualArrivalTime)
+ / (timesArrival + 1);
+ arrivalPrevious = System.currentTimeMillis();
+ ++timesArrival;
+ } finally {
+ arrivalLock.unlock();
+ }
+ queue.put(j);
+ } catch (Exception e) {
+ log.info("Exception in queue");
+ queue.clear();
+ e.printStackTrace();
+ }
+ }
+
+ public double getArrivalTime() {
+ return this.averageArrivalTime;
+ }
+
+ public double getServiceTime() {
+ return this.averageServiceTime;
+ }
+
+ public void receiveResult(Job resultType, boolean reuse, String workerURI) {
+
+ if (reuse) {
+ queue.add(resultType);
+ return;
+ }
+
+ computeAverageTime();
+ Job job = null;
+ try {
+ job = queue.take();
+ } catch (InterruptedException e) {
+ // TODO Better exception handling --> see Exception antipattern doc
+ e.printStackTrace();
+ return;
+ }
+
+ if ((job != null) && (job.eos() == false)) {
+ int nameIndex = workerURI.indexOf("/");
+ String workerName = workerURI.substring(0, nameIndex - 1);
+ log.info("Sending job to worker --> " + workerName);
+ WorkerService worker = workpoolContext.getService(
+ WorkerService.class, workerName);
+ worker.compute(job);
+ }
+
+ JobDataMap map = ((ResultJob) resultType).getDataMap();
+ if (map != null) {
+ ++jobComputed;
+ Object obj = map.getJobDataObject("result");
+ System.out.println("Result = " + ((Double) obj).doubleValue());
+ }
+
+ }
+
+ public void start() {
+ log.info("WorkpoolServiceComponent started...");
+ myReference = (CallableReferenceImpl) workpoolContext
+ .createSelfReference(WorkpoolService.class, "WorkpoolService");
+ myReference.getService();
+ }
+
+ /*
+ *
+ * This method is called by WorkpoolManagerImpl, when it creates a new
+ * worker component in order to dispatch worker to the WorkpoolServiceImpl
+ * @param CallableReferenceImpl reference - a dynamically created reference
+ * from the Worker
+ */
+ public void PostWorkerReference(
+ CallableReferenceImpl<WorkerService> reference) {
+
+ try {
+ long initPostWorkerReference;
+ long endPostWorkerReference;
+ this.postWorkerReferenceLock.lock();
+
+ initPostWorkerReference = System.currentTimeMillis();
+ WorkerService worker;
+ worker = reference.getService();
+ worker.start();
+
+ ++workersNo;
+ if (myReference != null) {
+
+ // Job poison = new ResultJob();
+ this.postWorkerReferenceLock.unlock();
+ log.info("Sending null job to worker");
+ worker.computeFirstTime(nullJob, myReference);
+ // queue.put(poison);
+ endPostWorkerReference = System.currentTimeMillis();
+ System.out.println("Time PostWorker ="
+ + (endPostWorkerReference - initPostWorkerReference));
+ } else {
+ log.info("myReference is null");
+
+ }
+ } catch (Exception e) {
+ postWorkerReferenceLock.unlock();
+ } finally {
+ }
+
+ }
+
+ /*
+ * FIXME This method currently is not used because i've not yet ready
+ * dynamic wire injection
+ */
+
+ public void PostWorkerName(String referenceName) {
+ /* TODO Do something similar to PostWorkerReference */
+ }
+
+ private void printComputingTime(Job j) {
+
+ if (first == true) {
+ first = false;
+ start = System.currentTimeMillis();
+ end = System.currentTimeMillis();
+ } else {
+ end = System.currentTimeMillis();
+ System.out.println("Elapsed Time = " + (end - start));
+ elapsedTime.set(end - start);
+ }
+ /*
+ * i could use reflection or instance of (but it's a penalty kick) , or
+ * an object as result, but i'd prefer a job so i've defined a
+ * RESULT_JOB There're in the system three kind of jobs: RESULT_JOB,
+ * NULL_JOB, DEFAULT_JOB
+ */
+ if ((j != null) && (j.getType() == Job.RESULT_JOB)) {
+ jobComputed++;
+ ResultJob result = (ResultJob) j;
+ JobDataMap map = result.getDataMap();
+ if (map != null) {
+ Double doubleValue = (Double) map.getJobDataObject("result");
+ System.out
+ .println("ResultValue = " + doubleValue.doubleValue());
+ }
+
+ }
+
+ }
+
+ public void handleResult(Job resultType, boolean reuse, String workerURI,
+ CallableReferenceImpl<WorkerService> worker, boolean newWorker) {
+ initHandleResult.set(System.nanoTime());
+ if (reuse) {
+ log.info("Reusing a job..");
+ queue.add(resultType);
+ return;
+ }
+ // init job variable
+ Job job;
+ if (newWorker)
+ System.out.println("newWorkerActivation= " + System.nanoTime());
+ printComputingTime(resultType);
+
+ try {
+ job = queue.take();
+ } catch (Exception e) {
+ log.info("Exception during fetching the queue");
+ e.printStackTrace();
+ return;
+ }
+
+ try {
+ // it needs to be locked because multiple threads could invoke this.
+ handleResultLock.lock();
+ if (previuosURI.equals("")) {
+ time = System.currentTimeMillis();
+ this.previuosURI = workerURI;
+ } else {
+ if (previuosURI.equals(workerURI))
+ System.out.println("Complete ComputeTime for an item ="
+ + (time - System.currentTimeMillis()));
+ }
+ if (job.eos()) {
+ long endTime = System.currentTimeMillis();
+ /* checking for EOS */
+ if (processingStopped == false) {
+ processingStopped = true;
+ System.out.println("GOT EOS in time=" + (endTime - start));
+ log.info("Stop autonomic cycle..");
+ /*
+ * I'm doing this because i want that in the termination i
+ * would have more jobs with eos == true than workers. So
+ * i'm sure that every worker removes itself from its
+ * manager. I do it only one time. This is necessary because
+ * i have a variable number of workers. The number of
+ * workers in the system might change every time the rule
+ * engine cycle gets executed.
+ */
+ ResultJob poison = new ResultJob();
+ for (int i = 0; i < workersNo; ++i) {
+ try {
+
+ queue.put(poison);
+
+ } catch (Exception e) {
+ log.info("Cannot duplicate poison tokens");
+ break;
+ }
+
+ }
+ manager.getService().stopAutonomicCycle();
+ }
+ }
+ computeAverageTime();
+ System.out.println("AverageTime =" + averageServiceTime);
+ if (job != null) {
+
+ WorkerService workerService;
+ /*
+ * the workpool has a high reuse, i always call the same
+ * component set or un superset or subset, so i cache it. When
+ * the WorkpoolManager will remove an item, it removes still
+ * this cache entry
+ */
+ if (!cacheReference.containsKey(workerURI)) {
+ workerService = worker.getService();
+ handleResultLock.unlock();
+ cacheReference.put(workerURI, workerService);
+ } else {
+ handleResultLock.unlock();
+ workerService = cacheReference.get(workerURI);
+ }
+ // it's still a penalty kick locking compute because it's going
+ // to be scheduled whereas it's async.
+ workerService.compute(job);
+ log.info("Sent job #" + jobSent.incrementAndGet()
+ + " Queue size " + queue.size());
+ endHandleResult.set(System.nanoTime());
+ System.out
+ .println("begin:handleResult ==> end:handleResult:compute = "
+ + (endHandleResult.addAndGet(-(initHandleResult
+ .get())) / 1000000));
+ }
+ } catch (Exception e) {
+ handleResultLock.unlock();
+ }
+ }
+
+ public void evictAll() {
+ cacheReference.clear();
+ }
+
+ public void evict(String workerURI) {
+ if (cacheReference.containsKey(workerURI)) {
+ cacheReference.remove(workerURI);
+ }
+
+ }
+
+ public int estimatedQueueSize() {
+ return queue.size();
+ }
+
+ public long getElapsedTime() {
+ return elapsedTime.get();
+ }
+
+ public long getJobComputed() {
+ return jobComputed;
+ }
+
+ public void registerManager(
+ CallableReferenceImpl<WorkpoolManager> createSelfReference) {
+ manager = createSelfReference;
+
+ }
+
+ public void stop() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void addTrigger(CallableReferenceImpl<Trigger> reference) {
+ this.trigger = reference;
+ this.forwardResult = reference.getService();
+
+ }
+
+ public void removeTrigger() {
+ this.trigger = null;
+ this.forwardResult = null;
+ }
+}