# Copyright (C) 2018 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import copy class BuildGraph: def __init__(self, graph=None): self.observer = None self.actions = graph or {} self.idgen = IdGenerator(self.actions.keys()) def set_observer(self, observer): self.observer = observer def get_actions(self): return copy.deepcopy(self.actions) def get_action(self, action_id): return self.actions[action_id]['action'] def get_action_status(self, action_id): return self.actions[action_id]['status'] def set_action_status(self, action_id, status): self.actions[action_id]['status'] = status self.trigger_observer() def has_more_to_do(self): return ( self.find_actions('ready') or self.find_actions('building') or self.find_actions('blocked') ) def unblock(self): blocked_ids = self.find_actions('blocked') for blocked_id in blocked_ids: blocked = self.actions[blocked_id] if self.is_unblockable(blocked): self.set_action_status(blocked_id, 'ready') def is_unblockable(self, action): return all( self.get_action_status(dep) == 'done' for dep in action['depends'] ) def trigger_observer(self): if self.observer is not None: self.observer() def append_action(self, action): prev_id, action_id = self.idgen.next_id() graph_node = { 'action': copy.deepcopy(action), } if not self.actions: graph_node['status'] = 'ready' graph_node['depends'] = [] else: graph_node['status'] = 'blocked' graph_node['depends'] = [prev_id] self.actions[action_id] = graph_node self.trigger_observer() return action_id def append_pipeline(self, pipeline): for action in pipeline.get('actions', []): self.append_action(action) def find_actions(self, status): return [ action_id for action_id, action in self.actions.items() if action['status'] == status ] class IdGenerator: def __init__(self, action_ids): self.current = 0 if action_ids: action_ids = [int(an_id) for an_id in action_ids] action_ids.sort() self.current = action_ids[-1] def next_id(self): prev = self.current self.current += 1 return str(prev), str(self.current)