summaryrefslogtreecommitdiff
path: root/qvisqve/auth_router.py
diff options
context:
space:
mode:
Diffstat (limited to 'qvisqve/auth_router.py')
-rw-r--r--qvisqve/auth_router.py108
1 files changed, 94 insertions, 14 deletions
diff --git a/qvisqve/auth_router.py b/qvisqve/auth_router.py
index a91661c..097b5e7 100644
--- a/qvisqve/auth_router.py
+++ b/qvisqve/auth_router.py
@@ -17,54 +17,134 @@
import urllib.parse
+import bottle
+
+
import qvisqve
+login_form = '''
+<!DOCTYPE html>
+<html>
+ <head>
+ <meta charset="utf-8">
+ <meta name="description" content="Qvisqve Login">
+ <meta name="author" content="Qvarnlabs Ltd">
+ <title>Qvisqve Login</title>
+ </head>
+ <body>
+ <form action="/auth" method="POST">
+ <input name="attempt_id" value="{{attempt_id}}" type="hidden" />
+ User name: <input name="username" type="text" />
+ <br />
+ Password: <input name="password" type="password" />
+ <br />
+ <input type="submit" value="Login" />
+ </form>
+ </body>
+</html>
+'''
+
+
class AuthRouter(qvisqve.Router):
- def __init__(self, apps, users):
+ def __init__(self, apps, users, authz_attempts):
super().__init__()
self._apps = apps
self._users = users
+ self._attempts = authz_attempts
def get_routes(self):
return [
{
+ 'method': 'GET',
+ 'path': '/auth',
+ 'callback': self._start_authz_code_flow,
+ 'needs-authorization': False,
+ },
+ {
'method': 'POST',
'path': '/auth',
- 'callback': self._auth,
+ 'callback': self._check_user_creds,
'needs-authorization': False,
},
]
- def _auth(self, content_type, body, *args, **kwargs):
+ def _start_authz_code_flow(self, content_type, body, *args, **kwargs):
+ qvisqve.log.log(
+ 'trace', msg_text='_start_authz_code_flow', args=args,
+ kwargs=kwargs, content_type=content_type, body=body)
+ path = kwargs['raw_uri_path']
+ if '?' not in path:
+ return qvisqve.bad_request_response('Not like that')
+
+ path, qs = path.split('?', 1)
+ params = urllib.parse.parse_qs(qs)
+ cleaned = self._cleanup_params(params)
+ qvisqve.log.log(
+ 'trace', msg_text='params', path=path, qs=qs, params=params,
+ cleaned=cleaned)
+
+ aa = self._attempts.create_attempt(cleaned)
+ form = bottle.template(login_form, attempt_id=aa.get_attempt_id())
+ headers = {
+ 'Content-Type': 'text/html; charset=utf-8',
+ }
+ return qvisqve.ok_response(form, headers=headers)
+
+ def _cleanup_params(self, params):
+ return {
+ name: params[name][-1]
+ for name in params
+ }
+
+ def _check_user_creds(self, content_type, body, *args, **kwargs):
+ qvisqve.log.log(
+ 'trace', msg_text='_check_user_creds', args=args,
+ kwargs=kwargs, content_type=content_type, body=body)
+
if content_type != 'application/x-www-form-urlencoded':
return qvisqve.bad_request_response('Wrong content type')
params = self._get_form_params(body)
username = self._get_param(params, 'username')
password = self._get_param(params, 'password')
+ attempt_id = self._get_param(params, 'attempt_id')
+ qvisqve.log.log(
+ 'trace', msg_text='extracted form parameters', params=params,
+ username=username, password=password, attempt_id=attempt_id)
+ if None in (username, password, attempt_id):
+ return qvisqve.unauthorized_response('Access denied')
+
if not self._users.is_valid_secret(username, password):
return qvisqve.unauthorized_response('Access denied')
- # TODO:
- # - perform actual auth
- # - create and store auth code
- # - use callback url provided in request
+ aa = self._attempts.find_by_id(attempt_id)
+ if aa is None:
+ return qvisqve.unauthorized_response('Access denied')
- # FIXME use real app name here
- callbacks = self._apps.get_callbacks('facade')
- callback_url = callbacks[0]
+ aa.set_subject_id(username)
- params = urllib.parse.urlencode({'code': 123})
- url = '{}?{}'.format(callback_url, params)
+ gen = qvisqve.NonceGenerator()
+ code = gen.create_nonce()
+ aa.set_authorization_code(code)
- qvisqve.log.log('xxx', msg_text='Returning redirect', url=url)
+ params = {
+ 'code': code,
+ }
+ url = '{}?{}'.format(
+ aa.get_redirect_uri(),
+ urllib.parse.urlencode(params)
+ )
+ qvisqve.log.log('trace', msg_text='Returning redirect', url=url)
return qvisqve.found_response('Redirect to callback url', url)
def _get_param(self, params, name):
- return params[name][0]
+ values = params.get(name)
+ if not isinstance(values, list) or len(values) < 1:
+ return None
+ return values[0]
def _get_form_params(self, body):
body = body.decode('UTF-8')