summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2020-08-20 10:29:08 +0300
committerLars Wirzenius <liw@liw.fi>2020-08-20 10:29:08 +0300
commitea64a6c256477c832fadb48fc84e807e9d378552 (patch)
treef5fd4cf610d97eb9a172451098bfd6eb81b6107f
parent1fe5cff98cc52f2d7ebe9fc269a6c6d2eea52125 (diff)
downloadvmdb2-ea64a6c256477c832fadb48fc84e807e9d378552.tar.gz
refactor: format all Python modules with black
-rw-r--r--vmdb/__init__.py13
-rw-r--r--vmdb/app.py38
-rw-r--r--vmdb/plugins/ansible_plugin.py45
-rw-r--r--vmdb/plugins/apt_plugin.py50
-rw-r--r--vmdb/plugins/cache_rootfs_plugin.py54
-rw-r--r--vmdb/plugins/chroot_plugin.py14
-rw-r--r--vmdb/plugins/copy_file_plugin.py34
-rw-r--r--vmdb/plugins/create_dir_plugin.py26
-rw-r--r--vmdb/plugins/create_file_plugin.py30
-rw-r--r--vmdb/plugins/debootstrap_plugin.py51
-rw-r--r--vmdb/plugins/echo_plugin.py18
-rw-r--r--vmdb/plugins/error_plugin.py14
-rw-r--r--vmdb/plugins/fstab_plugin.py31
-rw-r--r--vmdb/plugins/kpartx_plugin.py21
-rw-r--r--vmdb/plugins/luks_plugin.py63
-rw-r--r--vmdb/plugins/lvcreate_plugin.py19
-rw-r--r--vmdb/plugins/mkfs_plugin.py33
-rw-r--r--vmdb/plugins/mkimg_plugin.py25
-rw-r--r--vmdb/plugins/mklabel_plugin.py14
-rw-r--r--vmdb/plugins/mkpart_plugin.py60
-rw-r--r--vmdb/plugins/mount_plugin.py30
-rw-r--r--vmdb/plugins/qemudebootstrap_plugin.py80
-rw-r--r--vmdb/plugins/shell_plugin.py16
-rw-r--r--vmdb/plugins/unpack_rootfs_plugin.py20
-rw-r--r--vmdb/plugins/vgcreate_plugin.py21
-rw-r--r--vmdb/plugins/virtualfs_plugin.py34
-rw-r--r--vmdb/runcmd.py30
-rw-r--r--vmdb/spec.py10
-rw-r--r--vmdb/spec_tests.py45
-rw-r--r--vmdb/state.py5
-rw-r--r--vmdb/step_list.py23
-rw-r--r--vmdb/step_list_tests.py45
-rw-r--r--vmdb/tags.py68
-rw-r--r--vmdb/tags_tests.py112
-rw-r--r--vmdb/unmount.py20
-rw-r--r--vmdb/unmount_tests.py38
36 files changed, 522 insertions, 728 deletions
diff --git a/vmdb/__init__.py b/vmdb/__init__.py
index 5308c5a..b4a9582 100644
--- a/vmdb/__init__.py
+++ b/vmdb/__init__.py
@@ -28,13 +28,7 @@ from .step_list import (
StepKeyMissing,
StepKeyWrongValueType,
)
-from .runcmd import (
- runcmd,
- runcmd_chroot,
- set_verbose_progress,
- progress,
- error,
-)
+from .runcmd import runcmd, runcmd_chroot, set_verbose_progress, progress, error
from .tags import (
Tags,
UnknownTag,
@@ -46,8 +40,5 @@ from .tags import (
NeedBothMountPoints,
)
from .unmount import unmount, NotMounted
-from .spec import (
- Spec,
- expand_templates,
-)
+from .spec import Spec, expand_templates
from .app import Vmdb2
diff --git a/vmdb/app.py b/vmdb/app.py
index 9db656b..2376116 100644
--- a/vmdb/app.py
+++ b/vmdb/app.py
@@ -25,16 +25,14 @@ import vmdb
class Vmdb2(cliapp.Application):
-
def add_settings(self):
self.settings.string(
- ['image'],
- 'use existing image file/device FILE (use --output to create new file)',
- metavar='FILE')
+ ["image"],
+ "use existing image file/device FILE (use --output to create new file)",
+ metavar="FILE",
+ )
- self.settings.boolean(
- ['verbose', 'v'],
- 'verbose output')
+ self.settings.boolean(["verbose", "v"], "verbose output")
def setup(self):
self.step_runners = vmdb.StepRunnerList()
@@ -43,7 +41,7 @@ class Vmdb2(cliapp.Application):
if len(args) != 1:
sys.exit("No image specification was given on the command line.")
- vmdb.set_verbose_progress(self.settings['verbose'])
+ vmdb.set_verbose_progress(self.settings["verbose"])
spec = self.load_spec_file(args[0])
state = vmdb.State()
@@ -57,47 +55,47 @@ class Vmdb2(cliapp.Application):
steps_taken, core_meltdown = self.run_steps(steps, state)
if core_meltdown:
- vmdb.progress('Something went wrong, cleaning up!')
+ vmdb.progress("Something went wrong, cleaning up!")
self.run_teardowns(steps_taken, state)
else:
self.run_teardowns(steps_taken, state)
- vmdb.progress('All went fine.')
+ vmdb.progress("All went fine.")
if core_meltdown:
- logging.error('An error occurred, exiting with non-zero exit code')
+ logging.error("An error occurred, exiting with non-zero exit code")
sys.exit(1)
def load_spec_file(self, filename):
spec = vmdb.Spec()
if filename == "-":
- vmdb.progress('Load spec from stdin')
+ vmdb.progress("Load spec from stdin")
spec.load_file(sys.stdin)
else:
- vmdb.progress('Load spec file {}'.format(filename))
+ vmdb.progress("Load spec file {}".format(filename))
with open(filename) as f:
spec.load_file(f)
return spec
def run_steps(self, steps, state):
- return self.run_steps_helper(
- steps, state, 'Running step: %r', 'run', False)
+ return self.run_steps_helper(steps, state, "Running step: %r", "run", False)
def run_teardowns(self, steps, state):
return self.run_steps_helper(
- list(reversed(steps)), state, 'Running teardown: %r', 'teardown', True)
+ list(reversed(steps)), state, "Running teardown: %r", "teardown", True
+ )
def run_steps_helper(self, steps, state, msg, method_name, keep_going):
core_meltdown = False
steps_taken = []
- even_if_skipped = method_name + '_even_if_skipped'
+ even_if_skipped = method_name + "_even_if_skipped"
for step in steps:
try:
logging.info(msg, step)
steps_taken.append(step)
runner = self.step_runners.find(step)
if runner.skip(step, self.settings, state):
- logging.info('Skipping as requested by unless')
+ logging.info("Skipping as requested by unless")
method_names = [even_if_skipped]
else:
method_names = [method_name, even_if_skipped]
@@ -110,10 +108,10 @@ class Vmdb2(cliapp.Application):
values = runner.get_values(step)
for method in methods:
- logging.info('Calling %s', method)
+ logging.info("Calling %s", method)
method(values, self.settings, state)
except KeyError as e:
- vmdb.error('Key error: %s' % str(e))
+ vmdb.error("Key error: %s" % str(e))
vmdb.error(repr(e))
core_meltdown = True
if not keep_going:
diff --git a/vmdb/plugins/ansible_plugin.py b/vmdb/plugins/ansible_plugin.py
index a223735..867a554 100644
--- a/vmdb/plugins/ansible_plugin.py
+++ b/vmdb/plugins/ansible_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import tempfile
@@ -26,50 +25,52 @@ import vmdb
class AnsiblePlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(AnsibleStepRunner())
class AnsibleStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'ansible': str,
- 'playbook': str,
- }
+ return {"ansible": str, "playbook": str}
def run(self, values, settings, state):
- tag = values['ansible']
- playbook = values['playbook']
+ tag = values["ansible"]
+ playbook = values["playbook"]
mount_point = state.tags.get_builder_mount_point(tag)
- rootfs_tarball = settings['rootfs-tarball']
+ rootfs_tarball = settings["rootfs-tarball"]
state.ansible_inventory = self.create_inventory(mount_point)
vmdb.progress(
- 'Created {} for Ansible inventory'.format(state.ansible_inventory))
+ "Created {} for Ansible inventory".format(state.ansible_inventory)
+ )
vars_filename = self.create_vars(rootfs_tarball)
- vmdb.progress(
- 'Created {} for Ansible variables'.format(vars_filename))
+ vmdb.progress("Created {} for Ansible variables".format(vars_filename))
env = dict(os.environ)
- env['ANSIBLE_NOCOWS'] = '1'
+ env["ANSIBLE_NOCOWS"] = "1"
vmdb.runcmd(
- ['ansible-playbook', '-c', 'chroot',
- '-i', state.ansible_inventory,
- '-e', '@{}'.format(vars_filename),
- playbook],
- env=env)
+ [
+ "ansible-playbook",
+ "-c",
+ "chroot",
+ "-i",
+ state.ansible_inventory,
+ "-e",
+ "@{}".format(vars_filename),
+ playbook,
+ ],
+ env=env,
+ )
def teardown(self, values, settings, state):
- if hasattr(state, 'ansible_inventory'):
- vmdb.progress('Removing {}'.format(state.ansible_inventory))
+ if hasattr(state, "ansible_inventory"):
+ vmdb.progress("Removing {}".format(state.ansible_inventory))
os.remove(state.ansible_inventory)
def create_inventory(self, chroot):
fd, filename = tempfile.mkstemp()
- os.write(fd, '[image]\n{}\n'.format(chroot).encode())
+ os.write(fd, "[image]\n{}\n".format(chroot).encode())
os.close(fd)
return filename
diff --git a/vmdb/plugins/apt_plugin.py b/vmdb/plugins/apt_plugin.py
index 4ebaf38..f2547c7 100644
--- a/vmdb/plugins/apt_plugin.py
+++ b/vmdb/plugins/apt_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import cliapp
@@ -25,65 +24,50 @@ import vmdb
class AptPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(AptStepRunner())
class AptStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'apt': str,
- 'packages': [],
- 'tag': '',
- 'fs-tag': '',
- 'clean': True,
- }
+ return {"apt": str, "packages": [], "tag": "", "fs-tag": "", "clean": True}
def run(self, values, settings, state):
- operation = values['apt']
- if operation != 'install':
+ operation = values["apt"]
+ if operation != "install":
raise Exception('"apt" must always have value "install"')
- packages = values['packages']
- tag = values.get('tag') or None
+ packages = values["packages"]
+ tag = values.get("tag") or None
if tag is None:
- tag = values['fs-tag']
+ tag = values["fs-tag"]
mount_point = state.tags.get_builder_mount_point(tag)
if not self.got_eatmydata(state):
- self.install_packages(mount_point, [], ['eatmydata'])
+ self.install_packages(mount_point, [], ["eatmydata"])
state.got_eatmydata = True
- self.install_packages(mount_point, ['eatmydata'], packages)
+ self.install_packages(mount_point, ["eatmydata"], packages)
- if values['clean']:
+ if values["clean"]:
self.clean_cache(mount_point)
def got_eatmydata(self, state):
- return hasattr(state, 'got_eatmydata') and getattr(state, 'got_eatmydata')
+ return hasattr(state, "got_eatmydata") and getattr(state, "got_eatmydata")
def install_packages(self, mount_point, argv_prefix, packages):
env = os.environ.copy()
- env['DEBIAN_FRONTEND'] = 'noninteractive'
+ env["DEBIAN_FRONTEND"] = "noninteractive"
- vmdb.runcmd_chroot(
- mount_point,
- argv_prefix +
- ['apt-get', 'update'],
- env=env)
+ vmdb.runcmd_chroot(mount_point, argv_prefix + ["apt-get", "update"], env=env)
vmdb.runcmd_chroot(
mount_point,
- argv_prefix +
- ['apt-get', '-y', '--no-show-progress', 'install'] + packages,
- env=env)
+ argv_prefix + ["apt-get", "-y", "--no-show-progress", "install"] + packages,
+ env=env,
+ )
def clean_cache(self, mount_point):
env = os.environ.copy()
- env['DEBIAN_FRONTEND'] = 'noninteractive'
+ env["DEBIAN_FRONTEND"] = "noninteractive"
- vmdb.runcmd_chroot(
- mount_point,
- ['apt-get', 'clean'],
- env=env)
+ vmdb.runcmd_chroot(mount_point, ["apt-get", "clean"], env=env)
diff --git a/vmdb/plugins/cache_rootfs_plugin.py b/vmdb/plugins/cache_rootfs_plugin.py
index eb4f75a..316de8b 100644
--- a/vmdb/plugins/cache_rootfs_plugin.py
+++ b/vmdb/plugins/cache_rootfs_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import cliapp
@@ -25,54 +24,53 @@ import vmdb
class CacheRootFSPlugin(cliapp.Plugin):
-
def enable(self):
self.app.settings.string(
- ['rootfs-tarball'],
- 'store rootfs cache tar archives in FILE',
- metavar='FILE')
+ ["rootfs-tarball"],
+ "store rootfs cache tar archives in FILE",
+ metavar="FILE",
+ )
self.app.step_runners.add(MakeCacheStepRunner())
class MakeCacheStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'cache-rootfs': str,
- 'options': '--one-file-system',
- }
+ return {"cache-rootfs": str, "options": "--one-file-system"}
def run(self, values, settings, state):
- fs_tag = values['cache-rootfs']
+ fs_tag = values["cache-rootfs"]
rootdir = state.tags.get_builder_mount_point(fs_tag)
- tar_path = settings['rootfs-tarball']
- opts = values['options'].split()
+ tar_path = settings["rootfs-tarball"]
+ opts = values["options"].split()
if not tar_path:
- raise Exception('--rootfs-tarball MUST be set')
+ raise Exception("--rootfs-tarball MUST be set")
dirs = self._find_cacheable_mount_points(state.tags, rootdir)
tags = state.tags
for tag in tags.get_tags():
vmdb.progress(
- 'tag {} mounted {} cached {}'.format(
- tag, tags.get_builder_mount_point(tag), tags.is_cached(tag)))
+ "tag {} mounted {} cached {}".format(
+ tag, tags.get_builder_mount_point(tag), tags.is_cached(tag)
+ )
+ )
- vmdb.progress('caching rootdir {}'.format(rootdir))
- vmdb.progress('caching relative {}'.format(dirs))
+ vmdb.progress("caching rootdir {}".format(rootdir))
+ vmdb.progress("caching relative {}".format(dirs))
if not os.path.exists(tar_path):
- vmdb.runcmd(
- ['tar'] + opts + ['-C', rootdir, '-caf', tar_path] + dirs)
+ vmdb.runcmd(["tar"] + opts + ["-C", rootdir, "-caf", tar_path] + dirs)
def _find_cacheable_mount_points(self, tags, rootdir):
- return list(sorted(
- self._make_relative(rootdir, tags.get_builder_mount_point(tag))
- for tag in tags.get_tags()
- if tags.is_cached(tag)
- ))
+ return list(
+ sorted(
+ self._make_relative(rootdir, tags.get_builder_mount_point(tag))
+ for tag in tags.get_tags()
+ if tags.is_cached(tag)
+ )
+ )
def _make_relative(self, rootdir, dirname):
- assert dirname == rootdir or dirname.startswith(rootdir + '/')
+ assert dirname == rootdir or dirname.startswith(rootdir + "/")
if dirname == rootdir:
- return '.'
- return dirname[len(rootdir) + 1:]
+ return "."
+ return dirname[len(rootdir) + 1 :]
diff --git a/vmdb/plugins/chroot_plugin.py b/vmdb/plugins/chroot_plugin.py
index bded609..7aab1aa 100644
--- a/vmdb/plugins/chroot_plugin.py
+++ b/vmdb/plugins/chroot_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import cliapp
@@ -25,22 +24,17 @@ import vmdb
class ChrootPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(ChrootStepRunner())
class ChrootStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'chroot': str,
- 'shell': str,
- }
+ return {"chroot": str, "shell": str}
def run(self, values, settings, state):
- fs_tag = values['chroot']
- shell = values['shell']
+ fs_tag = values["chroot"]
+ shell = values["shell"]
mount_point = state.tags.get_builder_mount_point(fs_tag)
- vmdb.runcmd_chroot(mount_point, ['sh', '-ec', shell])
+ vmdb.runcmd_chroot(mount_point, ["sh", "-ec", shell])
diff --git a/vmdb/plugins/copy_file_plugin.py b/vmdb/plugins/copy_file_plugin.py
index b82497b..3c3d0e5 100644
--- a/vmdb/plugins/copy_file_plugin.py
+++ b/vmdb/plugins/copy_file_plugin.py
@@ -20,41 +20,35 @@ import vmdb
import os
import logging
-class CopyFilePlugin(cliapp.Plugin):
+class CopyFilePlugin(cliapp.Plugin):
def enable(self):
self.app.step_runners.add(CopyFileStepRunner())
class CopyFileStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'copy-file': str,
- 'src': str,
- 'perm': 0o644,
- 'uid': 0,
- 'gid': 0,
- }
+ return {"copy-file": str, "src": str, "perm": 0o644, "uid": 0, "gid": 0}
def run(self, values, settings, state):
- root = state.tags.get_builder_from_target_mount_point('/')
- newfile = values['copy-file']
- src = values['src']
- perm = values['perm']
- uid = values['uid']
- gid = values['gid']
+ root = state.tags.get_builder_from_target_mount_point("/")
+ newfile = values["copy-file"]
+ src = values["src"]
+ perm = values["perm"]
+ uid = values["uid"]
+ gid = values["gid"]
- filename = '/'.join([root,newfile])
+ filename = "/".join([root, newfile])
logging.info(
- 'Copying file %s to %s, uid %d, gid %d, perms %o' % (
- src, filename, uid, gid, perm))
+ "Copying file %s to %s, uid %d, gid %d, perms %o"
+ % (src, filename, uid, gid, perm)
+ )
dirname = os.path.dirname(filename)
os.makedirs(dirname, mode=0o511, exist_ok=True)
- with open(src, 'rb') as inp:
- with open(filename, 'wb') as output:
+ with open(src, "rb") as inp:
+ with open(filename, "wb") as output:
contents = inp.read()
output.write(contents)
diff --git a/vmdb/plugins/create_dir_plugin.py b/vmdb/plugins/create_dir_plugin.py
index 77fc72a..51f22a3 100644
--- a/vmdb/plugins/create_dir_plugin.py
+++ b/vmdb/plugins/create_dir_plugin.py
@@ -20,33 +20,27 @@ import vmdb
import os
import logging
-class CreateDirPlugin(cliapp.Plugin):
+class CreateDirPlugin(cliapp.Plugin):
def enable(self):
self.app.step_runners.add(CreateDirStepRunner())
class CreateDirStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'create-dir': str,
- 'perm': 0o755,
- 'uid': 0,
- 'gid': 0,
- }
+ return {"create-dir": str, "perm": 0o755, "uid": 0, "gid": 0}
def run(self, values, settings, state):
- root = state.tags.get_builder_from_target_mount_point('/')
- newdir = values['create-dir']
- path = '/'.join([root, newdir])
- perm = values['perm']
- uid = values['uid']
- gid = values['gid']
+ root = state.tags.get_builder_from_target_mount_point("/")
+ newdir = values["create-dir"]
+ path = "/".join([root, newdir])
+ perm = values["perm"]
+ uid = values["uid"]
+ gid = values["gid"]
logging.info(
- 'Creating directory %s, uid %d, gid %d, perms %o' % (
- path, uid, gid, perm))
+ "Creating directory %s, uid %d, gid %d, perms %o" % (path, uid, gid, perm)
+ )
os.makedirs(path, perm)
os.chown(path, uid, gid)
diff --git a/vmdb/plugins/create_file_plugin.py b/vmdb/plugins/create_file_plugin.py
index 6d5621c..b332e64 100644
--- a/vmdb/plugins/create_file_plugin.py
+++ b/vmdb/plugins/create_file_plugin.py
@@ -22,37 +22,29 @@ import logging
class CreateFilePlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(CreateFileStepRunner())
class CreateFileStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'create-file': str,
- 'contents': str,
- 'perm': 0o644,
- 'uid': 0,
- 'gid': 0,
- }
+ return {"create-file": str, "contents": str, "perm": 0o644, "uid": 0, "gid": 0}
def run(self, values, settings, state):
- root = state.tags.get_builder_from_target_mount_point('/')
- newfile = values['create-file']
- contents = values['contents']
- perm = values['perm']
- uid = values['uid']
- gid = values['gid']
+ root = state.tags.get_builder_from_target_mount_point("/")
+ newfile = values["create-file"]
+ contents = values["contents"]
+ perm = values["perm"]
+ uid = values["uid"]
+ gid = values["gid"]
- filename = '/'.join([root, newfile])
+ filename = "/".join([root, newfile])
logging.info(
- 'Creating file %s, uid %d, gid %d, perms %o' % (
- filename, uid, gid, perm))
+ "Creating file %s, uid %d, gid %d, perms %o" % (filename, uid, gid, perm)
+ )
- fd = open(filename, 'w')
+ fd = open(filename, "w")
fd.write(contents)
fd.close
diff --git a/vmdb/plugins/debootstrap_plugin.py b/vmdb/plugins/debootstrap_plugin.py
index 2d93dea..a0db6e1 100644
--- a/vmdb/plugins/debootstrap_plugin.py
+++ b/vmdb/plugins/debootstrap_plugin.py
@@ -16,52 +16,53 @@
# =*= License: GPL-3+ =*=
-
import cliapp
import vmdb
class DebootstrapPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(DebootstrapStepRunner())
class DebootstrapStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
return {
- 'debootstrap': str,
- 'target': str,
- 'mirror': str,
- 'keyring': '',
- 'variant': '-',
+ "debootstrap": str,
+ "target": str,
+ "mirror": str,
+ "keyring": "",
+ "variant": "-",
}
def run(self, values, settings, state):
- suite = values['debootstrap']
- tag = values['target']
+ suite = values["debootstrap"]
+ tag = values["target"]
target = state.tags.get_builder_mount_point(tag)
- mirror = values['mirror']
- keyring = values['keyring'] or None
- variant = values['variant']
+ mirror = values["mirror"]
+ keyring = values["keyring"] or None
+ variant = values["variant"]
if not (suite and tag and target and mirror):
- raise Exception('missing arg for debootstrap step')
+ raise Exception("missing arg for debootstrap step")
if keyring:
- vmdb.runcmd([
- 'debootstrap',
- '--keyring', keyring,
- '--variant', variant,
- suite, target, mirror])
+ vmdb.runcmd(
+ [
+ "debootstrap",
+ "--keyring",
+ keyring,
+ "--variant",
+ variant,
+ suite,
+ target,
+ mirror,
+ ]
+ )
else:
- vmdb.runcmd([
- 'debootstrap',
- '--variant', variant,
- suite, target, mirror])
+ vmdb.runcmd(["debootstrap", "--variant", variant, suite, target, mirror])
def run_even_if_skipped(self, values, settings, state):
- tag = values['target']
+ tag = values["target"]
target = state.tags.get_builder_mount_point(tag)
- vmdb.runcmd_chroot(target, ['apt-get', 'update'])
+ vmdb.runcmd_chroot(target, ["apt-get", "update"])
diff --git a/vmdb/plugins/echo_plugin.py b/vmdb/plugins/echo_plugin.py
index d7748d7..312a5d7 100644
--- a/vmdb/plugins/echo_plugin.py
+++ b/vmdb/plugins/echo_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import logging
import cliapp
@@ -25,25 +24,20 @@ import vmdb
class EchoPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(EchoStepRunner())
class EchoStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'echo': str,
- 'teardown': '',
- }
+ return {"echo": str, "teardown": ""}
def run(self, values, settings, state):
- text = values['echo']
- vmdb.progress('{}'.format(text))
+ text = values["echo"]
+ vmdb.progress("{}".format(text))
def teardown(self, values, settings, state):
- text = values['teardown']
+ text = values["teardown"]
if text:
- vmdb.progress('{}'.format(text))
- logging.info('%s', text)
+ vmdb.progress("{}".format(text))
+ logging.info("%s", text)
diff --git a/vmdb/plugins/error_plugin.py b/vmdb/plugins/error_plugin.py
index bed7319..f90d8c0 100644
--- a/vmdb/plugins/error_plugin.py
+++ b/vmdb/plugins/error_plugin.py
@@ -16,33 +16,27 @@
# =*= License: GPL-3+ =*=
-
import cliapp
import vmdb
class ErrorPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(ErrorStepRunner())
class ErrorStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'error': str,
- 'teardown': str,
- }
+ return {"error": str, "teardown": str}
def run(self, values, settings, state):
# We use vmdb.progress here to get output to go to stdout,
# instead of stderr. We want that for tests.
- vmdb.progress('{}'.format(values['error']))
- raise vmdb.StepError('an error occurred')
+ vmdb.progress("{}".format(values["error"]))
+ raise vmdb.StepError("an error occurred")
def teardown(self, values, settings, state):
# We use vmdb.progress here to get output to go to stdout,
# instead of stderr. We want that for tests.
- vmdb.progress('{}'.format(values['teardown']))
+ vmdb.progress("{}".format(values["teardown"]))
diff --git a/vmdb/plugins/fstab_plugin.py b/vmdb/plugins/fstab_plugin.py
index 508ef07..974fc56 100644
--- a/vmdb/plugins/fstab_plugin.py
+++ b/vmdb/plugins/fstab_plugin.py
@@ -23,20 +23,16 @@ import vmdb
class FstabPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(FstabStepRunner())
class FstabStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'fstab': str,
- }
+ return {"fstab": str}
def run(self, values, setting, state):
- tag = values['fstab']
+ tag = values["fstab"]
chroot = state.tags.get_builder_mount_point(tag)
filesystems = []
@@ -46,22 +42,23 @@ class FstabStepRunner(vmdb.StepRunnerInterface):
mount_point = state.tags.get_target_mount_point(tag)
if mount_point is not None:
fstype = state.tags.get_fstype(tag)
- output = vmdb.runcmd([
- 'blkid', '-c', '/dev/null', '-o', 'value', '-s', 'UUID', device])
+ output = vmdb.runcmd(
+ ["blkid", "-c", "/dev/null", "-o", "value", "-s", "UUID", device]
+ )
if output:
uuid = output.decode().strip()
- filesystems.append({
- 'uuid': uuid,
- 'mount_point': mount_point,
- 'fstype': fstype,
- })
+ filesystems.append(
+ {"uuid": uuid, "mount_point": mount_point, "fstype": fstype}
+ )
else:
raise Exception(
- 'Unknown UUID for device {} (to be mounted on {})'.format(
- device, mount_point))
+ "Unknown UUID for device {} (to be mounted on {})".format(
+ device, mount_point
+ )
+ )
- fstab_path = os.path.join(chroot, 'etc/fstab')
+ fstab_path = os.path.join(chroot, "etc/fstab")
line = "UUID={uuid} {mount_point} {fstype} errors=remount-ro 0 1\n"
- with open(fstab_path, 'w') as fstab:
+ with open(fstab_path, "w") as fstab:
for entry in filesystems:
fstab.write(line.format(**entry))
diff --git a/vmdb/plugins/kpartx_plugin.py b/vmdb/plugins/kpartx_plugin.py
index 5d320ea..d1f4840 100644
--- a/vmdb/plugins/kpartx_plugin.py
+++ b/vmdb/plugins/kpartx_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import stat
@@ -26,34 +25,30 @@ import vmdb
class KpartxPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(KpartxStepRunner())
class KpartxStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'kpartx': str,
- }
+ return {"kpartx": str}
def run(self, values, settings, state):
- device = values['kpartx']
+ device = values["kpartx"]
tags = state.tags.get_tags()
devs = self.kpartx(device)
for tag, dev in zip(tags, devs):
- vmdb.progress('remembering {} as {}'.format(dev, tag))
+ vmdb.progress("remembering {} as {}".format(dev, tag))
state.tags.set_dev(tag, dev)
def kpartx(self, device):
- output = vmdb.runcmd(['kpartx', '-asv', device]).decode('UTF-8')
+ output = vmdb.runcmd(["kpartx", "-asv", device]).decode("UTF-8")
for line in output.splitlines():
words = line.split()
- if words[0] == 'add':
+ if words[0] == "add":
name = words[2]
- yield '/dev/mapper/{}'.format(name)
+ yield "/dev/mapper/{}".format(name)
def teardown(self, values, settings, state):
- device = values['kpartx']
- vmdb.runcmd(['kpartx', '-dsv', device])
+ device = values["kpartx"]
+ vmdb.runcmd(["kpartx", "-dsv", device])
diff --git a/vmdb/plugins/luks_plugin.py b/vmdb/plugins/luks_plugin.py
index 531d570..946893e 100644
--- a/vmdb/plugins/luks_plugin.py
+++ b/vmdb/plugins/luks_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import logging
import os
import tempfile
@@ -27,60 +26,64 @@ import vmdb
class LuksPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(CryptsetupStepRunner())
class CryptsetupStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'cryptsetup': str,
- 'tag': str,
- 'key-file': '',
- 'key-cmd': '',
- }
+ return {"cryptsetup": str, "tag": str, "key-file": "", "key-cmd": ""}
def run(self, values, settings, state):
- underlying = values['cryptsetup']
- crypt_name = values['tag']
+ underlying = values["cryptsetup"]
+ crypt_name = values["tag"]
if not isinstance(underlying, str):
- raise vmdb.NotString('cryptsetup', underlying)
+ raise vmdb.NotString("cryptsetup", underlying)
if not isinstance(crypt_name, str):
- raise vmdb.NotString('cryptsetup: tag', crypt_name)
+ raise vmdb.NotString("cryptsetup: tag", crypt_name)
state.tmp_key_file = None
- key_file = values['key-file'] or None
- key_cmd = values['key-cmd'] or None
+ key_file = values["key-file"] or None
+ key_cmd = values["key-cmd"] or None
if key_file is None and key_cmd is None:
- raise Exception(
- 'cryptsetup step MUST define one of key-file or key-cmd')
+ raise Exception("cryptsetup step MUST define one of key-file or key-cmd")
if key_file is None:
- output = vmdb.runcmd(['sh', '-ec', key_cmd])
- output = output.decode('UTF-8')
+ output = vmdb.runcmd(["sh", "-ec", key_cmd])
+ output = output.decode("UTF-8")
key = output.splitlines()[0]
fd, key_file = tempfile.mkstemp()
state.tmp_key_file = key_file
os.close(fd)
- open(key_file, 'w').write(key)
+ open(key_file, "w").write(key)
dev = state.tags.get_dev(underlying)
if dev is None:
for t in state.tags.get_tags():
logging.debug(
- 'tag %r dev %r mp %r', t, state.tags.get_dev(t),
- state.tags.get_builder_mount_point(t))
+ "tag %r dev %r mp %r",
+ t,
+ state.tags.get_dev(t),
+ state.tags.get_builder_mount_point(t),
+ )
assert 0
- vmdb.runcmd(['cryptsetup', '-q', 'luksFormat', dev, key_file])
+ vmdb.runcmd(["cryptsetup", "-q", "luksFormat", dev, key_file])
vmdb.runcmd(
- ['cryptsetup', 'open', '--type', 'luks', '--key-file', key_file, dev,
- crypt_name])
-
- crypt_dev = '/dev/mapper/{}'.format(crypt_name)
+ [
+ "cryptsetup",
+ "open",
+ "--type",
+ "luks",
+ "--key-file",
+ key_file,
+ dev,
+ crypt_name,
+ ]
+ )
+
+ crypt_dev = "/dev/mapper/{}".format(crypt_name)
assert os.path.exists(crypt_dev)
state.tags.append(crypt_name)
state.tags.set_dev(crypt_name, crypt_dev)
@@ -90,7 +93,7 @@ class CryptsetupStepRunner(vmdb.StepRunnerInterface):
if x is not None and os.path.exists(x):
os.remove(x)
- crypt_name = values['tag']
+ crypt_name = values["tag"]
- crypt_dev = '/dev/mapper/{}'.format(crypt_name)
- vmdb.runcmd(['cryptsetup', 'close', crypt_dev])
+ crypt_dev = "/dev/mapper/{}".format(crypt_name)
+ vmdb.runcmd(["cryptsetup", "close", crypt_dev])
diff --git a/vmdb/plugins/lvcreate_plugin.py b/vmdb/plugins/lvcreate_plugin.py
index 4b135bc..1d3de33 100644
--- a/vmdb/plugins/lvcreate_plugin.py
+++ b/vmdb/plugins/lvcreate_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import cliapp
@@ -25,28 +24,22 @@ import vmdb
class LvcreatePlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(LvcreateStepRunner())
class LvcreateStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'lvcreate': str,
- 'name': str,
- 'size': str,
- }
+ return {"lvcreate": str, "name": str, "size": str}
def run(self, values, settings, state):
- vgname = values['lvcreate']
- lvname = values['name']
- size = values['size']
+ vgname = values["lvcreate"]
+ lvname = values["name"]
+ size = values["size"]
- vmdb.runcmd(['lvcreate', '--name', lvname, '--size', size, vgname])
+ vmdb.runcmd(["lvcreate", "--name", lvname, "--size", size, vgname])
- lvdev = '/dev/{}/{}'.format(vgname, lvname)
+ lvdev = "/dev/{}/{}".format(vgname, lvname)
assert os.path.exists(lvdev)
state.tags.append(lvname)
state.tags.set_dev(lvname, lvdev)
diff --git a/vmdb/plugins/mkfs_plugin.py b/vmdb/plugins/mkfs_plugin.py
index 967493f..17bf88e 100644
--- a/vmdb/plugins/mkfs_plugin.py
+++ b/vmdb/plugins/mkfs_plugin.py
@@ -16,48 +16,41 @@
# =*= License: GPL-3+ =*=
-
import cliapp
import vmdb
class MkfsPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(MkfsStepRunner())
class MkfsStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'mkfs': str,
- 'partition': str,
- 'label': '',
- }
+ return {"mkfs": str, "partition": str, "label": ""}
def run(self, values, settings, state):
- fstype = values['mkfs']
- tag = values['partition']
+ fstype = values["mkfs"]
+ tag = values["partition"]
device = state.tags.get_dev(tag)
if not isinstance(fstype, str):
- raise vmdb.NotString('mkfs', fstype)
+ raise vmdb.NotString("mkfs", fstype)
if not isinstance(tag, str):
- raise vmdb.NotString('mkfs: tag', tag)
+ raise vmdb.NotString("mkfs: tag", tag)
if not isinstance(device, str):
- raise vmdb.NotString('mkfs: device (for tag)', device)
+ raise vmdb.NotString("mkfs: device (for tag)", device)
- cmd = ['/sbin/mkfs', '-t', fstype]
- label = values['label'] or None
+ cmd = ["/sbin/mkfs", "-t", fstype]
+ label = values["label"] or None
if label:
- if fstype == 'vfat':
- cmd.append('-n')
- elif fstype == 'f2fs':
- cmd.append('-l')
+ if fstype == "vfat":
+ cmd.append("-n")
+ elif fstype == "f2fs":
+ cmd.append("-l")
else:
- cmd.append('-L')
+ cmd.append("-L")
cmd.append(label)
cmd.append(device)
vmdb.runcmd(cmd)
diff --git a/vmdb/plugins/mkimg_plugin.py b/vmdb/plugins/mkimg_plugin.py
index 15467e1..75d00ed 100644
--- a/vmdb/plugins/mkimg_plugin.py
+++ b/vmdb/plugins/mkimg_plugin.py
@@ -16,40 +16,31 @@
# =*= License: GPL-3+ =*=
-
import cliapp
import vmdb
class MkimgPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(MkimgStepRunner())
- self.app.settings.bytesize(
- ['size'],
- 'size of output image',
- default='1GiB')
+ self.app.settings.bytesize(["size"], "size of output image", default="1GiB")
class MkimgStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'mkimg': str,
- 'size': str,
- }
+ return {"mkimg": str, "size": str}
def run(self, values, settings, state):
- filename = values['mkimg']
- size = values['size']
+ filename = values["mkimg"]
+ size = values["size"]
if not isinstance(filename, str):
- raise vmdb.NotString('mkimg', filename)
+ raise vmdb.NotString("mkimg", filename)
if not filename:
- raise vmdb.IsEmptyString('mkimg', filename)
+ raise vmdb.IsEmptyString("mkimg", filename)
if not isinstance(size, str):
- raise vmdb.NotString('mkimg: size', size)
+ raise vmdb.NotString("mkimg: size", size)
- vmdb.runcmd(['qemu-img', 'create', '-f', 'raw', filename, size])
+ vmdb.runcmd(["qemu-img", "create", "-f", "raw", filename, size])
diff --git a/vmdb/plugins/mklabel_plugin.py b/vmdb/plugins/mklabel_plugin.py
index 5b5fd5a..c455394 100644
--- a/vmdb/plugins/mklabel_plugin.py
+++ b/vmdb/plugins/mklabel_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import stat
@@ -26,20 +25,15 @@ import vmdb
class MklabelPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(MklabelStepRunner())
class MklabelStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'mklabel': str,
- 'device': str,
- }
+ return {"mklabel": str, "device": str}
def run(self, values, settings, state):
- label_type = values['mklabel']
- device = values['device']
- vmdb.runcmd(['parted', '-s', device, 'mklabel', label_type])
+ label_type = values["mklabel"]
+ device = values["device"]
+ vmdb.runcmd(["parted", "-s", device, "mklabel", label_type])
diff --git a/vmdb/plugins/mkpart_plugin.py b/vmdb/plugins/mkpart_plugin.py
index 6b0a602..787bd05 100644
--- a/vmdb/plugins/mkpart_plugin.py
+++ b/vmdb/plugins/mkpart_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import stat
import time
@@ -27,35 +26,33 @@ import vmdb
class MkpartPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(MkpartStepRunner())
class MkpartStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
return {
- 'mkpart': str,
- 'device': str,
- 'start': str,
- 'end': str,
- 'tag': '',
- 'part-tag': '',
- 'fs-type': 'ext2',
+ "mkpart": str,
+ "device": str,
+ "start": str,
+ "end": str,
+ "tag": "",
+ "part-tag": "",
+ "fs-type": "ext2",
}
def run(self, values, settings, state):
- part_type = values['mkpart']
- device = values['device']
- start = values['start']
- end = values['end']
- tag = values['tag'] or values['part-tag'] or None
- fs_type = values['fs-type']
+ part_type = values["mkpart"]
+ device = values["device"]
+ start = values["start"]
+ end = values["end"]
+ tag = values["tag"] or values["part-tag"] or None
+ fs_type = values["fs-type"]
device = os.path.realpath(device)
orig = self.list_partitions(device)
- vmdb.runcmd(['parted', '-s', device, 'mkpart', part_type, fs_type, start, end])
+ vmdb.runcmd(["parted", "-s", device, "mkpart", part_type, fs_type, start, end])
new = self.list_partitions(device)
diff = self.diff_partitions(orig, new)
@@ -78,7 +75,7 @@ class MkpartStepRunner(vmdb.StepRunnerInterface):
# tags.
if self.is_block_dev(device):
self.wait_for_file_to_exist(diff[0])
- vmdb.progress('remembering partition {} as {}'.format(diff[0], tag))
+ vmdb.progress("remembering partition {} as {}".format(diff[0], tag))
state.tags.set_dev(tag, diff[0])
def is_block_dev(self, filename):
@@ -86,24 +83,16 @@ class MkpartStepRunner(vmdb.StepRunnerInterface):
return stat.S_ISBLK(st.st_mode)
def list_partitions(self, device):
- output = vmdb.runcmd(['parted', '-m', device, 'print'])
- output = output.decode('UTF-8')
- partitions = [
- line.split(':')[0]
- for line in output.splitlines()
- if ':' in line
- ]
+ output = vmdb.runcmd(["parted", "-m", device, "print"])
+ output = output.decode("UTF-8")
+ partitions = [line.split(":")[0] for line in output.splitlines() if ":" in line]
return [
- word if word.startswith('/') else '{}{}'.format(device, word)
+ word if word.startswith("/") else "{}{}".format(device, word)
for word in partitions
]
def diff_partitions(self, old, new):
- return [
- line
- for line in new
- if line not in old
- ]
+ return [line for line in new if line not in old]
def wait_for_file_to_exist(self, filename):
while not os.path.exists(filename):
@@ -116,14 +105,13 @@ class MkpartError(cliapp.AppException):
class ExpectedNewPartition(MkpartError):
-
def __init__(self):
- super().__init__('Expected a new partition to exist after mkpart')
+ super().__init__("Expected a new partition to exist after mkpart")
class UnexpectedNewPartitions(MkpartError):
-
def __init__(self, diff):
super().__init__(
- 'Expected only one new partition to exist after mkpart, '
- 'but found {}'.format(' '.join(diff)))
+ "Expected only one new partition to exist after mkpart, "
+ "but found {}".format(" ".join(diff))
+ )
diff --git a/vmdb/plugins/mount_plugin.py b/vmdb/plugins/mount_plugin.py
index fc5ffeb..f50de11 100644
--- a/vmdb/plugins/mount_plugin.py
+++ b/vmdb/plugins/mount_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import logging
import os
import tempfile
@@ -27,19 +26,13 @@ import vmdb
class MountPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(MountStepRunner())
class MountStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'mount': str,
- 'dirname': '',
- 'mount-on': '',
- }
+ return {"mount": str, "dirname": "", "mount-on": ""}
def run(self, values, settings, state):
self.mount_rootfs(values, settings, state)
@@ -48,36 +41,37 @@ class MountStepRunner(vmdb.StepRunnerInterface):
self.unmount_rootfs(values, settings, state)
def mount_rootfs(self, values, settings, state):
- tag = values['mount']
- dirname = values['dirname'] or None
- mount_on = values['mount-on'] or None
+ tag = values["mount"]
+ dirname = values["dirname"] or None
+ mount_on = values["mount-on"] or None
device = state.tags.get_dev(tag)
if dirname:
if not mount_on:
- raise Exception('no mount-on tag given')
+ raise Exception("no mount-on tag given")
if not state.tags.has_tag(mount_on):
- raise Exception('cannot find tag {}'.format(mount_on))
+ raise Exception("cannot find tag {}".format(mount_on))
mount_point = os.path.join(
- state.tags.get_builder_mount_point(mount_on), './' + dirname)
+ state.tags.get_builder_mount_point(mount_on), "./" + dirname
+ )
if not os.path.exists(mount_point):
os.makedirs(mount_point)
else:
- dirname = '/'
+ dirname = "/"
mount_point = tempfile.mkdtemp()
- vmdb.runcmd(['mount', device, mount_point])
+ vmdb.runcmd(["mount", device, mount_point])
state.tags.set_builder_mount_point(tag, mount_point, cached=True)
state.tags.set_target_mount_point(tag, dirname)
return mount_point
def unmount_rootfs(self, values, settings, state):
- tag = values['mount']
+ tag = values["mount"]
mount_point = state.tags.get_builder_mount_point(tag)
if mount_point is None:
return
@@ -87,5 +81,5 @@ class MountStepRunner(vmdb.StepRunnerInterface):
except vmdb.NotMounted as e:
logging.warning(str(e))
- if not values['mount-on']:
+ if not values["mount-on"]:
os.rmdir(mount_point)
diff --git a/vmdb/plugins/qemudebootstrap_plugin.py b/vmdb/plugins/qemudebootstrap_plugin.py
index fa97523..01184a3 100644
--- a/vmdb/plugins/qemudebootstrap_plugin.py
+++ b/vmdb/plugins/qemudebootstrap_plugin.py
@@ -16,62 +16,74 @@
# =*= License: GPL-3+ =*=
-
import cliapp
import vmdb
class QemuDebootstrapPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(QemuDebootstrapStepRunner())
class QemuDebootstrapStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
return {
- 'qemu-debootstrap': str,
- 'target': str,
- 'mirror': str,
- 'arch': str,
- 'keyring': '',
- 'variant': '-',
- 'components': ['main'],
+ "qemu-debootstrap": str,
+ "target": str,
+ "mirror": str,
+ "arch": str,
+ "keyring": "",
+ "variant": "-",
+ "components": ["main"],
}
def run(self, values, settings, state):
- suite = values['qemu-debootstrap']
- tag = values['target']
+ suite = values["qemu-debootstrap"]
+ tag = values["target"]
target = state.tags.get_builder_mount_point(tag)
- mirror = values['mirror']
- keyring = values['keyring'] or None
- variant = values['variant']
- arch = values['arch']
- components = values['components']
+ mirror = values["mirror"]
+ keyring = values["keyring"] or None
+ variant = values["variant"]
+ arch = values["arch"]
+ components = values["components"]
if not (suite and tag and target and mirror and arch):
- raise Exception('missing arg for qemu-debootstrap step')
+ raise Exception("missing arg for qemu-debootstrap step")
if keyring:
vmdb.runcmd(
- ['qemu-debootstrap',
- '--keyring', keyring,
- '--arch', arch,
- '--variant', variant,
- '--components', ','.join(components), suite,
- target,
- mirror])
+ [
+ "qemu-debootstrap",
+ "--keyring",
+ keyring,
+ "--arch",
+ arch,
+ "--variant",
+ variant,
+ "--components",
+ ",".join(components),
+ suite,
+ target,
+ mirror,
+ ]
+ )
else:
vmdb.runcmd(
- ['qemu-debootstrap',
- '--arch', arch,
- '--variant', variant,
- '--components', ','.join(components), suite,
- target,
- mirror])
- vmdb.runcmd_chroot(target, ['apt-get', 'update'])
+ [
+ "qemu-debootstrap",
+ "--arch",
+ arch,
+ "--variant",
+ variant,
+ "--components",
+ ",".join(components),
+ suite,
+ target,
+ mirror,
+ ]
+ )
+ vmdb.runcmd_chroot(target, ["apt-get", "update"])
def run_even_if_skipped(self, values, settings, state):
- tag = values['target']
+ tag = values["target"]
target = state.tags.get_builder_mount_point(tag)
- vmdb.runcmd_chroot(target, ['apt-get', 'update'])
+ vmdb.runcmd_chroot(target, ["apt-get", "update"])
diff --git a/vmdb/plugins/shell_plugin.py b/vmdb/plugins/shell_plugin.py
index cbc52ea..b9e516a 100644
--- a/vmdb/plugins/shell_plugin.py
+++ b/vmdb/plugins/shell_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import cliapp
@@ -25,23 +24,18 @@ import vmdb
class ShellPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(ShellStepRunner())
class ShellStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'shell': str,
- 'root-fs': str,
- }
+ return {"shell": str, "root-fs": str}
def run(self, step, settings, state):
- shell = step['shell']
- fs_tag = step['root-fs']
+ shell = step["shell"]
+ fs_tag = step["root-fs"]
env = dict(os.environ)
- env['ROOT'] = state.tags.get_builder_mount_point(fs_tag)
- vmdb.runcmd(['sh', '-ec', shell], env=env)
+ env["ROOT"] = state.tags.get_builder_mount_point(fs_tag)
+ vmdb.runcmd(["sh", "-ec", shell], env=env)
diff --git a/vmdb/plugins/unpack_rootfs_plugin.py b/vmdb/plugins/unpack_rootfs_plugin.py
index 9143143..c8b7649 100644
--- a/vmdb/plugins/unpack_rootfs_plugin.py
+++ b/vmdb/plugins/unpack_rootfs_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import cliapp
@@ -25,30 +24,25 @@ import vmdb
class UnpackRootFSPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(UnpackCacheStepRunner())
class UnpackCacheStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'unpack-rootfs': str,
- }
+ return {"unpack-rootfs": str}
def run(self, values, settings, state):
- fs_tag = values['unpack-rootfs']
+ fs_tag = values["unpack-rootfs"]
rootdir = state.tags.get_builder_mount_point(fs_tag)
- tar_path = settings['rootfs-tarball']
+ tar_path = settings["rootfs-tarball"]
if not tar_path:
- raise Exception('--rootfs-tarball MUST be set')
+ raise Exception("--rootfs-tarball MUST be set")
if os.path.exists(tar_path):
- vmdb.runcmd(
- ['tar', '-C', rootdir, '-xf', tar_path, '--numeric-owner'])
+ vmdb.runcmd(["tar", "-C", rootdir, "-xf", tar_path, "--numeric-owner"])
self.copy_resolv_conf(rootdir)
state.rootfs_unpacked = True
def copy_resolv_conf(self, rootdir):
- filename = os.path.join(rootdir, 'etc', 'resolv.conf')
- vmdb.runcmd(['cp', '/etc/resolv.conf', filename])
+ filename = os.path.join(rootdir, "etc", "resolv.conf")
+ vmdb.runcmd(["cp", "/etc/resolv.conf", filename])
diff --git a/vmdb/plugins/vgcreate_plugin.py b/vmdb/plugins/vgcreate_plugin.py
index d6b3c45..d979918 100644
--- a/vmdb/plugins/vgcreate_plugin.py
+++ b/vmdb/plugins/vgcreate_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import os
import cliapp
@@ -25,36 +24,28 @@ import vmdb
class VgcreatePlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(VgcreateStepRunner())
class VgcreateStepRunner(vmdb.StepRunnerInterface):
-
def get_key_spec(self):
- return {
- 'vgcreate': str,
- 'physical': [],
- }
+ return {"vgcreate": str, "physical": []}
def run(self, values, settings, state):
vgname = self.get_vg(values)
physical = self.get_pv(values, state)
for phys in physical:
- vmdb.runcmd(['pvcreate', '-ff', '--yes', phys])
- vmdb.runcmd(['vgcreate', vgname] + physical)
+ vmdb.runcmd(["pvcreate", "-ff", "--yes", phys])
+ vmdb.runcmd(["vgcreate", vgname] + physical)
def teardown(self, values, settings, state):
vgname = self.get_vg(values)
- vmdb.runcmd(['vgchange', '-an', vgname])
+ vmdb.runcmd(["vgchange", "-an", vgname])
def get_vg(self, values):
- return values['vgcreate']
+ return values["vgcreate"]
def get_pv(self, values, state):
- return [
- state.tags.get_dev(tag)
- for tag in values['physical']
- ]
+ return [state.tags.get_dev(tag) for tag in values["physical"]]
diff --git a/vmdb/plugins/virtualfs_plugin.py b/vmdb/plugins/virtualfs_plugin.py
index 848d000..59996c1 100644
--- a/vmdb/plugins/virtualfs_plugin.py
+++ b/vmdb/plugins/virtualfs_plugin.py
@@ -16,7 +16,6 @@
# =*= License: GPL-3+ =*=
-
import logging
import os
@@ -26,7 +25,6 @@ import vmdb
class VirtualFilesystemMountPlugin(cliapp.Plugin):
-
def enable(self):
self.app.step_runners.add(VirtualFilesystemMountStepRunner())
@@ -34,22 +32,20 @@ class VirtualFilesystemMountPlugin(cliapp.Plugin):
class VirtualFilesystemMountStepRunner(vmdb.StepRunnerInterface):
virtuals = [
- ['none', '/proc', 'proc'],
- ['none', '/dev', 'devtmpfs'],
- ['none', '/dev/pts', 'devpts'],
- ['none', '/dev/shm', 'tmpfs'],
- ['none', '/run', 'tmpfs'],
- ['none', '/run/lock', 'tmpfs'],
- ['none', '/sys', 'sysfs'],
+ ["none", "/proc", "proc"],
+ ["none", "/dev", "devtmpfs"],
+ ["none", "/dev/pts", "devpts"],
+ ["none", "/dev/shm", "tmpfs"],
+ ["none", "/run", "tmpfs"],
+ ["none", "/run/lock", "tmpfs"],
+ ["none", "/sys", "sysfs"],
]
def get_key_spec(self):
- return {
- 'virtual-filesystems': str,
- }
+ return {"virtual-filesystems": str}
def run(self, values, settings, state):
- fstag = values['virtual-filesystems']
+ fstag = values["virtual-filesystems"]
mount_point = state.tags.get_builder_mount_point(fstag)
self.mount_virtuals(mount_point, state)
@@ -57,23 +53,23 @@ class VirtualFilesystemMountStepRunner(vmdb.StepRunnerInterface):
self.unmount_virtuals(state)
def mount_virtuals(self, rootfs, state):
- if not hasattr(state, 'virtuals'):
+ if not hasattr(state, "virtuals"):
state.virtuals = []
for device, mount_point, fstype in self.virtuals:
- path = os.path.join(rootfs, './' + mount_point)
+ path = os.path.join(rootfs, "./" + mount_point)
if not os.path.exists(path):
os.mkdir(path)
- vmdb.runcmd(['mount', '-t', fstype, device, path])
+ vmdb.runcmd(["mount", "-t", fstype, device, path])
state.virtuals.append(path)
- logging.debug('mounted virtuals: %r', state.virtuals)
+ logging.debug("mounted virtuals: %r", state.virtuals)
def unmount_virtuals(self, state):
- logging.debug('unmounting virtuals: %r', state.virtuals)
+ logging.debug("unmounting virtuals: %r", state.virtuals)
for mount_point in reversed(state.virtuals):
try:
vmdb.unmount(mount_point)
except vmdb.NotMounted as e:
logging.warning(str(e))
except cliapp.AppException:
- vmdb.warning('Something went wrong while unmounting. Ignoring.')
+ vmdb.warning("Something went wrong while unmounting. Ignoring.")
diff --git a/vmdb/runcmd.py b/vmdb/runcmd.py
index 7cd6406..985a717 100644
--- a/vmdb/runcmd.py
+++ b/vmdb/runcmd.py
@@ -34,50 +34,50 @@ def set_verbose_progress(verbose):
def error(msg):
logging.error(msg, exc_info=True)
- sys.stderr.write('ERROR: {}\n'.format(msg))
+ sys.stderr.write("ERROR: {}\n".format(msg))
sys.stderr.flush()
def progress(msg):
logging.info(msg)
if _verbose:
- sys.stdout.write('{}\n'.format(msg))
+ sys.stdout.write("{}\n".format(msg))
sys.stdout.flush()
def runcmd(argv, **kwargs):
- progress('Exec: %r' % (argv,))
- env = kwargs.get('env', os.environ.copy())
- env['LC_ALL'] = 'C'
- kwargs['env'] = env
- kwargs['stdout'] = kwargs.get('stdout', subprocess.PIPE)
- kwargs['stderr'] = kwargs.get('stderr', subprocess.PIPE)
+ progress("Exec: %r" % (argv,))
+ env = kwargs.get("env", os.environ.copy())
+ env["LC_ALL"] = "C"
+ kwargs["env"] = env
+ kwargs["stdout"] = kwargs.get("stdout", subprocess.PIPE)
+ kwargs["stderr"] = kwargs.get("stderr", subprocess.PIPE)
p = subprocess.Popen(argv, **kwargs)
out, err = p.communicate()
- logging.debug('STDOUT: %s', out.decode('UTF8'))
- logging.debug('STDERR: %s', err.decode('UTF8'))
+ logging.debug("STDOUT: %s", out.decode("UTF8"))
+ logging.debug("STDERR: %s", err.decode("UTF8"))
if p.returncode != 0:
- raise cliapp.AppException('Command failed: {}'.format(p.returncode))
+ raise cliapp.AppException("Command failed: {}".format(p.returncode))
return out
def runcmd_chroot(chroot, argv, *argvs, **kwargs):
- full_argv = ['chroot', chroot] + argv
+ full_argv = ["chroot", chroot] + argv
return runcmd(full_argv, *argvs, **kwargs)
def _procdir(chroot):
- proc = os.path.join(chroot, 'proc')
+ proc = os.path.join(chroot, "proc")
if not os.path.exists(proc):
os.mkdir(proc, mode=Oo755)
return proc
def _log_stdout(data):
- logging.debug('STDOUT: %r', data)
+ logging.debug("STDOUT: %r", data)
return data
def _log_stderr(data):
- logging.debug('STDERR: %r', data)
+ logging.debug("STDERR: %r", data)
return data
diff --git a/vmdb/spec.py b/vmdb/spec.py
index 98f5c56..cfcba9c 100644
--- a/vmdb/spec.py
+++ b/vmdb/spec.py
@@ -21,7 +21,6 @@ import yaml
class Spec:
-
def __init__(self):
self._dict = None
@@ -32,7 +31,7 @@ class Spec:
return dict(self._dict)
def get_steps(self, params):
- return expand_templates(self._dict['steps'], params)
+ return expand_templates(self._dict["steps"], params)
def expand_templates(value, params):
@@ -44,9 +43,6 @@ def expand_templates(value, params):
elif isinstance(value, list):
return [expand_templates(x, params) for x in value]
elif isinstance(value, dict):
- return {
- key: expand_templates(value[key], params)
- for key in value
- }
+ return {key: expand_templates(value[key], params) for key in value}
else:
- assert 0, 'Unknown value type: {!r}'.format(value)
+ assert 0, "Unknown value type: {!r}".format(value)
diff --git a/vmdb/spec_tests.py b/vmdb/spec_tests.py
index 4b23238..71ef965 100644
--- a/vmdb/spec_tests.py
+++ b/vmdb/spec_tests.py
@@ -28,16 +28,16 @@ import vmdb
class SpecTests(unittest.TestCase):
- spec_yaml = '''
+ spec_yaml = """
steps:
- step: foo
arg: "{{ var1 }}"
number: 0711
- step: bar
- '''
+ """
def setUp(self):
- self.filename = write_temp_file(bytes(self.spec_yaml, 'ascii'))
+ self.filename = write_temp_file(bytes(self.spec_yaml, "ascii"))
self.spec = vmdb.Spec()
def tearDown(self):
@@ -50,52 +50,33 @@ class SpecTests(unittest.TestCase):
def test_expands_templates(self):
self.spec.load_file(open(self.filename))
- params = {
- 'var1': 'value1',
- }
+ params = {"var1": "value1"}
steps = self.spec.get_steps(params)
self.assertEqual(
- steps,
- [
- {
- 'step': 'foo',
- 'arg': 'value1',
- 'number': 0o711,
- },
- {
- 'step': 'bar',
- },
- ]
+ steps, [{"step": "foo", "arg": "value1", "number": 0o711}, {"step": "bar"}]
)
-class ExpandTemplatesTests(unittest.TestCase):
+class ExpandTemplatesTests(unittest.TestCase):
def test_raises_assert_if_given_incomprehensible_value(self):
with self.assertRaises(AssertionError):
vmdb.expand_templates(None, {})
def test_returns_same_given_string_without_template(self):
- self.assertEqual(vmdb.expand_templates('foo', {}), 'foo')
+ self.assertEqual(vmdb.expand_templates("foo", {}), "foo")
def test_expands_simple_string_template(self):
- params = {
- 'foo': 'bar',
- }
- self.assertEqual(vmdb.expand_templates('{{ foo }}', params), 'bar')
+ params = {"foo": "bar"}
+ self.assertEqual(vmdb.expand_templates("{{ foo }}", params), "bar")
def test_expands_list_of_templates(self):
- params = {
- 'foo': 'bar',
- }
- self.assertEqual(vmdb.expand_templates(['{{ foo }}'], params), ['bar'])
+ params = {"foo": "bar"}
+ self.assertEqual(vmdb.expand_templates(["{{ foo }}"], params), ["bar"])
def test_expands_dict_of_templates(self):
- params = {
- 'foo': 'bar',
- }
+ params = {"foo": "bar"}
self.assertEqual(
- vmdb.expand_templates({'key': '{{ foo }}'}, params),
- {'key': 'bar'}
+ vmdb.expand_templates({"key": "{{ foo }}"}, params), {"key": "bar"}
)
diff --git a/vmdb/state.py b/vmdb/state.py
index e6b74d2..9ae046e 100644
--- a/vmdb/state.py
+++ b/vmdb/state.py
@@ -17,14 +17,11 @@
class State:
-
def __init__(self):
self._attrs = {} # make sure this attribute exists
self._attrs = self.as_dict()
def as_dict(self):
return {
- key: repr(getattr(self, key))
- for key in dir(self)
- if not key in self._attrs
+ key: repr(getattr(self, key)) for key in dir(self) if not key in self._attrs
}
diff --git a/vmdb/step_list.py b/vmdb/step_list.py
index c8d0ec8..1cc0605 100644
--- a/vmdb/step_list.py
+++ b/vmdb/step_list.py
@@ -23,7 +23,6 @@ import cliapp
class StepRunnerInterface: # pragma: no cover
-
def get_key_spec(self):
raise NotImplementedError()
@@ -71,7 +70,7 @@ class StepRunnerInterface: # pragma: no cover
# a list of such names. If all variables have a value that
# evaluates as truth, the step is skipped.
- value = step_spec.get('unless', None)
+ value = step_spec.get("unless", None)
if value is None:
return False
@@ -81,7 +80,6 @@ class StepRunnerInterface: # pragma: no cover
class StepRunnerList:
-
def __init__(self):
self._runners = []
@@ -109,42 +107,37 @@ class StepRunnerList:
class StepError(cliapp.AppException):
-
def __init__(self, msg):
logging.error(msg)
super().__init__(msg)
class StepKeyMissing(StepError):
-
def __init__(self, key):
- super().__init__('Step is missing key {}'.format(key))
+ super().__init__("Step is missing key {}".format(key))
class StepKeyWrongValueType(StepError):
-
def __init__(self, key, wanted, actual):
super().__init__(
- 'Step key {} has value {!r}, expected {!r}'.format(
- key, wanted, actual))
+ "Step key {} has value {!r}, expected {!r}".format(key, wanted, actual)
+ )
class NoMatchingRunner(StepError):
-
def __init__(self, keys):
super().__init__(
- 'No runner implements step with keys {}'.format(', '.join(keys)))
+ "No runner implements step with keys {}".format(", ".join(keys))
+ )
class NotString(StepError): # pragma: no cover
-
def __init__(self, name, actual):
- msg = '%s: value must be string, got %r' % (name, actual)
+ msg = "%s: value must be string, got %r" % (name, actual)
super().__init__(msg)
class IsEmptyString(StepError): # pragma: no cover
-
def __init__(self, name, actual):
- msg = '%s: value must not be an empty string, got %r' % (name, actual)
+ msg = "%s: value must not be an empty string, got %r" % (name, actual)
super().__init__(msg)
diff --git a/vmdb/step_list_tests.py b/vmdb/step_list_tests.py
index 13836a5..5c4d080 100644
--- a/vmdb/step_list_tests.py
+++ b/vmdb/step_list_tests.py
@@ -22,7 +22,6 @@ import vmdb
class StepRunnerListTests(unittest.TestCase):
-
def test_is_empty_initially(self):
steps = vmdb.StepRunnerList()
self.assertEqual(len(steps), 0)
@@ -35,39 +34,30 @@ class StepRunnerListTests(unittest.TestCase):
def test_finds_correct_runner(self):
steps = vmdb.StepRunnerList()
- keyspec = {
- 'foo': str,
- 'bar': str,
- }
+ keyspec = {"foo": str, "bar": str}
runner = DummyStepRunner(keyspec=keyspec)
steps.add(runner)
- found = steps.find({'foo': 'foo', 'bar': 'bar'})
+ found = steps.find({"foo": "foo", "bar": "bar"})
self.assertEqual(runner, found)
def test_raises_error_if_runner_not_found(self):
steps = vmdb.StepRunnerList()
- keyspec = {
- 'foo': str,
- 'bar': str,
- }
+ keyspec = {"foo": str, "bar": str}
runner = DummyStepRunner(keyspec=keyspec)
steps.add(runner)
with self.assertRaises(vmdb.NoMatchingRunner):
- steps.find({'foo': 'foo'})
+ steps.find({"foo": "foo"})
def test_raises_error_if_wrong_step_key_values(self):
steps = vmdb.StepRunnerList()
- keyspec = {
- 'foo': str,
- }
+ keyspec = {"foo": str}
runner = DummyStepRunner(keyspec=keyspec)
steps.add(runner)
with self.assertRaises(vmdb.StepKeyWrongValueType):
- steps.find({'foo': 42})
+ steps.find({"foo": 42})
class DummyStepRunner(vmdb.StepRunnerInterface):
-
def __init__(self, keyspec=None):
self.keyspec = keyspec
@@ -79,36 +69,35 @@ class DummyStepRunner(vmdb.StepRunnerInterface):
class StepRunnerGetKeyValuesTests(unittest.TestCase):
-
def test_returns_values_from_step_for_mandatory_keys(self):
- keyspec = {'foo': str}
+ keyspec = {"foo": str}
runner = DummyStepRunner(keyspec=keyspec)
- self.assertEqual(runner.get_values({'foo': 'bar'}), {'foo': 'bar'})
+ self.assertEqual(runner.get_values({"foo": "bar"}), {"foo": "bar"})
def test_raises_error_for_missing_mandatory_key(self):
- keyspec = {'foo': str}
+ keyspec = {"foo": str}
runner = DummyStepRunner(keyspec=keyspec)
with self.assertRaises(vmdb.StepKeyMissing):
runner.get_values({})
def test_raises_error_for_wrong_type_of_value_for_mandatory_key(self):
- keyspec = {'foo': str}
+ keyspec = {"foo": str}
runner = DummyStepRunner(keyspec=keyspec)
with self.assertRaises(vmdb.StepKeyWrongValueType):
- runner.get_values({'foo': 42})
+ runner.get_values({"foo": 42})
def test_returns_default_value_for_missing_optional_key(self):
- keyspec = {'foo': 'bar'}
+ keyspec = {"foo": "bar"}
runner = DummyStepRunner(keyspec=keyspec)
- self.assertEqual(runner.get_values({}), {'foo': 'bar'})
+ self.assertEqual(runner.get_values({}), {"foo": "bar"})
def test_returns_actual_value_for_optional_key(self):
- keyspec = {'foo': 'bar'}
+ keyspec = {"foo": "bar"}
runner = DummyStepRunner(keyspec=keyspec)
- self.assertEqual(runner.get_values({'foo': 'yo'}), {'foo': 'yo'})
+ self.assertEqual(runner.get_values({"foo": "yo"}), {"foo": "yo"})
def test_raises_error_for_wrong_type_of_value_for_optional_key(self):
- keyspec = {'foo': 'bar'}
+ keyspec = {"foo": "bar"}
runner = DummyStepRunner(keyspec=keyspec)
with self.assertRaises(vmdb.StepKeyWrongValueType):
- runner.get_values({'foo': 42})
+ runner.get_values({"foo": 42})
diff --git a/vmdb/tags.py b/vmdb/tags.py
index 7cbae09..818193f 100644
--- a/vmdb/tags.py
+++ b/vmdb/tags.py
@@ -17,16 +17,12 @@
class Tags:
-
def __init__(self):
self._tags = {}
self._tagnames = []
def __repr__(self): # pragma: no cover
- return repr({
- 'tags': self._tags,
- 'tagnames': self._tagnames,
- })
+ return repr({"tags": self._tags, "tagnames": self._tagnames})
def get_tags(self):
return list(self._tags.keys())
@@ -36,64 +32,64 @@ class Tags:
def get_dev(self, tag):
item = self._get(tag)
- return item['dev']
+ return item["dev"]
def get_builder_mount_point(self, tag):
item = self._get(tag)
- return item['builder_mount_point']
+ return item["builder_mount_point"]
def get_fstype(self, tag):
item = self._get(tag)
- return item['fstype']
+ return item["fstype"]
def get_target_mount_point(self, tag):
item = self._get(tag)
- return item['target_mount_point']
+ return item["target_mount_point"]
def is_cached(self, tag):
item = self._get(tag)
- return item.get('cached', False)
+ return item.get("cached", False)
def append(self, tag):
if tag in self._tags:
raise TagInUse(tag)
self._tagnames.append(tag)
self._tags[tag] = {
- 'dev': None,
- 'builder_mount_point': None,
- 'fstype': None,
- 'target_mount_point': None,
+ "dev": None,
+ "builder_mount_point": None,
+ "fstype": None,
+ "target_mount_point": None,
}
def set_dev(self, tag, dev):
item = self._get(tag)
- if item['dev'] is not None:
+ if item["dev"] is not None:
raise AlreadyHasDev(tag)
- item['dev'] = dev
+ item["dev"] = dev
def set_builder_mount_point(self, tag, mount_point, cached=False):
item = self._get(tag)
- if item['builder_mount_point'] is not None:
+ if item["builder_mount_point"] is not None:
raise AlreadyMounted(tag)
- item['builder_mount_point'] = mount_point
- item['cached'] = cached
+ item["builder_mount_point"] = mount_point
+ item["cached"] = cached
def set_fstype(self, tag, fstype):
item = self._get(tag)
- if item['fstype'] is not None:
+ if item["fstype"] is not None:
raise AlreadyHasFsType(tag)
- item['fstype'] = fstype
+ item["fstype"] = fstype
def set_target_mount_point(self, tag, target_mount_point):
item = self._get(tag)
- if item['target_mount_point'] is not None:
+ if item["target_mount_point"] is not None:
raise AlreadyHasTargetMountPoint(tag)
- item['target_mount_point'] = target_mount_point
+ item["target_mount_point"] = target_mount_point
def get_builder_from_target_mount_point(self, target_mount_point):
for item in self._tags.values():
- if item['target_mount_point'] == target_mount_point:
- return item['builder_mount_point']
+ if item["target_mount_point"] == target_mount_point:
+ return item["builder_mount_point"]
raise NeedBothMountPoints(target_mount_point)
def _get(self, tag):
@@ -104,43 +100,35 @@ class Tags:
class TagInUse(Exception):
-
def __init__(self, tag):
- super().__init__('Tag already used: {}'.format(tag))
+ super().__init__("Tag already used: {}".format(tag))
class UnknownTag(Exception):
-
def __init__(self, tag):
- super().__init__('Unknown tag: {}'.format(tag))
+ super().__init__("Unknown tag: {}".format(tag))
class AlreadyHasDev(Exception):
-
def __init__(self, tag):
- super().__init__('Already has device: {}'.format(tag))
+ super().__init__("Already has device: {}".format(tag))
class AlreadyMounted(Exception):
-
def __init__(self, tag):
- super().__init__('Already mounted tag: {}'.format(tag))
+ super().__init__("Already mounted tag: {}".format(tag))
class AlreadyHasFsType(Exception):
-
def __init__(self, tag):
- super().__init__('Already has filesytem type: {}'.format(tag))
+ super().__init__("Already has filesytem type: {}".format(tag))
class AlreadyHasTargetMountPoint(Exception):
-
def __init__(self, tag):
- super().__init__('Already has target mount point: {}'.format(tag))
+ super().__init__("Already has target mount point: {}".format(tag))
class NeedBothMountPoints(Exception):
-
def __init__(self, target_mp):
- super().__init__(
- 'Need both mount points set, target: {}'.format(target_mp))
+ super().__init__("Need both mount points set, target: {}".format(target_mp))
diff --git a/vmdb/tags_tests.py b/vmdb/tags_tests.py
index 29ca68e..fdcebf3 100644
--- a/vmdb/tags_tests.py
+++ b/vmdb/tags_tests.py
@@ -23,131 +23,127 @@ import vmdb
class TagsTests(unittest.TestCase):
-
def test_lists_no_tags_initially(self):
tags = vmdb.Tags()
self.assertEqual(tags.get_tags(), [])
def test_tells_if_tag_exists(self):
tags = vmdb.Tags()
- self.assertFalse(tags.has_tag('foo'))
- tags.append('foo')
- self.assertTrue(tags.has_tag('foo'))
- self.assertEqual(tags.get_tags(), ['foo'])
+ self.assertFalse(tags.has_tag("foo"))
+ tags.append("foo")
+ self.assertTrue(tags.has_tag("foo"))
+ self.assertEqual(tags.get_tags(), ["foo"])
def test_remembers_order(self):
tags = vmdb.Tags()
- tags.append('foo')
- tags.append('bar')
- self.assertTrue(tags.get_tags(), ['foo', 'bar'])
+ tags.append("foo")
+ tags.append("bar")
+ self.assertTrue(tags.get_tags(), ["foo", "bar"])
def test_get_dev_raises_error_for_unknown_tag(self):
tags = vmdb.Tags()
with self.assertRaises(vmdb.UnknownTag):
- tags.get_dev('does-not-exist')
+ tags.get_dev("does-not-exist")
def test_getting_builder_mount_point_raises_error_for_unknown_tag(self):
tags = vmdb.Tags()
with self.assertRaises(vmdb.UnknownTag):
- tags.get_builder_mount_point('does-not-exist')
+ tags.get_builder_mount_point("does-not-exist")
def test_raises_error_for_reused_tag(self):
tags = vmdb.Tags()
- tags.append('tag')
+ tags.append("tag")
with self.assertRaises(vmdb.TagInUse):
- tags.append('tag')
+ tags.append("tag")
def test_sets_dev(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_dev('first', '/dev/foo')
- self.assertEqual(tags.get_tags(), ['first'])
- self.assertEqual(tags.get_dev('first'), '/dev/foo')
- self.assertEqual(tags.get_builder_mount_point('first'), None)
+ tags.append("first")
+ tags.set_dev("first", "/dev/foo")
+ self.assertEqual(tags.get_tags(), ["first"])
+ self.assertEqual(tags.get_dev("first"), "/dev/foo")
+ self.assertEqual(tags.get_builder_mount_point("first"), None)
def test_adds_builder_mount_point(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_builder_mount_point('first', '/mnt/foo')
- self.assertEqual(tags.get_tags(), ['first'])
- self.assertEqual(tags.get_dev('first'), None)
- self.assertEqual(tags.get_builder_mount_point('first'), '/mnt/foo')
+ tags.append("first")
+ tags.set_builder_mount_point("first", "/mnt/foo")
+ self.assertEqual(tags.get_tags(), ["first"])
+ self.assertEqual(tags.get_dev("first"), None)
+ self.assertEqual(tags.get_builder_mount_point("first"), "/mnt/foo")
def test_builder_mount_point_is_uncached_by_default(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_builder_mount_point('first', '/mnt/foo')
- self.assertFalse(tags.is_cached('first'))
+ tags.append("first")
+ tags.set_builder_mount_point("first", "/mnt/foo")
+ self.assertFalse(tags.is_cached("first"))
def test_builder_mount_point_can_be_made_cached(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_builder_mount_point('first', '/mnt/foo', cached=True)
- self.assertTrue(tags.is_cached('first'))
+ tags.append("first")
+ tags.set_builder_mount_point("first", "/mnt/foo", cached=True)
+ self.assertTrue(tags.is_cached("first"))
def test_set_dev_raises_error_for_unknown_tag(self):
tags = vmdb.Tags()
with self.assertRaises(vmdb.UnknownTag):
- tags.set_dev('first', '/mnt/foo')
+ tags.set_dev("first", "/mnt/foo")
def test_set_builder_mount_point_raises_error_for_unknown_tag(self):
tags = vmdb.Tags()
with self.assertRaises(vmdb.UnknownTag):
- tags.set_builder_mount_point('first', '/mnt/foo')
+ tags.set_builder_mount_point("first", "/mnt/foo")
def test_set_builder_mount_point_raises_error_for_double_mount(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_builder_mount_point('first', '/mnt/foo')
+ tags.append("first")
+ tags.set_builder_mount_point("first", "/mnt/foo")
with self.assertRaises(vmdb.AlreadyMounted):
- tags.set_builder_mount_point('first', '/mnt/foo')
+ tags.set_builder_mount_point("first", "/mnt/foo")
def test_set_dev_raises_error_for_double_dev(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_dev('first', '/dev/foo')
+ tags.append("first")
+ tags.set_dev("first", "/dev/foo")
with self.assertRaises(vmdb.AlreadyHasDev):
- tags.set_dev('first', '/dev/foo')
+ tags.set_dev("first", "/dev/foo")
def test_set_fstype(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_fstype('first', 'ext4')
- self.assertEqual(tags.get_fstype('first'), 'ext4')
+ tags.append("first")
+ tags.set_fstype("first", "ext4")
+ self.assertEqual(tags.get_fstype("first"), "ext4")
def test_set_fstype_raises_error_for_double_fstype(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_fstype('first', 'ext3')
+ tags.append("first")
+ tags.set_fstype("first", "ext3")
with self.assertRaises(vmdb.AlreadyHasFsType):
- tags.set_fstype('first', 'ext4')
+ tags.set_fstype("first", "ext4")
def test_set_target_mount_point(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_target_mount_point('first', '/boot')
- self.assertEqual(tags.get_target_mount_point('first'), '/boot')
+ tags.append("first")
+ tags.set_target_mount_point("first", "/boot")
+ self.assertEqual(tags.get_target_mount_point("first"), "/boot")
def test_set_target_mount_point_raises_error_for_double_target_mount_point(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_target_mount_point('first', '/boot')
+ tags.append("first")
+ tags.set_target_mount_point("first", "/boot")
with self.assertRaises(vmdb.AlreadyHasTargetMountPoint):
- tags.set_target_mount_point('first', '/')
+ tags.set_target_mount_point("first", "/")
def test_raises_error_if_both_mount_points_not_set(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_target_mount_point('first', '/boot')
+ tags.append("first")
+ tags.set_target_mount_point("first", "/boot")
with self.assertRaises(vmdb.NeedBothMountPoints):
- tags.get_builder_from_target_mount_point('/')
+ tags.get_builder_from_target_mount_point("/")
def test_returns_builder_when_given_target_mount_point(self):
tags = vmdb.Tags()
- tags.append('first')
- tags.set_builder_mount_point('first', '/mnt/foo')
- tags.set_target_mount_point('first', '/boot')
- self.assertEqual(
- tags.get_builder_from_target_mount_point('/boot'),
- '/mnt/foo'
- )
+ tags.append("first")
+ tags.set_builder_mount_point("first", "/mnt/foo")
+ tags.set_target_mount_point("first", "/boot")
+ self.assertEqual(tags.get_builder_from_target_mount_point("/boot"), "/mnt/foo")
diff --git a/vmdb/unmount.py b/vmdb/unmount.py
index 8239f8c..2a9e5e9 100644
--- a/vmdb/unmount.py
+++ b/vmdb/unmount.py
@@ -30,7 +30,7 @@ import vmdb
def unmount(what, mounts=None, real_unmount=None):
- logging.debug('Unmounting {} and everything on top of it'.format(what))
+ logging.debug("Unmounting {} and everything on top of it".format(what))
if mounts is None: # pragma: no cover
mounts = _read_proc_mounts()
if real_unmount is None: # pragma: no cover
@@ -40,31 +40,28 @@ def unmount(what, mounts=None, real_unmount=None):
dirnames = _find_what_to_unmount(mounts, what)
for dirname in dirnames:
real_unmount(dirname)
- logging.debug('Finishd unmounting {}'.format(what))
+ logging.debug("Finishd unmounting {}".format(what))
def _read_proc_mounts(): # pragma: no cover
- with open('/proc/mounts') as f:
+ with open("/proc/mounts") as f:
return f.read()
def _real_unmount(what): # pragma: no cover
try:
- vmdb.runcmd(['umount', what])
+ vmdb.runcmd(["umount", what])
except cliapp.AppException:
- logging.info('unmount failed, but ignoring that')
+ logging.info("unmount failed, but ignoring that")
def _parse_proc_mounts(text):
- return [
- line.split()[:2]
- for line in text.splitlines()
- ]
+ return [line.split()[:2] for line in text.splitlines()]
def _find_what_to_unmount(mounts, what):
dirname = _find_mount_point(mounts, what)
- dirnameslash = dirname + '/'
+ dirnameslash = dirname + "/"
to_unmount = [
point
for dev, point in mounts
@@ -81,6 +78,5 @@ def _find_mount_point(mounts, what):
class NotMounted(Exception):
-
def __init__(self, what):
- super().__init__('Not mounted: {}'.format(what))
+ super().__init__("Not mounted: {}".format(what))
diff --git a/vmdb/unmount_tests.py b/vmdb/unmount_tests.py
index 1ea3662..58ae341 100644
--- a/vmdb/unmount_tests.py
+++ b/vmdb/unmount_tests.py
@@ -23,35 +23,30 @@ import vmdb
class UnmountTests(unittest.TestCase):
-
def setUp(self):
self.mounts = ProcMounts()
def unmount(self, what):
- vmdb.unmount(
- what,
- mounts=str(self.mounts),
- real_unmount=self.mounts.unmount)
+ vmdb.unmount(what, mounts=str(self.mounts), real_unmount=self.mounts.unmount)
def test_raises_error_if_not_mounted(self):
with self.assertRaises(vmdb.NotMounted):
- self.unmount('/foo')
+ self.unmount("/foo")
def test_unmounts_mounted_dir(self):
- self.mounts.mount('/dev/foo', '/foo')
- self.unmount('/foo')
- self.assertFalse(self.mounts.is_mounted('/foo'))
+ self.mounts.mount("/dev/foo", "/foo")
+ self.unmount("/foo")
+ self.assertFalse(self.mounts.is_mounted("/foo"))
def test_unmounts_mounted_dir_with_submounts(self):
- self.mounts.mount('/dev/foo', '/foo')
- self.mounts.mount('/dev/bar', '/foo/bar')
- self.unmount('/foo')
- self.assertFalse(self.mounts.is_mounted('/foo'))
- self.assertFalse(self.mounts.is_mounted('/foo/bar'))
+ self.mounts.mount("/dev/foo", "/foo")
+ self.mounts.mount("/dev/bar", "/foo/bar")
+ self.unmount("/foo")
+ self.assertFalse(self.mounts.is_mounted("/foo"))
+ self.assertFalse(self.mounts.is_mounted("/foo/bar"))
class ProcMounts:
-
def __init__(self):
self.mounts = []
@@ -62,17 +57,10 @@ class ProcMounts:
self.mounts.append((device, point))
def unmount(self, what):
- self.mounts = [
- mount
- for mount in self.mounts
- if what not in mount
- ]
+ self.mounts = [mount for mount in self.mounts if what not in mount]
def __str__(self):
- return ''.join(
- '{}\n'.format(self.mount_line(mount))
- for mount in self.mounts
- )
+ return "".join("{}\n".format(self.mount_line(mount)) for mount in self.mounts)
def mount_line(self, mount):
- return '{} {} fstype options 0 0'.format(mount[0], mount[1])
+ return "{} {} fstype options 0 0".format(mount[0], mount[1])