#!/usr/bin/python3
"""Write SSH host identity and access settings into an installer image.

The script mounts a partition of the given drive on a temporary directory,
writes the sshd files selected by the YAML configuration file (host key and
certificate, root's authorized_keys, trusted user CA key), and unmounts the
partition again.
"""

import argparse
import os
import subprocess
import tempfile

import yaml


def log(msg):
    """Print a progress message to standard output."""
    print(msg)


class Config:
    """Settings loaded from a YAML file, with defaults for absent keys."""

    # Every known setting. None means "not configured"; the step that
    # would use the setting is skipped.
    defaults = {
        "authorized_keys_file": None,
        "host_cert_file": None,
        "host_key_file": None,
        "user_ca_pub_file": None,
    }

    # Settings whose values are file names that get "~" expansion in read().
    # The attribute name is misspelled ("expandable"); it is kept as-is so
    # that any existing reference to it keeps working.
    exandable = [
        "authorized_keys_file",
        "host_cert_file",
        "host_key_file",
        "user_ca_pub_file",
    ]

    def __init__(self):
        """Start from a private copy of the class-level defaults."""
        self.config = dict(self.defaults)

    def __getitem__(self, key):
        """Return the value of setting *key*; raise KeyError if unknown."""
        return self.config[key]

    def read(self, filename):
        """Merge the settings in YAML file *filename* over the current ones.

        Values of the settings listed in ``exandable`` are passed through
        ``os.path.expanduser`` unless they are None.
        """
        log(f"reading configuration from {filename}")
        with open(filename) as f:
            obj = yaml.safe_load(f)
        # An empty YAML document loads as None, which dict.update() rejects;
        # treat it as "no overrides" so the defaults stay in effect.
        if obj is None:
            obj = {}
        self.config.update(obj)
        for key in self.exandable:
            if self.config[key] is not None:
                self.config[key] = os.path.expanduser(self.config[key])
        log(f"config: {self.config}")


def mount(drive, mp):
    """Mount device *drive* on directory *mp*.

    Raises subprocess.CalledProcessError if the mount command fails, so the
    caller never goes on to write into an unmounted directory.
    """
    log(f"mounting {drive} on {mp}")
    subprocess.run(["mount", drive, mp], check=True)


def unmount(path):
    """Unmount the file system mounted at *path*.

    Raises subprocess.CalledProcessError if the umount command fails.
    """
    log(f"un-mounting {path}")
    subprocess.run(["umount", path], check=True)


def cat(path):
    """Return the full text content of the file at *path*."""
    log(f"reading {path}")
    with open(path) as f:
        return f.read()


def write(path, content, owner, group, mode):
    """Write text *content* to *path*, then set its ownership and mode.

    *owner* and *group* are numeric IDs passed to os.chown; *mode* is the
    permission bits passed to os.chmod.
    """
    log(f"writing {path}")
    with open(path, "w") as f:
        f.write(content)
    os.chown(path, owner, group)
    os.chmod(path, mode)


def dir_exists(mp, path):
    """Raise Exception unless *path*, taken relative to *mp*, exists."""
    full = os.path.join(mp, path)
    if not os.path.exists(full):
        raise Exception(f"{full} does not exist")


def host_id(config, mp):
    """Install the SSH host key and host certificate under *mp*.

    Does nothing unless both "host_key_file" and "host_cert_file" are
    configured. Otherwise writes an sshd_config.d/id.conf snippet that
    points sshd at the key and certificate, and copies both files into
    etc/ssh below *mp*, owned by uid 0 / gid 0.
    """
    key_path = config["host_key_file"]
    cert_path = config["host_cert_file"]
    if key_path is None or cert_path is None:
        return

    key = cat(key_path)
    cert = cat(cert_path)

    # Paths as seen from inside the image; *mp* is prefixed when writing.
    config_d = "/etc/ssh/sshd_config.d"
    host_key = "/etc/ssh/ssh_host_key"
    host_cert = "/etc/ssh/ssh_host_key-cert.pub"

    # Note that the order of HostKey and HostCertificate lines matters.
    write(
        f"{mp}{config_d}/id.conf",
        f"""
HostKeyAlgorithms ssh-ed25519
HostKey {host_key}
HostCertificate {host_cert}
""",
        0,
        0,
        0o644,
    )
    # The private key is readable by its owner only.
    write(f"{mp}{host_key}", key, 0, 0, 0o600)
    write(f"{mp}{host_cert}", cert, 0, 0, 0o644)


def authorized_keys(config, mp):
    """Install root's authorized_keys file under *mp*.

    Does nothing when "authorized_keys_file" is not configured.
    """
    authz_keys_path = config["authorized_keys_file"]
    if authz_keys_path is None:
        return

    authz_keys = cat(authz_keys_path)
    # The mode must be octal 0o600 (rw for the owner only). The hexadecimal
    # literal 0x600 equals 0o3000, which sets the setgid and sticky bits and
    # grants no read or write permission at all.
    write(f"{mp}/root/.ssh/authorized_keys", authz_keys, 0, 0, 0o600)


def user_ca(config, mp):
    """Install the trusted user CA public key under *mp*.

    Does nothing when "user_ca_pub_file" is not configured. Otherwise
    writes an sshd_config.d/userca.conf snippet with a TrustedUserCAKeys
    directive and copies the CA key to etc/ssh/user_ca_keys below *mp*.
    """
    ca_path = config["user_ca_pub_file"]
    if ca_path is None:
        return

    ca_key = cat(ca_path)
    include = f"{mp}/etc/ssh/sshd_config.d/userca.conf"
    write(include, "TrustedUserCAKeys /etc/ssh/user_ca_keys\n", 0, 0, 0o644)
    cakeys = f"{mp}/etc/ssh/user_ca_keys"
    write(cakeys, ca_key, 0, 0, 0o644)


def main():
    """Parse the command line and configure the image on the given drive."""
    log("configure-image starting")
    p = argparse.ArgumentParser()
    p.add_argument("config", metavar="FILE", help="configuration file")
    p.add_argument(
        "drive",
        metavar="DEVICE",
        help="device file of the drive with v-i installer image",
    )
    args = p.parse_args()

    config = Config()
    config.read(args.config)

    # The device mounted is DEVICE with "2" appended (presumably the
    # drive's second partition -- confirm against the image layout).
    drive2 = f"{args.drive}2"

    mp = tempfile.mkdtemp()
    try:
        mount(drive2, mp)
        try:
            dir_exists(mp, "etc/ssh")
            host_id(config, mp)
            authorized_keys(config, mp)
            user_ca(config, mp)
        finally:
            unmount(mp)
    finally:
        # Remove the temporary mount point even when mounting itself failed.
        os.rmdir(mp)

    log("all good")


if __name__ == "__main__":
    main()