# Copyright (C) 2017-2018 Lars Wirzenius
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see https://www.gnu.org/licenses/.
import ick2
class WorkAPI(ick2.APIbase):
    """API endpoint through which workers fetch and report build work.

    A GET request returns the step a worker should currently be running,
    starting a new build of a triggered project if the worker has nothing
    to do. A POST request appends output to the build log and, when it
    carries an exit code, advances or finishes the build.
    """

    def __init__(self, state):
        """Set up the resource helpers, all backed by the same ``state``."""
        super().__init__(state)
        self._workers = ick2.Workers(state)
        self._projects = ick2.Projects(state)
        self._ps = ick2.ProjectStatus(state)
        self._builds = ick2.Builds(state)
        self._type_name = 'work'

    def get_routes(self, path):  # pragma: no cover
        """Return the route descriptions for this endpoint under ``path``.

        Each route is a dict with ``method``, ``path`` and ``callback``
        keys.
        """
        return [
            {
                'method': 'GET',
                # NOTE(review): get_work() requires a ``worker`` argument,
                # but this path contains no placeholder for it. A route
                # placeholder for the worker name may have been lost from
                # this string -- confirm against the routing framework.
                'path': '{}/'.format(path),
                'callback': self.GET(self.get_work),
            },
            {
                'method': 'POST',
                'path': path,
                'callback': self.POST(self.update_work),
            },
        ]

    def get_work(self, worker, **kwargs):
        """Return the work item ``worker`` should be doing.

        If the worker already has a non-empty ``doing`` entry, that entry
        is returned unchanged. Otherwise the first triggered project (if
        any) is marked as building, a new build and log are created, and
        the first action of the build is handed out. When no project is
        triggered, an empty dict is stored and returned.
        """
        worker_state = self._workers.get_worker(worker)
        if not worker_state.get('doing'):
            project = self._pick_triggered_project()
            if project is None:
                doing = {}
            else:
                self._set_project_status(project['project'], 'building')
                build_id, build_no = self._start_build(project, worker)
                self._start_log(build_id)

                build = self._get_build(build_id)
                actions = build['actions']
                current_action = build['current_action']

                doing = {
                    'build_id': build_id,
                    'build_number': build_no,
                    'worker': worker,
                    'project': project['project'],
                    'parameters': project.get('parameters', {}),
                    'step': actions[current_action],
                    'log': '/logs/{}'.format(build_id),
                }

            worker_state = {
                'worker': worker,
                'doing': doing,
            }
            self._workers.update_worker(worker_state)

        return worker_state['doing']

    def _pick_build_number(self, project):
        """Allocate, persist and return the next build number of a project.

        The number is one more than the value stored under the project's
        ``next_build_id`` key (treated as 0 when missing or falsy); the
        new number is written back under the same key.
        """
        old_build_no = project.get('next_build_id')
        build_no = (old_build_no or 0) + 1
        project['next_build_id'] = build_no
        self._projects.update_project(project)
        ick2.log.log(
            'info', msg_text='chose build number',
            old_build_no=old_build_no, build_no=build_no)
        return build_no

    def _pick_triggered_project(self):
        """Return the first project whose status is 'triggered', or None."""
        projects = self._projects.get_projects()
        for project in projects:
            ps = self._ps.get_instance(project['project'])
            if ps.get('status') == 'triggered':
                return project
        return None

    def _set_project_status(self, project_name, status):
        """Store ``status`` as the status of the named project."""
        ps = self._ps.get_instance(project_name)
        ps['status'] = status
        self._ps.update_instance(project_name, ps)

    def _start_build(self, project, worker):
        """Create a build resource for ``project`` assigned to ``worker``.

        The build's action list is a ``create_workspace`` action followed
        by the actions of all the project's pipelines.

        Returns a ``(build_id, build_number)`` tuple, where the build id
        is ``"<project name>/<build number>"``.
        """
        build_no = self._pick_build_number(project)
        build_id = '{}/{}'.format(project['project'], build_no)
        ick2.log.log(
            'info', msg_text='Starting new build', build_id=build_id,
            build_no=build_no)

        parameters = project.get('parameters', {})
        create_workspace = {
            'action': 'create_workspace',
        }
        actions = [create_workspace] + self._get_actions(project)

        build = {
            'build_id': build_id,
            'build_number': build_no,
            'log': '/logs/{}'.format(build_id),
            'worker': worker,
            'project': project['project'],
            'parameters': parameters,
            'status': 'building',
            'actions': actions,
            'current_action': 0,
        }
        self._builds.add(build_id, build)
        return build_id, build_no

    def _get_actions(self, project):
        """Return the actions of the project's pipelines, concatenated."""
        actions = []
        for pipeline_name in project['pipelines']:
            pipeline = self._state.get_resource('pipelines', pipeline_name)
            actions.extend(list(pipeline['actions']))
        return actions

    def _get_build(self, build_id):
        """Return the stored build resource with the given id."""
        return self._builds.get(build_id)

    def _start_log(self, build_id):
        """Create an empty log resource for the given build."""
        ick2.log.log('info', msg_text='Starting new log', build_id=build_id)
        log = {
            'build_id': build_id,
            'log': '',
        }
        self._state.add_resource('log', str(build_id), log)

    def update_work(self, update, **kwargs):
        """Record a progress report from a worker.

        ``update`` must name the ``worker``, ``project`` and ``build_id``
        the worker is currently assigned. Any ``stdout``/``stderr`` text
        is appended to the build log. The optional ``exit_code`` decides
        what happens next:

        * missing or ``None``: the step is still running; nothing else
          changes.
        * ``0``: the step succeeded; the worker is given the next action,
          or the build is finished and the project set to idle if that
          was the last action.
        * any other integer: the step failed; the build is finished, the
          project set to idle and the worker's work cleared.

        Raises ``ick2.BadUpdate`` if the update lacks a required field,
        does not match the worker's current work, or carries an exit code
        that is not an integer.
        """
        if 'worker' not in update:  # pragma: no cover
            raise ick2.BadUpdate('no worker specified')

        worker_state = self._workers.get_worker(update['worker'])
        doing = worker_state.get('doing', {})
        self._check_work_update(doing, update)

        project_name = update['project']
        self._append_to_build_log(update)

        exit_code = update.get('exit_code')
        if exit_code == 0:
            build_id = doing['build_id']
            build = self._get_build(build_id)
            actions = build['actions']
            current_action = build['current_action']
            if current_action + 1 >= len(actions):
                # That was the last action: the build is done.
                self._set_project_status(project_name, 'idle')
                doing = {}
                self._finish_build(update)
            else:
                # Move on to the next action of the same build.
                index = current_action + 1
                build['current_action'] = index
                self._update_build(build)
                doing['step'] = actions[index]

            worker_state = {
                'worker': update['worker'],
                'doing': doing,
            }
            self._workers.update_worker(worker_state)
        elif exit_code is not None:
            # The exit code comes from the worker's request, so it is
            # validated with a real exception rather than an assert:
            # asserts are stripped when Python runs with -O.
            if not isinstance(exit_code, int):  # pragma: no cover
                raise ick2.BadUpdate(
                    'exit_code must be an integer, got {!r}'.format(
                        exit_code))

            self._set_project_status(project_name, 'idle')
            self._finish_build(update)

            worker_state = {
                'worker': update['worker'],
                'doing': {},
            }
            self._workers.update_worker(worker_state)

    def _check_work_update(self, doing, update):  # pragma: no cover
        """Verify that ``update`` refers to the work described by ``doing``.

        Raises ``ick2.BadUpdate`` if the worker, project or build id is
        missing from the update or differs from the current work.
        """
        must_match = ['worker', 'project', 'build_id']
        for name in must_match:
            if name not in update:
                raise ick2.BadUpdate('{} not specified'.format(name))
            if doing.get(name) != update[name]:
                raise ick2.BadUpdate(
                    '{} differs from current work: {} vs {}'.format(
                        name, doing.get(name), update[name]))

    def _append_to_build_log(self, update):
        """Append the update's stdout, then stderr, to the build's log.

        A stream that is missing or ``None`` contributes nothing.
        """
        build_id = update['build_id']
        log = self._state.get_resource('log', str(build_id))
        for kind in ['stdout', 'stderr']:
            text = update.get(kind, '')
            if text is not None:
                log['log'] += text
        self._state.update_resource('log', str(build_id), log)

    def _update_build(self, build):
        """Store the modified build resource."""
        self._builds.update_build(build)

    def _finish_build(self, update):
        """Mark the update's build as finished.

        The build's status becomes the reported exit code and its
        current action is cleared.
        """
        build_id = update['build_id']
        build = self._get_build(build_id)
        build['status'] = update['exit_code']
        build['current_action'] = None
        self._update_build(build)

    # The methods below are no-ops on this endpoint; presumably they exist
    # only to satisfy the ick2.APIbase interface -- confirm against the
    # base class.

    def create(self, body, **kwargs):  # pragma: no cover
        """Do nothing."""
        pass

    def update(self, body, name, **kwargs):  # pragma: no cover
        """Do nothing."""
        pass

    def list(self, **kwargs):  # pragma: no cover
        """Do nothing."""
        pass

    def show(self, name, **kwargs):  # pragma: no cover
        """Do nothing."""
        pass

    def delete(self, name, **kwargs):  # pragma: no cover
        """Do nothing."""
        pass