summaryrefslogtreecommitdiff
path: root/ick2/workapi.py
diff options
context:
space:
mode:
Diffstat (limited to 'ick2/workapi.py')
-rw-r--r--ick2/workapi.py254
1 files changed, 104 insertions, 150 deletions
diff --git a/ick2/workapi.py b/ick2/workapi.py
index 32c988e..d254515 100644
--- a/ick2/workapi.py
+++ b/ick2/workapi.py
@@ -20,11 +20,7 @@ class WorkAPI(ick2.APIbase):
def __init__(self, state):
super().__init__(state)
- self._workers = ick2.Workers(state)
- self._projects = ick2.Projects(state)
- self._ps = ick2.ProjectStatus(state)
- self._builds = ick2.Builds(state)
- self._type_name = 'work'
+ self._trans = ick2.TransactionalState(state)
def get_routes(self, path): # pragma: no cover
return [
@@ -41,38 +37,40 @@ class WorkAPI(ick2.APIbase):
]
def get_work(self, **kwargs):
- worker = self._get_client_id(**kwargs)
- worker_state = self._workers.get_worker(worker)
- if not worker_state.get('doing'):
- project = self._pick_triggered_project()
- if project is None:
- doing = {}
- else:
- self._set_project_status(project['project'], 'building')
-
- build_id, build_no = self._start_build(project, worker)
+ worker_id = self._get_client_id(**kwargs)
+ with self._trans.modify('workers', worker_id) as worker:
+ doing = worker.get('doing')
+ if doing:
+ return doing
+
+ build_id = self._pick_build(worker_id)
+ if build_id is None:
+ return {}
+
+ with self._trans.modify('builds', build_id) as build:
+ build['status'] = 'building'
+ build['worker'] = worker_id
+
self._start_log(build_id)
- build = self._get_build(build_id)
+
actions = build['actions']
current_action = build['current_action']
-
doing = {
'build_id': build_id,
- 'build_number': build_no,
- 'worker': worker,
- 'project': project['project'],
- 'parameters': project.get('parameters', {}),
+ 'build_number': build['build_number'],
+ 'worker': worker_id,
+ 'project': build['project'],
+ 'parameters': build['parameters'],
'step': actions[current_action],
- 'log': '/logs/{}'.format(build_id),
+ 'log': build['log'],
}
- worker_state = {
- 'worker': worker,
- 'doing': doing,
- }
- self._workers.update_worker(worker_state)
+ worker.from_dict({
+ 'worker': worker_id,
+ 'doing': doing,
+ })
- return worker_state['doing']
+ return worker['doing']
def _get_client_id(self, **kwargs):
claims = kwargs.get('claims', {})
@@ -81,147 +79,103 @@ class WorkAPI(ick2.APIbase):
raise ick2.ClientIdMissing()
return client_id
- def _pick_build_number(self, project):
- old_build_no = project.get('next_build_id')
- build_no = (old_build_no or 0) + 1
- project['next_build_id'] = build_no
- self._projects.update_project(project)
- ick2.log.log(
- 'info', msg_text='chose build number',
- old_build_no=old_build_no, build_no=build_no)
- return build_no
-
- def _pick_triggered_project(self):
- projects = self._projects.get_projects()
- for project in projects:
- ps = self._ps.get_instance(project['project'])
- if ps.get('status') == 'triggered':
- return project
- return None
+ def _pick_build(self, worker):
+ def on_worker(build):
+ return build.get('worker') == worker
+
+ def status(build):
+ return build.get('status')
- def _set_project_status(self, project_name, status):
- ps = self._ps.get_instance(project_name)
- ps['status'] = status
- self._ps.update_instance(project_name, ps)
-
- def _start_build(self, project, worker):
- build_no = self._pick_build_number(project)
- build_id = '{}/{}'.format(project['project'], build_no)
-
- ick2.log.log(
- 'info', msg_text='Starting new build', build_id=build_id,
- build_no=build_no)
- parameters = project.get('parameters', {})
- create_workspace = {
- 'action': 'create_workspace',
- 'where': 'host',
- }
- actions = [create_workspace] + self._get_actions(project)
- build = {
- 'build_id': build_id,
- 'build_number': build_no,
- 'log': '/logs/{}'.format(build_id),
- 'worker': worker,
- 'project': project['project'],
- 'parameters': parameters,
- 'status': 'building',
- 'actions': actions,
- 'current_action': 0,
- }
- self._builds.add(build_id, build)
- return build_id, build_no
-
- def _get_actions(self, project):
- actions = []
- for pipeline_name in project['pipelines']:
- pipeline = self._state.get_resource('pipelines', pipeline_name)
- actions.extend(list(pipeline['actions']))
- return actions
-
- def _get_build(self, build_id):
- return self._builds.get(build_id)
+ def is_building(build):
+ return status(build) == 'building'
+
+ def is_triggered(build):
+ return status(build) == 'triggered'
+
+ builds = self._trans.get_resources('builds')
+ return (self._find_build(builds, on_worker, is_building) or
+ self._find_build(builds, is_triggered))
+
+ def _find_build(self, builds, *preds):
+ for build in builds:
+ if all(pred(build) for pred in preds):
+ return build['build_id']
+ return None
def _start_log(self, build_id):
ick2.log.log('info', msg_text='Starting new log', build_id=build_id)
- log = {
- 'build_id': build_id,
- 'log': '',
- }
- self._state.add_resource('log', str(build_id), log)
+ with self._trans.new('log', build_id) as r:
+ r.from_dict({
+ 'build_id': build_id,
+ 'log': '',
+ })
def update_work(self, update, **kwargs):
- if 'worker' not in update: # pragma: no cover
- raise ick2.BadUpdate('no worker specified')
-
- worker_state = self._workers.get_worker(update['worker'])
- doing = worker_state.get('doing', {})
- self._check_work_update(doing, update)
-
- project_name = update['project']
-
- self._append_to_build_log(update)
-
- exit_code = update.get('exit_code')
- if exit_code == 0:
- build_id = doing['build_id']
- build = self._get_build(build_id)
- actions = build['actions']
- current_action = build['current_action']
- if current_action + 1 >= len(actions):
- self._set_project_status(project_name, 'idle')
- doing = {}
- self._finish_build(update)
- else:
- index = current_action + 1
- build['current_action'] = index
- self._update_build(build)
- doing['step'] = actions[index]
-
- worker_state = {
- 'worker': update['worker'],
- 'doing': doing,
- }
- self._workers.update_worker(worker_state)
- elif exit_code is not None:
- assert isinstance(exit_code, int)
- assert exit_code != 0
- self._set_project_status(project_name, 'idle')
- self._finish_build(update)
-
- worker_state = {
- 'worker': update['worker'],
- 'doing': {},
- }
- self._workers.update_worker(worker_state)
+ try:
+ worker_id = update['worker']
+ build_id = update['build_id']
+ project_name = update['project']
+ exit_code = update.get('exit_code')
+ except KeyError as e: # pragma: no cover
+ raise ick2.BadUpdate(str(e))
+
+ with self._trans.modify('workers', worker_id) as worker:
+ with self._trans.modify('builds', build_id) as build:
+ doing = worker.get('doing', {})
+ self._check_work_update(doing, update)
+ self._append_to_build_log(update)
+
+ if exit_code is not None:
+ if exit_code == 0:
+ action = self._move_to_next_action(build)
+ if action is None:
+ doing = {}
+ self._finish_build(build, exit_code)
+ else:
+ doing['step'] = action
+
+ worker.from_dict({
+ 'worker': worker_id,
+ 'doing': doing,
+ })
+ elif exit_code is not None:
+ self._finish_build(build, exit_code)
+ worker.from_dict({
+ 'worker': worker_id,
+ 'doing': {},
+ })
def _check_work_update(self, doing, update): # pragma: no cover
must_match = ['worker', 'project', 'build_id']
for name in must_match:
if name not in update:
raise ick2.BadUpdate('{} not specified'.format(name))
- if doing.get(name) != update[name]:
+ if doing.get(name) is not None and doing.get(name) != update[name]:
raise ick2.BadUpdate(
'{} differs from current work: {} vs {}'.format(
name, doing.get(name), update[name]))
def _append_to_build_log(self, update):
build_id = update['build_id']
- log = self._state.get_resource('log', str(build_id))
- for kind in ['stdout', 'stderr']:
- text = update.get(kind, '')
- if text is not None:
- log['log'] += text
- self._state.update_resource('log', str(build_id), log)
-
- def _update_build(self, build):
- self._builds.update_build(build)
-
- def _finish_build(self, update):
- build_id = update['build_id']
- build = self._get_build(build_id)
- build['status'] = update['exit_code']
+ with self._trans.modify('log', build_id) as log:
+ for stream in ['stdout', 'stderr']:
+ text = update.get(stream, '')
+ if text is not None:
+ log['log'] += text
+
+ def _move_to_next_action(self, build):
+ actions = build['actions']
+ current_action = build['current_action']
+ if current_action + 1 >= len(actions):
+ return None
+
+ index = current_action + 1
+ build['current_action'] = index
+ return actions[index]
+
+ def _finish_build(self, build, exit_code):
+ build['status'] = exit_code
build['current_action'] = None
- self._update_build(build)
def create(self, body, **kwargs): # pragma: no cover
pass