From 87b542db319257ef0bdb418bd847e81fae6e3d2c Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Wed, 1 Aug 2018 16:11:25 +0300 Subject: Add: qvisqvetool --- qvisqvetool | 260 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 260 insertions(+) create mode 100755 qvisqvetool diff --git a/qvisqvetool b/qvisqvetool new file mode 100755 index 0000000..d8d61d7 --- /dev/null +++ b/qvisqvetool @@ -0,0 +1,260 @@ +#!/usr/bin/env python3 +# +# Simple tool for accessing a Qvisqve instance via its API. +# +# Copyright 2016 QvarnLabs Ltd +# Copyright 2018 Lars Wirzenius +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# =*= License: GPL-3+ =*= + + +import base64 +import configparser +import json +import logging +import os +import sys +import tempfile +import time + +import cliapp +import requests +import urllib3 + +import qvisqve + + +class QvisqveTool(cliapp.Application): + + def add_settings(self): + self.settings.string( + ['secrets'], + 'get API client id and secret from FILE', + metavar='FILE', + default=os.path.expanduser('~/.config/qvarn/createtoken.conf')) + + self.settings.string( + ['api-url', 'a'], + 'use URL for accessing the API', + metavar='URL') + + self.settings.string( + ['token'], + 'use TOKEN as the token, instead of getting a new one', + metavar='TOKEN') + + self.settings.string( + ['from-file'], + 'for POST and PUT, get file to upload from FILE ' + 'instead of command line', + metavar='FILE') + + self.settings.string( + ['content-type'], + 'upload content using MIME-TYPE instead of application/json', + metavar='MIME-TYPE', + default='application/json') + + self.settings.string( + ['scope', 'scopes'], + 'use SCOPES for scopes when requesting token (space delimited list)', + metavar='SCOPES', + default=default_scopes()) + + def cmd_scopes(self, args): + scopes = self.settings['scopes'] + for scope in scopes.split(): + self.output.write('{}\n'.format(scope)) + + def cmd_get_token(self, args): + api = self.get_api() + scopes = self.settings['scopes'] + token = api.get_token(scopes) + self.output.write('{}\n'.format(token)) + + def cmd_GET(self, args): + api = self.get_api() + scopes = self.settings['scopes'] + token = api.get_token(scopes) + r = api.GET(token, args[0]) + ct = r.headers['Content-Type'] + if ct == 'application/json': + json.dump(r.json(), self.output, indent=4) + self.output.write('\n') + else: + self.output.write(r.content) + + def cmd_POST(self, args): + path, resource = self.get_path_resource(args) + api = self.get_api() + scopes = self.settings['scopes'] + token = api.get_token(scopes) + content_type = self.settings['content-type'] + r = api.POST(token, path, resource, content_type) + json.dump(r.json(), self.output, indent=4) + self.output.write('\n') + + def cmd_PUT(self, args): + path, resource = self.get_path_resource(args) + api = self.get_api() + scopes = self.settings['scopes'] + token = api.get_token(scopes) + content_type = self.settings['content-type'] + r = api.PUT(token, path, resource, content_type) + json.dump(r.json(), self.output, indent=4) + self.output.write('\n') + + def cmd_DELETE(self, args): + api = self.get_api() + scopes = self.settings['scopes'] + token = api.get_token(scopes) + r = api.DELETE(token, args[0]) + + def get_api(self): + api = QvisqveAPI() + api.set_api_url(self.settings['api-url']) + if self.settings['token']: + api.set_token(self.settings['token']) + api.lookup_credentials(self.settings['secrets']) + return api + + def get_path_resource(self, args): + logging.debug('get_path_resource: args=%r', args) + logging.debug('get_path_resource: from-file=%r', self.settings['from-file']) + if len(args) == 2: + return args + elif len(args) == 1 and self.settings['from-file']: + return args[0], self.cat(self.settings['from-file']) + else: + raise cliapp.AppException( + 'Need resource on command line, or with --from-file') + + def cat(self, filename): + with open(filename) as f: + return f.read() + + +class QvisqveAPI(object): + + def __init__(self): + self._api_url = None + self._client_id = None + self._client_secret = None + self._token = None + + def set_api_url(self, api_url): + self._api_url = api_url + + def set_token(self, token): + self._token = token + + def lookup_credentials(self, filename): + cp = configparser.ConfigParser() + cp.read([filename]) + self._client_id = cp.get(self._api_url, 'client_id') + self._client_secret = cp.get(self._api_url, 'client_secret') + + def get_token(self, scopes): + if self._token: + return self._token + else: + return self._get_new_token(scopes) + + def _get_new_token(self, scopes): + auth = (self._client_id, self._client_secret) + + data = { + u'grant_type': u'client_credentials', + u'scope': scopes, + } + + r = self.request('POST', '/token', auth=auth, data=data) + obj = r.json() + return obj[u'access_token'] + + def GET(self, token, path): + return self.request('GET', path, token=token) + + def POST(self, token, path, resource, content_type): + headers = self.prepare_usual_headers(content_type) + return self.request( + 'POST', path, token=token, headers=headers, data=resource) + + def PUT(self, token, path, resource, content_type): + headers = self.prepare_usual_headers(content_type) + return self.request( + 'PUT', path, token=token, headers=headers, data=resource) + + def DELETE(self, token, path): + return self.request('DELETE', path, token=token) + + def prepare_usual_headers(self, content_type): + headers = { + 'Content-Type': content_type, + } + return headers + + def request(self, method, path, token=None, headers=None, + auth=None, data=None): + + funcs = { + 'POST': requests.post, + 'PUT': requests.put, + 'GET': requests.get, + 'DELETE': requests.delete, + } + + url = '{}{}'.format(self._api_url, path) + + if token: + headers = dict(headers or {}) + headers['Authorization'] = 'Bearer {}'.format(token) + + response = funcs[method]( + url, + headers=headers, + auth=auth, + data=data, + verify=True) + + if not response.ok: + raise Error( + method, self._api_url, path, response.status_code, + response.text) + + return response + + +class Error(cliapp.AppException): + + def __init__(self, method, api_url, path, status, body): + self.msg = 'Error {status}: {method} {url}{path}\n\n{body}'.format( + status=status, method=method, url=api_url, path=path, body=body) + + +def default_scopes(): + types = ['clients', 'users', 'applications'] + scopes = [] + for resource_type in types: + scopes.append('uapi_{}_post'.format(resource_type)) + scopes.append('uapi_{}_get'.format(resource_type)) + scopes.append('uapi_{}_id_get'.format(resource_type)) + scopes.append('uapi_{}_id_secret_put'.format(resource_type)) + scopes.append('uapi_{}_id_delete'.format(resource_type)) + return ' '.join(scopes) + + +QvisqveTool(version=qvisqve.__version__).run() -- cgit v1.2.1