summaryrefslogtreecommitdiff
path: root/yarns/lib.py
diff options
context:
space:
mode:
Diffstat (limited to 'yarns/lib.py')
-rw-r--r--yarns/lib.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/yarns/lib.py b/yarns/lib.py
new file mode 100644
index 0000000..ee36f9f
--- /dev/null
+++ b/yarns/lib.py
@@ -0,0 +1,168 @@
+# Copyright (C) 2018 Lars Wirzenius
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import json
+import os
+import re
+import signal
+import subprocess
+import time
+
+import Crypto.PublicKey.RSA
+import jwt
+import requests
+
+from yarnutils import *
+
+
+srcdir = os.environ['SRCDIR']
+datadir = os.environ['DATADIR']
+V = Variables(datadir)
+
+
+json_mime_type = 'application/json'
+
+
+def start_muck():
+ pathname = os.path.join(srcdir, 'muck_poc')
+ pubkey = os.path.join(srcdir, 'test-key.pub')
+
+ config = {
+ 'pid': 'muck.pid',
+ 'log': 'muck.log',
+ 'store': datadir,
+ 'signing-key-filename': pubkey,
+ }
+
+ config_filename = os.path.join(datadir, 'mock.conf')
+ with open(config_filename, 'w') as f:
+ json.dump(config, f)
+
+ out = os.path.join(datadir, 'muck.out')
+ err = os.path.join(datadir, 'muck.err')
+ argv = [
+ '/usr/sbin/daemonize', '-o', out, '-e', err, '-c', '.',
+ pathname, config_filename,
+ ]
+ subprocess.check_call(argv)
+ V['base_url'] = 'http://127.0.0.1:{}'.format(12765)
+
+ V['token'] = create_test_token()
+
+
+def stop_muck():
+ pid = int(read('muck.pid'))
+ os.kill(pid, signal.SIGTERM)
+
+
+def create_test_token():
+ key_filename = os.path.join(srcdir, 'test-key')
+ key_text = open(key_filename).read()
+
+ iss = 'test-issuer'
+ aud = 'test-audience'
+ sub = 'test-user'
+ scopes = ['create', 'update', 'show', 'delete']
+ lifetime = 3600
+
+ return create_token(key_text, iss, aud, sub, scopes, lifetime)
+
+def create_token(key_text, iss, aud, sub, scopes, lifetime):
+ key = Crypto.PublicKey.RSA.importKey(key_text)
+
+ now = int(time.time())
+ claims = {
+ 'iss': iss,
+ 'sub': sub,
+ 'aud': aud,
+ 'exp': now + lifetime,
+ 'scope': ' '.join(scopes),
+ }
+
+ token = jwt.encode(claims, key.exportKey('PEM'), algorithm='RS512')
+ return token.decode('ascii')
+
+
+def POST(path, headers, body):
+ return request(requests.post, path, headers, body)
+
+
+def PUT(path, headers, body):
+ return request(requests.put, path, headers, body)
+
+
+def GET(path, headers, body=None):
+ return request(requests.get, path, headers, body=body)
+
+
+def DELETE(path, headers):
+ return request(requests.delete, path, headers)
+
+
+def request(func, path, headers, body=None):
+ url = '{}{}'.format(V['base_url'], path)
+ if 'Content-Type' not in headers:
+ headers['Content-Type'] = json_mime_type
+ if 'Authorization' not in headers:
+ headers['Authorization'] = 'Bearer {}'.format(V['token'])
+ if body is not None:
+ body = json.dumps(body)
+ V['request_url'] = url
+ V['request_func'] = repr(func)
+ V['request_headers'] = repr(headers)
+ V['request_body'] = repr(body)
+ r = func(url, headers=headers, data=body)
+ V['status_code'] = r.status_code
+ V['response_body'] = r.text
+ V['response_headers'] = dict(r.headers)
+ return r
+
+
+def read(filename):
+ with open(filename, 'r') as f:
+ return f.read()
+
+
+def get_expanded_match():
+ match = get_next_match()
+ return expand(match, V)
+
+
+def expand(text, variables):
+ result = ''
+ while text:
+ m = re.search(r'\${(?P<name>[^}]+)}', text)
+ if not m:
+ result += text
+ break
+ name = m.group('name')
+ print('expanding ', name, repr(variables[name]))
+ result += text[:m.start()] + variables[name]
+ text = text[m.end():]
+ return result
+
+
+def get_json_body():
+ return json.loads(V['response_body'])
+
+
+def get_header(header_name):
+ headers = V['response_headers']
+ assert headers is not None
+ return headers.get(header_name, '')
+
+
+def save_header(header_name, var_name):
+ V[var_name] = get_header(header_name)