summaryrefslogtreecommitdiff
path: root/worker_manager
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2017-12-30 18:29:37 +0200
committerLars Wirzenius <liw@liw.fi>2017-12-31 12:36:20 +0200
commitfab718970dc04fdb2578506fe3c83aa20bdfea6e (patch)
tree718e8fc4d4e10f8ca442554f20e89427ffb4d9f3 /worker_manager
parenta0f8561380f5d854ce532a5df076bc12014d5b48 (diff)
downloadick2-fab718970dc04fdb2578506fe3c83aa20bdfea6e.tar.gz
Add: built-in archive action to worker-manager
Diffstat (limited to 'worker_manager')
-rwxr-xr-xworker_manager97
1 files changed, 81 insertions, 16 deletions
diff --git a/worker_manager b/worker_manager
index 8642036..408e1b0 100755
--- a/worker_manager
+++ b/worker_manager
@@ -18,6 +18,8 @@
import base64
import json
import logging
+import os
+import tempfile
import time
import apifw
@@ -110,6 +112,7 @@ class ContainerAPI:
self._name = name
self._url = url
self._token_generator = token_generator
+ self._httpapi = HttpApi()
def register(self):
logging.info('Registering worker %s to %s', self._name, self._url)
@@ -118,14 +121,12 @@ class ContainerAPI:
body = {
'worker': self._name,
}
- httpapi = HttpApi()
- httpapi.post(url, headers, body)
+ self._httpapi.post(url, headers, body)
def get_work(self):
url = self.url('/work/{}'.format(self._name))
headers = self.get_auth_headers()
- httpapi = HttpApi()
- work = httpapi.get(url, headers)
+ work = self._httpapi.get(url, headers)
if work:
logging.info('Response: %r', work)
return work
@@ -134,12 +135,17 @@ class ContainerAPI:
logging.info('POST %s', work)
url = self.url('/work')
headers = self.get_auth_headers()
- httpapi = HttpApi()
- code = httpapi.post(url, headers, work)
+ code = self._httpapi.post(url, headers, work)
if code not in (200, 201):
raise cliapp.AppException(
'Error posting data to controller: {}'.format(code))
+ def upload_blob(self, blob_id, blob):
+ logging.info('Upload blob %s', blob_id)
+ url = self.url('/blobs/{}'.format(blob_id))
+ headers = self.get_auth_headers()
+ code = self._httpapi.put(url, headers, blob)
+
def url(self, path):
return '{}{}'.format(self._url, path)
@@ -152,16 +158,23 @@ class ContainerAPI:
class HttpApi:
+ def __init__(self):
+ self._session = requests.Session()
+
def post(self, url, headers, body):
- r = requests.post(url, json=body, headers=headers, verify=False)
+ r = self._session.post(url, json=body, headers=headers, verify=False)
if not r.ok:
logging.warning('Error: POST %s returned %s', url, r.status_code)
- else:
- logging.info('Registered worker successfully')
+ return r.status_code
+
+ def put(self, url, headers, body):
+ r = self._session.put(url, data=body, headers=headers, verify=False)
+ if not r.ok:
+ logging.warning('Error: PUT %s returned %s', url, r.status_code)
return r.status_code
def get(self, url, headers):
- r = requests.get(url, headers=headers, verify=False)
+ r = self._session.get(url, headers=headers, verify=False)
if not r.ok or not r.text:
return None
return r.json()
@@ -177,6 +190,8 @@ class TokenGenerator:
'uapi_work_id_get',
'uapi_work_post',
'uapi_workers_post',
+ 'uapi_blobs_id_get',
+ 'uapi_blobs_id_put',
])
def __init__(self):
@@ -236,7 +251,7 @@ class Worker:
def post(stream_name, data):
s = self.get_work_result(work)
- s[stream_name] = data.decode('UTF-8')
+ s[stream_name] = data
self._api.report_work(s)
step = work['step']
@@ -244,14 +259,14 @@ class Worker:
exit_code = 0
if work.get('fresh_workspace'):
logging.info('Make an empty workspace')
- cleaner = WorkspaceCleaner(self._workspace, post)
+ cleaner = WorkspaceCleaner(None, self._workspace, post)
exit_code = cleaner.do(work)
if exit_code == 0:
klass = self.worker_factory(step)
if klass is None:
exit_code = -1
else:
- worker = klass(self._workspace, post)
+ worker = klass(self._api, self._workspace, post)
exit_code = worker.do(work)
self.finish_work(work, exit_code)
@@ -277,6 +292,9 @@ class Worker:
return PythonWorker
elif 'debootstrap' in step:
return DebootstrapWorker
+ elif 'archive' in step:
+ return WorkspaceArchiver
+ logging.warning('Cannot find worker for %s', step)
return None
def finish_work(self, work, exit_code):
@@ -287,7 +305,8 @@ class Worker:
class WorkerBase:
- def __init__(self, workspace, post):
+ def __init__(self, api, workspace, post):
+ self._api = api
self._workspace = workspace
self._post = post
@@ -303,12 +322,18 @@ class WorkerBase:
logging.debug('running: %r', argv)
exit_code, _, _ = cliapp.runcmd_unchecked(
argv,
- stdout_callback=lambda data: self._post('stdout', data),
- stderr_callback=lambda data: self._post('stderr', data),
+ stdout_callback=self.report_stdout,
+ stderr_callback=self.report_stderr,
cwd=self._workspace,
)
return exit_code
+ def report_stdout(self, data):
+ return self._post('stdout', data.decode('UTF-8'))
+
+ def report_stderr(self, data):
+ return self._post('stderr', data.decode('UTF-8'))
+
def params64(self, params):
as_json = json.dumps(params)
as_bytes = as_json.encode('UTF-8')
@@ -362,5 +387,45 @@ class WorkspaceCleaner(WorkerBase):
return ['sudo', 'find', '.', '-delete']
+class WorkspaceArchiver(WorkerBase):
+
+ def do(self, work):
+ step = work['step']
+ dirname = step['archive']
+
+ if dirname == 'workspace':
+ dirname = self._workspace
+
+ blob_id = step['archive_name']
+ tarball = self.archive(dirname)
+ self._api.upload_blob(blob_id, tarball)
+
+ return 0
+
+ def archive(self, dirname):
+ logging.info('Archiving %s', dirname)
+ fd, filename = tempfile.mkstemp()
+ os.close(fd)
+ argv = ['sudo', 'tar', '-zcvf', filename, '-C', dirname, '.']
+ exit_code, _, _ = cliapp.runcmd_unchecked(
+ argv,
+ stdout_callback=self.report,
+ stderr_callback=self.report,
+ cwd=self._workspace,
+ )
+ self.report(b'Created tarball of workspace')
+ if exit_code == 0:
+ with open(filename, 'rb') as f:
+ return f.read()
+ os.remove(filename)
+ return exit_code
+
+ def report(self, data):
+ self._post('stdout', data.decode('UTF-8'))
+
+ def get_argv(self, work, params_text):
+ assert False
+
+
if __name__ == '__main__':
WorkerManager(version=ick2.__version__).run()