Sergiusz Bazanski | edeb3cc | 2019-12-18 14:16:53 +0100 | [diff] [blame] | 1 | from flask import Blueprint, request, url_for, session, redirect, abort, flash |
| 2 | from flask_oauthlib.client import OAuth, OAuthException |
| 3 | from flask_login import LoginManager, login_user, logout_user, current_user, login_required, UserMixin |
| 4 | |
| 5 | from .caps import cap_required |
| 6 | |
| 7 | |
class SpaceAuth(LoginManager):
    """Flask-Login manager that authenticates users via the SpaceAuth OAuth app.

    On construction it configures a Flask-OAuthlib remote application named
    ``spaceauth`` (endpoints under ``https://sso.hackerspace.pl``, scope
    ``profile:read``, configuration key ``SPACEAUTH``) and builds a blueprint
    exposing ``/login``, ``/logout`` and ``/callback`` views.  The blueprint
    is registered on the application by :meth:`init_app`.

    Session keys used:
        ``spaceauth_token``  access token returned by the OAuth provider.
        ``spaceauth_next``   URL to return to once the login completes.
    """

    def __init__(self, app=None, *args, **kwargs):
        """Set up the OAuth client, the blueprint and the login manager.

        Args:
            app: Optional Flask application.  When given, :meth:`init_app`
                is called immediately with ``*args`` / ``**kwargs``.
        """
        self.oauth = OAuth()
        self.remote = self.oauth.remote_app(
            'spaceauth',
            base_url='https://sso.hackerspace.pl/api/',
            access_token_url='https://sso.hackerspace.pl/oauth/token',
            authorize_url='https://sso.hackerspace.pl/oauth/authorize',
            request_token_params={'scope': 'profile:read'},
            app_key='SPACEAUTH')
        self.remote.tokengetter(self.tokengetter)

        bp = Blueprint('spaceauth', __name__)
        bp.add_url_rule('/login', 'login', self.login_view_handler)
        bp.add_url_rule('/logout', 'logout', self.logout_view_handler)
        bp.add_url_rule('/callback', 'callback', self.callback_view_handler)
        self.blueprint = bp

        super(SpaceAuth, self).__init__()
        self.refresh_view = 'spaceauth.login'
        self.login_view = 'spaceauth.login'
        self.user_loader(self.user_loader_handler)

        if app:
            self.init_app(app, *args, **kwargs)

    def init_app(self, app, url_prefix='/oauth'):
        """Attach the OAuth client, login manager and blueprint to ``app``.

        Also installs an application-wide handler that turns any
        ``OAuthException`` into a flashed error and a redirect to ``/``.

        Args:
            app: Flask application to configure.
            url_prefix: URL prefix under which the blueprint is mounted.
        """
        self.oauth.init_app(app)
        super(SpaceAuth, self).init_app(app)
        app.register_blueprint(self.blueprint, url_prefix=url_prefix)

        @app.errorhandler(OAuthException)
        def errorhandler(err):
            flash('OAuth error occured', 'error')
            return redirect('/')

    @staticmethod
    def _safe_next_url(target):
        """Return ``target`` if it stays on this site, otherwise ``None``.

        ``next`` and the ``Referer`` header are both attacker-controlled, so
        redirecting to them unchecked is an open redirect.  Only local paths
        and absolute http(s) URLs on the current request's host are accepted.
        """
        if not target:
            return None
        target = target.strip()
        # Reject control characters outright rather than guess how a browser
        # would normalise them.
        if any(ord(ch) < 0x20 for ch in target):
            return None
        # Normalise backslashes so '/\host' is checked like '//host'.
        probe = target.replace('\\', '/').lower()
        if probe.startswith('/'):
            # '//host/...' is scheme-relative, i.e. another site.
            return None if probe.startswith('//') else target
        host = request.host.lower()
        # The trailing slash prevents 'http://host.evil.example/' from
        # matching; either scheme is accepted for the same host.
        if host and probe.startswith(('http://' + host + '/',
                                      'https://' + host + '/')):
            return target
        return None

    def login_view_handler(self):
        """Remember where to return to, then start the OAuth authorization.

        The return URL is taken from the ``next`` query argument, falling
        back to the referrer; targets that leave this site are discarded.
        """
        session['spaceauth_next'] = (
            self._safe_next_url(request.args.get('next'))
            or self._safe_next_url(request.referrer))
        return self.remote.authorize(
            callback=url_for('spaceauth.callback', _external=True)
        )

    def logout_view_handler(self):
        """Drop the stored token and return URL, log out, redirect to ``/``."""
        # TODO revoke token
        session.pop('spaceauth_token', None)
        session.pop('spaceauth_next', None)
        logout_user()
        return redirect('/')

    def callback_view_handler(self):
        """Finish the OAuth flow: store the token, log the user in, redirect.

        Raises:
            OAuthException: when the provider response is missing or carries
                no ``access_token``.
        """
        resp = self.remote.authorized_response()
        token = resp.get('access_token') if isinstance(resp, dict) else None
        if token is None:
            # Covers both a denied authorization and a token response that
            # lacks an access token (previously an unhandled KeyError).
            raise OAuthException(
                'Access denied', type=request.args.get('error'))

        # TODO encrypt token...?
        session['spaceauth_token'] = token
        profile = self.user_profile()

        login_user(self.user_loader_handler(profile['username'], profile))
        # Re-validate on the way out in case the session holds a value that
        # was stored before validation existed.
        next_url = self._safe_next_url(session.pop('spaceauth_next', None))
        return redirect(next_url or '/')

    def tokengetter(self):
        """Return the session's access token paired with an empty secret."""
        return (session.get('spaceauth_token'), '')

    def user_loader_handler(self, uid, profile=None):
        """
        Default user loader just to differentiate authenticated user from
        anonymous.

        Args:
            uid: Identifier assigned to the returned user's ``id``.
            profile: Profile data fetched at login time; unused here.

        Returns:
            A bare ``UserMixin`` instance whose ``id`` is ``uid``.
        """

        user = UserMixin()
        user.id = uid
        return user

    def user_profile(self):
        """Fetch ``profile`` from the OAuth remote and return its data."""
        return self.remote.get('profile').data