/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.tuscany.sca.guardian.itests; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.tuscany.sca.guardian.Context; import org.apache.tuscany.sca.guardian.GlobalException; import org.apache.tuscany.sca.guardian.GuardianMember; import org.osoa.sca.annotations.Init; import org.osoa.sca.annotations.Reference; import org.osoa.sca.annotations.Scope; import org.apache.tuscany.sca.guardian.exceptions.BackupFailedException; import org.apache.tuscany.sca.guardian.exceptions.BackupJoinedException; import org.apache.tuscany.sca.guardian.exceptions.PrimaryExistsException; import org.apache.tuscany.sca.guardian.exceptions.PrimaryFailedException; import org.osoa.sca.annotations.Destroy; import org.osoa.sca.annotations.OneWay; @Scope("COMPOSITE") public class NodeImpl implements Node, TestInterface { private static int PRIMARY = 0; private static int BACKUP = 1; private boolean isBlocked; private Context mainContext; private Context primaryContext; private Context backupContext; private List exListMain; private List exListPrimary; private int role; private boolean isExecuting; private String pID; private Queue updates; @Reference(name = "guardian_member", required = true) public GuardianMember gm; @Reference(name = "node", required = true) public Node node; private boolean forcePSFException; private boolean forceAUFException; public NodeImpl() { exListMain = new LinkedList(); exListMain.add(new PrimaryFailedException()); exListMain.add(new PrimaryExistsException()); exListPrimary = new LinkedList(); exListPrimary.add(new BackupFailedException()); exListPrimary.add(new BackupJoinedException()); exListPrimary.add(new PrimaryServiceFailureException()); mainContext = new Context("MAIN", exListMain); primaryContext = new Context("PRIMARY", exListPrimary); backupContext = new Context("BACKUP", null); updates = new LinkedList(); isBlocked = false; isExecuting = false; node = null; forcePSFException = false; forceAUFException = false; } @Init public void init() { gm.setService(this); pID = gm.getParticipantIdentifier(); } @Destroy public void destroy() { gm.removeService(); } public boolean isExecuting() { return isExecuting; } @OneWay public void execute() { isExecuting = true; gm.enableContext(mainContext); role = PRIMARY; while (true) { try { System.out.println(pID + "#Main context: "); sleep(pID + "#Sleeping at main context...", 4000); gm.checkExceptionStatus(); if (role == PRIMARY) { //Config as primary then... primaryService(); } else { //Config as backup then... backupService(); } } catch (PrimaryExistsException ex) { System.out.println(pID + "# Exception captured!: PrimaryExistsException"); System.out.println(pID + "#Needs propagation?: " + gm.propagate(ex)); if (gm.propagate(ex)) { throw ex; } role = BACKUP; } catch (PrimaryFailedException ex) { System.out.println(pID + "# Exception captured!: PrimaryFailedException"); System.out.println(pID + "#Needs propagation?: " + gm.propagate(ex)); if (gm.propagate(ex)) { //throw ex; ex.printStackTrace(); return; } role = PRIMARY; } catch (BackupFailedException ex) { System.out.println(pID + "# Exception captured!: BackupFailedException"); System.out.println(pID + "#Needs propagation?: " + gm.propagate(ex)); if (gm.propagate(ex)) { //throw ex; ex.printStackTrace(); return; } } } } private void sleep(String msg, int millis) { try { System.out.println(msg); Thread.sleep(millis); } catch (InterruptedException ex) { Logger.getLogger(NodeImpl.class.getName()).log(Level.SEVERE, null, ex); } } private void primaryService() { boolean backupAvailable = false; int upcount = 1; while (true) { gm.enableContext(primaryContext); try { System.out.println(pID + "#Primary context: "); sleep(pID + "#Sleeping at primary context...", 4000); gm.checkExceptionStatus(); //Process the request then... System.out.println(pID + "#Processing the request..."); //Check for an internal error if (forcePSFException) { throw new PrimaryServiceFailureException(); } if (backupAvailable && node.isExecuting()) { node.sendUpdate("Update " + upcount); upcount++; } else { System.out.println(pID + "#No backup available to send updates!"); } //send the reply to the client System.out.println(pID + "#Sending the reply to the client..."); } catch (PrimaryServiceFailureException ex) { System.out.println(pID + "# Exception captured!: PrimaryServiceFailureException"); gm.gthrow(new PrimaryFailedException(), null); } catch (BackupFailedException ex) { System.out.println(pID + "# Exception captured!: BackupFailedException"); backupAvailable = false; } catch (BackupJoinedException ex) { System.out.println(pID + "# Exception captured!: BackupJoinedException"); backupAvailable = true; } finally { gm.removeContext(); } } } private void backupService() { while (true) { gm.enableContext(backupContext); try { System.out.println(pID + "#Backup context: "); sleep(pID + "#Sleeping at backup service", 4000); gm.checkExceptionStatus(); applyUpdate(); if (forceAUFException) { throw new ApplyUpdateFailureException(); } } catch (ApplyUpdateFailureException ex) { System.out.println(pID + "# Exception captured!: ApplyUpdateFailureException"); gm.gthrow(new BackupFailedException(), null); } finally { gm.removeContext(); } } } //FIXME - It is not working asynchronously //@OneWay public void block() { System.out.println(pID + "#Participant blocked!"); isBlocked = true; // while (isBlocked) { // try { // Thread.sleep(5000); // } catch (InterruptedException ex) { // Logger.getLogger(ComponentImpl.class.getName()).log(Level.SEVERE, null, ex); // } // } } public boolean isBlocked() { return isBlocked; } //@OneWay public void unblock() { System.out.println(pID + "#Participant unblocked!"); isBlocked = false; } public void sendUpdate(String update) { System.out.println(pID + "#Receiving updates from primary: " + update); updates.offer(update); } public void applyUpdate() { System.out.println(pID + "#Applying the updates received from the primary: " + updates.poll()); } public void forcePrimaryServiceFailureException() { forcePSFException = true; } public void forceApplyUpdateFailureException() { forceAUFException = true; } }