diff options
Diffstat (limited to 'qvisqve/auth_router.py')
-rw-r--r-- | qvisqve/auth_router.py | 108 |
1 files changed, 94 insertions, 14 deletions
diff --git a/qvisqve/auth_router.py b/qvisqve/auth_router.py index a91661c..097b5e7 100644 --- a/qvisqve/auth_router.py +++ b/qvisqve/auth_router.py @@ -17,54 +17,134 @@ import urllib.parse +import bottle + + import qvisqve +login_form = ''' +<!DOCTYPE html> +<html> + <head> + <meta charset="utf-8"> + <meta name="description" content="Qvisqve Login"> + <meta name="author" content="Qvarnlabs Ltd"> + <title>Qvisqve Login</title> + </head> + <body> + <form action="/auth" method="POST"> + <input name="attempt_id" value="{{attempt_id}}" type="hidden" /> + User name: <input name="username" type="text" /> + <br /> + Password: <input name="password" type="password" /> + <br /> + <input type="submit" value="Login" /> + </form> + </body> +</html> +''' + + class AuthRouter(qvisqve.Router): - def __init__(self, apps, users): + def __init__(self, apps, users, authz_attempts): super().__init__() self._apps = apps self._users = users + self._attempts = authz_attempts def get_routes(self): return [ { + 'method': 'GET', + 'path': '/auth', + 'callback': self._start_authz_code_flow, + 'needs-authorization': False, + }, + { 'method': 'POST', 'path': '/auth', - 'callback': self._auth, + 'callback': self._check_user_creds, 'needs-authorization': False, }, ] - def _auth(self, content_type, body, *args, **kwargs): + def _start_authz_code_flow(self, content_type, body, *args, **kwargs): + qvisqve.log.log( + 'trace', msg_text='_start_authz_code_flow', args=args, + kwargs=kwargs, content_type=content_type, body=body) + path = kwargs['raw_uri_path'] + if '?' not in path: + return qvisqve.bad_request_response('Not like that') + + path, qs = path.split('?', 1) + params = urllib.parse.parse_qs(qs) + cleaned = self._cleanup_params(params) + qvisqve.log.log( + 'trace', msg_text='params', path=path, qs=qs, params=params, + cleaned=cleaned) + + aa = self._attempts.create_attempt(cleaned) + form = bottle.template(login_form, attempt_id=aa.get_attempt_id()) + headers = { + 'Content-Type': 'text/html; charset=utf-8', + } + return qvisqve.ok_response(form, headers=headers) + + def _cleanup_params(self, params): + return { + name: params[name][-1] + for name in params + } + + def _check_user_creds(self, content_type, body, *args, **kwargs): + qvisqve.log.log( + 'trace', msg_text='_check_user_creds', args=args, + kwargs=kwargs, content_type=content_type, body=body) + if content_type != 'application/x-www-form-urlencoded': return qvisqve.bad_request_response('Wrong content type') params = self._get_form_params(body) username = self._get_param(params, 'username') password = self._get_param(params, 'password') + attempt_id = self._get_param(params, 'attempt_id') + qvisqve.log.log( + 'trace', msg_text='extracted form parameters', params=params, + username=username, password=password, attempt_id=attempt_id) + if None in (username, password, attempt_id): + return qvisqve.unauthorized_response('Access denied') + if not self._users.is_valid_secret(username, password): return qvisqve.unauthorized_response('Access denied') - # TODO: - # - perform actual auth - # - create and store auth code - # - use callback url provided in request + aa = self._attempts.find_by_id(attempt_id) + if aa is None: + return qvisqve.unauthorized_response('Access denied') - # FIXME use real app name here - callbacks = self._apps.get_callbacks('facade') - callback_url = callbacks[0] + aa.set_subject_id(username) - params = urllib.parse.urlencode({'code': 123}) - url = '{}?{}'.format(callback_url, params) + gen = qvisqve.NonceGenerator() + code = gen.create_nonce() + aa.set_authorization_code(code) - qvisqve.log.log('xxx', msg_text='Returning redirect', url=url) + params = { + 'code': code, + } + url = '{}?{}'.format( + aa.get_redirect_uri(), + urllib.parse.urlencode(params) + ) + qvisqve.log.log('trace', msg_text='Returning redirect', url=url) return qvisqve.found_response('Redirect to callback url', url) def _get_param(self, params, name): - return params[name][0] + values = params.get(name) + if not isinstance(values, list) or len(values) < 1: + return None + return values[0] def _get_form_params(self, body): body = body.decode('UTF-8') |