#!/usr/bin/env python3 import argparse import base64 import json import logging import sys import urllib import requests import yaml all_possible_scopes = [ 'create', 'update', 'show', 'delete', 'super', ] class HttpError(Exception): pass class HttpAPI: # Make requests to an HTTP API. json_type = 'application/json' def __init__(self): self._session = requests.Session() self._token = None self._verify = True def set_token(self, token): self._token = token def get_dict(self, url, headers=None, body=None): r = self._request(self._session.get, url, headers=headers, data=body) ct = r.headers.get('Content-Type') if ct != self.json_type: raise HttpError('Not JSON response') try: return r.json() except json.decoder.JSONDecodeError: raise HttpError('JSON parsing error') def get_blob(self, url, headers=None): r = self._request(self._session.get, url, headers=headers) return r.content def delete(self, url, headers=None): # pragma: no cover r = self._request(self._session.delete, url, headers=headers) return r.content def post(self, url, headers=None, body=None): return self._send_request(self._session.post, url, headers=headers, body=body) def post_auth(self, url, headers=None, body=None, auth=None): assert auth is not None if headers is None: headers = {} headers['Authorization'] = self._basic_auth(auth) return self._send_request( self._session.post, url, headers=headers, body=body, auth=auth) def _basic_auth(self, auth): username, password = auth cleartext = '{}:{}'.format(username, password).encode('UTF-8') encoded = base64.b64encode(cleartext) return 'Basic {}'.format(encoded.decode('UTF-8')) def put(self, url, headers=None, body=None): self._send_request(self._session.put, url, headers=headers, body=body) def _send_request(self, func, url, headers=None, body=None, auth=None): if headers is None: headers = {} headers = dict(headers) if not headers.get('Content-Type'): h, body = self._get_content_type_header(body) headers.update(h) return self._request(func, url, headers=headers, data=body, auth=auth) def _get_content_type_header(self, body): if isinstance(body, dict): header = { 'Content-Type': 'application/json', } body = json.dumps(body) return header, body return {}, body def _get_authorization_headers(self): if self._token is None: return {} return { 'Authorization': 'Bearer {}'.format(self._token), } def _request(self, func, url, headers=None, **kwargs): if headers is None: headers = {} if 'Content-Type' not in headers: data = kwargs.get('data') if isinstance(data, dict): headers['Content-Type'] = 'application/json' auth = kwargs.get('auth') if auth is None: headers.update(self._get_authorization_headers()) if 'auth' in kwargs: del kwargs['auth'] # logging.info( # 'Request: %s url=%s headers=%s verify=%s kwargs=%s', # func.__func__.__name__, url, headers, self._verify, kwargs) r = func(url, headers=headers, verify=self._verify, **kwargs) if not r.ok: raise HttpError('{}: {}'.format(r.status_code, r.text)) return r class EffiAPI: def __init__(self, url, token_url, client_id, client_secret, fake=False): self._url = url self._auth_url = '{}/token'.format(token_url) self._http_api = HttpAPI() self._client_id = client_id self._client_secret = client_secret self._fake = fake self._token = None def new_token(self): if self._fake: self._token = 'FAKE' elif self._token is None: self._token = self.get_token_with_scopes(all_possible_scopes) self._http_api.set_token(self._token) def set_token(self, token): self._token = token def get_token(self): if self._token is None: self.new_token() return self._token def get_token_with_scopes(self, scopes): auth = (self._client_id, self._client_secret) params = { 'grant_type': 'client_credentials', 'scope': ' '.join(scopes), } body = urllib.parse.urlencode(params) headers = { 'Content-Type': 'application/x-www-form-urlencoded', } r = self._http_api.post_auth( self._auth_url, headers=headers, body=body, auth=auth) obj = r.json() return obj['access_token'] def get_member(self, rid): url = self.url('/mem') headers = { 'Muck-Id': rid, } return self._http_api.get_dict(url, headers) def get_status(self): url = self.url('/status') return self._http_api.get_dict(url) def add_member(self, username, member): logging.info('Add member %s: %r', username, member) url = self.url('/mem') headers = { 'Muck-Owner': username, } r = self._http_api.post(url, body=member, headers=headers) if not r.ok: self.exit('ERROR: {} {}'.format(r.status_code, r.text)) return r.json() def list_members(self): url = self.url('/search') cond = { 'cond': [ { 'op': '>=', 'where': 'meta', 'field': 'id', 'pattern': '', }, ] } headers = { 'Content-Type': 'application/json', } obj = self._http_api.get_dict( url, body=json.dumps(cond), headers=headers) rids = obj['resources'] members = [] for rid in rids: logging.info('Retrieving member %s', rid) mem = self.get_member(rid) members.append(mem) return members def find_member_by_email(self, email): members = self.list_members() for member in members: if member.get('email') == email: return member def url(self, path): return '{}{}'.format(self._url, path) class EffiTool: def run(self): p = self._create_command_line_parser() args = vars(p.parse_args()) self._setup_logging(args['logfile']) func = args.pop('func') if func is None: logging.error('missing command on command line') sys.exit('ERROR: Missing command') api = self._new_api(args, args['fake']) if args['token']: api.set_token(args['token']) try: logging.debug('calling %r with %r', func.__name__, args) value = func(args, api) self._pretty_print(value) except BaseException as e: logging.error(str(e), exc_info=True) sys.exit(str(e)) def _create_command_line_parser(self): cmds = [ { 'name': 'token', 'help': 'get an access token', 'func': get_token, 'args': [], }, { 'name': 'status', 'help': 'get status of server', 'func': get_status, 'args': [], }, { 'name': 'add', 'help': 'Add a member', 'func': add_member, 'args': [ { 'name': 'username', 'help': 'Username of member', }, { 'name': 'member', 'help': 'read member from yaml file', }, ], }, { 'name': 'show', 'help': "Show a member's info", 'func': show_member, 'args': [ { 'name': 'email', 'help': 'Email address of member', }, ], }, { 'name': 'list', 'help': 'List all members', 'func': list_members, 'args': [], }, ] parser = argparse.ArgumentParser(description='Manage Effi members') parser.add_argument('-u', '--url', dest='url', required=True) parser.add_argument('-T', '--token', dest='token', required=False) parser.add_argument('-t', '--token-url', dest='token_url', required=True) parser.add_argument('-i', '--client-id', dest='client_id', required=True) parser.add_argument( '-s', '--client-secret', dest='client_secret', required=True) parser.add_argument('--log', dest='logfile', default='effitool.log') parser.add_argument('--fake', dest='fake', action='store_true') factory = parser.add_subparsers() for cmd in cmds: p = factory.add_parser(cmd['name'], help=cmd['help']) for arg in cmd['args']: p.add_argument(arg['name'], help=arg['help']) p.set_defaults(func=cmd['func']) return parser def _new_api(self, args, fake): return EffiAPI( args['url'], args['token_url'], args['client_id'], args['client_secret'], fake=fake) def _setup_logging(self, filename): logging.basicConfig( filename=filename, level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s', ) logging.info('Effitool starts') def _pretty_print(self, value): if isinstance(value, str): print(value) return print(json.dumps(value, indent=4)) def get_token(args, api): return api.get_token() def get_status(args, api): return api.get_status() def add_member(args, api): with open(args['member']) as f: member = yaml.safe_load(f) api.new_token() return api.add_member(args['username'], member) def show_member(args, api): api.new_token() email = args['email'] member = api.find_member_by_email(email) if member is None: sys.exit('ERROR: could not find {}'.format(email)) return member def list_members(args, api): api.new_token() return api.list_members() def main(): effitool = EffiTool() effitool.run() if __name__ == '__main__': main()