diff options
Diffstat (limited to 'muck/token.py')
-rw-r--r-- | muck/token.py | 60 |
1 files changed, 60 insertions, 0 deletions
diff --git a/muck/token.py b/muck/token.py new file mode 100644 index 0000000..a561632 --- /dev/null +++ b/muck/token.py @@ -0,0 +1,60 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import Crypto.PublicKey.RSA +import jwt + +import muck + + +class TokenChecker: + + def __init__(self, signing_key_pub): + pubkey = Crypto.PublicKey.RSA.importKey(signing_key_pub) + self._key = pubkey.exportKey('OpenSSH') + + def parse_header(self, value): + token = self._get_token_text(value) + options = { + 'verify_aud': False, + } + try: + return jwt.decode( + token, key=self._key, audience=None, options=options) + except jwt.DecodeError as e: + raise muck.Error(str(e)) + + def _get_token_text(self, value): + if not isinstance(value, str): + raise muck.Error('Header does not have a string value') + + if not value: + raise muck.Error('Header does not have a non-empty string value') + + words = value.split() + + if len(words) != 2: + raise muck.Error('Header does not consist of two words') + + if words[0].lower() != 'bearer': + raise muck.Error('Header does not start with "Bearer"') + + return words[1] + + +def create_token(claims, key_text): + key = Crypto.PublicKey.RSA.importKey(key_text) + token = jwt.encode(claims, key.exportKey('PEM'), algorithm='RS512') + return token.decode('ascii') |