diff options
author | Lars Wirzenius <liw@liw.fi> | 2020-08-20 10:29:08 +0300 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2020-08-20 10:29:08 +0300 |
commit | ea64a6c256477c832fadb48fc84e807e9d378552 (patch) | |
tree | f5fd4cf610d97eb9a172451098bfd6eb81b6107f | |
parent | 1fe5cff98cc52f2d7ebe9fc269a6c6d2eea52125 (diff) | |
download | vmdb2-ea64a6c256477c832fadb48fc84e807e9d378552.tar.gz |
refactor: format all Python modules with black
36 files changed, 522 insertions, 728 deletions
diff --git a/vmdb/__init__.py b/vmdb/__init__.py index 5308c5a..b4a9582 100644 --- a/vmdb/__init__.py +++ b/vmdb/__init__.py @@ -28,13 +28,7 @@ from .step_list import ( StepKeyMissing, StepKeyWrongValueType, ) -from .runcmd import ( - runcmd, - runcmd_chroot, - set_verbose_progress, - progress, - error, -) +from .runcmd import runcmd, runcmd_chroot, set_verbose_progress, progress, error from .tags import ( Tags, UnknownTag, @@ -46,8 +40,5 @@ from .tags import ( NeedBothMountPoints, ) from .unmount import unmount, NotMounted -from .spec import ( - Spec, - expand_templates, -) +from .spec import Spec, expand_templates from .app import Vmdb2 diff --git a/vmdb/app.py b/vmdb/app.py index 9db656b..2376116 100644 --- a/vmdb/app.py +++ b/vmdb/app.py @@ -25,16 +25,14 @@ import vmdb class Vmdb2(cliapp.Application): - def add_settings(self): self.settings.string( - ['image'], - 'use existing image file/device FILE (use --output to create new file)', - metavar='FILE') + ["image"], + "use existing image file/device FILE (use --output to create new file)", + metavar="FILE", + ) - self.settings.boolean( - ['verbose', 'v'], - 'verbose output') + self.settings.boolean(["verbose", "v"], "verbose output") def setup(self): self.step_runners = vmdb.StepRunnerList() @@ -43,7 +41,7 @@ class Vmdb2(cliapp.Application): if len(args) != 1: sys.exit("No image specification was given on the command line.") - vmdb.set_verbose_progress(self.settings['verbose']) + vmdb.set_verbose_progress(self.settings["verbose"]) spec = self.load_spec_file(args[0]) state = vmdb.State() @@ -57,47 +55,47 @@ class Vmdb2(cliapp.Application): steps_taken, core_meltdown = self.run_steps(steps, state) if core_meltdown: - vmdb.progress('Something went wrong, cleaning up!') + vmdb.progress("Something went wrong, cleaning up!") self.run_teardowns(steps_taken, state) else: self.run_teardowns(steps_taken, state) - vmdb.progress('All went fine.') + vmdb.progress("All went fine.") if core_meltdown: - logging.error('An error occurred, exiting with non-zero exit code') + logging.error("An error occurred, exiting with non-zero exit code") sys.exit(1) def load_spec_file(self, filename): spec = vmdb.Spec() if filename == "-": - vmdb.progress('Load spec from stdin') + vmdb.progress("Load spec from stdin") spec.load_file(sys.stdin) else: - vmdb.progress('Load spec file {}'.format(filename)) + vmdb.progress("Load spec file {}".format(filename)) with open(filename) as f: spec.load_file(f) return spec def run_steps(self, steps, state): - return self.run_steps_helper( - steps, state, 'Running step: %r', 'run', False) + return self.run_steps_helper(steps, state, "Running step: %r", "run", False) def run_teardowns(self, steps, state): return self.run_steps_helper( - list(reversed(steps)), state, 'Running teardown: %r', 'teardown', True) + list(reversed(steps)), state, "Running teardown: %r", "teardown", True + ) def run_steps_helper(self, steps, state, msg, method_name, keep_going): core_meltdown = False steps_taken = [] - even_if_skipped = method_name + '_even_if_skipped' + even_if_skipped = method_name + "_even_if_skipped" for step in steps: try: logging.info(msg, step) steps_taken.append(step) runner = self.step_runners.find(step) if runner.skip(step, self.settings, state): - logging.info('Skipping as requested by unless') + logging.info("Skipping as requested by unless") method_names = [even_if_skipped] else: method_names = [method_name, even_if_skipped] @@ -110,10 +108,10 @@ class Vmdb2(cliapp.Application): values = runner.get_values(step) for method in methods: - logging.info('Calling %s', method) + logging.info("Calling %s", method) method(values, self.settings, state) except KeyError as e: - vmdb.error('Key error: %s' % str(e)) + vmdb.error("Key error: %s" % str(e)) vmdb.error(repr(e)) core_meltdown = True if not keep_going: diff --git a/vmdb/plugins/ansible_plugin.py b/vmdb/plugins/ansible_plugin.py index a223735..867a554 100644 --- a/vmdb/plugins/ansible_plugin.py +++ b/vmdb/plugins/ansible_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import tempfile @@ -26,50 +25,52 @@ import vmdb class AnsiblePlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(AnsibleStepRunner()) class AnsibleStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'ansible': str, - 'playbook': str, - } + return {"ansible": str, "playbook": str} def run(self, values, settings, state): - tag = values['ansible'] - playbook = values['playbook'] + tag = values["ansible"] + playbook = values["playbook"] mount_point = state.tags.get_builder_mount_point(tag) - rootfs_tarball = settings['rootfs-tarball'] + rootfs_tarball = settings["rootfs-tarball"] state.ansible_inventory = self.create_inventory(mount_point) vmdb.progress( - 'Created {} for Ansible inventory'.format(state.ansible_inventory)) + "Created {} for Ansible inventory".format(state.ansible_inventory) + ) vars_filename = self.create_vars(rootfs_tarball) - vmdb.progress( - 'Created {} for Ansible variables'.format(vars_filename)) + vmdb.progress("Created {} for Ansible variables".format(vars_filename)) env = dict(os.environ) - env['ANSIBLE_NOCOWS'] = '1' + env["ANSIBLE_NOCOWS"] = "1" vmdb.runcmd( - ['ansible-playbook', '-c', 'chroot', - '-i', state.ansible_inventory, - '-e', '@{}'.format(vars_filename), - playbook], - env=env) + [ + "ansible-playbook", + "-c", + "chroot", + "-i", + state.ansible_inventory, + "-e", + "@{}".format(vars_filename), + playbook, + ], + env=env, + ) def teardown(self, values, settings, state): - if hasattr(state, 'ansible_inventory'): - vmdb.progress('Removing {}'.format(state.ansible_inventory)) + if hasattr(state, "ansible_inventory"): + vmdb.progress("Removing {}".format(state.ansible_inventory)) os.remove(state.ansible_inventory) def create_inventory(self, chroot): fd, filename = tempfile.mkstemp() - os.write(fd, '[image]\n{}\n'.format(chroot).encode()) + os.write(fd, "[image]\n{}\n".format(chroot).encode()) os.close(fd) return filename diff --git a/vmdb/plugins/apt_plugin.py b/vmdb/plugins/apt_plugin.py index 4ebaf38..f2547c7 100644 --- a/vmdb/plugins/apt_plugin.py +++ b/vmdb/plugins/apt_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import cliapp @@ -25,65 +24,50 @@ import vmdb class AptPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(AptStepRunner()) class AptStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'apt': str, - 'packages': [], - 'tag': '', - 'fs-tag': '', - 'clean': True, - } + return {"apt": str, "packages": [], "tag": "", "fs-tag": "", "clean": True} def run(self, values, settings, state): - operation = values['apt'] - if operation != 'install': + operation = values["apt"] + if operation != "install": raise Exception('"apt" must always have value "install"') - packages = values['packages'] - tag = values.get('tag') or None + packages = values["packages"] + tag = values.get("tag") or None if tag is None: - tag = values['fs-tag'] + tag = values["fs-tag"] mount_point = state.tags.get_builder_mount_point(tag) if not self.got_eatmydata(state): - self.install_packages(mount_point, [], ['eatmydata']) + self.install_packages(mount_point, [], ["eatmydata"]) state.got_eatmydata = True - self.install_packages(mount_point, ['eatmydata'], packages) + self.install_packages(mount_point, ["eatmydata"], packages) - if values['clean']: + if values["clean"]: self.clean_cache(mount_point) def got_eatmydata(self, state): - return hasattr(state, 'got_eatmydata') and getattr(state, 'got_eatmydata') + return hasattr(state, "got_eatmydata") and getattr(state, "got_eatmydata") def install_packages(self, mount_point, argv_prefix, packages): env = os.environ.copy() - env['DEBIAN_FRONTEND'] = 'noninteractive' + env["DEBIAN_FRONTEND"] = "noninteractive" - vmdb.runcmd_chroot( - mount_point, - argv_prefix + - ['apt-get', 'update'], - env=env) + vmdb.runcmd_chroot(mount_point, argv_prefix + ["apt-get", "update"], env=env) vmdb.runcmd_chroot( mount_point, - argv_prefix + - ['apt-get', '-y', '--no-show-progress', 'install'] + packages, - env=env) + argv_prefix + ["apt-get", "-y", "--no-show-progress", "install"] + packages, + env=env, + ) def clean_cache(self, mount_point): env = os.environ.copy() - env['DEBIAN_FRONTEND'] = 'noninteractive' + env["DEBIAN_FRONTEND"] = "noninteractive" - vmdb.runcmd_chroot( - mount_point, - ['apt-get', 'clean'], - env=env) + vmdb.runcmd_chroot(mount_point, ["apt-get", "clean"], env=env) diff --git a/vmdb/plugins/cache_rootfs_plugin.py b/vmdb/plugins/cache_rootfs_plugin.py index eb4f75a..316de8b 100644 --- a/vmdb/plugins/cache_rootfs_plugin.py +++ b/vmdb/plugins/cache_rootfs_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import cliapp @@ -25,54 +24,53 @@ import vmdb class CacheRootFSPlugin(cliapp.Plugin): - def enable(self): self.app.settings.string( - ['rootfs-tarball'], - 'store rootfs cache tar archives in FILE', - metavar='FILE') + ["rootfs-tarball"], + "store rootfs cache tar archives in FILE", + metavar="FILE", + ) self.app.step_runners.add(MakeCacheStepRunner()) class MakeCacheStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'cache-rootfs': str, - 'options': '--one-file-system', - } + return {"cache-rootfs": str, "options": "--one-file-system"} def run(self, values, settings, state): - fs_tag = values['cache-rootfs'] + fs_tag = values["cache-rootfs"] rootdir = state.tags.get_builder_mount_point(fs_tag) - tar_path = settings['rootfs-tarball'] - opts = values['options'].split() + tar_path = settings["rootfs-tarball"] + opts = values["options"].split() if not tar_path: - raise Exception('--rootfs-tarball MUST be set') + raise Exception("--rootfs-tarball MUST be set") dirs = self._find_cacheable_mount_points(state.tags, rootdir) tags = state.tags for tag in tags.get_tags(): vmdb.progress( - 'tag {} mounted {} cached {}'.format( - tag, tags.get_builder_mount_point(tag), tags.is_cached(tag))) + "tag {} mounted {} cached {}".format( + tag, tags.get_builder_mount_point(tag), tags.is_cached(tag) + ) + ) - vmdb.progress('caching rootdir {}'.format(rootdir)) - vmdb.progress('caching relative {}'.format(dirs)) + vmdb.progress("caching rootdir {}".format(rootdir)) + vmdb.progress("caching relative {}".format(dirs)) if not os.path.exists(tar_path): - vmdb.runcmd( - ['tar'] + opts + ['-C', rootdir, '-caf', tar_path] + dirs) + vmdb.runcmd(["tar"] + opts + ["-C", rootdir, "-caf", tar_path] + dirs) def _find_cacheable_mount_points(self, tags, rootdir): - return list(sorted( - self._make_relative(rootdir, tags.get_builder_mount_point(tag)) - for tag in tags.get_tags() - if tags.is_cached(tag) - )) + return list( + sorted( + self._make_relative(rootdir, tags.get_builder_mount_point(tag)) + for tag in tags.get_tags() + if tags.is_cached(tag) + ) + ) def _make_relative(self, rootdir, dirname): - assert dirname == rootdir or dirname.startswith(rootdir + '/') + assert dirname == rootdir or dirname.startswith(rootdir + "/") if dirname == rootdir: - return '.' - return dirname[len(rootdir) + 1:] + return "." + return dirname[len(rootdir) + 1 :] diff --git a/vmdb/plugins/chroot_plugin.py b/vmdb/plugins/chroot_plugin.py index bded609..7aab1aa 100644 --- a/vmdb/plugins/chroot_plugin.py +++ b/vmdb/plugins/chroot_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import cliapp @@ -25,22 +24,17 @@ import vmdb class ChrootPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(ChrootStepRunner()) class ChrootStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'chroot': str, - 'shell': str, - } + return {"chroot": str, "shell": str} def run(self, values, settings, state): - fs_tag = values['chroot'] - shell = values['shell'] + fs_tag = values["chroot"] + shell = values["shell"] mount_point = state.tags.get_builder_mount_point(fs_tag) - vmdb.runcmd_chroot(mount_point, ['sh', '-ec', shell]) + vmdb.runcmd_chroot(mount_point, ["sh", "-ec", shell]) diff --git a/vmdb/plugins/copy_file_plugin.py b/vmdb/plugins/copy_file_plugin.py index b82497b..3c3d0e5 100644 --- a/vmdb/plugins/copy_file_plugin.py +++ b/vmdb/plugins/copy_file_plugin.py @@ -20,41 +20,35 @@ import vmdb import os import logging -class CopyFilePlugin(cliapp.Plugin): +class CopyFilePlugin(cliapp.Plugin): def enable(self): self.app.step_runners.add(CopyFileStepRunner()) class CopyFileStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'copy-file': str, - 'src': str, - 'perm': 0o644, - 'uid': 0, - 'gid': 0, - } + return {"copy-file": str, "src": str, "perm": 0o644, "uid": 0, "gid": 0} def run(self, values, settings, state): - root = state.tags.get_builder_from_target_mount_point('/') - newfile = values['copy-file'] - src = values['src'] - perm = values['perm'] - uid = values['uid'] - gid = values['gid'] + root = state.tags.get_builder_from_target_mount_point("/") + newfile = values["copy-file"] + src = values["src"] + perm = values["perm"] + uid = values["uid"] + gid = values["gid"] - filename = '/'.join([root,newfile]) + filename = "/".join([root, newfile]) logging.info( - 'Copying file %s to %s, uid %d, gid %d, perms %o' % ( - src, filename, uid, gid, perm)) + "Copying file %s to %s, uid %d, gid %d, perms %o" + % (src, filename, uid, gid, perm) + ) dirname = os.path.dirname(filename) os.makedirs(dirname, mode=0o511, exist_ok=True) - with open(src, 'rb') as inp: - with open(filename, 'wb') as output: + with open(src, "rb") as inp: + with open(filename, "wb") as output: contents = inp.read() output.write(contents) diff --git a/vmdb/plugins/create_dir_plugin.py b/vmdb/plugins/create_dir_plugin.py index 77fc72a..51f22a3 100644 --- a/vmdb/plugins/create_dir_plugin.py +++ b/vmdb/plugins/create_dir_plugin.py @@ -20,33 +20,27 @@ import vmdb import os import logging -class CreateDirPlugin(cliapp.Plugin): +class CreateDirPlugin(cliapp.Plugin): def enable(self): self.app.step_runners.add(CreateDirStepRunner()) class CreateDirStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'create-dir': str, - 'perm': 0o755, - 'uid': 0, - 'gid': 0, - } + return {"create-dir": str, "perm": 0o755, "uid": 0, "gid": 0} def run(self, values, settings, state): - root = state.tags.get_builder_from_target_mount_point('/') - newdir = values['create-dir'] - path = '/'.join([root, newdir]) - perm = values['perm'] - uid = values['uid'] - gid = values['gid'] + root = state.tags.get_builder_from_target_mount_point("/") + newdir = values["create-dir"] + path = "/".join([root, newdir]) + perm = values["perm"] + uid = values["uid"] + gid = values["gid"] logging.info( - 'Creating directory %s, uid %d, gid %d, perms %o' % ( - path, uid, gid, perm)) + "Creating directory %s, uid %d, gid %d, perms %o" % (path, uid, gid, perm) + ) os.makedirs(path, perm) os.chown(path, uid, gid) diff --git a/vmdb/plugins/create_file_plugin.py b/vmdb/plugins/create_file_plugin.py index 6d5621c..b332e64 100644 --- a/vmdb/plugins/create_file_plugin.py +++ b/vmdb/plugins/create_file_plugin.py @@ -22,37 +22,29 @@ import logging class CreateFilePlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(CreateFileStepRunner()) class CreateFileStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'create-file': str, - 'contents': str, - 'perm': 0o644, - 'uid': 0, - 'gid': 0, - } + return {"create-file": str, "contents": str, "perm": 0o644, "uid": 0, "gid": 0} def run(self, values, settings, state): - root = state.tags.get_builder_from_target_mount_point('/') - newfile = values['create-file'] - contents = values['contents'] - perm = values['perm'] - uid = values['uid'] - gid = values['gid'] + root = state.tags.get_builder_from_target_mount_point("/") + newfile = values["create-file"] + contents = values["contents"] + perm = values["perm"] + uid = values["uid"] + gid = values["gid"] - filename = '/'.join([root, newfile]) + filename = "/".join([root, newfile]) logging.info( - 'Creating file %s, uid %d, gid %d, perms %o' % ( - filename, uid, gid, perm)) + "Creating file %s, uid %d, gid %d, perms %o" % (filename, uid, gid, perm) + ) - fd = open(filename, 'w') + fd = open(filename, "w") fd.write(contents) fd.close diff --git a/vmdb/plugins/debootstrap_plugin.py b/vmdb/plugins/debootstrap_plugin.py index 2d93dea..a0db6e1 100644 --- a/vmdb/plugins/debootstrap_plugin.py +++ b/vmdb/plugins/debootstrap_plugin.py @@ -16,52 +16,53 @@ # =*= License: GPL-3+ =*= - import cliapp import vmdb class DebootstrapPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(DebootstrapStepRunner()) class DebootstrapStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): return { - 'debootstrap': str, - 'target': str, - 'mirror': str, - 'keyring': '', - 'variant': '-', + "debootstrap": str, + "target": str, + "mirror": str, + "keyring": "", + "variant": "-", } def run(self, values, settings, state): - suite = values['debootstrap'] - tag = values['target'] + suite = values["debootstrap"] + tag = values["target"] target = state.tags.get_builder_mount_point(tag) - mirror = values['mirror'] - keyring = values['keyring'] or None - variant = values['variant'] + mirror = values["mirror"] + keyring = values["keyring"] or None + variant = values["variant"] if not (suite and tag and target and mirror): - raise Exception('missing arg for debootstrap step') + raise Exception("missing arg for debootstrap step") if keyring: - vmdb.runcmd([ - 'debootstrap', - '--keyring', keyring, - '--variant', variant, - suite, target, mirror]) + vmdb.runcmd( + [ + "debootstrap", + "--keyring", + keyring, + "--variant", + variant, + suite, + target, + mirror, + ] + ) else: - vmdb.runcmd([ - 'debootstrap', - '--variant', variant, - suite, target, mirror]) + vmdb.runcmd(["debootstrap", "--variant", variant, suite, target, mirror]) def run_even_if_skipped(self, values, settings, state): - tag = values['target'] + tag = values["target"] target = state.tags.get_builder_mount_point(tag) - vmdb.runcmd_chroot(target, ['apt-get', 'update']) + vmdb.runcmd_chroot(target, ["apt-get", "update"]) diff --git a/vmdb/plugins/echo_plugin.py b/vmdb/plugins/echo_plugin.py index d7748d7..312a5d7 100644 --- a/vmdb/plugins/echo_plugin.py +++ b/vmdb/plugins/echo_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import logging import cliapp @@ -25,25 +24,20 @@ import vmdb class EchoPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(EchoStepRunner()) class EchoStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'echo': str, - 'teardown': '', - } + return {"echo": str, "teardown": ""} def run(self, values, settings, state): - text = values['echo'] - vmdb.progress('{}'.format(text)) + text = values["echo"] + vmdb.progress("{}".format(text)) def teardown(self, values, settings, state): - text = values['teardown'] + text = values["teardown"] if text: - vmdb.progress('{}'.format(text)) - logging.info('%s', text) + vmdb.progress("{}".format(text)) + logging.info("%s", text) diff --git a/vmdb/plugins/error_plugin.py b/vmdb/plugins/error_plugin.py index bed7319..f90d8c0 100644 --- a/vmdb/plugins/error_plugin.py +++ b/vmdb/plugins/error_plugin.py @@ -16,33 +16,27 @@ # =*= License: GPL-3+ =*= - import cliapp import vmdb class ErrorPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(ErrorStepRunner()) class ErrorStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'error': str, - 'teardown': str, - } + return {"error": str, "teardown": str} def run(self, values, settings, state): # We use vmdb.progress here to get output to go to stdout, # instead of stderr. We want that for tests. - vmdb.progress('{}'.format(values['error'])) - raise vmdb.StepError('an error occurred') + vmdb.progress("{}".format(values["error"])) + raise vmdb.StepError("an error occurred") def teardown(self, values, settings, state): # We use vmdb.progress here to get output to go to stdout, # instead of stderr. We want that for tests. - vmdb.progress('{}'.format(values['teardown'])) + vmdb.progress("{}".format(values["teardown"])) diff --git a/vmdb/plugins/fstab_plugin.py b/vmdb/plugins/fstab_plugin.py index 508ef07..974fc56 100644 --- a/vmdb/plugins/fstab_plugin.py +++ b/vmdb/plugins/fstab_plugin.py @@ -23,20 +23,16 @@ import vmdb class FstabPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(FstabStepRunner()) class FstabStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'fstab': str, - } + return {"fstab": str} def run(self, values, setting, state): - tag = values['fstab'] + tag = values["fstab"] chroot = state.tags.get_builder_mount_point(tag) filesystems = [] @@ -46,22 +42,23 @@ class FstabStepRunner(vmdb.StepRunnerInterface): mount_point = state.tags.get_target_mount_point(tag) if mount_point is not None: fstype = state.tags.get_fstype(tag) - output = vmdb.runcmd([ - 'blkid', '-c', '/dev/null', '-o', 'value', '-s', 'UUID', device]) + output = vmdb.runcmd( + ["blkid", "-c", "/dev/null", "-o", "value", "-s", "UUID", device] + ) if output: uuid = output.decode().strip() - filesystems.append({ - 'uuid': uuid, - 'mount_point': mount_point, - 'fstype': fstype, - }) + filesystems.append( + {"uuid": uuid, "mount_point": mount_point, "fstype": fstype} + ) else: raise Exception( - 'Unknown UUID for device {} (to be mounted on {})'.format( - device, mount_point)) + "Unknown UUID for device {} (to be mounted on {})".format( + device, mount_point + ) + ) - fstab_path = os.path.join(chroot, 'etc/fstab') + fstab_path = os.path.join(chroot, "etc/fstab") line = "UUID={uuid} {mount_point} {fstype} errors=remount-ro 0 1\n" - with open(fstab_path, 'w') as fstab: + with open(fstab_path, "w") as fstab: for entry in filesystems: fstab.write(line.format(**entry)) diff --git a/vmdb/plugins/kpartx_plugin.py b/vmdb/plugins/kpartx_plugin.py index 5d320ea..d1f4840 100644 --- a/vmdb/plugins/kpartx_plugin.py +++ b/vmdb/plugins/kpartx_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import stat @@ -26,34 +25,30 @@ import vmdb class KpartxPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(KpartxStepRunner()) class KpartxStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'kpartx': str, - } + return {"kpartx": str} def run(self, values, settings, state): - device = values['kpartx'] + device = values["kpartx"] tags = state.tags.get_tags() devs = self.kpartx(device) for tag, dev in zip(tags, devs): - vmdb.progress('remembering {} as {}'.format(dev, tag)) + vmdb.progress("remembering {} as {}".format(dev, tag)) state.tags.set_dev(tag, dev) def kpartx(self, device): - output = vmdb.runcmd(['kpartx', '-asv', device]).decode('UTF-8') + output = vmdb.runcmd(["kpartx", "-asv", device]).decode("UTF-8") for line in output.splitlines(): words = line.split() - if words[0] == 'add': + if words[0] == "add": name = words[2] - yield '/dev/mapper/{}'.format(name) + yield "/dev/mapper/{}".format(name) def teardown(self, values, settings, state): - device = values['kpartx'] - vmdb.runcmd(['kpartx', '-dsv', device]) + device = values["kpartx"] + vmdb.runcmd(["kpartx", "-dsv", device]) diff --git a/vmdb/plugins/luks_plugin.py b/vmdb/plugins/luks_plugin.py index 531d570..946893e 100644 --- a/vmdb/plugins/luks_plugin.py +++ b/vmdb/plugins/luks_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import logging import os import tempfile @@ -27,60 +26,64 @@ import vmdb class LuksPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(CryptsetupStepRunner()) class CryptsetupStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'cryptsetup': str, - 'tag': str, - 'key-file': '', - 'key-cmd': '', - } + return {"cryptsetup": str, "tag": str, "key-file": "", "key-cmd": ""} def run(self, values, settings, state): - underlying = values['cryptsetup'] - crypt_name = values['tag'] + underlying = values["cryptsetup"] + crypt_name = values["tag"] if not isinstance(underlying, str): - raise vmdb.NotString('cryptsetup', underlying) + raise vmdb.NotString("cryptsetup", underlying) if not isinstance(crypt_name, str): - raise vmdb.NotString('cryptsetup: tag', crypt_name) + raise vmdb.NotString("cryptsetup: tag", crypt_name) state.tmp_key_file = None - key_file = values['key-file'] or None - key_cmd = values['key-cmd'] or None + key_file = values["key-file"] or None + key_cmd = values["key-cmd"] or None if key_file is None and key_cmd is None: - raise Exception( - 'cryptsetup step MUST define one of key-file or key-cmd') + raise Exception("cryptsetup step MUST define one of key-file or key-cmd") if key_file is None: - output = vmdb.runcmd(['sh', '-ec', key_cmd]) - output = output.decode('UTF-8') + output = vmdb.runcmd(["sh", "-ec", key_cmd]) + output = output.decode("UTF-8") key = output.splitlines()[0] fd, key_file = tempfile.mkstemp() state.tmp_key_file = key_file os.close(fd) - open(key_file, 'w').write(key) + open(key_file, "w").write(key) dev = state.tags.get_dev(underlying) if dev is None: for t in state.tags.get_tags(): logging.debug( - 'tag %r dev %r mp %r', t, state.tags.get_dev(t), - state.tags.get_builder_mount_point(t)) + "tag %r dev %r mp %r", + t, + state.tags.get_dev(t), + state.tags.get_builder_mount_point(t), + ) assert 0 - vmdb.runcmd(['cryptsetup', '-q', 'luksFormat', dev, key_file]) + vmdb.runcmd(["cryptsetup", "-q", "luksFormat", dev, key_file]) vmdb.runcmd( - ['cryptsetup', 'open', '--type', 'luks', '--key-file', key_file, dev, - crypt_name]) - - crypt_dev = '/dev/mapper/{}'.format(crypt_name) + [ + "cryptsetup", + "open", + "--type", + "luks", + "--key-file", + key_file, + dev, + crypt_name, + ] + ) + + crypt_dev = "/dev/mapper/{}".format(crypt_name) assert os.path.exists(crypt_dev) state.tags.append(crypt_name) state.tags.set_dev(crypt_name, crypt_dev) @@ -90,7 +93,7 @@ class CryptsetupStepRunner(vmdb.StepRunnerInterface): if x is not None and os.path.exists(x): os.remove(x) - crypt_name = values['tag'] + crypt_name = values["tag"] - crypt_dev = '/dev/mapper/{}'.format(crypt_name) - vmdb.runcmd(['cryptsetup', 'close', crypt_dev]) + crypt_dev = "/dev/mapper/{}".format(crypt_name) + vmdb.runcmd(["cryptsetup", "close", crypt_dev]) diff --git a/vmdb/plugins/lvcreate_plugin.py b/vmdb/plugins/lvcreate_plugin.py index 4b135bc..1d3de33 100644 --- a/vmdb/plugins/lvcreate_plugin.py +++ b/vmdb/plugins/lvcreate_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import cliapp @@ -25,28 +24,22 @@ import vmdb class LvcreatePlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(LvcreateStepRunner()) class LvcreateStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'lvcreate': str, - 'name': str, - 'size': str, - } + return {"lvcreate": str, "name": str, "size": str} def run(self, values, settings, state): - vgname = values['lvcreate'] - lvname = values['name'] - size = values['size'] + vgname = values["lvcreate"] + lvname = values["name"] + size = values["size"] - vmdb.runcmd(['lvcreate', '--name', lvname, '--size', size, vgname]) + vmdb.runcmd(["lvcreate", "--name", lvname, "--size", size, vgname]) - lvdev = '/dev/{}/{}'.format(vgname, lvname) + lvdev = "/dev/{}/{}".format(vgname, lvname) assert os.path.exists(lvdev) state.tags.append(lvname) state.tags.set_dev(lvname, lvdev) diff --git a/vmdb/plugins/mkfs_plugin.py b/vmdb/plugins/mkfs_plugin.py index 967493f..17bf88e 100644 --- a/vmdb/plugins/mkfs_plugin.py +++ b/vmdb/plugins/mkfs_plugin.py @@ -16,48 +16,41 @@ # =*= License: GPL-3+ =*= - import cliapp import vmdb class MkfsPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(MkfsStepRunner()) class MkfsStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'mkfs': str, - 'partition': str, - 'label': '', - } + return {"mkfs": str, "partition": str, "label": ""} def run(self, values, settings, state): - fstype = values['mkfs'] - tag = values['partition'] + fstype = values["mkfs"] + tag = values["partition"] device = state.tags.get_dev(tag) if not isinstance(fstype, str): - raise vmdb.NotString('mkfs', fstype) + raise vmdb.NotString("mkfs", fstype) if not isinstance(tag, str): - raise vmdb.NotString('mkfs: tag', tag) + raise vmdb.NotString("mkfs: tag", tag) if not isinstance(device, str): - raise vmdb.NotString('mkfs: device (for tag)', device) + raise vmdb.NotString("mkfs: device (for tag)", device) - cmd = ['/sbin/mkfs', '-t', fstype] - label = values['label'] or None + cmd = ["/sbin/mkfs", "-t", fstype] + label = values["label"] or None if label: - if fstype == 'vfat': - cmd.append('-n') - elif fstype == 'f2fs': - cmd.append('-l') + if fstype == "vfat": + cmd.append("-n") + elif fstype == "f2fs": + cmd.append("-l") else: - cmd.append('-L') + cmd.append("-L") cmd.append(label) cmd.append(device) vmdb.runcmd(cmd) diff --git a/vmdb/plugins/mkimg_plugin.py b/vmdb/plugins/mkimg_plugin.py index 15467e1..75d00ed 100644 --- a/vmdb/plugins/mkimg_plugin.py +++ b/vmdb/plugins/mkimg_plugin.py @@ -16,40 +16,31 @@ # =*= License: GPL-3+ =*= - import cliapp import vmdb class MkimgPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(MkimgStepRunner()) - self.app.settings.bytesize( - ['size'], - 'size of output image', - default='1GiB') + self.app.settings.bytesize(["size"], "size of output image", default="1GiB") class MkimgStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'mkimg': str, - 'size': str, - } + return {"mkimg": str, "size": str} def run(self, values, settings, state): - filename = values['mkimg'] - size = values['size'] + filename = values["mkimg"] + size = values["size"] if not isinstance(filename, str): - raise vmdb.NotString('mkimg', filename) + raise vmdb.NotString("mkimg", filename) if not filename: - raise vmdb.IsEmptyString('mkimg', filename) + raise vmdb.IsEmptyString("mkimg", filename) if not isinstance(size, str): - raise vmdb.NotString('mkimg: size', size) + raise vmdb.NotString("mkimg: size", size) - vmdb.runcmd(['qemu-img', 'create', '-f', 'raw', filename, size]) + vmdb.runcmd(["qemu-img", "create", "-f", "raw", filename, size]) diff --git a/vmdb/plugins/mklabel_plugin.py b/vmdb/plugins/mklabel_plugin.py index 5b5fd5a..c455394 100644 --- a/vmdb/plugins/mklabel_plugin.py +++ b/vmdb/plugins/mklabel_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import stat @@ -26,20 +25,15 @@ import vmdb class MklabelPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(MklabelStepRunner()) class MklabelStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'mklabel': str, - 'device': str, - } + return {"mklabel": str, "device": str} def run(self, values, settings, state): - label_type = values['mklabel'] - device = values['device'] - vmdb.runcmd(['parted', '-s', device, 'mklabel', label_type]) + label_type = values["mklabel"] + device = values["device"] + vmdb.runcmd(["parted", "-s", device, "mklabel", label_type]) diff --git a/vmdb/plugins/mkpart_plugin.py b/vmdb/plugins/mkpart_plugin.py index 6b0a602..787bd05 100644 --- a/vmdb/plugins/mkpart_plugin.py +++ b/vmdb/plugins/mkpart_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import stat import time @@ -27,35 +26,33 @@ import vmdb class MkpartPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(MkpartStepRunner()) class MkpartStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): return { - 'mkpart': str, - 'device': str, - 'start': str, - 'end': str, - 'tag': '', - 'part-tag': '', - 'fs-type': 'ext2', + "mkpart": str, + "device": str, + "start": str, + "end": str, + "tag": "", + "part-tag": "", + "fs-type": "ext2", } def run(self, values, settings, state): - part_type = values['mkpart'] - device = values['device'] - start = values['start'] - end = values['end'] - tag = values['tag'] or values['part-tag'] or None - fs_type = values['fs-type'] + part_type = values["mkpart"] + device = values["device"] + start = values["start"] + end = values["end"] + tag = values["tag"] or values["part-tag"] or None + fs_type = values["fs-type"] device = os.path.realpath(device) orig = self.list_partitions(device) - vmdb.runcmd(['parted', '-s', device, 'mkpart', part_type, fs_type, start, end]) + vmdb.runcmd(["parted", "-s", device, "mkpart", part_type, fs_type, start, end]) new = self.list_partitions(device) diff = self.diff_partitions(orig, new) @@ -78,7 +75,7 @@ class MkpartStepRunner(vmdb.StepRunnerInterface): # tags. if self.is_block_dev(device): self.wait_for_file_to_exist(diff[0]) - vmdb.progress('remembering partition {} as {}'.format(diff[0], tag)) + vmdb.progress("remembering partition {} as {}".format(diff[0], tag)) state.tags.set_dev(tag, diff[0]) def is_block_dev(self, filename): @@ -86,24 +83,16 @@ class MkpartStepRunner(vmdb.StepRunnerInterface): return stat.S_ISBLK(st.st_mode) def list_partitions(self, device): - output = vmdb.runcmd(['parted', '-m', device, 'print']) - output = output.decode('UTF-8') - partitions = [ - line.split(':')[0] - for line in output.splitlines() - if ':' in line - ] + output = vmdb.runcmd(["parted", "-m", device, "print"]) + output = output.decode("UTF-8") + partitions = [line.split(":")[0] for line in output.splitlines() if ":" in line] return [ - word if word.startswith('/') else '{}{}'.format(device, word) + word if word.startswith("/") else "{}{}".format(device, word) for word in partitions ] def diff_partitions(self, old, new): - return [ - line - for line in new - if line not in old - ] + return [line for line in new if line not in old] def wait_for_file_to_exist(self, filename): while not os.path.exists(filename): @@ -116,14 +105,13 @@ class MkpartError(cliapp.AppException): class ExpectedNewPartition(MkpartError): - def __init__(self): - super().__init__('Expected a new partition to exist after mkpart') + super().__init__("Expected a new partition to exist after mkpart") class UnexpectedNewPartitions(MkpartError): - def __init__(self, diff): super().__init__( - 'Expected only one new partition to exist after mkpart, ' - 'but found {}'.format(' '.join(diff))) + "Expected only one new partition to exist after mkpart, " + "but found {}".format(" ".join(diff)) + ) diff --git a/vmdb/plugins/mount_plugin.py b/vmdb/plugins/mount_plugin.py index fc5ffeb..f50de11 100644 --- a/vmdb/plugins/mount_plugin.py +++ b/vmdb/plugins/mount_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import logging import os import tempfile @@ -27,19 +26,13 @@ import vmdb class MountPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(MountStepRunner()) class MountStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'mount': str, - 'dirname': '', - 'mount-on': '', - } + return {"mount": str, "dirname": "", "mount-on": ""} def run(self, values, settings, state): self.mount_rootfs(values, settings, state) @@ -48,36 +41,37 @@ class MountStepRunner(vmdb.StepRunnerInterface): self.unmount_rootfs(values, settings, state) def mount_rootfs(self, values, settings, state): - tag = values['mount'] - dirname = values['dirname'] or None - mount_on = values['mount-on'] or None + tag = values["mount"] + dirname = values["dirname"] or None + mount_on = values["mount-on"] or None device = state.tags.get_dev(tag) if dirname: if not mount_on: - raise Exception('no mount-on tag given') + raise Exception("no mount-on tag given") if not state.tags.has_tag(mount_on): - raise Exception('cannot find tag {}'.format(mount_on)) + raise Exception("cannot find tag {}".format(mount_on)) mount_point = os.path.join( - state.tags.get_builder_mount_point(mount_on), './' + dirname) + state.tags.get_builder_mount_point(mount_on), "./" + dirname + ) if not os.path.exists(mount_point): os.makedirs(mount_point) else: - dirname = '/' + dirname = "/" mount_point = tempfile.mkdtemp() - vmdb.runcmd(['mount', device, mount_point]) + vmdb.runcmd(["mount", device, mount_point]) state.tags.set_builder_mount_point(tag, mount_point, cached=True) state.tags.set_target_mount_point(tag, dirname) return mount_point def unmount_rootfs(self, values, settings, state): - tag = values['mount'] + tag = values["mount"] mount_point = state.tags.get_builder_mount_point(tag) if mount_point is None: return @@ -87,5 +81,5 @@ class MountStepRunner(vmdb.StepRunnerInterface): except vmdb.NotMounted as e: logging.warning(str(e)) - if not values['mount-on']: + if not values["mount-on"]: os.rmdir(mount_point) diff --git a/vmdb/plugins/qemudebootstrap_plugin.py b/vmdb/plugins/qemudebootstrap_plugin.py index fa97523..01184a3 100644 --- a/vmdb/plugins/qemudebootstrap_plugin.py +++ b/vmdb/plugins/qemudebootstrap_plugin.py @@ -16,62 +16,74 @@ # =*= License: GPL-3+ =*= - import cliapp import vmdb class QemuDebootstrapPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(QemuDebootstrapStepRunner()) class QemuDebootstrapStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): return { - 'qemu-debootstrap': str, - 'target': str, - 'mirror': str, - 'arch': str, - 'keyring': '', - 'variant': '-', - 'components': ['main'], + "qemu-debootstrap": str, + "target": str, + "mirror": str, + "arch": str, + "keyring": "", + "variant": "-", + "components": ["main"], } def run(self, values, settings, state): - suite = values['qemu-debootstrap'] - tag = values['target'] + suite = values["qemu-debootstrap"] + tag = values["target"] target = state.tags.get_builder_mount_point(tag) - mirror = values['mirror'] - keyring = values['keyring'] or None - variant = values['variant'] - arch = values['arch'] - components = values['components'] + mirror = values["mirror"] + keyring = values["keyring"] or None + variant = values["variant"] + arch = values["arch"] + components = values["components"] if not (suite and tag and target and mirror and arch): - raise Exception('missing arg for qemu-debootstrap step') + raise Exception("missing arg for qemu-debootstrap step") if keyring: vmdb.runcmd( - ['qemu-debootstrap', - '--keyring', keyring, - '--arch', arch, - '--variant', variant, - '--components', ','.join(components), suite, - target, - mirror]) + [ + "qemu-debootstrap", + "--keyring", + keyring, + "--arch", + arch, + "--variant", + variant, + "--components", + ",".join(components), + suite, + target, + mirror, + ] + ) else: vmdb.runcmd( - ['qemu-debootstrap', - '--arch', arch, - '--variant', variant, - '--components', ','.join(components), suite, - target, - mirror]) - vmdb.runcmd_chroot(target, ['apt-get', 'update']) + [ + "qemu-debootstrap", + "--arch", + arch, + "--variant", + variant, + "--components", + ",".join(components), + suite, + target, + mirror, + ] + ) + vmdb.runcmd_chroot(target, ["apt-get", "update"]) def run_even_if_skipped(self, values, settings, state): - tag = values['target'] + tag = values["target"] target = state.tags.get_builder_mount_point(tag) - vmdb.runcmd_chroot(target, ['apt-get', 'update']) + vmdb.runcmd_chroot(target, ["apt-get", "update"]) diff --git a/vmdb/plugins/shell_plugin.py b/vmdb/plugins/shell_plugin.py index cbc52ea..b9e516a 100644 --- a/vmdb/plugins/shell_plugin.py +++ b/vmdb/plugins/shell_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import cliapp @@ -25,23 +24,18 @@ import vmdb class ShellPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(ShellStepRunner()) class ShellStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'shell': str, - 'root-fs': str, - } + return {"shell": str, "root-fs": str} def run(self, step, settings, state): - shell = step['shell'] - fs_tag = step['root-fs'] + shell = step["shell"] + fs_tag = step["root-fs"] env = dict(os.environ) - env['ROOT'] = state.tags.get_builder_mount_point(fs_tag) - vmdb.runcmd(['sh', '-ec', shell], env=env) + env["ROOT"] = state.tags.get_builder_mount_point(fs_tag) + vmdb.runcmd(["sh", "-ec", shell], env=env) diff --git a/vmdb/plugins/unpack_rootfs_plugin.py b/vmdb/plugins/unpack_rootfs_plugin.py index 9143143..c8b7649 100644 --- a/vmdb/plugins/unpack_rootfs_plugin.py +++ b/vmdb/plugins/unpack_rootfs_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import cliapp @@ -25,30 +24,25 @@ import vmdb class UnpackRootFSPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(UnpackCacheStepRunner()) class UnpackCacheStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'unpack-rootfs': str, - } + return {"unpack-rootfs": str} def run(self, values, settings, state): - fs_tag = values['unpack-rootfs'] + fs_tag = values["unpack-rootfs"] rootdir = state.tags.get_builder_mount_point(fs_tag) - tar_path = settings['rootfs-tarball'] + tar_path = settings["rootfs-tarball"] if not tar_path: - raise Exception('--rootfs-tarball MUST be set') + raise Exception("--rootfs-tarball MUST be set") if os.path.exists(tar_path): - vmdb.runcmd( - ['tar', '-C', rootdir, '-xf', tar_path, '--numeric-owner']) + vmdb.runcmd(["tar", "-C", rootdir, "-xf", tar_path, "--numeric-owner"]) self.copy_resolv_conf(rootdir) state.rootfs_unpacked = True def copy_resolv_conf(self, rootdir): - filename = os.path.join(rootdir, 'etc', 'resolv.conf') - vmdb.runcmd(['cp', '/etc/resolv.conf', filename]) + filename = os.path.join(rootdir, "etc", "resolv.conf") + vmdb.runcmd(["cp", "/etc/resolv.conf", filename]) diff --git a/vmdb/plugins/vgcreate_plugin.py b/vmdb/plugins/vgcreate_plugin.py index d6b3c45..d979918 100644 --- a/vmdb/plugins/vgcreate_plugin.py +++ b/vmdb/plugins/vgcreate_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import os import cliapp @@ -25,36 +24,28 @@ import vmdb class VgcreatePlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(VgcreateStepRunner()) class VgcreateStepRunner(vmdb.StepRunnerInterface): - def get_key_spec(self): - return { - 'vgcreate': str, - 'physical': [], - } + return {"vgcreate": str, "physical": []} def run(self, values, settings, state): vgname = self.get_vg(values) physical = self.get_pv(values, state) for phys in physical: - vmdb.runcmd(['pvcreate', '-ff', '--yes', phys]) - vmdb.runcmd(['vgcreate', vgname] + physical) + vmdb.runcmd(["pvcreate", "-ff", "--yes", phys]) + vmdb.runcmd(["vgcreate", vgname] + physical) def teardown(self, values, settings, state): vgname = self.get_vg(values) - vmdb.runcmd(['vgchange', '-an', vgname]) + vmdb.runcmd(["vgchange", "-an", vgname]) def get_vg(self, values): - return values['vgcreate'] + return values["vgcreate"] def get_pv(self, values, state): - return [ - state.tags.get_dev(tag) - for tag in values['physical'] - ] + return [state.tags.get_dev(tag) for tag in values["physical"]] diff --git a/vmdb/plugins/virtualfs_plugin.py b/vmdb/plugins/virtualfs_plugin.py index 848d000..59996c1 100644 --- a/vmdb/plugins/virtualfs_plugin.py +++ b/vmdb/plugins/virtualfs_plugin.py @@ -16,7 +16,6 @@ # =*= License: GPL-3+ =*= - import logging import os @@ -26,7 +25,6 @@ import vmdb class VirtualFilesystemMountPlugin(cliapp.Plugin): - def enable(self): self.app.step_runners.add(VirtualFilesystemMountStepRunner()) @@ -34,22 +32,20 @@ class VirtualFilesystemMountPlugin(cliapp.Plugin): class VirtualFilesystemMountStepRunner(vmdb.StepRunnerInterface): virtuals = [ - ['none', '/proc', 'proc'], - ['none', '/dev', 'devtmpfs'], - ['none', '/dev/pts', 'devpts'], - ['none', '/dev/shm', 'tmpfs'], - ['none', '/run', 'tmpfs'], - ['none', '/run/lock', 'tmpfs'], - ['none', '/sys', 'sysfs'], + ["none", "/proc", "proc"], + ["none", "/dev", "devtmpfs"], + ["none", "/dev/pts", "devpts"], + ["none", "/dev/shm", "tmpfs"], + ["none", "/run", "tmpfs"], + ["none", "/run/lock", "tmpfs"], + ["none", "/sys", "sysfs"], ] def get_key_spec(self): - return { - 'virtual-filesystems': str, - } + return {"virtual-filesystems": str} def run(self, values, settings, state): - fstag = values['virtual-filesystems'] + fstag = values["virtual-filesystems"] mount_point = state.tags.get_builder_mount_point(fstag) self.mount_virtuals(mount_point, state) @@ -57,23 +53,23 @@ class VirtualFilesystemMountStepRunner(vmdb.StepRunnerInterface): self.unmount_virtuals(state) def mount_virtuals(self, rootfs, state): - if not hasattr(state, 'virtuals'): + if not hasattr(state, "virtuals"): state.virtuals = [] for device, mount_point, fstype in self.virtuals: - path = os.path.join(rootfs, './' + mount_point) + path = os.path.join(rootfs, "./" + mount_point) if not os.path.exists(path): os.mkdir(path) - vmdb.runcmd(['mount', '-t', fstype, device, path]) + vmdb.runcmd(["mount", "-t", fstype, device, path]) state.virtuals.append(path) - logging.debug('mounted virtuals: %r', state.virtuals) + logging.debug("mounted virtuals: %r", state.virtuals) def unmount_virtuals(self, state): - logging.debug('unmounting virtuals: %r', state.virtuals) + logging.debug("unmounting virtuals: %r", state.virtuals) for mount_point in reversed(state.virtuals): try: vmdb.unmount(mount_point) except vmdb.NotMounted as e: logging.warning(str(e)) except cliapp.AppException: - vmdb.warning('Something went wrong while unmounting. Ignoring.') + vmdb.warning("Something went wrong while unmounting. Ignoring.") diff --git a/vmdb/runcmd.py b/vmdb/runcmd.py index 7cd6406..985a717 100644 --- a/vmdb/runcmd.py +++ b/vmdb/runcmd.py @@ -34,50 +34,50 @@ def set_verbose_progress(verbose): def error(msg): logging.error(msg, exc_info=True) - sys.stderr.write('ERROR: {}\n'.format(msg)) + sys.stderr.write("ERROR: {}\n".format(msg)) sys.stderr.flush() def progress(msg): logging.info(msg) if _verbose: - sys.stdout.write('{}\n'.format(msg)) + sys.stdout.write("{}\n".format(msg)) sys.stdout.flush() def runcmd(argv, **kwargs): - progress('Exec: %r' % (argv,)) - env = kwargs.get('env', os.environ.copy()) - env['LC_ALL'] = 'C' - kwargs['env'] = env - kwargs['stdout'] = kwargs.get('stdout', subprocess.PIPE) - kwargs['stderr'] = kwargs.get('stderr', subprocess.PIPE) + progress("Exec: %r" % (argv,)) + env = kwargs.get("env", os.environ.copy()) + env["LC_ALL"] = "C" + kwargs["env"] = env + kwargs["stdout"] = kwargs.get("stdout", subprocess.PIPE) + kwargs["stderr"] = kwargs.get("stderr", subprocess.PIPE) p = subprocess.Popen(argv, **kwargs) out, err = p.communicate() - logging.debug('STDOUT: %s', out.decode('UTF8')) - logging.debug('STDERR: %s', err.decode('UTF8')) + logging.debug("STDOUT: %s", out.decode("UTF8")) + logging.debug("STDERR: %s", err.decode("UTF8")) if p.returncode != 0: - raise cliapp.AppException('Command failed: {}'.format(p.returncode)) + raise cliapp.AppException("Command failed: {}".format(p.returncode)) return out def runcmd_chroot(chroot, argv, *argvs, **kwargs): - full_argv = ['chroot', chroot] + argv + full_argv = ["chroot", chroot] + argv return runcmd(full_argv, *argvs, **kwargs) def _procdir(chroot): - proc = os.path.join(chroot, 'proc') + proc = os.path.join(chroot, "proc") if not os.path.exists(proc): os.mkdir(proc, mode=Oo755) return proc def _log_stdout(data): - logging.debug('STDOUT: %r', data) + logging.debug("STDOUT: %r", data) return data def _log_stderr(data): - logging.debug('STDERR: %r', data) + logging.debug("STDERR: %r", data) return data diff --git a/vmdb/spec.py b/vmdb/spec.py index 98f5c56..cfcba9c 100644 --- a/vmdb/spec.py +++ b/vmdb/spec.py @@ -21,7 +21,6 @@ import yaml class Spec: - def __init__(self): self._dict = None @@ -32,7 +31,7 @@ class Spec: return dict(self._dict) def get_steps(self, params): - return expand_templates(self._dict['steps'], params) + return expand_templates(self._dict["steps"], params) def expand_templates(value, params): @@ -44,9 +43,6 @@ def expand_templates(value, params): elif isinstance(value, list): return [expand_templates(x, params) for x in value] elif isinstance(value, dict): - return { - key: expand_templates(value[key], params) - for key in value - } + return {key: expand_templates(value[key], params) for key in value} else: - assert 0, 'Unknown value type: {!r}'.format(value) + assert 0, "Unknown value type: {!r}".format(value) diff --git a/vmdb/spec_tests.py b/vmdb/spec_tests.py index 4b23238..71ef965 100644 --- a/vmdb/spec_tests.py +++ b/vmdb/spec_tests.py @@ -28,16 +28,16 @@ import vmdb class SpecTests(unittest.TestCase): - spec_yaml = ''' + spec_yaml = """ steps: - step: foo arg: "{{ var1 }}" number: 0711 - step: bar - ''' + """ def setUp(self): - self.filename = write_temp_file(bytes(self.spec_yaml, 'ascii')) + self.filename = write_temp_file(bytes(self.spec_yaml, "ascii")) self.spec = vmdb.Spec() def tearDown(self): @@ -50,52 +50,33 @@ class SpecTests(unittest.TestCase): def test_expands_templates(self): self.spec.load_file(open(self.filename)) - params = { - 'var1': 'value1', - } + params = {"var1": "value1"} steps = self.spec.get_steps(params) self.assertEqual( - steps, - [ - { - 'step': 'foo', - 'arg': 'value1', - 'number': 0o711, - }, - { - 'step': 'bar', - }, - ] + steps, [{"step": "foo", "arg": "value1", "number": 0o711}, {"step": "bar"}] ) -class ExpandTemplatesTests(unittest.TestCase): +class ExpandTemplatesTests(unittest.TestCase): def test_raises_assert_if_given_incomprehensible_value(self): with self.assertRaises(AssertionError): vmdb.expand_templates(None, {}) def test_returns_same_given_string_without_template(self): - self.assertEqual(vmdb.expand_templates('foo', {}), 'foo') + self.assertEqual(vmdb.expand_templates("foo", {}), "foo") def test_expands_simple_string_template(self): - params = { - 'foo': 'bar', - } - self.assertEqual(vmdb.expand_templates('{{ foo }}', params), 'bar') + params = {"foo": "bar"} + self.assertEqual(vmdb.expand_templates("{{ foo }}", params), "bar") def test_expands_list_of_templates(self): - params = { - 'foo': 'bar', - } - self.assertEqual(vmdb.expand_templates(['{{ foo }}'], params), ['bar']) + params = {"foo": "bar"} + self.assertEqual(vmdb.expand_templates(["{{ foo }}"], params), ["bar"]) def test_expands_dict_of_templates(self): - params = { - 'foo': 'bar', - } + params = {"foo": "bar"} self.assertEqual( - vmdb.expand_templates({'key': '{{ foo }}'}, params), - {'key': 'bar'} + vmdb.expand_templates({"key": "{{ foo }}"}, params), {"key": "bar"} ) diff --git a/vmdb/state.py b/vmdb/state.py index e6b74d2..9ae046e 100644 --- a/vmdb/state.py +++ b/vmdb/state.py @@ -17,14 +17,11 @@ class State: - def __init__(self): self._attrs = {} # make sure this attribute exists self._attrs = self.as_dict() def as_dict(self): return { - key: repr(getattr(self, key)) - for key in dir(self) - if not key in self._attrs + key: repr(getattr(self, key)) for key in dir(self) if not key in self._attrs } diff --git a/vmdb/step_list.py b/vmdb/step_list.py index c8d0ec8..1cc0605 100644 --- a/vmdb/step_list.py +++ b/vmdb/step_list.py @@ -23,7 +23,6 @@ import cliapp class StepRunnerInterface: # pragma: no cover - def get_key_spec(self): raise NotImplementedError() @@ -71,7 +70,7 @@ class StepRunnerInterface: # pragma: no cover # a list of such names. If all variables have a value that # evaluates as truth, the step is skipped. - value = step_spec.get('unless', None) + value = step_spec.get("unless", None) if value is None: return False @@ -81,7 +80,6 @@ class StepRunnerInterface: # pragma: no cover class StepRunnerList: - def __init__(self): self._runners = [] @@ -109,42 +107,37 @@ class StepRunnerList: class StepError(cliapp.AppException): - def __init__(self, msg): logging.error(msg) super().__init__(msg) class StepKeyMissing(StepError): - def __init__(self, key): - super().__init__('Step is missing key {}'.format(key)) + super().__init__("Step is missing key {}".format(key)) class StepKeyWrongValueType(StepError): - def __init__(self, key, wanted, actual): super().__init__( - 'Step key {} has value {!r}, expected {!r}'.format( - key, wanted, actual)) + "Step key {} has value {!r}, expected {!r}".format(key, wanted, actual) + ) class NoMatchingRunner(StepError): - def __init__(self, keys): super().__init__( - 'No runner implements step with keys {}'.format(', '.join(keys))) + "No runner implements step with keys {}".format(", ".join(keys)) + ) class NotString(StepError): # pragma: no cover - def __init__(self, name, actual): - msg = '%s: value must be string, got %r' % (name, actual) + msg = "%s: value must be string, got %r" % (name, actual) super().__init__(msg) class IsEmptyString(StepError): # pragma: no cover - def __init__(self, name, actual): - msg = '%s: value must not be an empty string, got %r' % (name, actual) + msg = "%s: value must not be an empty string, got %r" % (name, actual) super().__init__(msg) diff --git a/vmdb/step_list_tests.py b/vmdb/step_list_tests.py index 13836a5..5c4d080 100644 --- a/vmdb/step_list_tests.py +++ b/vmdb/step_list_tests.py @@ -22,7 +22,6 @@ import vmdb class StepRunnerListTests(unittest.TestCase): - def test_is_empty_initially(self): steps = vmdb.StepRunnerList() self.assertEqual(len(steps), 0) @@ -35,39 +34,30 @@ class StepRunnerListTests(unittest.TestCase): def test_finds_correct_runner(self): steps = vmdb.StepRunnerList() - keyspec = { - 'foo': str, - 'bar': str, - } + keyspec = {"foo": str, "bar": str} runner = DummyStepRunner(keyspec=keyspec) steps.add(runner) - found = steps.find({'foo': 'foo', 'bar': 'bar'}) + found = steps.find({"foo": "foo", "bar": "bar"}) self.assertEqual(runner, found) def test_raises_error_if_runner_not_found(self): steps = vmdb.StepRunnerList() - keyspec = { - 'foo': str, - 'bar': str, - } + keyspec = {"foo": str, "bar": str} runner = DummyStepRunner(keyspec=keyspec) steps.add(runner) with self.assertRaises(vmdb.NoMatchingRunner): - steps.find({'foo': 'foo'}) + steps.find({"foo": "foo"}) def test_raises_error_if_wrong_step_key_values(self): steps = vmdb.StepRunnerList() - keyspec = { - 'foo': str, - } + keyspec = {"foo": str} runner = DummyStepRunner(keyspec=keyspec) steps.add(runner) with self.assertRaises(vmdb.StepKeyWrongValueType): - steps.find({'foo': 42}) + steps.find({"foo": 42}) class DummyStepRunner(vmdb.StepRunnerInterface): - def __init__(self, keyspec=None): self.keyspec = keyspec @@ -79,36 +69,35 @@ class DummyStepRunner(vmdb.StepRunnerInterface): class StepRunnerGetKeyValuesTests(unittest.TestCase): - def test_returns_values_from_step_for_mandatory_keys(self): - keyspec = {'foo': str} + keyspec = {"foo": str} runner = DummyStepRunner(keyspec=keyspec) - self.assertEqual(runner.get_values({'foo': 'bar'}), {'foo': 'bar'}) + self.assertEqual(runner.get_values({"foo": "bar"}), {"foo": "bar"}) def test_raises_error_for_missing_mandatory_key(self): - keyspec = {'foo': str} + keyspec = {"foo": str} runner = DummyStepRunner(keyspec=keyspec) with self.assertRaises(vmdb.StepKeyMissing): runner.get_values({}) def test_raises_error_for_wrong_type_of_value_for_mandatory_key(self): - keyspec = {'foo': str} + keyspec = {"foo": str} runner = DummyStepRunner(keyspec=keyspec) with self.assertRaises(vmdb.StepKeyWrongValueType): - runner.get_values({'foo': 42}) + runner.get_values({"foo": 42}) def test_returns_default_value_for_missing_optional_key(self): - keyspec = {'foo': 'bar'} + keyspec = {"foo": "bar"} runner = DummyStepRunner(keyspec=keyspec) - self.assertEqual(runner.get_values({}), {'foo': 'bar'}) + self.assertEqual(runner.get_values({}), {"foo": "bar"}) def test_returns_actual_value_for_optional_key(self): - keyspec = {'foo': 'bar'} + keyspec = {"foo": "bar"} runner = DummyStepRunner(keyspec=keyspec) - self.assertEqual(runner.get_values({'foo': 'yo'}), {'foo': 'yo'}) + self.assertEqual(runner.get_values({"foo": "yo"}), {"foo": "yo"}) def test_raises_error_for_wrong_type_of_value_for_optional_key(self): - keyspec = {'foo': 'bar'} + keyspec = {"foo": "bar"} runner = DummyStepRunner(keyspec=keyspec) with self.assertRaises(vmdb.StepKeyWrongValueType): - runner.get_values({'foo': 42}) + runner.get_values({"foo": 42}) diff --git a/vmdb/tags.py b/vmdb/tags.py index 7cbae09..818193f 100644 --- a/vmdb/tags.py +++ b/vmdb/tags.py @@ -17,16 +17,12 @@ class Tags: - def __init__(self): self._tags = {} self._tagnames = [] def __repr__(self): # pragma: no cover - return repr({ - 'tags': self._tags, - 'tagnames': self._tagnames, - }) + return repr({"tags": self._tags, "tagnames": self._tagnames}) def get_tags(self): return list(self._tags.keys()) @@ -36,64 +32,64 @@ class Tags: def get_dev(self, tag): item = self._get(tag) - return item['dev'] + return item["dev"] def get_builder_mount_point(self, tag): item = self._get(tag) - return item['builder_mount_point'] + return item["builder_mount_point"] def get_fstype(self, tag): item = self._get(tag) - return item['fstype'] + return item["fstype"] def get_target_mount_point(self, tag): item = self._get(tag) - return item['target_mount_point'] + return item["target_mount_point"] def is_cached(self, tag): item = self._get(tag) - return item.get('cached', False) + return item.get("cached", False) def append(self, tag): if tag in self._tags: raise TagInUse(tag) self._tagnames.append(tag) self._tags[tag] = { - 'dev': None, - 'builder_mount_point': None, - 'fstype': None, - 'target_mount_point': None, + "dev": None, + "builder_mount_point": None, + "fstype": None, + "target_mount_point": None, } def set_dev(self, tag, dev): item = self._get(tag) - if item['dev'] is not None: + if item["dev"] is not None: raise AlreadyHasDev(tag) - item['dev'] = dev + item["dev"] = dev def set_builder_mount_point(self, tag, mount_point, cached=False): item = self._get(tag) - if item['builder_mount_point'] is not None: + if item["builder_mount_point"] is not None: raise AlreadyMounted(tag) - item['builder_mount_point'] = mount_point - item['cached'] = cached + item["builder_mount_point"] = mount_point + item["cached"] = cached def set_fstype(self, tag, fstype): item = self._get(tag) - if item['fstype'] is not None: + if item["fstype"] is not None: raise AlreadyHasFsType(tag) - item['fstype'] = fstype + item["fstype"] = fstype def set_target_mount_point(self, tag, target_mount_point): item = self._get(tag) - if item['target_mount_point'] is not None: + if item["target_mount_point"] is not None: raise AlreadyHasTargetMountPoint(tag) - item['target_mount_point'] = target_mount_point + item["target_mount_point"] = target_mount_point def get_builder_from_target_mount_point(self, target_mount_point): for item in self._tags.values(): - if item['target_mount_point'] == target_mount_point: - return item['builder_mount_point'] + if item["target_mount_point"] == target_mount_point: + return item["builder_mount_point"] raise NeedBothMountPoints(target_mount_point) def _get(self, tag): @@ -104,43 +100,35 @@ class Tags: class TagInUse(Exception): - def __init__(self, tag): - super().__init__('Tag already used: {}'.format(tag)) + super().__init__("Tag already used: {}".format(tag)) class UnknownTag(Exception): - def __init__(self, tag): - super().__init__('Unknown tag: {}'.format(tag)) + super().__init__("Unknown tag: {}".format(tag)) class AlreadyHasDev(Exception): - def __init__(self, tag): - super().__init__('Already has device: {}'.format(tag)) + super().__init__("Already has device: {}".format(tag)) class AlreadyMounted(Exception): - def __init__(self, tag): - super().__init__('Already mounted tag: {}'.format(tag)) + super().__init__("Already mounted tag: {}".format(tag)) class AlreadyHasFsType(Exception): - def __init__(self, tag): - super().__init__('Already has filesytem type: {}'.format(tag)) + super().__init__("Already has filesytem type: {}".format(tag)) class AlreadyHasTargetMountPoint(Exception): - def __init__(self, tag): - super().__init__('Already has target mount point: {}'.format(tag)) + super().__init__("Already has target mount point: {}".format(tag)) class NeedBothMountPoints(Exception): - def __init__(self, target_mp): - super().__init__( - 'Need both mount points set, target: {}'.format(target_mp)) + super().__init__("Need both mount points set, target: {}".format(target_mp)) diff --git a/vmdb/tags_tests.py b/vmdb/tags_tests.py index 29ca68e..fdcebf3 100644 --- a/vmdb/tags_tests.py +++ b/vmdb/tags_tests.py @@ -23,131 +23,127 @@ import vmdb class TagsTests(unittest.TestCase): - def test_lists_no_tags_initially(self): tags = vmdb.Tags() self.assertEqual(tags.get_tags(), []) def test_tells_if_tag_exists(self): tags = vmdb.Tags() - self.assertFalse(tags.has_tag('foo')) - tags.append('foo') - self.assertTrue(tags.has_tag('foo')) - self.assertEqual(tags.get_tags(), ['foo']) + self.assertFalse(tags.has_tag("foo")) + tags.append("foo") + self.assertTrue(tags.has_tag("foo")) + self.assertEqual(tags.get_tags(), ["foo"]) def test_remembers_order(self): tags = vmdb.Tags() - tags.append('foo') - tags.append('bar') - self.assertTrue(tags.get_tags(), ['foo', 'bar']) + tags.append("foo") + tags.append("bar") + self.assertTrue(tags.get_tags(), ["foo", "bar"]) def test_get_dev_raises_error_for_unknown_tag(self): tags = vmdb.Tags() with self.assertRaises(vmdb.UnknownTag): - tags.get_dev('does-not-exist') + tags.get_dev("does-not-exist") def test_getting_builder_mount_point_raises_error_for_unknown_tag(self): tags = vmdb.Tags() with self.assertRaises(vmdb.UnknownTag): - tags.get_builder_mount_point('does-not-exist') + tags.get_builder_mount_point("does-not-exist") def test_raises_error_for_reused_tag(self): tags = vmdb.Tags() - tags.append('tag') + tags.append("tag") with self.assertRaises(vmdb.TagInUse): - tags.append('tag') + tags.append("tag") def test_sets_dev(self): tags = vmdb.Tags() - tags.append('first') - tags.set_dev('first', '/dev/foo') - self.assertEqual(tags.get_tags(), ['first']) - self.assertEqual(tags.get_dev('first'), '/dev/foo') - self.assertEqual(tags.get_builder_mount_point('first'), None) + tags.append("first") + tags.set_dev("first", "/dev/foo") + self.assertEqual(tags.get_tags(), ["first"]) + self.assertEqual(tags.get_dev("first"), "/dev/foo") + self.assertEqual(tags.get_builder_mount_point("first"), None) def test_adds_builder_mount_point(self): tags = vmdb.Tags() - tags.append('first') - tags.set_builder_mount_point('first', '/mnt/foo') - self.assertEqual(tags.get_tags(), ['first']) - self.assertEqual(tags.get_dev('first'), None) - self.assertEqual(tags.get_builder_mount_point('first'), '/mnt/foo') + tags.append("first") + tags.set_builder_mount_point("first", "/mnt/foo") + self.assertEqual(tags.get_tags(), ["first"]) + self.assertEqual(tags.get_dev("first"), None) + self.assertEqual(tags.get_builder_mount_point("first"), "/mnt/foo") def test_builder_mount_point_is_uncached_by_default(self): tags = vmdb.Tags() - tags.append('first') - tags.set_builder_mount_point('first', '/mnt/foo') - self.assertFalse(tags.is_cached('first')) + tags.append("first") + tags.set_builder_mount_point("first", "/mnt/foo") + self.assertFalse(tags.is_cached("first")) def test_builder_mount_point_can_be_made_cached(self): tags = vmdb.Tags() - tags.append('first') - tags.set_builder_mount_point('first', '/mnt/foo', cached=True) - self.assertTrue(tags.is_cached('first')) + tags.append("first") + tags.set_builder_mount_point("first", "/mnt/foo", cached=True) + self.assertTrue(tags.is_cached("first")) def test_set_dev_raises_error_for_unknown_tag(self): tags = vmdb.Tags() with self.assertRaises(vmdb.UnknownTag): - tags.set_dev('first', '/mnt/foo') + tags.set_dev("first", "/mnt/foo") def test_set_builder_mount_point_raises_error_for_unknown_tag(self): tags = vmdb.Tags() with self.assertRaises(vmdb.UnknownTag): - tags.set_builder_mount_point('first', '/mnt/foo') + tags.set_builder_mount_point("first", "/mnt/foo") def test_set_builder_mount_point_raises_error_for_double_mount(self): tags = vmdb.Tags() - tags.append('first') - tags.set_builder_mount_point('first', '/mnt/foo') + tags.append("first") + tags.set_builder_mount_point("first", "/mnt/foo") with self.assertRaises(vmdb.AlreadyMounted): - tags.set_builder_mount_point('first', '/mnt/foo') + tags.set_builder_mount_point("first", "/mnt/foo") def test_set_dev_raises_error_for_double_dev(self): tags = vmdb.Tags() - tags.append('first') - tags.set_dev('first', '/dev/foo') + tags.append("first") + tags.set_dev("first", "/dev/foo") with self.assertRaises(vmdb.AlreadyHasDev): - tags.set_dev('first', '/dev/foo') + tags.set_dev("first", "/dev/foo") def test_set_fstype(self): tags = vmdb.Tags() - tags.append('first') - tags.set_fstype('first', 'ext4') - self.assertEqual(tags.get_fstype('first'), 'ext4') + tags.append("first") + tags.set_fstype("first", "ext4") + self.assertEqual(tags.get_fstype("first"), "ext4") def test_set_fstype_raises_error_for_double_fstype(self): tags = vmdb.Tags() - tags.append('first') - tags.set_fstype('first', 'ext3') + tags.append("first") + tags.set_fstype("first", "ext3") with self.assertRaises(vmdb.AlreadyHasFsType): - tags.set_fstype('first', 'ext4') + tags.set_fstype("first", "ext4") def test_set_target_mount_point(self): tags = vmdb.Tags() - tags.append('first') - tags.set_target_mount_point('first', '/boot') - self.assertEqual(tags.get_target_mount_point('first'), '/boot') + tags.append("first") + tags.set_target_mount_point("first", "/boot") + self.assertEqual(tags.get_target_mount_point("first"), "/boot") def test_set_target_mount_point_raises_error_for_double_target_mount_point(self): tags = vmdb.Tags() - tags.append('first') - tags.set_target_mount_point('first', '/boot') + tags.append("first") + tags.set_target_mount_point("first", "/boot") with self.assertRaises(vmdb.AlreadyHasTargetMountPoint): - tags.set_target_mount_point('first', '/') + tags.set_target_mount_point("first", "/") def test_raises_error_if_both_mount_points_not_set(self): tags = vmdb.Tags() - tags.append('first') - tags.set_target_mount_point('first', '/boot') + tags.append("first") + tags.set_target_mount_point("first", "/boot") with self.assertRaises(vmdb.NeedBothMountPoints): - tags.get_builder_from_target_mount_point('/') + tags.get_builder_from_target_mount_point("/") def test_returns_builder_when_given_target_mount_point(self): tags = vmdb.Tags() - tags.append('first') - tags.set_builder_mount_point('first', '/mnt/foo') - tags.set_target_mount_point('first', '/boot') - self.assertEqual( - tags.get_builder_from_target_mount_point('/boot'), - '/mnt/foo' - ) + tags.append("first") + tags.set_builder_mount_point("first", "/mnt/foo") + tags.set_target_mount_point("first", "/boot") + self.assertEqual(tags.get_builder_from_target_mount_point("/boot"), "/mnt/foo") diff --git a/vmdb/unmount.py b/vmdb/unmount.py index 8239f8c..2a9e5e9 100644 --- a/vmdb/unmount.py +++ b/vmdb/unmount.py @@ -30,7 +30,7 @@ import vmdb def unmount(what, mounts=None, real_unmount=None): - logging.debug('Unmounting {} and everything on top of it'.format(what)) + logging.debug("Unmounting {} and everything on top of it".format(what)) if mounts is None: # pragma: no cover mounts = _read_proc_mounts() if real_unmount is None: # pragma: no cover @@ -40,31 +40,28 @@ def unmount(what, mounts=None, real_unmount=None): dirnames = _find_what_to_unmount(mounts, what) for dirname in dirnames: real_unmount(dirname) - logging.debug('Finishd unmounting {}'.format(what)) + logging.debug("Finishd unmounting {}".format(what)) def _read_proc_mounts(): # pragma: no cover - with open('/proc/mounts') as f: + with open("/proc/mounts") as f: return f.read() def _real_unmount(what): # pragma: no cover try: - vmdb.runcmd(['umount', what]) + vmdb.runcmd(["umount", what]) except cliapp.AppException: - logging.info('unmount failed, but ignoring that') + logging.info("unmount failed, but ignoring that") def _parse_proc_mounts(text): - return [ - line.split()[:2] - for line in text.splitlines() - ] + return [line.split()[:2] for line in text.splitlines()] def _find_what_to_unmount(mounts, what): dirname = _find_mount_point(mounts, what) - dirnameslash = dirname + '/' + dirnameslash = dirname + "/" to_unmount = [ point for dev, point in mounts @@ -81,6 +78,5 @@ def _find_mount_point(mounts, what): class NotMounted(Exception): - def __init__(self, what): - super().__init__('Not mounted: {}'.format(what)) + super().__init__("Not mounted: {}".format(what)) diff --git a/vmdb/unmount_tests.py b/vmdb/unmount_tests.py index 1ea3662..58ae341 100644 --- a/vmdb/unmount_tests.py +++ b/vmdb/unmount_tests.py @@ -23,35 +23,30 @@ import vmdb class UnmountTests(unittest.TestCase): - def setUp(self): self.mounts = ProcMounts() def unmount(self, what): - vmdb.unmount( - what, - mounts=str(self.mounts), - real_unmount=self.mounts.unmount) + vmdb.unmount(what, mounts=str(self.mounts), real_unmount=self.mounts.unmount) def test_raises_error_if_not_mounted(self): with self.assertRaises(vmdb.NotMounted): - self.unmount('/foo') + self.unmount("/foo") def test_unmounts_mounted_dir(self): - self.mounts.mount('/dev/foo', '/foo') - self.unmount('/foo') - self.assertFalse(self.mounts.is_mounted('/foo')) + self.mounts.mount("/dev/foo", "/foo") + self.unmount("/foo") + self.assertFalse(self.mounts.is_mounted("/foo")) def test_unmounts_mounted_dir_with_submounts(self): - self.mounts.mount('/dev/foo', '/foo') - self.mounts.mount('/dev/bar', '/foo/bar') - self.unmount('/foo') - self.assertFalse(self.mounts.is_mounted('/foo')) - self.assertFalse(self.mounts.is_mounted('/foo/bar')) + self.mounts.mount("/dev/foo", "/foo") + self.mounts.mount("/dev/bar", "/foo/bar") + self.unmount("/foo") + self.assertFalse(self.mounts.is_mounted("/foo")) + self.assertFalse(self.mounts.is_mounted("/foo/bar")) class ProcMounts: - def __init__(self): self.mounts = [] @@ -62,17 +57,10 @@ class ProcMounts: self.mounts.append((device, point)) def unmount(self, what): - self.mounts = [ - mount - for mount in self.mounts - if what not in mount - ] + self.mounts = [mount for mount in self.mounts if what not in mount] def __str__(self): - return ''.join( - '{}\n'.format(self.mount_line(mount)) - for mount in self.mounts - ) + return "".join("{}\n".format(self.mount_line(mount)) for mount in self.mounts) def mount_line(self, mount): - return '{} {} fstype options 0 0'.format(mount[0], mount[1]) + return "{} {} fstype options 0 0".format(mount[0], mount[1]) |