# Copyright (C) 2018  Lars Wirzenius
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# Shared helpers for the test steps: starting and stopping the muck
# server, creating signed test tokens, making HTTP requests against the
# server, and expanding ${variable} references in step text.

import json
import os
import re
import signal
import subprocess
import time

import Crypto.PublicKey.RSA
import jwt
import requests

from yarnutils import *


# Both directories are supplied by the test runner via the environment.
srcdir = os.environ['SRCDIR']
datadir = os.environ['DATADIR']

# Variable store rooted at datadir; used to pass values between steps.
V = Variables(datadir)

json_mime_type = 'application/json'


def start_muck():
    """Write a config file, launch muck_poc as a daemon, and save its URL.

    The configuration is written as JSON into datadir, the server is
    started via /usr/sbin/daemonize with stdout/stderr redirected into
    datadir, and the resulting base URL is stored in V['base_url'].

    Raises subprocess.CalledProcessError if daemonize exits non-zero.
    """
    pathname = os.path.join(srcdir, 'muck_poc')
    pubkey = os.path.join(srcdir, 'test-key.pub')
    config = {
        'pid': 'muck.pid',
        'port': 9999,
        'log': 'muck.log',
        'store': datadir,
        'signing-key-filename': pubkey,
    }
    config_filename = os.path.join(datadir, 'mock.conf')
    with open(config_filename, 'w') as f:
        json.dump(config, f)

    out = os.path.join(datadir, 'muck.out')
    err = os.path.join(datadir, 'muck.err')
    argv = [
        '/usr/sbin/daemonize',
        '-o', out,
        '-e', err,
        '-c', '.',
        pathname,
        config_filename,
    ]
    subprocess.check_call(argv)

    # Fixed pause after launching; presumably gives the daemon time to
    # start listening before the first request is made.
    time.sleep(2)

    V['base_url'] = 'http://127.0.0.1:{}'.format(config['port'])


def stop_muck():
    """Send SIGTERM to the process whose PID is stored in muck.pid.

    The PID file is read relative to the current working directory.
    """
    pid = int(read('muck.pid'))
    os.kill(pid, signal.SIGTERM)


def create_test_token(sub):
    """Return a signed test token for subject `sub`.

    The token is signed with the 'test-key' file in srcdir and is valid
    for one hour.  It carries the create/update/show/delete scopes, plus
    'super' when `sub` is listed in V['superusers'].
    """
    key_filename = os.path.join(srcdir, 'test-key')
    # Use a context manager so the key file is closed promptly instead
    # of being left for the garbage collector.
    with open(key_filename) as f:
        key_text = f.read()

    iss = 'test-issuer'
    aud = 'test-audience'
    scopes = ['create', 'update', 'show', 'delete']
    # V['superusers'] may be unset (falsy); treat that as "no superusers".
    if sub in (V['superusers'] or []):
        scopes.append('super')
    lifetime = 3600
    return create_token(key_text, iss, aud, sub, scopes, lifetime)


def create_token(key_text, iss, aud, sub, scopes, lifetime):
    """Return an RS512-signed JWT as a text string.

    key_text -- the RSA private key, in a format RSA.importKey accepts
    iss, aud, sub -- values for the corresponding JWT claims
    scopes -- list of scope names; joined with spaces into 'scope'
    lifetime -- validity period in seconds, counted from now
    """
    key = Crypto.PublicKey.RSA.importKey(key_text)
    now = int(time.time())
    claims = {
        'iss': iss,
        'sub': sub,
        'aud': aud,
        'exp': now + lifetime,
        'scope': ' '.join(scopes),
    }
    token = jwt.encode(claims, key.exportKey('PEM'), algorithm='RS512')
    # PyJWT 1.x returns bytes from encode(), PyJWT 2.x returns str.
    # Only decode when there is something to decode.
    if isinstance(token, bytes):
        token = token.decode('ascii')
    return token


def POST(sub, path, headers, body):
    """Make a POST request as subject `sub`; see request()."""
    return request(sub, requests.post, path, headers, body)


def PUT(sub, path, headers, body):
    """Make a PUT request as subject `sub`; see request()."""
    return request(sub, requests.put, path, headers, body)


def GET(sub, path, headers, body=None):
    """Make a GET request as subject `sub`; see request()."""
    return request(sub, requests.get, path, headers, body=body)


def DELETE(sub, path, headers):
    """Make a DELETE request as subject `sub`; see request()."""
    return request(sub, requests.delete, path, headers)


def request(sub, func, path, headers, body=None):
    """Make an HTTP request against the server and record the exchange.

    sub -- subject to create a token for when no Authorization header
           is supplied
    func -- the requests function to call (e.g. requests.post)
    path -- request path, appended to V['base_url']
    headers -- dict of request headers; Content-Type defaults to JSON
               and Authorization defaults to a fresh test token
    body -- optional value, serialised as JSON when not None

    The request URL, function, headers and body, and the response status
    code, body text and headers are saved in V.  Returns the response.
    """
    url = '{}{}'.format(V['base_url'], path)

    # Work on a copy: the defaults added below must not leak into the
    # caller's dict, or a reused dict would carry one subject's token
    # into a later request made for a different subject.
    headers = dict(headers or {})
    if 'Content-Type' not in headers:
        headers['Content-Type'] = json_mime_type
    if 'Authorization' not in headers:
        token = create_test_token(sub)
        headers['Authorization'] = 'Bearer {}'.format(token)

    if body is not None:
        body = json.dumps(body)

    V['request_url'] = url
    V['request_func'] = repr(func)
    V['request_headers'] = repr(headers)
    V['request_body'] = repr(body)

    r = func(url, headers=headers, data=body)

    V['status_code'] = r.status_code
    V['response_body'] = r.text
    V['response_headers'] = dict(r.headers)
    return r


def read(filename):
    """Return the entire contents of `filename` as text."""
    with open(filename, 'r') as f:
        return f.read()


def get_expanded_match():
    """Return the next step match with ${name} references expanded.

    get_next_match is not defined in this file; it is presumably
    provided by the yarnutils star import.
    """
    match = get_next_match()
    return expand(match, V)


def expand(text, variables):
    """Replace each ${name} in `text` with variables[name].

    References are processed left to right and substituted values are
    not themselves rescanned.  Each expansion is also printed, which
    shows up in the test output for debugging.
    """
    result = ''
    while text:
        # The group must be named: the value is fetched below with
        # m.group('name').
        m = re.search(r'\${(?P<name>[^}]+)}', text)
        if not m:
            result += text
            break
        name = m.group('name')
        print('expanding ', name, repr(variables[name]))
        result += text[:m.start()] + variables[name]
        text = text[m.end():]
    return result


def get_json_body():
    """Return the most recent response body parsed as JSON."""
    return json.loads(V['response_body'])


def get_header(header_name):
    """Return a header from the most recent response, or '' if absent."""
    headers = V['response_headers']
    # A request must have been made (and recorded) before this is called.
    assert headers is not None
    return headers.get(header_name, '')


def save_header(header_name, var_name):
    """Store the named response header's value in V[var_name]."""
    V[var_name] = get_header(header_name)