#!/usr/bin/env python3
#
# Simple tool for accessing a Qvisqve instance via its API.
#
# Copyright 2016 QvarnLabs Ltd
# Copyright 2018 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=


import base64
import configparser
import json
import logging
import os
import sys
import tempfile
import time

import cliapp
import requests
import urllib3

import qvisqve


# MIME type used for every JSON request body this tool builds itself.
json_content_type = 'application/json'


# Maps the entity kind given on the command line to its API collection path.
paths = {
    'user': '/users',
    'client': '/clients',
    'application': '/applications',
}


class QvisqveTool(cliapp.Application):

    """Command line front end: each cmd_* method is one subcommand."""

    def add_settings(self):
        """Declare the command line / configuration settings."""
        self.settings.string(
            ['secrets'],
            'get API client id and secret from FILE',
            metavar='FILE',
            default=os.path.expanduser('~/.config/qvarn/createtoken.conf'))

        self.settings.string(
            ['api-url', 'a'],
            'use URL for accessing the API',
            metavar='URL')

        self.settings.string(
            ['token'],
            'use TOKEN as the token, instead of getting a new one',
            metavar='TOKEN')

        self.settings.string(
            ['from-file'],
            'for POST and PUT, get file to upload from FILE '
            'instead of command line',
            metavar='FILE')

        self.settings.string(
            ['content-type'],
            'upload content using MIME-TYPE instead of application/json',
            metavar='MIME-TYPE',
            default='application/json')

        self.settings.string(
            ['scope', 'scopes'],
            'use SCOPES for scopes when requesting token (space delimited list)',
            metavar='SCOPES',
            default=default_scopes())

    def cmd_scopes(self, args):
        """Write the configured scopes, one per line."""
        scopes = self.settings['scopes']
        for scope in scopes.split():
            self.output.write('{}\n'.format(scope))

    def cmd_get_token(self, args):
        """Write an access token for the configured scopes."""
        api, token = self.get_api()
        self.output.write('{}\n'.format(token))

    def cmd_get(self, args):
        """Show each named entity of the given kind as JSON.

        args is KIND followed by zero or more NAMEs.
        """
        kind = args[0]
        names = args[1:]
        base = self._kind_path(kind)
        api, token = self.get_api()
        for name in names:
            # Build every path from the collection path; reusing the
            # previous entity's path would nest the names.
            r = api.GET(token, '{}/{}'.format(base, name))
            self.output.write('{} {}:\n'.format(kind, name))
            json.dump(r.json(), self.output, indent=4)
            self.output.write('\n')

    def cmd_list(self, args):
        """List the names of all entities of each given kind."""
        # Validate all kinds before talking to the server.
        kind_paths = [(kind, self._kind_path(kind)) for kind in args]
        if not kind_paths:
            return
        # One API object and one token serve all the listings.
        api, token = self.get_api()
        for kind, path in kind_paths:
            r = api.GET(token, path)
            names = r.json()['resources']
            for name in sorted(names):
                self.output.write('{} {}\n'.format(kind, name))

    def cmd_create(self, args):
        """Create an entity and set its secret.

        args is KIND, NAME and PASSWORD.
        """
        kind, name, password = args
        path = self._kind_path(kind)
        api, token = self.get_api()

        entity = {
            'id': name,
        }
        api.POST(token, path, json.dumps(entity), json_content_type)

        secret = {
            'secret': password,
        }
        secret_path = '{}/{}/secret'.format(path, name)
        api.PUT(token, secret_path, json.dumps(secret), json_content_type)

        self.output.write('Created {} {}\n'.format(kind, name))

    def cmd_delete(self, args):
        """Delete each named entity of the given kind.

        args is KIND followed by zero or more NAMEs.
        """
        kind = args[0]
        names = args[1:]
        base = self._kind_path(kind)
        if not names:
            return
        # Fetch the token once rather than once per deleted entity.
        api, token = self.get_api()
        for name in names:
            api.DELETE(token, '{}/{}'.format(base, name))

    def cmd_set_secret(self, args):
        """Replace the secret of an entity.

        args is KIND, NAME and SECRET.
        """
        kind, name, secret = args
        path = '{}/{}/secret'.format(self._kind_path(kind), name)
        api, token = self.get_api()
        body = {
            'secret': secret,
        }
        api.PUT(token, path, json.dumps(body), json_content_type)
        self.output.write('Set secret for {}\n'.format(name))

    def cmd_add_callback(self, args):
        """Append a callback URL to an application.

        args is NAME and CALLBACK.
        """
        name, callback = args
        api, token = self.get_api()
        path = '{}/{}'.format(paths['application'], name)
        r = api.GET(token, path)
        app = r.json()
        app['callbacks'] = app.get('callbacks', []) + [callback]
        api.PUT(token, path, json.dumps(app), json_content_type)

    def cmd_allow_scope(self, args):
        """Append scopes to the allowed scopes of an entity.

        args is KIND, NAME and then the scopes to add.
        """
        kind = args[0]
        name = args[1]
        scopes = args[2:]
        path = '{}/{}'.format(self._kind_path(kind), name)
        api, token = self.get_api()
        r = api.GET(token, path)
        entity = r.json()
        entity['allowed_scopes'] = entity.get('allowed_scopes', []) + scopes
        api.PUT(token, path, json.dumps(entity), json_content_type)

    def cmd_GET(self, args):
        """Do a raw GET on the given API path and show the response."""
        api, token = self.get_api()
        r = api.GET(token, args[0])
        self._write_response(r)

    def cmd_POST(self, args):
        """Do a raw POST to an API path and show the response.

        The body comes from the command line or from --from-file.
        """
        path, resource = self.get_path_resource(args)
        api, token = self.get_api()
        content_type = self.settings['content-type']
        r = api.POST(token, path, resource, content_type)
        self._write_response(r)

    def cmd_PUT(self, args):
        """Do a raw PUT to an API path and show the response.

        The body comes from the command line or from --from-file.
        """
        path, resource = self.get_path_resource(args)
        api, token = self.get_api()
        content_type = self.settings['content-type']
        r = api.PUT(token, path, resource, content_type)
        self._write_response(r)

    def cmd_DELETE(self, args):
        """Do a raw DELETE on the given API path."""
        api, token = self.get_api()
        api.DELETE(token, args[0])

    def get_api(self):
        """Return (api, token) configured from the settings.

        With --token the given token is used as is; otherwise the
        client credentials are read from the secrets file and a new
        token is requested for the configured scopes.
        """
        api = QvisqveAPI()
        api.set_api_url(self.settings['api-url'])
        preset_token = self.settings['token']
        if preset_token:
            api.set_token(preset_token)
        else:
            # Credentials are only needed to request a new token, so
            # the secrets file is not consulted when a token is given.
            api.lookup_credentials(self.settings['secrets'])
        token = api.get_token(self.settings['scopes'])
        return api, token

    def get_path_resource(self, args):
        """Return (path, resource body) for the POST and PUT commands.

        Raises cliapp.AppException when no resource is given either
        on the command line or via the from-file setting.
        """
        logging.debug('get_path_resource: args=%r', args)
        logging.debug('get_path_resource: from-file=%r',
                      self.settings['from-file'])
        if len(args) == 2:
            return args[0], args[1]
        if len(args) == 1 and self.settings['from-file']:
            return args[0], self.cat(self.settings['from-file'])
        raise cliapp.AppException(
            'Need resource on command line, or with --from-file')

    def cat(self, filename):
        """Return the whole content of the named file as text."""
        with open(filename) as f:
            return f.read()

    def _kind_path(self, kind):
        """Return the collection path for kind.

        Raises cliapp.AppException, rather than a bare KeyError, when
        the kind is not a key of the module-level paths table.
        """
        try:
            return paths[kind]
        except KeyError:
            raise cliapp.AppException(
                'Unknown kind {!r}: must be one of {}'.format(
                    kind, ', '.join(sorted(paths)))) from None

    def _write_response(self, r):
        """Write a response body to the output.

        JSON bodies are pretty-printed; anything else is written as
        decoded text.
        """
        content_type = r.headers.get('Content-Type', '')
        # Drop parameters such as "; charset=utf-8" before comparing.
        media_type = content_type.split(';', 1)[0].strip().lower()
        if media_type == json_content_type:
            json.dump(r.json(), self.output, indent=4)
            self.output.write('\n')
        else:
            # self.output is written with str everywhere else in this
            # class, so use the decoded text rather than raw bytes.
            self.output.write(r.text)


class QvisqveAPI(object):

    """Thin HTTP client for the API, using bearer-token authentication."""

    def __init__(self):
        self._api_url = None
        self._client_id = None
        self._client_secret = None
        self._token = None

    def set_api_url(self, api_url):
        """Set the base URL that every request path is appended to."""
        self._api_url = api_url

    def set_token(self, token):
        """Use token instead of requesting a new one in get_token."""
        self._token = token

    def lookup_credentials(self, filename):
        """Read client id and secret from an INI-style file.

        The values are taken from the section named after the API URL,
        so set_api_url must have been called first.
        """
        cp = configparser.ConfigParser()
        cp.read([filename])
        self._client_id = cp.get(self._api_url, 'client_id')
        self._client_secret = cp.get(self._api_url, 'client_secret')

    def get_token(self, scopes):
        """Return the preset token, or request a new one for scopes."""
        if self._token:
            return self._token
        return self._get_new_token(scopes)

    def _get_new_token(self, scopes):
        """Request a token from /token with the client credentials."""
        auth = (self._client_id, self._client_secret)
        data = {
            u'grant_type': u'client_credentials',
            u'scope': scopes,
        }
        r = self.request('POST', '/token', auth=auth, data=data)
        obj = r.json()
        return obj[u'access_token']

    def GET(self, token, path):
        """Send an authenticated GET and return the response."""
        return self.request('GET', path, token=token)

    def POST(self, token, path, resource, content_type):
        """Send an authenticated POST with resource as the body."""
        headers = self.prepare_usual_headers(content_type)
        return self.request(
            'POST', path, token=token, headers=headers, data=resource)

    def PUT(self, token, path, resource, content_type):
        """Send an authenticated PUT with resource as the body."""
        headers = self.prepare_usual_headers(content_type)
        return self.request(
            'PUT', path, token=token, headers=headers, data=resource)

    def DELETE(self, token, path):
        """Send an authenticated DELETE and return the response."""
        return self.request('DELETE', path, token=token)

    def prepare_usual_headers(self, content_type):
        """Return the request headers for a body of content_type."""
        return {
            'Content-Type': content_type,
        }

    def request(self, method, path, token=None, headers=None,
                auth=None, data=None):
        """Send one HTTP request and return the response.

        method must be POST, PUT, GET or DELETE. When token is given
        it is sent as a Bearer Authorization header. Raises Error
        when the response is not OK.
        """
        funcs = {
            'POST': requests.post,
            'PUT': requests.put,
            'GET': requests.get,
            'DELETE': requests.delete,
        }

        url = '{}{}'.format(self._api_url, path)

        if token:
            # Copy so the caller's headers dict is never modified.
            headers = dict(headers or {})
            headers['Authorization'] = 'Bearer {}'.format(token)

        response = funcs[method](
            url, headers=headers, auth=auth, data=data, verify=True)

        if not response.ok:
            raise Error(
                method, self._api_url, path, response.status_code,
                response.text)

        return response


class Error(cliapp.AppException):

    """An API request got a response that was not OK."""

    def __init__(self, method, api_url, path, status, body):
        msg = 'Error {status}: {method} {url}{path}\n\n{body}'.format(
            status=status, method=method, url=api_url, path=path, body=body)
        # Initialise the base class with the message, like the direct
        # cliapp.AppException('...') raise elsewhere in this file.
        super().__init__(msg)
        self.msg = msg


def default_scopes():
    """Return the default scopes as one space-delimited string."""
    types = ['clients', 'users', 'applications', 'subs']
    templates = [
        'uapi_{}_post',
        'uapi_{}_get',
        'uapi_{}_id_get',
        'uapi_{}_id_put',
        'uapi_{}_id_secret_put',
        'uapi_{}_id_delete',
    ]
    # Resource type is the outer loop, so each type's scopes stay together.
    return ' '.join(
        template.format(resource_type)
        for resource_type in types
        for template in templates)


if __name__ == '__main__':
    QvisqveTool(version=qvisqve.__version__).run()