#!/usr/bin/python3
# Copyright 2017-2018 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import json
import logging
import sys
import time

import apifw
import cliapp
import Crypto.PublicKey.RSA
import requests
import yaml

import ick2


def scopes(base):
    """Return the list of scope names for the resource type ``base``.

    Each name is one of the fixed ``uapi_{}_...`` patterns below with
    ``base`` substituted in.
    """
    patterns = [
        'uapi_{}_get',
        'uapi_{}_post',
        'uapi_{}_id_get',
        'uapi_{}_id_put',
        'uapi_{}_id_delete',
    ]
    return [x.format(base) for x in patterns]


def scopes_for_types(typelist):
    """Return the concatenated scope names for every type in ``typelist``."""
    result = []
    for type_name in typelist:
        result.extend(scopes(type_name))
    return result


# Resource types for which the full set of scopes is requested by default.
types = [
    'projects',
    'pipelines',
    'workers',
    'work',
    'builds',
    'logs',
]


def _exit_unless_status(code, expected, text):
    """Exit with status 1 unless HTTP status ``code`` equals ``expected``.

    On a mismatch the status code and the response body ``text`` are
    written to stderr (a trailing newline is added if the body lacks
    one) before exiting.
    """
    if code == expected:
        return
    sys.stderr.write('HTTP status {}\n'.format(code))
    sys.stderr.write(text)
    if not text.endswith('\n'):
        sys.stderr.write('\n')
    sys.exit(1)


class Icktool(cliapp.Application):
    """Command line client for talking to a controller over HTTP."""

    _default_scopes = [
        'uapi_version_get',
        'uapi_work_post',
        'uapi_projects_id_pipelines_id_get',
        'uapi_projects_id_pipelines_id_put',
        'uapi_blobs_id_get',
        'uapi_blobs_id_put',
    ] + scopes_for_types(types)

    def add_settings(self):
        """Declare the command line settings used by this tool."""
        self.settings.string(
            ['controller', 'c'],
            'use URL as the controller base URL',
            metavar='URL',
        )

        # NOTE(review): the help text claims the default is "yes", but no
        # default is passed here -- confirm what cliapp uses for booleans.
        self.settings.boolean(
            ['verify-tls'],
            'verify TLS certifcate signature? default is yes',
        )

        self.settings.string(
            ['token'],
            'use TOKEN instead of generating a new one',
            metavar='TOKEN',
        )

        self.settings.string(
            ['token-private-key-cmd'],
            'run CMD to print private key for token signing',
            metavar='CMD',
        )

        self.settings.string_list(
            ['scope'],
            'add SCOPE to the list of scope in generated token',
            metavar='SCOPE',
            default=self._default_scopes,
        )

    def setup(self):
        """Route warnings to logging when TLS verification is off."""
        if not self.settings['verify-tls']:
            logging.captureWarnings(True)

    def cmd_scopes(self, args):
        """Write the configured scopes to stdout, one per line."""
        for scope in self.settings['scope']:
            sys.stdout.write('{}\n'.format(scope))

    def cmd_token(self, args):
        """Generate a new token and write it to stdout."""
        token = self._new_token()
        sys.stdout.write(token)

    def cmd_version(self, args):
        """Show the version information reported by the controller."""
        api = self._new_api()
        version = api.get_version()
        if not version:
            sys.exit(1)
        self._prettyson(version)

    def cmd_status(self, args):
        """Show a table with the status of every pipeline of every project."""
        rows = []
        projects = self._get_projects()
        project_objs = sorted(
            projects['projects'], key=lambda p: p.get('project'))
        builds = self._get_builds()['builds']
        for project in project_objs:
            pipelines = sorted(project['pipelines'])
            for pipeline in pipelines:
                build = self._get_latest_build(
                    project['project'], pipeline, builds)
                if build is None:
                    # Placeholder shown for pipelines without any build.
                    build = {
                        'build_id': 'never',
                        'log': 'none',
                        'status': 'n/a',
                    }
                status = self._get_pipeline_status(
                    project['project'], pipeline)
                row = {
                    'project': project['project'],
                    'pipeline': pipeline,
                    'build_id': build['build_id'],
                    'status': status['status'],
                    'build_status': build['status'],
                    'log': build['log'],
                }
                rows.append(row)

        self._pretty_table(
            rows, ['project', 'pipeline', 'status', 'build_status', 'log'])

    def _get_projects(self):
        """Return the parsed response of listing ``/projects``."""
        rc = self._new_rc('/projects', 'project')
        return rc.list()

    def _get_builds(self):
        """Return the parsed response of listing ``/builds``."""
        rc = self._new_rc('/builds', 'build')
        return rc.list()

    def _get_latest_build(self, project, pipeline, builds):
        """Return the last build in ``builds`` for the project/pipeline.

        Returns None when no build matches. "Latest" here means the last
        matching entry; this presumably relies on ``builds`` being in
        chronological order -- verify against the controller.
        """
        latest = None
        for build in builds:
            if (build['project'], build['pipeline']) == (project, pipeline):
                latest = build
        return latest

    def cmd_make_it_so(self, args):
        """Create or update the projects and pipelines read from stdin."""
        obj = self._read_object()

        projects = self._new_rc('/projects', 'project')
        self._make_it_so(projects, obj.get('projects', []))

        pipelines = self._new_rc('/pipelines', 'pipeline')
        self._make_it_so(pipelines, obj.get('pipelines', []))

    def _make_it_so(self, rc, objs):
        """Update each object in ``objs`` if it exists, else create it."""
        for obj in objs:
            if rc.exists(obj):
                rc.update(obj)
            else:
                rc.create(obj)

    def cmd_list_projects(self, args):
        """List all projects."""
        self._prettyson(self._get_projects())

    def cmd_create_project(self, args):
        """Create a project from the object read from stdin."""
        rc = self._new_rc('/projects', 'project')
        obj = self._read_object()
        rc.create(obj)

    def cmd_update_project(self, args):
        """Update a project from the object read from stdin."""
        rc = self._new_rc('/projects', 'project')
        obj = self._read_object()
        rc.update(obj)

    def cmd_show_project(self, args):
        """Show the project named by the first argument."""
        rc = self._new_rc('/projects', 'project')
        name = args[0]
        self._prettyson(rc.show(name))

    def cmd_delete_project(self, args):
        """Delete the project named by the first argument."""
        rc = self._new_rc('/projects', 'project')
        name = args[0]
        rc.delete(name)

    def cmd_list_pipelines(self, args):
        """List all pipelines."""
        rc = self._new_rc('/pipelines', 'pipeline')
        self._prettyson(rc.list())

    def cmd_create_pipeline(self, args):
        """Create a pipeline from the object read from stdin."""
        rc = self._new_rc('/pipelines', 'pipeline')
        obj = self._read_object()
        rc.create(obj)

    def cmd_update_pipeline(self, args):
        """Update a pipeline from the object read from stdin."""
        rc = self._new_rc('/pipelines', 'pipeline')
        obj = self._read_object()
        rc.update(obj)

    def cmd_show_pipeline(self, args):
        """Show the pipeline named by the first argument."""
        rc = self._new_rc('/pipelines', 'pipeline')
        name = args[0]
        self._prettyson(rc.show(name))

    def cmd_delete_pipeline(self, args):
        """Delete the pipeline named by the first argument."""
        rc = self._new_rc('/pipelines', 'pipeline')
        name = args[0]
        rc.delete(name)

    def cmd_show_pipeline_status(self, args):
        """Show the status of a pipeline (arguments: project, pipeline)."""
        self._prettyson(self._get_pipeline_status(args[0], args[1]))

    def _get_pipeline_status(self, project, pipeline):
        """Return the parsed status object of a project's pipeline.

        Exits the program if the controller does not answer with 200.
        """
        path = '/projects/{}/pipelines/{}'.format(project, pipeline)
        api = self._new_api()
        code, text = api.get(path)
        self._report(code, 200, text)
        return json.loads(text)

    def _report(self, code, expected, text):
        """Exit with an error on stderr unless ``code`` equals ``expected``."""
        _exit_unless_status(code, expected, text)

    def cmd_set_pipeline(self, args):
        """Set a pipeline's status (arguments: project, pipeline, state)."""
        project = args[0]
        pipeline = args[1]
        state = args[2]
        path = '/projects/{}/pipelines/{}'.format(project, pipeline)
        api = self._new_api()
        code, text = api.put(path, {'status': state})
        self._report(code, 200, text)
        obj = json.loads(text)
        self._prettyson(obj)

    def cmd_trigger(self, args):
        """Trigger a pipeline (arguments: project, pipeline)."""
        project = args[0]
        pipeline = args[1]
        path = '/projects/{}/pipelines/{}/+trigger'.format(project, pipeline)
        api = self._new_api()
        code, text = api.get(path)
        self._report(code, 200, text)
        obj = json.loads(text)
        self._prettyson(obj)

    def cmd_list_workers(self, args):
        """List all workers."""
        rc = self._new_rc('/workers', 'worker')
        self._prettyson(rc.list())

    def cmd_create_worker(self, args):
        """Create a worker from the object read from stdin."""
        rc = self._new_rc('/workers', 'worker')
        obj = self._read_object()
        rc.create(obj)

    def cmd_update_worker(self, args):
        """Update a worker from the object read from stdin."""
        rc = self._new_rc('/workers', 'worker')
        obj = self._read_object()
        rc.update(obj)

    def cmd_show_worker(self, args):
        """Show the worker named by the first argument."""
        rc = self._new_rc('/workers', 'worker')
        name = args[0]
        self._prettyson(rc.show(name))

    def cmd_delete_worker(self, args):
        """Delete the worker named by the first argument."""
        rc = self._new_rc('/workers', 'worker')
        name = args[0]
        rc.delete(name)

    def cmd_list_builds(self, args):
        """List all builds."""
        self._prettyson(self._get_builds())

    def cmd_list_logs(self, args):
        """List all logs."""
        rc = self._new_rc('/logs', 'log')
        self._prettyson(rc.list())

    def cmd_show_log(self, args):
        """Write the log identified by the first argument to the output."""
        log = args[0]
        path = '/logs/{}'.format(log)
        api = self._new_api()
        code, text = api.get(path)
        self._report(code, 200, text)
        self.output.write(text)

    def cmd_show_latest_log(self, args):
        """Show the log of the last listed build of the given project.

        Does nothing when the project has no builds.
        """
        project = args[0]
        builds = self._get_builds()
        project_builds = [
            b for b in builds['builds'] if b['project'] == project
        ]
        if project_builds:
            b = project_builds[-1]
            self.cmd_show_log([b['build_id']])

    def cmd_get_blob(self, args):
        """Download a blob and write it to the file named by --output."""
        blob_id = args[0]
        api = self._new_api()
        blob_api = self._new_blob_api(api)
        status_code, blob = blob_api.get(blob_id)
        if status_code == 200:
            filename = self.settings['output']
            with open(filename, 'wb') as f:
                f.write(blob)
        else:
            sys.exit('Error: {}'.format(status_code))

    def cmd_put_blob(self, args):
        """Upload the bytes read from stdin as the blob named by args[0]."""
        blob_id = args[0]
        # Blobs are arbitrary binary data (uploaded as
        # application/octet-stream), so read raw bytes: the text layer
        # would decode the input and could fail on or alter the contents.
        blob = sys.stdin.buffer.read()
        api = self._new_api()
        blob_api = self._new_blob_api(api)
        code, text = blob_api.put(blob_id, blob)
        if code != 200:
            sys.exit(text)

    def _new_token(self):
        """Generate a token, remember it in the settings, and return it.

        Raises cliapp.AppException if no command for printing the
        private key has been configured.
        """
        wanted_scopes = self.settings['scope']
        cmd = self.settings['token-private-key-cmd']
        if not cmd:
            raise cliapp.AppException(
                'no --token-private-key-cmd specified')
        gen = TokenGenerator()
        gen.set_cmd(cmd)
        gen.set_scopes(wanted_scopes)
        token = gen.new_token()
        # Store the token so later API objects reuse it instead of
        # generating a new one each time.
        self.settings['token'] = token
        return token

    def _new_api(self):
        """Return an API object configured from the current settings."""
        token = self.settings['token'] or self._new_token()
        api = API()
        api.set_token(token)
        api.set_url(self.settings['controller'])
        api.set_verify(self.settings['verify-tls'])
        return api

    def _new_blob_api(self, api):
        """Return a BlobAPI for the artifact store reported via ``api``.

        Raises cliapp.AppException if the controller does not report an
        artifact store URL.
        """
        token = self.settings['token'] or self._new_token()
        url = api.get_artifact_store_url()
        if url is None:
            raise cliapp.AppException(
                'controller did not report an artifact store URL')
        blob_api = BlobAPI()
        blob_api.set_token(token)
        blob_api.set_url(url)
        blob_api.set_verify(self.settings['verify-tls'])
        return blob_api

    def _new_rc(self, path, field_name):
        """Return ResourceCommands for ``path``, named by ``field_name``."""
        api = self._new_api()
        return ResourceCommands(path, api, field_name)

    def _prettyson(self, obj):
        """Write ``obj`` to stdout as indented JSON with sorted keys."""
        json.dump(obj, sys.stdout, indent=4, sort_keys=True)
        sys.stdout.write('\n')

    def _read_object(self):
        """Parse stdin as YAML and return the resulting object."""
        # safe_load only builds plain data types; plain yaml.load can
        # construct arbitrary Python objects from the input.
        return yaml.safe_load(sys.stdin)

    def _pretty_table(self, rows, columns):
        """Write ``rows`` as a table with the given ``columns``.

        ``rows`` is a list of dicts; each column is as wide as its
        widest value, heading included.
        """
        headings = {
            column: column
            for column in columns
        }

        widths = {
            column: 0
            for column in columns
        }

        for row in [headings] + rows:
            for column in columns:
                widths[column] = max(widths[column], len(str(row[column])))

        underlines = {
            column: '-' * widths[column]
            for column in columns
        }

        for row in [headings, underlines] + rows:
            self.output.write(
                '{}\n'.format(self._pretty_row(widths, row, columns)))

    def _pretty_row(self, widths, row, columns):
        """Return one table row, cells right-aligned and joined by ' | '."""
        parts = ['%*s' % (widths[c], row[c]) for c in columns]
        return ' | '.join(parts)


class _BearerClient:
    """Shared state and helpers for HTTP clients using a bearer token."""

    def __init__(self):
        self._url = None
        self._token = None
        self._verify = True

    def set_url(self, url):
        """Set the base URL requests are sent to."""
        self._url = url

    def set_token(self, token):
        """Set the token sent in the Authorization header."""
        self._token = token

    def set_verify(self, verify):
        """Set the ``verify`` value passed on to requests."""
        self._verify = verify

    def _check_configured(self):
        """Assert that both the URL and the token have been set."""
        assert self._url is not None
        assert self._token is not None

    def _headers(self, **extra):
        """Return the request headers, plus any ``extra`` headers."""
        headers = {
            'Authorization': 'Bearer {}'.format(self._token),
        }
        headers.update(extra)
        return headers


class API(_BearerClient):
    """Minimal HTTP client for the controller."""

    def get_version(self):
        """Return the parsed ``/version`` response, or None if not 200."""
        code, text = self.get('/version')
        if code == 200:
            return json.loads(text)
        return None

    def get_artifact_store_url(self):
        """Return the ``artifact_store`` field of the version, or None."""
        version = self.get_version()
        if version:
            return version.get('artifact_store')
        return None

    def _full_url(self, path):
        """Join the base URL and ``path`` with exactly one slash."""
        return '{}/{}'.format(self._url.rstrip('/'), path.lstrip('/'))

    def get(self, path):
        """GET ``path``; return (status code, response text)."""
        self._check_configured()
        r = requests.get(
            self._full_url(path), headers=self._headers(),
            verify=self._verify)
        return r.status_code, r.text

    def post(self, path, obj):
        """POST ``obj`` as JSON to ``path``; return (status code, text)."""
        self._check_configured()
        r = requests.post(
            self._full_url(path), json=obj, headers=self._headers(),
            verify=self._verify)
        return r.status_code, r.text

    def put(self, path, obj):
        """PUT ``obj`` as JSON to ``path``; return (status code, text)."""
        self._check_configured()
        r = requests.put(
            self._full_url(path), json=obj, headers=self._headers(),
            verify=self._verify)
        return r.status_code, r.text

    def delete(self, path):
        """DELETE ``path``; return (status code, response text)."""
        self._check_configured()
        r = requests.delete(
            self._full_url(path), headers=self._headers(),
            verify=self._verify)
        return r.status_code, r.text


class BlobAPI(_BearerClient):
    """Minimal HTTP client for fetching and storing blobs."""

    def _blob_url(self, blob_id):
        """Return the URL of the blob ``blob_id``."""
        return '{}/blobs/{}'.format(self._url.rstrip('/'), blob_id)

    def get(self, blob_id):
        """GET a blob; return (status code, response body as bytes)."""
        self._check_configured()
        r = requests.get(
            self._blob_url(blob_id), headers=self._headers(),
            verify=self._verify)
        # Logged rather than printed so that stdout stays clean.
        logging.debug('blob length %d', len(r.content))
        return r.status_code, r.content

    def put(self, blob_id, blob):
        """PUT ``blob`` as an octet stream; return (status code, text)."""
        self._check_configured()
        headers = self._headers(**{
            'Content-Type': 'application/octet-stream',
        })
        r = requests.put(
            self._blob_url(blob_id), data=blob, headers=headers,
            verify=self._verify)
        return r.status_code, r.text


class ResourceCommands:
    """List/create/update/show/delete operations for one resource path.

    ``path`` is the collection path, ``api`` the API object used for
    requests, and ``name_field`` the key in a resource object that
    holds its name.
    """

    def __init__(self, path, api, name_field):
        self._path = path
        self._api = api
        self._name = name_field

    def list(self):
        """Return the parsed listing; exit unless the status is 200."""
        code, text = self._api.get(self._path)
        self._report(code, 200, text)
        return json.loads(text)

    def create(self, obj):
        """POST ``obj`` to the collection; exit unless the status is 201."""
        code, text = self._api.post(self._path, obj)
        self._report(code, 201, text)

    def update(self, obj):
        """PUT ``obj`` to its own path; exit unless the status is 200."""
        code, text = self._api.put(self._id_path(obj[self._name]), obj)
        self._report(code, 200, text)

    def show(self, name):
        """Return the parsed resource ``name``; exit unless status is 200."""
        code, text = self._api.get(self._id_path(name))
        self._report(code, 200, text)
        return json.loads(text)

    def exists(self, obj):
        """Return whether a GET of the path of ``obj`` answers 200."""
        code, _ = self._api.get(self._id_path(obj[self._name]))
        return code == 200

    def delete(self, name):
        """DELETE the resource ``name``; exit unless the status is 200."""
        code, text = self._api.delete(self._id_path(name))
        self._report(code, 200, text)

    def _id_path(self, name):
        """Return the path of the resource called ``name``."""
        return '{}/{}'.format(self._path, name)

    def _report(self, code, expected, text):
        """Exit with an error on stderr unless ``code`` equals ``expected``."""
        _exit_unless_status(code, expected, text)


class TokenGenerator:
    """Create signed tokens carrying a configurable set of scopes."""

    def __init__(self):
        self._cmd = None
        self._scopes = None

    def set_cmd(self, cmd):
        """Set the shell command that prints the private signing key."""
        self._cmd = cmd

    def set_scopes(self, wanted_scopes):
        """Set the list of scopes to put into generated tokens."""
        self._scopes = wanted_scopes

    def new_token(self):
        """Return a new token as a string.

        The private key is the output of running the configured command
        via ``sh -c``. The ``exp`` claim is set 86400 seconds after the
        current time.
        """
        assert self._cmd is not None
        assert self._scopes is not None

        # These should agree with how ick controller is configured.
        # See the Ansible playbook. They should probably be
        # configurable.
        iss = 'localhost'
        aud = 'localhost'

        privkey = cliapp.runcmd(['sh', '-c', self._cmd])
        key = Crypto.PublicKey.RSA.importKey(privkey)
        wanted_scopes = ' '.join(self._scopes)

        now = time.time()
        claims = {
            'iss': iss,
            'sub': 'subject-uuid',
            'aud': aud,
            'exp': now + 86400,
            'scope': wanted_scopes,
        }

        token = apifw.create_token(claims, key)
        return token.decode('ascii')


Icktool(version=ick2.__version__).run()