#!/usr/bin/python3
# Copyright 2017  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import json
import logging
import sys
import time

import apifw
import cliapp
import Crypto.PublicKey.RSA
import requests
import urllib3

import ick2


# Every controller request below is made with verify=False (no TLS
# certificate verification), so switch off urllib3's warnings up front.
urllib3.disable_warnings()
logging.captureWarnings(False)


class WorkerManager(cliapp.Application):

    """Poll a controller for work, run it, and post the output back.

    The manager loops forever: it asks the controller for work for this
    worker, runs the shell step (or only pretends to, with --pretend),
    and posts output snippets and the exit code back to the controller.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Cached bearer token and the time.time() value after which
        # get_token() refreshes it.
        self._token = None
        self._token_until = None

    def add_settings(self):
        """Declare the command line / configuration settings."""
        self.settings.boolean(
            ['pretend'],
            'only pretend to execute shell steps',
        )

        self.settings.string(
            ['controller'],
            'base URL for the controller',
            metavar='URL',
        )

        self.settings.string(
            ['name'],
            'name of this worker',
            metavar='NAME',
        )

        self.settings.string(
            ['token'],
            'use TOKEN for all controller HTTP requests',
            metavar='TOKEN',
        )

        self.settings.string(
            ['token-key'],
            'get token signing private key from FILE',
            metavar='FILE',
        )

        self.settings.string(
            ['token-key-pub'],
            'get token signing public key from FILE',
            metavar='FILE',
        )

        self.settings.integer(
            ['sleep'],
            'sleep for SECS seconds if there is no work currently',
            metavar='SECS',
            default=5,
        )

    def process_args(self, args):
        """Run the main loop: fetch work, do (or pretend to do) it, repeat.

        Never returns normally. Sleeps only when the controller has no
        work for this worker.
        """
        # Both settings are needed to build every request URL.
        self.settings.require('name')
        self.settings.require('controller')

        name = self.settings['name']
        url = self.settings['controller']
        self.show_msg(
            'Worker manager {} starts, controller is {}'.format(name, url))

        while True:
            work = self.get_work(url, name)
            if not work:
                self.show_msg('Nothing to do')
                self.sleep_a_little()
            elif self.settings['pretend']:
                self.report_pretend_work(url, name, work)
            else:
                self.do_work(url, name, work)

    def sleep_a_little(self):
        """Sleep for the number of seconds given by the 'sleep' setting."""
        secs = self.settings['sleep']
        self.show_msg('Sleeping for {} seconds'.format(secs))
        time.sleep(secs)

    def get_work(self, url, name):
        """Ask the controller for work for worker ``name``.

        Returns the decoded JSON response, or None when the response
        status is not 200 or the response body is empty.
        """
        get_work_url = '{}/work/{}'.format(url, name)
        self.show_msg('Getting work from {}'.format(get_work_url))

        r = requests.get(
            get_work_url, headers=self.get_auth_headers(), verify=False)
        if r.status_code != 200 or not r.text:
            return None

        work = r.json()
        self.show_json('Response:', work)
        return work

    def get_auth_headers(self):
        """Return the HTTP headers carrying the bearer token."""
        return {
            'Authorization': 'Bearer {}'.format(self.get_token()),
        }

    def get_token(self):
        """Return the token to use, refreshing the cached one if stale.

        A token given with --token is used as is. Otherwise a new token
        is created from the key named by --token-key; --token-key-pub
        must be set and readable as well. Exits the program if neither
        way of getting a token is configured.
        """
        now = time.time()
        if self._token is not None and now < self._token_until:
            return self._token

        token = self.settings['token']
        token_key = self.settings['token-key']
        token_key_pub = self.settings['token-key-pub']

        if token:
            self._token = token
        elif token_key and token_key_pub:
            with open(token_key) as f1, open(token_key_pub) as f2:
                key_text = f1.read()
                pub_text = f2.read()
            self._token = self.create_token(key_text, pub_text)
        else:
            sys.exit('No token and no way to create')

        # Refresh after an hour; tokens made by create_token() are
        # valid for a day, so a cached one does not expire while in use.
        self._token_until = now + 3600
        return self._token

    def create_token(self, key_text, pub_text):
        """Create a signed token with the scopes this worker needs.

        ``key_text`` is the private key text used for signing.
        ``pub_text`` is accepted but not used here.
        Returns the token as a str.
        """
        scopes_list = [
            'uapi_work_id_get',
            'uapi_work_post',
        ]

        key = Crypto.PublicKey.RSA.importKey(key_text)
        claims = {
            'iss': 'localhost',
            'sub': 'subject-uuid',
            'aud': 'localhost',
            'exp': time.time() + 86400,
            'scope': ' '.join(scopes_list),
        }

        token = apifw.create_token(claims, key)
        return token.decode('ascii')

    def report_pretend_work(self, url, name, work):
        """Post an empty snippet for ``work`` without running anything."""
        self.show_msg('Pretending to work: {!r}'.format(work))

        # Post to the same URL as do_work(). The tokens created by
        # create_token() carry 'uapi_work_post' but no scope that looks
        # like it covers a POST to /work/<name>.
        # NOTE(review): confirm against the controller's routes.
        snippet_url = '{}/work'.format(url)
        snippet = {
            'build_id': work['build_id'],
            'worker': name,
            'project': work['project'],
            'stdout': '',
            'stderr': '',
            'exit_code': None,
            'timestamp': self.now(),
        }
        self.post_snippet(snippet_url, snippet)

    def do_work(self, url, name, work):
        """Run the shell step in ``work``, streaming output to the controller.

        Each chunk of stdout or stderr is posted as its own snippet as
        it arrives. A final snippet carries the exit code.
        """
        self.show_msg('Doing work')

        snippet_url = '{}/work'.format(url)
        # Template copied for every snippet posted for this step; note
        # that the timestamp is taken once, here.
        snippet = {
            'build_id': work['build_id'],
            'worker': name,
            'project': work['project'],
            'pipeline': work['pipeline'],
            'stdout': '',
            'stderr': '',
            'exit_code': None,
            'timestamp': self.now(),
        }

        def post(stream_name, data):
            # Output arrives as arbitrary byte chunks: a chunk may end in
            # the middle of a multi-byte character, and the command may
            # not emit UTF-8 at all. Replace undecodable bytes instead of
            # letting UnicodeDecodeError abort the build.
            text = data.decode('UTF-8', errors='replace')
            chunk_snippet = dict(snippet)
            chunk_snippet[stream_name] = text
            self.post_snippet(snippet_url, chunk_snippet)

        shell_cmd = work['step']['shell']
        exit_code, _, _ = cliapp.runcmd_unchecked(
            ['bash', '-xeuc', shell_cmd],
            stdout_callback=lambda data: post('stdout', data),
            stderr_callback=lambda data: post('stderr', data),
        )

        end_snippet = dict(snippet)
        end_snippet['exit_code'] = exit_code
        self.post_snippet(snippet_url, end_snippet)

    def now(self):
        """Return the current time formatted as YYYY-MM-DDTHH:MM:SS."""
        return time.strftime('%Y-%m-%dT%H:%M:%S')

    def post_snippet(self, url, snippet):
        """POST ``snippet`` as JSON to ``url``.

        Raises cliapp.AppException if the response is not OK.
        """
        headers = {
            'Content-Type': 'application/json',
        }
        headers.update(self.get_auth_headers())

        self.show_json('POST {}'.format(url), snippet)
        r = requests.post(
            url, headers=headers, data=json.dumps(snippet), verify=False)
        if not r.ok:
            raise cliapp.AppException(
                'Error posting data to controller: {} {!r}'.format(
                    r.status_code, r.text))

    def show_msg(self, msg):
        """Write ``msg`` to stdout after a separator line, and log it."""
        sys.stdout.write('------\n{}\n'.format(msg))
        logging.info(msg)

    def show_json(self, prelude, obj):
        """Show ``obj`` as indented JSON, prefixed with ``prelude``."""
        self.show_msg('{}: {}'.format(prelude, json.dumps(obj, indent=4)))


WorkerManager(version=ick2.__version__).run()