#!/usr/bin/python3
# Copyright 2017-2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.

"""Command line client for an ick controller.

Each ``cmd_*`` method of :class:`Icktool` is a subcommand. The tool
talks to the controller through ``ick2.ControllerClient`` and gets
access tokens through ``ick2.AuthClient``.
"""


import configparser
import json
import logging
import os
import sys

import cliapp
import yaml

import ick2


def scopes(base):
    """Return the scope names for the resource type ``base``.

    The result has one scope per pattern below, with ``base`` substituted
    in, in the order the patterns are listed.
    """
    patterns = [
        'uapi_{}_get',
        'uapi_{}_post',
        'uapi_{}_id_get',
        'uapi_{}_id_put',
        'uapi_{}_id_delete',
    ]
    return [x.format(base) for x in patterns]


def scopes_for_types(typelist):
    """Return the concatenated :func:`scopes` of every name in ``typelist``."""
    result = []
    for type_name in typelist:
        result.extend(scopes(type_name))
    return result


# Resource types for which the full set of scopes is requested by default.
types = [
    'projects',
    'pipelines',
    'workers',
    'work',
    'builds',
    'logs',
]


class Icktool(cliapp.Application):

    """The icktool command line application."""

    # Scopes requested for a new token unless overridden with --scope.
    _default_scopes = [
        'uapi_version_get',
        'uapi_work_post',
        'uapi_projects_id_status_get',
        'uapi_projects_id_status_put',
        'uapi_blobs_id_get',
        'uapi_blobs_id_put',
    ] + scopes_for_types(types)

    def add_settings(self):
        """Declare the command line / configuration settings."""
        self.settings.string(
            ['controller', 'c'],
            'use URL as the controller base URL',
            metavar='URL',
        )

        self.settings.string(
            ['secrets'],
            'use FILE for credentials for authentication server',
            metavar='FILE',
            default=os.path.expanduser('~/.config/icktool/credentials.conf')
        )

        self.settings.boolean(
            ['verify-tls'],
            'verify API provider TLS certificate '
            '(default is verify, use --no-verify-tls)',
            default=True,
        )

        self.settings.string(
            ['token'],
            'use TOKEN instead of generating a new one',
            metavar='TOKEN',
        )

        self.settings.string_list(
            ['scope'],
            'add SCOPE to the list of scope in requested token',
            metavar='SCOPE',
            default=self._default_scopes,
        )

    def setup(self):
        """Route warnings to the logging system when TLS is not verified."""
        # NOTE(review): presumably this keeps the HTTP library's
        # "unverified HTTPS request" warnings off the terminal -- confirm.
        if not self.settings['verify-tls']:
            logging.captureWarnings(True)

    def cmd_scopes(self, args):
        """List the scopes that would be requested for a new token."""
        for scope in self.settings['scope']:
            self.output.write('{}\n'.format(scope))

    def cmd_token(self, args):
        """Output an access token."""
        api = self._new_api()
        token = self._new_token(api)
        self.output.write('{}\n'.format(token))

    def cmd_version(self, args):
        """Show the version information reported by the controller."""
        api = self._new_api_with_token()
        version = api.get_version()
        self._prettyson(version)

    def cmd_status(self, args):
        """Show a table of projects with their status and latest build."""
        table = Table()
        table.set_columns('project', 'status', 'build_status', 'log_id')

        api = self._new_api_with_token()
        projects = api.show('/projects')
        builds = api.show('/builds')

        for project in projects['projects']:
            project_name = project['project']
            project_status = api.show(
                '/projects/{}/status'.format(project_name))

            # 'n/a' is shown for projects that have no builds at all.
            row = {
                'project': project_name,
                'status': project_status['status'],
                'build_status': 'n/a',
                'log_id': 'n/a'
            }

            build = self._latest_build(project_name, builds)
            if build:
                row['build_status'] = build['status']
                row['log_id'] = build['log']

            table.append_row(**row)

        self.output.write(table.format())

    def _latest_build(self, project_name, builds):
        """Return the last listed build of ``project_name``, or None.

        ``builds`` is the object returned for ``/builds``.
        """
        # NOTE(review): "latest" means "last in the list"; this assumes
        # the controller lists builds oldest first -- confirm.
        builds = self._find_builds(project_name, builds)
        if builds:
            return builds[-1]
        return None

    def _find_builds(self, project_name, builds):
        """Return the builds in ``builds['builds']`` for ``project_name``."""
        return [b for b in builds['builds'] if b['project'] == project_name]

    def _find_build(self, builds, build_id):
        """Return the build in list ``builds`` with id ``build_id``, or None."""
        for build in builds:
            if build['build_id'] == build_id:
                return build
        return None

    def cmd_build_graph(self, args):
        """Output a Graphviz graph of the actions of a build.

        Arguments are a project name and, optionally, a build id. Without
        a build id, the latest build of the project is used.
        """
        # Check arguments before talking to the network, so that a usage
        # mistake gives a clear message instead of an IndexError.
        if not args:
            sys.exit('build-graph requires a project name')
        project_name = args[0]
        build_id = args[1] if len(args) > 1 else 'latest'

        api = self._new_api_with_token()
        builds = self._find_builds(project_name, api.show('/builds'))
        if not builds:
            sys.exit('No such build %s' % build_id)

        if build_id == 'latest':
            build = builds[-1]
        else:
            build = self._find_build(builds, build_id)
            if build is None:
                # The project has builds, but not the requested one.
                sys.exit('No such build %s' % build_id)

        actions = build['actions']
        current = build['current_action']

        node_tmpl = 'a{} [label="{}" shape={} style=filled fillcolor="{}"]\n'

        f = self.output
        f.write('digraph "build_graph" {\n')
        for i, action in enumerate(actions):
            shape, color = self._node_style(i, current)
            f.write(
                node_tmpl.format(
                    i, self._describe_action(action), shape, color))
            # Chain each action to the one before it.
            if i > 0:
                f.write('a{} -> a{}\n'.format(i-1, i))
        f.write('}\n')

    def _node_style(self, index, current):
        """Return (shape, fill colour) for the action at ``index``.

        ``current`` is the build's ``current_action`` value. When it is
        None every action is drawn as done; otherwise actions before it
        are done, the one at it is building, and later ones are blocked.
        """
        styles = {
            'done': ('rectangle', '#ffffff'),
            'building': ('ellipse', '#00ff00'),
            'blocked': ('ellipse', '#bbbbbb'),
        }
        if current is None or index < current:
            return styles['done']
        if index == current:
            return styles['building']
        return styles['blocked']

    def _describe_action(self, action):
        """Return a short label for ``action`` for use in the build graph."""
        # These kinds are labelled with their value...
        for key in ['action', 'archive']:
            if key in action:
                return '{}: {}'.format(key, action[key])
        # ...these with the key name only.
        for key in ['debootstrap', 'shell', 'python']:
            if key in action:
                return key
        return str(action)

    def cmd_make_it_so(self, argv):
        """Create projects and pipelines described in YAML on stdin.

        The input is a mapping with optional "projects" and "pipelines"
        lists.
        """
        # Empty input parses to None; treat that, and keys with a null
        # value, as "nothing to create" instead of crashing.
        obj = self._read_object() or {}

        api = self._new_api_with_token()
        self._create_resources(api, '/projects', obj.get('projects') or [])
        self._create_resources(api, '/pipelines', obj.get('pipelines') or [])

    def _read_object(self):
        """Parse standard input as YAML and return the resulting object."""
        # SECURITY: this used to be yaml.load() without a Loader, which can
        # construct arbitrary Python objects from the input and which
        # PyYAML 6 and later rejects with a TypeError. safe_load() accepts
        # standard YAML tags only.
        return yaml.safe_load(sys.stdin)

    def _create_resources(self, api, path, objs):
        """Call ``api.create(path, obj)`` for every object in ``objs``."""
        for obj in objs:
            api.create(path, obj)

    def cmd_trigger(self, args):
        """Trigger a project, given its name, and show the response."""
        if not args:
            sys.exit('trigger requires a project name')
        project = args[0]

        api = self._new_api_with_token()
        self._prettyson(api.trigger(project))

    def cmd_show(self, args):
        """Show resources of the given kinds (default: projects, pipelines)."""
        api = self._new_api_with_token()
        if not args:
            args = [
                'projects',
                'pipelines',
            ]
        for kind in args:
            objs = api.show('/' + kind)
            self._prettyson(objs)

    def _new_api(self):
        """Return a controller client configured from the settings."""
        api = ick2.ControllerClient()
        api.set_verify_tls(self.settings['verify-tls'])
        api.set_controller_url(self.settings['controller'])
        return api

    def _new_api_with_token(self):
        """Return a controller client that already has its token set."""
        api = self._new_api()
        api.set_token(self._new_token(api))
        return api

    def _new_auth(self, api):
        """Return an auth client for the auth URL that ``api`` reports."""
        url = api.get_auth_url()
        client_id, client_secret = self._get_client_creds(url)
        ac = ick2.AuthClient()
        ac.set_auth_url(url)
        ac.set_client_creds(client_id, client_secret)
        return ac

    def _new_token(self, api):
        """Return the --token setting if set, else a newly requested token."""
        if self.settings['token']:
            return self.settings['token']

        ac = self._new_auth(api)
        # The scopes are passed as one space-separated string.
        scope_string = ' '.join(self.settings['scope'])
        return ac.get_token(scope_string)

    def _get_client_creds(self, url):
        """Return (client_id, client_secret) for ``url`` from the secrets file.

        The secrets file is an INI file with one section per auth URL.
        ConfigParser.read() ignores a missing file, so a missing file and
        a missing section both surface as configparser.NoSectionError; a
        missing key raises configparser.NoOptionError.
        """
        cp = configparser.ConfigParser()
        cp.read(self.settings['secrets'])
        client_id = cp.get(url, 'client_id')
        client_secret = cp.get(url, 'client_secret')
        return client_id, client_secret

    def _prettyson(self, obj):
        """Write ``obj`` to the output as indented, key-sorted JSON."""
        json.dump(obj, self.output, indent=4, sort_keys=True)
        self.output.write('\n')


class Table:

    """A plain-text table with named, right-aligned columns.

    Set the column names with set_columns(), add rows with append_row()
    using the column names as keyword arguments, and get the text from
    format().
    """

    def __init__(self):
        self._column_names = None
        self._rows = []

    def set_columns(self, *column_names):
        """Set the column names, in display order."""
        self._column_names = column_names

    def append_row(self, **kwargs):
        """Add a row; it needs a value for every column name."""
        self._rows.append(kwargs)

    def format(self):
        """Return the table as text, one line per row, headings first.

        The stored rows are not modified, so the table can be formatted
        more than once and rows can be appended between calls.
        """
        assert self._column_names is not None

        headings = {key: key for key in self._column_names}
        # Widths cover the headings and the data rows; the underline row
        # is derived from the widths, so it cannot affect them.
        widths = self._column_widths([headings] + self._rows)
        underlines = {key: '-' * widths[key] for key in self._column_names}

        rows = [headings, underlines] + self._rows
        lines = [self._format_row(widths, row) for row in rows]
        return ''.join('{}\n'.format(line) for line in lines)

    def _format_headings(self, widths):
        """Return the formatted heading line."""
        row = {key: key for key in self._column_names}
        return self._format_row(widths, row)

    def _format_row(self, widths, row):
        """Return one row as text, with cells joined by ' | '."""
        return ' | '.join(
            self._format_cell(widths[x], row[x]) for x in self._column_names
        )

    def _format_cell(self, width, value):
        """Return ``value`` as a string right-aligned in ``width`` columns."""
        return str(value).rjust(width)

    def _column_widths(self, rows=None):
        """Return a dict of column name to width for ``rows``.

        ``rows`` defaults to the stored rows.
        """
        return {key: self._width(key, rows) for key in self._column_names}

    def _width(self, column_name, rows=None):
        """Return the longest string length in column ``column_name``.

        ``rows`` defaults to the stored rows. Raises ValueError if there
        are no rows.
        """
        if rows is None:
            rows = self._rows
        return max(len(str(row[column_name])) for row in rows)


# Run only when executed as a script, so that importing the file (for
# example to test Table) has no side effects.
if __name__ == '__main__':
    Icktool(version=ick2.__version__).run()