summaryrefslogtreecommitdiff
path: root/worker_manager
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2017-12-27 16:01:08 +0200
committerLars Wirzenius <liw@liw.fi>2017-12-27 18:01:37 +0200
commit4fddaccccdd090fa834c0f7b272ce627ff155888 (patch)
tree288a702645821abd5af951be26469917e8e9b05b /worker_manager
parent5fe6641f9ba6fe17a182e924911fdd54d966ab69 (diff)
downloadick2-4fddaccccdd090fa834c0f7b272ce627ff155888.tar.gz
Refactor: worker_manager for cleanliness
Diffstat (limited to 'worker_manager')
-rwxr-xr-xworker_manager413
1 files changed, 230 insertions, 183 deletions
diff --git a/worker_manager b/worker_manager
index 92c2ad4..24214bd 100755
--- a/worker_manager
+++ b/worker_manager
@@ -18,7 +18,6 @@
import base64
import json
import logging
-import sys
import time
import apifw
@@ -26,7 +25,6 @@ import cliapp
import Crypto.PublicKey.RSA
import requests
import urllib3
-
import ick2
@@ -42,11 +40,6 @@ class WorkerManager(cliapp.Application):
self._token_until = None
def add_settings(self):
- self.settings.boolean(
- ['pretend'],
- 'only pretend to execute shell steps',
- )
-
self.settings.string(
['controller'],
'base URL for the controller',
@@ -60,12 +53,6 @@ class WorkerManager(cliapp.Application):
)
self.settings.string(
- ['token'],
- 'use TOKEN for all controller HTTP requests',
- metavar='TOKEN',
- )
-
- self.settings.string(
['token-key'],
'get token signing private key from FILE',
metavar='FILE',
@@ -73,8 +60,8 @@ class WorkerManager(cliapp.Application):
self.settings.string(
['token-key-pub'],
- 'get token signing public key from FILE',
- metavar='FILE',
+ 'this is not used',
+ metavar='NOPE',
)
self.settings.integer(
@@ -93,121 +80,185 @@ class WorkerManager(cliapp.Application):
def process_args(self, args):
self.settings.require('name')
+ self.settings.require('controller')
+
name = self.settings['name']
url = self.settings['controller']
- self.show_msg(
- 'Worker manager {} starts, controller is {}'.format(name, url))
- self.register(url, name)
+ tg = TokenGenerator()
+ tg.set_key(self.settings['token-key'])
+ api = ContainerAPI(name, url, tg)
+ worker = Worker(name, api, self.settings['workspace'])
+
+ logging.info('Worker manager %s starts, controller is %s', name, url)
+
+ api.register()
while True:
- work = self.get_work(url, name)
- if work and self.settings['pretend']:
- self.report_pretend_work(url, name, work)
- elif work:
- self.do_work(url, name, work, self.settings['workspace'])
+ work = api.get_work()
+ if work:
+ worker.do_work(work)
else:
self.sleep_a_little()
- def register(self, url, name):
- logging.info('Registering worker %s to %s', name, url)
- register_url = '{}/workers'.format(url)
- headers = self.get_auth_headers()
- body = {
- 'worker': name,
- }
- r = requests.post(
- register_url, json=body, headers=headers, verify=False)
- if not r.ok:
- logging.warning('Error registering worker: %d', r.status_code)
- else:
- logging.info('Registered worker successfully')
-
def sleep_a_little(self):
secs = self.settings['sleep']
time.sleep(secs)
- def get_work(self, url, name):
- get_work_url = '{}/work/{}'.format(url, name)
+
+class ContainerAPI:
+
+ def __init__(self, name, url, token_generator):
+ self._name = name
+ self._url = url
+ self._token_generator = token_generator
+
+ def register(self):
+ logging.info('Registering worker %s to %s', self._name, self._url)
+ url = self.url('/workers')
headers = self.get_auth_headers()
- r = requests.get(get_work_url, headers=headers, verify=False)
- if r.status_code != 200 or not r.text:
- return None
- work = r.json()
+ body = {
+ 'worker': self._name,
+ }
+ httpapi = HttpApi()
+ httpapi.post(url, headers, body)
+
+ def get_work(self):
+ url = self.url('/work/{}'.format(self._name))
+ headers = self.get_auth_headers()
+ httpapi = HttpApi()
+ work = httpapi.get(url, headers)
if work:
- self.show_json('Response:', work)
+ logging.info('Response: %r', work)
return work
+ def report_work(self, work):
+ logging.info('POST %s', work)
+ url = self.url('/work')
+ headers = self.get_auth_headers()
+ httpapi = HttpApi()
+ code = httpapi.post(url, headers, work)
+ if code not in (200, 201):
+ raise cliapp.AppException(
+ 'Error posting data to controller: {}'.format(code))
+
+ def url(self, path):
+ return '{}{}'.format(self._url, path)
+
def get_auth_headers(self):
- token = self.get_token()
+ token = self._token_generator.get_token()
return {
'Authorization': 'Bearer {}'.format(token),
}
- def get_token(self):
- token = self.settings['token']
- token_key = self.settings['token-key']
- token_key_pub = self.settings['token-key-pub']
- now = time.time()
- if self._token is None or now >= self._token_until:
- if token:
- self._token = token
- self._token_until = now + 3600
- elif token_key and token_key_pub:
- with open(token_key) as f1, open(token_key_pub) as f2:
- key_text = f1.read()
- pub_text = f2.read()
- self._token = self.create_token(key_text, pub_text)
- self._token_until = now + 3600
- else:
- sys.exit('No token and no way to create')
+class HttpApi:
- assert self._token is not None
- return self._token
+ def post(self, url, headers, body):
+ r = requests.post(url, json=body, headers=headers, verify=False)
+ if not r.ok:
+ logging.warning('Error: POST %s returned %s', url, r.status_code)
+ else:
+ logging.info('Registered worker successfully')
+ return r.status_code
- def create_token(self, key_text, pub_text):
- iss = 'localhost'
- aud = 'localhost'
- scopes_list = [
- 'uapi_work_id_get',
- 'uapi_work_post',
- 'uapi_workers_post',
- ]
+ def get(self, url, headers):
+ r = requests.get(url, headers=headers, verify=False)
+ if not r.ok or not r.text:
+ return None
+ return r.json()
+
+
+class TokenGenerator:
+
+ max_age = 3600 # 1 hour
+ sub = 'subject-uuid'
+ iss = 'localhost'
+ aud = 'localhost'
+ scopes = ' '.join([
+ 'uapi_work_id_get',
+ 'uapi_work_post',
+ 'uapi_workers_post',
+ ])
+
+ def __init__(self):
+ self._token = None
+ self._token_key = None
+ self._token_until = None
+
+ def is_valid(self, now):
+ return (
+ self._token is not None and
+ (self._token_until is None or now <= self._token_until)
+ )
+
+ def set_token(self, token):
+ self._token = token
+ self._token_until = None
+ assert self.is_valid(time.time())
- key = Crypto.PublicKey.RSA.importKey(key_text)
- scopes = ' '.join(scopes_list)
+ def set_key(self, filename):
+ key_text = self.cat(filename)
+ self._token_key = Crypto.PublicKey.RSA.importKey(key_text)
+ def cat(self, filename):
+ with open(filename) as f:
+ return f.read()
+
+ def get_token(self):
+ now = time.time()
+ if not self.is_valid(now):
+ self._token = self.create_token()
+ self._token_until = now + self.max_age
+ assert self.is_valid(now)
+ return self._token
+
+ def create_token(self):
now = time.time()
claims = {
- 'iss': iss,
- 'sub': 'subject-uuid',
- 'aud': aud,
- 'exp': now + 86400,
- 'scope': scopes,
+ 'iss': self.iss,
+ 'sub': self.sub,
+ 'aud': self.aud,
+ 'exp': now + self.max_age,
+ 'scope': self.scopes,
}
- token = apifw.create_token(claims, key)
+ token = apifw.create_token(claims, self._token_key)
return token.decode('ascii')
- def report_pretend_work(self, url, name, work):
- self.show_msg('Pretending to work: {!r}'.format(work))
- snippet_url = '{}/work/{}'.format(url, name)
- snippet = {
- 'build_id': work['build_id'],
- 'worker': name,
- 'project': work['project'],
- 'stdout': '',
- 'stderr': '',
- 'exit_code': None,
- 'timestamp': self.now(),
- }
- self.post_snippet(snippet_url, snippet)
- def do_work(self, url, name, work, workspace):
- self.show_msg('Doing work')
- snippet_url = '{}/work'.format(url)
- snippet = {
+class Worker:
+
+ def __init__(self, name, api, workspace):
+ self._name = name
+ self._api = api
+ self._workspace = workspace
+
+ def do_work(self, work):
+
+ def post(stream_name, data):
+ s = self.get_work_result(work)
+ s[stream_name] = data.decode('UTF-8')
+ self._api.report_work(s)
+
+ step = work['step']
+ logging.info('Running step: %r', step)
+ exit_code = 0
+ if work.get('fresh_workspace'):
+ logging.info('Make an empty workspace')
+ cleaner = WorkspaceCleaner(self._workspace, post)
+ exit_code = cleaner.do(work)
+ if exit_code == 0:
+ klass = self.worker_factory(step)
+ if klass is None:
+ exit_code = -1
+ else:
+ worker = klass(self._workspace, post)
+ exit_code = worker.do(work)
+ self.finish_work(work, exit_code)
+
+ def get_work_result(self, work):
+ return {
'build_id': work['build_id'],
- 'worker': name,
+ 'worker': self._name,
'project': work['project'],
'pipeline': work['pipeline'],
'stdout': '',
@@ -216,95 +267,91 @@ class WorkerManager(cliapp.Application):
'timestamp': self.now(),
}
- def post(stream_name, data):
- data = data.decode('UTF-8')
- s = dict(snippet)
- s[stream_name] = data
- self.post_snippet(snippet_url, s)
-
- def run_with_interp(argv_prefix, cmd):
- exit_code, _, _ = cliapp.runcmd_unchecked(
- argv_prefix + [cmd],
- stdout_callback=lambda data: post('stdout', data),
- stderr_callback=lambda data: post('stderr', data),
- cwd=workspace,
- )
- return exit_code
-
- def run_shell(shell_cmd):
- params = work.get('parameters', {})
- params_json = json.dumps(params)
- params_text = base64.b64encode(
- params_json.encode('utf8')).decode('utf8')
- prefix = [
- 'params() { echo -n "%s" | base64 -d; }' % params_text,
- ]
- prefix_text = ''.join('{}\n'.format(line) for line in prefix)
- script = prefix_text + shell_cmd
- return run_with_interp(['bash', '-xeuc'], script)
-
- def run_python(python_cmd):
- params = work.get('parameters', {})
- params_json = json.dumps(params)
- params_text = base64.b64encode(
- params_json.encode('utf8')).decode('utf8')
- prefix = [
- 'import base64, json',
- 'params = json.loads(base64.b64decode('
- ' "{}").decode("utf8"))'.format(params_text)
- ]
- prefix_text = ''.join('{}\n'.format(line) for line in prefix)
- script = prefix_text + python_cmd
- return run_with_interp(['python3', '-c'], script)
-
- def run_debootstrap(step):
- shell_cmd = "sudo debootstrap '{}' . '{}'".format(
- step['debootstrap'], step.get('mirror', 'NOMIRROR'))
- run_shell(shell_cmd)
-
- def run_cmd(step):
- logging.info('Running step: %r', step)
- if 'shell' in step:
- run_shell(step['shell'])
- elif 'python' in step:
- run_python(step['python'])
- elif 'debootstrap' in step:
- run_debootstrap(step)
-
- exit_code = 0
- if work.get('fresh_workspace'):
- logging.info('Make an empty workspace')
- exit_code = run_shell('find . -delete')
- if exit_code == 0:
- run_cmd(work['step'])
- end_snippet = dict(snippet)
- end_snippet['exit_code'] = exit_code
- self.post_snippet(snippet_url, end_snippet)
-
def now(self):
return time.strftime('%Y-%m-%dT%H:%M:%S')
- def post_snippet(self, url, snippet):
- headers = {
- 'Content-Type': 'application/json',
- }
- headers.update(self.get_auth_headers())
- self.show_json('POST {}'.format(url), snippet)
- r = requests.post(
- url, headers=headers, data=json.dumps(snippet), verify=False)
- if not r.ok:
- raise cliapp.AppException(
- 'Error posting data to controller: {} {!r}'.format(
- r.status_code, r.text))
+ def worker_factory(self, step):
+ if 'shell' in step:
+ return ShellWorker
+ elif 'python' in step:
+ return PythonWorker
+ elif 'debootstrap' in step:
+ return DebootstrapWorker
+ return None
+
+ def finish_work(self, work, exit_code):
+ s = self.get_work_result(work)
+ s['exit_code'] = exit_code
+ self._api.report_work(s)
+
+
+class WorkerBase:
+
+ def __init__(self, workspace, post):
+ self._workspace = workspace
+ self._post = post
+
+ def do(self, work):
+ params = work.get('parameters', {})
+ params_text = self.params64(params)
+ argv = self.get_argv(work, params_text)
+ logging.debug('running: %r', argv)
+ exit_code, _, _ = cliapp.runcmd_unchecked(
+ argv,
+ stdout_callback=lambda data: self._post('stdout', data),
+ stderr_callback=lambda data: self._post('stderr', data),
+ cwd=self._workspace,
+ )
+ return exit_code
+
+ def params64(self, params):
+ as_json = json.dumps(params)
+ as_bytes = as_json.encode('UTF-8')
+ as_base64 = base64.b64encode(as_bytes)
+ return as_base64.decode('UTF-8')
+
+ def get_argv(self, work, params_text):
+ raise NotImplementedError()
+
+
+class ShellWorker(WorkerBase):
+
+ def get_argv(self, work, params_text):
+ code_snippet = work['step']['shell']
+ prefix = 'params() { echo -n "%s" | base64 -d; }\n' % params_text
+ return ['bash', '-exuc', prefix + code_snippet]
+
+
+class PythonWorker(WorkerBase):
+
+ def get_argv(self, work, params_text):
+ prefix = (
+ 'import base64, json\n'
+ 'params = json.loads(base64.b64decode(\n'
+ ' "{}").decode("utf8"))\n'
+ ).format(params_text)
+ code_snippet = work['step']['python']
+ return ['python3', '-c', prefix + '\n' + code_snippet]
+
+
+class DebootstrapWorker(WorkerBase):
+
+ def get_argv(self, work, params_text):
+ step = work['step']
+ return [
+ 'sudo',
+ 'debootstrap',
+ step['debootstrap'],
+ '.',
+ step.get('mirror', 'NOMIRROR'),
+ ]
+
- def show_msg(self, msg):
- sys.stdout.write('------\n{}\n'.format(msg))
- logging.info(msg)
+class WorkspaceCleaner(WorkerBase):
- def show_json(self, prelude, obj):
- msg = '{}: {}'.format(prelude, json.dumps(obj, indent=4))
- sys.stdout.write('------\n{}\n'.format(msg))
- logging.info(msg)
+ def get_argv(self, work, params_text):
+ return ['find', '.', '-delete']
-WorkerManager(version=ick2.__version__).run()
+if __name__ == '__main__':
+ WorkerManager(version=ick2.__version__).run()