#!/usr/bin/env python3 # # Copyright 2019 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import argparse import base64 import configparser import http import json import os import sys import urllib.request CONFIG_FILENAME = os.path.expanduser('~/.config/effitool/credentials.conf') JSON = 'application/json' URLENC = 'application/x-www-form-urlencoded' ADMIN_SCOPES = [ 'uapi_clients_get', 'uapi_clients_id_get', 'uapi_clients_id_put', 'uapi_clients_id_delete', 'uapi_clients_post', 'uapi_clients_id_secret_put', 'show', 'create', 'update', 'delete', ] class Config: def __init__(self): self._cp = configparser.ConfigParser() def read(self, filename): with open(filename) as f: self._cp.read_file(f) def dump(self, f): self._cp.write(f) def servers(self): return [k for k in self._cp.keys() if k != 'DEFAULT'] def get_default_server(self): sections = self._cp.sections() if len(sections) != 1: raise Exception('Config must have exactly one server') return sections[0] def get(self, name): server = self._cp[name] return { 'url': server['url'], 'client_id': server['client_id'], 'client_secret': server['client_secret'], } class HTTPAPI: def __init__(self, url): self._url = url def url(self, path): return '{}{}'.format(self._url, path) def request_json(self, r): ok = (http.HTTPStatus.OK, http.HTTPStatus.CREATED) status = r.getcode() if status not in ok: raise Exception('Got HTTP status {}'.format(status)) info = r.info() ct = info.get_content_type() if ct != JSON: raise Exception('Response is not JSON: {}'.ct) body = r.read() return json.loads(body) def get(self, path): url = self.url(path) with urllib.request.urlopen(url) as r: return self.request_json(r) def get_access_token(self, client_id, client_secret, scopes): obj = self.post_form( '/token', client_id, client_secret, grant_type='client_credentials', scope=' '.join(scopes)) return obj['access_token'] def get_json(self, token, path, headers=None, body=None): url = self.url(path) host, port, path = self.parse_url(url) if headers is None: headers = {} headers['Authorization'] = 'Bearer {}'.format(token) req = urllib.request.Request( url, data=body, headers=headers, method='GET') r = urllib.request.urlopen(req) return self.request_json(r) def get_list(self, token, path, headers=None, body=None): return self.get_json(token, path, headers=headers, body=body) def post_form(self, path, user, password, **kwargs): url = self.url(path) host, port, path = self.parse_url(url) data = urllib.parse.urlencode(kwargs).encode('UTF-8') headers = { 'Content-type': URLENC, 'Authorization': self.get_authorization(user, password), } req = urllib.request.Request( url, data=data, headers=headers, method='POST') r = urllib.request.urlopen(req) return self.request_json(r) def post_json(self, token, path, obj): return self.send_json('POST', token, path, obj) def put_json(self, token, path, obj, headers=None): return self.send_json('PUT', token, path, obj, headers=headers) def send_json(self, method, token, path, obj, headers=None): url = self.url(path) host, port, path = self.parse_url(url) data = json.dumps(obj).encode('UTF-8') if headers is None: headers = {} headers.update({ 'Content-type': JSON, 'Authorization': 'Bearer {}'.format(token), }) req = urllib.request.Request( url, data=data, headers=headers, method=method) r = urllib.request.urlopen(req) return self.request_json(r) def parse_url(self, url): parse = urllib.parse.urlparse(url) if parse.scheme != 'https': raise Exception( 'URL scheme must be https, not {}'.format(parse.scheme)) if ':' in parse.netloc: host, port = parse.netloc.split(':', 1) port = int(port) else: host = parse.netloc port = None return host, port, parse.path def get_authorization(self, user, password): clear = '{}:{}'.format(user, password) basic = base64.b64encode(clear.encode('UTF-8')) return 'Basic {}'.format(basic.decode('UTF-8')) class Tool: def __init__(self, config): self._config = config def servers(self): return self._config.servers() def get_server(self, name): return self._config.get(name) def get_chosen_server(self, args): if args['api']: name = args['api'] else: name = self._config.get_default_server() return self.get_server(name) def get_admin_token(self, server): url = server['url'] client_id = server['client_id'] client_secret = server['client_secret'] api = HTTPAPI(url) return api.get_access_token(client_id, client_secret, ADMIN_SCOPES) def list_servers(self, args): for name in self.servers(): print('server', name) server = self.get_server(name) for key in sorted(server): print(' ', key, server[key]) def status(self, args): for name in self.servers(): server = self.get_server(name) url = server['url'] api = HTTPAPI(url) obj = api.get('/status') print('server', name, obj['resources']) def token(self, args): server = self.get_chosen_server(args) token = self.get_admin_token(server) print(token) def register_admin_client(self, args): server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) new_client = { 'id': args['client_id'], 'allowed_scopes': ADMIN_SCOPES, } client_path = '/clients/{}'.format(new_client['id']) api.post_json(token, '/clients', new_client) new_secret = { 'secret': args['client_secret'], } api.put_json(token, '{}/secret'.format(client_path), new_secret) print('Created new admin client', new_client['id']) def list_clients(self, args): server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) obj = api.get_list(token, '/clients') for name in sorted(obj.get('resources', [])): print(name) def show_clients(self, args): server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) obj = api.get_list(token, '/clients') client = api.get_json(token, '/clients/{}'.format(args['client-name'])) del client['hashed_secret'] for scope in client.get('allowed_scopes', []): print(scope) def allow_scope(self, args): name = args['client-name'] scopes = args['scope'] server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) path = '/clients/{}'.format(name) client = api.get_json(token, path) client['allowed_scopes'] = uniq(client.get('allowed_scopes', []) + scopes) api.put_json(token, path, client) def deny_scope(self, args): name = args['client-name'] denied_scopes = args['scope'] server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) path = '/clients/{}'.format(name) client = api.get_json(token, path) old_scopes = client.get('allowed_scopes', []) client['allowed_scopes'] = uniq( s for s in old_scopes if s not in denied_scopes) api.put_json(token, path, client) def add_member(self, args): with open(args['filename']) as f: memb = json.load(f) server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) api.post_json(token, '/memb', memb) def show_member(self, args): server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) headers = { 'Muck-Id': args['rid'], } memb = api.get_json(token, '/memb', headers=headers) print(memb) def update_member(self, args): rid = args['rid'] with open(args['filename']) as f: memb = json.load(f) headers = { 'Muck-Id': args['rid'], } server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) api.put_json(token, '/memb', memb, headers=headers) def list_members(self, args): server = self.get_chosen_server(args) token = self.get_admin_token(server) api = HTTPAPI(server['url']) cond = { 'cond': [ { 'where': 'meta', 'op': '>=', 'field': 'id', 'pattern': '', } ] } body = json.dumps(cond).encode('utf-8') headers = { 'Content-Type': 'application/json', } obj = api.get_list(token, '/search', headers=headers, body=body) for rid in sorted(obj.get('resources', [])): print(rid) def uniq(items): return list(sorted(set(items))) def process_args(config): tool = Tool(config) subcommands = [ ('list-servers', tool.list_servers, []), ('token', tool.token, []), ('status', tool.status, []), ('register-admin-client', tool.register_admin_client, [ ('--client-id', {'required':True}), ('--client-secret', {'required':True}), ]), ('list-clients', tool.list_clients, []), ('show-client', tool.show_clients, [ ('client-name', {}), ]), ('allow-scope', tool.allow_scope, [ ('client-name', {}), ('scope', {'nargs':'*'}), ]), ('deny-scope', tool.deny_scope, [ ('client-name', {}), ('scope', {'nargs':'*'}), ]), ('list-members', tool.list_members, []), ('add-member', tool.add_member, [ ('filename', {}), ]), ('show-member', tool.show_member, [ ('rid', {}), ]), ('update-member', tool.update_member, [ ('rid', {}), ('filename', {}), ]), ] p = argparse.ArgumentParser() factory = p.add_subparsers() p.add_argument('-a', '--api') for name, func, args in subcommands: pp = factory.add_parser(name) for name, kwargs in args: pp.add_argument(name, **kwargs) pp.set_defaults(func=func) args = vars(p.parse_args()) func = args['func'] func(args) def main(): config = Config() config.read(CONFIG_FILENAME) args = process_args(config) if __name__ == '__main__': main()