#!/usr/bin/python3
# Copyright 2017  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import json
import logging
import sys
import time

import apifw
import cliapp
import Crypto.PublicKey.RSA
import requests
import urllib3

import ick2


# Every HTTP request below is made with verify=False; silence the
# warnings urllib3 would otherwise emit and keep them out of the log.
urllib3.disable_warnings()
logging.captureWarnings(False)


class WorkerManager(cliapp.Application):

    """Command line application that polls a controller for work.

    The worker registers itself with the controller, then loops
    forever: it asks for work, runs the step it is given (or only
    pretends to), and posts output snippets back to the controller.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Cached bearer token and the time.time() value after which
        # it must be refreshed; see get_token.
        self._token = None
        self._token_until = None

    def add_settings(self):
        """Declare the command line / configuration settings."""
        self.settings.boolean(
            ['pretend'],
            'only pretend to execute shell steps',
        )

        self.settings.string(
            ['controller'],
            'base URL for the controller',
            metavar='URL',
        )

        self.settings.string(
            ['name'],
            'name of this worker',
            metavar='NAME',
        )

        self.settings.string(
            ['token'],
            'use TOKEN for all controller HTTP requests',
            metavar='TOKEN',
        )

        self.settings.string(
            ['token-key'],
            'get token signing private key from FILE',
            metavar='FILE',
        )

        self.settings.string(
            ['token-key-pub'],
            'get token signing public key from FILE',
            metavar='FILE',
        )

        self.settings.integer(
            ['sleep'],
            'sleep for SECS seconds if there is no work currently',
            metavar='SECS',
            default=5,
        )

        self.settings.string(
            ['workspace'],
            'use DIR as the workspace where commands are run',
            metavar='DIR',
            default='/var/lib/ick/workspace',
        )

    def process_args(self, args):
        """Register with the controller and process work forever.

        This never returns normally: the polling loop has no exit
        condition.
        """
        self.settings.require('name')
        name = self.settings['name']
        url = self.settings['controller']
        self.show_msg(
            'Worker manager {} starts, controller is {}'.format(name, url))
        self.register(url, name)
        while True:
            work = self.get_work(url, name)
            if work and self.settings['pretend']:
                self.report_pretend_work(url, name, work)
            elif work:
                self.do_work(url, name, work, self.settings['workspace'])
            else:
                self.sleep_a_little()

    def register(self, url, name):
        """POST this worker's name to the controller's /workers.

        A failed registration is only logged as a warning; it does
        not stop the worker.
        """
        logging.info('Registering worker %s to %s', name, url)
        register_url = '{}/workers'.format(url)
        headers = self.get_auth_headers()
        body = {
            'worker': name,
        }
        r = requests.post(
            register_url, json=body, headers=headers, verify=False)
        if not r.ok:
            logging.warning('Error registering worker: %d', r.status_code)
        else:
            logging.info('Registered worker successfully')

    def sleep_a_little(self):
        """Sleep for the number of seconds given by the sleep setting."""
        secs = self.settings['sleep']
        time.sleep(secs)

    def get_work(self, url, name):
        """Ask the controller for work for the worker called name.

        Return the decoded JSON response, or None if the response
        status is not 200 or the response body is empty.
        """
        get_work_url = '{}/work/{}'.format(url, name)
        headers = self.get_auth_headers()
        r = requests.get(get_work_url, headers=headers, verify=False)
        if r.status_code != 200 or not r.text:
            return None
        work = r.json()
        if work:
            self.show_json('Response:', work)
        return work

    def get_auth_headers(self):
        """Return HTTP headers carrying the bearer token."""
        token = self.get_token()
        return {
            'Authorization': 'Bearer {}'.format(token),
        }

    def get_token(self):
        """Return the access token to use, refreshing it when stale.

        The token comes from the token setting if that is set.
        Otherwise one is created from the key files named by the
        token-key and token-key-pub settings. The result is cached
        for an hour. If neither source is configured, the program
        exits.
        """
        token = self.settings['token']
        token_key = self.settings['token-key']
        token_key_pub = self.settings['token-key-pub']
        now = time.time()
        if self._token is None or now >= self._token_until:
            if token:
                self._token = token
                self._token_until = now + 3600
            elif token_key and token_key_pub:
                with open(token_key) as f1, open(token_key_pub) as f2:
                    key_text = f1.read()
                    pub_text = f2.read()
                self._token = self.create_token(key_text, pub_text)
                self._token_until = now + 3600
            else:
                sys.exit('No token and no way to create')

        assert self._token is not None
        return self._token

    def create_token(self, key_text, pub_text):
        """Create a token signed with the private key in key_text.

        The token's expiry claim is set 24 hours ahead. pub_text is
        accepted but not used here.
        """
        iss = 'localhost'
        aud = 'localhost'
        scopes_list = [
            'uapi_work_id_get',
            'uapi_work_post',
            'uapi_workers_post',
        ]

        key = Crypto.PublicKey.RSA.importKey(key_text)
        scopes = ' '.join(scopes_list)
        now = time.time()
        claims = {
            'iss': iss,
            'sub': 'subject-uuid',
            'aud': aud,
            'exp': now + 86400,
            'scope': scopes,
        }

        token = apifw.create_token(claims, key)
        return token.decode('ascii')

    def report_pretend_work(self, url, name, work):
        """Post an empty snippet for work without running anything."""
        self.show_msg('Pretending to work: {!r}'.format(work))
        # NOTE(review): do_work posts to '{url}/work', but this posts
        # to '{url}/work/{name}', and create_token requests no scope
        # that looks like it covers that. Confirm against the
        # controller API whether this URL is intended.
        snippet_url = '{}/work/{}'.format(url, name)
        snippet = {
            'build_id': work['build_id'],
            'worker': name,
            'project': work['project'],
            'stdout': '',
            'stderr': '',
            'exit_code': None,
            'timestamp': self.now(),
        }
        self.post_snippet(snippet_url, snippet)

    def do_work(self, url, name, work, workspace):
        """Run the step in work and stream its output to the controller.

        Every chunk of stdout or stderr is posted as its own snippet
        with exit_code set to None. After the step has finished, a
        final snippet with empty output and the real exit code is
        posted. If work asks for a fresh workspace, the workspace is
        emptied first; if that fails, the step is not run and the
        exit code of the cleanup is reported instead.
        """
        self.show_msg('Doing work')
        snippet_url = '{}/work'.format(url)
        snippet = {
            'build_id': work['build_id'],
            'worker': name,
            'project': work['project'],
            'pipeline': work['pipeline'],
            'stdout': '',
            'stderr': '',
            'exit_code': None,
            'timestamp': self.now(),
        }

        def post(stream_name, data):
            # Output arrives as raw bytes, possibly invalid UTF-8 or
            # ending mid-character. Decode leniently so that odd
            # build output cannot crash the worker with a
            # UnicodeDecodeError.
            data = data.decode('UTF-8', errors='replace')
            s = dict(snippet)
            s[stream_name] = data
            self.post_snippet(snippet_url, s)

        def run_with_interp(argv_prefix, cmd):
            # Run cmd as the last argument of argv_prefix in the
            # workspace and return the exit code.
            exit_code, _, _ = cliapp.runcmd_unchecked(
                argv_prefix + [cmd],
                stdout_callback=lambda data: post('stdout', data),
                stderr_callback=lambda data: post('stderr', data),
                cwd=workspace,
            )
            return exit_code

        def run_shell(shell_cmd):
            return run_with_interp(['bash', '-xeuc'], shell_cmd)

        def run_python(python_cmd):
            return run_with_interp(['python3', '-c'], python_cmd)

        def run_debootstrap(step):
            shell_cmd = "sudo debootstrap '{}' . '{}'".format(
                step['debootstrap'], step.get('mirror', 'NOMIRROR'))
            return run_shell(shell_cmd)

        def run_cmd(step):
            # Dispatch on the kind of step and return its exit code,
            # so that a failed step is reported as failed.
            logging.info('Running step: %r', step)
            if 'shell' in step:
                return run_shell(step['shell'])
            if 'python' in step:
                return run_python(step['python'])
            if 'debootstrap' in step:
                return run_debootstrap(step)
            # Nothing is run for an unrecognised step; keep reporting
            # success as before, but leave a trace in the log.
            logging.warning('Unknown step type, nothing to run: %r', step)
            return 0

        exit_code = 0
        if work.get('fresh_workspace'):
            logging.info('Make an empty workspace')
            exit_code = run_shell('find . -delete')

        if exit_code == 0:
            exit_code = run_cmd(work['step'])

        end_snippet = dict(snippet)
        end_snippet['exit_code'] = exit_code
        self.post_snippet(snippet_url, end_snippet)

    def now(self):
        """Return the current local time as YYYY-MM-DDTHH:MM:SS."""
        return time.strftime('%Y-%m-%dT%H:%M:%S')

    def post_snippet(self, url, snippet):
        """POST snippet as JSON to url.

        Raises cliapp.AppException if the response status indicates
        an error.
        """
        headers = {
            'Content-Type': 'application/json',
        }
        headers.update(self.get_auth_headers())
        self.show_json('POST {}'.format(url), snippet)
        r = requests.post(
            url, headers=headers, data=json.dumps(snippet), verify=False)
        if not r.ok:
            raise cliapp.AppException(
                'Error posting data to controller: {} {!r}'.format(
                    r.status_code, r.text))

    def show_msg(self, msg):
        """Write msg to stdout after a separator line, and log it."""
        sys.stdout.write('------\n{}\n'.format(msg))
        logging.info(msg)

    def show_json(self, prelude, obj):
        """Show prelude followed by obj as indented JSON, and log it."""
        msg = '{}: {}'.format(prelude, json.dumps(obj, indent=4))
        sys.stdout.write('------\n{}\n'.format(msg))
        logging.info(msg)


WorkerManager(version=ick2.__version__).run()