#!/usr/bin/python3 # Copyright 2017-2018 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import logging import time import apifw import cliapp import Crypto.PublicKey.RSA import urllib3 import ick2 urllib3.disable_warnings() logging.captureWarnings(False) class WorkerManager(cliapp.Application): def __init__(self, **kwargs): super().__init__(**kwargs) self._token = None self._token_until = None def add_settings(self): self.settings.string( ['controller'], 'base URL for the controller', metavar='URL', ) self.settings.string( ['name'], 'name of this worker', metavar='URL', ) self.settings.string( ['token-key'], 'get token signing private key from FILE', metavar='FILE', ) self.settings.string( ['token-key-pub'], 'this is not used', metavar='NOPE', ) self.settings.integer( ['sleep'], 'sleep for SECS seconds if there is no work currently', metavar='SECS', default=5, ) self.settings.string( ['workspace'], 'use DIR as the workspace where commands are run', metavar='DIR', default='/var/lib/ick/workspace', ) self.settings.string( ['systree'], 'use DIR as the system tree for containers', metavar='DIR', default='/var/lib/ick/systree', ) def process_args(self, args): try: self.main_program(args) except BaseException as e: logging.error(str(e), exc_info=True) raise def main_program(self, args): self.settings.require('name') self.settings.require('controller') name = self.settings['name'] url = self.settings['controller'] workspace = self.settings['workspace'] systree = self.settings['systree'] tg = TokenGenerator() tg.set_key(self.settings['token-key']) api = ControllerAPI(name, url, tg) worker = Worker(name, api, workspace, systree) logging.info('Worker manager %s starts, controller is %s', name, url) api.register() while True: work = api.get_work() if work: worker.do_work(work) else: self.sleep_a_little() def sleep_a_little(self): secs = self.settings['sleep'] time.sleep(secs) class ControllerAPI: def __init__(self, name, url, token_generator): self._token_generator = token_generator self._cc = ick2.ControllerClient() self._cc.set_client_name(name) self._cc.set_controller_url(url) self._blobs = None def get_token(self): return self._token_generator.get_token() def register(self): self._cc.set_token(self.get_token()) self._cc.register() def get_work(self): self._cc.set_token(self.get_token()) return self._cc.get_work() def report_work(self, work): self._cc.set_token(self.get_token()) self._cc.report_work(work) def get_blob_upload_url(self, name): blobs = self.get_blob_client() return blobs.url(name) def upload_blob(self, blob_id, blob): blobs = self.get_blob_client() blobs.upload(blob_id, blob) def download_blob(self, blob_id): blobs = self.get_blob_client() return blobs.download(blob_id) def get_blob_client(self): if self._blobs is None: self._blobs = self._cc.get_blob_client() return self._blobs class TokenGenerator: max_age = 3600 # 1 hour sub = 'subject-uuid' iss = 'localhost' aud = 'localhost' scopes = ' '.join([ 'uapi_version_get', 'uapi_work_id_get', 'uapi_work_post', 'uapi_workers_post', 'uapi_blobs_id_get', 'uapi_blobs_id_put', ]) def __init__(self): self._token = None self._token_key = None self._token_until = None def is_valid(self, now): return ( self._token is not None and (self._token_until is None or now <= self._token_until) ) def set_token(self, token): self._token = token self._token_until = None assert self.is_valid(time.time()) def set_key(self, filename): key_text = self.cat(filename) self._token_key = Crypto.PublicKey.RSA.importKey(key_text) def cat(self, filename): with open(filename) as f: return f.read() def get_token(self): now = time.time() if not self.is_valid(now): self._token = self.create_token() self._token_until = now + self.max_age assert self.is_valid(now) return self._token def create_token(self): now = time.time() claims = { 'iss': self.iss, 'sub': self.sub, 'aud': self.aud, 'exp': now + self.max_age, 'scope': self.scopes, } token = apifw.create_token(claims, self._token_key) return token.decode('ascii') class Worker: def __init__(self, name, api, workspace, systree): self._name = name self._api = api self._workspace = workspace self._systree = systree def do_work(self, work): project_name = work['project'] step = work.get('step', {}) params = work.get('parameters', {}) reporter = ick2.Reporter(self._api, work) af = ick2.ActionFactory(self._systree, self._workspace, reporter) af.set_token(self._api.get_token()) af.set_blob_url_func(self._api.get_blob_upload_url) action = af.create_action(step, project_name) exit_code = action.execute(params, step) if __name__ == '__main__': WorkerManager(version=ick2.__version__).run()