#!/usr/bin/env python3
#
# Copyright 2019 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see https://www.gnu.org/licenses/.

"""Command line tool that talks to servers listed in a credentials file."""

import argparse
import base64
import configparser
import http
import json
import os
import sys
import urllib.parse
import urllib.request


CONFIG_FILENAME = os.path.expanduser('~/.config/effitool/credentials.conf')

# Content types used when talking to the server.
JSON = 'application/json'
URLENC = 'application/x-www-form-urlencoded'


class Config:
    """Server credentials read from an INI-style configuration file.

    Each section of the file describes one server and must provide the
    keys ``url``, ``client_id`` and ``client_secret``.
    """

    def __init__(self):
        self._cp = configparser.ConfigParser()

    def read(self, filename):
        """Load configuration from the file called ``filename``."""
        with open(filename) as f:
            self._cp.read_file(f)

    def dump(self, f):
        """Write the configuration to the open text file ``f``."""
        self._cp.write(f)

    def servers(self):
        """Return the names of all configured servers."""
        # ConfigParser always exposes a DEFAULT section; it is not a server.
        return [name for name in self._cp if name != 'DEFAULT']

    def get_default_server(self):
        """Return the name of the only configured server.

        Raises Exception unless the configuration has exactly one section.
        """
        sections = self._cp.sections()
        if len(sections) != 1:
            raise Exception('Config must have exactly one server')
        return sections[0]

    def get(self, name):
        """Return the settings of server ``name`` as a plain dict.

        Raises KeyError if the server or one of its settings is missing.
        """
        server = self._cp[name]
        return {
            'url': server['url'],
            'client_id': server['client_id'],
            'client_secret': server['client_secret'],
        }


class HTTPAPI:
    """Minimal JSON-over-HTTP client rooted at a base URL."""

    def __init__(self, url):
        self._url = url

    def url(self, path):
        """Return the full URL for ``path`` on this server."""
        return '{}{}'.format(self._url, path)

    def get_json(self, r):
        """Decode the JSON body of response ``r``.

        ``r`` must provide ``getcode()``, ``info()`` and ``read()`` the way
        the object returned by ``urllib.request.urlopen`` does.

        Raises Exception if the status is neither 200 nor 201, or if the
        content type of the response is not ``application/json``.
        """
        ok = (http.HTTPStatus.OK, http.HTTPStatus.CREATED)
        status = r.getcode()
        if status not in ok:
            raise Exception('Got HTTP status {}'.format(status))

        ct = r.info().get_content_type()
        if ct != JSON:
            raise Exception('Response is not JSON: {}'.format(ct))

        return json.loads(r.read())

    def get(self, path):
        """GET ``path`` and return the decoded JSON response."""
        url = self.url(path)
        with urllib.request.urlopen(url) as r:
            return self.get_json(r)

    def get_access_token(self, client_id, client_secret, scopes):
        """Request an access token via the client credentials grant.

        ``scopes`` is an iterable of scope names. Returns the value of the
        ``access_token`` field in the server's response.
        """
        obj = self.post_form(
            '/token', client_id, client_secret,
            grant_type='client_credentials',
            scope=' '.join(scopes))
        return obj['access_token']

    def post_form(self, path, user, password, **kwargs):
        """POST ``kwargs`` as a URL-encoded form, using HTTP Basic auth.

        Returns the decoded JSON response. Raises Exception if the target
        URL does not use the https scheme.
        """
        url = self.url(path)
        # Called only for its check: the credentials must not be sent
        # over anything but https.
        self.parse_url(url)

        data = urllib.parse.urlencode(kwargs).encode('UTF-8')
        headers = {
            'Content-type': URLENC,
            'Authorization': self.get_authorization(user, password),
        }
        req = urllib.request.Request(
            url, data=data, headers=headers, method='POST')
        # Use a context manager so the connection is always closed.
        with urllib.request.urlopen(req) as r:
            return self.get_json(r)

    def parse_url(self, url):
        """Split ``url`` into a ``(host, port, path)`` tuple.

        ``port`` is an int, or None when the URL does not name a port.
        Raises Exception if the scheme is not https.
        """
        parse = urllib.parse.urlparse(url)
        if parse.scheme != 'https':
            raise Exception(
                'URL scheme must be https, not {}'.format(parse.scheme))

        if ':' in parse.netloc:
            host, port = parse.netloc.split(':', 1)
            port = int(port)
        else:
            host = parse.netloc
            port = None
        return host, port, parse.path

    def get_authorization(self, user, password):
        """Return the HTTP Basic ``Authorization`` header value."""
        clear = '{}:{}'.format(user, password)
        basic = base64.b64encode(clear.encode('UTF-8'))
        return 'Basic {}'.format(basic.decode('UTF-8'))


def list_servers(args, config):
    """Print every configured server together with its settings."""
    for name in config.servers():
        print('server', name)
        server = config.get(name)
        for key in sorted(server):
            print(' ', key, server[key])


def status(args, config):
    """Fetch /status from every configured server and print a summary."""
    for name in config.servers():
        server = config.get(name)
        api = HTTPAPI(server['url'])
        obj = api.get('/status')
        print('server', name, obj['resources'])


def register_client(args, config):
    """Register a new API client on the default server (unfinished).

    Currently this only fetches an access token and prints it; the
    actual registration is not implemented yet.
    """
    new_client_id = args['client_id']
    new_client_secret = args['client_secret']

    name = config.get_default_server()
    server = config.get(name)
    url = server['url']
    client_id = server['client_id']
    client_secret = server['client_secret']

    api = HTTPAPI(url)
    token = api.get_access_token(client_id, client_secret, ['xxx'])
    print(token)
    print('NEED TO ACTUALLY CREATE CLIENT HERE')
    assert 0


def process_args(config, argv=None):
    """Parse the command line and run the chosen subcommand.

    ``argv`` is the list of arguments to parse; when None, argparse uses
    ``sys.argv[1:]``. Exits with a usage error if no subcommand is given.
    """
    subcommands = [
        ('list-servers', list_servers, []),
        ('status', status, []),
        ('register-client', register_client, [
            ('--client-id', {'required': True}),
            ('--client-secret', {'required': True}),
        ]),
    ]

    p = argparse.ArgumentParser()
    factory = p.add_subparsers()
    for name, func, arg_specs in subcommands:
        pp = factory.add_parser(name)
        for arg_name, arg_kwargs in arg_specs:
            pp.add_argument(arg_name, **arg_kwargs)
        pp.set_defaults(func=func)

    args = vars(p.parse_args(argv))
    func = args.get('func')
    if func is None:
        # No subcommand was named, so no parser set a 'func' default.
        p.error('a subcommand is required')
    func(args, config)


def main():
    """Load the credentials file and dispatch to the requested command."""
    config = Config()
    config.read(CONFIG_FILENAME)
    process_args(config)


if __name__ == '__main__':
    main()