summaryrefslogtreecommitdiffstats
path: root/sandbox/dougsleite/guardian-model/src/test/java/org/apache/tuscany/sca/guardian/itests/primaryBackup/common/NodeImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'sandbox/dougsleite/guardian-model/src/test/java/org/apache/tuscany/sca/guardian/itests/primaryBackup/common/NodeImpl.java')
-rw-r--r--sandbox/dougsleite/guardian-model/src/test/java/org/apache/tuscany/sca/guardian/itests/primaryBackup/common/NodeImpl.java288
1 files changed, 288 insertions, 0 deletions
diff --git a/sandbox/dougsleite/guardian-model/src/test/java/org/apache/tuscany/sca/guardian/itests/primaryBackup/common/NodeImpl.java b/sandbox/dougsleite/guardian-model/src/test/java/org/apache/tuscany/sca/guardian/itests/primaryBackup/common/NodeImpl.java
new file mode 100644
index 0000000000..5ff80e3472
--- /dev/null
+++ b/sandbox/dougsleite/guardian-model/src/test/java/org/apache/tuscany/sca/guardian/itests/primaryBackup/common/NodeImpl.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.guardian.itests.primaryBackup.common;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.tuscany.sca.guardian.Context;
+import org.apache.tuscany.sca.guardian.GuardianMember;
+import org.osoa.sca.annotations.Init;
+import org.osoa.sca.annotations.Reference;
+import org.osoa.sca.annotations.Scope;
+import org.osoa.sca.annotations.Destroy;
+import org.osoa.sca.annotations.OneWay;
+
+@Scope("COMPOSITE")
+public class NodeImpl implements Node, TestInterface {
+
+ private static int PRIMARY = 0;
+ private static int BACKUP = 1;
+ private boolean isBlocked;
+ private Context mainContext;
+ private Context primaryContext;
+ private Context backupContext;
+ private int role;
+ private String pID;
+ private Queue<String> updates;
+ @Reference(name = "guardian_member", required = true)
+ public GuardianMember gm;
+ @Reference(name = "nodes", required = true)
+ public List<Node> nodeList;
+ private boolean forcePSFException;
+ private boolean forceAUFException;
+
+ public NodeImpl() {
+
+ mainContext = new Context("MAIN");
+ mainContext.addException(PrimaryFailedException.class);
+ mainContext.addException(PrimaryExistsException.class);
+
+ primaryContext = new Context("PRIMARY");
+ primaryContext.addException(BackupFailedException.class);
+ primaryContext.addException(BackupJoinedException.class);
+ primaryContext.addException(PrimaryServiceFailureException.class);
+
+ backupContext = new Context("BACKUP", null);
+
+ updates = new LinkedList();
+
+ isBlocked = true;
+
+ nodeList = new LinkedList<Node>();
+
+ forcePSFException = false;
+ forceAUFException = false;
+ }
+
+ @Init
+ public void init() {
+ gm.setService(this);
+ pID = gm.getParticipantIdentifier();
+ }
+
+ @Destroy
+ public void destroy() {
+ gm.removeService();
+ }
+
+ @OneWay
+ public void execute() {
+ isBlocked = false;
+ gm.enableContext(mainContext);
+ role = PRIMARY;
+
+ while (true) {
+ try {
+ System.out.println(pID + "#Main context: ");
+ sleep(pID + "#Sleeping at main context...", 4000);
+
+ gm.checkExceptionStatus();
+
+ if (role == PRIMARY) {
+ //Config as primary then...
+ primaryService();
+ } else {
+ //Config as backup then...
+ backupService();
+ }
+
+
+ } catch (PrimaryExistsException ex) {
+ System.out.println(pID + "# Exception captured!: PrimaryExistsException");
+ System.out.println(pID + "#Needs propagation?: " + gm.propagate(ex));
+
+ if (gm.propagate(ex)) {
+ throw ex;
+ }
+
+ role = BACKUP;
+ } catch (PrimaryFailedException ex) {
+ System.out.println(pID + "# Exception captured!: PrimaryFailedException");
+ System.out.println(pID + "#Needs propagation?: " + gm.propagate(ex));
+
+ if (gm.propagate(ex)) {
+ //throw ex;
+ this.block();
+ ex.printStackTrace();
+ return;
+ }
+
+ role = PRIMARY;
+
+ } catch (BackupFailedException ex) {
+ System.out.println(pID + "# Exception captured!: BackupFailedException");
+ System.out.println(pID + "#Needs propagation?: " + gm.propagate(ex));
+
+ if (gm.propagate(ex)) {
+ //throw ex;
+ this.block();
+ ex.printStackTrace();
+ return;
+ }
+ }
+ }
+ }
+
+ public String getID() {
+ return gm.getParticipantIdentifier();
+ }
+
+ private void sleep(String msg, int millis) {
+ try {
+ System.out.println(msg);
+ Thread.sleep(millis);
+ } catch (InterruptedException ex) {
+ Logger.getLogger(NodeImpl.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ }
+
+ private boolean isThereBackupAvailable() {
+ for (Node n : nodeList) {
+ if (!n.isBlocked()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void primaryService() {
+ boolean backupAvailable = isThereBackupAvailable();
+ System.out.println("Backup available?: " + backupAvailable);
+ int upcount = 1;
+
+ while (true) {
+
+ gm.enableContext(primaryContext);
+
+ try {
+ System.out.println(pID + "#Primary context: ");
+ sleep(pID + "#Sleeping at primary context...", 4000);
+ gm.checkExceptionStatus();
+
+ //Process the request then...
+ System.out.println(pID + "#Processing the request...");
+
+ //Check for an internal error
+ if (forcePSFException) {
+ throw new PrimaryServiceFailureException();
+ }
+
+ if (backupAvailable) {
+ for (Node n : nodeList) {
+ if (!n.isBlocked()) {
+ n.sendUpdate("Update " + upcount);
+ }
+ }
+ upcount++;
+ } else {
+ System.out.println(pID + "#No backup available to send updates!");
+ backupAvailable = isThereBackupAvailable();
+ }
+ //send the reply to the client
+ System.out.println(pID + "#Sending the reply to the client...");
+
+ } catch (PrimaryServiceFailureException ex) {
+ System.out.println(pID + "# Exception captured!: PrimaryServiceFailureException");
+ gm.gthrow(new PrimaryFailedException(), null);
+ } catch (BackupFailedException ex) {
+ System.out.println(pID + "# Exception captured!: BackupFailedException");
+ //backupAvailable = false;
+ backupAvailable = isThereBackupAvailable();
+ } catch (BackupJoinedException ex) {
+ System.out.println(pID + "# Exception captured!: BackupJoinedException");
+ backupAvailable = true;
+ } finally {
+ gm.removeContext();
+ }
+ }
+ }
+
+ private void backupService() {
+ while (true) {
+
+ gm.enableContext(backupContext);
+
+ try {
+ System.out.println(pID + "#Backup context: ");
+ sleep(pID + "#Sleeping at backup service", 4000);
+ gm.checkExceptionStatus();
+
+ applyUpdate();
+
+ if (forceAUFException) {
+ throw new ApplyUpdateFailureException();
+ }
+
+ } catch (ApplyUpdateFailureException ex) {
+ System.out.println(pID + "# Exception captured!: ApplyUpdateFailureException");
+ gm.gthrow(new BackupFailedException(), null);
+ } finally {
+ gm.removeContext();
+ }
+ }
+ }
+
+ //FIXME - It is not working asynchronously
+ //@OneWay
+ public void block() {
+ System.out.println(pID + "#Participant blocked!");
+ isBlocked = true;
+
+// while (isBlocked) {
+// try {
+// Thread.sleep(5000);
+// } catch (InterruptedException ex) {
+// Logger.getLogger(ComponentImpl.class.getName()).log(Level.SEVERE, null, ex);
+// }
+// }
+ }
+
+ public boolean isBlocked() {
+ return isBlocked;
+ }
+
+ //@OneWay
+ public void unblock() {
+ System.out.println(pID + "#Participant unblocked!");
+ isBlocked = false;
+ }
+
+ public void sendUpdate(String update) {
+ System.out.println(pID + "#Receiving updates from primary: " + update);
+ updates.offer(update);
+ }
+
+ public void applyUpdate() {
+ if (!updates.isEmpty()) {
+ System.out.println(pID + "#Applying the updates received from the primary: " + updates.poll());
+ }
+ }
+
+ public void forcePrimaryServiceFailureException() {
+ forcePSFException = true;
+ }
+
+ public void forceApplyUpdateFailureException() {
+ forceAUFException = true;
+ }
+}
+