# Copyright (C) 2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import qvisqve
import qvisqve_secrets


class AuthenticatingEntityManager(qvisqve.EntityManager):
    """Entity manager that also handles secrets and allowed scopes.

    The secret is kept in each entity under the key named by
    ``_hashed``; the scopes are kept under ``'allowed_scopes'``.
    """

    # Key under which the output of SecretHasher.hash() is stored.
    _hashed = 'hashed_secret'

    def set_secret(self, entity_id, cleartext_secret):
        """Store the hashed form of ``cleartext_secret`` for an entity.

        The entity is fetched with ``self.get``, updated with the value
        returned by ``qvisqve_secrets.SecretHasher().hash`` and handed
        back to ``self.create`` under the same id.
        """
        record = self.get(entity_id)
        record[self._hashed] = qvisqve_secrets.SecretHasher().hash(
            cleartext_secret)
        self.create(entity_id, record)

    def is_valid_secret(self, entity_id, cleartext_secret):
        """Return True if ``cleartext_secret`` matches the stored secret.

        Returns False, after logging at 'error' level, when the entity
        does not exist, when it has no (truthy) hashed secret, or when
        ``SecretHasher.is_correct`` rejects the supplied secret. A
        successful check is logged at 'debug' level.
        """
        # Work out which failure (if any) applies, then log exactly once.
        try:
            record = self.get(entity_id)
        except qvisqve.ResourceDoesNotExist:
            failure = 'Entity does not exist'
        else:
            stored = record.get(self._hashed)
            if not stored:
                failure = 'Entity does not have a hashed secret'
            elif qvisqve_secrets.SecretHasher().is_correct(
                    stored, cleartext_secret):
                failure = None
            else:
                failure = 'Client-supplied secret is WRONG'

        if failure is None:
            qvisqve.log.log(
                'debug',
                msg_text='Client-supplied secret IS correct',
                entity_id=entity_id)
            return True

        qvisqve.log.log('error', msg_text=failure, entity_id=entity_id)
        return False

    def set_allowed_scopes(self, entity_id, scopes):
        """Record ``scopes`` as the entity's ``'allowed_scopes'``.

        The updated entity is passed to ``self.create`` under the same id.
        """
        record = self.get(entity_id)
        record['allowed_scopes'] = scopes
        self.create(entity_id, record)

    def get_allowed_scopes(self, entity_id):
        """Return the entity's ``'allowed_scopes'``, or ``[]`` if unset."""
        return self.get(entity_id).get('allowed_scopes', [])


class ClientManager(AuthenticatingEntityManager):
    """Authenticating entity manager created with the kind ``'client'``."""

    def __init__(self, rs):
        """Initialise the parent manager with ``rs`` and ``'client'``."""
        super().__init__(rs, 'client')

    def get_subject(self, username):
        """Return the ``'sub'`` value stored for ``username``.

        The value comes from the entity's ``get('sub')``, with no
        explicit default supplied.
        """
        return self.get(username).get('sub')

    def set_subject(self, username, sub):
        """Store ``sub`` under ``'sub'`` for ``username``.

        The updated entity is passed to ``self.create`` under the same
        name.
        """
        record = self.get(username)
        record['sub'] = sub
        self.create(username, record)


class UserManager(AuthenticatingEntityManager):
    """Authenticating entity manager created with the kind ``'user'``."""

    def __init__(self, rs):
        """Initialise the parent manager with ``rs`` and ``'user'``."""
        super().__init__(rs, 'user')