#!/usr/bin/python3 # Copyright 2017-2018 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import configparser import json import logging import os import sys import urllib3 import cliapp import yaml import ick2 def scopes(base): patterns = [ "uapi_{}_get", "uapi_{}_post", "uapi_{}_id_get", "uapi_{}_id_put", "uapi_{}_id_delete", ] return [x.format(base) for x in patterns] def scopes_for_types(typelist): result = [] for type_name in typelist: result.extend(scopes(type_name)) return result types = ["projects", "pipelines", "workers", "work", "builds", "logs"] class Icktool(cliapp.Application): _default_scopes = [ "uapi_version_get", "uapi_work_post", "uapi_status_get", "uapi_projects_id_status_get", "uapi_projects_id_status_put", "uapi_blobs_id_get", "uapi_blobs_id_put", "uapi_notify_post", "create", "update", "show", "delete", ] + scopes_for_types(types) def add_settings(self): self.settings.string( ["controller", "c"], "use URL as the controller base URL", metavar="URL" ) self.settings.string( ["secrets"], "use FILE for credentials for authentication server", metavar="FILE", default=os.path.expanduser("~/.config/icktool/credentials.conf"), ) self.settings.boolean( ["verify-tls"], "verify API provider TLS certificate " "(default is verify, use --no-verify-tls)", default=True, ) self.settings.string( ["token"], "use TOKEN instead of generating a new one", metavar="TOKEN" ) self.settings.string_list( ["scope"], "add SCOPE to the list of scope in requested token", metavar="SCOPE", default=self._default_scopes, ) def cmd_scopes(self, args): for scope in self.settings["scope"]: self.output.write("{}\n".format(scope)) def cmd_token(self, args): api = self._new_api() token = self._new_token(api) self.output.write("{}\n".format(token)) def cmd_version(self, args): cmd = self._command(VersionCommand) cmd.execute(args) def cmd_status(self, args): cmd = self._command(StatusCommand) cmd.execute(args) def cmd_build_graph(self, args): cmd = self._command(BuildGraphCommand) cmd.execute(args) def cmd_make_it_so(self, args): cmd = self._command(MakeItSoCommand) cmd.execute(args) def cmd_trigger(self, args): cmd = self._command(TriggerCommand) cmd.execute(args) def cmd_show(self, args): cmd = self._command(ShowCommand) cmd.execute(args) def cmd_delete(self, args): cmd = self._command(DeleteCommand) cmd.execute(args) def cmd_show_log(self, args): cmd = self._command(ShowLogCommand) cmd.execute(args) def cmd_show_latest_log(self, args): cmd = self._command(ShowLatestLogCommand) cmd.execute(args) def cmd_get_artifact(self, args): cmd = self._command(GetArtifactCommand) cmd.execute(args) def _command(self, klass): api = self._new_api() token = self._new_token(api) api.set_token(token) return klass(api, self.output) def _new_api(self): if not self.settings["verify-tls"]: urllib3.disable_warnings() logging.captureWarnings(True) api = ick2.ControllerClient() api.set_verify_tls(self.settings["verify-tls"]) api.set_controller_url(self.settings["controller"]) return api def _new_auth(self, api): c_url = api.get_controller_url() auth_url = api.get_auth_url() client_id, client_secret = self._get_client_creds(c_url, auth_url) ac = ick2.AuthClient() ac.set_http_api(api.get_http_api()) ac.set_auth_url(auth_url) ac.set_client_creds(client_id, client_secret) return ac def _new_token(self, api): if self.settings["token"]: return self.settings["token"] ac = self._new_auth(api) scopes = " ".join(self.settings["scope"]) try: return ac.get_token(scopes) except ick2.HttpError as e: sys.stderr.write("Error getting token: %s\n" % str(e)) sys.exit(1) def _get_client_creds(self, controller_url, auth_url): cp = configparser.ConfigParser() cp.read(self.settings["secrets"]) if cp.has_section(controller_url): section = controller_url else: section = auth_url client_id = cp.get(section, "client_id") client_secret = cp.get(section, "client_secret") return client_id, client_secret def _prettyson(self, obj): json.dump(obj, self.output, indent=4, sort_keys=True) self.output.write("\n") class Table: def __init__(self): self._column_names = None self._rows = [] def set_columns(self, *column_names): self._column_names = column_names def append_row(self, **kwargs): self._rows.append(kwargs) def format(self): assert self._column_names is not None headings = {key: key for key in self._column_names} self._rows.insert(0, headings) widths = self._column_widths() underlines = {key: "-" * widths[key] for key in self._column_names} self._rows.insert(1, underlines) lines = [self._format_row(widths, row) for row in self._rows] return "".join("{}\n".format(line) for line in lines) def _format_headings(self, widths): row = {key: key for key in self._column_names} return self._format_row(widths, row) def _format_row(self, widths, row): return " | ".join( self._format_cell(widths[x], row[x]) for x in self._column_names ) def _format_cell(self, width, value): return "%*s" % (width, value) def _column_widths(self): return {key: self._width(key) for key in self._column_names} def _width(self, column_name): return max(len(str(row[column_name])) for row in self._rows) def _latest_build(project_name, builds): builds = _find_builds(project_name, builds) if builds: return builds[-1] return None def _find_builds(project_name, builds): return [b for b in builds["builds"] if b["project"] == project_name] def _find_build(builds, build_id): for build in builds: if build["build_id"] == build_id: return build return None class Command: def __init__(self, api, output): self.api = api self.output = output def _check_for_leading_slash(self, args): for arg in args: if not arg.startswith("/"): raise Exception("Argument should start with slash: {}".format(arg)) def _prettyson(self, obj): json.dump(obj, self.output, indent=4, sort_keys=True) self.output.write("\n") def execute(self, args): raise NotImplementedError() class VersionCommand(Command): def execute(self, args): version = self.api.get_version() self._prettyson(version) class StatusCommand(Command): def execute(self, args): table = Table() table.set_columns("project", "build_status", "log_id") projects = self.api.show("/projects") builds = self.api.show("/builds") projects = self._sort_projects(projects) for project in projects: project_name = project["project"] row = {"project": project_name, "build_status": "n/a", "log_id": "n/a"} build = _latest_build(project_name, builds) if build: bs = build["status"] if bs == 0: bs = "OK" elif isinstance(bs, int): bs = "FAILED ({})".format(bs) row["build_status"] = bs row["log_id"] = build["log"] table.append_row(**row) self.output.write(table.format()) def _sort_projects(self, projects): return list(sorted(projects["projects"], key=lambda p: p["project"])) class BuildGraphCommand(Command): def execute(self, args): project_name = args[0] if len(args) > 1: build_id = args[1] else: build_id = "latest" builds = self.api.show("/builds") builds = _find_builds(project_name, builds) if not builds: sys.exit("No such build %s" % build_id) if build_id == "latest": build = builds[-1] else: build = _find_build(builds, build_id) actions = build["actions"] current = build["current_action"] status = build["status"] f = self.output f.write('digraph "build_graph" {\n') for i, action in enumerate(actions): self._describe_node(f, i, current, action) if i > 0: f.write("a{} -> a{}\n".format(i - 1, i)) f.write("}\n") def _describe_node(self, f, i, current, action): styles = { "done": ("rectangle", "#ffffff"), "building": ("ellipse", "#00ff00"), "blocked": ("ellipse", "#bbbbbb"), } if current is None: shape, color = styles["done"] elif i < current: shape, color = styles["done"] elif i == current: shape, color = styles["building"] elif i > current: shape, color = styles["blocked"] tmpl = 'a{} [label="{}" shape={} style=filled fillcolor="{}"]\n' f.write(tmpl.format(i, self._describe_action(action), shape, color)) def _describe_action(self, action): for key in ["action", "archive"]: if key in action: return "{}: {}".format(key, action[key]) for key in ["debootstrap", "shell", "python"]: if key in action: return key return str(action) class MakeItSoCommand(Command): def execute(self, args): if not args: obj = self._read_object(sys.stdin) self._make_it_so(obj) else: for filename in args: with open(filename) as f: obj = self._read_object(f) self._make_it_so(obj) def _read_object(self, f): return yaml.load(f) def _make_it_so(self, obj): self._create_resources("/projects", "project", obj.get("projects", [])) self._create_resources("/pipelines", "pipeline", obj.get("pipelines", [])) def _create_resources(self, path, name_field, objs): for obj in objs: try: self.api.create(path, obj) except ick2.HttpError: update_path = "{}/{}".format(path, obj[name_field]) self.api.update(update_path, obj) class TriggerCommand(Command): def execute(self, args): for project in args: try: self._prettyson(self.api.trigger(project)) except ick2.HttpError as e: sys.stderr.write("ERROR: {}: {}\n".format(project, str(e))) class ShowCommand(Command): def execute(self, args): if not args: args = ["/projects", "/pipelines"] else: self._check_for_leading_slash(args) for what in args: objs = self.api.show(what) self._prettyson(objs) class DeleteCommand(Command): def execute(self, args): self._check_for_leading_slash(args) for what in args: self.api.delete(what) class ShowLogCommand(Command): def execute(self, args): self._check_for_leading_slash(args) for log_id in args: log = self.api.show_blob(log_id) log = log.decode("UTF-8") self.output.write(log) if not log.endswith("\n"): self.output.write("\n") class ShowLatestLogCommand(Command): def execute(self, args): builds = self.api.show("/builds") if builds: builds = builds["builds"] for project in args: latest = self._get_latest_build(builds, project) if latest: log_id = latest["log"] log = self.api.show_blob(log_id) log = log.decode("UTF-8") self.output.write(log) if not log.endswith("\n"): self.output.write("\n") def _get_latest_build(self, builds, project): builds = [b for b in builds if b["project"] == project] if builds: return builds[-1] return None class GetArtifactCommand(Command): def execute(self, args): if not args: raise Exception("Must give blob id as argument") blob_id = args[0] path = "/blobs/{}".format(blob_id) blob = self.api.show_blob(path) sys.stdout.buffer.write(blob) Icktool(version=ick2.__version__).run()