# Copyright 2017  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=


import email
import imaplib
import os
import subprocess
import sys

try:
    import urlparse
except ImportError:  # Python 3 moved the module to urllib.parse
    import urllib.parse as urlparse

import cliapp
import requests
import yaml


# Name of the YAML file in which variables are persisted. It can be
# overridden with the VARIABLES environment variable.
variables_filename = os.environ.get('VARIABLES', 'vars.yaml')


class YarnHelper(object):

    """Helper methods for test step implementations.

    Provides access to MATCH_n environment variables, variables
    persisted in a YAML file, simple assertions, and wrappers for
    running HTTP, IMAP, ssh-keygen, git and Gitano operations.

    """

    def __init__(self):
        self._env = dict(os.environ)
        self._next_match = 1
        self._variables = None  # None means not loaded, otherwise dict

    def set_environment(self, env):
        """Replace the environment used by get_next_match.

        A copy of env is stored, so later changes to env have no effect.

        """
        self._env = dict(env)

    def get_next_match(self):
        """Return the value of the next MATCH_n environment variable.

        The first call returns MATCH_1, the next one MATCH_2, and so
        on. Raises Error if the next variable is not in the
        environment; the counter is not advanced in that case.

        """
        name = 'MATCH_{}'.format(self._next_match)
        if name not in self._env:
            raise Error('no next match')
        self._next_match += 1
        return self._env[name]

    def get_variable(self, name, default=None):
        """Return the value of a persisted variable, or default.

        The variables file is read on first use only; after that the
        in-memory copy is used.

        """
        return self._ensure_variables_loaded().get(name, default)

    def _ensure_variables_loaded(self):
        """Load variables from the file unless already loaded; return them."""
        if self._variables is None:
            self._variables = self._load_variables()
        return self._variables

    def _load_variables(self):
        """Read variables from the variables file.

        Returns an empty dict if the file does not exist or is empty.

        """
        if os.path.exists(variables_filename):
            with open(variables_filename, 'r') as f:
                # safe_load returns None for an empty file; callers
                # need a dict.
                return yaml.safe_load(f) or {}
        return {}

    def set_variable(self, name, value):
        """Set a variable and write all variables to the variables file."""
        # Load what is already on disk first. Starting from an empty
        # dict here would overwrite the file and lose every variable
        # saved earlier.
        variables = self._ensure_variables_loaded()
        variables[name] = value
        self._save_variables(variables)

    def _save_variables(self, variables):
        """Write variables to the variables file as YAML."""
        with open(variables_filename, 'w') as f:
            yaml.safe_dump(variables, f)

    def append_to_list(self, list_name, value):
        """Append value to a list-valued variable and save it.

        The list is created if the variable does not exist yet.

        """
        items = self._ensure_variables_loaded().get(list_name, [])
        items.append(value)
        self.set_variable(list_name, items)

    def construct_aliased_http_request(
            self, address, method, url, data=None, headers=None):
        """Prepare an HTTP request for url that is sent to address.

        The network location part of url is replaced with address, and
        the original network location is put into the Host header.
        Returns the prepared request; nothing is sent.

        """
        # Work on a copy so that the caller's dict is not modified.
        headers = {} if headers is None else dict(headers)

        parts = list(urlparse.urlparse(url))
        headers['Host'] = parts[1]  # parts[1] is the netloc
        parts[1] = address
        aliased_url = urlparse.urlunparse(parts)

        r = requests.Request(method, aliased_url, data=data, headers=headers)
        return r.prepare()

    def http_get(self, address, url):  # pragma: no cover
        """GET url from address; return (status code, response body)."""
        r = self.construct_aliased_http_request(address, 'GET', url)
        # The with statement closes the session once the response has
        # been read.
        with requests.Session() as s:
            resp = s.send(r)
            return resp.status_code, resp.content

    def assertEqual(self, a, b):
        """Raise Error unless a == b."""
        if a != b:
            raise Error('assertion {!r} == {!r} failed'.format(a, b))

    def assertNotEqual(self, a, b):
        """Raise Error unless a != b."""
        if a == b:
            raise Error('assertion {!r} != {!r} failed'.format(a, b))

    def assertGreaterThan(self, a, b):
        """Raise Error unless a > b."""
        if a <= b:
            raise Error('assertion {!r} > {!r} failed'.format(a, b))

    def assertIn(self, a, b):
        """Raise Error unless a in b."""
        if a not in b:
            raise Error('assertion {!r} in {!r} failed'.format(a, b))

    def get_password_with_pass(self, pass_home, pass_name):  # pragma: no cover
        """Return the output of "pass show pass_name", run with HOME=pass_home.

        Trailing whitespace is stripped from the output. The exit code
        of the command is not checked.

        """
        p = subprocess.Popen(
            ['env', 'HOME={}'.format(pass_home), 'pass', 'show', pass_name],
            stdout=subprocess.PIPE)
        stdout, _ = p.communicate()
        return stdout.rstrip()

    def iterate_mails_in_imap_mailbox(
            self, address, user, password, callback, exp):  # pragma: no cover
        """Call callback for every message in the INBOX of an IMAP account.

        Connects to address over SSL and logs in as user. For each
        message, callback is called with the IMAP connection, the
        message number, and the parsed message. If exp is true, the
        mailbox is expunged after all messages have been processed.

        """
        m = imaplib.IMAP4_SSL(address)
        try:
            m.login(user, password)
            m.select('INBOX', False)  # second argument is readonly
            typ, data = m.search(None, 'ALL')
            for num in data[0].split():
                typ, data = m.fetch(num, '(RFC822)')
                typ, text = data[0]
                msg = email.message_from_string(text)
                callback(m, num, msg)
            if exp:
                m.expunge()
            m.close()
        finally:
            # Log out even if the callback or an IMAP command failed,
            # so that the session is not left open.
            m.logout()

    def ssh_keygen(self, user):  # pragma: no cover
        """Generate an ssh key pair for user; return the public key text.

        The key has an empty passphrase and user as its comment, and
        is stored in the file named by ssh_key_file_for_user.

        """
        filename = self.ssh_key_file_for_user(user)
        cliapp.runcmd(['ssh-keygen', '-f', filename, '-N', '', '-C', user])
        with open(filename + '.pub') as f:
            return f.read()

    def ssh_key_file_for_user(self, user):  # pragma: no cover
        """Return the absolute path of the private key file for user."""
        return os.path.abspath('{}.key'.format(user))

    def repo_ssh_url(self, repo):  # pragma: no cover
        """Return the ssh URL of repo on the server in GITANO_SERVER."""
        return 'ssh://git@{}/{}'.format(os.environ['GITANO_SERVER'], repo)

    def local_checkout_dirname(self, user, repo):  # pragma: no cover
        """Return the directory name used for user's checkout of repo."""
        return '{}_{}'.format(user, repo)

    def git_as(self, user, args, **kwargs):  # pragma: no cover
        """Run git with args, using the ssh key of user.

        The environment and the git arguments are written to stdout
        first. Extra keyword arguments are passed on to
        cliapp.runcmd_unchecked, whose result is returned.

        """
        # NOTE: the value is not used below, but the lookup raises
        # KeyError if GITANO_SERVER is not set.
        server = os.environ['GITANO_SERVER']
        env = dict(os.environ)
        env['GIT_SSH_COMMAND'] = self.env_ssh_command(user)
        sys.stdout.write('env:\n')
        for key, value in env.items():
            sys.stdout.write('  {}={}\n'.format(key, value))
        sys.stdout.write('git args: {}\n'.format(args))
        return cliapp.runcmd_unchecked(
            ['git'] + args,
            stderr=subprocess.STDOUT,
            env=env,
            **kwargs)

    def git_as_checked(self, user, args, **kwargs):  # pragma: no cover
        """Run git like git_as, and raise Error unless it exits with zero.

        The captured output is written to stdout and stderr before the
        exit code is checked.

        """
        exit_code, out, err = self.git_as(user, args, **kwargs)
        sys.stdout.write('STDOUT from git:\n{}'.format(out))
        sys.stderr.write('STDERR from git:\n{}'.format(err))
        self.assertEqual(exit_code, 0)

    def env_ssh_command(self, user):  # pragma: no cover
        """Return the ssh command line to use as GIT_SSH_COMMAND for user.

        The arguments are joined with spaces, without any quoting.

        """
        argv = [
            'ssh',
            '-o', 'PasswordAuthentication=no',
            '-o', 'IdentitiesOnly=yes',
            '-i', self.get_user_ssh_key(user),
        ]
        return ' '.join(argv)

    def get_admin_ssh_key(self):  # pragma: no cover
        """Return the value of the ADMIN_SSH_KEY environment variable."""
        return os.environ['ADMIN_SSH_KEY']

    def get_user_ssh_key(self, user):  # pragma: no cover
        """Return the ssh key file for user, or the admin key if user is None."""
        if user is None:
            return self.get_admin_ssh_key()
        return self.ssh_key_file_for_user(user)

    def gitano(self, user, args, stdin=None):  # pragma: no cover
        """Run a command over ssh as git@GITANO_SERVER, using user's key.

        args is a string that is split on whitespace to form the
        command. If stdin is not None, it is fed to the command.
        Returns the result of cliapp.ssh_runcmd.

        """
        server = os.environ['GITANO_SERVER']
        kwargs = {
            'stderr': subprocess.STDOUT,
            'ssh_options': [
                '-oPasswordAuthentication=no',
                '-oIdentitiesOnly=yes',
                '-i', self.get_user_ssh_key(user),
            ],
        }
        if stdin is not None:
            kwargs['feed_stdin'] = stdin
        return cliapp.ssh_runcmd(
            'git@{}'.format(server), args.split(), **kwargs)

    def gitano_confirm_with_token(self, prefix, which):  # pragma: no cover
        """Run "prefix which" with the admin key and confirm it with a token.

        If the first run succeeds, the last whitespace-separated word
        on the last line of its output is taken as the token, and the
        command is run again with the token appended. If the first run
        raises cliapp.AppException, nothing more is done.

        """
        try:
            output = self.gitano(None, '{} {}'.format(prefix, which))
        except cliapp.AppException:
            pass
        else:
            last_line = output.splitlines()[-1]
            token = last_line.split()[-1]
            self.gitano(None, '{} {} {}'.format(prefix, which, token))


class Error(Exception):

    """Raised for failed assertions and a missing next match."""