#!/usr/bin/python3 # Copyright 2017 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import json import logging import sys import time import apifw import cliapp import Crypto.PublicKey.RSA import requests import ick2 def scopes(base): patterns = [ 'uapi_{}_get', 'uapi_{}_post', 'uapi_{}_id_get', 'uapi_{}_id_put', 'uapi_{}_id_delete', ] return [x.format(base) for x in patterns] class Icktool(cliapp.Application): _default_scopes = [ 'uapi_version_get', 'uapi_work_post', ] + scopes('projects') + scopes('workers') + scopes('work') + scopes('builds') + scopes('logs') def add_settings(self): self.settings.string( ['controller'], 'use URL as the controller base URL', metavar='URL', ) self.settings.boolean( ['verify-tls'], 'verify TLS certifcate signature? default is yes', ) self.settings.string( ['token'], 'use TOKEN instead of generating a new one', metavar='TOKEN', ) self.settings.string( ['token-private-key-cmd'], 'run CMD to print private key for token signing', metavar='CMD', ) self.settings.string_list( ['scope'], 'add SCOPE to the list of scope in generated token', metavar='SCOPE', default=self._default_scopes, ) def setup(self): if not self.settings['verify-tls']: logging.captureWarnings(True) def cmd_scopes(self, args): for scope in self.settings['scope']: sys.stdout.write('{}\n'.format(scope)) def cmd_token(self, args): token = self._new_token() sys.stdout.write(token) def cmd_version(self, args): api = self._new_api() code, text = api.get('/version') if code != 200: sys.stderr.write('HTTP status {}\n'.format(code)) sys.stderr.write(text) sys.exit(1) obj = json.loads(text) self._prettyson(obj) def cmd_list_projects(self, args): rc = self._new_rc('/projects', 'project') self._prettyson(rc.list()) def cmd_create_project(self, args): rc = self._new_rc('/projects', 'project') obj = self._read_object() rc.create(obj) def cmd_update_project(self, args): rc = self._new_rc('/projects', 'project') obj = self._read_object() rc.update(obj) def cmd_show_project(self, args): rc = self._new_rc('/projects', 'project') name = args[0] self._prettyson(rc.show(name)) def cmd_delete_project(self, args): rc = self._new_rc('/projects', 'project') name = args[0] rc.delete(name) def cmd_show_pipeline(self, args): project = args[0] pipeline = args[1] path = '/projects/{}/pipelines/{}'.format(project, pipeline) api = self._new_api() code, text = api.get(path) self._report(code, 200, text) obj = json.loads(text) self._prettyson(obj) def _report(self, code, expected, text): if code != expected: sys.stderr.write('HTTP status {}\n'.format(code)) sys.stderr.write(text) if not text.endswith('\n'): sys.stderr.write('\n') sys.exit(1) def cmd_set_pipeline(self, args): project = args[0] pipeline = args[1] state = args[2] path = '/projects/{}/pipelines/{}'.format(project, pipeline) api = self._new_api() code, text = api.put(path, {'status': state}) self._report(code, 200, text) obj = json.loads(text) self._prettyson(obj) def cmd_list_workers(self, args): rc = self._new_rc('/workers', 'worker') self._prettyson(rc.list()) def cmd_create_worker(self, args): rc = self._new_rc('/workers', 'worker') obj = self._read_object() rc.create(obj) def cmd_update_worker(self, args): rc = self._new_rc('/workers', 'worker') obj = self._read_object() rc.update(obj) def cmd_show_worker(self, args): rc = self._new_rc('/workers', 'worker') name = args[0] self._prettyson(rc.show(name)) def cmd_delete_worker(self, args): rc = self._new_rc('/workers', 'worker') name = args[0] rc.delete(name) def cmd_list_builds(self, args): rc = self._new_rc('/builds', 'build') self._prettyson(rc.list()) def cmd_list_logs(self, args): rc = self._new_rc('/logs', 'log') self._prettyson(rc.list()) def cmd_show_log(self, args): log = args[0] path = '/logs/{}'.format(log) api = self._new_api() code, text = api.get(path) self._report(code, 200, text) self.output.write(text) def _new_token(self): scopes = self.settings['scope'] cmd = self.settings['token-private-key-cmd'] if not cmd: raise cliapp.AppException('no --token-private-cmd specified') gen = TokenGenerator() gen.set_cmd(cmd) gen.set_scopes(scopes) return gen.new_token() def _new_api(self): token = self.settings['token'] or self._new_token() api = API() api.set_token(token) api.set_url(self.settings['controller']) api.set_verify(self.settings['verify-tls']) return api def _new_rc(self, path, field_name): api = self._new_api() return ResourceCommands(path, api, field_name) def _prettyson(self, obj): json.dump(obj, sys.stdout, indent=4) sys.stdout.write('\n') def _read_object(self): return json.load(sys.stdin) class API: def __init__(self): self._url = None self._token = None self._verify = True def set_url(self, url): self._url = url def set_token(self, token): self._token = token def set_verify(self, verify): self._verify = verify def get(self, path): assert self._url is not None assert self._token is not None full_url = '{}/{}'.format(self._url, path) headers = { 'Authorization': 'Bearer {}'.format(self._token), } r = requests.get(full_url, headers=headers, verify=self._verify) return r.status_code, r.text def post(self, path, obj): assert self._url is not None assert self._token is not None full_url = '{}{}'.format(self._url, path) headers = { 'Authorization': 'Bearer {}'.format(self._token), } r = requests.post( full_url, json=obj, headers=headers, verify=self._verify) return r.status_code, r.text def put(self, path, obj): assert self._url is not None assert self._token is not None full_url = '{}{}'.format(self._url, path) headers = { 'Authorization': 'Bearer {}'.format(self._token), } r = requests.put( full_url, json=obj, headers=headers, verify=self._verify) return r.status_code, r.text def delete(self, path): assert self._url is not None assert self._token is not None full_url = '{}{}'.format(self._url, path) headers = { 'Authorization': 'Bearer {}'.format(self._token), } r = requests.delete( full_url, headers=headers, verify=self._verify) return r.status_code, r.text class ResourceCommands: def __init__(self, path, api, name_field): self._path = path self._api = api self._name = name_field def list(self): code, text = self._api.get(self._path) self._report(code, 200, text) return json.loads(text) def create(self, obj): code, text = self._api.post(self._path, obj) self._report(code, 201, text) def update(self, obj): code, text = self._api.put(self._id_path(obj[self._name]), obj) self._report(code, 200, text) def show(self, name): code, text = self._api.get(self._id_path(name)) self._report(code, 200, text) return json.loads(text) def delete(self, name): code, text = self._api.delete(self._id_path(name)) self._report(code, 200, text) def _id_path(self, name): return '{}/{}'.format(self._path, name) def _report(self, code, expected, text): if code != expected: sys.stderr.write('HTTP status {}\n'.format(code)) sys.stderr.write(text) if not text.endswith('\n'): sys.stderr.write('\n') sys.exit(1) class TokenGenerator: def __init__(self): self._cmd = None self._scopes = None def set_cmd(self, cmd): self._cmd = cmd def set_scopes(self, scopes): self._scopes = scopes def new_token(self): assert self._cmd is not None assert self._scopes is not None # These should agree with how ick controller is configured. # See the Ansible playbook. They should probably be # configurable. iss = 'localhost' aud = 'localhost' privkey = cliapp.runcmd(['sh', '-c', self._cmd]) key = Crypto.PublicKey.RSA.importKey(privkey) scopes = ' '.join(self._scopes) now = time.time() claims = { 'iss': iss, 'sub': 'subject-uuid', 'aud': aud, 'exp': now + 86400, # FIXME: This is silly long 'scope': scopes, } token = apifw.create_token(claims, key) return token.decode('ascii') Icktool(version=ick2.__version__).run()