# Copyright (C) 2018 Lars Wirzenius
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
import json
import os
import re
import signal
import subprocess
import time
import Crypto.PublicKey.RSA
import jwt
import requests
from yarnutils import *
srcdir = os.environ['SRCDIR']
datadir = os.environ['DATADIR']
V = Variables(datadir)
json_mime_type = 'application/json'
def start_muck():
pathname = os.path.join(srcdir, 'muck_poc')
pubkey = os.path.join(srcdir, 'test-key.pub')
config = {
'pid': 'muck.pid',
'port': 9999,
'log': 'muck.log',
'store': datadir,
'signing-key-filename': pubkey,
}
config_filename = os.path.join(datadir, 'mock.conf')
with open(config_filename, 'w') as f:
json.dump(config, f)
out = os.path.join(datadir, 'muck.out')
err = os.path.join(datadir, 'muck.err')
argv = [
'/usr/sbin/daemonize', '-o', out, '-e', err, '-c', '.',
pathname, config_filename,
]
subprocess.check_call(argv)
time.sleep(2)
V['base_url'] = 'http://127.0.0.1:{}'.format(config['port'])
def stop_muck():
pid = int(read('muck.pid'))
os.kill(pid, signal.SIGTERM)
def create_test_token(sub):
key_filename = os.path.join(srcdir, 'test-key')
key_text = open(key_filename).read()
iss = 'test-issuer'
aud = 'test-audience'
scopes = ['create', 'update', 'show', 'delete']
if sub in (V['superusers'] or []):
scopes.append('super')
lifetime = 3600
return create_token(key_text, iss, aud, sub, scopes, lifetime)
def create_token(key_text, iss, aud, sub, scopes, lifetime):
key = Crypto.PublicKey.RSA.importKey(key_text)
now = int(time.time())
claims = {
'iss': iss,
'sub': sub,
'aud': aud,
'exp': now + lifetime,
'scope': ' '.join(scopes),
}
token = jwt.encode(claims, key.exportKey('PEM'), algorithm='RS512')
return token.decode('ascii')
def POST(sub, path, headers, body):
return request(sub, requests.post, path, headers, body)
def PUT(sub, path, headers, body):
return request(sub, requests.put, path, headers, body)
def GET(sub, path, headers, body=None):
return request(sub, requests.get, path, headers, body=body)
def DELETE(sub, path, headers):
return request(sub, requests.delete, path, headers)
def request(sub, func, path, headers, body=None):
url = '{}{}'.format(V['base_url'], path)
if 'Content-Type' not in headers:
headers['Content-Type'] = json_mime_type
if 'Authorization' not in headers:
token = create_test_token(sub)
headers['Authorization'] = 'Bearer {}'.format(token)
if body is not None:
body = json.dumps(body)
V['request_url'] = url
V['request_func'] = repr(func)
V['request_headers'] = repr(headers)
V['request_body'] = repr(body)
r = func(url, headers=headers, data=body)
V['status_code'] = r.status_code
V['response_body'] = r.text
V['response_headers'] = dict(r.headers)
return r
def read(filename):
with open(filename, 'r') as f:
return f.read()
def get_expanded_match():
match = get_next_match()
return expand(match, V)
def expand(text, variables):
result = ''
while text:
m = re.search(r'\${(?P[^}]+)}', text)
if not m:
result += text
break
name = m.group('name')
print('expanding ', name, repr(variables[name]))
result += text[:m.start()] + variables[name]
text = text[m.end():]
return result
def get_json_body():
return json.loads(V['response_body'])
def get_header(header_name):
headers = V['response_headers']
assert headers is not None
return headers.get(header_name, '')
def save_header(header_name, var_name):
V[var_name] = get_header(header_name)