diff options
Diffstat (limited to 'ick2/workapi.py')
-rw-r--r-- | ick2/workapi.py | 254 |
1 files changed, 104 insertions, 150 deletions
diff --git a/ick2/workapi.py b/ick2/workapi.py index 32c988e..d254515 100644 --- a/ick2/workapi.py +++ b/ick2/workapi.py @@ -20,11 +20,7 @@ class WorkAPI(ick2.APIbase): def __init__(self, state): super().__init__(state) - self._workers = ick2.Workers(state) - self._projects = ick2.Projects(state) - self._ps = ick2.ProjectStatus(state) - self._builds = ick2.Builds(state) - self._type_name = 'work' + self._trans = ick2.TransactionalState(state) def get_routes(self, path): # pragma: no cover return [ @@ -41,38 +37,40 @@ class WorkAPI(ick2.APIbase): ] def get_work(self, **kwargs): - worker = self._get_client_id(**kwargs) - worker_state = self._workers.get_worker(worker) - if not worker_state.get('doing'): - project = self._pick_triggered_project() - if project is None: - doing = {} - else: - self._set_project_status(project['project'], 'building') - - build_id, build_no = self._start_build(project, worker) + worker_id = self._get_client_id(**kwargs) + with self._trans.modify('workers', worker_id) as worker: + doing = worker.get('doing') + if doing: + return doing + + build_id = self._pick_build(worker_id) + if build_id is None: + return {} + + with self._trans.modify('builds', build_id) as build: + build['status'] = 'building' + build['worker'] = worker_id + self._start_log(build_id) - build = self._get_build(build_id) + actions = build['actions'] current_action = build['current_action'] - doing = { 'build_id': build_id, - 'build_number': build_no, - 'worker': worker, - 'project': project['project'], - 'parameters': project.get('parameters', {}), + 'build_number': build['build_number'], + 'worker': worker_id, + 'project': build['project'], + 'parameters': build['parameters'], 'step': actions[current_action], - 'log': '/logs/{}'.format(build_id), + 'log': build['log'], } - worker_state = { - 'worker': worker, - 'doing': doing, - } - self._workers.update_worker(worker_state) + worker.from_dict({ + 'worker': worker_id, + 'doing': doing, + }) - return worker_state['doing'] + return worker['doing'] def _get_client_id(self, **kwargs): claims = kwargs.get('claims', {}) @@ -81,147 +79,103 @@ class WorkAPI(ick2.APIbase): raise ick2.ClientIdMissing() return client_id - def _pick_build_number(self, project): - old_build_no = project.get('next_build_id') - build_no = (old_build_no or 0) + 1 - project['next_build_id'] = build_no - self._projects.update_project(project) - ick2.log.log( - 'info', msg_text='chose build number', - old_build_no=old_build_no, build_no=build_no) - return build_no - - def _pick_triggered_project(self): - projects = self._projects.get_projects() - for project in projects: - ps = self._ps.get_instance(project['project']) - if ps.get('status') == 'triggered': - return project - return None + def _pick_build(self, worker): + def on_worker(build): + return build.get('worker') == worker + + def status(build): + return build.get('status') - def _set_project_status(self, project_name, status): - ps = self._ps.get_instance(project_name) - ps['status'] = status - self._ps.update_instance(project_name, ps) - - def _start_build(self, project, worker): - build_no = self._pick_build_number(project) - build_id = '{}/{}'.format(project['project'], build_no) - - ick2.log.log( - 'info', msg_text='Starting new build', build_id=build_id, - build_no=build_no) - parameters = project.get('parameters', {}) - create_workspace = { - 'action': 'create_workspace', - 'where': 'host', - } - actions = [create_workspace] + self._get_actions(project) - build = { - 'build_id': build_id, - 'build_number': build_no, - 'log': '/logs/{}'.format(build_id), - 'worker': worker, - 'project': project['project'], - 'parameters': parameters, - 'status': 'building', - 'actions': actions, - 'current_action': 0, - } - self._builds.add(build_id, build) - return build_id, build_no - - def _get_actions(self, project): - actions = [] - for pipeline_name in project['pipelines']: - pipeline = self._state.get_resource('pipelines', pipeline_name) - actions.extend(list(pipeline['actions'])) - return actions - - def _get_build(self, build_id): - return self._builds.get(build_id) + def is_building(build): + return status(build) == 'building' + + def is_triggered(build): + return status(build) == 'triggered' + + builds = self._trans.get_resources('builds') + return (self._find_build(builds, on_worker, is_building) or + self._find_build(builds, is_triggered)) + + def _find_build(self, builds, *preds): + for build in builds: + if all(pred(build) for pred in preds): + return build['build_id'] + return None def _start_log(self, build_id): ick2.log.log('info', msg_text='Starting new log', build_id=build_id) - log = { - 'build_id': build_id, - 'log': '', - } - self._state.add_resource('log', str(build_id), log) + with self._trans.new('log', build_id) as r: + r.from_dict({ + 'build_id': build_id, + 'log': '', + }) def update_work(self, update, **kwargs): - if 'worker' not in update: # pragma: no cover - raise ick2.BadUpdate('no worker specified') - - worker_state = self._workers.get_worker(update['worker']) - doing = worker_state.get('doing', {}) - self._check_work_update(doing, update) - - project_name = update['project'] - - self._append_to_build_log(update) - - exit_code = update.get('exit_code') - if exit_code == 0: - build_id = doing['build_id'] - build = self._get_build(build_id) - actions = build['actions'] - current_action = build['current_action'] - if current_action + 1 >= len(actions): - self._set_project_status(project_name, 'idle') - doing = {} - self._finish_build(update) - else: - index = current_action + 1 - build['current_action'] = index - self._update_build(build) - doing['step'] = actions[index] - - worker_state = { - 'worker': update['worker'], - 'doing': doing, - } - self._workers.update_worker(worker_state) - elif exit_code is not None: - assert isinstance(exit_code, int) - assert exit_code != 0 - self._set_project_status(project_name, 'idle') - self._finish_build(update) - - worker_state = { - 'worker': update['worker'], - 'doing': {}, - } - self._workers.update_worker(worker_state) + try: + worker_id = update['worker'] + build_id = update['build_id'] + project_name = update['project'] + exit_code = update.get('exit_code') + except KeyError as e: # pragma: no cover + raise ick2.BadUpdate(str(e)) + + with self._trans.modify('workers', worker_id) as worker: + with self._trans.modify('builds', build_id) as build: + doing = worker.get('doing', {}) + self._check_work_update(doing, update) + self._append_to_build_log(update) + + if exit_code is not None: + if exit_code == 0: + action = self._move_to_next_action(build) + if action is None: + doing = {} + self._finish_build(build, exit_code) + else: + doing['step'] = action + + worker.from_dict({ + 'worker': worker_id, + 'doing': doing, + }) + elif exit_code is not None: + self._finish_build(build, exit_code) + worker.from_dict({ + 'worker': worker_id, + 'doing': {}, + }) def _check_work_update(self, doing, update): # pragma: no cover must_match = ['worker', 'project', 'build_id'] for name in must_match: if name not in update: raise ick2.BadUpdate('{} not specified'.format(name)) - if doing.get(name) != update[name]: + if doing.get(name) is not None and doing.get(name) != update[name]: raise ick2.BadUpdate( '{} differs from current work: {} vs {}'.format( name, doing.get(name), update[name])) def _append_to_build_log(self, update): build_id = update['build_id'] - log = self._state.get_resource('log', str(build_id)) - for kind in ['stdout', 'stderr']: - text = update.get(kind, '') - if text is not None: - log['log'] += text - self._state.update_resource('log', str(build_id), log) - - def _update_build(self, build): - self._builds.update_build(build) - - def _finish_build(self, update): - build_id = update['build_id'] - build = self._get_build(build_id) - build['status'] = update['exit_code'] + with self._trans.modify('log', build_id) as log: + for stream in ['stdout', 'stderr']: + text = update.get(stream, '') + if text is not None: + log['log'] += text + + def _move_to_next_action(self, build): + actions = build['actions'] + current_action = build['current_action'] + if current_action + 1 >= len(actions): + return None + + index = current_action + 1 + build['current_action'] = index + return actions[index] + + def _finish_build(self, build, exit_code): + build['status'] = exit_code build['current_action'] = None - self._update_build(build) def create(self, body, **kwargs): # pragma: no cover pass |