#!/usr/bin/python3
# Copyright 2017-2018 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import logging
import time

import cliapp
import urllib3

import ick2


# Silence urllib3 warnings and keep the warnings module's output out of
# the log file.
urllib3.disable_warnings()
logging.captureWarnings(False)


class WorkerManager(cliapp.Application):
    """Command line application that polls a controller for work.

    The application registers itself with the controller and then loops
    forever: it asks for work, hands any work it gets to a ``Worker``,
    and sleeps for a configurable time when there is nothing to do.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._token = None

    def add_settings(self):
        """Declare the command line / configuration settings."""
        self.settings.string(
            ['client-id'],
            'use ID as the client id when authenticating to IDP',
            metavar='ID',
        )

        self.settings.string(
            ['client-secret-cmd'],
            'run CMD to get the client secret when authenticating to IDP',
            metavar='CMD',
        )

        self.settings.string(
            ['controller'],
            'base URL for the controller',
            metavar='URL',
        )

        self.settings.integer(
            ['sleep'],
            'sleep for SECS seconds if there is no work currently',
            metavar='SECS',
            default=5,
        )

        self.settings.string(
            ['workspace'],
            'use DIR as the workspace where commands are run',
            metavar='DIR',
            default='/var/lib/ick/workspace',
        )

        self.settings.string(
            ['systree'],
            'use DIR as the system tree for containers',
            metavar='DIR',
            default='/var/lib/ick/systree',
        )

        self.settings.boolean(
            ['verify-tls'],
            'verify API provider TLS certificate '
            '(default is verify, use --no-verify-tls)',
            default=True,
        )

    def process_args(self, args):
        """Run the main program, logging anything that escapes it.

        The handler is deliberately broad: whatever terminates the main
        loop is written to the log with a traceback and then re-raised
        unchanged, so the caller's own handling still applies.
        """
        try:
            self.main_program(args)
        except BaseException as e:
            logging.error(str(e), exc_info=True)
            raise

    def main_program(self, args):
        """Register with the controller and process work forever.

        ``args`` is accepted for symmetry with ``process_args`` but is
        not used. This method only returns by raising.
        """
        self.settings.require('client-id')
        self.settings.require('client-secret-cmd')
        self.settings.require('controller')

        name = self.settings['client-id']
        url = self.settings['controller']
        workspace = self.settings['workspace']
        systree = self.settings['systree']
        secret = self.get_client_secret()

        api = ControllerAPI(name, url)
        api.set_verify_tls(self.settings['verify-tls'])
        api.set_client_creds(name, secret)

        worker = Worker(name, api, workspace, systree)

        logging.info('Worker manager %s starts, controller is %s', name, url)

        api.register()
        while True:
            work = api.get_work()
            if work:
                worker.do_work(work)
            else:
                self.sleep_a_little()

    def sleep_a_little(self):
        """Sleep for the number of seconds given by the ``sleep`` setting."""
        secs = self.settings['sleep']
        time.sleep(secs)

    def get_client_secret(self):
        """Return the client secret produced by ``client-secret-cmd``.

        The configured command is run via ``sh -c`` and the first line
        of its output, stripped of surrounding whitespace, is returned.

        Raises ``cliapp.AppException`` if the command produces no output
        at all, instead of failing with an unexplained ``IndexError``.
        """
        cmd = self.settings['client-secret-cmd']
        output = cliapp.runcmd(['sh', '-c', cmd])
        lines = output.splitlines()
        if not lines:
            raise cliapp.AppException(
                'client-secret-cmd produced no output: %r' % cmd)
        return lines[0].strip()


class ControllerAPI:
    """Thin wrapper around ``ick2.ControllerClient`` for the worker.

    Before each controller call made through this class, a token is
    obtained from the auth client and set on the controller client.
    """

    # Space separated list of scopes requested for every token.
    _scopes = ' '.join([
        'uapi_version_get',
        'uapi_notify_post',
        'uapi_work_get',
        'uapi_work_post',
        'uapi_workers_post',
        'uapi_blobs_id_get',
        'uapi_blobs_id_put',
        'uapi_builds_id_get',
        'uapi_logs_id_get',
    ])

    def __init__(self, name, url):
        self._cc = ick2.ControllerClient()
        self._cc.set_client_name(name)
        self._cc.set_controller_url(url)
        # The auth and blob clients are created lazily on first use.
        self._ac = None
        self._blobs = None
        self._client_id = None
        self._client_secret = None

    def get_controller_client(self):
        """Return the underlying ``ick2.ControllerClient`` instance."""
        return self._cc

    def set_verify_tls(self, verify):
        """Pass the TLS verification flag on to the controller client."""
        self._cc.set_verify_tls(verify)

    def set_client_creds(self, client_id, client_secret):
        """Remember the credentials used when requesting tokens."""
        self._client_id = client_id
        self._client_secret = client_secret

    def get_token(self):
        """Return a token for ``_scopes`` from the auth client.

        The auth client is obtained from the controller client, and given
        the stored credentials, the first time this is called.
        """
        if self._ac is None:
            self._ac = self._cc.get_auth_client()
            self._ac.set_client_creds(self._client_id, self._client_secret)
        return self._ac.get_token(self._scopes)

    def register(self):
        """Set a token and register with the controller."""
        self._cc.set_token(self.get_token())
        self._cc.register()

    def get_work(self):
        """Set a token and return what the controller's get_work gives."""
        self._cc.set_token(self.get_token())
        return self._cc.get_work()

    def report_work(self, work):
        """Set a token and report ``work`` to the controller."""
        self._cc.set_token(self.get_token())
        self._cc.report_work(work)

    def get_blob_upload_url(self, name):
        """Return the blob client's URL for the blob called ``name``."""
        blobs = self.get_blob_client()
        return blobs.url(name)

    def upload_blob(self, blob_id, blob):
        """Upload ``blob`` under ``blob_id`` via the blob client."""
        blobs = self.get_blob_client()
        blobs.upload(blob_id, blob)

    def download_blob(self, blob_id):
        """Return what the blob client downloads for ``blob_id``."""
        blobs = self.get_blob_client()
        return blobs.download(blob_id)

    def get_blob_client(self):
        """Return the blob client, creating it on first use."""
        if self._blobs is None:
            self._blobs = self._cc.get_blob_client()
        return self._blobs


class Worker:
    """Executes a single piece of work received from the controller."""

    def __init__(self, name, api, workspace, systree):
        self._name = name
        self._api = api
        self._workspace = workspace
        self._systree = systree

    def do_work(self, work):
        """Build and execute the action described by ``work``.

        ``work`` must have the keys ``project``, ``build_number`` and
        ``build_id``; ``step`` and ``parameters`` are optional and
        default to empty dicts.
        """
        project_name = work['project']
        build_number = work['build_number']
        build_id = work['build_id']
        step = work.get('step', {})
        params = work.get('parameters', {})

        reporter = ick2.Reporter(self._api, work)

        af = ick2.ActionFactory(
            build_id, self._systree, self._workspace, reporter)
        af.set_controller_client(self._api.get_controller_client())
        af.set_token(self._api.get_token())
        af.set_blob_url_func(self._api.get_blob_upload_url)
        af.add_env_var('LC_ALL', 'C')
        af.add_env_var('DEBIAN_FRONTEND', 'noninteractive')
        af.add_env_var('BUILD_NUMBER', str(build_number))

        action = af.create_action(step, project_name)
        # NOTE(review): the value returned by execute() is not used
        # here; presumably the outcome reaches the controller through
        # the reporter - confirm.
        action.execute(params, step)


if __name__ == '__main__':
    WorkerManager(version=ick2.__version__).run()