1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import os
import re
import subprocess
def run_ssh(ctx, host=None, cmd=None):
    """Run ``cmd`` on ``host`` through ``ssh`` and require exit status 0.

    The command is executed via ``_runcmd``, which records the argv,
    captured output and exit status in ``ctx``. When the recorded exit
    status is not 0, the context is printed before the final check so the
    failure can be diagnosed.

    Args:
        ctx: Step context; must support ``get`` and ``as_dict`` and be
            accepted by ``_runcmd``.
        host: Host argument handed to ``ssh``.
        cmd: Command string handed to ``ssh`` after the host.
    """
    argv = ['ssh', host, cmd]
    _runcmd(ctx, argv)

    exit_code = ctx.get('exit')
    failed = exit_code != 0
    if failed:
        # Dump everything recorded so far to make the failure debuggable.
        print('context:', ctx.as_dict())
    assert_eq(exit_code, 0)
def run_ssh_with_jumphost(ctx, jumphost=None, host=None, cmd=None):
    """Run ``cmd`` on ``host`` through ``ssh -J jumphost`` and require exit 0.

    The command is executed via ``_runcmd``, which records the argv,
    captured output and exit status in ``ctx``. When the recorded exit
    status is not 0, the context is printed before the final check.

    Args:
        ctx: Step context; must support ``get`` and ``as_dict`` and be
            accepted by ``_runcmd``.
        jumphost: Value passed to the ``-J`` option of ``ssh``.
        host: Host argument handed to ``ssh``.
        cmd: Command string handed to ``ssh`` after the host.
    """
    argv = ['ssh', '-J', jumphost, host, cmd]
    _runcmd(ctx, argv)

    exit_code = ctx.get('exit')
    failed = exit_code != 0
    if failed:
        # Dump everything recorded so far to make the failure debuggable.
        print('context:', ctx.as_dict())
    assert_eq(exit_code, 0)
def run_git_clone(ctx, repo=None):
    """Run ``git clone repo`` and require exit status 0.

    The command is executed via ``_runcmd``, which records the argv,
    captured output and exit status in ``ctx``. When the recorded exit
    status is not 0, the context is printed before the final check.

    Args:
        ctx: Step context; must support ``get`` and ``as_dict`` and be
            accepted by ``_runcmd``.
        repo: Repository argument handed to ``git clone``.
    """
    argv = ['git', 'clone', repo]
    _runcmd(ctx, argv)

    exit_code = ctx.get('exit')
    failed = exit_code != 0
    if failed:
        # Dump everything recorded so far to make the failure debuggable.
        print('context:', ctx.as_dict())
    assert_eq(exit_code, 0)
def stdout_matches(ctx, regex=None):
    """Check that the stdout stored in ``ctx`` matches ``regex`` somewhere.

    Args:
        ctx: Step context; ``ctx.get('stdout', '')`` supplies the text to
            search (empty string when nothing is stored).
        regex: Pattern given to ``re.search``.
    """
    captured = ctx.get('stdout', '')
    match = re.search(regex, captured)
    # re.search yields None when the pattern is found nowhere in the text.
    assert_ne(match, None)
def dir_exists(ctx, dirname=None):
    """Check that ``dirname`` names an existing directory.

    Args:
        ctx: Step context; not used by this check.
        dirname: Path tested with ``os.path.isdir``.
    """
    is_directory = os.path.isdir(dirname)
    assert_eq(is_directory, True)
def _runcmd(ctx, argv):
    """Run ``argv`` as a subprocess and record the outcome in ``ctx``.

    Both output streams are captured in full. The following keys are set
    on ``ctx``:

    * ``'argv'``   - the argument list that was executed
    * ``'stdout'`` - captured standard output, decoded as UTF-8
    * ``'stderr'`` - captured standard error, decoded as UTF-8
    * ``'exit'``   - the process return code

    Args:
        ctx: Any object supporting item assignment (``ctx[key] = value``).
        argv: Program and arguments as a list, run without a shell.
    """
    p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # No stdin pipe is opened, so there is nothing to feed the child; the
    # child keeps the stdin it inherits from this process.
    stdout, stderr = p.communicate()
    ctx['argv'] = argv
    # Commands may emit bytes that are not valid UTF-8. Substitute those
    # rather than raising, so the exit status is always recorded and the
    # caller's own checks decide whether the step passed.
    ctx['stdout'] = stdout.decode('utf-8', errors='replace')
    ctx['stderr'] = stderr.decode('utf-8', errors='replace')
    ctx['exit'] = p.returncode
|