# Copyright (C) 2017-2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import ick2


class ProjectAPI(ick2.ResourceApiBase):
    """API for 'projects' resources, plus a route that triggers builds."""

    def __init__(self, state):
        super().__init__('projects', state)

    def mangle_new_resource(self, resource):
        """Return a copy of ``resource`` that always has 'next_build_id'.

        A value supplied by the caller is kept; otherwise the field is
        set to None.
        """
        new = dict(resource)
        new.setdefault('next_build_id', None)
        return new

    def mangle_updated_resource(self, old, new):
        """Return a copy of ``new`` with 'next_build_id' taken from ``old``.

        Whatever the update says about 'next_build_id' is overwritten with
        the stored value (None if the old resource lacks the field).
        """
        new = dict(new)
        new['next_build_id'] = old.get('next_build_id')
        return new

    def get_resource_name(self, resource):
        """Return the value of the resource's 'project' field."""
        return resource['project']

    def get_routes(self, path):  # pragma: no cover
        """Return the base class routes followed by the trigger route."""
        return super().get_routes(path) + self.get_status_routes(path)

    def get_status_routes(self, path):  # pragma: no cover
        """Return the route description for triggering a project build."""
        # The <project> wildcard supplies the ``project`` argument of
        # trigger_project. NOTE(review): the pattern previously read
        # '{}//+trigger' (empty segment, no wildcard); confirm the
        # placeholder syntax against the routing framework in use.
        trigger_path = '{}/<project>/+trigger'.format(path)
        return [
            {
                'needs-authorization': False,
                'method': 'GET',
                'path': trigger_path,
                'callback': self.GET(self.trigger_project),
            },
        ]

    def trigger_project(self, project, **kwargs):  # pragma: no cover
        """Start a new build of ``project`` and report it as triggered.

        The project is opened via ``self._trans.modify`` and handed to
        ``_start_build``, which changes its 'next_build_id'. Extra keyword
        arguments are accepted and ignored.
        """
        with self._trans.modify('projects', project) as p:
            self._start_build(p)
        return {'status': ick2.BUILD_TRIGGERED}

    def _start_build(self, project):  # pragma: no cover
        """Create the build and log resources for a new build.

        Returns a ``(build_id, build_no)`` tuple. May raise
        ``ick2.ParametersMissing`` (from ``_get_actions``).
        """
        build_no = self._pick_build_number(project)
        build_id = '{}/{}'.format(project['project'], build_no)
        ick2.log.log('info', msg_text='Starting new build', build_id=build_id)

        with self._trans.new('builds', build_id) as build:
            parameters = project.get('parameters', {})
            build.from_dict({
                'build_id': build_id,
                'build_number': build_no,
                'log': '/logs/{}'.format(build_id),
                'worker': None,
                'project': project['project'],
                'parameters': parameters,
                'status': ick2.BUILD_TRIGGERED,
                'exit_code': None,
                'graph': {},
            })

            # Every build graph starts with a workspace-creation action,
            # followed by the actions of the project's pipelines in order.
            create_workspace = {
                'action': 'create_workspace',
                'where': 'host',
            }

            build_obj = ick2.Build(build)
            graph = build_obj.get_graph()
            graph.append_action(create_workspace)
            for action in self._get_actions(project):
                graph.append_action(action)

        # The build log starts out empty.
        with self._trans.new('log', build_id) as r:
            r.from_dict({
                'build_id': build_id,
                'log': '',
            })

        return build_id, build_no

    def _pick_build_number(self, project):  # pragma: no cover
        """Choose the next build number and store it in ``project``.

        The number is one more than the project's current 'next_build_id';
        a missing or None value counts as 0, so the first build is 1.
        """
        old_build_no = project.get('next_build_id')
        build_no = (old_build_no or 0) + 1
        project['next_build_id'] = build_no
        ick2.log.log(
            'info', msg_text='chose build number',
            old_build_no=old_build_no, build_no=build_no)
        return build_no

    def _get_actions(self, project):  # pragma: no cover
        """Return the actions of all the project's pipelines, in order.

        Raises ``ick2.ParametersMissing`` if a pipeline lists a parameter
        name that the project's 'parameters' does not contain.
        """
        actions = []
        params = project.get('parameters', {})
        for pipeline_name in project.get('pipelines', []):
            pipeline = self._trans.get_resource('pipelines', pipeline_name)
            wanted = pipeline.get('parameters', [])
            missing = [name for name in wanted if name not in params]
            if missing:
                raise ick2.ParametersMissing(missing)
            actions.extend(pipeline['actions'])
        return actions