# Copyright (C) 2017 Lars Wirzenius # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import apifw import ick2 class ControllerAPI: def __init__(self): self._state = ick2.ControllerState() def get_state_directory(self): return self._state.get_state_directory() def set_state_directory(self, dirname): self._state.set_state_directory(dirname) def find_missing_route(self, missing_path): # pragma: no cover apis = { '/version': VersionAPI, '/builds': BuildsAPI, '/logs': LogAPI, '/projects': ProjectAPI, '/work': WorkAPI, '/workers': WorkerAPI, } routes = [] for path in apis: self._state.load_resources(path[1:]) api = apis[path](self._state) routes.extend(api.get_routes(path)) ick2.log.log('info', msg_texg='Found routes', routes=routes) return routes class APIbase: # pragma: no cover def __init__(self, state): self._state = state def get_routes(self, path): return [ { 'method': 'POST', 'path': path, 'callback': self.POST(self.create), }, { 'method': 'GET', 'path': path, 'callback': self.GET(self.list), }, { 'method': 'GET', 'path': '{}/'.format(path), 'callback': self.GET(self.show), }, { 'method': 'PUT', 'path': '{}/'.format(path), 'callback': self.PUT(self.update), }, { 'method': 'DELETE', 'path': '{}/'.format(path), 'callback': self.DELETE(self.delete), }, ] def GET(self, callback): def wrapper(content_type, body, **kwargs): ick2.log.log( 'trace', msg_text='GET called', kwargs=kwargs, content_type=content_type, body=body) try: if 'raw_uri_path' in kwargs: del kwargs['raw_uri_path'] body = callback(**kwargs) except ick2.NotFound as e: return not_found(e) if isinstance(body, dict): return OK(body) elif isinstance(body, str): return text_plain(body) raise Exception('this must not happen') return wrapper def POST(self, callback): def wrapper(content_type, body, **kwargs): ick2.log.log( 'trace', msg_text='POST called', kwargs=kwargs, content_type=content_type, body=body) body = callback(body) ick2.log.log('trace', msg_text='returned body', body=repr(body)) return created(body) return wrapper def PUT(self, callback): def wrapper(content_type, body, **kwargs): ick2.log.log( 'trace', msg_text='PUT called', kwargs=kwargs, content_type=content_type, body=body) if 'raw_uri_path' in kwargs: del kwargs['raw_uri_path'] try: body = callback(body, **kwargs) except ick2.NotFound as e: return not_found(e) except ick2.WrongPipelineStatus as e: ick2.log.log( 'error', msg_text='Wrong state for pipeline', exception=str(e)) return bad_request(e) ick2.log.log('trace', msg_text='returned body', body=repr(body)) return OK(body) return wrapper def DELETE(self, callback): def wrapper(content_type, body, **kwargs): ick2.log.log( 'trace', msg_text='DELETE called', kwargs=kwargs, content_type=content_type, body=body) try: if 'raw_uri_path' in kwargs: del kwargs['raw_uri_path'] body = callback(**kwargs) except ick2.NotFound as e: return not_found(e) return OK(body) return wrapper def create(self, body): raise NotImplementedError() def update(self, body, name): raise NotImplementedError() def delete(self, name): raise NotImplementedError() def list(self): raise NotImplementedError() def show(self, name): raise NotImplementedError() class VersionAPI(APIbase): def __init__(self, state): super().__init__(state) def get_routes(self, path): # pragma: no cover return [ { 'method': 'GET', 'path': path, 'callback': self.GET(self.get_version), } ] def get_version(self): return {'version': ick2.__version__} def create(self, *args): # pragma: no cover pass def update(self, *args): # pragma: no cover pass def delete(self, *args): # pragma: no cover pass def list(self): # pragma: no cover pass def show(self, *args): # pragma: no cover pass class ResourceApiBase(APIbase): def __init__(self, type_name, state): super().__init__(state) self._type_name = type_name def list(self): return { self._type_name: self._state.get_resources(self._type_name), } def show(self, name): return self._state.get_resource(self._type_name, name) def create(self, body): return self._state.add_resource( self._type_name, self.get_resource_name(body), body) def get_resource_name(self, resource): # pragma: no cover raise NotImplementedError def update(self, body, name): return self._state.update_resource(self._type_name, name, body) def delete(self, name): self._state.remove_resource(self._type_name, name) class WorkerAPI(ResourceApiBase): # pragma: no cover def __init__(self, state): super().__init__('workers', state) def get_resource_name(self, resource): return resource['worker'] class BuildsAPI(ResourceApiBase): # pragma: no cover def __init__(self, state): super().__init__('builds', state) def get_resource_name(self, resource): return resource['build'] def create(self, body): # pragma: no cover raise MethodNotAllowed('Creating builds directly is not allowed') def update(self, body, name): # pragma: no cover raise MethodNotAllowed('Updating builds directly is not allowed') def list(self): result = super().list() items = result[self._type_name] items.sort(key=lambda x: x.get('build_id')) result[self._type_name] = items return result class LogAPI(ResourceApiBase): # pragma: no cover def __init__(self, state): super().__init__('log', state) def get_resource_name(self, resource): return resource['log'] def create(self, body): # pragma: no cover raise MethodNotAllowed('Creating builds directly is not allowed') def update(self, body, name): # pragma: no cover raise MethodNotAllowed('Updating builds directly is not allowed') def show(self, name): log = self._state.get_resource('log', str(name)) ick2.log.log('info', msg_text='Returning log', log=log) return log['log'] class ProjectAPI(ResourceApiBase): def __init__(self, state): super().__init__('projects', state) def get_resource_name(self, resource): return resource['project'] def get_routes(self, path): # pragma: no cover return super().get_routes(path) + self.get_pipeline_routes(path) def get_pipeline_routes(self, path): # pragma: no cover pipeline_path = '{}//pipelines/'.format(path) builds_path = '{}//builds'.format(path) return [ { 'method': 'GET', 'path': pipeline_path, 'callback': self.GET(self.get_pipeline), }, { 'method': 'PUT', 'path': pipeline_path, 'callback': self.PUT(self.set_pipeline_callback), }, { 'method': 'GET', 'path': builds_path, 'callback': self.GET(self.get_builds), }, ] def get_pipeline(self, project, pipeline): p = self._state.get_resource(self._type_name, project) for pl in p['pipelines']: if pl['name'] == pipeline: return { 'status': pl.get('status', 'idle'), } raise ick2.NotFound() def set_pipeline_callback( self, body, project, pipeline): # pragma: no cover return self.set_pipeline(body['status'], project, pipeline) def set_pipeline(self, state, project, pipeline): allowed_changes = { 'idle': 'triggered', 'triggered': 'building', 'building': 'idle', } p = self._state.get_resource(self._type_name, project) for pl in p['pipelines']: if pl['name'] == pipeline: old_state = pl.get('status', 'idle') if allowed_changes[old_state] != state: raise ick2.WrongPipelineStatus(state) pl['status'] = state self._state.update_resource(self._type_name, project, p) return {'status': state} raise ick2.NotFound() def get_builds(self, project): p = self._state.get_resource(self._type_name, project) return { 'project': project, 'builds': p.get('builds', []), } class WorkAPI(APIbase): def __init__(self, state): super().__init__(state) self._type_name = 'work' def get_routes(self, path): # pragma: no cover return [ { 'method': 'GET', 'path': '{}/'.format(path), 'callback': self.GET(self.get_work), }, { 'method': 'POST', 'path': path, 'callback': self.POST(self.update_work), }, ] def get_work(self, worker): worker_state = self._get_worker(worker) if not worker_state.get('doing'): project, pipeline = self._pick_triggered_pipeline() if project is None: doing = {} else: pipeline['status'] = 'building' self._update_project(project) build_id = self._start_build(project, pipeline, worker) self._start_log(build_id) index = 0 doing = { 'build_id': build_id, 'worker': worker, 'project': project['project'], 'pipeline': pipeline['name'], 'step': pipeline['actions'][index], 'step_index': index, 'log': '/logs/{}'.format(build_id), } worker_state = { 'worker': worker, 'doing': doing, } self._update_worker(worker_state) return worker_state['doing'] def _get_worker(self, worker): # pragma: no cover try: return self._state.get_resource('workers', worker) except ick2.NotFound: return { 'worker': worker, } def _update_worker(self, worker_state): self._state.update_resource( 'workers', worker_state['worker'], worker_state) def _pick_triggered_pipeline(self): projects = self._get_projects() for project in projects: for pipeline in project['pipelines']: if pipeline.get('status') == 'triggered': return project, pipeline return None, None def _get_projects(self): return self._state.get_resources('projects') def _update_project(self, project): self._state.update_resource('projects', project['project'], project) def update_work(self, update): if 'worker' not in update: # pragma: no cover raise BadUpdate('no worker specified') worker_state = self._get_worker(update['worker']) doing = worker_state.get('doing', {}) self._check_work_update(doing, update) project, pipeline = self._get_pipeline( update['project'], update['pipeline']) self._append_to_build_log(update) ick2.log.log( 'trace', msg_text='xxx update_work', update=update, project=project, pipeline=pipeline, doing=doing) exit_code = update.get('exit_code') if exit_code == 0: index = doing['step_index'] + 1 actions = pipeline['actions'] if index >= len(actions): pipeline['status'] = 'idle' doing = {} self._finish_build(update) else: doing['step_index'] = index doing['step'] = actions[index] self._update_project(project) worker_state = { 'worker': update['worker'], 'doing': doing, } self._update_worker(worker_state) elif exit_code is not None: assert isinstance(exit_code, int) assert exit_code != 0 pipeline['status'] = 'idle' self._update_project(project) self._finish_build(update) worker_state = { 'worker': update['worker'], 'doing': {}, } self._update_worker(worker_state) def _check_work_update(self, doing, update): # pragma: no cover must_match = ['worker', 'project', 'pipeline', 'build_id'] for name in must_match: if name not in update: raise BadUpdate('{} not specified'.format(name)) if doing.get(name) != update[name]: raise BadUpdate( '{} differs from current work: {} vs {}'.format( name, doing.get(name), update[name])) def _get_pipeline(self, project, pipeline): # pragma: no cover projects = self._get_projects() for p in projects: for pl in p['pipelines']: if pl.get('name') == pipeline: return p, pl raise ick2.NotFound() def _start_build(self, project, pipeline, worker): ick2.log.log('info', msg_text='Starting new build') build_id = project.get('build_id', 0) build_id += 1 project['build_id'] = build_id self._update_project(project) build = { 'build_id': build_id, 'log': '/logs/{}'.format(build_id), 'worker': worker, 'project': project['project'], 'pipeline': pipeline['name'], 'status': 'building', } self._state.add_resource('builds', str(build_id), build) return build_id def _start_log(self, build_id): ick2.log.log('info', msg_text='Starting new log', build_id=build_id) log = { 'build_id': build_id, 'log': '', } self._state.add_resource('log', str(build_id), log) return build_id def _append_to_build_log(self, update): build_id = update['build_id'] log = self._state.get_resource('log', str(build_id)) for kind in ['stdout', 'stderr']: text = update.get(kind, '') if text is not None: log['log'] += text self._state.update_resource('log', str(build_id), log) def _finish_build(self, update): build = self._state.get_resource('builds', str(update['build_id'])) build['status'] = update['exit_code'] self._state.update_resource('builds', str(update['build_id']), build) def create(self, *args, **kwargs): # pragma: no cover pass def update(self, *args, **kwargs): # pragma: no cover pass def list(self, *args, **kwargs): # pragma: no cover pass def show(self, *args, **kwargs): # pragma: no cover pass def delete(self, *args, **kwargs): # pragma: no cover pass class BadUpdate(Exception): # pragma: no cover def __init__(self, how): super().__init__('Work update is BAD: {}'.format(how)) class MethodNotAllowed(Exception): # pragma: no cover def __init__(self, wat): super().__init__(wat) def response(status_code, body, headers): # pragma: no cover obj = { 'status': status_code, 'body': body, 'headers': headers, } return apifw.Response(obj) def OK(body): # pragma: no cover headers = { 'Content-Type': 'application/json', } return response(apifw.HTTP_OK, body, headers) def text_plain(body): # pragma: no cover headers = { 'Content-Type': 'text/plain', } return response(apifw.HTTP_OK, body, headers) def not_found(error): # pragma: no cover headers = { 'Content-Type': 'text/plain', } return response(apifw.HTTP_NOT_FOUND, str(error), headers) def bad_request(error): # pragma: no cover headers = { 'Content-Type': 'text/plain', } return response(apifw.HTTP_BAD_REQUEST, str(error), headers) def created(body): # pragma: no cover headers = { 'Content-Type': 'application/json', } return response(apifw.HTTP_CREATED, body, headers)