1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
import logging
import os
import yaml
def fixme(ctx, **kwargs):
    """Placeholder step implementation that always fails.

    Args:
        ctx: scenario context; unused.
        **kwargs: any step captures; accepted and ignored.

    Raises:
        AssertionError: always.  Raised explicitly rather than via a bare
            ``assert`` so the failure is not stripped under ``python -O``.
    """
    raise AssertionError("step is not implemented yet (fixme placeholder)")
def create_vm(ctx):
    """Start the test VM with qemu-system and store it in ``ctx["qemu"]``.

    ``QemuSystem`` and ``srcdir`` are looked up in the module globals at
    call time; they are not defined in this part of the file and are
    presumably injected by the test harness (confirm against the runner).
    The SSH public key is read from ``<srcdir>/ssh/id.pub``.

    Args:
        ctx: scenario context; receives the running VM under ``"qemu"``.

    Raises:
        AssertionError: if ``wait_for_ssh()`` reports that SSH never became
            ready.  The VM is stopped first, because it has not been stored
            in ``ctx`` yet and ``destroy_vm`` could not clean it up.
    """
    QemuSystem = globals()["QemuSystem"]
    srcdir = globals()["srcdir"]

    name = "debian-ansible-test"
    base_image = "/home/liw/tmp/debian-10-openstack-amd64.qcow2"
    GiB = 1024 ** 3
    disk_size = 10 * GiB
    memory = 1 * GiB
    cpus = 2
    username = "debian"

    # Use a context manager so the key file is closed deterministically.
    with open(os.path.join(srcdir, "ssh", "id.pub")) as f:
        pubkey = f.read().strip()

    logging.info("starting a VM using qemu-system")
    logging.info(f" name : {name}")
    logging.info(f" image : {base_image}")
    logging.info(f" disk : {disk_size}")
    logging.info(f" pubkey : {pubkey}")
    logging.info(f" memory : {memory}")
    logging.info(f" cpus : {cpus}")
    logging.info(f" username: {username}")

    qemu = QemuSystem(name, base_image, disk_size, pubkey)
    qemu.set_memory(memory)
    qemu.set_vcpus(cpus)
    qemu.set_username(username)
    qemu.start()

    logging.debug("waiting for SSH to be ready")
    if not qemu.wait_for_ssh():
        logging.error("SSH did not get ready")
        # The VM is running but unreachable and not yet in ctx: stop it here
        # so a failed scenario does not leave an orphaned qemu behind.
        qemu.stop()
        # Explicit raise: a bare ``assert 0`` would vanish under ``python -O``.
        raise AssertionError("SSH did not get ready")
    logging.debug("SSH is ready")

    logging.info("a qemu-system VM is up and running and accessible over SSH")
    ctx["qemu"] = qemu
def destroy_vm(ctx):
    """Stop the VM stored under ``ctx["qemu"]``.

    A debug message is logged before the context lookup, so a missing
    ``"qemu"`` entry still raises ``KeyError`` after the message is emitted.
    """
    logging.debug("destroying qemu running")
    vm = ctx["qemu"]
    vm.stop()
def run_true_on_host(ctx):
    """Invoke ``/bin/true`` through the ``ssh`` method of ``ctx["qemu"]``.

    The return value of ``ssh`` is discarded.
    """
    argv = ["/bin/true"]
    ctx["qemu"].ssh(argv)
def use_role_in_playbook(ctx, role=None):
    """Append ``role`` to the play kept in ``ctx["playbook"]``.

    When the context has no play yet, a new one targeting ``test-host`` with
    an empty role list is created first.  ``role`` is appended as given,
    including the default ``None``.
    """
    fresh_play = {
        "hosts": "test-host",
        "remote_user": "debian",  # FIXME: don't hardcode this
        "become": True,
        "roles": [],
    }
    # The fallback is built anew on every call, so plays are never shared
    # between contexts.
    play = ctx.get("playbook", fresh_play)
    roles = play["roles"]
    roles.append(role)
    ctx["playbook"] = play
def set_vars_file(ctx, filename=None):
    """Write the contents of the named file to ``vars.yml`` in the cwd.

    The contents come from ``get_file``, which is looked up in the module
    globals at call time (not defined in this part of the file).  The
    output is opened in binary mode, so ``get_file`` presumably returns
    bytes -- confirm against its definition.  ``ctx`` is unused.
    """
    # Fetch before opening so a failing lookup never truncates vars.yml.
    contents = globals()["get_file"](filename)
    with open("vars.yml", "wb") as out:
        out.write(contents)
def run_playbook(ctx):
    """Write inventory, vars and playbook files, then run ansible-playbook.

    Files are created in the current working directory:

    * ``hosts`` -- inventory containing the single host ``test-host``;
    * ``vars.yml`` -- only if it does not already exist, as an empty
      mapping (Ansible rejects an extra-vars file that is not a mapping);
    * ``playbook.yml`` -- a one-play playbook built from ``ctx["playbook"]``.

    ``runcmd``, ``exit_code_zero``, ``assert_ne`` and ``srcdir`` are looked
    up in the module globals at call time; they are not defined in this
    part of the file and are presumably supplied by the test harness.

    Args:
        ctx: scenario context; must contain ``"qemu"`` (the running VM) and
            ``"playbook"`` (the play to run).

    Raises:
        KeyError: if ``ctx`` lacks ``"qemu"`` or ``"playbook"``.
        Whatever ``assert_ne`` / ``exit_code_zero`` raise when their check
        fails (presumably AssertionError -- confirm against the harness).
    """
    runcmd = globals()["runcmd"]
    exit_code_zero = globals()["exit_code_zero"]
    assert_ne = globals()["assert_ne"]
    srcdir = globals()["srcdir"]

    qemu = ctx["qemu"]

    with open("hosts", "w") as f:
        f.write("test-host\n")

    if not os.path.exists("vars.yml"):
        with open("vars.yml", "w") as f:
            # An extra-vars file must hold a mapping; an empty list is
            # rejected by ansible-playbook.
            yaml.safe_dump({}, stream=f)

    # Check the play itself: a list wrapped around it can never be None, so
    # checking the wrapped list would always pass.
    play = ctx["playbook"]
    assert_ne(play, None)
    with open("playbook.yml", "w") as f:
        yaml.safe_dump([play], stream=f)

    port = qemu.get_port()
    ssh_opts = [
        "-ouserknownhostsfile=/dev/null",
        "-ostricthostkeychecking=accept-new",
        "-i",
        os.path.join(srcdir, "ssh", "id"),
        f"-p{port}",
    ]

    env = dict(os.environ)
    env["ANSIBLE_SSH_ARGS"] = " ".join(ssh_opts)
    # ANSIBLE_LOG_PATH is the variable Ansible reads for its log file; the
    # previously used ANSIBLE_LOG is not a recognised setting.
    env["ANSIBLE_LOG_PATH"] = "ansible.log"
    env["ANSIBLE_ROLES_PATH"] = os.path.join(srcdir, "roles")

    argv = [
        "ansible-playbook",
        "-i",
        "hosts",
        "-e@vars.yml",
        "-eansible_ssh_host=localhost",
        f"-eansible_ssh_port={port}",
        "playbook.yml",
    ]
    runcmd(ctx, argv, env=env)
    exit_code_zero(ctx)
|