#!/usr/bin/python3 import argparse import os import shutil import subprocess import tempfile import time import sys HEARTWOOD_URL = "https://seed.radicle.xyz/z3gqcJUoA1n9HaHKufZs5FCSGazv5.git" HEARTWOOD_REF = "master" CLONE_DIR = os.path.expanduser("~/src") TEST_CMD = "timeout 600s cargo test" LOG_FILE = os.path.expanduser("~/log.txt") RUN_LOG_DIR = os.path.expanduser("~") STATS_FILE = os.path.expanduser("~/stats.txt") COUNTS_FILE = os.path.expanduser("~/counts.txt") EXPLANATION_HTML = """ Results of running the Radicle heartwood tests repeatedly. Report number of successful and fail test runs per commit. Keep logs of each test run for each commit. """ CSS = """ h1 { font-size: 200%; } h2 { font-size: 125%; } h3 { font-size: 100%; } table { width: 100%; text-align: left; tr:nth-child(even) {background-color: #f2f2f2;} } .numeric { text-align: right; } """ def escape(s): s = "&".join(s.split("&")) s = "<".join(s.split("<")) s = ">".join(s.split(">")) s = '"'.join(s.split(""")) s = "'".join(s.split("'")) return s next_id = 0 def idgen(prefix): global next_id next_id += 1 return f"{prefix}{next_id}" class Element: def __init__(self, tag, pre=None): self.tag = tag self.attrs = {} self.children = [] self.pre = pre def attr(self, name, value): self.attrs[name] = self.attrs.get(name, []) + [escape(value)] def child(self, child): if isinstance(child, str): child = Text(child) self.children.append(child) def start_tag(self): s = f"<{self.tag}" if self.attrs: for a in self.attrs: value = " ".join(self.attrs[a]) s += f' {a}="{value}"' s += ">" return s def end_tag(self): return f"" def serialize(self): s = Serialized() if self.pre is not None: s.push(self.pre) s.push(self.start_tag()) for child in self.children: s.push(child.serialize()) s.push(self.end_tag()) s.newline() return str(s) class Html(Element): def __init__(self): super().__init__("html", pre="\n") class Head(Element): def __init__(self): super().__init__("head") class Title(Element): def __init__(self, text): super().__init__("title") self.child(text) class Style(Element): def __init__(self, text): super().__init__("style") self.child(text) class Body(Element): def __init__(self): super().__init__("body") class Div(Element): def __init__(self): super().__init__("div") class H1(Element): def __init__(self, text): super().__init__("h1") self.child(text) class H2(Element): def __init__(self, text, prefix): super().__init__("h2") self.toc_id = idgen(prefix) self.attr("id", self.toc_id) self.child(text) self.text = text class H3(Element): def __init__(self, text, prefix): super().__init__("h3") self.toc_id = idgen(prefix) self.attr("id", self.toc_id) self.child(text) self.text = text class P(Element): def __init__(self): super().__init__("p") class Pre(Element): def __init__(self): super().__init__("pre") class Blockquote(Element): def __init__(self): super().__init__("blockquote") class Ul(Element): def __init__(self): super().__init__("ul") class Ol(Element): def __init__(self): super().__init__("ol") class Li(Element): def __init__(self): super().__init__("li") class Table(Element): def __init__(self): super().__init__("table") class Tr(Element): def __init__(self): super().__init__("tr") class Th(Element): def __init__(self): super().__init__("th") class Td(Element): def __init__(self): super().__init__("td") class A(Element): def __init__(self, href): super().__init__("a") self.attr("href", href) class Code(Element): def __init__(self): super().__init__("code") class Text: def __init__(self, text): self.text = escape(text) def serialize(self): return self.text class Serialized: def __init__(self): self.snippets = [] def __str__(self): return "".join(self.snippets) def push(self, snippet): assert isinstance(snippet, str) self.snippets.append(snippet) def newline(self): self.snippets.append("\n") class RunLog: def __init__(self): self.data = {} def serialize(self): toc = Div() self.toc = [] html = Html() head = Head() head.child(Title("Wumpus log for one test run")) head.child(Style(CSS)) html.child(head) body = Body() html.child(body) body.child(H1("Run log")) self.metadata(body) body.child(H2("Table of contents", "toc")) body.child(toc) for cmd, exit, stdout, stderr in self.data["commands"]: self.report_command(body, cmd, exit, stdout, stderr) if "error" in self.data: self.report_error(body, self.data["error"]) self.toc_build(toc) return html.serialize() def toc_append(self, h): if isinstance(h, H2): self.toc.append((2, h.text, h.toc_id)) elif isinstance(h, H3): self.toc.append((3, h.text, h.toc_id)) else: raise Exception("ToC logic error") def toc_build(self, toc): prev = None ul = Ul() stack = [] for level, text, toc_id in self.toc: link = A(f"#{toc_id}") link.child(text) li = Li() li.child(link) if prev is None or level == prev: ul.child(li) elif level > prev: stack.append(ul) ul = Ul() ul.child(li) else: assert level < prev parent = stack.pop() parent.child(ul) ul = parent ul.child(li) prev = level assert ul is not None while stack: parent = stack.pop() parent.child(ul) ul = parent assert ul is not None toc.child(ul) def report_error(self, body, error): h2 = H2("Error", "error") body.child(h2) self.toc_append(h2) p = P() pre = Pre() pre.child(error) blockquote = Blockquote() blockquote.child(pre) p.child(blockquote) body.child(p) def report_command(self, body, cmd, exit, stdout, stderr): args = " ".join(cmd) h2 = H2(f"Run: {args}", "run") body.child(h2) self.toc_append(h2) h3 = H3("Argv", "argv") body.child(h3) self.toc_append(h3) ol = Ol() for arg in cmd: code = Code() code.child(arg) li = Li() li.child(code) ol.child(li) body.child(ol) h3 = H3("Exit code", "exit") body.child(h3) self.toc_append(h3) p = P() p.child("Exit code ") p.child(str(exit)) body.child(p) if stdout: self.output(body, "Stdout", stdout) if stderr: self.output(body, "Stderr", stderr) def output(self, body, stream, output): h3 = H3(stream, stream) body.child(h3) self.toc_append(h3) p = P() pre = Pre() pre.child(output) blockquote = Blockquote() blockquote.child(pre) p.child(blockquote) body.child(p) def metadata(self, body): h2 = H2("Metadata", "metadata") body.child(h2) self.toc_append(h2) ul = Ul() body.child(ul) code = Code() code.child(self.data["url"]) a = A(self.data["url"]) a.child(code) li = Li() li.child("Repository: ") li.child(a) ul.child(li) code = Code() code.child(self.data["ref"]) li = Li() li.child("Git ref: ") li.child(code) ul.child(li) code = Code() code.child(self.data["commit"]) li = Li() li.child("Git ref: ") li.child(code) ul.child(li) def url(self, url): self.data["url"] = url def ref(self, ref): self.data["ref"] = ref def commit(self, commit): self.data["commit"] = commit def runcmd(self, cmd, exit, stdout, stderr): if "commands" not in self.data: self.data["commands"] = [] self.data["commands"].append((cmd, exit, stdout, stderr)) def error(self, error): self.data["error"] = error def parse_args(): p = argparse.ArgumentParser() p.add_argument( "--url", default=HEARTWOOD_URL, help="URL for git repository to test" ) p.add_argument("--ref", default=HEARTWOOD_REF, help="Ref to test") p.add_argument( "--dir", default=CLONE_DIR, help="Directory where to clone the repository" ) p.add_argument("--test", default=TEST_CMD, help="Command to run tests") p.add_argument("--log", default=LOG_FILE, help="Write log file") p.add_argument( "--run-log", default=RUN_LOG_DIR, help="Write per-run log file to this directory", required=True, ) p.add_argument( "--stats", default=STATS_FILE, help="Write statistics of results to file", required=True, ) p.add_argument( "--counts", default=COUNTS_FILE, help="Count statistics per commit to file", required=True, ) p.add_argument( "--keep", action="store_true", help="Keep log file of successful run" ) return p.parse_args() def get_code(run_log, url, ref, dirname): if not os.path.exists(dirname): git_clone(run_log, url, dirname) git_checkout(run_log, dirname, ref) else: git_clean(run_log, dirname) git_checkout(run_log, dirname, ref) git_remote_update(run_log, url, dirname) git_reset(run_log, dirname, f"origin/{ref}") git_status(run_log, dirname) git_show_head(run_log, dirname) return get_commit(run_log, dirname) def git_clone(run_log, url, dirname): git(run_log, "clone", url, dirname) def git_checkout(run_log, dirname, ref): git(run_log, "checkout", ref, cwd=dirname) def git_remote_update(run_log, url, dirname): git(run_log, "remote", "update", cwd=dirname) def git_reset(run_log, dirname, ref): git(run_log, "reset", "--hard", ref, cwd=dirname) def git_clean(run_log, dirname): git(run_log, "clean", "-fdx", cwd=dirname) def git_status(run_log, dirname): git(run_log, "status", "--ignored", cwd=dirname) def git_show_head(run_log, dirname): git(run_log, "--no-pager", "show", "HEAD", cwd=dirname) def get_commit(run_log, dirname): commit = git(run_log, "rev-parse", "HEAD", cwd=dirname).strip() return commit def git(run_log, *args, cwd=None): return run(run_log, ["git"] + list(args), cwd=cwd) def run(run_log, argv, cwd=None): p = subprocess.run(argv, cwd=cwd, capture_output=True) if run_log is not None: run_log.runcmd(argv, p.returncode, p.stdout.decode(), p.stderr.decode()) if p.returncode != 0: sys.stderr.write(p.stderr) raise Exception(f"command {argv} failed") return p.stdout.decode() def build(run_log, dirname): run(run_log, ["cargo", "build", "--workspace", "--all-targets"], cwd=dirname) def run_tests(run_log, dirname, cmd): # Radicle heartwood tests sometimes leave temporary files. This can fill # the disk. Deal with this by explicitly removing them. tmp = tempfile.mkdtemp() run(run_log, ["env", f"TMPDIR={tmp}", "bash", "-c", cmd], cwd=dirname) shutil.rmtree(tmp) def record_success(filename, commit): record(filename, commit, "SUCCESS") def record_failure(filename, commit): record(filename, commit, "FAILURE") def record(filename, commit, result): with open(filename, "a") as f: f.write(f"{commit} {result}\n") def run_log_name(dirname, commit, succeeded): commit_dir = os.path.join(dirname, f"log-{commit}") if not os.path.exists(commit_dir): os.mkdir(commit_dir) timestamp = time.strftime("%Y-%m-%dT%H:%M:%S") if succeeded: suffix = "success" else: suffix = "fail" return os.path.join(commit_dir, f"log-{timestamp}.{suffix}.html") def git_commit_date(run_log, dirname, commit): try: out = git( None, "show", "--pretty=fuller", "--date=iso", commit, cwd=dirname, ) except Exception as e: sys.stderr.write(f"{e}\n") return "(unknown commit)" prefix = "CommitDate:" date = [line.strip() for line in out.splitlines() if line.startswith(prefix)][0] if date.startswith(prefix): date = date[len(prefix) :].strip() return date def count(run_log, src, counts, stats): d = {} total = 0 with open(stats) as f: for line in f: total += 1 (commit, result) = line.split() if commit not in d: d[commit] = (0, 0) (succ, fail) = d[commit] if result == "SUCCESS": succ += 1 else: fail += 1 d[commit] = (succ, fail) commits = [(git_commit_date(run_log, src, commit), commit) for commit in d] head = Head() head.child(Title("Radicle Wumpus hunter")) head.child(Style(CSS)) body = Body() expl = P() expl.child(EXPLANATION_HTML) body.child(expl) timestamp = time.strftime("%Y-%m-%d %H:%M:%S %z") p = P() p.child(f"Total of {total} test runs. Last updated {timestamp}") body.child(p) date = Th() date.child("date") commit = Th() commit.child("commit") succ = Th() succ.attr("class", "numeric") succ.child("successes") fail = Th() fail.attr("class", "numeric") fail.child("failures") headings = Tr() headings.child(date) headings.child(commit) headings.child(succ) headings.child(fail) table = Table() table.child(headings) for date, commit in reversed(sorted(commits)): succ, fail = d[commit] tr = Tr() td = Td() td.child(Text(date)) tr.child(td) code = Code() code.child(commit) link = A(f"log-{commit}/") link.child(commit) td = Td() td.child(link) tr.child(td) td = Td() td.attr("class", "numeric") td.child(str(succ)) tr.child(td) td = Td() td.attr("class", "numeric") td.child(str(fail)) tr.child(td) table.child(tr) body.child(table) html = Html() html.child(head) html.child(body) with open(counts, "w") as f: f.write(html.serialize()) def main(): args = parse_args() run_log = RunLog() run_log.url(args.url) run_log.ref(args.ref) exit = 0 try: commit = get_code(run_log, args.url, args.ref, args.dir) run_log.commit(commit) build(run_log, args.dir) run_tests(run_log, args.dir, args.test) record_success(args.stats, commit) filename = run_log_name(args.run_log, commit, True) except Exception as e: sys.stderr.write(f"{e}\n") run_log.error(str(e)) record_failure(args.stats, commit) filename = run_log_name(args.run_log, commit, False) exit = 1 if args.counts: count(run_log, args.dir, args.counts, args.stats) if args.keep: with open(filename, "w") as f: f.write(run_log.serialize()) sys.exit(exit) main()