summaryrefslogtreecommitdiff
path: root/worker-manager
blob: 54ed5becf6fded3278d0abac27396823c311b80a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/python2
# Copyright 2017  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=


import json
import time

import cliapp
import requests

import ick2lib


class WorkerManager(cliapp.Application):

    def add_settings(self):
        self.settings.string(
            ['controller'],
            'base URL for the controller',
            metavar='URL',
        )

        self.settings.integer(
            ['sleep'],
            'sleep for SECS seconds if there is no work currently',
            metavar='SECS',
            default=5,
        )

    def process_args(self, args):
        url = self.settings['controller']
        print 'Worker manager starts, controller is {}'.format(url)
        while True:
            work = self.get_work(url)
            if work:
                self.do_work(url, work)
            else:
                self.sleep_a_little()

    def sleep_a_little(self):
        secs = self.settings['sleep']
        print 'Sleeping for {} seconds'.format(secs)
        time.sleep(secs)

    def get_work(self, url):
        get_work_url = '{}/worker/foo'.format(url)
        print 'Getting work from {}'.format(get_work_url)
        r = requests.get(get_work_url)
        if r.status_code != 200 or not r.text:
            return None
        print 'Response: {!r}'.format(r.text)
        return r.json()

    def do_work(self, url, work):
        print 'Doing work: {!r}'.format(work)
        snippet_url = '{}/worker/bar/snippet'.format(url)
        snippet = {
            'project': work['project'],
            'stdout': '',
            'stderr': '',
            'exit-code': None,
        }

        def post(stream_name, data):
            print '{}: {!r}'.format(stream_name, data)
            s = dict(snippet)
            s[stream_name] = data
            self.post_snippet(snippet_url, s)
        
        exit_code, _, _ = cliapp.runcmd_unchecked(
            ['bash', '-c', work['shell']],
            stdout_callback=lambda data: post('stdout', data),
            stderr_callback=lambda data: post('stderr', data),
        )

        end_snippet = dict(snippet)
        end_snippet['exit-code'] = exit_code
        self.post_snippet(snippet_url, end_snippet)

    def post_snippet(self, url, snippet):
        headers = {
            'Content-Type': 'application/json',
        }
        print 'POST {} {!r}'.format(url, snippet)
        r = requests.post(url, headers=headers, data=json.dumps(snippet))
        if not r.ok:
            raise cliapp.AppException(
                'Error posting data to controller: {} {!r}'.format(
                    r.status_code, r.text))


WorkerManager(version=ick2lib.__version__).run()