summaryrefslogtreecommitdiff
path: root/qvisqve/auth_router.py
blob: 9dfb5823c51b19849d91ddb9fbf6a6d223d75861 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# Copyright (C) 2018  Ivan Dolgov
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import urllib.parse


import bottle


import qvisqve


# HTML template for the login page. AuthRouter._start_authz_code_flow renders
# it with bottle.template(), supplying attempt_id; the form then POSTs the
# hidden attempt_id together with the username and password back to /auth,
# where AuthRouter._check_user_creds handles it.
login_form = '''
<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <meta name="description" content="Qvisqve Login">
        <meta name="author" content="Qvarnlabs Ltd">
        <title>Qvisqve Login</title>
    </head>
    <body>
        <form action="/auth" method="POST">
            <input name="attempt_id" value="{{attempt_id}}" type="hidden" />
            User name: <input name="username" type="text" />
            <br />
            Password: <input name="password" type="password" />
            <br />
            <input type="submit" value="Login" />
        </form>
    </body>
</html>
'''


class AuthRouter(qvisqve.Router):

    """Router for the browser-facing part of the authorization code flow.

    ``GET /auth`` records an authorization attempt and serves the login
    form. ``POST /auth`` checks the submitted credentials and, when they
    are valid, redirects the browser to the application's callback URL
    with a freshly generated authorization code.
    """

    def __init__(self, apps, users, authz_attempts):
        """Remember the stores consulted by the request handlers.

        ``apps`` is queried with ``get(client_id)`` and
        ``get_callbacks(client_id)``, ``users`` with
        ``is_valid_secret(username, password)``, and ``authz_attempts``
        with ``create_attempt(params)`` and ``find_by_id(attempt_id)``.
        """
        super().__init__()
        self._apps = apps
        self._users = users
        self._attempts = authz_attempts

    def get_routes(self):
        """Return the route descriptions for GET and POST on /auth.

        Both routes are marked as not needing authorization.
        """
        return [
            {
                'method': 'GET',
                'path': '/auth',
                'callback': self._start_authz_code_flow,
                'needs-authorization': False,
            },
            {
                'method': 'POST',
                'path': '/auth',
                'callback': self._check_user_creds,
                'needs-authorization': False,
            },
        ]

    def _start_authz_code_flow(self, content_type, body, *args, **kwargs):
        """Handle GET /auth: record the attempt and serve the login form.

        The query string is read from ``kwargs['raw_uri_path']``. A
        request without a query string gets a bad request response.
        Otherwise the (last-value-wins) query parameters are stored as a
        new authorization attempt and the login form is returned with
        the attempt's id embedded in it.
        """
        qvisqve.log.log(
            'trace', msg_text='_start_authz_code_flow', args=args,
            kwargs=kwargs, content_type=content_type, body=body)
        path = kwargs['raw_uri_path']
        if '?' not in path:
            return qvisqve.bad_request_response('Not like that')

        path, qs = path.split('?', 1)
        params = urllib.parse.parse_qs(qs)
        cleaned = self._cleanup_params(params)
        qvisqve.log.log(
            'trace', msg_text='params', path=path, qs=qs, params=params,
            cleaned=cleaned)

        client_id = cleaned.get('client_id')
        redirect_uri = cleaned.get('redirect_uri')
        app = self._apps.get(client_id)  # Check that the app exists
        if app is None:
            qvisqve.log.log(
                'error', msg_text='app is unknown', client_id=client_id)
            redirect_uri = ''
        else:
            callbacks = self._apps.get_callbacks(client_id)
            if redirect_uri not in callbacks:
                qvisqve.log.log(
                    'error', msg_text='redirect_uri is unknown',
                    client_id=client_id, redirect_uri=redirect_uri,
                    callbacks=callbacks)
                redirect_uri = ''
        # An empty redirect_uri marks the attempt as unusable; the login
        # form is still served, and _check_user_creds rejects the attempt
        # after the credentials have been submitted.
        cleaned['redirect_uri'] = redirect_uri

        aa = self._attempts.create_attempt(cleaned)
        form = bottle.template(login_form, attempt_id=aa.get_attempt_id())
        headers = {
            'Content-Type': 'text/html; charset=utf-8',
        }
        return qvisqve.ok_response(form, headers=headers)

    def _cleanup_params(self, params):
        """Flatten a parse_qs() result, keeping the last value per name."""
        return {
            name: values[-1]
            for name, values in params.items()
        }

    def _check_user_creds(self, content_type, body, *args, **kwargs):
        """Handle POST /auth: verify credentials and redirect to the app.

        Returns a bad request response for a wrong content type, a body
        that is not valid UTF-8, or an attempt without a usable
        redirect_uri; an unauthorized response for missing fields, wrong
        credentials or an unknown attempt id; and otherwise a redirect
        to the attempt's redirect_uri carrying the authorization code.

        The request body and the password are never written to the log.
        """
        # The body carries the password, so it is deliberately not logged.
        qvisqve.log.log(
            'trace', msg_text='_check_user_creds', args=args,
            kwargs=kwargs, content_type=content_type)

        if not self._is_form_content_type(content_type):
            return qvisqve.bad_request_response('Wrong content type')

        try:
            params = self._get_form_params(body)
        except UnicodeDecodeError:
            qvisqve.log.log(
                'error', msg_text='Form body is not valid UTF-8')
            return qvisqve.bad_request_response('Bad request')

        username = self._get_param(params, 'username')
        password = self._get_param(params, 'password')
        attempt_id = self._get_param(params, 'attempt_id')
        qvisqve.log.log(
            'trace', msg_text='extracted form parameters',
            username=username, attempt_id=attempt_id)

        if None in (username, password, attempt_id):
            # Only record whether a password was supplied, not its value.
            qvisqve.log.log(
                'error', msg_text='Necessary form field not given',
                username=username, password_given=password is not None,
                attempt_id=attempt_id)
            return qvisqve.unauthorized_response('Access denied')

        if not self._users.is_valid_secret(username, password):
            qvisqve.log.log(
                'error',
                msg_text='User secret is invalid (username or password wrong)',
                username=username, attempt_id=attempt_id)
            return qvisqve.unauthorized_response('Access denied')

        aa = self._attempts.find_by_id(attempt_id)
        if aa is None:
            qvisqve.log.log(
                'error',
                msg_text='Attempt ID is unknown', attempt_id=attempt_id)
            return qvisqve.unauthorized_response('Access denied')

        # Reject unusable attempts before touching them, so that no
        # authorization code is ever stored for an attempt that cannot
        # be redirected anywhere.
        redirect_uri = aa.get_redirect_uri()
        if not redirect_uri:
            qvisqve.log.log(
                'error',
                msg_text='redirect_uri is unknown',
                redirect_uri=repr(redirect_uri))
            return qvisqve.bad_request_response('Bad request')

        aa.set_subject_id(username)

        gen = qvisqve.NonceGenerator()
        code = gen.create_nonce()
        aa.set_authorization_code(code)

        url = self._build_callback_url(redirect_uri, code, aa.get_state())

        qvisqve.log.log('trace', msg_text='Returning redirect', url=url)
        return qvisqve.found_response('Redirect to callback url', url)

    def _is_form_content_type(self, content_type):
        """Tell whether a Content-Type header value denotes a URL-encoded form.

        Media type parameters (such as ``; charset=UTF-8``), surrounding
        whitespace and letter case are ignored. Anything that is not a
        string is rejected.
        """
        if not isinstance(content_type, str):
            return False
        media_type = content_type.split(';', 1)[0].strip().lower()
        return media_type == 'application/x-www-form-urlencoded'

    def _build_callback_url(self, redirect_uri, code, state):
        """Append the authorization response parameters to redirect_uri.

        An existing query component in redirect_uri is kept and extended
        rather than followed by a second ``?``. ``state`` is left out
        when it is None, so that the literal text ``None`` is never sent
        to the application (assumes get_state() yields None when the
        client supplied no state -- TODO confirm).
        """
        params = {'code': code}
        if state is not None:
            params['state'] = state

        if '?' not in redirect_uri:
            separator = '?'
        elif redirect_uri.endswith(('?', '&')):
            separator = ''
        else:
            separator = '&'
        return '{}{}{}'.format(
            redirect_uri, separator, urllib.parse.urlencode(params))

    def _get_param(self, params, name):
        """Return the first value of a form field, or None if it is absent."""
        values = params.get(name)
        if not isinstance(values, list) or len(values) < 1:
            return None
        return values[0]

    def _get_form_params(self, body):
        """Parse a URL-encoded form body into a dict of value lists.

        A bytes body is decoded as UTF-8 first; UnicodeDecodeError is
        raised if it is not valid UTF-8. A body that is already text is
        parsed as it is.
        """
        if isinstance(body, (bytes, bytearray)):
            body = body.decode('UTF-8')
        return urllib.parse.parse_qs(body)