summaryrefslogtreecommitdiff
path: root/cliapp/runcmd_tests.py
blob: f17afa6e9075ea445ecc1c192b4d8b0d54aa0a6f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# Copyright (C) 2011, 2012  Lars Wirzenius
# Copyright (C) 2012  Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

from __future__ import unicode_literals

import os
import subprocess
import tempfile
import unittest

import cliapp


def devnull(msg):
    pass


class RuncmdTests(unittest.TestCase):

    def test_runcmd_executes_true(self):
        self.assertEqual(cliapp.runcmd(['true']), '')

    def test_runcmd_raises_error_on_failure(self):
        self.assertRaises(cliapp.AppException, cliapp.runcmd, ['false'])

    def test_runcmd_returns_stdout_of_command(self):
        self.assertEqual(cliapp.runcmd(['echo', 'hello', 'world']),
                         'hello world\n')

    def test_runcmd_returns_stderr_of_command(self):
        exit_code, out, err = cliapp.runcmd_unchecked(['ls', 'notexist'])
        self.assertNotEqual(exit_code, 0)
        self.assertEqual(out, '')
        self.assertNotEqual(err, '')

    def test_runcmd_pipes_stdin_through_command(self):
        self.assertEqual(cliapp.runcmd(['cat'], feed_stdin='hello, world'),
                         'hello, world')

    def test_runcmd_pipes_stdin_through_two_commands(self):
        self.assertEqual(
            cliapp.runcmd(['cat'], ['cat'], feed_stdin='hello, world'),
            'hello, world')

    def test_runcmd_pipes_stdin_through_command_with_lots_of_data(self):
        data = 'x' * (1024 ** 2)
        self.assertEqual(cliapp.runcmd(['cat'], feed_stdin=data), data)

    def test_runcmd_ignores_failures_on_request(self):
        self.assertEqual(cliapp.runcmd(['false'], ignore_fail=True), '')

    def test_runcmd_obeys_cwd(self):
        self.assertEqual(cliapp.runcmd(['pwd'], cwd='/'), '/\n')

    def test_runcmd_unchecked_returns_values_on_success(self):
        self.assertEqual(cliapp.runcmd_unchecked(['echo', 'foo']),
                         (0, 'foo\n', ''))

    def test_runcmd_unchecked_returns_values_on_failure(self):
        self.assertEqual(cliapp.runcmd_unchecked(['false']),
                         (1, '', ''))

    def test_runcmd_unchecked_runs_simple_pipeline(self):
        self.assertEqual(
            cliapp.runcmd_unchecked(['echo', 'foo'], ['wc', '-c']),
            (0, '4\n', ''))

    def test_runcmd_unchecked_runs_longer_pipeline(self):
        self.assertEqual(
            cliapp.runcmd_unchecked(['echo', 'foo'], ['cat'], ['wc', '-c']),
            (0, '4\n', ''))

    def test_runcmd_redirects_stdin_from_file(self):
        fd, _ = tempfile.mkstemp()
        os.write(fd, 'foobar'.encode())   # send encoded data to stdin
        os.lseek(fd, 0, os.SEEK_SET)
        self.assertEqual(cliapp.runcmd_unchecked(['cat'], stdin=fd),
                         (0, 'foobar', ''))  # runcmd will decode stdout
        os.close(fd)

    def test_runcmd_redirects_stdout_to_file(self):
        fd, filename = tempfile.mkstemp()
        exit_code, _, _ = cliapp.runcmd_unchecked(
            ['echo', 'foo'], stdout=fd)
        os.close(fd)
        with open(filename) as f:
            data = f.read()
        self.assertEqual(exit_code, 0)
        self.assertEqual(data, 'foo\n')

    def test_runcmd_redirects_stderr_to_file(self):
        fd, filename = tempfile.mkstemp()
        exit_code, _, _ = cliapp.runcmd_unchecked(
            ['ls', 'notexist'], stderr=fd)
        os.close(fd)
        with open(filename) as f:
            data = f.read()
        self.assertNotEqual(exit_code, 0)
        self.assertNotEqual(data, '')

    def test_runcmd_unchecked_handles_stdout_err_redirected_to_same_file(self):
        fd, filename = tempfile.mkstemp()
        exit_code, _, _ = cliapp.runcmd_unchecked(
            ['sleep', '2'], stdout=fd, stderr=subprocess.STDOUT)
        os.close(fd)
        with open(filename) as f:
            data = f.read()
        self.assertEqual(exit_code, 0)
        self.assertEqual(data, '')

    def test_runcmd_calls_stdout_callback_when_msg_on_stdout(self):
        msgs = []

        def logger(s):
            msgs.append(s)

            # We return a string to allow the callback to mangle
            # the data being returned.
            return 'foo'

        test_input = 'hello fox'
        _, out, _ = cliapp.runcmd_unchecked(
            ['echo', '-n', test_input], stdout_callback=logger)

        self.assertEqual(out, 'foo')
        self.assertEqual(msgs, [test_input])

    def test_runcmd_calls_stderr_callback_when_msg_on_stderr(self):
        msgs = []

        def logger(s):
            msgs.append(s)

            # We return None to signal that the data should not be
            # mangled.
            return None

        _, _, err = cliapp.runcmd_unchecked(
            ['ls', 'nosuchthing'], stderr_callback=logger)

        # The callback may be called several times, and we have no
        # control over that: output from the subprocess may arrive in
        # drips due to process scheduling and other reasons beyond our
        # control. Thus, we compare the joined string fragments,
        # instead of the list of fragments.

        self.assertNotEqual(err, '')
        self.assertEqual(''.join(msgs), err)


class ShellQuoteTests(unittest.TestCase):

    def test_returns_empty_string_for_empty_string(self):
        self.assertEqual(cliapp.shell_quote(''), '')

    def test_returns_same_string_when_safe(self):
        self.assertEqual(cliapp.shell_quote('abc123'), 'abc123')

    def test_quotes_space(self):
        self.assertEqual(cliapp.shell_quote(' '), "' '")

    def test_quotes_double_quote(self):
        self.assertEqual(cliapp.shell_quote('"'), "'\"'")

    def test_quotes_single_quote(self):
        self.assertEqual(cliapp.shell_quote("'"), '"\'"')