# Copyright (C) 2018 Lars Wirzenius # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import base64 import json import logging import time import urllib import requests class HttpError(Exception): pass class HttpAPI: # Make requests to an HTTP API. json_type = 'application/json' def __init__(self): self._session = requests.Session() self._token = None self._verify = None def set_session(self, session): self._session = session def set_verify_tls(self, verify): # pragma: no cover self._verify = verify def set_token(self, token): self._token = token def get_dict(self, url, headers=None): r = self._request(self._session.get, url, headers=headers) ct = r.headers.get('Content-Type') if ct != self.json_type: raise HttpError('Not JSON response') try: return r.json() except json.decoder.JSONDecodeError: raise HttpError('JSON parsing error') def get_blob(self, url, headers=None): r = self._request(self._session.get, url, headers=headers) return r.content def delete(self, url, headers=None): # pragma: no cover r = self._request(self._session.delete, url, headers=headers) return r.content def post(self, url, headers=None, body=None): self._send_request(self._session.post, url, headers=headers, body=body) def post_auth(self, url, headers=None, body=None, auth=None): assert auth is not None if headers is None: headers = {} headers['Authorization'] = self._basic_auth(auth) return self._send_request( self._session.post, url, headers=headers, body=body, auth=auth) def _basic_auth(self, auth): username, password = auth cleartext = '{}:{}'.format(username, password).encode('UTF-8') encoded = base64.b64encode(cleartext) return 'Basic {}'.format(encoded.decode('UTF-8')) def put(self, url, headers=None, body=None): self._send_request(self._session.put, url, headers=headers, body=body) def _send_request(self, func, url, headers=None, body=None, auth=None): if headers is None: headers = {} headers = dict(headers) if not headers.get('Content-Type'): h, body = self._get_content_type_header(body) headers.update(h) return self._request(func, url, headers=headers, data=body, auth=auth) def _get_content_type_header(self, body): if isinstance(body, dict): header = { 'Content-Type': 'application/json', } body = json.dumps(body) return header, body return {}, body def _get_authorization_headers(self): return { 'Authorization': 'Bearer {}'.format(self._token), } def _request(self, func, url, headers=None, **kwargs): if headers is None: headers = {} auth = kwargs.get('auth') if auth is None: headers.update(self._get_authorization_headers()) if 'auth' in kwargs: del kwargs['auth'] r = func(url, headers=headers, verify=self._verify, **kwargs) if not r.ok: raise HttpError('{}: {}'.format(r.status_code, r.text)) return r class ControllerClient: def __init__(self): self._name = None self._api = HttpAPI() self._url = None self._auth_url = None def set_client_name(self, name): self._name = name def set_verify_tls(self, verify): # pragma: no cover self._api.set_verify_tls(verify) def set_http_api(self, api): self._api = api def get_http_api(self): # pragma: no cover return self._api def set_controller_url(self, url): self._url = url def get_controller_url(self): # pragma: no cover return self._url def set_token(self, token): self._api.set_token(token) def url(self, path): return '{}{}'.format(self._url, path) def get_version(self): url = self.url('/version') return self._api.get_dict(url) def get_apt_server(self): version = self.get_version() logging.debug('get_apt_server: version: %s', version) server = version.get('apt_server') logging.info('APT server: %r', server) return server def get_artifact_store_url(self): version = self.get_version() url = version.get('artifact_store') logging.info('Artifact store URL: %r', url) return url def get_auth_url(self): version = self.get_version() url = version.get('auth_url') logging.info('Authentication URL: %r', url) return url def get_notify_url(self): # pragma: no cover version = self.get_version() url = version.get('notify_url') logging.info('Notification URL: %r', url) return url def get_auth_client(self): url = self.get_auth_url() ac = AuthClient() ac.set_auth_url(url) ac.set_http_api(self._api) return ac def get_blob_client(self): url = self.get_artifact_store_url() blobs = BlobClient() blobs.set_url(url) blobs.set_http_api(self._api) return blobs def register(self): assert self._url is not None url = self.url('/workers') logging.info('Registering worker %s to %s', self._name, url) body = { 'worker': self._name, } try: self._api.post(url, body=body) except HttpError: pass def get_work(self): url = self.url('/work') work = self._api.get_dict(url) logging.info('Requested work, got: %r', work) return work def report_work(self, work): logging.info('Reporting work: %r', work) url = self.url('/work') headers = { 'Content-Type': self._api.json_type, } body = json.dumps(work) self._api.post(url, headers=headers, body=body) def show(self, path): # pragma: no cover url = self.url(path) return self._api.get_dict(url) def show_blob(self, path): # pragma: no cover url = self.url(path) return self._api.get_blob(url) def delete(self, path): # pragma: no cover url = self.url(path) return self._api.delete(url) def create(self, path, obj): # pragma: no cover url = self.url(path) return self._api.post(url, body=obj) def update(self, path, obj): # pragma: no cover url = self.url(path) return self._api.put(url, body=obj) def trigger(self, project_name): # pragma: no cover path = '/projects/{}/+trigger'.format(project_name) url = self.url(path) return self._api.get_dict(url) def get_log(self, build_id): # pragma: no cover path = '/logs/{}'.format(build_id) url = self.url(path) return self._api.get_blob(url) def notify(self, notify): # pragma: no cover url = self.get_notify_url() self._api.post(url, body=notify) class AuthClient: def __init__(self): self._auth_url = None self._http_api = HttpAPI() self._client_id = None self._client_secret = None def set_auth_url(self, url): self._auth_url = url def set_http_api(self, api): self._http_api = api def set_client_creds(self, client_id, client_secret): self._client_id = client_id self._client_secret = client_secret def get_token(self, scope): auth = (self._client_id, self._client_secret) params = { 'grant_type': 'client_credentials', 'scope': scope, } body = urllib.parse.urlencode(params) headers = { 'Content-Type': 'application/x-www-form-urlencoded', } r = self._http_api.post_auth( self._auth_url, headers=headers, body=body, auth=auth) obj = r.json() return obj['access_token'] class Reporter: # pragma: no cover def __init__(self, api, work): self._api = api self._work = dict(work) def report(self, exit_code, stdout, stderr): result = dict(self._work) result['exit_code'] = exit_code result['stdout'] = self.make_string(stdout) result['stderr'] = self.make_string(stderr) self._api.report_work(result) def make_string(self, thing): if isinstance(thing, bytes): return thing.decode('UTF-8') if isinstance(thing, str): return thing if thing is None: return thing return repr(thing) def get_work_result(self, work): return { 'build_id': work['build_id'], 'worker': work['worker'], 'project': work['project'], 'pipeline': work['pipeline'], 'stdout': '', 'stderr': '', 'exit_code': None, 'timestamp': self.now(), } def now(self): return time.strftime('%Y-%m-%dT%H:%M:%S') class BlobClient: def __init__(self): self._url = None self._api = None def set_url(self, url): self._url = url def set_http_api(self, api): self._api = api def url(self, blob_name): assert self._url is not None return '{}/blobs/{}'.format(self._url, blob_name) def download(self, blob_name): logging.info('Download blob %s', blob_name) url = self.url(blob_name) return self._api.get_blob(url) def upload(self, blob_name, blob): logging.info('Upload blob %s', blob_name) url = self.url(blob_name) return self._api.put(url, body=blob)