#!/usr/bin/env python3 # # Copyright 2019 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import argparse import base64 import configparser import http import json import os import sys import traceback import urllib.request CONFIG_FILENAME = os.path.expanduser('~/.config/effitool/credentials.conf') JSON = 'application/json' URLENC = 'application/x-www-form-urlencoded' ADMIN_SCOPES = [ 'uapi_clients_get', 'uapi_clients_id_get', 'uapi_clients_id_put', 'uapi_clients_id_delete', 'uapi_clients_post', 'uapi_clients_id_secret_put', 'show', 'create', 'update', 'delete', ] class Config: def __init__(self): self._cp = configparser.ConfigParser() def read(self, filename): with open(filename) as f: self._cp.read_file(f) def dump(self, f): self._cp.write(f) def servers(self): return [k for k in self._cp.keys() if k != 'DEFAULT'] def get_default_server(self): sections = self._cp.sections() if len(sections) != 1: raise Exception('Config must have exactly one server') return sections[0] def get(self, name): server = self._cp[name] return { 'url': server['url'], 'client_id': server['client_id'], 'client_secret': server['client_secret'], } class HTTPAPI: def __init__(self, url): self._url = url def url(self, path): return '{}{}'.format(self._url, path) def request_json(self, r): ok = (http.HTTPStatus.OK, http.HTTPStatus.CREATED) status = r.getcode() if status not in ok: raise Exception('Got HTTP status {}'.format(status)) info = r.info() ct = info.get_content_type() if ct != JSON: raise Exception('Response is not JSON: {}'.ct) body = r.read() return json.loads(body) def get(self, path): url = self.url(path) with urllib.request.urlopen(url) as r: return self.request_json(r) def get_access_token(self, client_id, client_secret, scopes): obj = self.post_form( '/token', client_id, client_secret, grant_type='client_credentials', scope=' '.join(scopes)) return obj['access_token'] def get_json(self, token, path, headers=None, body=None): url = self.url(path) host, port, path = self.parse_url(url) if headers is None: headers = {} headers['Authorization'] = 'Bearer {}'.format(token) req = urllib.request.Request( url, data=body, headers=headers, method='GET') r = urllib.request.urlopen(req) return self.request_json(r) def get_list(self, token, path, headers=None, body=None): return self.get_json(token, path, headers=headers, body=body) def post_form(self, path, user, password, **kwargs): url = self.url(path) host, port, path = self.parse_url(url) data = urllib.parse.urlencode(kwargs).encode('UTF-8') headers = { 'Content-type': URLENC, 'Authorization': self.get_authorization(user, password), } req = urllib.request.Request( url, data=data, headers=headers, method='POST') r = urllib.request.urlopen(req) return self.request_json(r) def post_json(self, token, path, obj): return self.send_json('POST', token, path, obj) def put_json(self, token, path, obj, headers=None): return self.send_json('PUT', token, path, obj, headers=headers) def send_json(self, method, token, path, obj, headers=None): url = self.url(path) host, port, path = self.parse_url(url) data = json.dumps(obj).encode('UTF-8') if headers is None: headers = {} headers.update({ 'Content-type': JSON, 'Authorization': 'Bearer {}'.format(token), }) req = urllib.request.Request( url, data=data, headers=headers, method=method) r = urllib.request.urlopen(req) return self.request_json(r) def delete(self, token, path, headers=None): url = self.url(path) host, port, path = self.parse_url(url) if headers is None: headers = {} headers['Authorization'] = 'Bearer {}'.format(token) req = urllib.request.Request(url, headers=headers, method='DELETE') urllib.request.urlopen(req) def parse_url(self, url): parse = urllib.parse.urlparse(url) if parse.scheme != 'https': raise Exception( 'URL scheme must be https, not {}'.format(parse.scheme)) if ':' in parse.netloc: host, port = parse.netloc.split(':', 1) port = int(port) else: host = parse.netloc port = None return host, port, parse.path def get_authorization(self, user, password): clear = '{}:{}'.format(user, password) basic = base64.b64encode(clear.encode('UTF-8')) return 'Basic {}'.format(basic.decode('UTF-8')) class Session: def __init__(self, api, token): self._api = api self._token = token def register_admin_client(self, client_id, client_secret): new_client = { 'id': client_id, 'allowed_scopes': ADMIN_SCOPES, } self._api.post_json(self._token, '/clients', new_client) client_path = '/clients/{}'.format(client_id) new_secret = { 'secret': client_secret, } self._api.put_json(self._token, '{}/secret'.format(client_path), new_secret) def list_clients(self): obj = self._api.get_list(self._token, '/clients') return sorted(obj.get('resources', [])) def get_client(self, client_id): return self._api.get_json(self._token, self._get_client_path(client_id)) def update_client(self, client_id, client): self._api.put_json(self._token, self._get_client_path(client_id), client) def _get_client_path(self, client_id): return '/clients/{}'.format(client_id) def allow_scopes(self, client_id, scopes): client = self.get_client(client_id) client['allowed_scopes'] = uniq(client.get('allowed_scopes', []) + scopes) self.update_client(client_id, client) def deny_scopes(self, client_id, scopes): client = self.get_client(client_id) old_scopes = client.get('allowed_scopes', []) client['allowed_scopes'] = uniq( s for s in old_scopes if s not in denied_scopes) self.update_client(client_id, client) def fetch_member(self, rid): headers = { 'Muck-Id': rid, } return self._api.get_json(self._token, '/memb', headers=headers) def get_all_member_ids(self): cond = { 'where': 'meta', 'op': '>=', 'field': 'id', 'pattern': '', } return self.search_members(cond) def get_all_members(self): return { rid: self.fetch_member(rid) for rid in self.get_all_member_ids() } def search_members(self, cond): cond = {'cond': [cond]} body = json.dumps(cond).encode('utf-8') headers = { 'Content-Type': 'application/json', } obj = self._api.get_list(self._token, '/search', headers=headers, body=body) return sorted(obj.get('resources', [])) class Tool: def __init__(self, config): self._config = config def get_session(self, args): server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) return Session(api, token) def servers(self): return self._config.servers() def get_server(self, name): return self._config.get(name) def get_chosen_server(self, args): if args['api']: name = args['api'] else: name = self._config.get_default_server() return self.get_server(name) def get_admin_token(self, server): url = server['url'] client_id = server['client_id'] client_secret = server['client_secret'] api = HTTPAPI(url) return api.get_access_token(client_id, client_secret, ADMIN_SCOPES) def list_servers(self, args): for name in self.servers(): print('server', name) server = self.get_server(name) for key in sorted(server): print(' ', key, server[key]) def status(self, args): for name in self.servers(): server = self.get_server(name) url = server['url'] api = HTTPAPI(url) obj = api.get('/status') print('server', name, obj['resources']) def token(self, args): server = self.get_chosen_server(args) token = self.get_admin_token(server) print(token) def register_admin_client(self, args): session = self.get_session(args) session.register_admin_client(args['client_id'], args['client_secret']) print('Created new admin client', args['client_id']) def list_clients(self, args): session = self.get_session(args) for name in session.list_clients(): print(name) def show_client(self, args): session = self.get_session(args) client = session.get_client(args['client-name']) del client['hashed_secret'] for scope in client.get('allowed_scopes', []): print(scope) def allow_scope(self, args): session = self.get_session(args) session.allow_scopes(args['client-name'], args['scope']) def deny_scope(self, args): name = args['client-name'] denied_scopes = args['scope'] server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) path = '/clients/{}'.format(name) client = api.get_json(token, path) old_scopes = client.get('allowed_scopes', []) client['allowed_scopes'] = uniq( s for s in old_scopes if s not in denied_scopes) api.put_json(token, path, client) def add_member(self, args): with open(args['filename']) as f: memb = json.load(f) server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) api.post_json(token, '/memb', memb) def show_member(self, args): session = self.get_session(args) memb = session.fetch_member(args['rid']) self._format_json(memb, sys.stdout) def update_member(self, args): rid = args['rid'] with open(args['filename']) as f: memb = json.load(f) headers = { 'Muck-Id': args['rid'], } server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) api.put_json(token, '/memb', memb, headers=headers) def delete_member(self, args): headers = { 'Muck-Id': args['rid'], } server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) api.delete(token, '/memb', headers=headers) def list_members(self, args): session = self.get_session(args) for rid in session.get_all_member_ids(): print(rid) def dump_members(self, args): session = self.get_session(args) rids = session.get_all_member_ids() members = session.get_all_members() self._format_json(members, sys.stdout) def _format_json(self, obj, output): return json.dump(obj, output, indent=4, sort_keys=True) def uniq(items): return list(sorted(set(items))) def process_args(config): tool = Tool(config) subcommands = [ ('list-servers', tool.list_servers, []), ('token', tool.token, []), ('status', tool.status, []), ('register-admin-client', tool.register_admin_client, [ ('--client-id', {'required':True}), ('--client-secret', {'required':True}), ]), ('list-clients', tool.list_clients, []), ('show-client', tool.show_client, [ ('client-name', {}), ]), ('allow-scope', tool.allow_scope, [ ('client-name', {}), ('scope', {'nargs':'*'}), ]), ('deny-scope', tool.deny_scope, [ ('client-name', {}), ('scope', {'nargs':'*'}), ]), ('list-members', tool.list_members, []), ('dump-members', tool.dump_members, []), ('add-member', tool.add_member, [ ('filename', {}), ]), ('show-member', tool.show_member, [ ('rid', {}), ]), ('update-member', tool.update_member, [ ('rid', {}), ('filename', {}), ]), ('delete-member', tool.delete_member, [ ('rid', {}), ]), ] p = argparse.ArgumentParser() factory = p.add_subparsers() p.add_argument('-a', '--api') p.add_argument('--debug', action='store_true') for name, func, args in subcommands: pp = factory.add_parser(name) for name, kwargs in args: pp.add_argument(name, **kwargs) pp.set_defaults(func=func) args = vars(p.parse_args()) func = args['func'] try: func(args) except Exception as e: if args['debug']: sys.stderr.write('{}\n'.format(traceback.format_exc())) sys.exit(str(e)) def main(): config = Config() config.read(CONFIG_FILENAME) args = process_args(config) if __name__ == '__main__': main()