1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
import logging
import os
import shutil
import yaml
def install_vmadm(ctx):
runcmd_prepend_to_path = globals()["runcmd_prepend_to_path"]
srcdir = globals()["srcdir"]
runcmd_prepend_to_path(ctx, os.path.join(srcdir, "target", "debug"))
# Set permissions on the datadir and its parent. They are 0o700 by default,
# which means that the libvirt daemon can't access the virtual machine
# image we create.
os.chmod(".", 0o711)
os.chmod("..", 0o711)
# Create .ssh directory, so that the scenario can put files there later.
# This can be removed once the Subplot lib/files library creates
# directories.
os.mkdir(".ssh")
os.makedirs("expected/init-test")
os.mkdir("images")
def ensure_base_image(ctx):
assert_eq = globals()["assert_eq"]
assert_ne = globals()["assert_ne"]
base = os.environ.get("BASE_IMAGE", "")
_, ext = os.path.splitext(base)
assert_ne(base, "")
assert_eq(ext, ".qcow2")
shutil.copy(base, "base.qcow2")
def invoke_cloud_init(ctx, config=None, filename=None, dirname=None):
runcmd_run = globals()["runcmd_run"]
runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"]
runcmd_run(ctx, ["vmadm", "cloud-init", "--config", config, filename, dirname])
runcmd_exit_code_is_zero(ctx)
def directories_match(ctx, actual=None, expected=None):
assert_eq = globals()["assert_eq"]
efiles = list(sorted(os.listdir(expected)))
afiles = list(sorted(os.listdir(actual)))
assert_eq(efiles, afiles)
for filename in efiles:
with open(os.path.join(expected, filename)) as f:
edata = yaml.safe_load(f)
with open(os.path.join(actual, filename)) as f:
adata = yaml.safe_load(f)
if "runcmd" in adata:
del adata["runcmd"]
logging.debug(f"directories_match: filename={filename}")
logging.debug(f"directories_match: edata={edata!r}")
logging.debug(f"directories_match: adata={adata!r}")
assert_eq(edata, adata)
def create_vm(ctx, config=None, filename=None):
runcmd_run = globals()["runcmd_run"]
runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"]
ctx["config"] = config
ctx["spec"] = filename
runcmd_run(ctx, ["vmadm", "new", "--config", config, filename])
runcmd_exit_code_is_zero(ctx)
def delete_vm(ctx, config=None, filename=None):
runcmd_run = globals()["runcmd_run"]
runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"]
runcmd_run(ctx, ["vmadm", "delete", "--config", ctx["config"], ctx["spec"]])
runcmd_exit_code_is_zero(ctx)
def shutdown_vm(ctx, config=None, filename=None):
runcmd_run = globals()["runcmd_run"]
runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"]
runcmd_run(ctx, ["vmadm", "shutdown", "--config", config, filename])
runcmd_exit_code_is_zero(ctx)
def start_vm(ctx, config=None, filename=None):
runcmd_run = globals()["runcmd_run"]
runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"]
runcmd_run(ctx, ["vmadm", "start", "--config", config, filename])
runcmd_exit_code_is_zero(ctx)
def run_hostname_over_ssh(ctx, config=None, target=None, args=None):
runcmd_run = globals()["runcmd_run"]
runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"]
# Fix permissions for .ssh and its contents.
runcmd_run(ctx, ["chmod", "-R", "u=rwX,go=", ".ssh"])
runcmd_run(ctx, ["ssh", "-F", config, target] + args.split())
runcmd_exit_code_is_zero(ctx)
|