# Copyright (C) 2018  Ivan Dolgov
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.


import urllib.parse

import bottle

import qvisqve


class AuthRouter(qvisqve.Router):

    """Router for the ``/auth`` endpoint of the authorization code flow.

    ``GET /auth`` records an authorization attempt and answers with a
    login form; ``POST /auth`` checks the submitted credentials and, on
    success, redirects to the callback URI stored in the attempt with
    ``code`` and ``state`` query parameters.
    """

    def __init__(self, apps, users, authz_attempts):
        """Remember the collaborators used by the route callbacks.

        ``apps`` is queried with ``get`` and ``get_callbacks``,
        ``users`` with ``is_valid_secret``, and ``authz_attempts`` with
        ``create_attempt`` and ``find_by_id``.
        """
        super().__init__()
        self._apps = apps
        self._users = users
        self._attempts = authz_attempts

    def get_routes(self):
        """Return the route descriptions served by this router."""
        return [
            {
                'method': 'GET',
                'path': '/auth',
                'callback': self._start_authz_code_flow,
                'needs-authorization': False,
            },
            {
                'method': 'POST',
                'path': '/auth',
                'callback': self._check_user_creds,
                'needs-authorization': False,
            },
        ]

    def _start_authz_code_flow(self, content_type, body, *args, **kwargs):
        """Handle ``GET /auth``: record an attempt and show the login form.

        The request parameters are read from the query string found in
        ``kwargs['raw_uri_path']``. A request without a query string is
        answered with a bad request response.
        """
        qvisqve.log.log(
            'trace', msg_text='_start_authz_code_flow',
            args=args, kwargs=kwargs,
            content_type=content_type, body=body)

        path = kwargs['raw_uri_path']
        if '?' not in path:
            return qvisqve.bad_request_response('Not like that')

        path, qs = path.split('?', 1)
        params = urllib.parse.parse_qs(qs)
        cleaned = self._cleanup_params(params)
        qvisqve.log.log(
            'trace', msg_text='params', path=path, qs=qs, params=params,
            cleaned=cleaned)

        client_id = cleaned.get('client_id')
        redirect_uri = cleaned.get('redirect_uri')

        # An unknown application or an unregistered callback does not stop
        # the flow here. The attempt is stored with an empty redirect_uri,
        # which _check_user_creds rejects after the login form is posted.
        app = self._apps.get(client_id)
        if app is None:
            qvisqve.log.log(
                'error', msg_text='app is unknown', client_id=client_id)
            redirect_uri = ''
        else:
            callbacks = self._apps.get_callbacks(client_id)
            if redirect_uri not in callbacks:
                qvisqve.log.log(
                    'error', msg_text='redirect_uri is unknown',
                    client_id=client_id, redirect_uri=redirect_uri,
                    callbacks=callbacks)
                redirect_uri = ''

        cleaned['redirect_uri'] = redirect_uri
        aa = self._attempts.create_attempt(cleaned)

        form = qvisqve.render_template(
            'login.tmpl', attempt_id=aa.get_attempt_id())

        headers = {
            'Content-Type': 'text/html; charset=utf-8',
        }
        return qvisqve.ok_response(form, headers=headers)

    def _cleanup_params(self, params):
        """Flatten a ``parse_qs`` result, keeping the last value per name."""
        return {
            name: params[name][-1]
            for name in params
        }

    def _check_user_creds(self, content_type, body, *args, **kwargs):
        """Handle ``POST /auth``: verify credentials and redirect.

        Expects an ``application/x-www-form-urlencoded`` body carrying
        ``username``, ``password`` and ``attempt_id``. Missing fields,
        wrong credentials and unknown attempts all get the same
        "Access denied" response. On success the attempt is given the
        subject and a fresh authorization code, and the client is
        redirected to the attempt's callback URI.
        """
        # The raw body contains the password, so it is not logged.
        qvisqve.log.log(
            'trace', msg_text='_check_user_creds', args=args, kwargs=kwargs,
            content_type=content_type)

        if content_type != 'application/x-www-form-urlencoded':
            return qvisqve.bad_request_response('Wrong content type')

        try:
            params = self._get_form_params(body)
        except UnicodeDecodeError:
            # The body comes from the client; undecodable bytes are a
            # client error rather than a reason to crash the handler.
            qvisqve.log.log(
                'error', msg_text='Form body is not valid UTF-8')
            return qvisqve.bad_request_response('Bad request')

        username = self._get_param(params, 'username')
        password = self._get_param(params, 'password')
        attempt_id = self._get_param(params, 'attempt_id')
        # Only parameter names and a presence flag are logged; the
        # password value itself must never reach the log.
        qvisqve.log.log(
            'trace', msg_text='extracted form parameters',
            param_names=sorted(params), username=username,
            password_given=password is not None, attempt_id=attempt_id)

        if None in (username, password, attempt_id):
            qvisqve.log.log(
                'error', msg_text='Necessary form field not given',
                username=username, password_given=password is not None,
                attempt_id=attempt_id)
            return qvisqve.unauthorized_response('Access denied')

        if not self._users.is_valid_secret(username, password):
            qvisqve.log.log(
                'error',
                msg_text='User secret is invalid (username or password wrong)',
                username=username, attempt_id=attempt_id)
            return qvisqve.unauthorized_response('Access denied')

        aa = self._attempts.find_by_id(attempt_id)
        if aa is None:
            qvisqve.log.log(
                'error', msg_text='Attempt ID is unknown',
                attempt_id=attempt_id)
            return qvisqve.unauthorized_response('Access denied')

        aa.set_subject_id(username)
        gen = qvisqve.NonceGenerator()
        code = gen.create_nonce()
        aa.set_authorization_code(code)

        # _start_authz_code_flow stores '' when the application or its
        # callback URI could not be validated.
        redirect_uri = aa.get_redirect_uri()
        if redirect_uri == '':
            qvisqve.log.log(
                'error', msg_text='redirect_uri is unknown',
                redirect_uri=repr(redirect_uri))
            return qvisqve.bad_request_response('Bad request')

        params = {
            'code': code,
            'state': aa.get_state(),
        }
        url = self._build_callback_url(redirect_uri, params)

        qvisqve.log.log('trace', msg_text='Returning redirect', url=url)
        return qvisqve.found_response('Redirect to callback url', url)

    def _build_callback_url(self, redirect_uri, params):
        """Append ``params`` to ``redirect_uri`` as query parameters.

        A callback URI that already has a query component keeps it; the
        new parameters are joined with ``&`` instead of a second ``?``.
        """
        separator = '&' if '?' in redirect_uri else '?'
        return '{}{}{}'.format(
            redirect_uri, separator, urllib.parse.urlencode(params))

    def _get_param(self, params, name):
        """Return the first value of form field ``name``, or None."""
        values = params.get(name)
        if not isinstance(values, list) or len(values) < 1:
            return None
        return values[0]

    def _get_form_params(self, body):
        """Decode ``body`` as UTF-8 and parse it as a URL-encoded form.

        Raises ``UnicodeDecodeError`` if the body is not valid UTF-8.
        """
        body = body.decode('UTF-8')
        return urllib.parse.parse_qs(body)