summaryrefslogtreecommitdiff
path: root/worker_manager
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2018-03-30 10:02:47 +0300
committerLars Wirzenius <liw@liw.fi>2018-03-30 11:52:19 +0300
commit6582e63e1d4efec128bcf8c0d7103643abc8650d (patch)
treec3bc045d10e3990c155e4fb9b46c6f4c7192b999 /worker_manager
parent9784f0ec903ed18af0e0a8f056ee6ef23c87d41f (diff)
downloadick2-6582e63e1d4efec128bcf8c0d7103643abc8650d.tar.gz
Change: worker_manager to use new classes in ick2
Diffstat (limited to 'worker_manager')
-rwxr-xr-xworker_manager465
1 files changed, 44 insertions, 421 deletions
diff --git a/worker_manager b/worker_manager
index ec8c19f..ad01fd7 100755
--- a/worker_manager
+++ b/worker_manager
@@ -15,12 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import base64
-import json
import logging
-import os
-import subprocess
-import tempfile
import time
import apifw
@@ -88,16 +83,25 @@ class WorkerManager(cliapp.Application):
)
def process_args(self, args):
+ try:
+ self.main_program(args)
+ except BaseException as e:
+ logging.error(str(e), exc_info=True)
+ raise
+
+ def main_program(self, args):
self.settings.require('name')
self.settings.require('controller')
name = self.settings['name']
url = self.settings['controller']
+ workspace = self.settings['workspace']
+ systree = self.settings['systree']
+
tg = TokenGenerator()
tg.set_key(self.settings['token-key'])
api = ControllerAPI(name, url, tg)
- worker = Worker(
- name, api, self.settings['workspace'], self.settings['systree'])
+ worker = Worker(name, api, workspace, systree)
logging.info('Worker manager %s starts, controller is %s', name, url)
@@ -117,94 +121,43 @@ class WorkerManager(cliapp.Application):
class ControllerAPI:
def __init__(self, name, url, token_generator):
- self._name = name
- self._url = url
- self._blob_url = None
self._token_generator = token_generator
- self._httpapi = HttpApi()
self._cc = ick2.ControllerClient()
self._cc.set_client_name(name)
self._cc.set_controller_url(url)
+ self._blobs = None
+
+ def get_token(self):
+ return self._token_generator.get_token()
def register(self):
- self._cc.set_token(self._token_generator.get_token())
+ self._cc.set_token(self.get_token())
self._cc.register()
def get_work(self):
- self._cc.set_token(self._token_generator.get_token())
+ self._cc.set_token(self.get_token())
return self._cc.get_work()
def report_work(self, work):
- self._cc.set_token(self._token_generator.get_token())
+ self._cc.set_token(self.get_token())
self._cc.report_work(work)
- def url(self, path):
- return '{}{}'.format(self._url, path)
-
- def get_auth_headers(self):
- token = self._token_generator.get_token()
- return {
- 'Authorization': 'Bearer {}'.format(token),
- }
+ def get_blob_upload_url(self, name):
+ blobs = self.get_blob_client()
+ return blobs.url(name)
def upload_blob(self, blob_id, blob):
- logging.info('Upload blob %s', blob_id)
- url = self.bloburl(blob_id)
- headers = self.get_auth_headers()
- code = self._httpapi.put(url, headers, blob)
+ blobs = self.get_blob_client()
+ blobs.upload(blob_id, blob)
def download_blob(self, blob_id):
- logging.info('Download blob %s', blob_id)
- url = self.bloburl(blob_id)
- headers = self.get_auth_headers()
- return self._httpapi.get_blob(url, headers)
-
- def bloburl(self, blob_id):
- if self._blob_url is None:
- self._blob_url = self.get_artifact_store_url()
- if self._blob_url is not None:
- return '{}/blobs/{}'.format(self._blob_url, blob_id)
- logging.error('Do not artifact store URL')
- return None
-
- def get_artifact_store_url(self):
- url = self.url('/version')
- headers = self.get_auth_headers()
- version = self._httpapi.get(url, headers)
- logging.info('Version: %r', version)
- if version:
- return version.get('artifact_store')
-
-
-class HttpApi:
-
- def __init__(self):
- self._httpapi = ick2.HttpAPI()
-
- def post(self, url, headers, body):
- ret = self._request(self._httpapi.post, url, headers, body=body)
- if ret is None:
- return 400
- return 201
+ blobs = self.get_blob_client()
+ return blobs.download(blob_id)
- def put(self, url, headers, body):
- ret = self._request(self._httpapi.put, url, headers, body=body)
- if ret is None:
- return 400
- return 200
-
- def get(self, url, headers):
- return self._request(self._httpapi.get_dict, url, headers)
-
- def get_blob(self, url, headers):
- return self._request(self._httpapi.get_blob, url, headers)
-
- def _request(self, func, url, headers, **kwargs):
- try:
- return func(url, headers=headers, **kwargs)
- except ick2.HttpError as e:
- logging.warning('Error: %s returned %s', url, e)
- return None
+ def get_blob_client(self):
+ if self._blobs is None:
+ self._blobs = self._cc.get_blob_client()
+ return self._blobs
class TokenGenerator:
@@ -277,353 +230,23 @@ class Worker:
self._systree = systree
def do_work(self, work):
-
- def post(stream_name, data):
- s = self.get_work_result(work)
- s[stream_name] = data
- self._api.report_work(s)
-
- step = work['step']
- logging.info('Running step: %r', step)
- exit_code = 0
- if exit_code == 0:
- klass = self.worker_factory(step)
- if klass is None:
- exit_code = -1
- else:
- worker = klass(self._api, self._workspace, self._systree, post)
- exit_code = worker.do(work)
- self.finish_work(work, exit_code)
-
- def get_work_result(self, work):
- return {
- 'build_id': work['build_id'],
- 'worker': self._name,
- 'project': work['project'],
- 'pipeline': work['pipeline'],
- 'stdout': '',
- 'stderr': '',
- 'exit_code': None,
- 'timestamp': self.now(),
- }
-
- def now(self):
- return time.strftime('%Y-%m-%dT%H:%M:%S')
-
- def worker_factory(self, step):
- table = [
- ('shell', None, ShellWorker),
- ('python', None, PythonWorker),
- ('debootstrap', None, DebootstrapWorker),
- ('archive', None, WorkspaceArchiver),
- ('action', 'populate_systree', SystreePopulator),
- ('action', 'create_workspace', WorkspaceCleaner),
- ]
-
- for key, value, klass in table:
- if key in step and (value is None or step[key] == value):
- return klass
-
- logging.warning('Cannot find worker for %s', step)
- return None
-
- def finish_work(self, work, exit_code):
- s = self.get_work_result(work)
- s['exit_code'] = exit_code
- self._api.report_work(s)
-
-
-class Runner:
-
- def __init__(self, post_output):
- self._post = post_output
- self._buffers = {'stdout': '', 'stderr': ''}
- self._maxbuf = 1024 * 256
- self._timeout = 1.0
-
- def runcmd(self, argv, **kwargs):
- logging.debug('Runner.runcmd: argv=%r %r', argv, kwargs)
- exit_code, _, _ = cliapp.runcmd_unchecked(
- argv,
- stdout_callback=self.capture_stdout,
- stderr=subprocess.STDOUT,
- output_timeout=self._timeout,
- timeout_callback=self.flush,
- **kwargs
- )
- self.flush()
- logging.debug('Runner.runcmd: finished, exit_code=%r', exit_code)
- return exit_code
-
- def capture_stdout(self, data):
- return self.capture('stdout', data)
-
- def capture_stderr(self, data):
- return self.capture('stderr', data)
-
- def capture(self, stream_name, data):
- logging.debug('CAPTURE %s: %r', stream_name, data)
- self._buffers[stream_name] += data.decode('UTF-8')
- if len(self._buffers[stream_name]) >= self._maxbuf:
- self.flush()
- return b''
-
- def flush(self):
- logging.debug('flushing: self._buffers=%r', self._buffers)
- streams = list(self._buffers.keys())
- logging.debug('streams: %r', streams)
- for stream in streams:
- buf = self._buffers[stream]
- logging.debug('FLUSH: %s: %r', stream, buf)
- self._buffers[stream] = ''
- if buf:
- logging.debug('Posting %d bytes of %s', len(buf), stream)
- self._post(stream, buf)
-
-
-class Mounter:
-
- def __init__(self, mounts, runner):
- self._mounts = mounts
- self._runner = runner
-
- def __enter__(self):
- self.mount()
- return self
-
- def __exit__(self, *args):
- self.unmount()
-
- def mount(self):
- for dirname, mp in self._mounts:
- if not os.path.exists(mp):
- os.mkdir(mp)
- self._runner.runcmd(['sudo', 'mount', '--bind', dirname, mp])
-
- def unmount(self):
- for dirname, mp in reversed(self._mounts):
- self._runner.runcmd(['sudo', 'umount', mp])
-
-
-class WorkerBase:
-
- def __init__(self, api, workspace, systree, post):
- self._api = api
- self._workspace = workspace
- self._systree = systree
- self._post = post
-
- def do(self, work):
- params = work.get('parameters', {})
- params_text = self.params64(params)
- argv = self.get_argv(work, params_text)
- mounts = []
- env = dict(os.environ)
- env.update({
- 'LC_ALL': 'C',
- 'DEBIAN_FRONTEND': 'noninteractive',
- })
- if self.where(work) == 'chroot':
- logging.debug('CHROOT REQUESTED')
- argv = ['sudo', 'chroot', self._workspace] + argv
- mounts = [
- ('/proc', os.path.join(self._workspace, 'proc')),
- ('/sys', os.path.join(self._workspace, 'sys')),
- ]
- elif self.where(work) == 'container':
- logging.debug('CONTAINER REQUESTED')
- bind = '{}:/workspace'.format(self._workspace)
- mp = '{}/workspace'.format(self._workspace)
- if not os.path.exists(mp):
- os.mkdir(mp)
- argv = [
- 'sudo', 'systemd-nspawn', '-D', self._systree, '--bind', bind,
- '--chdir', '/workspace',
- ] + argv
- else:
- logging.debug('HOST REQUESTED')
- runner = Runner(self._post)
- with Mounter(mounts, runner):
- return runner.runcmd(argv, cwd=self._workspace, env=env)
-
- def params64(self, params):
- as_json = json.dumps(params)
- as_bytes = as_json.encode('UTF-8')
- as_base64 = base64.b64encode(as_bytes)
- return as_base64.decode('UTF-8')
-
- def where(self, work):
+ logging.debug('Doing work: %r', work)
+ project_name = work['project']
step = work.get('step', {})
- return step.get('where')
-
- def get_argv(self, work, params_text):
- raise NotImplementedError()
-
-
-class ShellWorker(WorkerBase):
-
- def get_argv(self, work, params_text):
- step = work['step']
- code_snippet = step['shell']
- where = step.get('where', 'host')
- prefix = 'params() { echo -n "%s" | base64 -d; }\n' % params_text
- return ['bash', '-exuc', prefix + code_snippet]
-
-
-class PythonWorker(WorkerBase):
-
- def get_argv(self, work, params_text):
- prefix = (
- 'import base64, json\n'
- 'params = json.loads(base64.b64decode(\n'
- ' "{}").decode("utf8"))\n'
- ).format(params_text)
- code_snippet = work['step']['python']
- return ['python3', '-c', prefix + '\n' + code_snippet]
-
-
-class DebootstrapWorker(WorkerBase):
-
- def get_argv(self, work, params_text):
- step = work['step']
- params = work.get('parameters', {})
- if step['debootstrap'] == 'auto':
- suite = params['debian_codename']
- else:
- suite = step['debootstrap']
-
- return [
- 'sudo',
- 'debootstrap',
- suite,
- '.',
- step.get('mirror', 'http://deb.debian.org/debian'),
- ]
-
-
-class WorkspaceCleaner(WorkerBase):
-
- def get_argv(self, work, params_text):
- return ['sudo', 'find', '.', '-delete']
-
-
-class WorkspaceArchiver(WorkerBase):
-
- def do(self, work):
+ logging.debug('Doing work: step=%r', step)
params = work.get('parameters', {})
- blob_id = params.get('systree_name', 'noname')
- step = work['step']
- dirname = step['archive']
-
- if dirname == 'workspace':
- dirname = self._workspace
-
- runner = Runner(self._post)
-
- temp = self.archive(runner, dirname)
- if isinstance(temp, int):
- return temp
-
- result = self.upload(runner, temp, blob_id)
- os.remove(temp)
-
- return result
-
- def archive(self, runner, dirname):
- logging.info('Archiving %s', dirname)
- self.report(b'Archiving workspace\n')
-
- fd, filename = tempfile.mkstemp()
- os.close(fd)
-
- archive = ['sudo', 'tar', '-zcf', filename, '-C', dirname, '.']
- exit_code = runner.runcmd(archive)
- if exit_code != 0:
- os.remove(filename)
- return exit_code
-
- return filename
-
- def upload(self, runner, filename, blob_id):
- logging.info('Uploading %s', blob_id)
- self.report(b'Uploading workspace\n')
-
- url = self._api.bloburl(blob_id)
- headers = self._api.get_auth_headers()
- upload = ['curl', '-sk', '-T', filename]
- for name, value in headers.items():
- header = '{}: {}'.format(name, value)
- upload.extend(['-H', header])
- upload.append(url)
-
- exit_code = runner.runcmd(upload)
- if exit_code == 0:
- self.report(b'Uploaded tarball of workspace\n')
-
- return exit_code
-
- def execute_argv(self, *argvs, **kwargs):
- exit_code, _, _ = cliapp.runcmd_unchecked(
- *argvs,
- **kwargs,
- )
- return exit_code
-
- def report(self, data):
- self._post('stdout', data.decode('UTF-8'))
- return b''
-
- def get_argv(self, work, params_text):
- assert False
-
-
-class SystreePopulator(WorkerBase):
-
- systree_dir = '/var/lib/ick/systree'
-
- def do(self, work):
- step = work['step']
- systree_name = step.get('systree_name')
- if not systree_name:
- self.report(
- b'No systree_name field in action, no systree population\n')
- return 1
-
- self.make_directory_empty(self.systree_dir)
-
- self.report(b'Downloading and unpacking systree blob\n')
- return self.download_and_unpack_systree(systree_name, self.systree_dir)
-
- def make_directory_empty(self, dirname):
- return self.execute_argv(['sudo', 'find', '-delete'], cwd=dirname)
-
- def download_and_unpack_systree(self, systree_name, dirname):
- url = self._api.bloburl(systree_name)
- headers = self._api.get_auth_headers()
- download = ['curl', '-k']
- for name, value in headers.items():
- header = '{}: {}'.format(name, value)
- download.extend(['-H', header])
- download.append(url)
-
- unpack = ['sudo', 'tar', '-zxf', '-', '-C', dirname]
-
- return self.execute_argv(download, unpack)
-
- def execute_argv(self, *argvs, **kwargs):
- exit_code, _, _ = cliapp.runcmd_unchecked(
- *argvs,
- stdout_callback=self.report,
- stderr_callback=self.report,
- **kwargs,
- )
- return exit_code
-
- def report(self, data):
- self._post('stdout', data.decode('UTF-8'))
-
- def get_argv(self, work, params_text):
- assert False
+ logging.debug('Doing work: params=%r', params)
+ reporter = ick2.Reporter(self._api, work)
+ logging.debug('Doing work: reporter=%r', reporter)
+ af = ick2.ActionFactory(self._systree, self._workspace, reporter)
+ logging.debug('Doing work: af=%r', af)
+ af.set_token(self._api.get_token())
+ af.set_blob_url_func(self._api.get_blob_upload_url)
+ action = af.create_action(step, project_name)
+ logging.debug('Doing work: action=%r', action)
+ exit_code = action.execute(params, step)
+ logging.debug('Action finished: exit_code=%r', exit_code)
+ logging.debug('Finished work: %r', work)
if __name__ == '__main__':