# Copyright (C) 2017-2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import ick2


class WorkAPI(ick2.APIbase):

    """Hand out build steps to workers and record their progress.

    ``get_work`` tells a worker what it should be doing, starting a new
    build for the first triggered pipeline it finds when the worker has
    nothing to do. ``update_work`` takes a worker's report on the
    current step, appends any output to the build log, and either moves
    the build to its next action or finishes it.
    """

    def __init__(self, state):
        super().__init__(state)
        # NOTE(review): the methods below use self._state, which is
        # presumably set by ick2.APIbase.__init__ -- confirm.
        self._workers = ick2.Workers(state)
        self._projects = ick2.Projects(state)
        self._pinstances = ick2.PipelineInstances(state)
        self._builds = ick2.Builds(state)
        self._type_name = 'work'

    def get_routes(self, path):  # pragma: no cover
        """Return route descriptions for fetching and updating work."""
        # NOTE(review): get_work requires a ``worker`` argument, yet the
        # GET path below carries no placeholder for it. Confirm how the
        # worker name reaches the callback.
        return [
            {
                'method': 'GET',
                'path': '{}/'.format(path),
                'callback': self.GET(self.get_work),
            },
            {
                'method': 'POST',
                'path': path,
                'callback': self.POST(self.update_work),
            },
        ]

    def get_work(self, worker, **kwargs):
        """Return what the given worker should be doing.

        If the stored worker state already has a non-empty ``doing``, it
        is returned as is. Otherwise the first triggered pipeline (if
        any) is marked as building, a build and a build log are created
        for it, and a description of the build's current step is stored
        as the worker's ``doing`` and returned. When no pipeline is
        triggered, an empty dict is stored and returned.
        """
        worker_state = self._workers.get_worker(worker)
        if not worker_state.get('doing'):
            project, pipeline = self._pick_triggered_pipeline()
            if project is None:
                doing = {}
            else:
                pipeline['status'] = 'building'
                self._update_pipeline(project, pipeline)

                build_id = self._start_build(project, pipeline, worker)
                self._start_log(build_id)

                build = self._get_build(build_id)
                actions = build['actions']
                current_action = build['current_action']

                doing = {
                    'build_id': build_id,
                    'worker': worker,
                    'project': project['project'],
                    'pipeline': pipeline['pipeline'],
                    'parameters': project.get('parameters', {}),
                    'step': actions[current_action],
                    'log': '/logs/{}'.format(build_id),
                }

            worker_state = {
                'worker': worker,
                'doing': doing,
            }
            self._workers.update_worker(worker_state)

        return worker_state['doing']

    def _pick_build_number(self, project):
        """Pick, store, and return the next build number for a project.

        The number is one more than the project's ``next_build_id``
        field (a missing or falsy value counts as 0). The new number is
        written back to that field and the project is saved.
        """
        old_build_no = project.get('next_build_id')
        build_no = (old_build_no or 0) + 1
        project['next_build_id'] = build_no
        self._projects.update_project(project)
        ick2.log.log(
            'info', msg_text='chose build number',
            old_build_no=old_build_no, build_no=build_no)
        return build_no

    def _pick_triggered_pipeline(self):
        """Return (project, pipeline) for the first triggered pipeline.

        The status is read from the pipeline instance, but the second
        item returned is the ``pipelines`` resource of the same name.
        Returns ``(None, None)`` if no pipeline instance has status
        ``triggered``.
        """
        projects = self._projects.get_projects()
        for project in projects:
            for name in project['pipelines']:
                pl = self._pinstances.get_instance(project['project'], name)
                if pl.get('status') == 'triggered':
                    pt = self._state.get_resource('pipelines', name)
                    return project, pt
        return None, None

    def _update_pipeline(self, project, pipeline_instance):
        """Store a pipeline instance, adding it if it does not exist."""
        project_name = project['project']
        pipeline_name = pipeline_instance['pipeline']
        try:
            self._pinstances.update_instance(
                project_name, pipeline_name, pipeline_instance)
        except ick2.NotFound:  # pragma: no cover
            self._pinstances.add_instance(
                project_name, pipeline_name, pipeline_instance)

    def _start_build(self, project, pipeline, worker):
        """Create a build record for a pipeline and return its build id.

        The build id is ``<project name>/<build number>``. The build's
        action list is the pipeline's actions preceded by a
        ``create_workspace`` action, and the build starts at action 0
        with status ``building``.
        """
        build_no = self._pick_build_number(project)
        build_id = '{}/{}'.format(project['project'], build_no)
        ick2.log.log(
            'info', msg_text='Starting new build',
            build_id=build_id, build_no=build_no)

        parameters = project.get('parameters', {})
        create_workspace = {
            'action': 'create_workspace',
        }
        actions = [create_workspace] + list(pipeline['actions'])
        build = {
            'build_id': build_id,
            'build_number': build_no,
            'log': '/logs/{}'.format(build_id),
            'worker': worker,
            'project': project['project'],
            'pipeline': pipeline['pipeline'],
            'parameters': parameters,
            'status': 'building',
            'actions': actions,
            'current_action': 0,
        }
        self._builds.add(build_id, build)
        return build_id

    def _get_build(self, build_id):
        """Return the stored build record for the given build id."""
        return self._builds.get(build_id)

    def _start_log(self, build_id):
        """Create an empty build log resource for the given build id."""
        ick2.log.log('info', msg_text='Starting new log', build_id=build_id)
        log = {
            'build_id': build_id,
            'log': '',
        }
        self._state.add_resource('log', str(build_id), log)

    def update_work(self, update, **kwargs):
        """Record a worker's progress report on its current build step.

        Any ``stdout`` and ``stderr`` text in the update is appended to
        the build log. What happens next depends on ``exit_code``:

        * missing or ``None``: nothing more is done (the step is still
          running);
        * zero: the build moves to its next action, or, if that was the
          last action, the pipeline becomes idle, the build is finished
          and the worker's ``doing`` is cleared;
        * any other integer: the pipeline becomes idle, the build is
          finished and the worker's ``doing`` is cleared.

        Raises ick2.BadUpdate if the update names no worker, does not
        match the worker's current work, or carries an ``exit_code``
        that is neither ``None`` nor an integer. Raises ick2.NotFound if
        the project or pipeline named in the update cannot be found.
        """
        if 'worker' not in update:  # pragma: no cover
            raise ick2.BadUpdate('no worker specified')

        worker_state = self._workers.get_worker(update['worker'])
        doing = worker_state.get('doing', {})
        self._check_work_update(doing, update)

        project, pipeline = self._get_pipeline(
            update['project'], update['pipeline'])
        self._append_to_build_log(update)

        exit_code = update.get('exit_code')
        if exit_code == 0:
            build_id = doing['build_id']
            build = self._get_build(build_id)
            actions = build['actions']
            current_action = build['current_action']
            if current_action + 1 >= len(actions):
                # That was the last action: the whole build is done.
                pipeline['status'] = 'idle'
                self._update_pipeline(project, pipeline)
                doing = {}
                self._finish_build(update)
            else:
                index = current_action + 1
                build['current_action'] = index
                self._update_build(build)
                doing['step'] = actions[index]

            worker_state = {
                'worker': update['worker'],
                'doing': doing,
            }
            self._workers.update_worker(worker_state)
        elif exit_code is not None:
            # The exit code comes from the worker's update, so reject a
            # bad one with a real error instead of an assert: asserts
            # are removed when Python runs with -O.
            if not isinstance(exit_code, int):
                raise ick2.BadUpdate(
                    'exit_code must be an integer, got {!r}'.format(
                        exit_code))

            # A non-zero exit code ends the build immediately.
            pipeline['status'] = 'idle'
            self._update_pipeline(project, pipeline)
            self._finish_build(update)

            worker_state = {
                'worker': update['worker'],
                'doing': {},
            }
            self._workers.update_worker(worker_state)

    def _check_work_update(self, doing, update):  # pragma: no cover
        """Raise ick2.BadUpdate unless the update matches current work.

        The update must contain ``worker``, ``project``, ``pipeline``
        and ``build_id``, each equal to the same field in ``doing``.
        """
        must_match = ['worker', 'project', 'pipeline', 'build_id']
        for name in must_match:
            if name not in update:
                raise ick2.BadUpdate('{} not specified'.format(name))
            if doing.get(name) != update[name]:
                raise ick2.BadUpdate(
                    '{} differs from current work: {} vs {}'.format(
                        name, doing.get(name), update[name]))

    def _get_pipeline(self, project, pipeline):  # pragma: no cover
        """Return (project, pipeline) resources for the given names.

        Raises ick2.NotFound if no project has the given name, or if
        the first such project does not list the pipeline.
        """
        projects = [
            p
            for p in self._projects.get_projects()
            if p['project'] == project
        ]
        if not projects:
            raise ick2.NotFound()

        p = projects[0]
        if pipeline not in p['pipelines']:
            raise ick2.NotFound()

        pt = self._state.get_resource('pipelines', pipeline)
        return p, pt

    def _append_to_build_log(self, update):
        """Append the update's stdout and stderr text to the build log.

        Fields that are missing or ``None`` contribute nothing. The log
        resource is saved even if nothing was appended.
        """
        build_id = update['build_id']
        log = self._state.get_resource('log', str(build_id))
        for kind in ['stdout', 'stderr']:
            text = update.get(kind, '')
            if text is not None:
                log['log'] += text
        self._state.update_resource('log', str(build_id), log)

    def _update_build(self, build):
        """Save the given build record."""
        self._builds.update_build(build)

    def _finish_build(self, update):
        """Mark the update's build as finished.

        The build's status is set to the update's ``exit_code`` and its
        ``current_action`` to ``None``.
        """
        build_id = update['build_id']
        build = self._get_build(build_id)
        build['status'] = update['exit_code']
        build['current_action'] = None
        self._update_build(build)

    # The methods below do nothing. NOTE(review): presumably they exist
    # only because ick2.APIbase expects subclasses to define them --
    # confirm.

    def create(self, body, **kwargs):  # pragma: no cover
        pass

    def update(self, body, name, **kwargs):  # pragma: no cover
        pass

    def list(self, **kwargs):  # pragma: no cover
        pass

    def show(self, name, **kwargs):  # pragma: no cover
        pass

    def delete(self, name, **kwargs):  # pragma: no cover
        pass