summaryrefslogtreecommitdiff
path: root/worker_manager
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2017-11-06 20:57:35 +0100
committerLars Wirzenius <liw@liw.fi>2017-11-06 20:57:35 +0100
commita676a477760d87f5d51bce2a9e93dbb4c64d4536 (patch)
tree913d0f9f5e7f6fe5dbac8e5c2fde33b38b676e0f /worker_manager
parentc9e9a5c9aa8cbd2c1510ee06f7eb2af3628c60b5 (diff)
downloadick2-a676a477760d87f5d51bce2a9e93dbb4c64d4536.tar.gz
Add: adapt worker-manager to work with current controller
Diffstat (limited to 'worker_manager')
-rwxr-xr-xworker_manager86
1 files changed, 63 insertions, 23 deletions
diff --git a/worker_manager b/worker_manager
index bddc7a6..bca03d8 100755
--- a/worker_manager
+++ b/worker_manager
@@ -16,15 +16,22 @@
import json
+import logging
import os
+import sys
import time
import cliapp
import requests
+import urllib3
import ick2
+urllib3.disable_warnings()
+logging.captureWarnings(False)
+
+
class WorkerManager(cliapp.Application):
def add_settings(self):
@@ -45,6 +52,12 @@ class WorkerManager(cliapp.Application):
metavar='URL',
)
+ self.settings.string(
+ ['token'],
+ 'use TOKEN for all controller HTTP requests',
+ metavar='TOKEN',
+ )
+
self.settings.integer(
['sleep'],
'sleep for SECS seconds if there is no work currently',
@@ -56,7 +69,7 @@ class WorkerManager(cliapp.Application):
self.settings.require('name')
name = self.settings['name']
url = self.settings['controller']
- print('Worker manager {} starts, controller is {}'.format(name, url))
+ self.show_msg('Worker manager {} starts, controller is {}'.format(name, url))
while True:
work = self.get_work(url, name)
if work and self.settings['pretend']:
@@ -64,73 +77,100 @@ class WorkerManager(cliapp.Application):
elif work:
self.do_work(url, name, work)
else:
+ self.show_msg('Nothing to do')
self.sleep_a_little()
def sleep_a_little(self):
secs = self.settings['sleep']
- print('Sleeping for {} seconds'.format(secs))
+ self.show_msg('Sleeping for {} seconds'.format(secs))
time.sleep(secs)
def get_work(self, url, name):
- get_work_url = '{}/worker/{}'.format(url, name)
- print('Getting work from {}'.format(get_work_url))
- r = requests.get(get_work_url)
+ get_work_url = '{}/work/{}'.format(url, name)
+ self.show_msg('Getting work from {}'.format(get_work_url))
+ headers = self.get_auth_headers()
+ r = requests.get(get_work_url, headers=headers, verify=False)
if r.status_code != 200 or not r.text:
return None
- print('Response: {!r}'.format(r.text))
- return r.json()
+ work = r.json()
+ self.show_json('Response:', work)
+ return work
+
+ def get_auth_headers(self):
+ token = self.settings['token']
+ return {
+ 'Authorization': 'Bearer {}'.format(token),
+ }
def report_pretend_work(self, url, name, work):
- print('Pretending to work: {!r}'.format(work))
- snippet_url = '{}/worker/{}/snippet'.format(url, name)
+ self.show_msg('Pretending to work: {!r}'.format(work))
+ snippet_url = '{}/work/{}'.format(url, name)
snippet = {
+ 'build_id': work['build_id'],
+ 'worker': name,
'project': work['project'],
- 'stdout': 'pretending: {}'.format(work['shell']),
+ 'stdout': '',
'stderr': '',
- 'exit-code': 0,
+ 'exit_code': None,
+ 'timestamp': self.now(),
}
self.post_snippet(snippet_url, snippet)
def do_work(self, url, name, work):
- print('Doing work: {!r}'.format(work))
- snippet_url = '{}/worker/{}/snippet'.format(url, name)
+ self.show_msg('Doing work')
+ snippet_url = '{}/work'.format(url)
snippet = {
+ 'build_id': work['build_id'],
+ 'worker': name,
'project': work['project'],
+ 'pipeline': work['pipeline'],
'stdout': '',
'stderr': '',
- 'exit-code': None,
+ 'exit_code': None,
+ 'timestamp': self.now(),
}
def post(stream_name, data):
- print('{}: {!r}'.format(stream_name, data))
+ data = data.decode('UTF-8')
s = dict(snippet)
s[stream_name] = data
self.post_snippet(snippet_url, s)
- env = dict(os.environ)
- env['ICK_URL'] = work['git']
-
+ shell_cmd = work['step']['shell']
exit_code, _, _ = cliapp.runcmd_unchecked(
- ['bash', '-xeuc', work['shell']],
+ ['bash', '-xeuc', shell_cmd],
stdout_callback=lambda data: post('stdout', data),
stderr_callback=lambda data: post('stderr', data),
- env=env,
)
end_snippet = dict(snippet)
- end_snippet['exit-code'] = exit_code
+ end_snippet['exit_code'] = exit_code
self.post_snippet(snippet_url, end_snippet)
+ def now(self):
+ return time.strftime('%Y-%m-%dT%H:%M:%S')
+
def post_snippet(self, url, snippet):
headers = {
'Content-Type': 'application/json',
}
- print('POST {} {!r}'.format(url, snippet))
- r = requests.post(url, headers=headers, data=json.dumps(snippet))
+ headers.update(self.get_auth_headers())
+ self.show_json('POST {}'.format(url), snippet)
+ r = requests.post(
+ url, headers=headers, data=json.dumps(snippet), verify=False)
if not r.ok:
raise cliapp.AppException(
'Error posting data to controller: {} {!r}'.format(
r.status_code, r.text))
+ def show_msg(self, msg):
+ sys.stdout.write('------\n{}\n'.format(msg))
+ logging.info(msg)
+
+ def show_json(self, prelude, obj):
+ msg = '{}: {}'.format(prelude, json.dumps(obj, indent=4))
+ sys.stdout.write('------\n{}\n'.format(msg))
+ logging.info(msg)
+
WorkerManager(version=ick2.__version__).run()