#!/usr/bin/env python3
#
# Copyright 2019 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Command-line tool for talking to the HTTP APIs listed in a credentials file.

The credentials file (see CONFIG_FILENAME) is an INI file with one
section per server.  Each section provides ``url``, ``client_id`` and
``client_secret``.
"""

import argparse
import base64
import configparser
import http
import json
import os
import sys
import urllib.parse
import urllib.request


CONFIG_FILENAME = os.path.expanduser('~/.config/effitool/credentials.conf')

# Content types used in requests and expected in responses.
JSON = 'application/json'
URLENC = 'application/x-www-form-urlencoded'

# Scopes requested when fetching the token used for client management.
ADMIN_SCOPES = [
    'uapi_clients_get',
    'uapi_clients_id_get',
    'uapi_clients_id_put',
    'uapi_clients_id_delete',
    'uapi_clients_post',
    'uapi_clients_id_secret_put',
]


class Config:
    """Server credentials read from an INI-style configuration file."""

    def __init__(self):
        self._cp = configparser.ConfigParser()

    def read(self, filename):
        """Load configuration from the file named *filename*."""
        with open(filename) as f:
            self._cp.read_file(f)

    def dump(self, f):
        """Write the configuration to the open text file *f*."""
        self._cp.write(f)

    def servers(self):
        """Return the names of all configured servers, in file order."""
        # sections() is the key list without the implicit DEFAULT section.
        return self._cp.sections()

    def get_default_server(self):
        """Return the name of the only configured server.

        Raises Exception unless exactly one server is configured.
        """
        sections = self._cp.sections()
        if len(sections) != 1:
            raise Exception('Config must have exactly one server')
        return sections[0]

    def get(self, name):
        """Return the url and client credentials of server *name* as a dict.

        Raises KeyError if the server or one of its settings is missing.
        """
        server = self._cp[name]
        return {
            'url': server['url'],
            'client_id': server['client_id'],
            'client_secret': server['client_secret'],
        }


class HTTPAPI:
    """Minimal JSON-over-HTTP client for one server base URL."""

    def __init__(self, url):
        self._url = url

    def url(self, path):
        """Return the full URL for *path* under the server base URL."""
        return '{}{}'.format(self._url, path)

    def request_json(self, r):
        """Check the response *r* and return its body parsed as JSON.

        Raises Exception if the status is neither 200 nor 201, or if the
        response content type is not JSON.
        """
        ok = (http.HTTPStatus.OK, http.HTTPStatus.CREATED)
        status = r.getcode()
        if status not in ok:
            raise Exception('Got HTTP status {}'.format(status))

        ct = r.info().get_content_type()
        if ct != JSON:
            raise Exception('Response is not JSON: {}'.format(ct))

        return json.loads(r.read())

    def get(self, path):
        """GET *path* without authentication and return the JSON body."""
        return self._open_json(self.url(path))

    def get_access_token(self, client_id, client_secret, scopes):
        """Fetch an access token for *scopes* using client credentials."""
        obj = self.post_form(
            '/token', client_id, client_secret,
            grant_type='client_credentials',
            scope=' '.join(scopes))
        return obj['access_token']

    def get_json(self, token, path):
        """GET *path* with bearer *token* and return the JSON body."""
        url = self._https_url(path)
        headers = {
            'Authorization': 'Bearer {}'.format(token),
        }
        req = urllib.request.Request(url, headers=headers, method='GET')
        return self._open_json(req)

    def get_list(self, token, path):
        """GET a listing resource; currently the same as get_json."""
        return self.get_json(token, path)

    def post_form(self, path, user, password, **kwargs):
        """POST *kwargs* as a URL-encoded form, using HTTP Basic auth.

        Returns the JSON body of the response.
        """
        url = self._https_url(path)
        data = urllib.parse.urlencode(kwargs).encode('UTF-8')
        headers = {
            'Content-type': URLENC,
            'Authorization': self.get_authorization(user, password),
        }
        req = urllib.request.Request(
            url, data=data, headers=headers, method='POST')
        return self._open_json(req)

    def post_json(self, token, path, obj):
        """POST *obj* as JSON to *path* with bearer *token*."""
        return self.send_json('POST', token, path, obj)

    def put_json(self, token, path, obj):
        """PUT *obj* as JSON to *path* with bearer *token*."""
        return self.send_json('PUT', token, path, obj)

    def send_json(self, method, token, path, obj):
        """Send *obj* as a JSON body using *method*; return the JSON reply."""
        url = self._https_url(path)
        data = json.dumps(obj).encode('UTF-8')
        headers = {
            'Content-type': JSON,
            'Authorization': 'Bearer {}'.format(token),
        }
        req = urllib.request.Request(
            url, data=data, headers=headers, method=method)
        return self._open_json(req)

    def parse_url(self, url):
        """Split *url* into ``(host, port, path)``.

        *port* is None when the URL does not name one.  Raises Exception
        if the scheme is not https, and ValueError if the port is not a
        valid number.
        """
        parts = urllib.parse.urlparse(url)
        if parts.scheme != 'https':
            raise Exception(
                'URL scheme must be https, not {}'.format(parts.scheme))
        # Let urlparse split the network location so that IPv6 literals
        # and user-info prefixes are handled correctly.
        return parts.hostname or '', parts.port, parts.path

    def get_authorization(self, user, password):
        """Return the HTTP Basic Authorization header value."""
        clear = '{}:{}'.format(user, password)
        basic = base64.b64encode(clear.encode('UTF-8'))
        return 'Basic {}'.format(basic.decode('UTF-8'))

    def _https_url(self, path):
        """Return the full URL for *path*, insisting that it is https."""
        url = self.url(path)
        # Called only for its validation; credentials must not be sent
        # over anything but https.
        self.parse_url(url)
        return url

    def _open_json(self, req):
        """Open *req* (URL or Request) and return the decoded JSON reply."""
        # The with-block makes sure the connection is always closed.
        with urllib.request.urlopen(req) as r:
            return self.request_json(r)


class Tool:
    """Implementations of the command-line subcommands.

    Each subcommand method takes the parsed command-line arguments as a
    dict.
    """

    def __init__(self, config):
        self._config = config

    def servers(self):
        """Return the names of all configured servers."""
        return self._config.servers()

    def get_server(self, name):
        """Return the settings dict of server *name*."""
        return self._config.get(name)

    def get_chosen_server(self, args):
        """Return the server picked with --api, else the only one configured."""
        name = args.get('api') or self._config.get_default_server()
        return self.get_server(name)

    def get_admin_token(self, server):
        """Fetch an access token carrying ADMIN_SCOPES from *server*."""
        api = HTTPAPI(server['url'])
        return api.get_access_token(
            server['client_id'], server['client_secret'], ADMIN_SCOPES)

    def list_servers(self, args):
        """Print every configured server with its settings."""
        for name in self.servers():
            print('server', name)
            server = self.get_server(name)
            for key in sorted(server):
                print(' ', key, server[key])

    def status(self, args):
        """Print the ``resources`` field of each server's /status reply."""
        for name in self.servers():
            server = self.get_server(name)
            api = HTTPAPI(server['url'])
            obj = api.get('/status')
            print('server', name, obj['resources'])

    def register_admin_client(self, args):
        """Create a client with the given id, then set its secret."""
        server = self.get_chosen_server(args)
        token = self.get_admin_token(server)
        api = HTTPAPI(server['url'])

        new_client = {
            'id': args['client_id'],
        }
        api.post_json(token, '/clients', new_client)

        new_secret = {
            'secret': args['client_secret'],
        }
        api.put_json(
            token, '/clients/{}/secret'.format(new_client['id']), new_secret)
        print('Created new admin client', new_client['id'])

    def list_clients(self, args):
        """Print every client on the chosen server as indented JSON."""
        server = self.get_chosen_server(args)
        token = self.get_admin_token(server)
        api = HTTPAPI(server['url'])

        obj = api.get_list(token, '/clients')
        for name in obj.get('resources', []):
            client = api.get_json(token, '/clients/{}'.format(name))
            # Never show the hashed secret; tolerate replies without one.
            client.pop('hashed_secret', None)
            print(json.dumps(client, indent=4))


def process_args(config, argv=None):
    """Parse the command line and run the selected subcommand.

    *argv* is the list of arguments to parse; when None, argparse falls
    back to ``sys.argv[1:]``.  Exits with a usage error if no subcommand
    is given.
    """
    tool = Tool(config)

    subcommands = [
        ('list-servers', tool.list_servers, []),
        ('status', tool.status, []),
        ('register-admin-client', tool.register_admin_client, [
            ('--client-id', {'required': True}),
            ('--client-secret', {'required': True}),
        ]),
        ('list-clients', tool.list_clients, []),
    ]

    p = argparse.ArgumentParser()
    factory = p.add_subparsers()
    p.add_argument('-a', '--api')

    for name, handler, options in subcommands:
        pp = factory.add_parser(name)
        for flag, kwargs in options:
            pp.add_argument(flag, **kwargs)
        pp.set_defaults(func=handler)

    args = vars(p.parse_args(argv))
    func = args.get('func')
    if func is None:
        # Subparsers are optional in argparse, so an empty command line
        # parses fine but selects nothing.
        p.error('a subcommand is required')
    func(args)


def main():
    """Load the credentials file and dispatch the command line."""
    config = Config()
    config.read(CONFIG_FILENAME)
    process_args(config)


if __name__ == '__main__':
    main()