summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2018-11-17 12:10:40 +0200
committerLars Wirzenius <liw@liw.fi>2018-11-17 13:07:48 +0200
commitbc72355b941085bc3d36c52748e8d95c4a48b1f6 (patch)
tree14fc001aa115bd462bf0efda16a364829d85d6e9
parent603fa65871346b528c1d2290be72226658676d7c (diff)
downloadeffi-reg-bc72355b941085bc3d36c52748e8d95c4a48b1f6.tar.gz
Add: effitool
-rwxr-xr-xeffitool352
1 files changed, 352 insertions, 0 deletions
diff --git a/effitool b/effitool
new file mode 100755
index 0000000..f6cd0d2
--- /dev/null
+++ b/effitool
@@ -0,0 +1,352 @@
+#!/usr/bin/env python3
+
+
+import argparse
+import base64
+import json
+import logging
+import sys
+import urllib
+
+import requests
+
+
+all_possible_scopes = [
+ 'create',
+ 'update',
+ 'show',
+ 'delete',
+ 'super',
+]
+
+
+class HttpError(Exception):
+
+ pass
+
+
+class HttpAPI:
+
+ # Make requests to an HTTP API.
+
+ json_type = 'application/json'
+
+ def __init__(self):
+ self._session = requests.Session()
+ self._token = None
+ self._verify = True
+
+ def set_token(self, token):
+ self._token = token
+
+ def get_dict(self, url, headers=None, body=None):
+ r = self._request(self._session.get, url, headers=headers, data=body)
+ ct = r.headers.get('Content-Type')
+ if ct != self.json_type:
+ raise HttpError('Not JSON response')
+ try:
+ return r.json()
+ except json.decoder.JSONDecodeError:
+ raise HttpError('JSON parsing error')
+
+ def get_blob(self, url, headers=None):
+ r = self._request(self._session.get, url, headers=headers)
+ return r.content
+
+ def delete(self, url, headers=None): # pragma: no cover
+ r = self._request(self._session.delete, url, headers=headers)
+ return r.content
+
+ def post(self, url, headers=None, body=None):
+ return self._send_request(self._session.post, url, headers=headers, body=body)
+
+ def post_auth(self, url, headers=None, body=None, auth=None):
+ assert auth is not None
+ if headers is None:
+ headers = {}
+ headers['Authorization'] = self._basic_auth(auth)
+ return self._send_request(
+ self._session.post, url, headers=headers, body=body, auth=auth)
+
+ def _basic_auth(self, auth):
+ username, password = auth
+ cleartext = '{}:{}'.format(username, password).encode('UTF-8')
+ encoded = base64.b64encode(cleartext)
+ return 'Basic {}'.format(encoded.decode('UTF-8'))
+
+ def put(self, url, headers=None, body=None):
+ self._send_request(self._session.put, url, headers=headers, body=body)
+
+ def _send_request(self, func, url, headers=None, body=None, auth=None):
+ if headers is None:
+ headers = {}
+ headers = dict(headers)
+ if not headers.get('Content-Type'):
+ h, body = self._get_content_type_header(body)
+ headers.update(h)
+ return self._request(func, url, headers=headers, data=body, auth=auth)
+
+ def _get_content_type_header(self, body):
+ if isinstance(body, dict):
+ header = {
+ 'Content-Type': 'application/json',
+ }
+ body = json.dumps(body)
+ return header, body
+ return {}, body
+
+ def _get_authorization_headers(self):
+ if self._token is None:
+ return {}
+ return {
+ 'Authorization': 'Bearer {}'.format(self._token),
+ }
+
+ def _request(self, func, url, headers=None, **kwargs):
+ if headers is None:
+ headers = {}
+
+ if 'Content-Type' not in headers:
+ data = kwargs.get('data')
+ if isinstance(data, dict):
+ headers['Content-Type'] = 'application/json'
+
+ auth = kwargs.get('auth')
+ if auth is None:
+ headers.update(self._get_authorization_headers())
+ if 'auth' in kwargs:
+ del kwargs['auth']
+
+ logging.info(
+ 'Request: %s url=%s headers=%s verify=%s kwargs=%s',
+ func.__func__.__name__, url, headers, self._verify, kwargs)
+ r = func(url, headers=headers, verify=self._verify, **kwargs)
+ if not r.ok:
+ raise HttpError('{}: {}'.format(r.status_code, r.text))
+ return r
+
+
+class EffiAPI:
+
+ def __init__(self, url, client_id, client_secret, fake=False):
+ self._url = url
+ self._auth_url = '{}/token'.format(url)
+ self._http_api = HttpAPI()
+ self._client_id = client_id
+ self._client_secret = client_secret
+ self._fake = fake
+ self._token = None
+
+ def new_token(self):
+ if self._fake:
+ self._token = 'FAKE'
+ else:
+ self._token = self.get_token_with_scopes(all_possible_scopes)
+ self._http_api.set_token(self._token)
+
+ def get_token(self):
+ if self._token is None:
+ self.new_token()
+ return self._token
+
+ def get_token_with_scopes(self, scopes):
+ auth = (self._client_id, self._client_secret)
+ params = {
+ 'grant_type': 'client_credentials',
+ 'scope': ' '.join(scopes),
+ }
+ body = urllib.parse.urlencode(params)
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ }
+ r = self._http_api.post_auth(
+ self._auth_url, headers=headers, body=body, auth=auth)
+ obj = r.json()
+ return obj['access_token']
+
+ def get_status(self):
+ url = self.url('/status')
+ return self._http_api.get_dict(url)
+
+ def add_member(self, username, member):
+ logging.info('Add member %s: %r', username, member)
+ url = self.url('/mem')
+ r = self._http_api.post(url, body=member)
+ if not r.ok:
+ self.exit('ERROR: {} {}'.format(r.status_code, r.text))
+ return r.json()
+
+ def list_members(self):
+ url = self.url('/search')
+ cond = {
+ 'cond': [
+ {
+ 'op': '>=',
+ 'where': 'meta',
+ 'field': 'id',
+ 'pattern': '',
+ },
+ ]
+ }
+ obj = self._http_api.get_dict(url, body=cond)
+ return obj['resources']
+
+ def find_member_by_email(self, email):
+ objs = self.list_members()
+ for obj in objs.values():
+ if obj.get('email') == email:
+ return obj
+
+ def url(self, path):
+ return '{}{}'.format(self._url, path)
+
+
+class EffiTool:
+
+ def run(self):
+ p = self._create_command_line_parser()
+ args = vars(p.parse_args())
+ self._setup_logging(args['logfile'])
+ self._api = self._new_api(args, True)
+
+ func = args.pop('func')
+ if func is None:
+ logging.error('missing command on command line')
+ sys.exit('ERROR: Missing command')
+
+ api = self._new_api(args, True)
+ try:
+ logging.debug('calling %r with %r', func.__name__, args)
+ value = func(args, api)
+ self._pretty_print(value)
+ except BaseException as e:
+ logging.error(str(e), exc_info=True)
+ sys.exit(str(e))
+
+ def _create_command_line_parser(self):
+ cmds = [
+ {
+ 'name': 'token',
+ 'help': 'get an access token',
+ 'func': get_token,
+ 'args': [],
+ },
+ {
+ 'name': 'status',
+ 'help': 'get status of server',
+ 'func': get_status,
+ 'args': [],
+ },
+ {
+ 'name': 'add',
+ 'help': 'Add a member',
+ 'func': add_member,
+ 'args': [
+ {
+ 'name': 'username',
+ 'help': 'Username of member',
+ },
+ {
+ 'name': 'fullname',
+ 'help': 'Full name of member',
+ },
+ {
+ 'name': 'email',
+ 'help': 'Email address of member',
+ },
+ ],
+ },
+ {
+ 'name': 'show',
+ 'help': "Show a member's info",
+ 'func': show_member,
+ 'args': [
+ {
+ 'name': 'email',
+ 'help': 'Email address of member',
+ },
+ ],
+ },
+ {
+ 'name': 'list',
+ 'help': 'List all members',
+ 'func': list_members,
+ 'args': [],
+ },
+ ]
+
+ parser = argparse.ArgumentParser(description='Manage Effi members')
+ parser.add_argument('-u', '--url', dest='url', required=True)
+ parser.add_argument('-i', '--client-id', dest='client_id', required=True)
+ parser.add_argument(
+ '-s', '--client-secret', dest='client_secret', required=True)
+
+ parser.add_argument('--log', dest='logfile', default='effitool.log')
+ parser.add_argument('--fake', dest='fake', action='store_true')
+
+ factory = parser.add_subparsers()
+ for cmd in cmds:
+ p = factory.add_parser(cmd['name'], help=cmd['help'])
+ for arg in cmd['args']:
+ p.add_argument(arg['name'], help=arg['help'])
+ p.set_defaults(func=cmd['func'])
+
+ return parser
+
+ def _new_api(self, args, fake):
+ return EffiAPI(args['url'], args['client_id'], args['client_secret'], fake=fake)
+
+ def _setup_logging(self, filename):
+ logging.basicConfig(
+ filename=filename,
+ level=logging.DEBUG,
+ format='%(asctime)s %(levelname)s %(message)s',
+ )
+ logging.info('Effitool starts')
+
+ def _pretty_print(self, value):
+ if isinstance(value, str):
+ print(value)
+ return
+
+ print(json.dumps(value, indent=4))
+
+
+def get_token(args, api):
+ return api.get_token()
+
+
+def get_status(args, api):
+ return api.get_status()
+
+
+def add_member(args, api):
+ member = {
+ 'fullname': args['fullname'],
+ 'email': args['email'],
+ }
+
+ api.new_token()
+ return api.add_member(args['username'], member)
+
+
+def show_member(args, api):
+ email = args['email']
+ member = api.find_member_by_email(email)
+ if member is None:
+ sys.exit('ERROR: could not find {}'.format(email))
+ return member
+
+
+def list_members(args, api):
+ api.new_token()
+ return api.list_members()
+
+
+def main():
+ effitool = EffiTool()
+ effitool.run()
+
+
+if __name__ == '__main__':
+ main()