#!/usr/bin/python3
"""Configure a v-i installer image that has been written to a drive.

The second partition of the given drive is mounted on a temporary
directory and, depending on what the YAML configuration file provides,
the following are written into it:

* an SSH host key and host certificate, plus an sshd drop-in using them;
* ``/root/.ssh/authorized_keys``;
* a trusted user CA public key, plus an sshd drop-in referring to it;
* an iwd pre-shared-key file for a wifi network.

The partition is unmounted again when done.  Files are written with
owner and group 0, so the script is expected to be run as root.
"""

import argparse
import glob
import os
import subprocess
import sys
import tempfile

import yaml

# Set from the --verbose command line option in main().
verbose = False


def log(msg):
    """Write ``msg`` and a newline to stderr, but only in verbose mode."""
    if verbose:
        sys.stderr.write(msg)
        sys.stderr.write("\n")
        sys.stderr.flush()


class Config:
    """Settings loaded from a YAML file, layered over built-in defaults."""

    # Every known setting; None means "not configured".
    defaults = {
        "authorized_keys_file": None,
        "host_cert_file": None,
        "host_key_file": None,
        "host_cert_cmd": None,
        "host_key_cmd": None,
        "user_ca_pub_file": None,
        "user_ca_pub_cmd": None,
        "cmd_as_user": None,
        "wifi_name": None,
        "wifi_password": None,
    }

    # Settings holding file names; a leading "~" in them is expanded by
    # read().  (The attribute name is a historical misspelling of
    # "expandable"; it is kept so existing references keep working.)
    exandable = [
        "authorized_keys_file",
        "host_cert_file",
        "host_key_file",
        "user_ca_pub_file",
    ]

    def __init__(self):
        # Copy, so that instances never mutate the class-level defaults.
        self.config = dict(self.defaults)

    def __getitem__(self, key):
        """Return the value of setting ``key``; raise KeyError if unknown."""
        return self.config[key]

    def read(self, filename):
        """Load settings from the YAML file ``filename``.

        Values from the file override the current ones.  An empty file
        (which parses to None) leaves the settings unchanged.
        """
        log(f"reading configuration from {filename}")
        with open(filename) as f:
            obj = yaml.safe_load(f)
        if obj is not None:
            self.config.update(obj)
        for key in self.exandable:
            if self.config[key] is not None:
                self.config[key] = os.path.expanduser(self.config[key])
        # NOTE: in verbose mode this prints every setting, including
        # wifi_password, to stderr.
        log("config:")
        for key in self.config:
            log(f" {key}={self.config[key]!r}")

    def user_ca_pub(self):
        """Return the user CA public key, or None if not configured."""
        return self._get_from_file_or_cmd("user_ca_pub", "user CA public key", None)

    def host_key(self, hostname):
        """Return the host private key for ``hostname``, or None."""
        return self._get_from_file_or_cmd("host_key", "host private key", hostname)

    def host_cert(self, hostname):
        """Return the host certificate for ``hostname``, or None."""
        return self._get_from_file_or_cmd("host_cert", "host certificate", hostname)

    def wifi(self):
        """Return the ``(wifi_name, wifi_password)`` pair; either may be None."""
        return self.config.get("wifi_name"), self.config.get("wifi_password")

    def _get_from_file_or_cmd(self, prefix, msg, hostname):
        """Return text from ``<prefix>_file`` or the output of ``<prefix>_cmd``.

        The file setting takes precedence over the command setting.  If
        ``hostname`` is not None, every ``$HOST`` in the command is
        replaced by it before the command is run.  If ``cmd_as_user`` is
        set, the command is run as that user.  ``msg`` is a human
        readable description used only in log messages.

        Returns None when neither the file nor the command is configured.
        """
        filename = self.config.get(f"{prefix}_file")
        if filename is not None:
            log(f"reading {msg} from {filename}")
            return cat(filename)

        cmd = self.config.get(f"{prefix}_cmd")
        if cmd is None:
            log(f"can't read {msg}")
            return None

        if hostname is not None:
            cmd = cmd.replace("$HOST", hostname)

        user = self.config.get("cmd_as_user")
        if user is not None:
            log(f"reading {msg} from command (as {user}): {cmd}")
            return run(cmd, user=user)

        log(f"reading {msg} from command: {cmd}")
        return run(cmd)


def mount(drive, mp):
    """Mount the device ``drive`` on the directory ``mp``.

    Raises subprocess.CalledProcessError if ``mount`` fails, so that
    nothing is written into an unmounted directory.
    """
    log(f"mounting {drive} on {mp}")
    subprocess.run(["mount", drive, mp], check=True)


def unmount(path):
    """Unmount whatever is mounted on ``path``; failure is not checked."""
    log(f"un-mounting {path}")
    subprocess.run(["umount", path])


def run(cmd, user=None):
    """Run the shell command ``cmd`` and return its stdout as text.

    If ``user`` is given, the command is run with bash as that user via
    sudo; otherwise it is run by the default shell of subprocess.  If the
    command fails, its stderr is copied to our stderr and the program
    exits with status 1.
    """
    if user is not None:
        argv = ["sudo", "-u", user, "--", "/bin/bash", "-c", cmd]
        log(f"argv: {argv}")
        p = subprocess.run(argv, capture_output=True)
    else:
        p = subprocess.run(cmd, shell=True, capture_output=True)
    if p.returncode != 0:
        sys.stderr.write(p.stderr.decode())
        sys.exit(1)
    return p.stdout.decode()


def cat(path):
    """Return the contents of the text file ``path``."""
    log(f"reading {path}")
    with open(path) as f:
        return f.read()


def write(path, content, owner, group, mode):
    """Write text ``content`` to ``path`` with the given owner, group, mode.

    The file is created with ``mode`` from the start, and ownership and
    mode are set on the open file before any content is written, so
    private material is never readable with looser permissions.
    """
    log(f"writing {path}")
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w") as f:
        os.fchown(fd, owner, group)
        # Explicit, because the creation mode is filtered by the umask
        # and is not applied at all if the file already existed.
        os.fchmod(fd, mode)
        f.write(content)


def dir_exists(mp, path):
    """Raise Exception unless ``path`` exists below the directory ``mp``."""
    full = os.path.join(mp, path)
    if not os.path.exists(full):
        raise Exception(f"{full} does not exist")


def host_id(config, mp, installer_hostname):
    """Install the SSH host key and certificate into the image at ``mp``.

    Does nothing unless the configuration provides both a host key and a
    host certificate.  Otherwise, existing host keys in the image are
    removed, and the new key, the certificate, and an sshd drop-in
    configuration file referring to them are written.
    """
    key = config.host_key(installer_hostname)
    cert = config.host_cert(installer_hostname)
    if key is None or cert is None:
        return

    config_d = "/etc/ssh/sshd_config.d"
    host_key = "/etc/ssh/ssh_host_key"
    host_cert = "/etc/ssh/ssh_host_key-cert.pub"

    # Remove all existing host keys.
    for filename in glob.glob(f"{mp}/etc/ssh/ssh_host_*_key*"):
        os.remove(filename)

    # Note that the order of HostKey and HostCertificate lines matters.
    write(
        f"{mp}{config_d}/id.conf",
        f"""
HostKeyAlgorithms ssh-ed25519,ssh-ed25519-cert-v01@openssh.com
HostKey {host_key}
HostCertificate {host_cert}
""",
        0,
        0,
        0o644,
    )
    write(f"{mp}{host_key}", key, 0, 0, 0o600)
    write(f"{mp}{host_cert}", cert, 0, 0, 0o644)


def authorized_keys(config, mp):
    """Copy the configured authorized keys file to root's account in ``mp``.

    Does nothing if ``authorized_keys_file`` is not configured.
    """
    authz_keys_path = config["authorized_keys_file"]
    if authz_keys_path is None:
        return
    authz_keys = cat(authz_keys_path)
    # Octal 0o600 (owner read/write only).  This used to be the hex
    # literal 0x600, which sets no permission bits at all.
    write(f"{mp}/root/.ssh/authorized_keys", authz_keys, 0, 0, 0o600)


def user_ca(config, mp):
    """Make sshd in the image at ``mp`` trust the configured user CA key.

    Does nothing if no user CA public key is configured.
    """
    ca_key = config.user_ca_pub()
    if ca_key is None:
        return
    include = f"{mp}/etc/ssh/sshd_config.d/user_ca.conf"
    write(include, "TrustedUserCAKeys /etc/ssh/user_ca_pubs\n", 0, 0, 0o644)
    cakeys = f"{mp}/etc/ssh/user_ca_pubs"
    write(cakeys, ca_key, 0, 0, 0o644)


def wifi(config, mp):
    """Write an iwd ``.psk`` file for the configured wifi network.

    Does nothing unless both the network name and password are set.
    """
    (name, password) = config.wifi()
    if name and password:
        data = f"[Security]\nPassphrase={password}\n"
        filename = f"{mp}/var/lib/iwd/{name}.psk"
        dirname = os.path.dirname(filename)
        os.makedirs(dirname, exist_ok=True)
        write(filename, data, 0, 0, 0o600)


def main():
    """Parse the command line, mount the image, and configure it."""
    p = argparse.ArgumentParser()
    p.add_argument("--verbose", action="store_true", help="be verbose")
    p.add_argument("--hostname", default="v-i", help="host name of installer image")
    p.add_argument("config", metavar="FILE", help="configuration file")
    p.add_argument(
        "drive",
        metavar="DEVICE",
        help="device file of the drive with v-i installer image",
    )
    args = p.parse_args()

    global verbose
    verbose = args.verbose
    # Logged only now: before the flag is set, log() prints nothing.
    log("configure-image starting")

    config = Config()
    config.read(args.config)

    mp = tempfile.mkdtemp()
    # The image to configure is on the second partition of the drive.
    drive2 = f"{args.drive}2"
    try:
        mount(drive2, mp)
        try:
            dir_exists(mp, "etc/ssh")
            host_id(config, mp, args.hostname)
            authorized_keys(config, mp)
            user_ca(config, mp)
            wifi(config, mp)
        finally:
            unmount(mp)
    finally:
        # Also reached when mounting failed, so the directory never leaks.
        os.rmdir(mp)

    log("all good")


if __name__ == "__main__":
    main()