#!/usr/bin/python3
# Copyright 2017  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import json
import os
import time

import cliapp
import requests

import ick2


class WorkerManager(cliapp.Application):
    """Poll a controller for work, run it, and post the output back.

    The main loop asks the controller for work, either executes the
    returned shell snippet or (with --pretend) only reports that it
    would have, and sleeps when the controller has nothing to do.
    """

    # Upper bound, in seconds, for a single HTTP request to the
    # controller.  Without a timeout a stalled connection would block
    # the worker forever.  NOTE(review): 60 is a generous guess; tune it
    # if the controller is known to answer more slowly.
    http_timeout = 60

    def add_settings(self):
        """Declare the command line / configuration settings."""
        self.settings.boolean(
            ['pretend'],
            'only pretend to execute shell steps',
        )

        self.settings.string(
            ['controller'],
            'base URL for the controller',
            metavar='URL',
        )

        self.settings.string(
            ['name'],
            'name of this worker',
            metavar='NAME',
        )

        self.settings.integer(
            ['sleep'],
            'sleep for SECS seconds if there is no work currently',
            metavar='SECS',
            default=5,
        )

    def process_args(self, args):
        """Run the polling loop; this never returns normally.

        Both the worker name and the controller URL are required,
        since every request URL is built from the two of them.
        """
        self.settings.require('name')
        self.settings.require('controller')

        name = self.settings['name']
        url = self.settings['controller']

        print('Worker manager {} starts, controller is {}'.format(name, url))

        while True:
            work = self.get_work(url, name)
            if work and self.settings['pretend']:
                self.report_pretend_work(url, name, work)
            elif work:
                self.do_work(url, name, work)
            else:
                self.sleep_a_little()

    def sleep_a_little(self):
        """Sleep for the number of seconds given by the 'sleep' setting."""
        secs = self.settings['sleep']
        print('Sleeping for {} seconds'.format(secs))
        time.sleep(secs)

    def get_work(self, url, name):
        """Ask the controller for work for the worker called name.

        Returns the decoded JSON body of the response, or None when
        the controller could not be reached, answered with a status
        other than 200, or sent an empty body.
        """
        get_work_url = '{}/worker/{}'.format(url, name)
        print('Getting work from {}'.format(get_work_url))
        try:
            r = requests.get(get_work_url, timeout=self.http_timeout)
        except requests.exceptions.RequestException as e:
            # An unreachable controller is treated like "no work", so
            # the caller sleeps and polls again instead of dying.
            print('Error getting work from controller: {}'.format(e))
            return None
        if r.status_code != 200 or not r.text:
            return None
        print('Response: {!r}'.format(r.text))
        return r.json()

    def report_pretend_work(self, url, name, work):
        """Post a successful snippet for work without running anything.

        Reads the 'project' and 'shell' keys of work.
        """
        print('Pretending to work: {!r}'.format(work))
        snippet_url = '{}/worker/{}/snippet'.format(url, name)
        snippet = {
            'project': work['project'],
            'stdout': 'pretending: {}'.format(work['shell']),
            'stderr': '',
            'exit-code': 0,
        }
        self.post_snippet(snippet_url, snippet)

    def do_work(self, url, name, work):
        """Run the shell snippet in work and stream its output back.

        Reads the 'project', 'git' and 'shell' keys of work.  Each
        chunk of output is posted as its own snippet with 'exit-code'
        set to None; a final snippet carries the real exit code.
        """
        print('Doing work: {!r}'.format(work))
        snippet_url = '{}/worker/{}/snippet'.format(url, name)
        snippet = {
            'project': work['project'],
            'stdout': '',
            'stderr': '',
            'exit-code': None,
        }

        def post(stream_name, data):
            print('{}: {!r}'.format(stream_name, data))
            # json.dumps cannot serialise bytes, so decode any bytes
            # chunk before posting; text is passed through untouched.
            if isinstance(data, bytes):
                data = data.decode('utf-8', errors='replace')
            s = dict(snippet)
            s[stream_name] = data
            self.post_snippet(snippet_url, s)

        env = dict(os.environ)
        env['ICK_URL'] = work['git']

        # NOTE: the shell text comes verbatim from the controller's
        # response and is executed as-is by bash.
        exit_code, _, _ = cliapp.runcmd_unchecked(
            ['bash', '-xeuc', work['shell']],
            stdout_callback=lambda data: post('stdout', data),
            stderr_callback=lambda data: post('stderr', data),
            env=env,
        )

        end_snippet = dict(snippet)
        end_snippet['exit-code'] = exit_code
        self.post_snippet(snippet_url, end_snippet)

    def post_snippet(self, url, snippet):
        """POST snippet to url as JSON.

        Raises cliapp.AppException if the controller's response is not
        OK.  Errors raised by requests itself (including a timeout)
        are not caught here.
        """
        headers = {
            'Content-Type': 'application/json',
        }
        print('POST {} {!r}'.format(url, snippet))
        r = requests.post(
            url,
            headers=headers,
            data=json.dumps(snippet),
            timeout=self.http_timeout,
        )
        if not r.ok:
            raise cliapp.AppException(
                'Error posting data to controller: {} {!r}'.format(
                    r.status_code, r.text))


WorkerManager(version=ick2.__version__).run()