diff options
Diffstat (limited to 'yarns/lib.py')
-rw-r--r-- | yarns/lib.py | 168 |
1 files changed, 168 insertions, 0 deletions
diff --git a/yarns/lib.py b/yarns/lib.py new file mode 100644 index 0000000..ee36f9f --- /dev/null +++ b/yarns/lib.py @@ -0,0 +1,168 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import json +import os +import re +import signal +import subprocess +import time + +import Crypto.PublicKey.RSA +import jwt +import requests + +from yarnutils import * + + +srcdir = os.environ['SRCDIR'] +datadir = os.environ['DATADIR'] +V = Variables(datadir) + + +json_mime_type = 'application/json' + + +def start_muck(): + pathname = os.path.join(srcdir, 'muck_poc') + pubkey = os.path.join(srcdir, 'test-key.pub') + + config = { + 'pid': 'muck.pid', + 'log': 'muck.log', + 'store': datadir, + 'signing-key-filename': pubkey, + } + + config_filename = os.path.join(datadir, 'mock.conf') + with open(config_filename, 'w') as f: + json.dump(config, f) + + out = os.path.join(datadir, 'muck.out') + err = os.path.join(datadir, 'muck.err') + argv = [ + '/usr/sbin/daemonize', '-o', out, '-e', err, '-c', '.', + pathname, config_filename, + ] + subprocess.check_call(argv) + V['base_url'] = 'http://127.0.0.1:{}'.format(12765) + + V['token'] = create_test_token() + + +def stop_muck(): + pid = int(read('muck.pid')) + os.kill(pid, signal.SIGTERM) + + +def create_test_token(): + key_filename = os.path.join(srcdir, 'test-key') + key_text = open(key_filename).read() + + iss = 'test-issuer' + aud = 'test-audience' + sub = 'test-user' + scopes = ['create', 'update', 'show', 'delete'] + lifetime = 3600 + + return create_token(key_text, iss, aud, sub, scopes, lifetime) + +def create_token(key_text, iss, aud, sub, scopes, lifetime): + key = Crypto.PublicKey.RSA.importKey(key_text) + + now = int(time.time()) + claims = { + 'iss': iss, + 'sub': sub, + 'aud': aud, + 'exp': now + lifetime, + 'scope': ' '.join(scopes), + } + + token = jwt.encode(claims, key.exportKey('PEM'), algorithm='RS512') + return token.decode('ascii') + + +def POST(path, headers, body): + return request(requests.post, path, headers, body) + + +def PUT(path, headers, body): + return request(requests.put, path, headers, body) + + +def GET(path, headers, body=None): + return request(requests.get, path, headers, body=body) + + +def DELETE(path, headers): + return request(requests.delete, path, headers) + + +def request(func, path, headers, body=None): + url = '{}{}'.format(V['base_url'], path) + if 'Content-Type' not in headers: + headers['Content-Type'] = json_mime_type + if 'Authorization' not in headers: + headers['Authorization'] = 'Bearer {}'.format(V['token']) + if body is not None: + body = json.dumps(body) + V['request_url'] = url + V['request_func'] = repr(func) + V['request_headers'] = repr(headers) + V['request_body'] = repr(body) + r = func(url, headers=headers, data=body) + V['status_code'] = r.status_code + V['response_body'] = r.text + V['response_headers'] = dict(r.headers) + return r + + +def read(filename): + with open(filename, 'r') as f: + return f.read() + + +def get_expanded_match(): + match = get_next_match() + return expand(match, V) + + +def expand(text, variables): + result = '' + while text: + m = re.search(r'\${(?P<name>[^}]+)}', text) + if not m: + result += text + break + name = m.group('name') + print('expanding ', name, repr(variables[name])) + result += text[:m.start()] + variables[name] + text = text[m.end():] + return result + + +def get_json_body(): + return json.loads(V['response_body']) + + +def get_header(header_name): + headers = V['response_headers'] + assert headers is not None + return headers.get(header_name, '') + + +def save_header(header_name, var_name): + V[var_name] = get_header(header_name) |