summaryrefslogtreecommitdiff
path: root/ick2/workapi.py
diff options
context:
space:
mode:
Diffstat (limited to 'ick2/workapi.py')
-rw-r--r--ick2/workapi.py163
1 files changed, 98 insertions, 65 deletions
diff --git a/ick2/workapi.py b/ick2/workapi.py
index c054f0a..01f9932 100644
--- a/ick2/workapi.py
+++ b/ick2/workapi.py
@@ -13,6 +13,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import time
+
+
import ick2
@@ -38,44 +42,69 @@ class WorkAPI(ick2.APIbase):
def get_work(self, **kwargs):
worker_id = self._get_client_id(**kwargs)
+ ick2.log.log(
+ 'trace', msg_text='Worker wants work', worker_id=worker_id)
with self._trans.modify('workers', worker_id) as worker:
doing = worker.get('doing')
if doing:
+ ick2.log.log(
+ 'trace', msg_text='Worker already got work', doing=doing)
return doing
build_id = self._pick_build(worker_id)
if build_id is None:
+ ick2.log.log('trace', msg_text='No suitable build for worker')
return {}
with self._trans.modify('builds', build_id) as build:
- build['status'] = 'building'
- build['worker'] = worker_id
-
- build_obj = ick2.Build(build)
- graph = build_obj.get_graph()
- action_id = self._pick_next_action(graph)
- if action_id is None: # pragma: no cover
- return {}
-
- graph.set_action_status(action_id, 'building')
- action = graph.get_action(action_id)
-
- doing = {
- 'build_id': build_id,
- 'build_number': build['build_number'],
- 'worker': worker_id,
- 'project': build['project'],
- 'parameters': build['parameters'],
- 'action_id': action_id,
- 'step': action,
- 'log': build['log'],
- }
-
- worker.from_dict({
- 'worker': worker_id,
- 'doing': doing,
- })
+ with self._trans.modify('log', build_id) as log:
+ ick2.log.log(
+ 'trace', msg_text='Picked build for worker',
+ build_id=build_id, build=build.as_dict())
+ build_obj = ick2.Build(build)
+ sm = ick2.BuildStateMachine(build_obj)
+
+ if sm.get_state() == ick2.BUILD_TRIGGERED:
+ sm.handle_event(ick2.BuildStartsEvent())
+ self._append_text_to_build_log(
+ log,
+ 'Starting build at {}\n'.format(
+ time.strftime('%Y-%m-%d %H:%M:%S')))
+
+ resp = sm.handle_event(ick2.NeedWorkEvent())
+ if resp is None: # pragma: no cover
+ ick2.log.log(
+ 'trace', msg_text='Did not find work in build')
+ self._append_text_to_build_log(
+ log, 'Did not find work to do in build\n')
+ return {}
+ action_id, action = resp
+
+ build['worker'] = worker_id
+ doing = {
+ 'build_id': build_id,
+ 'build_number': build['build_number'],
+ 'worker': worker_id,
+ 'project': build['project'],
+ 'parameters': build['parameters'],
+ 'action_id': action_id,
+ 'step': action,
+ 'log': build['log'],
+ }
+
+ worker.from_dict({
+ 'worker': worker_id,
+ 'doing': doing,
+ })
+ self._append_text_to_build_log(
+ log,
+ 'Giving worker action:\n{}\n'.format(
+ json.dumps(doing, indent=4)))
+
+ ick2.log.log(
+ 'trace', msg_text='Returning work for worker',
+ doing=worker['doing'])
return worker['doing']
def _get_client_id(self, **kwargs):
@@ -93,10 +122,11 @@ class WorkAPI(ick2.APIbase):
return build.get('status')
def is_building(build):
- return status(build) == 'building'
+ building_states = (ick2.BUILD_BUILDING, ick2.BUILD_NOTIFYING)
+ return status(build) in building_states
def is_triggered(build):
- return status(build) == 'triggered'
+ return status(build) == ick2.BUILD_TRIGGERED
builds = self._trans.get_resources('builds')
return (self._find_build(builds, on_worker, is_building) or
@@ -108,12 +138,6 @@ class WorkAPI(ick2.APIbase):
return build['build_id']
return None
- def _pick_next_action(self, graph):
- action_ids = graph.find_actions('ready')
- if not action_ids: # pragma: no cover
- return None
- return action_ids[0]
-
def update_work(self, update, **kwargs):
try:
worker_id = update['worker']
@@ -125,32 +149,40 @@ class WorkAPI(ick2.APIbase):
with self._trans.modify('workers', worker_id) as worker:
with self._trans.modify('builds', build_id) as build:
- build_obj = ick2.Build(build)
- graph = build_obj.get_graph()
- doing = worker.get('doing', {})
- self._check_work_update(doing, update)
- self._append_to_build_log(update)
- action_id = doing['action_id']
-
- if exit_code is not None:
- if exit_code == 0:
- graph.set_action_status(action_id, 'done')
- graph.unblock()
- if not graph.has_more_to_do():
- build_obj.set_status('done')
- build['status'] = 0
- elif exit_code is not None:
- graph.set_action_status(action_id, 'failed')
- build_obj.set_status('failed')
- build['status'] = exit_code
-
- worker.from_dict({
- 'worker': worker_id,
- 'doing': {},
- })
+ with self._trans.modify('log', build_id) as log:
+ doing = worker.get('doing', {})
+ self._check_work_update(doing, update)
+ self._append_to_build_log(log, update)
+ action_id = doing['action_id']
+
+ build_obj = ick2.Build(build)
+ sm = ick2.BuildStateMachine(build_obj)
+ event = ick2.create_build_event(update)
+ sm.handle_event(event)
+ action_ended = update['exit_code'] is not None
+
+ if action_ended:
+ self._append_text_to_build_log(
+ log,
+ 'Action ended at {}, exit code {}\n'.format(
+ time.strftime('%Y-%m-%d %H:%M:%S'),
+ update['exit_code']))
+
+ if build_obj.is_finished(): # pragma: no cover
+ self._append_text_to_build_log(
+ log,
+ 'Build ended at {}, exit code {}\n'.format(
+ time.strftime('%Y-%m-%d %H:%M:%S'),
+ build['exit_code']))
+
+ if action_ended or build_obj.is_finished():
+ worker.from_dict({
+ 'worker': worker_id,
+ 'doing': {},
+ })
def _check_work_update(self, doing, update): # pragma: no cover
- must_match = ['worker', 'project', 'build_id']
+ must_match = ['worker', 'project', 'build_id', 'action_id']
for name in must_match:
if name not in update:
raise ick2.BadUpdate('{} not specified'.format(name))
@@ -159,12 +191,13 @@ class WorkAPI(ick2.APIbase):
'{} differs from current work: {} vs {}'.format(
name, doing.get(name), update[name]))
- def _append_to_build_log(self, update):
- build_id = update['build_id']
- with self._trans.modify('log', build_id) as log:
- for stream in ['stdout', 'stderr']:
- text = update.get(stream, '')
- log['log'] = log.get('log', '') + text
+ def _append_to_build_log(self, log, update):
+ for stream in ['stdout', 'stderr']:
+ text = update.get(stream, '')
+ self._append_text_to_build_log(log, text)
+
+ def _append_text_to_build_log(self, log, text):
+ log['log'] = log.get('log', '') + text
def create(self, body, **kwargs): # pragma: no cover
pass