# Copyright 2017 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=
# Installing GRUB onto a disk image is a bit of a black art. I haven't
# found any good documentation for it. This plugin is written based on
# de-ciphering the build_openstack_image script. Here is an explanation
# of what I _THINK_ is happening.
#
# The crucial command is grub-install. It needs a ton of options to
# work correctly: see below in the code for the list, and the manpage
# for an explanation of what each of them means. We will be running
# grub-install in a chroot so that we use the version in the Debian
# version we're installing, rather than the host system, which might
# be any Debian version.
#
# To run grub-install in a chroot, we need to set up the chroot in
# various ways. Firstly, we need to tell grub-install which device
# file the image has. We can't just give it the image file itself,
# since it isn't inside the chroot, so instead we arrange to have a
# loop block device that covers the whole image file, and we bind
# mount /dev into the chroot so the device is available.
#
# grub-install seems to also require /proc and /sys so we bind mount
# both into the chroot as well, unless they are already mounted there.
#
# We install the UEFI version of GRUB, and for that we additionally
# bind mount the EFI partition in the image. Oh yeah, you MUST have
# one.
#
# We also make sure the right GRUB package is installed in the chroot,
# before we run grub-install.
#
# Further, there's some configuration tweaking we need to do. See the
# code. Don't ask me why they're necessary.
#
# For cleanliness, we also undo any bind mounts into the chroot. Don't
# want to leave them in case they cause trouble.
#
# Note that this is currently assuming that UEFI and either the amd64
# (a.k.a. x86_64) or arm64 (a.k.a. aarch64) architectures are being
# used. These should probably not be hardcoded. Patch welcome.
# To use this plugin: write steps to create a root filesystem, and a
# VFAT filesystem to be mounted as /boot/efi. Install Debian onto the
# root filesystem. Then install grub with a step like this:
#
#     - grub: uefi
#       tag: root-part
#       efi: efi-part
#
# Here: "tag" is the tag for the root filesystem (and corresponding
# partition), and "efi" is the tag for the EFI partition.
#
# The grub step will take care of the rest.
import json
import logging
import os
import re
import vmdb
class GrubPlugin(vmdb.Plugin):
    """vmdb plugin that makes the ``grub`` step available."""

    def enable(self):
        """Register a GrubStepRunner with the application's step runners."""
        runner = GrubStepRunner()
        self.app.step_runners.add(runner)
class GrubStepRunner(vmdb.StepRunnerInterface):
    """Step runner for the ``grub`` step: install GRUB into the image.

    The flavor is selected by the value of the ``grub`` key and may be
    ``uefi``, ``bios`` or ``ieee1275``.  Every flavor ends up in
    install_grub(), which installs the GRUB package and runs
    grub-mkconfig and grub-install inside the chroot of the root
    filesystem.  Mounts made into the chroot are recorded in
    ``state.grub_mounts`` and undone by teardown().
    """

    def get_key_spec(self):
        """Return the keys accepted by the step, with a type or a default.

        NOTE(review): presumably a bare type (``str``) marks a required
        key and any other value is the default of an optional key;
        confirm against the vmdb key-spec handling.
        """
        return {
            # GRUB flavor: "uefi", "bios" or "ieee1275".
            "grub": str,
            # Tag of the root filesystem; "root-fs" is used when "tag" is empty.
            "root-fs": "",
            # Tag of the EFI partition; "efi-part" is used when "efi" is empty.
            "efi": "",
            "efi-part": "",
            # Tag of the partition that gets the "prep" flag, if any.
            "prep": "",
            # Set to "serial" to configure a serial console.
            "console": "",
            "tag": "",
            # Device to install GRUB onto; derived from the root device if empty.
            "image-dev": "",
            "quiet": False,
            # Base kernel command line; console/quiet parameters are added to it.
            "kernel-params": ["biosdevname=0", "net.ifnames=0", "consoleblank=0", "rw"],
            # Written to GRUB_TIMEOUT in /etc/default/grub.
            "timeout": 0,
        }

    def run(self, values, settings, state):
        """Install GRUB in the flavor named by ``values["grub"]``.

        Raises:
            Exception: if the flavor is not one of the supported ones.
        """
        # Mounts made during installation are recorded here so that
        # teardown() can undo them.
        state.grub_mounts = []

        flavor = values["grub"]
        if flavor == "uefi":
            self.install_uefi(values, settings, state)
        elif flavor == "bios":
            self.install_bios(values, settings, state)
        elif flavor == "ieee1275":
            self.install_ieee1275(values, settings, state)
        else:
            raise Exception("Unknown GRUB flavor {}".format(flavor))

    def grub_uefi_variant(self, state):
        """Return ``(package, target)`` for UEFI GRUB on ``state.arch``.

        Raises:
            Exception: if the architecture has no known UEFI variant.
        """
        variants = {
            "amd64": ("grub-efi-amd64", "x86_64-efi"),
            "i386": ("grub-efi-ia32", "i386-efi"),
            "arm64": ("grub-efi-arm64", "arm64-efi"),
            "armhf": ("grub-efi-arm", "arm-efi"),
        }
        logging.debug(f"grub plugin: state.arch={state.arch!r}")
        try:
            return variants[state.arch]
        except KeyError:
            raise Exception(
                'GRUB UEFI package and target for "{}" unknown'.format(state.arch)
            ) from None

    def install_uefi(self, values, settings, state):
        """Install the UEFI flavor of GRUB.

        Raises:
            Exception: if neither "efi" nor "efi-part" is set.
        """
        if not values["efi"] and not values["efi-part"]:
            raise Exception('"efi" or "efi-part" required in UEFI GRUB installation')

        vmdb.progress("Installing GRUB for UEFI")
        (grub_package, grub_target) = self.grub_uefi_variant(state)
        self.install_grub(values, settings, state, grub_package, grub_target)

    def install_bios(self, values, settings, state):
        """Install the BIOS flavor of GRUB (package grub-pc, target i386-pc)."""
        vmdb.progress("Installing GRUB for BIOS")
        self.install_grub(values, settings, state, "grub-pc", "i386-pc")

    def grub_ieee1275_variant(self, state):
        """Return the grub-install target prefix for IEEE1275 on ``state.arch``.

        Architectures without a mapping are returned unchanged.
        """
        variants = {
            "amd64": "i386",
            "ppc64": "powerpc",
            "ppc64el": "powerpc",
            "sparc": "sparc64",
        }
        logging.debug(f"grub plugin: state.arch={state.arch!r}")
        return variants.get(state.arch, state.arch)

    def install_ieee1275(self, values, settings, state):
        """Install the IEEE1275 flavor of GRUB."""
        vmdb.progress("Installing GRUB for IEEE1275")
        grub_package = "grub-ieee1275"
        grub_target = f"{self.grub_ieee1275_variant(state)}-ieee1275"
        self.install_grub(values, settings, state, grub_package, grub_target)

    @staticmethod
    def _partition_number(device):
        """Return the partition number at the end of a device name.

        All trailing digits are used, so /dev/mapper/loop0p10 gives "10".
        If the name does not end in a digit, its last character is
        returned, which is what the code did before multi-digit numbers
        were supported.
        """
        m = re.search(r"\d+$", device)
        if m is not None:
            return m.group(0)
        return device[-1]

    @staticmethod
    def _serial_console_params(arch):
        """Return the kernel parameters for a serial console on ``arch``."""
        if "ppc64" in arch:
            serial_dev = "hvc0"
        elif "arm" in arch:
            serial_dev = "ttyAMA0"
        else:
            serial_dev = "ttyS0"
        return ["loglevel=3", "console=tty0", f"console={serial_dev},115200n8"]

    def install_grub(self, values, settings, state, grub_package, grub_target):
        """Install and configure GRUB inside the chroot of the root filesystem.

        Args:
            values: the step's key values (see get_key_spec()).
            settings: unused here; accepted for a uniform signature.
            state: build state; provides ``tags``, ``arch`` and
                ``grub_mounts``.
            grub_package: name of the package to install with apt-get.
            grub_target: value for grub-install's ``--target`` option.
        """
        console = values["console"] or None

        tag = values["tag"] or values["root-fs"] or None
        root_dev = state.tags.get_dev(tag)
        chroot = state.tags.get_builder_mount_point(tag)

        self.bind_mount_many(chroot, ["/dev", "/sys", "/proc"], state)

        image_dev = values["image-dev"] or None
        if image_dev is None:
            image_dev = self.get_image_loop_device(root_dev, chroot)

        # "efi" takes precedence over "efi-part".
        efi_tag = values["efi"] or values["efi-part"] or None
        efi_dev = state.tags.get_dev(efi_tag) if efi_tag is not None else None

        prep_tag = values["prep"] or None
        prep_dev = state.tags.get_dev(prep_tag) if prep_tag else None

        if efi_dev:
            pn = self._partition_number(efi_dev)
            vmdb.runcmd(["parted", "-s", image_dev, "set", pn, "esp", "on"])
            self.mount(chroot, efi_dev, "/boot/efi", state)
        elif prep_dev:
            pn = self._partition_number(prep_dev)
            vmdb.runcmd(["parted", "-s", image_dev, "set", pn, "prep", "on"])
            # From here on grub-install is pointed at the PReP partition.
            image_dev = prep_dev

        self.install_package(chroot, grub_package)

        # Work on a copy: the list in values must not grow as a side
        # effect of running this step.
        kernel_params = list(values["kernel-params"])
        if console == "serial":
            kernel_params.extend(self._serial_console_params(state.arch))
        if values["quiet"]:
            kernel_params.extend(
                [
                    "quiet",
                    "systemd.show_status=false",
                    "rd.systemd.show_status=false",
                ]
            )
        else:
            kernel_params.append("systemd.show_status=true")

        self.set_grub_cmdline_config(chroot, kernel_params)
        self.add_grub_crypto_disk(chroot)
        self.set_grub_timeout(chroot, values["timeout"])
        if console == "serial":
            self.add_grub_serial_console(chroot)

        vmdb.runcmd_chroot(chroot, ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"])

        # Pick the "extra removable" option according to what the
        # grub-install in the chroot lists in its --help output.
        help_out = vmdb.runcmd_chroot(chroot, ["grub-install", "--help"])
        if b"--no-extra-removable" in help_out:
            removable_opt = "--no-extra-removable"
        else:
            removable_opt = "--force-extra-removable"

        vmdb.runcmd_chroot(
            chroot,
            [
                "grub-install",
                "--target=" + grub_target,
                "--no-nvram",
                removable_opt,
                "--no-floppy",
                "--modules=part_msdos part_gpt",
                "--grub-mkdevicemap=/boot/grub/device.map",
                image_dev,
            ],
        )
        # The mounts are left in place here; teardown() undoes them.

    def teardown(self, values, settings, state):
        """Undo the mounts recorded while the step ran."""
        self.unmount(state)

    def unmount(self, state):
        """Unmount everything in ``state.grub_mounts``, newest mount first.

        The list is emptied as a side effect.  A mount point that turns
        out not to be mounted is logged as a warning and skipped.
        """
        mounts = getattr(state, "grub_mounts", [])
        while mounts:
            # pop() takes from the end of the list, i.e. the most recent
            # mount, so things are unmounted in the reverse of mount order.
            mount_point = mounts.pop()
            try:
                vmdb.unmount(mount_point)
            except vmdb.NotMounted as e:
                logging.warning(str(e))

    def get_image_loop_device(self, partition_device, chroot):
        """Return the whole-image loop device for a partition device.

        Two forms of ``partition_device`` are understood:

        * ``/dev/mapper/loopXpY``, which gives ``/dev/loopX``;
        * ``/dev/VGNAME/...`` (an LVM volume), for which ``lvs`` is run
          in the chroot to find the devices backing the volume group.

        Raises:
            Exception: if no loop device can be determined.
        """
        # We get /dev/mapper/loopXpY and return /dev/loopX
        m = re.match(r"^/dev/mapper/(?P<loop>.*)p\d+$", partition_device)
        if m is not None:
            return "/dev/{}".format(m.group("loop"))

        # Check if the rootfs is a LVM volume
        m = re.match(r"^/dev/(?P<vgname>[^/]+)/.*$", partition_device)
        if m is not None:
            vgname = m.group("vgname")
            logging.debug(f"extracted vgname={vgname} from {partition_device}")

            env = dict(os.environ)
            env["LVM_SUPPRESS_FD_WARNINGS"] = "1"
            output = vmdb.runcmd_chroot(
                chroot,
                ["lvs", "-o", "vg_name,devices", "--reportformat", "json"],
                env=env,
            )
            # example "lvs" output:
            # {
            #     "report": [
            #         {
            #             "lv": [
            #                 {"vg_name":"lvm_volgroup0", "devices":"/dev/mapper/loop1p6(1024)"},
            #                 {"vg_name":"lvm_volgroup0", "devices":"/dev/mapper/loop1p6(0)"}
            #             ]
            #         }
            #     ]
            # }
            report = json.loads(output.strip())
            logging.debug(f"lvs report: {report}")
            for report_item in report["report"]:
                logging.debug(f"report_item: {report_item}")
                for lv in report_item.get("lv", []):
                    logging.debug(f"lv: {lv}")
                    if lv.get("vg_name") != vgname:
                        continue
                    devices = lv.get("devices", [])
                    if not isinstance(devices, list):
                        devices = [devices]
                    for device in devices:
                        logging.debug(f"device: {device}")
                        dev_match = re.match(
                            r"^/dev/mapper/(?P<loop>.*)p\d+\(\d+\)$", device
                        )
                        if dev_match is not None:
                            return "/dev/{}".format(dev_match.group("loop"))
                        # Sometimes lvs gives us /dev/dm-X instead of a
                        # loop device, in which case we need to find the
                        # corresponding loop device ourselves
                        loop = self.get_loop_device_from_dm(device, chroot)
                        if loop is not None:
                            return loop

        raise Exception(
            "Do not understand partition device name {}".format(partition_device)
        )

    @staticmethod
    def get_loop_device_from_dm(device, chroot):
        """Return the /dev/loopX device behind a ``/dev/dm-Y(N)`` entry.

        ``dmsetup ls -o blkdevname`` is run in the chroot and searched
        for a ``loopXpZ`` mapping whose block device name is ``dm-Y``.
        Returns None if ``device`` does not have that form or no such
        mapping is listed.
        """
        m = re.match(r"^/dev/(?P<mapped>dm-\d+)\(\d+\)$", device)
        if m is None:
            return None

        mapped = m.group("mapped")
        logging.debug(f"mapped: {mapped}")
        dmsetup_out = vmdb.runcmd_chroot(
            chroot, ["dmsetup", "ls", "-o", "blkdevname"]
        )
        # The pattern does not change between lines, so compile it once.
        line_re = re.compile(
            r"^(?P<loop>loop\d+)p\d+\s+\(" + re.escape(mapped) + r"\)$"
        )
        for line in dmsetup_out.decode().splitlines(keepends=False):
            line_match = line_re.match(line)
            if line_match is not None:
                return "/dev/{}".format(line_match.group("loop"))
        return None

    def bind_mount_many(self, chroot, paths, state):
        """Bind mount each of ``paths`` at the same path inside the chroot."""
        for path in paths:
            self.mount(chroot, path, path, state, mount_opts=["--bind"])

    def mount(self, chroot, path, mount_point, state, mount_opts=None):
        """Mount ``path`` at ``mount_point`` inside the chroot.

        Nothing is done if the target is already a mount point.
        Otherwise the target directory is created if missing, the mount
        is made, and the target is appended to ``state.grub_mounts``.
        """
        chroot_path = self.chroot_path(chroot, mount_point)
        if os.path.ismount(chroot_path):
            logging.debug("already mounted: %s", chroot_path)
            return

        if not os.path.exists(chroot_path):
            os.makedirs(chroot_path)

        if mount_opts is None:
            mount_opts = []

        vmdb.runcmd(["mount"] + mount_opts + [path, chroot_path])
        state.grub_mounts.append(chroot_path)

    def chroot_path(self, chroot, path):
        """Return the normalized location of ``path`` beneath ``chroot``."""
        # Prefixing "." keeps os.path.join from discarding the chroot
        # when path is absolute.
        return os.path.normpath(os.path.join(chroot, "." + path))

    def install_package(self, chroot, package):
        """Install ``package`` in the chroot using apt-get, non-interactively."""
        env = os.environ.copy()
        env["DEBIAN_FRONTEND"] = "noninteractive"
        vmdb.runcmd_chroot(chroot, ["apt-get", "update"], env=env)
        vmdb.runcmd_chroot(
            chroot, ["apt-get", "-y", "--no-show-progress", "install", package], env=env
        )

    def set_grub_cmdline_config(self, chroot, kernel_params):
        """Set GRUB_CMDLINE_LINUX_DEFAULT to the space-joined parameters."""
        param_string = " ".join(kernel_params)
        self.set_grub_default(
            chroot, "GRUB_CMDLINE_LINUX_DEFAULT", '"' + param_string + '"'
        )

    def set_grub_default(self, chroot, param, value):
        """Set ``param=value`` in the chroot's /etc/default/grub.

        The first line that sets ``param`` is replaced.  If the first
        match is instead a commented-out ``#param=`` line, that line is
        kept and the new setting is inserted right after it.  If there
        is no match at all, the setting is appended to the file.

        The file must already exist; it is opened for update.
        """
        filename = self.chroot_path(chroot, "/etc/default/grub")
        newdefault = param + "=" + str(value) + "\n"
        active_prefix = param + "="
        commented_prefix = "#" + active_prefix

        found_param = False
        newlines = []
        with open(filename, "r+") as f:
            for line in f:
                if not found_param and line.startswith(active_prefix):
                    newlines.append(newdefault)
                    found_param = True
                elif not found_param and line.startswith(commented_prefix):
                    # A final line may lack its newline; add it so the
                    # new setting starts on a line of its own.
                    newlines.append(line if line.endswith("\n") else line + "\n")
                    newlines.append(newdefault)
                    found_param = True
                else:
                    newlines.append(line)

            if not found_param:
                # Same guard when appending after an unterminated last line.
                if newlines and not newlines[-1].endswith("\n"):
                    newlines.append("\n")
                newlines.append(newdefault)

            f.seek(0)
            f.write("".join(newlines))
            f.truncate()

    def add_grub_serial_console(self, chroot):
        """Configure GRUB itself to use a serial terminal."""
        self.set_grub_default(chroot, "GRUB_TERMINAL", "serial")
        self.set_grub_default(
            chroot,
            "GRUB_SERIAL_COMMAND",
            '"serial --speed=115200 --unit=0 --word=8 --parity=no --stop=1"',
        )

    def add_grub_crypto_disk(self, chroot):
        """Set GRUB_ENABLE_CRYPTODISK=y in the GRUB defaults."""
        self.set_grub_default(chroot, "GRUB_ENABLE_CRYPTODISK", "y")

    def set_grub_timeout(self, chroot, timeout):
        """Set GRUB_TIMEOUT to ``timeout`` in the GRUB defaults."""
        self.set_grub_default(chroot, "GRUB_TIMEOUT", timeout)