1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
#############################################################################
# Some helpers to make step functions simpler.
import json
import os
import random
import re
import shutil
import signal
import subprocess
import time
import urllib.parse
import yaml
# Run a subprocess, capture its output and exit code in context.
def _run(ctx, argv):
p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate("")
ctx["argv"] = argv
ctx["stdout"] = stdout.decode("utf-8")
ctx["stderr"] = stderr.decode("utf-8")
ctx["exit"] = p.returncode
# Check that latest call of _run ended with an specific exit code.
def _run_exit(ctx, expected):
if ctx["exit"] != expected:
print("ctx:", ctx.as_dict())
assert_eq(ctx["exit"], expected)
# Name of Rust binary, debug-build.
def _binary(name):
return os.path.abspath(os.path.join(srcdir, "target", "debug", name))
# Write a file with given content.
def _write(filename, content):
open(filename, "w").write(content)
# Construct a URL that points to server running on localhost by
# replacing the actual scheme and host with ones that work for test.
def _url(ctx, url):
port = ctx["config"]["port"]
c = urllib.parse.urlparse(url)
host = c[1]
c = (c[0], "localhost:{}".format(port)) + c[2:]
return urllib.parse.urlunparse(c), host
#############################################################################
# The actual step functions.
# Fail: use this for unimplemented steps.
def fixme(*args, **kwargs):
assert 0
# Create a file.
def create_file(ctx, filename=None, content=None):
dirname = os.path.dirname(filename)
os.makedirs(dirname)
_write(filename, content)
# Copy test certificate from source tree, where it's been created previously by
# ./check.
def copy_test_certificate(ctx, cert=None, key=None):
shutil.copy(os.path.join(srcdir, "test.pem"), cert)
shutil.copy(os.path.join(srcdir, "test.key"), key)
# Start server using named configuration file.
def start_server(ctx, filename=None):
config = get_file(filename).decode("UTF-8")
config = yaml.safe_load(config)
config["port"] = random.randint(2000, 30000)
ctx["config"] = config
config = yaml.safe_dump(config)
_write(filename, config)
start_daemon(ctx, "ewww", [_binary("ewww"), filename])
# Stop previously started server.
def stop_server(ctx):
stop_daemon(ctx, "ewww")
# Make a HTTP request.
def request(ctx, method=None, url=None):
url, host = _url(ctx, url)
print(url)
_run(ctx, ["curl", "-ksv", "-X", method, "-HHost: {}".format(host), url])
_run_exit(ctx, 0)
# Check status code of latest HTTP request.
def status_code_is(ctx, code=None):
pattern = "\n< HTTP/2 {} ".format(code)
assert_eq(pattern in ctx["stderr"], True)
# Check a HTTP response header for latest request has a given value.
def http_header_is(ctx, header=None, value=None):
s = ctx["stderr"]
pattern = "\n< {}: {}".format(header, value)
if pattern not in s:
print("stderr:", repr(s))
print("pattern:", repr(pattern))
assert_eq(pattern in s, True)
# Check a HTTP body response for latest request has a given value.
def http_body_is(ctx, body=None):
s = ctx["stdout"]
body = body.encode("UTF8").decode("unicode-escape")
if body != s:
print("stdout:", repr(s))
prin("pattern:", repr(body))
assert_eq(body, s)
|