# Copyright (C) 2018 Ivan Dolgov
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
import urllib.parse
import bottle
import qvisqve
login_form = '''
Qvisqve Login
'''
class AuthRouter(qvisqve.Router):
def __init__(self, apps, users, authz_attempts):
super().__init__()
self._apps = apps
self._users = users
self._attempts = authz_attempts
def get_routes(self):
return [
{
'method': 'GET',
'path': '/auth',
'callback': self._start_authz_code_flow,
'needs-authorization': False,
},
{
'method': 'POST',
'path': '/auth',
'callback': self._check_user_creds,
'needs-authorization': False,
},
]
def _start_authz_code_flow(self, content_type, body, *args, **kwargs):
qvisqve.log.log(
'trace', msg_text='_start_authz_code_flow', args=args,
kwargs=kwargs, content_type=content_type, body=body)
path = kwargs['raw_uri_path']
if '?' not in path:
return qvisqve.bad_request_response('Not like that')
path, qs = path.split('?', 1)
params = urllib.parse.parse_qs(qs)
cleaned = self._cleanup_params(params)
qvisqve.log.log(
'trace', msg_text='params', path=path, qs=qs, params=params,
cleaned=cleaned)
client_id = cleaned.get('client_id')
redirect_uri = cleaned.get('redirect_uri')
app = self._apps.get(client_id) # Check the app exist
if app is None:
qvisqve.log.log(
'error', msg_text='app is unknown', client_id=client_id)
redirect_uri = ''
else:
callbacks = self._apps.get_callbacks(client_id)
if redirect_uri not in callbacks:
qvisqve.log.log(
'error', msg_text='redirect_uri is unknown',
client_id=client_id, redirect_uri=redirect_uri,
callbacks=callbacks)
redirect_uri = ''
cleaned['redirect_uri'] = redirect_uri
aa = self._attempts.create_attempt(cleaned)
form = bottle.template(login_form, attempt_id=aa.get_attempt_id())
headers = {
'Content-Type': 'text/html; charset=utf-8',
}
return qvisqve.ok_response(form, headers=headers)
def _cleanup_params(self, params):
return {
name: params[name][-1]
for name in params
}
def _check_user_creds(self, content_type, body, *args, **kwargs):
qvisqve.log.log(
'trace', msg_text='_check_user_creds', args=args,
kwargs=kwargs, content_type=content_type, body=body)
if content_type != 'application/x-www-form-urlencoded':
return qvisqve.bad_request_response('Wrong content type')
params = self._get_form_params(body)
username = self._get_param(params, 'username')
password = self._get_param(params, 'password')
attempt_id = self._get_param(params, 'attempt_id')
qvisqve.log.log(
'trace', msg_text='extracted form parameters', params=params,
username=username, password=password, attempt_id=attempt_id)
if None in (username, password, attempt_id):
qvisqve.log.log(
'error', msg_text='Necessary form field not given',
username=username, password=password, attempt_id=attempt_id)
return qvisqve.unauthorized_response('Access denied')
if not self._users.is_valid_secret(username, password):
qvisqve.log.log(
'error',
msg_text='User secret is invalid (username or password wrong)',
username=username, password=password, attempt_id=attempt_id)
return qvisqve.unauthorized_response('Access denied')
aa = self._attempts.find_by_id(attempt_id)
if aa is None:
qvisqve.log.log(
'error',
msg_text='Attempt ID is unknown', attempt_id=attempt_id)
return qvisqve.unauthorized_response('Access denied')
aa.set_subject_id(username)
gen = qvisqve.NonceGenerator()
code = gen.create_nonce()
aa.set_authorization_code(code)
redirect_uri = aa.get_redirect_uri()
if redirect_uri == '':
qvisqve.log.log(
'error',
msg_text='redirect_uri is unknown',
redirect_uri=repr(redirect_uri))
return qvisqve.bad_request_response('Bad request')
params = {
'code': code,
}
url = '{}?{}'.format(
redirect_uri,
urllib.parse.urlencode(params)
)
qvisqve.log.log('trace', msg_text='Returning redirect', url=url)
return qvisqve.found_response('Redirect to callback url', url)
def _get_param(self, params, name):
values = params.get(name)
if not isinstance(values, list) or len(values) < 1:
return None
return values[0]
def _get_form_params(self, body):
body = body.decode('UTF-8')
return urllib.parse.parse_qs(body)