#!/usr/bin/env python3
# Copyright 2013  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=


import cliapp
import collections
import locale
import logging
import os
import re
import shutil
import sys
import tempfile
import time
import ttystatus

import yarnlib


class YarnRunner(cliapp.Application):

    '''Command line application that runs yarn scenario test suites.

    Scenario files are parsed with yarnlib, every scenario step is
    connected to the single IMPLEMENTS section matching it, and each
    step is then executed as a shell script in a scrubbed environment.

    '''

    def add_settings(self):
        '''Declare the command line options understood by the runner.'''
        self.settings.boolean(
            ['no-act', 'dry-run', 'pretend', 'n'],
            'do not actually run any tests, merely print what would be run')

        self.settings.boolean(
            ['quiet', 'q'],
            'be quiet, avoid progress reporting, only show errors')

        self.settings.boolean(
            ['verbose', 'v'],
            'make progress reporting be more verbose ("wall of text"), '
            'instead of a one-line status info; this is turned '
            'automatically if there is not terminal')

        self.settings.string_list(
            ['shell-library', 's'],
            'include a shell library for the IMPLEMENTS sections to use')

        self.settings.string_list(
            ['run', 'r'],
            'run only SCENARIO (this option can be repeated)',
            metavar='SCENARIO')

        self.settings.string(
            ['tempdir'],
            'use DIR as the temporary directory for tests; '
            'it should be empty or not exist',
            metavar='DIR')

        self.settings.string_list(
            ['env'],
            'add NAME=VALUE to the environment when tests are run',
            metavar='NAME=VALUE')

        self.settings.boolean(
            ['snapshot'],
            'make snapshots of test working directory '
            'after each scenario step; you probably '
            'want to use this with --tempdir')

        self.settings.boolean(
            ['timings'],
            'report wall clock time for each scenario and step')

        self.settings.boolean(
            ['allow-missing-steps'],
            'allow scenarios to reference steps that do not exist, '
            'by warning about them, but otherwise ignoring the scenarios')

        self.settings.boolean(
            ['require-assumptions'],
            'require ASSUMING to always pass')

        self.settings.string(
            ['shell'],
            'run IMPLEMENTS using SHELL',
            metavar='SHELL',
            default='/bin/sh')

        self.settings.string_list(
            ['shell-arg'],
            'use ARG when running shell',
            metavar='ARG',
            default=['-xeu'])

        self.settings.boolean(
            ['cd-datadir'],
            'change to DATADIR when running commands',
            default=False)

        self.settings.boolean(
            ['stop-on-first-fail'],
            "stop if any scenario step fails, don't run more scenarios")

    def stdout(self, msg):
        '''Write msg to standard output and flush immediately.'''
        self._write(sys.stdout, msg)

    def stderr(self, msg):
        '''Write msg to standard error and flush immediately.'''
        self._write(sys.stderr, msg)

    def _write(self, output, msg):
        '''Write msg to the file object output, then flush it.'''
        output.write(msg)
        output.flush()

    def info(self, indent, msg):
        '''Report progress, but only when --verbose is in effect.

        The message is logged and also printed to stdout, indented by
        indent levels of four spaces each.

        '''
        if self.settings['verbose']:
            logging.info(msg)
            indent_size = 4
            self.stdout('%*s%s\n' % (indent * indent_size, '', msg))

    def warning(self, msg):
        '''Report a warning via stdout (verbose) or the status line.'''
        if self.settings['verbose']:
            logging.warning(msg)
            self.stdout('WARNING: %s\n' % msg)
        elif not self.settings['quiet']:
            self.ts.notify('WARNING: %s' % msg)

    def error(self, msg):
        '''Report an error via stderr (verbose) or the status line.'''
        # NOTE(review): with --quiet and without --verbose the message
        # is dropped entirely, although the --quiet help text says
        # "only show errors" -- confirm whether that is intended.
        if self.settings['verbose']:
            # Log at error level, mirroring how warning() logs at
            # warning level.
            logging.error(msg)
            self.stderr('%s\n' % msg)
        elif not self.settings['quiet']:
            self.ts.error(msg)

    def process_args(self, args):
        '''Parse the scenario files in args and run the selected scenarios.

        Raises cliapp.AppException if the scenario files are unusable
        or if any scenario fails.

        '''
        # Do we have tty? If not, turn on --verbose, unless --quiet.
        if not self.settings['quiet']:
            try:
                # This is only a probe; close the handle right away
                # instead of leaking it for the rest of the run.
                with open('/dev/tty', 'w'):
                    pass
            except IOError:
                self.settings['verbose'] = True

        self.ts = ttystatus.TerminalStatus(period=0.001)
        if not self.settings['quiet'] and not self.settings['verbose']:
            self.ts.format(
                '%ElapsedTime() %Index(current_step,all_steps): '
                '%String(scenario_name): '
                '%String(step_name)')

        scenarios, implementations = self.parse_scenarios(args)
        self.check_there_are_scenarios(scenarios)
        self.check_for_duplicate_scenario_names(scenarios)
        self.check_for_thens(scenarios)
        scenarios = self.connect_implementations(scenarios, implementations)
        shell_prelude = self.load_shell_libraries()

        self.info(0, 'Found %d scenarios' % len(scenarios))

        all_steps = []
        for scenario in scenarios:
            all_steps.extend(scenario.steps)
        self.ts['all_steps'] = all_steps

        self.scenarios_run = 0
        self.skipped_for_assuming = 0
        self.steps_run = 0
        self.timings = []

        start_time = time.time()
        failed_scenarios = []
        for scenario in self.select_scenarios(scenarios):
            if not self.run_scenario(scenario, shell_prelude):
                failed_scenarios.append(scenario)
                if self.settings['stop-on-first-fail']:
                    break
        duration = time.time() - start_time

        if not self.settings['quiet']:
            self.ts.clear()
            self.ts.finish()

        if failed_scenarios:
            self.stderr('Failed scenarios:\n')
            for scenario in failed_scenarios:
                self.stderr(' - %s\n' % scenario.name)
            raise cliapp.AppException(
                'Test suite FAILED in %s scenarios' % len(failed_scenarios))

        if not self.settings['quiet']:
            self.stdout(
                'Scenario test suite PASS, with %d scenarios '
                '(%d total steps), '
                'in %.1f seconds\n' %
                (self.scenarios_run, self.steps_run, duration))
            if self.skipped_for_assuming:
                self.stdout(
                    'Scenarios SKIPPED due to ASSUMING step failing: %d\n'
                    % self.skipped_for_assuming)

        if self.settings['timings']:
            self.report_timings()

    def parse_scenarios(self, filenames):
        '''Parse all files and return (scenarios, implementations).'''
        mdparser = yarnlib.MarkdownParser()
        for filename in filenames:
            self.info(0, 'Parsing scenario file %s' % filename)
            # The parser accumulates blocks from every file in
            # mdparser.blocks, so the return value is not needed here.
            mdparser.parse_file(filename)

        block_parser = yarnlib.BlockParser()
        block_parser.parse_blocks(mdparser.blocks)

        return block_parser.scenarios, block_parser.implementations

    def check_there_are_scenarios(self, scenarios):
        '''Raise cliapp.AppException if scenarios is empty.'''
        if not scenarios:
            raise cliapp.AppException(
                'There are no scenarios; must have at least one.')

    def check_for_duplicate_scenario_names(self, scenarios):
        '''Raise cliapp.AppException if two scenarios share a name.'''
        counts = collections.Counter(s.name for s in scenarios)
        duplicates = [name for name in counts if counts[name] > 1]
        if duplicates:
            duplist = ''.join(' %s\n' % name for name in duplicates)
            raise cliapp.AppException(
                'There are scenarios with duplicate names:\n%s' % duplist)

    def check_for_thens(self, scenarios):
        '''Raise cliapp.AppException if a scenario has no THEN step.'''
        no_thens = [
            scenario
            for scenario in scenarios
            if not any(step.what == 'THEN' for step in scenario.steps)
        ]

        if no_thens:
            raise cliapp.AppException(
                'Some scenarios have no THENs:\n%s' %
                ''.join(' "%s"\n' % s.name for s in no_thens))

    def connect_implementations(self, scenarios, implementations):
        '''Attach an implementation to every step of every scenario.

        Return the scenarios whose steps all got an implementation;
        scenarios with a missing step are left out of the result.

        '''
        new_list = []
        for scenario in scenarios:
            missing_step = False
            for step in scenario.steps:
                self.connect_implementation(
                    scenario, step, implementations)
                if step.implementation is None:
                    missing_step = True
            if not missing_step:
                new_list.append(scenario)
        return new_list

    def connect_implementation(self, scenario, step, implementations):
        '''Set step.implementation to the one implementation matching it.

        If nothing matches, either warn and leave the step unconnected
        (with --allow-missing-steps) or raise cliapp.AppException. More
        than one match always raises cliapp.AppException.

        '''
        matching = [i for i in implementations
                    if step.what == i.what and
                    self.implements_matches_step(i, step)]

        if len(matching) == 0:
            if self.settings['allow-missing-steps']:
                self.warning(
                    'Scenario %s has missing step %s %s' %
                    (scenario.name, step.what, step.text))
                return
            raise cliapp.AppException(
                'Scenario "%s", step "%s %s" has no matching '
                'implementation' %
                (scenario.name, step.what, step.text))
        if len(matching) > 1:
            s = '\n'.join(
                'IMPLEMENTS %s %s' % (i.what, i.regexp)
                for i in matching)
            raise cliapp.AppException(
                'Scenario "%s", step "%s %s" has more than one '
                'matching implementations:\n%s' %
                (scenario.name, step.what, step.text, s))

        assert step.implementation is None
        step.implementation = matching[0]

    def load_shell_libraries(self):
        '''Return the text of all --shell-library files, concatenated.'''
        if not self.settings['shell-library']:
            self.info(0, 'No shell libraries defined')
            return ''

        libs = []
        for filename in self.settings['shell-library']:
            self.info(0, 'Loading shell library %s' % filename)
            with open(filename) as f:
                text = f.read()
            libs.append('# Loaded from %s\n\n%s\n\n' % (filename, text))

        return ''.join(libs)

    def select_scenarios(self, scenarios):
        '''Return the scenarios to run, honouring any --run options.

        Names are compared case-insensitively and with runs of
        whitespace collapsed. Without --run, all scenarios are
        returned. Raises cliapp.AppException if a requested name
        matches no scenario at all.

        '''

        def normalise(s):
            return ' '.join(s.lower().split())

        def matches(a, b):
            return normalise(a) == normalise(b)

        if self.settings['run']:
            result = []
            for name in self.settings['run']:
                found = False
                for s in scenarios:
                    if matches(s.name, name):
                        # Track the match separately from selection so
                        # that repeating a --run name is not reported
                        # as a non-existent scenario.
                        found = True
                        if s not in result:
                            result.append(s)
                            break
                if not found:
                    raise cliapp.AppException(
                        'Requested scenario %s does not exist' % name)
            return result

        return scenarios

    def run_scenario(self, scenario, shell_prelude):
        '''Run one scenario; return True if it passed or was skipped.

        ASSUMING steps run first. A failing ASSUMING step skips the
        rest of the scenario, or fails it with --require-assumptions.
        Otherwise the normal steps run until one fails, followed by
        all FINALLY steps until one of those fails.

        '''
        self.start_scenario_timing(scenario.name)
        started = time.time()

        self.info(0, 'Running scenario %s' % scenario.name)
        self.ts['scenario_name'] = scenario.name
        self.ts.flush()
        self.scenarios_run += 1

        if self.settings['no-act']:
            self.info(0, 'Pretending everything went OK')
            for step in scenario.steps:
                self.ts['current_step'] = step
                self.ts.flush()
            self.remember_scenario_timing(time.time() - started)
            return True

        if self.settings['tempdir']:
            tempdir = os.path.abspath(self.settings['tempdir'])
            if not os.path.exists(tempdir):
                os.mkdir(tempdir)
        else:
            tempdir = tempfile.mkdtemp()

        os.mkdir(self.scenario_dir(tempdir, scenario))
        datadir = self.datadir(tempdir, scenario)
        os.mkdir(datadir)
        self.info(1, 'DATADIR is %s' % datadir)
        homedir = self.homedir(datadir)
        os.mkdir(homedir)
        self.info(1, 'HOME for tests is %s' % homedir)

        assuming = [s for s in scenario.steps if s.what == 'ASSUMING']
        cleanup = [s for s in scenario.steps if s.what == 'FINALLY']
        # Build the exclusion list once rather than once per step.
        special = assuming + cleanup
        normal = [s for s in scenario.steps if s not in special]

        ok = True
        step_number = 0

        for step in assuming:
            step_number += 1
            exit_code = self._run_and_snapshot(
                tempdir, datadir, scenario, step, step_number,
                shell_prelude, False)
            if exit_code != 0:
                if self.settings['require-assumptions']:
                    ok = False
                else:
                    self.ts.notify(
                        'Skipping "%s" because "%s %s" failed' %
                        (scenario.name, step.what, step.text))
                    self.skipped_for_assuming += 1
                break
        else:
            # Reached only when every ASSUMING step succeeded.
            for step in normal:
                step_number += 1
                exit_code = self._run_and_snapshot(
                    tempdir, datadir, scenario, step, step_number,
                    shell_prelude, True)
                if exit_code != 0:
                    ok = False
                    break

            for step in cleanup:
                step_number += 1
                exit_code = self._run_and_snapshot(
                    tempdir, datadir, scenario, step, step_number,
                    shell_prelude, True)
                if exit_code != 0:
                    ok = False
                    break

        if not self.settings['snapshot']:
            shutil.rmtree(tempdir, ignore_errors=True)

        self.remember_scenario_timing(time.time() - started)
        return ok

    def _run_and_snapshot(self, tempdir, datadir, scenario, step,
                          step_number, shell_prelude, report_error):
        '''Run one step, snapshot the datadir, return the exit code.'''
        exit_code = self.run_step(
            datadir, scenario, step, shell_prelude, report_error)
        self.snapshot_datadir(
            tempdir, datadir, scenario, step_number, step)
        return exit_code

    def homedir(self, datadir):
        '''Return the path of the HOME directory inside datadir.'''
        return os.path.join(datadir, 'HOME')

    def clean_env(self):
        '''Return a clean environment for running tests.

        Only whitelisted variables are inherited from os.environ; the
        hardcoded values and then the --env settings are applied on
        top. Raises cliapp.AppException for an --env value without
        an "=".

        '''

        whitelisted = [
            'PATH',
        ]

        hardcoded = {
            'TERM': 'dumb',
            'SHELL': '/bin/sh',
            'LC_ALL': 'C',
            'USER': 'tomjon',
            'USERNAME': 'tomjon',
            'LOGNAME': 'tomjon',
        }

        env = {
            key: os.environ[key]
            for key in whitelisted
            if key in os.environ
        }
        env.update(hardcoded)

        for option_arg in self.settings['env']:
            if '=' not in option_arg:
                raise cliapp.AppException(
                    '--env argument must contain "=" '
                    'to separate environment variable name and value')
            key, value = option_arg.split('=', 1)
            env[key] = value

        return env

    def run_step(self, datadir, scenario, step, shell_prelude, report_error):
        '''Run the implementation of step and return its exit code.

        The shell prelude and the implementation are written to a
        temporary script, which is run with the configured shell in
        the environment from clean_env() plus DATADIR, SRCDIR, HOME
        and one MATCH_n variable per regexp group. If report_error is
        true, a non-zero exit code is reported through self.error().

        '''
        started = time.time()

        self.info(1, 'Running step "%s %s"' % (step.what, step.text))
        self.ts['current_step'] = step
        self.ts['step_name'] = '%s %s' % (step.what, step.text)
        self.ts.flush()
        self.steps_run += 1

        m = self.implements_matches_step(step.implementation, step)
        assert m is not None
        env = self.clean_env()
        env['DATADIR'] = datadir
        env['SRCDIR'] = os.getcwd()
        env['HOME'] = self.homedir(datadir)
        # Groups that did not take part in the match become ''.
        for i, match in enumerate(m.groups('')):
            env['MATCH_%d' % (i+1)] = match
        self.add_srcdir_to_pythonpath(env, env['SRCDIR'])

        if self.settings['cd-datadir']:
            cwd = datadir
        else:
            cwd = '.'

        fd, shell_script = tempfile.mkstemp()
        try:
            # Reuse the descriptor from mkstemp rather than reopening
            # the file by name.
            with os.fdopen(fd, 'w') as f:
                f.write(shell_prelude)
                f.write(step.implementation.shell)

            shell = self.settings['shell']
            shell_args = [x for x in self.settings['shell-arg'] if x]
            logging.debug('shell: %r', shell)
            logging.debug('shell_args: %r', shell_args)
            logging.debug('shell_script: %r', shell_script)

            exit_code, stdout, stderr = cliapp.runcmd_unchecked(
                [shell] + shell_args + [shell_script],
                env=env, cwd=cwd)
        finally:
            # Remove the script even if running the command raised.
            os.remove(shell_script)

        # Output of arbitrary commands need not be valid UTF-8; replace
        # undecodable bytes rather than crashing the runner.
        stdout_text = stdout.decode(errors='replace')
        stderr_text = stderr.decode(errors='replace')

        logging.debug('Exit code: %d' % exit_code)
        if stdout:
            logging.debug(
                'Standard output:\n%s' % self.indent(stdout_text))
        else:
            logging.debug('Standard output: empty')
        if stderr:
            logging.debug(
                'Standard error:\n%s' % self.indent(stderr_text))
        else:
            logging.debug('Standard error: empty')

        if exit_code != 0 and report_error:
            self.error('ERROR: In scenario "%s"' % scenario.name)
            self.error('step "%s %s" failed,' % (step.what, step.text))
            self.error('with exit code %d:' % exit_code)
            self.error('Standard output from shell command:\n%s' %
                       self.indent(stdout_text))
            self.error('Standard error from shell command:\n%s' %
                       self.indent(stderr_text))

        self.remember_step_timing(
            '%s %s' % (step.what, step.text), time.time() - started)

        return exit_code

    def add_srcdir_to_pythonpath(self, env, srcdir):
        '''Append srcdir to PYTHONPATH in env if the shell is a Python.'''
        # Special handling of PYTHONPATH. Add $SRCDIR to it so that
        # Python IMPLEMENTS can use it. But only if --shell=python is
        # used. This is a bit of magic that hopefully won't surprise
        # users too much.

        if 'python' in self.settings['shell']:
            pythonpath = env.get('PYTHONPATH', None)
            if pythonpath:
                pythonpath += ':' + srcdir
            else:
                pythonpath = srcdir
            env['PYTHONPATH'] = pythonpath

    def scenario_dir(self, tempdir, scenario):
        '''Return the directory for scenario inside tempdir.'''
        return os.path.join(tempdir, self.nice(scenario.name))

    def datadir(self, tempdir, scenario):
        '''Return the DATADIR path for scenario inside tempdir.'''
        sd = self.scenario_dir(tempdir, scenario)
        return os.path.join(sd, 'datadir')

    def snapshot_dir(self, tempdir, scenario, step, step_number):
        '''Return the snapshot directory path for the given step.'''
        sd = self.scenario_dir(tempdir, scenario)
        base = '%03d-%s-%s' % (step_number, step.what, self.nice(step.text))
        return os.path.join(sd, base)

    def snapshot_datadir(self, tempdir, datadir, scenario, step_number, step):
        '''Copy datadir to a per-step snapshot if --snapshot is set.

        A failing copy is logged as a warning and otherwise ignored.

        '''
        if self.settings['snapshot']:
            snapshot = self.snapshot_dir(tempdir, scenario, step, step_number)
            exit_code, out, err = cliapp.runcmd_unchecked(
                ['cp', '-ax', datadir, snapshot])
            if exit_code != 0:
                logging.warning(
                    'Snapshot copy failed:\n%s\n%s' %
                    (out.decode(errors='replace'),
                     err.decode(errors='replace')))

    def nice(self, name):
        '''Return name quoted so that it forms a nice filename.

        ASCII letters, digits, "-" and "." are kept; every run of
        other characters becomes a single "_".

        '''
        nice_chars = "abcdefghijklmnopqrstuvwxyz"
        nice_chars += nice_chars.upper()
        nice_chars += "0123456789-."

        quoted = []
        for c in name:
            if c in nice_chars:
                quoted.append(c)
            elif not quoted or quoted[-1] != '_':
                quoted.append('_')
        return ''.join(quoted)

    def implements_matches_step(self, implements, step):
        '''Return re.Match if implements matches the step.

        Otherwise, return None. The match is case-insensitive and
        must cover the whole step text.

        '''

        m = re.match(implements.regexp, step.text, re.I)
        if m and m.end() != len(step.text):
            return None
        return m

    def indent(self, s):
        '''Return s with every line prefixed by a space.'''
        return ''.join(' %s\n' % line for line in s.splitlines())

    def start_scenario_timing(self, scenario_name):
        '''Start a new timing record for scenario_name.'''
        self.timings.append((scenario_name, None, []))

    def remember_scenario_timing(self, duration):
        '''Store duration in the most recent timing record.'''
        scenario_name, _, step_tuples = self.timings[-1]
        self.timings[-1] = (scenario_name, duration, step_tuples)

    def remember_step_timing(self, step_name, step_duration):
        '''Add a step timing to the most recent timing record.'''
        scenario_name, scenario_duration, step_tuples = self.timings[-1]
        step_tuples = step_tuples + [(step_name, step_duration)]
        self.timings[-1] = (scenario_name, scenario_duration, step_tuples)

    def report_timings(self):
        '''Print the recorded scenario and step durations to stdout.'''
        for scenario_name, scenario_duration, step_tuples in self.timings:
            self.stdout('%5.1f %s\n' % (scenario_duration, scenario_name))
            for step_name, step_duration in step_tuples:
                self.stdout(' %5.1f %s\n' % (step_duration, step_name))


YarnRunner(version=yarnlib.__version__).run()