"""Flask application which provides the interface of a login panel. The application interacts with different backend, like the Kratos backend for users, Hydra for OIDC sessions and MariaDB for application and role specifications. The application provides also several command line options to interact with the user entries in the database(s)""" import ast import json import urllib.parse import urllib.request import ory_hydra_client # hydra v2 # from ory_hydra_client.api import o_auth2_api from ory_hydra_client.api import admin_api from ory_hydra_client.models import AcceptConsentRequest, AcceptLoginRequest, ConsentRequestSession import ory_hydra_client.exceptions as hydra_exceptions import ory_kratos_client from ory_kratos_client.api import frontend_api, identity_api from ory_kratos_client.model.authenticator_assurance_level import AuthenticatorAssuranceLevel from flask import abort, current_app, jsonify, redirect, render_template, request from database import db from helpers import KratosUser from config import * from web import web from areas.apps import AppRole, App, OAuthClientApp from areas.roles import RoleService from areas.roles.models import Role from areas.users.user_service import UserService # This is a circular import and should be solved differently # from app import db # APIs # Create HYDRA & KRATOS API interfaces hydra_admin_api_configuration = \ ory_hydra_client.Configuration(host=HYDRA_ADMIN_URL, discard_unknown_keys=True) hydra_client = ory_hydra_client.ApiClient(hydra_admin_api_configuration) # hydra v2 # oauth2_api = o_auth2_api.OAuth2Api(hydra_client) hydra_admin_api = admin_api.AdminApi(hydra_client) # Kratos has an admin and public end-point. We create an API for them # both. The kratos implementation has bugs, which forces us to set # the discard_unknown_keys to True. 
# Kratos API clients: one bound to the admin end-point and one bound to the
# public end-point. Both discard unknown keys in responses.
kratos_admin_api_configuration = \
    ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL, discard_unknown_keys=True)
kratos_admin_client = ory_kratos_client.ApiClient(kratos_admin_api_configuration)
admin_identity_api = identity_api.IdentityApi(kratos_admin_client)
admin_frontend_api = frontend_api.FrontendApi(kratos_admin_client)

kratos_public_api_configuration = \
    ory_kratos_client.Configuration(host=KRATOS_PUBLIC_URL, discard_unknown_keys=True)
kratos_public_client = ory_kratos_client.ApiClient(kratos_public_api_configuration)
kratos_public_frontend_api = frontend_api.FrontendApi(kratos_public_client)

# Role IDs compared against `AppRole.role_id` in `consent`.
ADMIN_ROLE_ID = 1
NO_ACCESS_ROLE_ID = 3

##############################################################################
# WEB ROUTES                                                                 #
##############################################################################


@web.route("/recovery", methods=["GET", "POST"])
def recovery():
    """Start recovery flow

    If no active flow, redirect to kratos to create a flow, otherwise render the
    recovery template.

    :param flow: flow as given by Kratos (query parameter)
    :return: redirect or recovery page
    """
    flow = request.args.get("flow")
    if not flow:
        return redirect(KRATOS_PUBLIC_URL + "self-service/recovery/browser")

    return render_template("recover.html", api_url=KRATOS_PUBLIC_URL, dashboard_url=DASHBOARD_URL)


@web.route("/settings", methods=["GET", "POST"])
def settings():
    """Start settings flow

    If no active flow, redirect to kratos to create a flow, otherwise render the
    settings template.

    :param flow: flow as given by Kratos (query parameter)
    :return: redirect or settings page
    """
    flow = request.args.get("flow")
    if not flow:
        return redirect(KRATOS_PUBLIC_URL + "self-service/settings/browser")

    return render_template("settings.html", api_url=KRATOS_PUBLIC_URL, dashboard_url=DASHBOARD_URL)


@web.route("/error", methods=["GET"])
def error():
    """Show error messages from Kratos

    Implements user-facing errors as described in
    https://www.ory.sh/docs/kratos/self-service/flows/user-facing-errors

    :param id: error ID as given by Kratos (query parameter)
    :return: rendered error page; the error message is empty when the lookup
        at Kratos failed
    """
    error_id = request.args.get("id")
    api_response = ""
    try:
        # Get Self-Service Errors
        api_response = admin_frontend_api.get_flow_error(error_id)
    except ory_kratos_client.ApiException as ex:
        # Best effort: log and still render the page without a message.
        current_app.logger.error(
            "Exception when calling get_self_service_error: %s\n", ex)

    return render_template("error.html", dashboard_url=DASHBOARD_URL, error_message=api_response)


@web.route("/login", methods=["GET", "POST"])
def login():
    """Start login flow

    If already logged in, shows the loggedin template. Otherwise creates a login
    flow, if no active flow will redirect to kratos to create a flow.

    :param flow: flow as given by Kratos (query parameter)
    :return: redirect or login page
    """
    # Check if we are logged in:
    identity, _ = get_auth()
    # We ignore the potential `auth_response` in this case: that's for telling
    # the user they have to upgrade their session to include a second factor,
    # but we're already on the login page so there's no use for that here --
    # they'd be redirected by Kratos back to this same login page anyway,
    # creating a redirect loop. Chances are that if `auth_response` is not
    # None, we're actually serving or processing the TOTP form here.

    # List to contain messages pushed to the frontend
    messages = []
    refresh = False
    flow = request.args.get("flow")
    if flow:
        cookies = request.headers['cookie']
        flow = kratos_public_frontend_api.get_login_flow(flow, cookie=cookies)
        # current_app.logger.info("flow found in login: {}".format(flow))
        refresh = flow['refresh']
        if refresh:
            message = {
                "id": "S_CONFIRM_CREDENTIALS",
                "message": "Please confirm your credentials to complete this action.",
                "type": "info"
            }
            # Not appending, depending on rendering in frontend now,
            # see issue: #136
            # messages.append(message)
        try:
            for msg in flow.ui.messages.value:
                current_app.logger.info("Kratos message:" + msg.text)
                if msg.id == 4000006:
                    message = {
                        "id": msg.id,
                        "message": "The provided login e-mail address or password is incorrect. Please try again.",
                        "type": "error"
                    }
                else:
                    message = {
                        "id": msg.id,
                        "message": msg.text,
                    }
                    if msg.type == 'error':
                        message['type'] = 'error'
                    else:
                        message['type'] = 'info'
                # Not appending, depending on rendering in frontend now,
                # see issue: #136
                # messages.append(message)
        except ory_kratos_client.exceptions.ApiAttributeError:
            # This exception is expected when there are no messages
            pass
        except Exception as ex:
            # An unknown exception happens, we log the error but continue as it
            # only affects the messages part
            current_app.logger.error("Unknown exception: " + str(ex))

    if identity and not refresh:
        # We are already logged in, and don't need to refresh.
        if 'name' in identity['traits']:
            # Add a space in front of the "name" so the template can put it
            # between "Welcome" and the comma
            name = " " + identity['traits']['name']
        else:
            name = ""
        return render_template("loggedin.html",
                               api_url=KRATOS_PUBLIC_URL,
                               dashboard_url=DASHBOARD_URL,
                               name=name)

    # If we do not have a flow, get one.
    if not flow:
        return redirect(KRATOS_PUBLIC_URL + "self-service/login/browser")

    # If we end up here, then either:
    # `identity and refresh`
    #     User is already logged in, but "refresh" is specified, meaning that
    #     we should ask the user to authenticate again. This is necessary when
    #     you want to change protected fields (password, TOTP) in the
    #     self-service settings, and your session is too old.
    # or `not identity`
    #     User is not logged in yet.
    # In either case, we present the login screen now.
    return render_template("login.html",
                           api_url=KRATOS_PUBLIC_URL,
                           dashboard_url=DASHBOARD_URL,
                           messages=messages,
                           demo=DEMO_INSTANCE)


@web.route("/auth", methods=["GET", "POST"])
def auth():
    """Authorize a user for an application

    If an application authenticated against the IdP (Identity Provider), if
    there are no active sessions, the user is forwarded to the login page.
    This is the entry point for those authorization requests. The challenge
    as provided, is verified. If an active user is logged in, the request
    is accepted and the user is returned to the application. If the user is not
    logged in yet, it redirects to the login page.

    :param challenge: challenge as given by Hydra (`login_challenge`)
    :return: redirect to login or application/idp
    """
    challenge = None

    # Retrieve the challenge id from the request. Depending on the method it is
    # saved in the form (POST) or in a GET variable. If this variable is not set
    # we can not continue.
    if request.method == "GET":
        challenge = request.args.get("login_challenge")
    if request.method == "POST":
        # Form fields of a POST request live in `request.form`.
        challenge = request.form.get("login_challenge")

    if not challenge:
        current_app.logger.error("No challenge given. Error in request")
        abort(400, description="Challenge required when requesting authorization")

    # Check if we are logged in:
    (identity, auth_response) = get_auth()

    if auth_response is not None:
        # According to `get_auth`, we have to send the user a response already,
        # probably a redirect to let the user provide their second factor.
        return auth_response

    # If the user is not logged in yet, we redirect to the login page
    # but before we do that, we set the "flow_state" cookie to auth.
    # so the UI knows it has to redirect after a successful login.
    # The redirect URL is back to this page (auth) with the same challenge
    # so we can pickup the flow where we left off.
    if not identity:
        url = LOGIN_PANEL_URL + "/auth?login_challenge=" + challenge
        url = urllib.parse.quote_plus(url)

        current_app.logger.info("Redirecting to login. Setting flow_state cookies")
        current_app.logger.info("auth_url: " + url)

        response = redirect(LOGIN_PANEL_URL + "/login")
        response.set_cookie("flow_state", "auth")
        response.set_cookie("auth_url", url)
        return response

    current_app.logger.info("User is logged in. We can authorize the user")

    try:
        # hydra v2
        # login_request = oauth2_api.get_o_auth2_login_request(challenge)
        login_request = hydra_admin_api.get_login_request(challenge)
    except hydra_exceptions.NotFoundException:
        current_app.logger.error(
            f"Not Found. Login request not found. challenge={challenge}"
        )
        abort(404, description="Login request not found. Please try again.")
    except hydra_exceptions.ApiException:
        current_app.logger.error(
            f"Conflict. Login request has been used already. challenge={challenge}"
        )
        abort(503, description="Login request already used. Please try again.")

    # Authorize the user
    # False positive: pylint: disable=no-member
    try:
        redirect_to = hydra_admin_api.accept_login_request(
            challenge,
            accept_login_request=AcceptLoginRequest(
                identity.id,
                remember=True,
                # Remember session for 7d
                remember_for=60 * 60 * 24 * 7,
            )
        ).redirect_to
    except Exception as e:
        current_app.logger.error("Failure during accepting login request. Redirecting to logout, hopefully to wipe cookies")
        current_app.logger.error(e)
        return redirect("logout")

    return redirect(redirect_to)


def _redirect_to_2fa_setup():
    """Redirect to the Kratos settings flow, flagging that 2FA is required."""
    response = redirect(KRATOS_PUBLIC_URL + "self-service/settings/browser")
    response.set_cookie('stackspin_context', '2fa-required')
    return response


def _accept_consent(challenge, consent_request, claims):
    """Accept the Hydra consent request and return the URL to redirect to.

    :param challenge: consent challenge as given by Hydra
    :param consent_request: consent request object fetched from Hydra
    :param claims: mapping expanded into the `ConsentRequestSession`
    :return: `redirect_to` of Hydra's response
    """
    # False positive: pylint: disable=no-member
    return hydra_admin_api.accept_consent_request(
        challenge,
        accept_consent_request=AcceptConsentRequest(
            grant_scope=consent_request.requested_scope,
            grant_access_token_audience=consent_request.requested_access_token_audience,
            session=ConsentRequestSession(**claims),
        )
    ).redirect_to


def _reject_consent(challenge):
    """Reject the Hydra consent request and redirect the browser.

    :param challenge: consent challenge as given by Hydra
    :return: redirect to the `redirect_to` URL of Hydra's response
    """
    # In previous versions of the hydra API client library, we
    # could set these parameters, but that's no longer possible,
    # not sure why.
    # error="No access",
    # error_description="The user has no access for app",
    # error_hint="Contact your administrator",
    # status_code=401,
    rejection = hydra_admin_api.reject_consent_request(challenge)
    # `redirect` needs the URL, not the response object, mirroring how the
    # accept path uses `.redirect_to`.
    return redirect(rejection.redirect_to)


@web.route("/consent", methods=["GET", "POST"])
def consent():
    """Get consent

    Rather than get consent from the end-user, this checks whether the user
    should have access to the given app based on stackspin roles.

    :param consent_challenge: challenge as given by Hydra (query parameter)
    :return: redirect to login or render error
    """
    challenge = request.args.get("consent_challenge")
    if not challenge:
        abort(
            403, description="Consent request required. Do not call this page directly"
        )
    try:
        # hydra v2
        # consent_request = oauth2_api.get_o_auth2_consent_request(challenge)
        consent_request = hydra_admin_api.get_consent_request(challenge)
    except hydra_exceptions.NotFoundException:
        current_app.logger.error(f"Not Found. Consent request {challenge} not found")
        abort(404, description="Consent request does not exist. Please try again")
    except hydra_exceptions.ApiException:
        current_app.logger.error(f"Conflict. Consent request {challenge} already used")
        abort(503, description="Consent request already used. Please try again")

    if ENFORCE_2FA:
        # Check for session status, in particular 2FA.
        cookie = get_kratos_cookie()
        if not cookie:
            current_app.logger.info("consent: no kratos cookie set, redirecting to set up 2fa")
            return _redirect_to_2fa_setup()
        session = kratos_public_frontend_api.to_session(cookie=cookie)
        # Check session aal.
        aal = session['authenticator_assurance_level']
        current_app.logger.info(f"aal: {aal}")
        if aal == AuthenticatorAssuranceLevel('aal1'):
            current_app.logger.info("aal is only aal1, so not accepting consent request")
            return _redirect_to_2fa_setup()

    # Get information about this consent request:
    # False positive: pylint: disable=no-member
    try:
        consent_client = consent_request.client

        # Some versions of Hydra module return a string object and need to be decoded
        if isinstance(consent_client, str):
            consent_client = ast.literal_eval(consent_client)

        client_id = consent_client.get("client_id")
        # False positive: pylint: disable=no-member
        kratos_id = consent_request.subject
        current_app.logger.info(f"Info: Found kratos_id {kratos_id}")
        current_app.logger.info(f"Info: Found client_id {client_id}")

    except Exception as ex:
        current_app.logger.error(
            "Error: Unable to extract information from consent request"
        )
        current_app.logger.error(f"Error: {ex}")
        current_app.logger.error(f"Client: {consent_request.client}")
        current_app.logger.error(f"Subject: {consent_request.subject}")
        abort(501, description="Internal error occurred")

    # Get the related user object
    current_app.logger.info(f"Info: Getting user from admin {kratos_id}")
    user = KratosUser(admin_identity_api, kratos_id)
    if not user:
        current_app.logger.error(f"User not found in database: {kratos_id}")
        abort(401, description="User not found. Please try again.")

    # Get role on dashboard
    dashboard_app = db.session.query(App).filter(
        App.slug == 'dashboard').first()
    if dashboard_app:
        role_object = (
            db.session.query(AppRole)
            .filter(AppRole.app_id == dashboard_app.id)
            .filter(AppRole.user_id == user.uuid)
            .first()
        )
        # If the user is dashboard admin admin is for all
        if role_object is not None and role_object.role_id == ADMIN_ROLE_ID:
            current_app.logger.info("Info: User has admin dashboard role")
            current_app.logger.info(f"Providing consent to {client_id} for {kratos_id}")
            current_app.logger.info(f"{kratos_id} was granted admin access to {client_id}")
            # Get claims for this user, provided the current app
            claims = user.get_claims(client_id, ['admin'])
            current_app.logger.info(f"claims: {claims}")
            return redirect(_accept_consent(challenge, consent_request, claims))

    # Resolve to which app the client_id belongs.
    try:
        app_obj = db.session.query(OAuthClientApp).filter(
            OAuthClientApp.oauthclient_id == client_id).first().app
    except AttributeError:
        # `.first()` returned None: no app is registered for this client.
        current_app.logger.error(f"Could not find app for client {client_id}")
        return _reject_consent(challenge)

    # Default access level
    roles = []
    role_object = (
        db.session.query(AppRole)
        .filter(AppRole.app_id == app_obj.id)
        .filter(AppRole.user_id == user.uuid)
        .first()
    )
    # Role ID 3 is always "No access" due to migration b514cca2d47b
    if role_object is None or role_object.role_id is None or role_object.role_id == NO_ACCESS_ROLE_ID:
        # If there is no role in app_roles or the role_id for an app is null user has no permissions
        current_app.logger.error(f"User has no access for: {app_obj.name}")
        return _reject_consent(challenge)

    roles.append(role_object.role.name)

    current_app.logger.info(f"Using '{roles}' when applying consent for {kratos_id}")

    # Get claims for this user, provided the current app
    claims = user.get_claims(client_id, roles)
    current_app.logger.info(f"claims: {claims}")

    # pylint: disable=fixme
    # TODO: Need to implement checking claims here, once the backend for that is
    #       developed
    current_app.logger.info(f"Providing consent to {client_id} for {kratos_id}")
    current_app.logger.info(f"{kratos_id} was granted access to {client_id}")

    try:
        redirect_url = _accept_consent(challenge, consent_request, claims)
    except Exception as e:
        # If an unexpected error occurs, logout, hopefully that wipes the
        # relevant cookies
        current_app.logger.error('Fatal processing consent, redirect to logout:' + str(e))
        return redirect("logout")

    current_app.logger.info(f"Redirect to: {redirect_url}")
    return redirect(redirect_url)


@web.route("/status", methods=["GET", "POST"])
def status():
    """Get status of current session

    Show if there is an user is logged in. If not shows: not-auth

    :return: the `id` of the logged-in identity, or the string "not-auth"
    """
    auth_status, _ = get_auth()

    if auth_status:
        return auth_status.id
    return "not-auth"


def get_auth():
    """Checks if user is logged in

    Queries the cookies. If an authentication cookie is found, it
    checks with Kratos if the cookie is still valid. If so, the profile is
    returned. Otherwise False is returned, possibly with a response to send to
    the user, for redirecting them to the kratos-suggested url, for providing
    2FA in particular.

    :return: (Profile, None) or (False, None) or (False, Response)
    """
    cookie = get_kratos_cookie()
    if not cookie:
        return False, None

    # Given a cookie, check if it is valid and get the profile
    try:
        api_response = kratos_public_frontend_api.to_session(cookie=cookie)

        # Get all traits from ID
        return api_response.identity, None

    except ory_kratos_client.ApiException as ex:
        # If it fails because the client needs to provide 2FA, we return a
        # redirect response for use by the caller of this function.
        if ex.body is not None:
            try:
                body = json.loads(ex.body)
            except (TypeError, ValueError):
                # The error body is not JSON; treat it as a plain failure.
                body = None
            if isinstance(body, dict):
                current_app.logger.info("Error in to_session: {}".format(body))
                error_details = body.get('error', {})
                error_id = error_details.get('id') if isinstance(error_details, dict) else None
                if error_id == 'session_aal2_required':
                    current_app.logger.info("2FA requested by Kratos. Redirecting the user.")
                    redirect_url = body.get('redirect_browser_to')
                    if redirect_url is None:
                        response = None
                    else:
                        response = redirect(redirect_url)
                    return False, response
        current_app.logger.error(
            f"Exception when calling to_session(): {ex}\n"
        )

    return False, None


def get_kratos_cookie():
    """Retrieves the Kratos cookie from the session.

    Returns False if the cookie does not exist or is corrupted.

    :return: the string "ory_kratos_session=<value>", or False when the
        request carries no `ory_kratos_session` cookie
    """
    session_cookie = request.cookies.get("ory_kratos_session")
    if session_cookie is None:
        current_app.logger.info("User not logged in or cookie corrupted")
        return False
    return "ory_kratos_session=" + session_cookie


@web.route("/logout", methods=["GET"])
def logout():
    """Handles the Hydra OpenID Connect Logout flow and Kratos logout flow.

    Steps:

    1. Hydra's /oauth2/sessions/logout endpoint is called by an application
    2. Hydra calls this endpoint with a `logout_challenge` get parameter
    3. We retrieve the logout request using the challenge
    4. We accept the Hydra logout request. This returns a URL -- let's call it
       "next redirect" -- that we should redirect the browser to to finish the
       Hydra logout.
    5. We create a Kratos logout flow, setting its `return_to` parameter to the
       "next redirect". This returns a Kratos logout URL.
    6. We return a small HTML page to the browser, based on the `clear.html`
       template, which clears dashboard local storage and redirects to the
       Kratos logout URL.
    7. The browser follows that redirect, Kratos does its thing and redirects
       to the "next redirect".
    8. The browser follows the "next redirect", Hydra does its thing and
       redirects to the "post-logout URL". We set that to the dashboard root
       URL by default, but OIDC clients can override it to something else.
       For example, Nextcloud sets it to the root Nextcloud URL.

    Args:
        logout_challenge (string): Reference to a Hydra logout challenge object

    Returns:
        Redirect to the url that is provided by the LogoutRequest object.
    """
    # Generally, if we encounter errors during these steps we redirect the user
    # to the beginning of the logout procedure, which is
    # `https://${hydra_domain}/oauth2/sessions/logout`.
    new_logout_url = f"{HYDRA_PUBLIC_URL}/oauth2/sessions/logout"

    # We should have been redirected here by hydra which also sets the
    # `logout_challenge` parameter.
    challenge = request.args.get("logout_challenge")
    current_app.logger.info("Logout request: challenge=%s", challenge)
    if not challenge:
        current_app.logger.info("No challenge set.")
        current_app.logger.info("Redirecting to hydra logout: %s", new_logout_url)
        return redirect(new_logout_url)

    try:
        # hydra v2
        # logout_request = oauth2_api.get_o_auth2_logout_request(challenge)
        logout_request = hydra_admin_api.get_logout_request(challenge)
    except hydra_exceptions.NotFoundException:
        current_app.logger.error("Logout request with challenge '%s' not found", challenge)
        return redirect(new_logout_url)
    except hydra_exceptions.ApiException:
        current_app.logger.error(
            "Conflict. Logout request with challenge '%s' has been used already.",
            challenge)
        current_app.logger.info("Redirecting to hydra logout: %s", new_logout_url)
        return redirect(new_logout_url)

    current_app.logger.info("Logout request hydra, subject %s", logout_request.subject)

    # Accept logout request. The `redirect_to` that we get is what we have to
    # redirect the browser to to finish the hydra logout (clear cookies, etc.)
    # and after that get redirected to the configured post-logout URL. We store
    # the `redirect_to` URL so we can pass it to kratos below (as `return_to`),
    # so the browser will get redirected to `redirect_to` after the kratos
    # logout is finished.
    try:
        hydra_return = hydra_admin_api.accept_logout_request(challenge)
        next_redirect = hydra_return.redirect_to
    except Exception as ex:
        current_app.logger.info("Error accepting hydra logout request: %s", str(ex))
        next_redirect = DASHBOARD_URL

    # Now end the kratos session.
    kratos_cookie = get_kratos_cookie()
    if not kratos_cookie:
        # No kratos cookie, already logged out from kratos.
        current_app.logger.info("Expected kratos cookie but not found. Skipping kratos logout but continuing other logout steps.")
        # We skip the Kratos logout, but we still need to follow
        # `next_redirect` -- probably the Hydra logout URL -- and clear
        # dashboard storage.
        return render_template("clear.html", url=next_redirect)

    try:
        # Create a Logout URL for Browsers
        current_app.logger.info(f"Creating logout flow, with return_to={next_redirect}")
        kratos_api_response = admin_frontend_api.create_browser_logout_flow(
            return_to=next_redirect,
            cookie=kratos_cookie)
        current_app.logger.info("Kratos api response to creating logout flow:")
        current_app.logger.info(kratos_api_response)
        return render_template("clear.html", url=kratos_api_response.logout_url)
    except ory_kratos_client.ApiException as ex:
        current_app.logger.error("Exception when calling"
                                 " create_browser_logout_flow: %s\n", ex)

    current_app.logger.info("Redirecting to hydra logout: %s", new_logout_url)
    return redirect(new_logout_url)


if DEMO_INSTANCE:
    @web.route("/demo-user", methods=["POST"])
    def demo_user():
        """Create a user from the posted JSON; only routed when DEMO_INSTANCE is set.

        Posted fields override the defaults (empty name, dashboard app with
        `Role.ADMIN_ROLE_ID`).

        :return: JSON confirmation message
        """
        # `get_json()` can yield None; fall back to the defaults alone then.
        data = request.get_json() or {}
        defaults = {
            "name": "",
            "app_roles": [{"name": "dashboard", "role_id": Role.ADMIN_ROLE_ID}],
        }
        UserService.post_user({**defaults, **data})
        return jsonify("User created successfully. You should receive an email to confirm your address and set a password.")