#!/usr/bin/python
# Copyright 2013  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=


import cliapp
import logging
import os
import re
import shutil
import sys
import tempfile
import time
import ttystatus

import cmdtestlib
import yarnlib


class YarnRunner(cliapp.Application):

    '''Command line application that runs scenario tests.

    Scenario files are parsed with yarnlib into scenarios (lists of
    steps) and implementations (a regexp plus a shell snippet).  Every
    step is bound to exactly one implementation, and each selected
    scenario is then run step by step with ``sh`` in a fresh data
    directory.

    '''

    def add_settings(self):
        '''Declare the command line options of the application.'''
        self.settings.boolean(
            ['no-act', 'dry-run', 'pretend', 'n'],
            'do not actually run any tests, merely print what would be run')

        self.settings.boolean(
            ['quiet', 'q'],
            'be quiet, avoid progress reporting, only show errors')

        self.settings.boolean(
            ['verbose', 'v'],
            'make progress reporting be more verbose ("wall of text"), '
            'instead of a one-line status info; this is turned '
            'automatically if there is not terminal')

        self.settings.string_list(
            ['shell-library', 's'],
            'include a shell library for the IMPLEMENTS sections to use')

        self.settings.string_list(
            ['run', 'r'],
            'run only TEST (this option can be repeated)',
            metavar='TEST')

        self.settings.string(
            ['tempdir'],
            'use DIR as the temporary directory for tests; '
            'it should be empty or not exist',
            metavar='DIR')

        self.settings.boolean(
            ['snapshot'],
            'make snapshots of test working directory '
            'after each scenario step; you probably '
            'want to use this with --tempdir')

    def info(self, msg):
        '''Log and print an informational message, in verbose mode only.'''
        if self.settings['verbose']:
            logging.info(msg)
            self.output.write('%s\n' % msg)

    def warning(self, msg):
        '''Report a warning.

        In verbose mode the warning is logged and written to the normal
        output; otherwise, unless --quiet is in effect, it is shown via
        the terminal status object.

        '''
        if self.settings['verbose']:
            logging.warning(msg)
            self.output.write('WARNING: %s\n' % msg)
        elif not self.settings['quiet']:
            self.ts.notify('WARNING: %s' % msg)

    def error(self, msg):
        '''Report an error.

        Errors are always shown: --quiet is documented as "only show
        errors", so in quiet mode the message goes to stderr instead of
        being dropped.

        '''
        if self.settings['verbose']:
            logging.error(msg)
            sys.stderr.write('%s\n' % msg)
        elif self.settings['quiet']:
            # No status display is formatted in quiet mode, so write
            # straight to stderr.
            sys.stderr.write('%s\n' % msg)
        else:
            self.ts.error(msg)

    def process_args(self, args):
        '''Parse the scenario files in args and run the selected scenarios.

        Raises cliapp.AppException if any scenario fails.

        '''
        # Do we have tty? If not, turn on --verbose, unless --quiet.
        if not self.settings['quiet']:
            try:
                # Only probing for a terminal; close the handle at once.
                with open('/dev/tty', 'w'):
                    pass
            except IOError:
                self.settings['verbose'] = True

        self.ts = ttystatus.TerminalStatus(period=0.001)
        if not self.settings['quiet'] and not self.settings['verbose']:
            self.ts.format(
                '%ElapsedTime() %Index(scenario,scenarios): '
                '%String(scenario_name): '
                'step %Index(step,steps): %String(step_name)')

        scenarios, implementations = self.parse_scenarios(args)
        self.connect_implementations(scenarios, implementations)
        shell_prelude = self.load_shell_libraries()

        self.ts['scenarios'] = scenarios
        self.ts['num_scenarios'] = len(scenarios)
        self.info('Found %d scenarios' % len(scenarios))

        self.scenarios_run = 0
        self.steps_run = 0

        start_time = time.time()
        failed_scenarios = []
        for scenario in self.select_scenarios(scenarios):
            if not self.run_scenario(scenario, shell_prelude):
                failed_scenarios.append(scenario)
        duration = time.time() - start_time

        if not self.settings['quiet']:
            self.ts.clear()
            self.ts.finish()

        if failed_scenarios:
            raise cliapp.AppException(
                'Test suite FAILED in %s scenarios' % len(failed_scenarios))

        if not self.settings['quiet']:
            print(
                'Scenario test suite PASS, with %d scenarios '
                '(%d total steps), '
                'in %.1f seconds' %
                (self.scenarios_run, self.steps_run, duration))

    def parse_scenarios(self, filenames):
        '''Parse the named files and return (scenarios, implementations).

        A warning is issued for every file for which the Markdown
        parser returns no blocks.

        '''
        mdparser = yarnlib.MarkdownParser()
        for filename in filenames:
            self.info('Parsing scenario file %s' % filename)
            blocks = mdparser.parse_file(filename)
            if not blocks:
                self.warning('No scenario code blocks in %s' % filename)

        # The parser accumulates blocks over all files; parse them in
        # one go.
        block_parser = yarnlib.BlockParser()
        block_parser.parse_blocks(mdparser.blocks)

        return block_parser.scenarios, block_parser.implementations

    def connect_implementations(self, scenarios, implementations):
        '''Bind every step of every scenario to its implementation.'''
        for scenario in scenarios:
            for step in scenario.steps:
                self.connect_implementation(scenario, step, implementations)

    def connect_implementation(self, scenario, step, implementations):
        '''Bind step to the single implementation that matches it.

        An implementation matches when it has the same keyword
        (``what``) as the step and its regexp matches the whole step
        text, ignoring case.

        Raises cliapp.AppException if no implementation, or more than
        one, matches.

        '''
        matching = [i for i in implementations
                    if step.what == i.what and
                    re.match('(%s)$' % i.regexp, step.text, re.I)]

        if not matching:
            raise cliapp.AppException(
                'Scenario "%s", step "%s %s" has no matching '
                'implementation' %
                (scenario.name, step.what, step.text))

        if len(matching) > 1:
            s = '\n'.join(
                'IMPLEMENTS %s %s' % (i.what, i.regexp)
                for i in matching)
            raise cliapp.AppException(
                'Scenario "%s", step "%s %s" has more than one '
                'matching implementations:\n%s' %
                (scenario.name, step.what, step.text, s))

        assert step.implementation is None
        step.implementation = matching[0]

    def load_shell_libraries(self):
        '''Return the text of all --shell-library files, concatenated.

        Returns an empty string if no shell libraries were given.

        '''
        if not self.settings['shell-library']:
            self.info('No shell libraries defined')
            return ''

        libs = []
        for filename in self.settings['shell-library']:
            self.info('Loading shell library %s' % filename)
            with open(filename) as f:
                text = f.read()
            libs.append('# Loaded from %s\n\n%s\n\n' % (filename, text))

        return ''.join(libs)

    def select_scenarios(self, scenarios):
        '''Return the scenarios to run.

        Without --run this is all of scenarios.  With --run, it is,
        for each requested name in the order given, the first scenario
        whose name matches it; names are compared ignoring case and
        differences in whitespace.  A scenario that was already
        selected is not added again.

        '''

        def normalise(s):
            return ' '.join(s.lower().split())

        def matches(a, b):
            return normalise(a) == normalise(b)

        if not self.settings['run']:
            return scenarios

        result = []
        for name in self.settings['run']:
            for s in scenarios:
                if matches(s.name, name) and s not in result:
                    result.append(s)
                    break
        return result

    def run_scenario(self, scenario, shell_prelude):
        '''Run one scenario; return True if it passed, False otherwise.

        ASSUMING steps run first.  If one of them fails the rest of the
        scenario is skipped and the scenario counts as passed.
        Otherwise the normal steps run until the first failure, and
        then the FINALLY steps run regardless of whether a normal step
        failed.

        With --no-act nothing is run and the scenario counts as passed.

        '''
        self.info('Running scenario %s' % scenario.name)
        self.ts['scenario'] = scenario
        self.ts['scenario_name'] = scenario.name
        self.ts['steps'] = scenario.steps
        self.scenarios_run += 1

        if self.settings['no-act']:
            self.info('Pretending everything went OK')
            return True

        if self.settings['tempdir']:
            tempdir = os.path.abspath(self.settings['tempdir'])
            if not os.path.exists(tempdir):
                os.mkdir(tempdir)
        else:
            tempdir = tempfile.mkdtemp()

        os.mkdir(self.scenario_dir(tempdir, scenario))
        datadir = self.datadir(tempdir, scenario)
        os.mkdir(datadir)
        self.info('DATADIR is %s' % datadir)

        assuming = [s for s in scenario.steps if s.what == 'ASSUMING']
        cleanup = [s for s in scenario.steps if s.what == 'FINALLY']
        special = assuming + cleanup
        normal = [s for s in scenario.steps if s not in special]

        ok = True

        # A failing assumption is not reported as an error.
        assumptions_hold, step_number = self._run_steps(
            assuming, tempdir, datadir, scenario, shell_prelude, False, 0)

        if assumptions_hold:
            normal_ok, step_number = self._run_steps(
                normal, tempdir, datadir, scenario, shell_prelude,
                True, step_number)

            # NOTE(review): the FINALLY steps are run only when all
            # assumptions held; confirm this nesting against the
            # upstream layout of this function.
            cleanup_ok, step_number = self._run_steps(
                cleanup, tempdir, datadir, scenario, shell_prelude,
                True, step_number)

            ok = normal_ok and cleanup_ok

        # Snapshots live under tempdir, so keep it when they were
        # requested.
        if not self.settings['snapshot']:
            shutil.rmtree(tempdir)

        return ok

    def _run_steps(self, steps, tempdir, datadir, scenario, shell_prelude,
                   report_error, step_number):
        '''Run steps in order, stopping after the first one that fails.

        step_number is the number of steps of the scenario run so far.
        Returns a pair (ok, step_number): whether every step exited
        with zero, and the updated step count.

        '''
        for step in steps:
            exit_code = self.run_step(
                datadir, scenario, step, shell_prelude, report_error)
            step_number += 1
            self.snapshot_datadir(
                tempdir, datadir, scenario, step_number, step)
            if exit_code != 0:
                return False, step_number
        return True, step_number

    def clean_env(self):
        '''Return a clean environment for running tests.'''

        whitelisted = [
            'TERM',
            'USER',
            'USERNAME',
            'PATH',
            'HOME',
            'LOGNAME',
        ]

        hardcoded = {
            'SHELL': '/bin/sh',
            'LC_ALL': 'C',
        }

        # Copy only the whitelisted variables that are actually set;
        # the hardcoded values are applied afterwards.
        env = dict(
            (key, os.environ[key])
            for key in whitelisted if key in os.environ)
        env.update(hardcoded)
        return env

    def run_step(self, datadir, scenario, step, shell_prelude, report_error):
        '''Run the shell implementation of one step; return its exit code.

        The shell snippet runs via ``sh -xeuc`` with shell_prelude
        prepended, in the environment from clean_env() extended with
        DATADIR, SRCDIR (the current working directory) and one
        MATCH_n variable per regexp group of the implementation
        (groups that did not participate are set to the empty string).

        If the step fails and report_error is true, the failure is
        reported with self.error().

        '''
        self.info('Running step "%s %s"' % (step.what, step.text))
        self.ts['step'] = step
        self.ts['step_name'] = '%s %s' % (step.what, step.text)
        self.steps_run += 1

        # Implementations are selected case-insensitively in
        # connect_implementation, so match the same way here; otherwise
        # a step that differs only in case would fail the assertion.
        m = re.match(step.implementation.regexp, step.text, re.I)
        assert m is not None
        env = self.clean_env()
        env['DATADIR'] = datadir
        env['SRCDIR'] = os.getcwd()
        for i, match in enumerate(m.groups('')):
            env['MATCH_%d' % (i+1)] = match

        shell_script = '%s\n\n%s\n' % (
            shell_prelude, step.implementation.shell)
        exit_code, stdout, stderr = cliapp.runcmd_unchecked(
            ['sh', '-xeuc', shell_script], env=env)

        logging.debug('Exit code: %d' % exit_code)
        if stdout:
            logging.debug('Standard output:\n%s' % self.indent(stdout))
        else:
            logging.debug('Standard output: empty')
        if stderr:
            logging.debug('Standard error:\n%s' % self.indent(stderr))
        else:
            logging.debug('Standard error: empty')

        if exit_code != 0 and report_error:
            self.error(
                'ERROR: In scenario "%s"\nstep "%s %s" failed,\n'
                'with exit code %d:\n'
                'Standard output from shell command:\n%s'
                'Standard error from shell command:\n%s' %
                (scenario.name, step.what, step.text, exit_code,
                 self.indent(stdout), self.indent(stderr)))

        return exit_code

    def scenario_dir(self, tempdir, scenario):
        '''Return the directory under tempdir used for scenario.'''
        return os.path.join(tempdir, self.nice(scenario.name))

    def datadir(self, tempdir, scenario):
        '''Return the DATADIR path for scenario.'''
        sd = self.scenario_dir(tempdir, scenario)
        return os.path.join(sd, 'datadir')

    def snapshot_dir(self, tempdir, scenario, step, step_number):
        '''Return the path where the snapshot after step is stored.'''
        sd = self.scenario_dir(tempdir, scenario)
        base = '%03d-%s-%s' % (step_number, step.what, self.nice(step.text))
        return os.path.join(sd, base)

    def snapshot_datadir(self, tempdir, datadir, scenario, step_number, step):
        '''Copy datadir to the snapshot directory for step.

        Does nothing unless --snapshot is set: without it the whole
        temporary directory is removed after the scenario, so copying
        would be wasted work.

        '''
        if not self.settings['snapshot']:
            return
        snapshot = self.snapshot_dir(tempdir, scenario, step, step_number)
        cliapp.runcmd(['cp', '-a', datadir, snapshot])

    def nice(self, name):
        '''Quote a scenario or step name so it forms a nice filename.

        ASCII letters, digits, "-" and "." are kept; every run of
        other characters is replaced with a single underscore.

        '''
        nice_chars = "abcdefghijklmnopqrstuvwxyz"
        nice_chars += nice_chars.upper()
        nice_chars += "0123456789-."

        nice = []
        for c in name:
            if c in nice_chars:
                nice.append(c)
            elif not nice or nice[-1] != '_':
                nice.append('_')
        return ''.join(nice)

    def indent(self, s):
        '''Return s with every line indented and newline-terminated.'''
        return ''.join(' %s\n' % line for line in s.splitlines())


YarnRunner(version=cmdtestlib.__version__).run()