summaryrefslogtreecommitdiff
path: root/systest
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2011-06-05 17:35:28 +0100
committerLars Wirzenius <liw@liw.fi>2011-06-05 17:35:28 +0100
commit2ad9f2c32c4ade313dbefb96235939ad393f5420 (patch)
tree0ad91575fda3455c15f8c0922e926363908afa9d /systest
downloadsystest-2ad9f2c32c4ade313dbefb96235939ad393f5420.tar.gz
Rough first draft of a systest tool.
Diffstat (limited to 'systest')
-rwxr-xr-xsystest116
1 files changed, 116 insertions, 0 deletions
diff --git a/systest b/systest
new file mode 100755
index 0000000..2b2af1b
--- /dev/null
+++ b/systest
@@ -0,0 +1,116 @@
+#!/usr/bin/python
+
+import cliapp
+import re
+import subprocess
+
+
+class AssertionFailure(cliapp.AppException):
+
+ def __init__(self, msg):
+ self._str = msg
+
+ def __str__(self):
+ return self._str
+
+
+class SystemTest(cliapp.Application):
+
+ def add_settings(self):
+ self.settings.boolean(['verbose', 'v'],
+ 'print names of tests when run')
+ self.settings.string(['target'], 'target domain name or IP address')
+ self.settings.string(['user'], 'user on target')
+ self.settings.string(['user-password'], 'password for target user')
+
+ def process_args(self, args):
+ if args:
+ self.execute_tests(args)
+ else:
+ self.execute_tests(self.list_tests())
+
+ def execute_tests(self, testnames):
+ for i, testname in enumerate(testnames):
+ methodname = self.mangle(testname)
+ if hasattr(self, methodname):
+ if self.settings['verbose']:
+ self.output.write('test %d/%d: %s\n' %
+ (i+1, len(testnames), testname))
+ getattr(self, methodname)()
+ else:
+ raise cliapp.AppException('unknown test: %s' % testname)
+
+ def list_tests(self):
+ return [self.unmangle(x) for x in dir(self) if x.startswith('test')]
+
+ def mangle(self, testname):
+ return 'test_' + testname.replace('-', '_')
+
+ def unmangle(self, methodname):
+ assert methodname.startswith('test_')
+ methodname = methodname[len('test_'):]
+ return methodname.replace('_', '-')
+
+ def hostcmd(self, argv, stdin=None):
+ p = subprocess.Popen(argv, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = p.communicate(stdin)
+ if p.returncode:
+ raise cliapp.AppException('host command failed: %s\n%s' %
+ (' '.join(argv), err))
+ return out
+
+ def targetcmd(self, argv, *args, **kwargs):
+ return self.hostcmd(['ssh', '-l', self.settings['user'],
+ self.settings['target']] + argv,
+ *args, **kwargs)
+
+ def assert_(self, cond, automsg=None, msg=None):
+ if not cond:
+ output = 'Assertion failed:'
+ if automsg is None:
+ output += ' ' + repr(cond)
+ else:
+ output += ' ' + automsg
+ if msg is not None:
+ output += ' ' + msg
+ raise AssertionFailure(output)
+
+ def assertEqual(self, v1, v2, msg=None):
+ self.assert_(v1 == v2, automsg='%s != %s' % (repr(v1), repr(v2)),
+ msg=msg)
+
+ def assertMatches(self, pat, text, msg=None):
+ self.assert_(re.match(pat, text),
+ automsg='pattern %s does not match %s' % (pat, text),
+ msg=msg)
+
+ def test_only_listens_on_ssh_port_externally(self):
+ out = self.hostcmd(['nmap', self.settings['target']])
+ ports = [line.split()[0]
+ for line in out.splitlines()
+ if ' open ' in line]
+ self.assertEqual(ports, ['22/tcp'])
+
+ def test_ssh_login_works(self):
+ user = self.settings['user']
+ out = self.hostcmd(['ssh', '-l', user, self.settings['target'], 'id'])
+ self.assertMatches(r'^uid=1000\(%s\)' % user, out)
+
+ def test_manages_at_least_one_dns_lookup(self):
+ out = self.targetcmd(['host', 'www.debian.org'])
+ self.assert_('www.debian.org' in out)
+
+ def test_pings_localhost(self):
+ self.targetcmd(['ping', '-c1', 'localhost'])
+
+ def test_ping_localhost_using_ipv6(self):
+ self.targetcmd(['ping6', '-c1', 'ip6-localhost'])
+
+# def test_sudo_works(self):
+# out = self.targetcmd(['sudo', 'id'], stdin=self.password)
+# self.assertMatches(r'^uid=0\(root\)', out)
+
+
+if __name__ == '__main__':
+ SystemTest().run()