# Copyright (C) 2017 Lars Wirzenius
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import apifw
import ick2
class ControllerAPI:

    """Entry point that assembles the routes of all controller APIs.

    Owns one ``ick2.ControllerState`` instance and passes it to every
    per-resource API object it creates.
    """

    def __init__(self):
        self._state = ick2.ControllerState()

    def get_state_directory(self):
        """Return the state directory as reported by the state object."""
        return self._state.get_state_directory()

    def set_state_directory(self, dirname):
        """Pass ``dirname`` to the state object as its state directory."""
        self._state.set_state_directory(dirname)

    def find_missing_route(self, missing_path):  # pragma: no cover
        """Return the combined route list of every known API.

        ``missing_path`` is accepted but not consulted: the full route
        table is built regardless of which path was requested.
        """
        apis = {
            '/version': VersionAPI,
            '/builds': BuildsAPI,
            '/logs': LogAPI,
            '/projects': ProjectAPI,
            '/work': WorkAPI,
            '/workers': WorkerAPI,
        }

        routes = []
        for path, api_class in apis.items():
            # The resource type name is the path without its leading
            # slash. The return value of load_resources is discarded.
            self._state.load_resources(path[1:])
            api = api_class(self._state)
            routes.extend(api.get_routes(path))
        # The keyword was misspelled 'msg_texg'; every other log call in
        # this module uses 'msg_text'.
        ick2.log.log('info', msg_text='Found routes', routes=routes)
        return routes
class APIbase:  # pragma: no cover

    """Base class for the HTTP API of one kind of resource.

    Provides the default route table and the ``GET``/``POST``/``PUT``/
    ``DELETE`` adapters that turn plain methods into HTTP callbacks.
    Subclasses override ``create``, ``update``, ``delete``, ``list`` and
    ``show``; the versions here raise ``NotImplementedError``.
    """

    def __init__(self, state):
        # Shared controller state, kept for use by subclasses.
        self._state = state

    def get_routes(self, path):
        """Return the default route table for the collection at ``path``.

        Each route is a dict with 'method', 'path' and 'callback' keys.
        Collection-level routes use ``path`` itself; per-item routes use
        ``path`` followed by a ``<name>`` placeholder.
        """
        # show/update/delete all require a ``name`` argument, which can
        # only come from the URL, so the per-item path needs a
        # placeholder. Without it the three routes would share the bare
        # path '<path>/' and the callbacks would never receive ``name``.
        # NOTE(review): '<name>' assumes bottle-style placeholder syntax
        # in the routing layer -- confirm against apifw.
        item_path = '{}/<name>'.format(path)
        return [
            {
                'method': 'POST',
                'path': path,
                'callback': self.POST(self.create),
            },
            {
                'method': 'GET',
                'path': path,
                'callback': self.GET(self.list),
            },
            {
                'method': 'GET',
                'path': item_path,
                'callback': self.GET(self.show),
            },
            {
                'method': 'PUT',
                'path': item_path,
                'callback': self.PUT(self.update),
            },
            {
                'method': 'DELETE',
                'path': item_path,
                'callback': self.DELETE(self.delete),
            },
        ]

    def GET(self, callback):
        """Wrap ``callback`` as a GET handler.

        The callback is called with the request's keyword arguments
        (minus 'raw_uri_path'). A dict result is returned via ``OK``, a
        str result via ``text_plain``; ``ick2.NotFound`` is turned into
        a ``not_found`` response. Any other result type raises
        ``Exception``.
        """
        def wrapper(content_type, body, **kwargs):
            ick2.log.log(
                'trace', msg_text='GET called', kwargs=kwargs,
                content_type=content_type, body=body)
            # Not a parameter of the wrapped callback.
            kwargs.pop('raw_uri_path', None)
            try:
                body = callback(**kwargs)
            except ick2.NotFound as e:
                return not_found(e)
            if isinstance(body, dict):
                return OK(body)
            if isinstance(body, str):
                return text_plain(body)
            raise Exception('this must not happen')
        return wrapper

    def POST(self, callback):
        """Wrap ``callback`` as a POST handler.

        The callback receives only the request body; its result is
        returned via ``created``. Exceptions from the callback are not
        caught here.
        """
        def wrapper(content_type, body, **kwargs):
            ick2.log.log(
                'trace', msg_text='POST called', kwargs=kwargs,
                content_type=content_type, body=body)
            body = callback(body)
            ick2.log.log('trace', msg_text='returned body', body=repr(body))
            return created(body)
        return wrapper

    def PUT(self, callback):
        """Wrap ``callback`` as a PUT handler.

        The callback receives the request body plus the request's
        keyword arguments (minus 'raw_uri_path'). ``ick2.NotFound``
        becomes a ``not_found`` response and
        ``ick2.WrongPipelineStatus`` a ``bad_request`` response;
        otherwise the result is returned via ``OK``.
        """
        def wrapper(content_type, body, **kwargs):
            ick2.log.log(
                'trace', msg_text='PUT called', kwargs=kwargs,
                content_type=content_type, body=body)
            kwargs.pop('raw_uri_path', None)
            try:
                body = callback(body, **kwargs)
            except ick2.NotFound as e:
                return not_found(e)
            except ick2.WrongPipelineStatus as e:
                ick2.log.log(
                    'error',
                    msg_text='Wrong state for pipeline',
                    exception=str(e))
                return bad_request(e)
            ick2.log.log('trace', msg_text='returned body', body=repr(body))
            return OK(body)
        return wrapper

    def DELETE(self, callback):
        """Wrap ``callback`` as a DELETE handler.

        The callback is called with the request's keyword arguments
        (minus 'raw_uri_path'); its result is returned via ``OK``, and
        ``ick2.NotFound`` becomes a ``not_found`` response.
        """
        def wrapper(content_type, body, **kwargs):
            ick2.log.log(
                'trace', msg_text='DELETE called', kwargs=kwargs,
                content_type=content_type, body=body)
            kwargs.pop('raw_uri_path', None)
            try:
                body = callback(**kwargs)
            except ick2.NotFound as e:
                return not_found(e)
            return OK(body)
        return wrapper

    def create(self, body):
        """Create a resource from ``body``; must be overridden."""
        raise NotImplementedError()

    def update(self, body, name):
        """Replace resource ``name`` with ``body``; must be overridden."""
        raise NotImplementedError()

    def delete(self, name):
        """Remove resource ``name``; must be overridden."""
        raise NotImplementedError()

    def list(self):
        """Return all resources; must be overridden."""
        raise NotImplementedError()

    def show(self, name):
        """Return resource ``name``; must be overridden."""
        raise NotImplementedError()
class VersionAPI(APIbase):

    """API exposing the controller version through a single GET route.

    The CRUD methods inherited from the base class are overridden with
    no-ops that return None.
    """

    def __init__(self, state):
        super().__init__(state)

    def get_routes(self, path):  # pragma: no cover
        """Return a one-element route list: GET on ``path``."""
        version_route = {
            'method': 'GET',
            'path': path,
            'callback': self.GET(self.get_version),
        }
        return [version_route]

    def get_version(self):
        """Return a dict holding ``ick2.__version__`` under 'version'."""
        version = ick2.__version__
        return {'version': version}

    def create(self, *args):  # pragma: no cover
        return None

    def update(self, *args):  # pragma: no cover
        return None

    def delete(self, *args):  # pragma: no cover
        return None

    def list(self):  # pragma: no cover
        return None

    def show(self, *args):  # pragma: no cover
        return None
class ResourceApiBase(APIbase):

    """CRUD operations for one resource type, backed by the shared state.

    ``type_name`` selects which resource type in the state object this
    API works on. Subclasses must implement ``get_resource_name``.
    """

    def __init__(self, type_name, state):
        super().__init__(state)
        self._type_name = type_name

    def list(self):
        """Return all resources of this type, keyed by the type name."""
        resources = self._state.get_resources(self._type_name)
        return {self._type_name: resources}

    def show(self, name):
        """Return the resource called ``name``."""
        kind = self._type_name
        return self._state.get_resource(kind, name)

    def create(self, body):
        """Add ``body`` as a new resource, named via get_resource_name."""
        kind = self._type_name
        name = self.get_resource_name(body)
        return self._state.add_resource(kind, name, body)

    def get_resource_name(self, resource):  # pragma: no cover
        """Return the name under which ``resource`` is stored."""
        raise NotImplementedError()

    def update(self, body, name):
        """Replace the resource called ``name`` with ``body``."""
        kind = self._type_name
        return self._state.update_resource(kind, name, body)

    def delete(self, name):
        """Remove the resource called ``name``; returns None."""
        kind = self._type_name
        self._state.remove_resource(kind, name)
class WorkerAPI(ResourceApiBase):  # pragma: no cover

    """Resource API for the 'workers' resource type."""

    def __init__(self, state):
        super().__init__('workers', state)

    def get_resource_name(self, resource):
        """Return the value of the resource's 'worker' field."""
        worker_name = resource['worker']
        return worker_name
class BuildsAPI(ResourceApiBase):  # pragma: no cover

    """Resource API for the 'builds' resource type.

    Creating and updating through this API raise MethodNotAllowed.
    """

    def __init__(self, state):
        super().__init__('builds', state)

    def get_resource_name(self, resource):
        """Return the value of the resource's 'build' field."""
        build_name = resource['build']
        return build_name

    def create(self, body):  # pragma: no cover
        raise MethodNotAllowed('Creating builds directly is not allowed')

    def update(self, body, name):  # pragma: no cover
        raise MethodNotAllowed('Updating builds directly is not allowed')

    def list(self):
        """Return the builds listing, sorted in place by 'build_id'."""
        listing = super().list()
        builds = listing[self._type_name]
        # In-place sort: the list object obtained from the parent class
        # is the one being reordered and returned.
        builds.sort(key=lambda build: build.get('build_id'))
        listing[self._type_name] = builds
        return listing
class LogAPI(ResourceApiBase):  # pragma: no cover

    """Resource API for build logs, stored under the 'log' type.

    Creating and updating through this API raise MethodNotAllowed;
    showing a log returns only its text.
    """

    def __init__(self, state):
        super().__init__('log', state)

    def get_resource_name(self, resource):
        """Return the value of the resource's 'log' field."""
        return resource['log']

    def create(self, body):  # pragma: no cover
        # The message used to say "builds", copied from BuildsAPI.
        raise MethodNotAllowed('Creating logs directly is not allowed')

    def update(self, body, name):  # pragma: no cover
        raise MethodNotAllowed('Updating logs directly is not allowed')

    def show(self, name):
        """Return the 'log' field of the log resource called ``name``.

        ``name`` is converted with ``str`` before the lookup.
        """
        log = self._state.get_resource(self._type_name, str(name))
        ick2.log.log('info', msg_text='Returning log', log=log)
        return log['log']
class ProjectAPI(ResourceApiBase):

    """Resource API for projects, plus pipeline status and build routes."""

    def __init__(self, state):
        super().__init__('projects', state)

    def get_resource_name(self, resource):
        """Return the value of the resource's 'project' field."""
        return resource['project']

    def get_routes(self, path):  # pragma: no cover
        """Return the inherited routes followed by the pipeline routes."""
        return super().get_routes(path) + self.get_pipeline_routes(path)

    def get_pipeline_routes(self, path):  # pragma: no cover
        """Return routes for pipeline status and per-project builds."""
        # get_pipeline/set_pipeline_callback take ``project`` and
        # ``pipeline`` arguments and get_builds takes ``project``; these
        # can only come from the URL, so the paths need placeholders.
        # NOTE(review): '<...>' assumes bottle-style placeholder syntax
        # in the routing layer -- confirm against apifw.
        pipeline_path = '{}/<project>/pipelines/<pipeline>'.format(path)
        builds_path = '{}/<project>/builds'.format(path)
        return [
            {
                'method': 'GET',
                'path': pipeline_path,
                'callback': self.GET(self.get_pipeline),
            },
            {
                'method': 'PUT',
                'path': pipeline_path,
                'callback': self.PUT(self.set_pipeline_callback),
            },
            {
                'method': 'GET',
                'path': builds_path,
                'callback': self.GET(self.get_builds),
            },
        ]

    def get_pipeline(self, project, pipeline):
        """Return ``{'status': ...}`` for the named pipeline.

        A pipeline without a stored status is reported as 'idle'.
        Raises ``ick2.NotFound`` when the project has no pipeline called
        ``pipeline``.
        """
        p = self._state.get_resource(self._type_name, project)
        for pl in p['pipelines']:
            if pl['name'] == pipeline:
                return {
                    'status': pl.get('status', 'idle'),
                }
        raise ick2.NotFound()

    def set_pipeline_callback(
            self, body, project, pipeline):  # pragma: no cover
        """PUT adapter: set the pipeline status to ``body['status']``."""
        return self.set_pipeline(body['status'], project, pipeline)

    def set_pipeline(self, state, project, pipeline):
        """Move the named pipeline to ``state`` and return the new status.

        Only the transitions idle -> triggered -> building -> idle are
        accepted. Raises ``ick2.WrongPipelineStatus`` for any other
        request and ``ick2.NotFound`` when the pipeline does not exist.
        """
        allowed_changes = {
            'idle': 'triggered',
            'triggered': 'building',
            'building': 'idle',
        }
        p = self._state.get_resource(self._type_name, project)
        for pl in p['pipelines']:
            if pl['name'] == pipeline:
                old_state = pl.get('status', 'idle')
                # An unrecognised stored status has no allowed successor;
                # reject the change rather than fail with a KeyError.
                if (old_state not in allowed_changes or
                        allowed_changes[old_state] != state):
                    raise ick2.WrongPipelineStatus(state)
                pl['status'] = state
                self._state.update_resource(self._type_name, project, p)
                return {'status': state}
        raise ick2.NotFound()

    def get_builds(self, project):
        """Return the project name and its 'builds' list (default [])."""
        p = self._state.get_resource(self._type_name, project)
        return {
            'project': project,
            'builds': p.get('builds', []),
        }
class WorkAPI(APIbase):

    """API through which workers fetch work and report progress.

    GET hands a worker its current step, starting a new build when the
    worker is idle and a pipeline is triggered. POST accepts an update
    about the step the worker is doing.
    """

    def __init__(self, state):
        super().__init__(state)
        self._type_name = 'work'

    def get_routes(self, path):  # pragma: no cover
        """Return the GET (fetch work) and POST (report work) routes."""
        return [
            {
                'method': 'GET',
                # get_work needs a ``worker`` argument from the URL.
                # NOTE(review): '<worker>' assumes bottle-style
                # placeholder syntax in the routing layer -- confirm
                # against apifw.
                'path': '{}/<worker>'.format(path),
                'callback': self.GET(self.get_work),
            },
            {
                'method': 'POST',
                'path': path,
                'callback': self.POST(self.update_work),
            },
        ]

    def get_work(self, worker):
        """Return what ``worker`` should be doing, as a dict.

        If the worker already has work, that is returned unchanged.
        Otherwise the first triggered pipeline is marked 'building', a
        build and a log are started for it, and its first action is
        assigned. An empty dict means there is nothing to do.
        """
        worker_state = self._get_worker(worker)
        if not worker_state.get('doing'):
            project, pipeline = self._pick_triggered_pipeline()
            if project is None:
                doing = {}
            else:
                pipeline['status'] = 'building'
                self._update_project(project)

                build_id = self._start_build(project, pipeline, worker)
                self._start_log(build_id)

                # A new build always starts at the first action.
                index = 0
                doing = {
                    'build_id': build_id,
                    'worker': worker,
                    'project': project['project'],
                    'pipeline': pipeline['name'],
                    'step': pipeline['actions'][index],
                    'step_index': index,
                    'log': '/logs/{}'.format(build_id),
                }

            worker_state = {
                'worker': worker,
                'doing': doing,
            }
            self._update_worker(worker_state)

        return worker_state['doing']

    def _get_worker(self, worker):  # pragma: no cover
        """Return the stored state of ``worker``, or a fresh stub."""
        try:
            return self._state.get_resource('workers', worker)
        except ick2.NotFound:
            return {
                'worker': worker,
            }

    def _update_worker(self, worker_state):
        """Store ``worker_state`` under its own 'worker' name."""
        self._state.update_resource(
            'workers', worker_state['worker'], worker_state)

    def _pick_triggered_pipeline(self):
        """Return the first (project, pipeline) whose status is
        'triggered', or (None, None) when there is none."""
        projects = self._get_projects()
        for project in projects:
            for pipeline in project['pipelines']:
                if pipeline.get('status') == 'triggered':
                    return project, pipeline
        return None, None

    def _get_projects(self):
        """Return all project resources."""
        return self._state.get_resources('projects')

    def _update_project(self, project):
        """Store ``project`` under its own 'project' name."""
        self._state.update_resource('projects', project['project'], project)

    def update_work(self, update):
        """Process a progress report from a worker.

        The update must name the worker, project, pipeline and build the
        worker is currently recorded as doing, otherwise ``BadUpdate``
        is raised. Any stdout/stderr text is appended to the build log.
        An exit code of 0 advances the worker to the next action, or
        finishes the build when there are no more actions; a non-zero
        exit code finishes the build and frees the worker. An update
        without an exit code only adds to the log.
        """
        if 'worker' not in update:  # pragma: no cover
            raise BadUpdate('no worker specified')

        worker_state = self._get_worker(update['worker'])
        doing = worker_state.get('doing', {})
        self._check_work_update(doing, update)

        project, pipeline = self._get_pipeline(
            update['project'], update['pipeline'])
        self._append_to_build_log(update)

        ick2.log.log(
            'trace',
            msg_text='xxx update_work',
            update=update,
            project=project,
            pipeline=pipeline,
            doing=doing)

        exit_code = update.get('exit_code')
        if exit_code == 0:
            # Step succeeded: move on, or wrap up after the last action.
            index = doing['step_index'] + 1
            actions = pipeline['actions']
            if index >= len(actions):
                pipeline['status'] = 'idle'
                doing = {}
                self._finish_build(update)
            else:
                doing['step_index'] = index
                doing['step'] = actions[index]
            self._update_project(project)

            worker_state = {
                'worker': update['worker'],
                'doing': doing,
            }
            self._update_worker(worker_state)
        elif exit_code is not None:
            # Step failed: the build ends here and the worker is freed.
            assert isinstance(exit_code, int)
            assert exit_code != 0
            pipeline['status'] = 'idle'
            self._update_project(project)
            self._finish_build(update)

            worker_state = {
                'worker': update['worker'],
                'doing': {},
            }
            self._update_worker(worker_state)

    def _check_work_update(self, doing, update):  # pragma: no cover
        """Raise BadUpdate unless ``update`` matches the current work.

        The 'worker', 'project', 'pipeline' and 'build_id' fields must
        be present in ``update`` and equal to those in ``doing``.
        """
        must_match = ['worker', 'project', 'pipeline', 'build_id']
        for name in must_match:
            if name not in update:
                raise BadUpdate('{} not specified'.format(name))
            if doing.get(name) != update[name]:
                raise BadUpdate(
                    '{} differs from current work: {} vs {}'.format(
                        name, doing.get(name), update[name]))

    def _get_pipeline(self, project, pipeline):  # pragma: no cover
        """Return (project dict, pipeline dict) for the named pair.

        ``project`` is compared against each project's 'project' field
        and ``pipeline`` against each pipeline's 'name' field. Raises
        ``ick2.NotFound`` when no such pipeline exists in that project.
        """
        projects = self._get_projects()
        for p in projects:
            # The project argument used to be ignored, so a pipeline with
            # the same name in another project could be returned instead.
            if p.get('project') != project:
                continue
            for pl in p['pipelines']:
                if pl.get('name') == pipeline:
                    return p, pl
        raise ick2.NotFound()

    def _start_build(self, project, pipeline, worker):
        """Record a new build for the pipeline and return its build id.

        The id is the project's 'build_id' counter plus one (starting
        from 0 when absent); the incremented counter is stored back in
        the project.
        """
        ick2.log.log('info', msg_text='Starting new build')
        build_id = project.get('build_id', 0)
        build_id += 1
        project['build_id'] = build_id
        self._update_project(project)

        build = {
            'build_id': build_id,
            'log': '/logs/{}'.format(build_id),
            'worker': worker,
            'project': project['project'],
            'pipeline': pipeline['name'],
            'status': 'building',
        }
        # NOTE(review): the counter is per project but the build is
        # stored under str(build_id) alone -- check whether builds of
        # different projects can collide.
        self._state.add_resource('builds', str(build_id), build)
        return build_id

    def _start_log(self, build_id):
        """Create an empty log resource for ``build_id``; return the id."""
        ick2.log.log('info', msg_text='Starting new log', build_id=build_id)
        log = {
            'build_id': build_id,
            'log': '',
        }
        self._state.add_resource('log', str(build_id), log)
        return build_id

    def _append_to_build_log(self, update):
        """Append the update's 'stdout' and 'stderr' text to its log.

        Missing fields count as empty text and None values are skipped.
        """
        build_id = update['build_id']
        log = self._state.get_resource('log', str(build_id))
        for kind in ['stdout', 'stderr']:
            text = update.get(kind, '')
            if text is not None:
                log['log'] += text
        self._state.update_resource('log', str(build_id), log)

    def _finish_build(self, update):
        """Store the update's exit code as the build's final status."""
        build = self._state.get_resource('builds', str(update['build_id']))
        build['status'] = update['exit_code']
        self._state.update_resource('builds', str(update['build_id']), build)

    # The generic CRUD operations do not apply to work; they are
    # overridden with no-ops.

    def create(self, *args, **kwargs):  # pragma: no cover
        pass

    def update(self, *args, **kwargs):  # pragma: no cover
        pass

    def list(self, *args, **kwargs):  # pragma: no cover
        pass

    def show(self, *args, **kwargs):  # pragma: no cover
        pass

    def delete(self, *args, **kwargs):  # pragma: no cover
        pass
class BadUpdate(Exception):  # pragma: no cover

    """Raised when a work update cannot be accepted.

    ``how`` describes what is wrong and is appended to a fixed prefix to
    form the exception message.
    """

    def __init__(self, how):
        message = 'Work update is BAD: {}'.format(how)
        super().__init__(message)
class MethodNotAllowed(Exception):  # pragma: no cover

    """Raised when an operation is not permitted on a resource type.

    ``wat`` is used unchanged as the exception message.
    """

    def __init__(self, wat):
        Exception.__init__(self, wat)
def response(status_code, body, headers):  # pragma: no cover
    """Return an ``apifw.Response`` built from status, body and headers."""
    return apifw.Response({
        'status': status_code,
        'body': body,
        'headers': headers,
    })
def OK(body):  # pragma: no cover
    """Return an ``apifw.HTTP_OK`` response with a JSON content type."""
    json_headers = {'Content-Type': 'application/json'}
    return response(apifw.HTTP_OK, body, json_headers)
def text_plain(body):  # pragma: no cover
    """Return an ``apifw.HTTP_OK`` response with a plain-text content type."""
    text_headers = {'Content-Type': 'text/plain'}
    return response(apifw.HTTP_OK, body, text_headers)
def not_found(error):  # pragma: no cover
    """Return an ``apifw.HTTP_NOT_FOUND`` response.

    The body is ``str(error)``, sent as plain text.
    """
    text_headers = {'Content-Type': 'text/plain'}
    message = str(error)
    return response(apifw.HTTP_NOT_FOUND, message, text_headers)
def bad_request(error):  # pragma: no cover
    """Return an ``apifw.HTTP_BAD_REQUEST`` response.

    The body is ``str(error)``, sent as plain text.
    """
    text_headers = {'Content-Type': 'text/plain'}
    message = str(error)
    return response(apifw.HTTP_BAD_REQUEST, message, text_headers)
def created(body):  # pragma: no cover
    """Return an ``apifw.HTTP_CREATED`` response with a JSON content type."""
    json_headers = {'Content-Type': 'application/json'}
    return response(apifw.HTTP_CREATED, body, json_headers)