1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
#!/usr/bin/python2
# Copyright 2017 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=
import json
import time
import cliapp
import requests
import ick2lib
class WorkerManager(cliapp.Application):

    """Command-line worker that polls a controller for work and runs it.

    The application loops forever: it asks the controller for a work
    item, runs the item's shell command, and posts the command's output
    and exit code back to the controller as "snippets". When the
    controller has no work, it sleeps for a configurable number of
    seconds before asking again.
    """

    # Seconds to wait for the controller to accept a connection or to
    # send data. Without a timeout, requests blocks indefinitely on a
    # controller that stops responding, which would hang the worker.
    http_timeout = 30

    def add_settings(self):
        """Declare the ``controller`` and ``sleep`` settings."""
        self.settings.string(
            ['controller'],
            'base URL for the controller',
            metavar='URL',
        )
        self.settings.integer(
            ['sleep'],
            'sleep for SECS seconds if there is no work currently',
            metavar='SECS',
            default=5,
        )

    def process_args(self, args):
        """Run the polling loop; does not return normally.

        ``args`` is not used. Raises ``cliapp.AppException`` if the
        ``controller`` setting is empty.
        """
        url = self.settings['controller']
        if not url:
            # Fail early with a clear message instead of letting the
            # first HTTP request fail on a URL with no scheme or host.
            raise cliapp.AppException(
                'No controller URL given: the controller setting is empty')

        print('Worker manager starts, controller is {}'.format(url))
        while True:
            work = self.get_work(url)
            if work:
                self.do_work(url, work)
            else:
                self.sleep_a_little()

    def sleep_a_little(self):
        """Sleep for the number of seconds given by the ``sleep`` setting."""
        secs = self.settings['sleep']
        print('Sleeping for {} seconds'.format(secs))
        time.sleep(secs)

    def get_work(self, url):
        """Ask the controller at ``url`` for a work item.

        Returns the decoded JSON body of the response, or ``None`` when
        the response status is not 200 or the body is empty.
        """
        get_work_url = '{}/worker/foo'.format(url)
        print('Getting work from {}'.format(get_work_url))
        r = requests.get(get_work_url, timeout=self.http_timeout)
        if r.status_code != 200 or not r.text:
            return None
        print('Response: {!r}'.format(r.text))
        return r.json()

    def do_work(self, url, work):
        """Run the shell command of ``work`` and report it to the controller.

        ``work`` must have the keys ``'project'`` and ``'shell'``. The
        command is run with ``bash -c``. Every chunk of output is posted
        as its own snippet while the command runs, and a final snippet
        carrying the exit code is posted once it has finished.
        """
        print('Doing work: {!r}'.format(work))
        snippet_url = '{}/worker/bar/snippet'.format(url)

        # Template for every snippet; each post sends a modified copy so
        # the template itself is never changed.
        snippet = {
            'project': work['project'],
            'stdout': '',
            'stderr': '',
            'exit-code': None,
        }

        def post(stream_name, data):
            # Send one chunk of output in the field named by stream_name.
            print('{}: {!r}'.format(stream_name, data))
            s = dict(snippet)
            s[stream_name] = data
            self.post_snippet(snippet_url, s)

        exit_code, _, _ = cliapp.runcmd_unchecked(
            ['bash', '-c', work['shell']],
            stdout_callback=lambda data: post('stdout', data),
            stderr_callback=lambda data: post('stderr', data),
        )

        end_snippet = dict(snippet)
        end_snippet['exit-code'] = exit_code
        self.post_snippet(snippet_url, end_snippet)

    def post_snippet(self, url, snippet):
        """POST ``snippet`` to ``url`` as a JSON document.

        Raises ``cliapp.AppException`` if the controller's response is
        not a success according to ``requests`` (``r.ok`` is false).
        """
        headers = {
            'Content-Type': 'application/json',
        }
        print('POST {} {!r}'.format(url, snippet))
        r = requests.post(
            url,
            headers=headers,
            data=json.dumps(snippet),
            timeout=self.http_timeout,
        )
        if not r.ok:
            raise cliapp.AppException(
                'Error posting data to controller: {} {!r}'.format(
                    r.status_code, r.text))
# Script entry point: build the application object, then start it via
# the run() method inherited from cliapp.Application.
app = WorkerManager(version=ick2lib.__version__)
app.run()
|