#!/usr/bin/python3
# Copyright 2017-2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Command line client for an ick controller.

Each subcommand is implemented by a small Command subclass that talks
to the controller through an ick2.ControllerClient and writes its
result to the application's output stream.
"""


import configparser
import json
import logging
import os
import sys

import urllib3

import cliapp
import yaml

import ick2


def scopes(base):
    """Return the five CRUD-style scope names for resource type *base*.

    The names are built by substituting *base* into the fixed
    'uapi_{}_...' patterns below.
    """
    patterns = [
        'uapi_{}_get',
        'uapi_{}_post',
        'uapi_{}_id_get',
        'uapi_{}_id_put',
        'uapi_{}_id_delete',
    ]
    return [x.format(base) for x in patterns]


def scopes_for_types(typelist):
    """Return the scopes() of every type name in *typelist*, in order."""
    result = []
    for type_name in typelist:
        result.extend(scopes(type_name))
    return result


# Resource type names for which the full set of scopes() is requested
# by default.
types = [
    'projects',
    'pipelines',
    'workers',
    'work',
    'builds',
    'logs',
]


class Icktool(cliapp.Application):

    """The icktool application: settings plus one cmd_* per subcommand.

    NOTE(review): cliapp is expected to expose every cmd_* method as a
    subcommand; confirm the exact name mapping against cliapp docs.
    """

    # Scopes requested for a new token unless --scope overrides them.
    _default_scopes = [
        'uapi_version_get',
        'uapi_work_post',
        'uapi_status_get',
        'uapi_projects_id_status_get',
        'uapi_projects_id_status_put',
        'uapi_blobs_id_get',
        'uapi_blobs_id_put',
        'uapi_notify_post',
    ] + scopes_for_types(types)

    def add_settings(self):
        """Declare the command line / configuration settings."""
        self.settings.string(
            ['controller', 'c'],
            'use URL as the controller base URL',
            metavar='URL',
        )

        self.settings.string(
            ['secrets'],
            'use FILE for credentials for authentication server',
            metavar='FILE',
            default=os.path.expanduser('~/.config/icktool/credentials.conf')
        )

        self.settings.boolean(
            ['verify-tls'],
            'verify API provider TLS certificate '
            '(default is verify, use --no-verify-tls)',
            default=True,
        )

        self.settings.string(
            ['token'],
            'use TOKEN instead of generating a new one',
            metavar='TOKEN',
        )

        self.settings.string_list(
            ['scope'],
            'add SCOPE to the list of scope in requested token',
            metavar='SCOPE',
            default=self._default_scopes,
        )

    def cmd_scopes(self, args):
        """Write the configured scopes, one per line."""
        for scope in self.settings['scope']:
            self.output.write('{}\n'.format(scope))

    def cmd_token(self, args):
        """Write an access token (configured or freshly requested)."""
        api = self._new_api()
        token = self._new_token(api)
        self.output.write('{}\n'.format(token))

    def cmd_version(self, args):
        """Run VersionCommand."""
        cmd = self._command(VersionCommand)
        cmd.execute(args)

    def cmd_status(self, args):
        """Run StatusCommand."""
        cmd = self._command(StatusCommand)
        cmd.execute(args)

    def cmd_build_graph(self, args):
        """Run BuildGraphCommand."""
        cmd = self._command(BuildGraphCommand)
        cmd.execute(args)

    def cmd_make_it_so(self, args):
        """Run MakeItSoCommand."""
        cmd = self._command(MakeItSoCommand)
        cmd.execute(args)

    def cmd_trigger(self, args):
        """Run TriggerCommand."""
        cmd = self._command(TriggerCommand)
        cmd.execute(args)

    def cmd_show(self, args):
        """Run ShowCommand."""
        cmd = self._command(ShowCommand)
        cmd.execute(args)

    def cmd_delete(self, args):
        """Run DeleteCommand."""
        cmd = self._command(DeleteCommand)
        cmd.execute(args)

    def cmd_show_log(self, args):
        """Run ShowLogCommand."""
        cmd = self._command(ShowLogCommand)
        cmd.execute(args)

    def cmd_show_latest_log(self, args):
        """Run ShowLatestLogCommand."""
        cmd = self._command(ShowLatestLogCommand)
        cmd.execute(args)

    def cmd_get_artifact(self, args):
        """Run GetArtifactCommand."""
        cmd = self._command(GetArtifactCommand)
        cmd.execute(args)

    def _command(self, klass):
        """Instantiate command class *klass* with an authenticated API.

        A new API client is created, a token is obtained for it, and
        the client plus the application's output stream are handed to
        *klass*.
        """
        api = self._new_api()
        token = self._new_token(api)
        api.set_token(token)
        return klass(api, self.output)

    def _new_api(self):
        """Create a controller client configured from the settings."""
        if not self.settings['verify-tls']:
            # The user explicitly opted out of TLS verification, so do
            # not let urllib3 warn about it on every request.
            urllib3.disable_warnings()
            logging.captureWarnings(True)

        api = ick2.ControllerClient()
        api.set_verify_tls(self.settings['verify-tls'])
        api.set_controller_url(self.settings['controller'])
        return api

    def _new_auth(self, api):
        """Create an auth client using credentials from the secrets file.

        The auth URL and HTTP layer are taken from *api*.
        """
        c_url = api.get_controller_url()
        auth_url = api.get_auth_url()
        client_id, client_secret = self._get_client_creds(c_url, auth_url)

        ac = ick2.AuthClient()
        ac.set_http_api(api.get_http_api())
        ac.set_auth_url(auth_url)
        ac.set_client_creds(client_id, client_secret)
        return ac

    def _new_token(self, api):
        """Return the access token to use for *api*.

        The --token setting wins if given.  Otherwise a token covering
        the configured scopes is requested from the auth client; if
        that raises ick2.HttpError, the error is reported on stderr and
        the process exits with status 1.
        """
        if self.settings['token']:
            return self.settings['token']

        ac = self._new_auth(api)
        # Named so that it does not shadow the module-level scopes().
        wanted_scopes = ' '.join(self.settings['scope'])
        try:
            return ac.get_token(wanted_scopes)
        except ick2.HttpError as e:
            sys.stderr.write('Error getting token: %s\n' % str(e))
            sys.exit(1)

    def _get_client_creds(self, controller_url, auth_url):
        """Return (client_id, client_secret) read from the secrets file.

        The section named after *controller_url* is used if present,
        otherwise the section named after *auth_url*.  configparser
        raises its usual errors if the section or an option is missing.
        """
        cp = configparser.ConfigParser()
        cp.read(self.settings['secrets'])

        if cp.has_section(controller_url):
            section = controller_url
        else:
            section = auth_url

        client_id = cp.get(section, 'client_id')
        client_secret = cp.get(section, 'client_secret')
        return client_id, client_secret

    def _prettyson(self, obj):
        """Write *obj* to the output as indented, key-sorted JSON."""
        json.dump(obj, self.output, indent=4, sort_keys=True)
        self.output.write('\n')


class Table:

    """A minimal plain-text table with right-aligned columns."""

    def __init__(self):
        self._column_names = None
        self._rows = []

    def set_columns(self, *column_names):
        """Set the column names; they double as row dict keys."""
        self._column_names = column_names

    def append_row(self, **kwargs):
        """Append one row; *kwargs* must have a value for every column."""
        self._rows.append(kwargs)

    def format(self):
        """Return the table as text: headings, underlines, then rows.

        Every line ends in a newline.  The stored rows are not
        modified, so format() can safely be called more than once.
        """
        assert self._column_names is not None

        widths = self._column_widths()
        headings = {key: key for key in self._column_names}
        underlines = {key: '-' * widths[key] for key in self._column_names}

        # Build the full list locally rather than inserting the header
        # rows into self._rows, which would duplicate them on each call.
        all_rows = [headings, underlines] + self._rows
        return ''.join(
            '{}\n'.format(self._format_row(widths, row)) for row in all_rows)

    def _format_headings(self, widths):
        """Return the formatted heading line."""
        row = {key: key for key in self._column_names}
        return self._format_row(widths, row)

    def _format_row(self, widths, row):
        """Return one row as cells joined by ' | '."""
        return ' | '.join(
            self._format_cell(widths[x], row[x])
            for x in self._column_names
        )

    def _format_cell(self, width, value):
        """Return *value* as a string right-aligned in *width* columns."""
        return '%*s' % (width, value)

    def _column_widths(self):
        """Return a dict mapping each column name to its width."""
        return {key: self._width(key) for key in self._column_names}

    def _width(self, column_name):
        """Return the widest cell in a column, heading included."""
        cells = [column_name]
        cells.extend(row[column_name] for row in self._rows)
        return max(len(str(cell)) for cell in cells)


def _latest_build(project_name, builds):
    """Return the last listed build of a project, or None if it has none.

    *builds* is the object returned for '/builds'; see _find_builds.
    """
    builds = _find_builds(project_name, builds)
    if builds:
        return builds[-1]
    return None


def _find_builds(project_name, builds):
    """Return the entries of builds['builds'] for project *project_name*."""
    return [b for b in builds['builds'] if b['project'] == project_name]


def _find_build(builds, build_id):
    """Return the build in list *builds* with the given id, or None."""
    for build in builds:
        if build['build_id'] == build_id:
            return build
    return None


class Command:

    """Base class for subcommands.

    Holds the API client and the output stream; subclasses implement
    execute().
    """

    def __init__(self, api, output):
        self.api = api
        self.output = output

    def _check_for_leading_slash(self, args):
        """Raise Exception unless every argument starts with '/'."""
        for arg in args:
            if not arg.startswith('/'):
                raise Exception(
                    'Argument should start with slash: {}'.format(arg))

    def _prettyson(self, obj):
        """Write *obj* to the output as indented, key-sorted JSON."""
        json.dump(obj, self.output, indent=4, sort_keys=True)
        self.output.write('\n')

    def _write_blob_as_text(self, blob_path):
        """Fetch a blob, decode it as UTF-8 and write it to the output.

        A newline is appended if the text does not already end in one.
        """
        text = self.api.show_blob(blob_path).decode('UTF-8')
        self.output.write(text)
        if not text.endswith('\n'):
            self.output.write('\n')

    def execute(self, args):
        """Run the command with the command line arguments *args*."""
        raise NotImplementedError()


class VersionCommand(Command):

    """Show the version information reported by the API client."""

    def execute(self, args):
        version = self.api.get_version()
        self._prettyson(version)


class StatusCommand(Command):

    """Show a table of projects with their latest build status and log."""

    def execute(self, args):
        table = Table()
        table.set_columns('project', 'build_status', 'log_id')

        projects = self.api.show('/projects')
        builds = self.api.show('/builds')

        projects = self._sort_projects(projects)
        for project in projects:
            project_name = project['project']

            row = {
                'project': project_name,
                'build_status': 'n/a',
                'log_id': 'n/a'
            }

            build = _latest_build(project_name, builds)
            if build:
                bs = build['status']
                # Integer statuses are rendered as OK/FAILED; any other
                # value is shown as is.
                if bs == 0:
                    bs = 'OK'
                elif isinstance(bs, int):
                    bs = 'FAILED ({})'.format(bs)
                row['build_status'] = bs
                row['log_id'] = build['log']

            table.append_row(**row)

        self.output.write(table.format())

    def _sort_projects(self, projects):
        """Return projects['projects'] sorted by project name."""
        return sorted(projects['projects'], key=lambda p: p['project'])


class BuildGraphCommand(Command):

    """Write a Graphviz digraph of the actions of one build.

    Arguments: project name, and optionally a build id (default is
    'latest', meaning the last listed build of the project).
    """

    def execute(self, args):
        if not args:
            raise Exception('Must give project name as argument')

        project_name = args[0]
        if len(args) > 1:
            build_id = args[1]
        else:
            build_id = 'latest'

        builds = self.api.show('/builds')
        builds = _find_builds(project_name, builds)
        if not builds:
            sys.exit('No such build %s' % build_id)

        if build_id == 'latest':
            build = builds[-1]
        else:
            build = _find_build(builds, build_id)
            if build is None:
                # The project has builds, but none with this id.
                sys.exit('No such build %s' % build_id)

        actions = build['actions']
        current = build['current_action']

        f = self.output
        f.write('digraph "build_graph" {\n')
        for i, action in enumerate(actions):
            self._describe_node(f, i, current, action)
            if i > 0:
                # Chain the actions in execution order.
                f.write('a{} -> a{}\n'.format(i-1, i))
        f.write('}\n')

    def _describe_node(self, f, i, current, action):
        """Write the node for action number *i* to *f*.

        Actions before *current* (or all of them when *current* is
        None) are styled as done, the action at *current* as building,
        and later ones as blocked.
        """
        styles = {
            'done': ('rectangle', '#ffffff'),
            'building': ('ellipse', '#00ff00'),
            'blocked': ('ellipse', '#bbbbbb'),
        }

        if current is None or i < current:
            shape, color = styles['done']
        elif i == current:
            shape, color = styles['building']
        else:
            shape, color = styles['blocked']

        # The label goes inside a double-quoted DOT string, so escape
        # backslashes and quotes to keep the output well formed.
        label = self._describe_action(action)
        label = label.replace('\\', '\\\\').replace('"', '\\"')

        tmpl = 'a{} [label="{}" shape={} style=filled fillcolor="{}"]\n'
        f.write(tmpl.format(i, label, shape, color))

    def _describe_action(self, action):
        """Return a short human-readable label for *action*."""
        # For these keys the value is informative enough to show.
        for key in ['action', 'archive']:
            if key in action:
                return '{}: {}'.format(key, action[key])

        # For these only the kind of action is shown, not its value.
        for key in ['debootstrap', 'shell', 'python']:
            if key in action:
                return key

        return str(action)


class MakeItSoCommand(Command):

    """Create or update projects and pipelines described in YAML.

    Reads the files named as arguments, or stdin if there are none.
    """

    def execute(self, args):
        if not args:
            obj = self._read_object(sys.stdin)
            self._make_it_so(obj)
        else:
            for filename in args:
                with open(filename) as f:
                    obj = self._read_object(f)
                self._make_it_so(obj)

    def _read_object(self, f):
        """Parse YAML from open file *f* and return the result."""
        # safe_load instead of yaml.load(f): load() without an explicit
        # Loader can construct arbitrary Python objects on old PyYAML
        # and is a TypeError on PyYAML 6 and later.
        return yaml.safe_load(f)

    def _make_it_so(self, obj):
        """Create/update everything under 'projects' and 'pipelines'."""
        if obj is None:
            # An empty YAML document loads as None: nothing to do.
            obj = {}
        self._create_resources(
            '/projects', 'project', obj.get('projects', []))
        self._create_resources(
            '/pipelines', 'pipeline', obj.get('pipelines', []))

    def _create_resources(self, path, name_field, objs):
        """Create each object under *path*, updating it if that fails.

        When creation raises ick2.HttpError the object is updated at
        '<path>/<name>' instead, where the name is obj[name_field].
        """
        for obj in objs:
            try:
                self.api.create(path, obj)
            except ick2.HttpError:
                update_path = '{}/{}'.format(path, obj[name_field])
                self.api.update(update_path, obj)


class TriggerCommand(Command):

    """Trigger each project named as an argument.

    An ick2.HttpError for one project is reported on stderr and the
    remaining projects are still triggered.
    """

    def execute(self, args):
        for project in args:
            try:
                self._prettyson(self.api.trigger(project))
            except ick2.HttpError as e:
                sys.stderr.write('ERROR: {}: {}\n'.format(project, str(e)))


class ShowCommand(Command):

    """Show the resources at the given paths as JSON.

    Without arguments, '/projects' and '/pipelines' are shown.
    """

    def execute(self, args):
        if not args:
            args = [
                '/projects',
                '/pipelines',
            ]
        else:
            self._check_for_leading_slash(args)
        for what in args:
            objs = self.api.show(what)
            self._prettyson(objs)


class DeleteCommand(Command):

    """Delete the resources at the given paths."""

    def execute(self, args):
        self._check_for_leading_slash(args)
        for what in args:
            self.api.delete(what)


class ShowLogCommand(Command):

    """Write the logs at the given paths (each must start with '/')."""

    def execute(self, args):
        self._check_for_leading_slash(args)
        for log_id in args:
            self._write_blob_as_text(log_id)


class ShowLatestLogCommand(Command):

    """Write the log of the latest build of each named project.

    Projects without any build are silently skipped.
    """

    def execute(self, args):
        response = self.api.show('/builds')
        # Treat an empty/missing response as "no builds" instead of
        # passing it on to the filtering below.
        builds = response['builds'] if response else []

        for project in args:
            latest = self._get_latest_build(builds, project)
            if latest:
                self._write_blob_as_text(latest['log'])

    def _get_latest_build(self, builds, project):
        """Return the last build of *project* in list *builds*, or None."""
        # _latest_build expects the '/builds' response object, so wrap
        # the plain list to reuse the module-level helper.
        return _latest_build(project, {'builds': builds})


class GetArtifactCommand(Command):

    """Write the raw bytes of blob args[0] to standard output."""

    def execute(self, args):
        if not args:
            raise Exception('Must give blob id as argument')
        blob_id = args[0]
        path = '/blobs/{}'.format(blob_id)
        blob = self.api.show_blob(path)
        # The blob is binary, so bypass the text-mode output stream.
        sys.stdout.buffer.write(blob)


Icktool(version=ick2.__version__).run()