summaryrefslogtreecommitdiff
path: root/subplot/data.py
blob: 583e52d36f7deeae45944588c7247a9474da6a18 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import json
import logging
import os
import random
import socket
import stat
import yaml


def create_file_with_given_data(ctx, filename=None, data=None):
    logging.debug(f"creating file {filename} with {data!r}")
    dirname = os.path.dirname(filename) or "."
    os.makedirs(dirname, exist_ok=True)
    with open(filename, "wb") as f:
        f.write(data.encode("UTF-8"))


def create_file_with_random_data(ctx, filename=None):
    N = 128
    data = "".join(chr(random.randint(0, 255)) for i in range(N))
    create_file_with_given_data(ctx, filename=filename, data=data)


def create_unix_socket(ctx, filename=None):
    fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    fd.bind(filename)


def create_fifo(ctx, filename=None):
    os.mkfifo(filename)


def create_cachedir_tag_in(ctx, dirpath=None):
    filepath = f"{dirpath}/CACHEDIR.TAG"
    logging.debug(f"creating {filepath}")
    os.makedirs(dirpath, exist_ok=True)
    with open(filepath, "w") as f:
        f.write("Signature: 8a477f597d28d172789f06886806bc55")


def create_nonutf8_filename(ctx, dirname=None):
    filename = "\x88"
    os.mkdir(dirname)
    open(filename, "wb").close()


def chmod_file(ctx, filename=None, mode=None):
    os.chmod(filename, int(mode, 8))


def create_symlink(ctx, linkname=None, target=None):
    os.symlink(target, linkname)


def create_manifest_of_live(ctx, dirname=None, manifest=None):
    _create_manifest_of_directory(ctx, dirname=dirname, manifest=manifest)


def create_manifest_of_restored(ctx, dirname=None, restored=None, manifest=None):
    _create_manifest_of_directory(
        ctx, dirname=os.path.join(restored, "./" + dirname), manifest=manifest
    )


def _create_manifest_of_directory(ctx, dirname=None, manifest=None):
    runcmd_run = globals()["runcmd_run"]
    runcmd_get_exit_code = globals()["runcmd_get_exit_code"]
    runcmd_get_stdout = globals()["runcmd_get_stdout"]

    logging.info(f"creating manifest for {dirname} in {manifest}")
    runcmd_run(ctx, ["find", "-exec", "summain", "{}", "+"], cwd=dirname)
    assert runcmd_get_exit_code(ctx) == 0
    stdout = runcmd_get_stdout(ctx)
    with open(manifest, "w") as f:
        f.write(stdout)


def file_is_restored(ctx, filename=None, restored=None):
    filename = os.path.join(restored, "./" + filename)
    exists = os.path.exists(filename)
    logging.debug(f"restored? {filename} {exists}")
    assert exists


def file_is_not_restored(ctx, filename=None, restored=None):
    filename = os.path.join(restored, "./" + filename)
    logging.info(r"verifying that {filename} does not exist")
    try:
        exists = os.path.exists(filename)
    except PermissionError:
        exists = False
    logging.debug(f"restored? {filename} {exists}")
    assert not exists


def files_match(ctx, first=None, second=None):
    assert_eq = globals()["assert_eq"]

    f = open(first).read()
    s = open(second).read()
    logging.debug(f"files_match: f:\n{f}")
    logging.debug(f"files_match: s:\n{s}")
    assert_eq(f, s)


def convert_yaml_to_json(ctx, yaml_name=None, json_name=None):
    with open(yaml_name) as f:
        obj = yaml.safe_load(f)
    with open(json_name, "w") as f:
        json.dump(obj, f)


def match_stdout_to_json_file_superset(ctx, filename=None):
    runcmd_get_stdout = globals()["runcmd_get_stdout"]
    assert_eq = globals()["assert_eq"]
    assert_dict_eq = globals()["assert_dict_eq"]

    stdout = runcmd_get_stdout(ctx)
    stdout = json.loads(stdout.strip())
    obj = json.load(open(filename))
    logging.debug(f"match_stdout_to_json_file: stdout={stdout!r}")
    logging.debug(f"match_stdout_to_json_file: file={obj!r}")

    if isinstance(obj, dict):
        stdout = {key: value for key, value in stdout.items() if key in obj}
        assert_dict_eq(obj, stdout)
    elif isinstance(obj, list):
        obj = {"key": obj}
        stdout = {"key": stdout}
        assert_dict_eq(obj, stdout)
        assert_dict_eq(obj, stdout)
    else:
        assert_eq(obj, stdout)


def match_stdout_to_json_file_exactly(ctx, filename=None):
    runcmd_get_stdout = globals()["runcmd_get_stdout"]
    assert_eq = globals()["assert_eq"]
    assert_dict_eq = globals()["assert_dict_eq"]

    stdout = runcmd_get_stdout(ctx)
    stdout = json.loads(stdout.strip())
    obj = json.load(open(filename))
    logging.debug(f"match_stdout_to_json_file: stdout={stdout!r}")
    logging.debug(f"match_stdout_to_json_file: file={obj!r}")

    if isinstance(obj, list):
        obj = {"key": obj}
        stdout = {"key": stdout}
        assert_dict_eq(obj, stdout)
    elif isinstance(obj, dict):
        assert_dict_eq(obj, stdout)
    else:
        assert_eq(obj, stdout)


def manifests_match(ctx, expected=None, actual=None):
    assert_eq = globals()["assert_eq"]
    assert_dict_eq = globals()["assert_dict_eq"]

    logging.debug(f"comparing manifests {expected} and {actual}")

    expected_objs = list(yaml.safe_load_all(open(expected)))
    actual_objs = list(yaml.safe_load_all(open(actual)))

    logging.debug(f"there are {len(expected_objs)} and {len(actual_objs)} objects")

    i = 0
    while expected_objs and actual_objs:
        e = expected_objs.pop(0)
        a = actual_objs.pop(0)

        logging.debug(f"comparing manifest objects at index {i}:")
        logging.debug(f"  expected: {e}")
        logging.debug(f"  actual  : {a}")
        assert_dict_eq(e, a)

        i += 1

    logging.debug(f"remaining expected objecvts: {expected_objs}")
    logging.debug(f"remaining actual objecvts  : {actual_objs}")
    assert_eq(expected_objs, [])
    assert_eq(actual_objs, [])

    logging.debug(f"manifests {expected} and {actual} match")


def file_is_readable_by_owner(ctx, filename=None):
    assert_eq = globals()["assert_eq"]

    st = os.lstat(filename)
    mode = stat.S_IMODE(st.st_mode)
    logging.debug("file mode: %o", mode)
    assert_eq(mode, 0o400)


def file_does_not_contain(ctx, filename=None, pattern=None):
    data = open(filename).read()
    assert pattern not in data


def files_are_different(ctx, filename1=None, filename2=None):
    assert_ne = globals()["assert_ne"]

    data1 = open(filename1, "rb").read()
    data2 = open(filename2, "rb").read()
    assert_ne(data1, data2)


def files_are_identical(ctx, filename1=None, filename2=None):
    assert_eq = globals()["assert_eq"]

    data1 = open(filename1, "rb").read()
    data2 = open(filename2, "rb").read()
    assert_eq(data1, data2)