diff options
author | Lars Wirzenius <liw@liw.fi> | 2018-11-17 12:10:40 +0200 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2018-11-17 13:07:48 +0200 |
commit | bc72355b941085bc3d36c52748e8d95c4a48b1f6 (patch) | |
tree | 14fc001aa115bd462bf0efda16a364829d85d6e9 | |
parent | 603fa65871346b528c1d2290be72226658676d7c (diff) | |
download | effi-reg-bc72355b941085bc3d36c52748e8d95c4a48b1f6.tar.gz |
Add: effitool
-rwxr-xr-x | effitool | 352 |
1 files changed, 352 insertions, 0 deletions
diff --git a/effitool b/effitool new file mode 100755 index 0000000..f6cd0d2 --- /dev/null +++ b/effitool @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 + + +import argparse +import base64 +import json +import logging +import sys +import urllib + +import requests + + +all_possible_scopes = [ + 'create', + 'update', + 'show', + 'delete', + 'super', +] + + +class HttpError(Exception): + + pass + + +class HttpAPI: + + # Make requests to an HTTP API. + + json_type = 'application/json' + + def __init__(self): + self._session = requests.Session() + self._token = None + self._verify = True + + def set_token(self, token): + self._token = token + + def get_dict(self, url, headers=None, body=None): + r = self._request(self._session.get, url, headers=headers, data=body) + ct = r.headers.get('Content-Type') + if ct != self.json_type: + raise HttpError('Not JSON response') + try: + return r.json() + except json.decoder.JSONDecodeError: + raise HttpError('JSON parsing error') + + def get_blob(self, url, headers=None): + r = self._request(self._session.get, url, headers=headers) + return r.content + + def delete(self, url, headers=None): # pragma: no cover + r = self._request(self._session.delete, url, headers=headers) + return r.content + + def post(self, url, headers=None, body=None): + return self._send_request(self._session.post, url, headers=headers, body=body) + + def post_auth(self, url, headers=None, body=None, auth=None): + assert auth is not None + if headers is None: + headers = {} + headers['Authorization'] = self._basic_auth(auth) + return self._send_request( + self._session.post, url, headers=headers, body=body, auth=auth) + + def _basic_auth(self, auth): + username, password = auth + cleartext = '{}:{}'.format(username, password).encode('UTF-8') + encoded = base64.b64encode(cleartext) + return 'Basic {}'.format(encoded.decode('UTF-8')) + + def put(self, url, headers=None, body=None): + self._send_request(self._session.put, url, headers=headers, body=body) + + def _send_request(self, func, url, headers=None, body=None, auth=None): + if headers is None: + headers = {} + headers = dict(headers) + if not headers.get('Content-Type'): + h, body = self._get_content_type_header(body) + headers.update(h) + return self._request(func, url, headers=headers, data=body, auth=auth) + + def _get_content_type_header(self, body): + if isinstance(body, dict): + header = { + 'Content-Type': 'application/json', + } + body = json.dumps(body) + return header, body + return {}, body + + def _get_authorization_headers(self): + if self._token is None: + return {} + return { + 'Authorization': 'Bearer {}'.format(self._token), + } + + def _request(self, func, url, headers=None, **kwargs): + if headers is None: + headers = {} + + if 'Content-Type' not in headers: + data = kwargs.get('data') + if isinstance(data, dict): + headers['Content-Type'] = 'application/json' + + auth = kwargs.get('auth') + if auth is None: + headers.update(self._get_authorization_headers()) + if 'auth' in kwargs: + del kwargs['auth'] + + logging.info( + 'Request: %s url=%s headers=%s verify=%s kwargs=%s', + func.__func__.__name__, url, headers, self._verify, kwargs) + r = func(url, headers=headers, verify=self._verify, **kwargs) + if not r.ok: + raise HttpError('{}: {}'.format(r.status_code, r.text)) + return r + + +class EffiAPI: + + def __init__(self, url, client_id, client_secret, fake=False): + self._url = url + self._auth_url = '{}/token'.format(url) + self._http_api = HttpAPI() + self._client_id = client_id + self._client_secret = client_secret + self._fake = fake + self._token = None + + def new_token(self): + if self._fake: + self._token = 'FAKE' + else: + self._token = self.get_token_with_scopes(all_possible_scopes) + self._http_api.set_token(self._token) + + def get_token(self): + if self._token is None: + self.new_token() + return self._token + + def get_token_with_scopes(self, scopes): + auth = (self._client_id, self._client_secret) + params = { + 'grant_type': 'client_credentials', + 'scope': ' '.join(scopes), + } + body = urllib.parse.urlencode(params) + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + r = self._http_api.post_auth( + self._auth_url, headers=headers, body=body, auth=auth) + obj = r.json() + return obj['access_token'] + + def get_status(self): + url = self.url('/status') + return self._http_api.get_dict(url) + + def add_member(self, username, member): + logging.info('Add member %s: %r', username, member) + url = self.url('/mem') + r = self._http_api.post(url, body=member) + if not r.ok: + self.exit('ERROR: {} {}'.format(r.status_code, r.text)) + return r.json() + + def list_members(self): + url = self.url('/search') + cond = { + 'cond': [ + { + 'op': '>=', + 'where': 'meta', + 'field': 'id', + 'pattern': '', + }, + ] + } + obj = self._http_api.get_dict(url, body=cond) + return obj['resources'] + + def find_member_by_email(self, email): + objs = self.list_members() + for obj in objs.values(): + if obj.get('email') == email: + return obj + + def url(self, path): + return '{}{}'.format(self._url, path) + + +class EffiTool: + + def run(self): + p = self._create_command_line_parser() + args = vars(p.parse_args()) + self._setup_logging(args['logfile']) + self._api = self._new_api(args, True) + + func = args.pop('func') + if func is None: + logging.error('missing command on command line') + sys.exit('ERROR: Missing command') + + api = self._new_api(args, True) + try: + logging.debug('calling %r with %r', func.__name__, args) + value = func(args, api) + self._pretty_print(value) + except BaseException as e: + logging.error(str(e), exc_info=True) + sys.exit(str(e)) + + def _create_command_line_parser(self): + cmds = [ + { + 'name': 'token', + 'help': 'get an access token', + 'func': get_token, + 'args': [], + }, + { + 'name': 'status', + 'help': 'get status of server', + 'func': get_status, + 'args': [], + }, + { + 'name': 'add', + 'help': 'Add a member', + 'func': add_member, + 'args': [ + { + 'name': 'username', + 'help': 'Username of member', + }, + { + 'name': 'fullname', + 'help': 'Full name of member', + }, + { + 'name': 'email', + 'help': 'Email address of member', + }, + ], + }, + { + 'name': 'show', + 'help': "Show a member's info", + 'func': show_member, + 'args': [ + { + 'name': 'email', + 'help': 'Email address of member', + }, + ], + }, + { + 'name': 'list', + 'help': 'List all members', + 'func': list_members, + 'args': [], + }, + ] + + parser = argparse.ArgumentParser(description='Manage Effi members') + parser.add_argument('-u', '--url', dest='url', required=True) + parser.add_argument('-i', '--client-id', dest='client_id', required=True) + parser.add_argument( + '-s', '--client-secret', dest='client_secret', required=True) + + parser.add_argument('--log', dest='logfile', default='effitool.log') + parser.add_argument('--fake', dest='fake', action='store_true') + + factory = parser.add_subparsers() + for cmd in cmds: + p = factory.add_parser(cmd['name'], help=cmd['help']) + for arg in cmd['args']: + p.add_argument(arg['name'], help=arg['help']) + p.set_defaults(func=cmd['func']) + + return parser + + def _new_api(self, args, fake): + return EffiAPI(args['url'], args['client_id'], args['client_secret'], fake=fake) + + def _setup_logging(self, filename): + logging.basicConfig( + filename=filename, + level=logging.DEBUG, + format='%(asctime)s %(levelname)s %(message)s', + ) + logging.info('Effitool starts') + + def _pretty_print(self, value): + if isinstance(value, str): + print(value) + return + + print(json.dumps(value, indent=4)) + + +def get_token(args, api): + return api.get_token() + + +def get_status(args, api): + return api.get_status() + + +def add_member(args, api): + member = { + 'fullname': args['fullname'], + 'email': args['email'], + } + + api.new_token() + return api.add_member(args['username'], member) + + +def show_member(args, api): + email = args['email'] + member = api.find_member_by_email(email) + if member is None: + sys.exit('ERROR: could not find {}'.format(email)) + return member + + +def list_members(args, api): + api.new_token() + return api.list_members() + + +def main(): + effitool = EffiTool() + effitool.run() + + +if __name__ == '__main__': + main() |