summaryrefslogtreecommitdiff
path: root/systest
blob: 5055b62e44ad0c47394132ac38a5bb58bd48cdf4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/python

import cliapp
import re
import subprocess


class AssertionFailure(cliapp.AppException):

    def __init__(self, msg):
        self._str = msg
        
    def __str__(self):
        return self._str


class SystemTest(cliapp.Application):

    def add_settings(self):
        self.settings.boolean(['verbose', 'v'], 
                              'print names of tests when run')
        self.settings.string(['target'], 'target domain name or IP address')
        self.settings.string(['user'], 'user on target')
        self.settings.string(['user-password'], 'password for target user')
        self.settings.boolean(['list-tests'], 'list known tests')

    def process_args(self, args):
        if self.settings['list-tests']:
            for testname in self.list_tests():
                self.output.write('%s\n' % testname)
        elif args:
            self.execute_tests(args)
        else:
            self.execute_tests(self.list_tests())

    def execute_tests(self, testnames):
        for i, testname in enumerate(testnames):
            methodname = self.mangle(testname)
            if hasattr(self, methodname):
                if self.settings['verbose']:
                    self.output.write('test %d/%d: %s\n' % 
                                      (i+1, len(testnames), testname))
                getattr(self, methodname)()
            else:
                raise cliapp.AppException('unknown test: %s' % testname)

    def list_tests(self):
        return [self.unmangle(x) for x in dir(self) if x.startswith('test')]

    def mangle(self, testname):
        return 'test_' + testname.replace('-', '_')
    
    def unmangle(self, methodname):
        assert methodname.startswith('test_')
        methodname = methodname[len('test_'):]
        return methodname.replace('_', '-')

    def hostcmd(self, argv, stdin=None):
        p = subprocess.Popen(argv, 
                             stdin=subprocess.PIPE, 
                             stdout=subprocess.PIPE, 
                             stderr=subprocess.PIPE)
        out, err = p.communicate(stdin)
        if p.returncode:
            raise cliapp.AppException('host command failed: %s\n%s' %
                                       (' '.join(argv), err))
        return out

    def targetcmd(self, argv, *args, **kwargs):
        return self.hostcmd(['ssh', '-l', self.settings['user'],
                              self.settings['target']] + argv,
                             *args, **kwargs)

    def assert_(self, cond, automsg=None, msg=None):
        if not cond:
            output = 'Assertion failed:'
            if automsg is None:
                output += ' ' + repr(cond)
            else:
                output += ' ' + automsg
            if msg is not None:
                output += ' ' + msg
            raise AssertionFailure(output)

    def assertEqual(self, v1, v2, msg=None):
        self.assert_(v1 == v2, automsg='%s != %s' % (repr(v1), repr(v2)),
                     msg=msg)

    def assertMatches(self, pat, text, msg=None):
        self.assert_(re.match(pat, text),
                     automsg='pattern %s does not match %s' % (pat, text),
                     msg=msg)

    def test_only_ssh_port(self):
        out = self.hostcmd(['nmap', self.settings['target']])
        ports = [line.split()[0]
                 for line in out.splitlines()
                 if ' open ' in line]
        self.assertEqual(ports, ['22/tcp'])

    def test_ssh_login(self):
        user = self.settings['user']
        out = self.hostcmd(['ssh', '-l', user, self.settings['target'], 'id'])
        self.assertMatches(r'^uid=1000\(%s\)' % user, out)

    def test_simple_dns_lookup(self):
        out = self.targetcmd(['host', 'www.debian.org'])
        self.assert_('www.debian.org' in out)
        
    def test_ping_localhost(self):
        self.targetcmd(['ping', '-c1', 'localhost'])
        
    def test_ping6_localhost(self):
        self.targetcmd(['ping6', '-c1', 'ip6-localhost'])
        
    def test_cat(self):
        out = self.targetcmd(['cat'], stdin='foo')
        self.assertEqual(out, 'foo')

#    def test_sudo_works(self):
#        out = self.targetcmd(['sudo', 'id'], stdin=self.password)
#        self.assertMatches(r'^uid=0\(root\)', out)


if __name__ == '__main__':
    SystemTest().run()