/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.tuscany.sca.guardian; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.Stack; import java.util.logging.Level; import java.util.logging.Logger; import org.osoa.sca.annotations.Init; import org.osoa.sca.annotations.Reference; import org.osoa.sca.annotations.Scope; import org.osoa.sca.annotations.Destroy; import org.osoa.sca.annotations.Service; @Service(GuardianMember.class) @Scope("COMPOSITE") public class GuardianMemberImpl implements GuardianMember { private int participantState; private Stack contextList; private BlockingInterface service; private Queue exceptionQueue; @Reference(name = "guardian_group", required = true) public GuardianGroup guardianGroup; private int id; public GuardianMemberImpl() { contextList = new Stack(); contextList.add(Context.INIT_CONTEXT); exceptionQueue = new LinkedList(); participantState = GuardianGroup.NORMAL_PARTICIPANT_STATE; } @Init public void init() { guardianGroup.addGuardianMember(this); } @Destroy public void destroy() { guardianGroup.removeGuardianMember(this); } public void addException(GlobalException ex) { exceptionQueue.add(ex); } public void setService(BlockingInterface service) { this.service = service; } public Context getCurrentContext() { return contextList.peek(); } public BlockingInterface getService() { return service; } public void enableContext(Context context) { //Update the context list with the related set of exceptions contextList.push(context); if (contextList.size() == 2) { JoinException ex = new JoinException(); ex.setSignalingContext(context); ex.setSignalingParticipant(getParticipantIdentifier()); gthrow(ex, null); } } public void removeContext() { if (!contextList.isEmpty()) { contextList.pop(); } } //Adapt to allow a regular expression //If participantList is null then signal to ALL participants public void gthrow(GlobalExceptionInterface ex, List participantList) { //1)Block the participant until raise an exception if (!(ex instanceof SuspendException)) { //Set the exception's parameters ex.setSignalingContext(getCurrentContext()); ex.setSignalingParticipant(getParticipantIdentifier()); guardianGroup.gthrow(ex, participantList); } else { /*if (service instanceof BlockingInterface && !service.isBlocked()) { service.block(); }*/ participantState = GuardianGroup.SUSPENDED_PARTICIPANT_STATE; } //*Is here the best place to receive such kind of msg? // //2B)When receive exception_msg from GuardianGroup do // if participant is not suspended then suspend it // if participant supports interrupts then // interrupt it, add SuspendException in the exception_queue, and invoke checkExceptionStatus // else invoke checkExceptionStatus periodically // //4)Once ALL required participants are SUSPENDED (suspended point), invoke the defined Recovery Rules // 4.1) recovery_rules < signaled exceptions + context_list of all participant > target context + exception to raise // 4.2) raise the resolved exception in each participant -> it goes to the exception_queue } public boolean propagate(GlobalExceptionInterface ex) { //1)Compares the current context with the exception's target context return !getCurrentContext().equals(ex.getTargetContext()); } public void checkExceptionStatus() throws GlobalException { //1)Chek the exception queue // 1.1) if exception_queue is empty then return // else if first element on the exception_queue is SuspendException then // the method blocks until the next exception is received. The method raises the first non-SuspendException in the queue //Blocks until the state be diferent the SUSPENDED_STATE while (participantState == GuardianGroup.SUSPENDED_PARTICIPANT_STATE) { try { Thread.sleep(5000); } catch (InterruptedException ex) { Logger.getLogger(GuardianMemberImpl.class.getName()).log(Level.SEVERE, null, ex); } } GlobalException exc; if ((exc = exceptionQueue.peek()) == null) { System.out.println(getParticipantIdentifier() + "#No exception on exception queue"); return; } //Check if ex.targetContext() matches the participant id //Eg. ex.targetContext(): Main and participant id: Init.Main.Backup -> should thrown the exception //Test if the exception should be thrown in the target context for (Context c : contextList) { if (exc.getTargetContext().equals(c) && (c.equals(Context.INIT_CONTEXT) || c.getExceptionList().contains(exc.getClass()))) { System.out.println(getParticipantIdentifier() + "#Returning an exception"); exceptionQueue.poll(); throw exc; } } return; } public String getParticipantIdentifier() { //1) Return the participant identifier -> context list dot separated StringBuffer participantIdentifier = new StringBuffer(); //id.append(this.id + "." + Context.INIT_CONTEXT.getName()); participantIdentifier.append(this.id); for (int i = 0; i < contextList.size(); i++) { participantIdentifier.append("." + contextList.get(i).getName()); } return participantIdentifier.toString(); } public void setUniqueParticipantID(int id) { this.id = id; } public void removeService() { this.service = null; } public int getParticipantState() { return participantState; } public void setParticipantState(int state) { this.participantState = state; } }