/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.tuscany.sca.implementation.guardian.itests.primaryBackup.common;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.tuscany.sca.implementation.guardian.GuardianMember;
import org.apache.tuscany.sca.implementation.guardian.common.Context;
import org.osoa.sca.annotations.Init;
import org.osoa.sca.annotations.Reference;
import org.osoa.sca.annotations.Scope;
import org.osoa.sca.annotations.OneWay;

/**
 * Node of the primary/backup integration test.
 *
 * <p>A node starts in the primary role inside {@link #execute()} and switches
 * between the primary and the backup role according to the exceptions it
 * catches after calling {@code gm.checkExceptionStatus()}. As primary it pushes
 * updates to every live node in {@link #nodeList}; as backup it applies the
 * updates queued by {@link #sendUpdate(String)}.
 *
 * <p>NOTE(review): the instance is composite scoped and {@link #execute()} is
 * one-way, so the remaining public methods are presumably called from other
 * threads while the service loops run. The flags are therefore volatile and the
 * update queue is guarded by its own monitor - confirm against the runtime's
 * threading model.
 */
@Scope("COMPOSITE")
public class NodeImpl implements Node, TestInterface {

    /** Role value: this node serves requests and sends updates. */
    private static final int PRIMARY = 0;
    /** Role value: this node applies the updates it receives. */
    private static final int BACKUP = 1;

    /** True until {@link #execute()} starts and again after {@link #kill()}. */
    private volatile boolean isDead;

    private final Context mainContext;
    private final Context primaryContext;
    private final Context backupContext;

    /** Current role, one of {@link #PRIMARY} or {@link #BACKUP}; only touched by execute(). */
    private int role;
    /** Participant identifier obtained from the guardian member in {@link #init()}. */
    private String pID;

    /** Updates received through sendUpdate() and not yet applied; guarded by its own monitor. */
    private final Queue<String> updates;

    @Reference(name = "guardian_member", required = true)
    public GuardianMember gm;

    @Reference(name = "nodes", required = true)
    public List<Node> nodeList;

    /** When set, the primary loop raises a PrimaryServiceFailureException on each pass. */
    private volatile boolean forcePSFException;
    /** When set, the backup loop raises an ApplyUpdateFailureException on each pass. */
    private volatile boolean forceAUFException;

    /** Sequence number placed in the next update message. */
    private int upcount;

    /**
     * Builds the three guardian contexts and the initial (dead, no forced
     * failures) state. The node list starts empty; the "nodes" reference is
     * declared on the public field {@link #nodeList}.
     */
    public NodeImpl() {
        mainContext = new Context("MAIN");
        mainContext.addException(PrimaryFailedException.class);
        mainContext.addException(PrimaryExistsException.class);

        primaryContext = new Context("PRIMARY");
        primaryContext.addException(BackupFailedException.class);
        primaryContext.addException(BackupJoinedException.class);
        primaryContext.addException(PrimaryServiceFailureException.class);

        backupContext = new Context("BACKUP", null);

        updates = new LinkedList<String>();
        isDead = true;
        nodeList = new LinkedList<Node>();
        forcePSFException = false;
        forceAUFException = false;
    }

    /** Caches the participant identifier reported by the guardian member. */
    @Init
    public void init() {
        pID = gm.getParticipantIdentifier();
    }

    /**
     * Main loop of the node. Marks the node alive, enables the main context and
     * starts as primary; on every pass it sleeps, checks the guardian exception
     * status and runs the service matching the current role.
     *
     * <p>A PrimaryExistsException switches the role to backup, a
     * PrimaryFailedException switches it to primary. For PrimaryFailedException
     * and BackupFailedException, when {@code gm.propagate(ex)} returns true the
     * node is killed, the stack trace is printed and the method returns; this is
     * the only way the loop ends normally.
     */
    @OneWay
    public void execute() {
        isDead = false;
        gm.enableContext(mainContext);
        role = PRIMARY;

        while (true) {
            //blockingCheck();
            try {
                System.out.println(pID + "#Main context: ");
                sleep(pID + "#Sleeping at main context...", 4000);
                gm.checkExceptionStatus();

                if (role == PRIMARY) {
                    //Config as primary then...
                    primaryService();
                } else {
                    //Config as backup then...
                    backupService();
                }
            } catch (PrimaryExistsException ex) {
                System.out.println(pID + "# Exception captured!: PrimaryExistsException");
                role = BACKUP;
            } catch (PrimaryFailedException ex) {
                System.out.println(pID + "# Exception captured!: PrimaryFailedException");
                // propagate() is queried twice on purpose: the call count of the
                // original code is preserved because its side effects are unknown.
                System.out.println(pID + "#Needs propagation?: " + gm.propagate(ex));
                if (gm.propagate(ex)) {
                    //throw ex;
                    this.kill();
                    ex.printStackTrace();
                    return;
                }
                role = PRIMARY;
            } catch (BackupFailedException ex) {
                System.out.println(pID + "# Exception captured!: BackupFailedException");
                System.out.println(pID + "#Needs propagation?: " + gm.propagate(ex));
                if (gm.propagate(ex)) {
                    //throw ex;
                    this.kill();
                    ex.printStackTrace();
                    return;
                }
            }
        }
    }

    /**
     * Primary role loop. Each pass enables the primary context, sleeps, checks
     * the guardian exception status, "processes" a request and, when a backup is
     * believed to be available, sends it an update. The context is removed at the
     * end of every pass.
     *
     * <p>The loop contains no return or break; it is left only when an exception
     * that is not caught here escapes to the caller.
     */
    private void primaryService() {
        boolean backupAvailable = isThereBackupAvailable();
        upcount = 1;

        while (true) {
            gm.enableContext(primaryContext);
            //blockingCheck();
            try {
                System.out.println(pID + "#Primary context: ");
                sleep(pID + "#Sleeping at primary context...", 4000);
                gm.checkExceptionStatus();

                //Process the request then...
                System.out.println(pID + "#Processing the request...");

                //Check for an internal error
                if (forcePSFException) {
                    throw new PrimaryServiceFailureException();
                }

                if (backupAvailable) {
                    // No live node took the update: re-evaluate availability.
                    if (!updateBackups()) {
                        backupAvailable = isThereBackupAvailable();
                    }
                } else {
                    System.out.println(pID + "#No backup available to send updates!");
                    //backupAvailable = isThereBackupAvailable();
                }

                //send the reply to the client
                System.out.println(pID + "#Sending the reply to the client...");
            } catch (PrimaryServiceFailureException ex) {
                System.out.println(pID + "# Exception captured!: PrimaryServiceFailureException");
                gm.gthrow(new PrimaryFailedException(), null);
            } catch (BackupFailedException ex) {
                System.out.println(pID + "# Exception captured!: BackupFailedException");
                //backupAvailable = false;
                backupAvailable = isThereBackupAvailable();
            } catch (BackupJoinedException ex) {
                System.out.println(pID + "# Exception captured!: BackupJoinedException");
                backupAvailable = true;
            } finally {
                gm.removeContext();
            }
        }
    }

    /**
     * Sends the current update message to every node of {@link #nodeList} that
     * does not report itself dead.
     *
     * @return true when at least one node received the update, in which case the
     *         update counter has been advanced; false otherwise
     */
    private boolean updateBackups() {
        boolean flag = false;
        for (Node n : nodeList) {
            if (!n.isDead()) {
                n.sendUpdate("Update " + upcount);
                flag = true;
            }
        }
        if (flag) {
            upcount++;
        }
        return flag;
    }

    /**
     * Backup role loop. Each pass enables the backup context, sleeps, checks the
     * guardian exception status and applies one pending update. A forced
     * ApplyUpdateFailureException is reported through {@code gm.gthrow} as a
     * BackupFailedException. The context is removed at the end of every pass.
     *
     * <p>The loop contains no return or break; it is left only when an exception
     * that is not caught here escapes to the caller.
     */
    private void backupService() {
        while (true) {
            gm.enableContext(backupContext);
            //blockingCheck();
            try {
                System.out.println(pID + "#Backup context: ");
                sleep(pID + "#Sleeping at backup service", 4000);
                gm.checkExceptionStatus();

                applyUpdate();

                if (forceAUFException) {
                    throw new ApplyUpdateFailureException();
                }
            } catch (ApplyUpdateFailureException ex) {
                System.out.println(pID + "# Exception captured!: ApplyUpdateFailureException");
                gm.gthrow(new BackupFailedException(), null);
            } finally {
                gm.removeContext();
            }
        }
    }

    /**
     * @return the participant identifier, read from the guardian member on every call
     */
    public String getID() {
        return gm.getParticipantIdentifier();
    }

    /**
     * Prints {@code msg} and then sleeps for {@code millis} milliseconds.
     * An interruption ends the sleep early and is logged at SEVERE level.
     *
     * <p>NOTE(review): the interrupt flag is not restored. The service loops have
     * no interrupt-driven exit, so a restored flag would make every later
     * Thread.sleep fail immediately and turn the loops into a busy spin. Revisit
     * if shutdown by interruption is ever required.
     *
     * @param msg    message printed before sleeping
     * @param millis sleep duration in milliseconds
     */
    private void sleep(String msg, int millis) {
        try {
            System.out.println(msg);
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            Logger.getLogger(NodeImpl.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    /**
     * @return true when at least one node of {@link #nodeList} does not report itself dead
     */
    private boolean isThereBackupAvailable() {
        for (Node n : nodeList) {
            if (!n.isDead()) {
                return true;
            }
        }
        return false;
    }

    /** Marks this node as dead. */
    public void kill() {
        isDead = true;
    }

    /**
     * @return true before {@link #execute()} has started and after {@link #kill()}
     */
    public boolean isDead() {
        return isDead;
    }

    /**
     * Logs the received update and queues it until {@link #applyUpdate()} takes it.
     *
     * @param update the update message sent by the primary
     */
    public void sendUpdate(String update) {
        System.out.println(pID + "#Receiving updates from primary: " + update);
        synchronized (updates) {
            updates.offer(update);
        }
    }

    /**
     * Removes the oldest queued update, if any, and logs it as applied. Does
     * nothing when the queue is empty.
     */
    public void applyUpdate() {
        final String next;
        // The emptiness check and the removal happen under one lock so that a
        // concurrent sendUpdate() cannot interleave with them.
        synchronized (updates) {
            if (updates.isEmpty()) {
                return;
            }
            next = updates.poll();
        }
        System.out.println(pID + "#Applying the updates received from the primary: " + next);
    }

    /** Makes the primary loop raise a PrimaryServiceFailureException from now on. */
    public void forcePrimaryServiceFailureException() {
        forcePSFException = true;
    }

    /** Makes the backup loop raise an ApplyUpdateFailureException from now on. */
    public void forceApplyUpdateFailureException() {
        forceAUFException = true;
    }
}