summaryrefslogtreecommitdiff
path: root/worker_manager
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2017-09-24 17:33:18 +0300
committerLars Wirzenius <liw@liw.fi>2017-09-24 18:19:34 +0300
commit5e56ceed83a6ebf785dde74481f63c7d71db6119 (patch)
tree908f7bfcdb92da4c434a9fdd528d2601fc33dd6e /worker_manager
parente19b65c2cf05cf7f220a197592ac6803d0170762 (diff)
downloadick2-5e56ceed83a6ebf785dde74481f63c7d71db6119.tar.gz
Change: make worker-manager run under Python 3
Doesn't work yet, but it users Python 3 in hashbang.
Diffstat (limited to 'worker_manager')
-rwxr-xr-xworker_manager136
1 files changed, 136 insertions, 0 deletions
diff --git a/worker_manager b/worker_manager
new file mode 100755
index 0000000..0ba0aab
--- /dev/null
+++ b/worker_manager
@@ -0,0 +1,136 @@
+#!/usr/bin/python3
+# Copyright 2017 Lars Wirzenius
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import json
+import os
+import time
+
+import cliapp
+import requests
+
+import ick2lib
+
+
+class WorkerManager(cliapp.Application):
+
+ def add_settings(self):
+ self.settings.boolean(
+ ['pretend'],
+ 'only pretend to execute shell steps',
+ )
+
+ self.settings.string(
+ ['controller'],
+ 'base URL for the controller',
+ metavar='URL',
+ )
+
+ self.settings.string(
+ ['name'],
+ 'name of this worker',
+ metavar='URL',
+ )
+
+ self.settings.integer(
+ ['sleep'],
+ 'sleep for SECS seconds if there is no work currently',
+ metavar='SECS',
+ default=5,
+ )
+
+ def process_args(self, args):
+ self.settings.require('name')
+ name = self.settings['name']
+ url = self.settings['controller']
+ print('Worker manager {} starts, controller is {}'.format(name, url))
+ while True:
+ work = self.get_work(url, name)
+ if work and self.settings['pretend']:
+ self.report_pretend_work(url, name, work)
+ elif work:
+ self.do_work(url, name, work)
+ else:
+ self.sleep_a_little()
+
+ def sleep_a_little(self):
+ secs = self.settings['sleep']
+ print('Sleeping for {} seconds'.format(secs))
+ time.sleep(secs)
+
+ def get_work(self, url, name):
+ get_work_url = '{}/worker/{}'.format(url, name)
+ print('Getting work from {}'.format(get_work_url))
+ r = requests.get(get_work_url)
+ if r.status_code != 200 or not r.text:
+ return None
+ print('Response: {!r}'.format(r.text))
+ return r.json()
+
+ def report_pretend_work(self, url, name, work):
+ print('Pretending to work: {!r}'.format(work))
+ snippet_url = '{}/worker/{}/snippet'.format(url, name)
+ snippet = {
+ 'project': work['project'],
+ 'stdout': 'pretending: {}'.format(work['shell']),
+ 'stderr': '',
+ 'exit-code': 0,
+ }
+ self.post_snippet(snippet_url, snippet)
+
+ def do_work(self, url, name, work):
+ print('Doing work: {!r}'.format(work))
+ snippet_url = '{}/worker/{}/snippet'.format(url, name)
+ snippet = {
+ 'project': work['project'],
+ 'stdout': '',
+ 'stderr': '',
+ 'exit-code': None,
+ }
+
+ def post(stream_name, data):
+ print('{}: {!r}'.format(stream_name, data))
+ s = dict(snippet)
+ s[stream_name] = data
+ self.post_snippet(snippet_url, s)
+
+ env = dict(os.environ)
+ env['ICK_URL'] = work['git']
+
+ exit_code, _, _ = cliapp.runcmd_unchecked(
+ ['bash', '-xeuc', work['shell']],
+ stdout_callback=lambda data: post('stdout', data),
+ stderr_callback=lambda data: post('stderr', data),
+ env=env,
+ )
+
+ end_snippet = dict(snippet)
+ end_snippet['exit-code'] = exit_code
+ self.post_snippet(snippet_url, end_snippet)
+
+ def post_snippet(self, url, snippet):
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+ print('POST {} {!r}'.format(url, snippet))
+ r = requests.post(url, headers=headers, data=json.dumps(snippet))
+ if not r.ok:
+ raise cliapp.AppException(
+ 'Error posting data to controller: {} {!r}'.format(
+ r.status_code, r.text))
+
+
+WorkerManager(version=ick2lib.__version__).run()