# Copyright (C) 2017-2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


import json
import time

import ick2


# Format of the timestamps written into build logs.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def _timestamp():
    """Return the current local time formatted for a build log line."""
    return time.strftime(_TIMESTAMP_FORMAT)


class WorkAPI(ick2.APIbase):

    """API through which workers ask for work and report on it.

    A GET request gives the calling worker the next action it should
    execute; a POST request records the worker's report (output and,
    when the action has ended, its exit code) for the action it is
    currently executing.
    """

    def __init__(self, state):
        super().__init__(state)
        # All reads and writes of workers, builds and build logs go
        # through this wrapper around the state.
        self._trans = ick2.TransactionalState(state)

    def get_routes(self, path):  # pragma: no cover
        """Return the route descriptions for this API, rooted at path."""
        return [
            {
                'method': 'GET',
                'path': '{}'.format(path),
                'callback': self.GET(self.get_work),
            },
            {
                'method': 'POST',
                'path': path,
                'callback': self.POST(self.update_work),
            },
        ]

    def get_work(self, **kwargs):
        """Return the work the calling worker should do next.

        The worker is identified by the ``aud`` claim found in
        ``kwargs['claims']``. If the worker's record already has a
        non-empty ``doing`` entry, that entry is returned as is.
        Otherwise a build is picked for the worker, the build's state
        machine is asked for the next action, and a description of
        that action is stored as the worker's ``doing`` entry and
        returned.

        Returns an empty dict if no build can be picked, or if the
        picked build has no action to hand out.

        Raises ick2.ClientIdMissing if the claims carry no ``aud``.
        """
        worker_id = self._get_client_id(**kwargs)
        ick2.log.log(
            'trace', msg_text='Worker wants work', worker_id=worker_id)

        with self._trans.modify('workers', worker_id) as worker:
            doing = worker.get('doing')
            if doing:
                ick2.log.log(
                    'trace', msg_text='Worker already got work', doing=doing)
                return doing

            build_id = self._pick_build(worker_id)
            if build_id is None:
                ick2.log.log('trace', msg_text='No suitable build for worker')
                return {}

            with self._trans.modify('builds', build_id) as build:
                with self._trans.modify('log', build_id) as log:
                    ick2.log.log(
                        'trace', msg_text='Picked build for worker',
                        build_id=build_id, build=build.as_dict())

                    build_obj = ick2.Build(build)
                    sm = ick2.BuildStateMachine(build_obj)
                    # A build that has only been triggered is started
                    # before it is asked for work.
                    if sm.get_state() == ick2.BUILD_TRIGGERED:
                        sm.handle_event(ick2.BuildStartsEvent())
                        self._append_text_to_build_log(
                            log,
                            'Build starts at {}\n'.format(_timestamp()))

                    resp = sm.handle_event(ick2.NeedWorkEvent())
                    if resp is None:  # pragma: no cover
                        ick2.log.log(
                            'trace', msg_text='Did not find work in build')
                        self._append_text_to_build_log(
                            log, 'Did not find work to do in build\n')
                        return {}
                    action_id, action = resp

                    build['worker'] = worker_id

                    doing = {
                        'build_id': build_id,
                        'build_number': build['build_number'],
                        'worker': worker_id,
                        'project': build['project'],
                        'parameters': build['parameters'],
                        'action_id': action_id,
                        'step': action,
                        'log': build['log'],
                    }

                    worker.from_dict({
                        'worker': worker_id,
                        'doing': doing,
                    })

                    self._append_text_to_build_log(
                        log,
                        'Action starts at {}:\n{}\n'.format(
                            _timestamp(), json.dumps(doing, indent=4)))

                    ick2.log.log(
                        'trace', msg_text='Returning work for worker',
                        doing=worker['doing'])
                    return worker['doing']

    def _get_client_id(self, **kwargs):
        """Return the ``aud`` claim from ``kwargs['claims']``.

        Raises ick2.ClientIdMissing if there is no such claim.
        """
        claims = kwargs.get('claims', {})
        client_id = claims.get('aud')
        if client_id is None:  # pragma: no cover
            raise ick2.ClientIdMissing()
        return client_id

    def _pick_build(self, worker):
        """Return the id of the build the worker should work on, or None.

        A build already assigned to the worker that is in the building
        or notifying state is preferred; failing that, the first build
        in the triggered state is chosen.
        """

        def on_worker(build):
            return build.get('worker') == worker

        def status(build):
            return build.get('status')

        def is_building(build):
            building_states = (ick2.BUILD_BUILDING, ick2.BUILD_NOTIFYING)
            return status(build) in building_states

        def is_triggered(build):
            return status(build) == ick2.BUILD_TRIGGERED

        # NOTE(review): builds is iterated twice below, so this assumes
        # get_resources returns a re-iterable collection - confirm.
        builds = self._trans.get_resources('builds')
        return (self._find_build(builds, on_worker, is_building) or
                self._find_build(builds, is_triggered))

    def _find_build(self, builds, *preds):
        """Return the id of the first build satisfying every predicate.

        Returns None if no build matches.
        """
        for build in builds:
            if all(pred(build) for pred in preds):
                return build['build_id']
        return None

    def update_work(self, update, **kwargs):
        """Record a worker's report about the action it is executing.

        ``update`` must name the ``worker``, ``build_id``, ``project``
        and ``action_id`` it is about, and those must agree with the
        worker's current ``doing`` entry. Any ``stdout`` and ``stderr``
        text in the update is appended to the build log and the update
        is fed to the build's state machine. When the update carries an
        ``exit_code``, or the build has finished, the worker's ``doing``
        entry is cleared so that it can be given new work.

        Raises ick2.BadUpdate if a required field is missing, if a
        field differs from the worker's current work, or if the worker
        has no current action.
        """
        required = ('worker', 'build_id', 'project')
        try:
            # 'project' is looked up only to reject updates lacking it.
            worker_id, build_id, _ = (update[key] for key in required)
        except KeyError as e:  # pragma: no cover
            raise ick2.BadUpdate(str(e)) from e
        exit_code = update.get('exit_code')

        with self._trans.modify('workers', worker_id) as worker:
            with self._trans.modify('builds', build_id) as build:
                with self._trans.modify('log', build_id) as log:
                    doing = worker.get('doing', {})
                    self._check_work_update(doing, update)
                    # Reject reports from a worker that has not been
                    # given an action before anything is modified. This
                    # used to surface as a bare KeyError raised only
                    # after the log had been appended to.
                    if 'action_id' not in doing:  # pragma: no cover
                        raise ick2.BadUpdate(
                            'worker {} is not executing any action'.format(
                                worker_id))
                    self._append_to_build_log(log, update)

                    build_obj = ick2.Build(build)
                    sm = ick2.BuildStateMachine(build_obj)
                    event = ick2.create_build_event(update)
                    sm.handle_event(event)

                    action_ended = exit_code is not None
                    if action_ended:
                        self._append_text_to_build_log(
                            log,
                            'Action ends at {}, exit code {}\n'.format(
                                _timestamp(), exit_code))

                    if build_obj.is_finished():  # pragma: no cover
                        self._append_text_to_build_log(
                            log,
                            'Build ends at {}, exit code {}\n'.format(
                                _timestamp(), build['exit_code']))

                    if action_ended or build_obj.is_finished():
                        worker.from_dict({
                            'worker': worker_id,
                            'doing': {},
                        })

    def _check_work_update(self, doing, update):  # pragma: no cover
        """Verify that update refers to the work described by doing.

        Each of ``worker``, ``project``, ``build_id`` and ``action_id``
        must be present in ``update``. A field is compared against
        ``doing`` only when ``doing`` has a non-None value for it.

        Raises ick2.BadUpdate on a missing or differing field.
        """
        must_match = ['worker', 'project', 'build_id', 'action_id']
        for name in must_match:
            if name not in update:
                raise ick2.BadUpdate('{} not specified'.format(name))
            current = doing.get(name)
            if current is not None and current != update[name]:
                raise ick2.BadUpdate(
                    '{} differs from current work: {} vs {}'.format(
                        name, current, update[name]))

    def _append_to_build_log(self, log, update):
        """Append the update's stdout, then its stderr, to the build log."""
        ick2.log.log('trace', msg_text='appending to build log', update=update)
        for stream in ('stdout', 'stderr'):
            text = update.get(stream, '')
            self._append_text_to_build_log(log, text)

    def _append_text_to_build_log(self, log, text):
        """Append text to ``log['log']``; a falsy text appends nothing."""
        log['log'] = log.get('log', '') + (text or '')

    def create(self, body, **kwargs):  # pragma: no cover
        """Do nothing and return None."""
        pass

    def update(self, body, name, **kwargs):  # pragma: no cover
        """Do nothing and return None."""
        pass

    def list(self, **kwargs):  # pragma: no cover
        """Do nothing and return None."""
        pass

    def show(self, name, **kwargs):  # pragma: no cover
        """Do nothing and return None."""
        pass

    def delete(self, name, **kwargs):  # pragma: no cover
        """Do nothing and return None."""
        pass