# Copyright (C) 2018 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import urllib.parse import bottle import qvisqve import qvisqve_secrets class TokenRouter(qvisqve.Router): def __init__(self, token_generator, clients, users, authz_attempts): qvisqve.log.log('debug', msg_text='TokenRouter init starts') super().__init__() args = (users, clients, token_generator, authz_attempts) self._grants = { 'client_credentials': ClientCredentialsGrant(*args), 'authorization_code': AuthorizationCodeGrant(*args), } qvisqve.log.log('debug', msg_text='TokenRouter created') def get_routes(self): return [ { 'method': 'POST', 'path': '/token', 'callback': self._create_token, 'needs-authorization': False, }, ] def _create_token(self, content_type, body, **kwargs): qvisqve.log.log('xxx', body=body, kwargs=kwargs) if content_type != 'application/x-www-form-urlencoded': return qvisqve.bad_request_response('Wrong content type') params = self._get_form_params(body) grant_type = self._get_grant_type(params) grant = self._get_grant(grant_type) if grant is None: return qvisqve.bad_request_response('Wrong grant type') return grant.get_token(bottle.request, params) def _get_form_params(self, body): body = body.decode('UTF-8') return urllib.parse.parse_qs(body) def _get_grant_type(self, params): grant_type = params.get('grant_type') if len(grant_type) == 1: return grant_type[0] return None def _get_grant(self, grant_type): return self._grants.get(grant_type) class Grant: def __init__(self, users, clients, generator, authz_attempts): self._users = users self._clients = clients self._generator = generator self._attempts = authz_attempts class ClientCredentialsGrant(Grant): def get_token(self, request, params): qvisqve.log.log( 'debug', msg_text='ClientCredentialGrant.get_token called', request=request, params=params) client_id, client_secret = request.auth if not self._clients.is_valid_secret(client_id, client_secret): qvisqve.log.log( 'error', msg_text='Client token request is unauthorized') return qvisqve.unauthorized_response('Unauthorized') scope = self._get_scope(params) if scope is None: return qvisqve.bad_request_response('Bad scope') allowed = self._clients.get_allowed_scopes(client_id) scope = ' '.join( s for s in scope.split() if s in allowed ) sub = self._clients.get_subject(client_id) token = self._generator.new_token(client_id, scope, subject_id=sub) return qvisqve.ok_response({ 'access_token': token, 'token_type': 'Bearer', 'scope': scope, }) def _get_scope(self, params): scope = params.get('scope', []) if len(scope) > 1: return None if scope: return scope[0] return '' class AuthorizationCodeGrant(Grant): def get_token(self, request, params): client_id, client_secret = request.auth if not self._clients.is_valid_secret(client_id, client_secret): qvisqve.log.log('error', msg_text='Invalid client creds given') return qvisqve.unauthorized_response('Access denied') code = self._get_code(params) if code is None: qvisqve.log.log('error', msg_text='No code given') return qvisqve.unauthorized_response('Access denied') aa = self._attempts.find_by_code(code) if aa is None: qvisqve.log.log('error', msg_text='Unknown code given', code=code) return qvisqve.bad_request_response('Bad request') subject_id = aa.get_subject_id() scope = aa.get_scope() allowed = self._users.get_allowed_scopes(subject_id) scope = ' '.join( s for s in scope.split() if s in allowed ) self._attempts.delete_by_id(aa.get_attempt_id()) empty_token = self._generator.new_token( '', scope, subject_id=subject_id) return qvisqve.ok_response({ 'access_token': empty_token, 'token_type': 'Bearer', 'expires_in': 3600, # FIXME expiration time 'scope': '', }) def _get_code(self, params): code = params.get('code') if len(code) == 1: return code[0] return None class Clients: def __init__(self, clients): self._clients = clients self._hasher = qvisqve_secrets.SecretHasher() def is_correct_secret(self, client_id, cleartext): client = self._get_client(client_id) secret = client.get('client_secret') return secret and self._hasher.is_correct(secret, cleartext) def get_allowed_scopes(self, client_id): client = self._get_client(client_id) return client.get('allowed_scopes', []) def _get_client(self, client_id): return self._clients.get(client_id, {})