From c024e2c7b950036f6c2d7a8d94f380ded50bcd48 Mon Sep 17 00:00:00 2001 From: Mart van Santen <mart@greenhost.nl> Date: Mon, 6 Dec 2021 07:59:10 +0100 Subject: [PATCH] Code cleanups, comment additions and make pylint happy --- login/app.py | 164 ++++++++++++++++++-------------------------- login/classes.py | 10 ++- login/config.py | 22 ++++-- login/exceptions.py | 4 +- login/kratos.py | 84 +++++++++++++++++------ login/models.py | 47 ++----------- 6 files changed, 163 insertions(+), 168 deletions(-) diff --git a/login/app.py b/login/app.py index dbf99c3..ac2136e 100644 --- a/login/app.py +++ b/login/app.py @@ -1,7 +1,12 @@ +"""Flask application which provides the inferface of a login panel. The +application interacts with different backend, like the Kratos backend for users, +Hydra for OIDC sessions and Postgres for application and role specifications. +The application provides also serveral command line options to interact with +the user entries in the database(s)""" + # Basic system imports -import pprint import logging import os import urllib.parse @@ -11,7 +16,7 @@ import click # Flask from flask import abort, Flask, redirect, request, render_template from flask_sqlalchemy import SQLAlchemy -from flask_migrate import Migrate +# False positive: pylint: disable=ungrouped-imports from flask.cli import AppGroup @@ -22,14 +27,8 @@ import hydra_client # Kratos, Identity manager import ory_kratos_client -from ory_kratos_client.api import metadata_api from ory_kratos_client.api import v0alpha2_api as kratos_api -from ory_kratos_client.model.generic_error import GenericError - -from ory_kratos_client.model.admin_create_identity_body import AdminCreateIdentityBody -from ory_kratos_client.model.admin_create_self_service_recovery_link_body import AdminCreateSelfServiceRecoveryLinkBody -from ory_kratos_client.model.identity_state import IdentityState from exceptions import BackendError @@ -80,12 +79,13 @@ KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp)) # Create DB & migrate interface db = SQLAlchemy(app) -migrate = Migrate(app, db) +Migrate(app, db) -# Import models +# Import needs to happen after the database interface is created. AppRole is +# imported for future usage +# pylint: disable=wrong-import-position unused-import from models import User, App, AppRole - # # WARNING: # @@ -107,6 +107,10 @@ app_cli = AppGroup('app') @click.argument('slug') @click.argument('name') def create_app(slug, name): + """Adds an app into the database + :param slug: str short name of the app + :param name: str name of the application + """ app.logger.info(f"Creating app definition: {name} ({slug})") obj = App() @@ -120,17 +124,21 @@ def create_app(slug, name): @app_cli.command('list') def list_app(): + """List all apps found in the database""" app.logger.info("Listing configured apps") apps = App.query.all() for obj in apps: - print("App name: %s \t Slug: %s" %(obj.name, obj.slug)) + print(f"App name: {obj.name} \t Slug: {obj.slug}") @app_cli.command('delete',) @click.argument('slug') def delete_app(slug): - app.logger.info("Trying to delete app: {0}".format(slug)) + """Removes app from database + :param slug: str Slug of app to remove + """ + app.logger.info(f"Trying to delete app: {slug}") obj = App.query.filter_by(slug=slug).first() if not obj: @@ -138,8 +146,9 @@ def delete_app(slug): return - # TODO: Deleting will (propably) fail is there are still roles attached - # these probobly need to be cleaned up first. + # Deleting will (propably) fail is there are still roles attached. This is a + # PoC implementation only. Actually management of apps and roles will be + # done by the backend application db.session.delete(obj) db.session.commit() app.logger.info("Success") @@ -209,15 +218,16 @@ def delete_user(email): @user_cli.command('create') @click.argument('email') def create_user(email): - app.logger.info("Creating user with email: ({0})".format(email)) + """Create a user in the kratos database. The argument must be an unique + email address + :param email: string Email address of user to add + """ + app.logger.info(f"Creating user with email: ({email})") # Create a user user = KratosUser(KRATOS_ADMIN) user.email = email user.save() - user = KratosUser(email) - - @user_cli.command('setpassword') @click.argument('email') @@ -231,7 +241,7 @@ def setpassword_user(email, password): :raise: exception if unexepted error happens """ - app.logger.info("Creating user with email: ({0})".format(email)) + app.logger.info(f"Creating user with email: ({email})") # Kratos does not provide an interface to set a password directly. However # we still want to be able to set a password. So we have to hack our way @@ -279,7 +289,7 @@ def recover_user(email): :param email: Email address of the user """ - app.logger.info("Trying to send recover email for user: {0}".format(email)) + app.logger.info(f"Trying to send recover email for user: {email}") try: # Get the ID of the user @@ -293,45 +303,9 @@ def recover_user(email): app.logger.error(f"Error while getting reset link: {error}") -#@user_cli.command('setadmin') -#@click.argument('email') -#def setadmin_user(email): -# app.logger.info("Trying to make user into admin: {0}".format(email)) -# obj = User.query.filter_by(email=email).first() -# -# if not obj: -# app.logger.info("Not found") -# return -# -# obj.admin = True -# db.session.commit() -# app.logger.info("Success") -# return - -#@user_cli.command('unsetadmin') -#@click.argument('email') -#def unsetadmin_user(email): -# app.logger.info("Trying to make user into normal user: {0}".format(email)) -# obj = User.query.filter_by(email=email).first() -# -# if not obj: -# app.logger.info("Not found") -# return -# -# obj.admin = False -# db.session.commit() -# app.logger.info("Success") -# return - - - - - app.cli.add_command(user_cli) - - ############################################################################## # WEB ROUTES # ############################################################################## @@ -385,7 +359,7 @@ def login(): """ # Check if we are logged in: - profile = getAuth() + profile = get_auth() if profile: return render_template( @@ -420,7 +394,7 @@ def auth(): challenge = None - # Retrieve the challenge id from the request. Depending on the method it is + # Retrieve the challenge id from the request. Depending on the method it is # saved in the form (POST) or in a GET variable. If this variable is not set # we can not continue. if request.method == 'GET': @@ -429,13 +403,12 @@ def auth(): challenge = request.args.post("login_challenge") if not challenge: - # TODO: Use local error page? app.logger.error("No challange given. Error in request") - abort(400) + abort(400, description="Challenge required when requesting authorization") # Check if we are logged in: - profile = getAuth() + profile = get_auth() # If the user is not logged in yet, we redirect to the login page @@ -444,7 +417,7 @@ def auth(): # The redirect URL is back to this page (auth) with the same challenge # so we can pickup the flow where we left off. if not profile: - url = app.config["PUBLIC_URL"] + "/auth?login_challenge=" + challenge; + url = app.config["PUBLIC_URL"] + "/auth?login_challenge=" + challenge url = urllib.parse.quote_plus(url) app.logger.info("Redirecting to login. Setting flow_state cookies") @@ -462,15 +435,14 @@ def auth(): try: login_request = HYDRA.login_request(challenge) except hydra_client.exceptions.NotFound: - app.logger.error("Not Found. Login request not found. challenge={0}".format(challenge)) - # TODO: Different error page? - abort(404) + app.logger.error(f"Not Found. Login request not found. challenge={challenge}") + abort(404, description="Login request not found. Please try again.") except hydra_client.exceptions.HTTPError: - app.logger.error("Conflict. Login request has been used already. challenge={0}".format(challenge)) - # TODO: Different error page? - abort(503) + app.logger.error(f"Conflict. Login request has been used already. challenge={challenge}") + abort(503, description="Login request already used. Please try again.") # Authorize the user + # False positive: pylint: disable=no-member redirect_to = login_request.accept( profile['email'], remember=True, @@ -492,40 +464,38 @@ def consent(): challenge = request.args.get("consent_challenge") if not challenge: - # TODO: Better error handling/display? - abort(403) + abort(403, description="Consent request required. Do not call this page directly") try: consent_request = HYDRA.consent_request(challenge) except hydra_client.exceptions.NotFound: - app.logger.error("Not Found. Consent request not found. challenge={0}".format(challenge)) - # TODO: Better error handling/display? - abort(404) + app.logger.error(f"Not Found. Consent request {challenge} not found") + abort(404, description="Consent request does not exist. Please try again") except hydra_client.exceptions.HTTPError: - app.logger.error("Conflict. Consent request has been used already. challenge={0}".format(challenge)) - # TODO: Better error handling/display? - abort(503) + app.logger.error(f"Conflict. Consent request {challenge} already used") + abort(503, description="Consent request already used. Please try again") # Get information about this consent request: + # False positive: pylint: disable=no-member app_name = consent_request.client.client_name + # False positive: pylint: disable=no-member username = consent_request.subject - # Get the related database object to get roles/access rights - user = User.query.filter_by(email=username).first() + # Get the related user object + user = KratosUser.find_by_email(KRATOS_ADMIN, email) if not user: - app.logger.error("User not found in database: {0}".format(username)) - # TODO: Better error handling? - abort(401) + app.logger.error(f"User not found in database: {username}") + abort(401, description="User not found. Please try again.") # Get claims for this user, provided the current app claims = user.get_claims(app_name) - # - # TODO: Check access / claims? - app.logger.info("Providing consent to %s for %s" % (app_name, username)) - - - app.logger.info("{0} was granted access to {1}".format(username, app_name)) + # pylint: disable=fixme + # TODO: Need to implement checking claims here, once the backend for that is + # developed + app.logger.info(f"Providing consent to {app_name} for {username}") + app.logger.info(f"{username} was granted access to {app_name}") + # False positive: pylint: disable=no-member return redirect(consent_request.accept( grant_scope=consent_request.requested_scope, grant_access_token_audience=consent_request.requested_access_token_audience, @@ -540,16 +510,15 @@ def status(): Show if there is an user is logged in. If not shows: not-auth """ - auth = getAuth() + auth_status = get_auth() - if auth: - return auth['email'] - else: - return "not-auth" + if auth_status: + return auth_status['email'] + return "not-auth" -def getAuth(): +def get_auth(): """Checks if user is logged in Queries the cookies. If an authentication cookie is found, it checks with Kratos if the cookie is still valid. If so, @@ -558,7 +527,7 @@ def getAuth(): """ try: - cookie = request.cookies.get('ory_kratos_session'); + cookie = request.cookies.get('ory_kratos_session') cookie = "ory_kratos_session=" + cookie except TypeError: app.logger.info("User not logged in or cookie corrupted") @@ -574,13 +543,10 @@ def getAuth(): return profile except ory_kratos_client.ApiException as error: - app.logger.error("Exception when calling" + - "V0alpha2Api->to_session(): %s\n" % error) + app.logger.error(f"Exception when calling V0alpha2Api->to_session(): {error}\n") return False if __name__ == '__main__': app.run() - - diff --git a/login/classes.py b/login/classes.py index 7d80154..599545d 100644 --- a/login/classes.py +++ b/login/classes.py @@ -1,9 +1,17 @@ +"""Generic classes used by different parts of the application""" + import urllib.request # Instead of processing the redirect, we return, so the application # can handle the redirect itself. This is needed to extract cookies # etc. class RedirectFilter(urllib.request.HTTPRedirectHandler): - def redirect_request(self, req, fp, code, msg, hdrs, newurl): + """Overrides the standard redirect handler so it does not automatically + redirect. This allows for inspecting the return values before redirecting or + override the redirect action""" + + # pylint: disable=too-many-arguments + # This amount of arguments is expected by the HTTPRedirectHandler + def redirect_request(self, req, fp, code, msg, headers, newurl): return None diff --git a/login/config.py b/login/config.py index 57c900d..e13aecb 100644 --- a/login/config.py +++ b/login/config.py @@ -1,10 +1,17 @@ +"""Config class imported by Flask to provide different enviroments with +different debug level. It also sets the config object with the variables learned +from the enviroment""" + import os basedir = os.path.abspath(os.path.dirname(__file__)) -class Config(object): +# Just provides config, not methods +# pylint: disable=too-few-public-methods +class Config(): + """Flask Config object following the Flask specification""" DEBUG = False TESTING = False CSRF_ENABLED = True @@ -14,28 +21,33 @@ class Config(object): SQLALCHEMY_TRACK_MODIFICATIONS = False - PUBLIC_URL = os.environ['PUBLIC_URL']; + PUBLIC_URL = os.environ['PUBLIC_URL'] - HYDRA_ADMIN_URL = os.environ['HYDRA_ADMIN_URL']; + HYDRA_ADMIN_URL = os.environ['HYDRA_ADMIN_URL'] - KRATOS_ADMIN_URL = os.environ['KRATOS_ADMIN_URL']; - KRATOS_PUBLIC_URL = os.environ['KRATOS_PUBLIC_URL'] + "/"; + KRATOS_ADMIN_URL = os.environ['KRATOS_ADMIN_URL'] + KRATOS_PUBLIC_URL = os.environ['KRATOS_PUBLIC_URL'] + "/" class ProductionConfig(Config): + """Production config, which is the general config with DEBUGGING off""" DEBUG = False class StagingConfig(Config): + """Staging config which has debugging features on""" DEVELOPMENT = True DEBUG = True class DevelopmentConfig(Config): + """Development config which has debugging features on""" DEVELOPMENT = True DEBUG = True class TestingConfig(Config): + """Testing config which has debugging features off, but provides testing + flag""" TESTING = True diff --git a/login/exceptions.py b/login/exceptions.py index 4b0747a..bae5711 100644 --- a/login/exceptions.py +++ b/login/exceptions.py @@ -1,6 +1,8 @@ +"""Custom exception handler to raise consistent exceptions, as different backend +raise different exceptions""" + class BackendError(Exception): """The backend error is raised when interacting with the backend fails or gives an unexpected result. The error contains a oneliner description of the problem""" - pass diff --git a/login/kratos.py b/login/kratos.py index e612326..456b062 100644 --- a/login/kratos.py +++ b/login/kratos.py @@ -10,16 +10,12 @@ import re from typing import Dict from urllib.request import Request - # Some imports commented out to satisfy pylint. They will be used once more # functions are migrated to this model from ory_kratos_client.model.admin_create_identity_body \ import AdminCreateIdentityBody from ory_kratos_client.model.admin_update_identity_body \ import AdminUpdateIdentityBody -from ory_kratos_client.model.identity_state \ - import IdentityState -from ory_kratos_client.model.identity import Identity from ory_kratos_client.model.admin_create_self_service_recovery_link_body \ import AdminCreateSelfServiceRecoveryLinkBody from ory_kratos_client.rest import ApiException as KratosApiException @@ -58,7 +54,7 @@ class KratosUser(): self.created_at = obj.created_at self.updated_at = obj.updated_at except KratosApiException as error: - print("Exception when calling V0alpha2Api->admin_get_identity: %s\n" % error) + print(f"Exception when calling V0alpha2Api->admin_get_identity: {error}") def __repr__(self): @@ -66,31 +62,37 @@ class KratosUser(): def save(self): + """Saves this object into the kratos backend database. If the object + is new, it will create, otherwise update an entry. + :raise: BackendError is an error with Kratos happened. + """ + + + + # Traits are the "profile" values we will set, kratos will complain on + # empty values, so we check if "name" is set and only add it if so. + traits = {'email':self.email} - # Kratos API expect for state an IdentifyState object, while - # during updating, it expects a string. + if self.name: + traits['name'] = self.name + # If we have a UUID, we are updating if self.uuid: body = AdminUpdateIdentityBody( schema_id="default", state=self.state, - traits={'email':self.email, 'name':self.name}, + traits=traits, ) try: - # Update an Identity api_response = self.api.admin_update_identity(self.uuid, admin_update_identity_body=body) - except ory_kratos_client.ApiException as e: - print("Exception when calling V0alpha2Api->admin_update_identity: %s\n" % e) + except KratosApiException as error: + raise BackendError(f"Unable to save entry, kratos replied with:{error}") from error else: - traits = {'email':self.email} - - if self.name: - traits['name'] = self.name body = AdminCreateIdentityBody( schema_id="default", - traits=traits + traits=traits, ) try: # Create an Identity @@ -98,12 +100,18 @@ class KratosUser(): admin_create_identity_body=body) if api_response.id: self.uuid = api_response.id - except ory_kratos_client.ApiException as e: - print("Exception when calling V0alpha2Api->admin_update_identity: %s\n" % e) + except KratosApiException as error: + raise BackendError(f"Unable to save entry, kratos replied with:{error}") from error def delete(self): + """Deletes the object from kratos + :raise: BackendError if Krator API call fails + """ if self.uuid: - self.api.admin_delete_identity(self.uuid) + try: + self.api.admin_delete_identity(self.uuid) + except KratosApiException as error: + raise BackendError(f"Unable to save entry, kratos replied with:{error}") from error @staticmethod @@ -151,8 +159,9 @@ class KratosUser(): Iterate over a list of cookies and extract the session cookies required for Kratos User Panel UI - :param: cookies List of cookies + :param cookies: str[], list of cookies :return: string Cookies as string + :rtype: str[] """ # Find kratos session cookie & csrf @@ -291,3 +300,38 @@ class KratosUser(): # underlying error return False raise BackendError("Unable to set password by submitting form") + + # Pylint complains about app not used. That is correct, but we will use that + # in the future. Ignore this error + # pylint: disable=unused-argument + def get_claims(self, app, mapping = None) -> Dict[str, Dict[str, str]]: + """Create openID Connect token + Use the userdata stored in the user object to create an OpenID Connect token. + The token returned by this function can be passed to Hydra, + which will store it and serve it to OpenID Connect Clients to retrieve user information. + If you need to relabel a field pass an array of tuples to rename_fields. + Example: getClaims('nextcloud', [("name", "username"),("roles", "groups")]) + + Attributes: + appname - Name or ID of app to connect to + mapping - Mapping of the fields + + Returns: + OpenID Connect token of type dict + """ + + token = { + "name": self.email, + "preferred_username": self.email, + "email": self.email, + "roles": '', + } + + + # Relabel field names + if mapping: + for old_field_name, new_field_name in mapping: + token[new_field_name] = token[old_field_name] + del token[old_field_name] + + return dict(id_token=token) diff --git a/login/models.py b/login/models.py index bacffe3..c1b59f9 100644 --- a/login/models.py +++ b/login/models.py @@ -2,20 +2,18 @@ Implement different models used by Stackspin panel """ -import urllib.parse -import urllib.request -import json -import re - -from typing import Dict -from urllib.request import Request # We need this import at some point to hook up roles and users #from sqlalchemy.orm import relationship from sqlalchemy import Integer, String, ForeignKey, Boolean +# pylint: disable=cyclic-import +# This is based on the documentation of Flask Alchemy from app import db +# Pylint complains about too-few-public-methods. Methods will be added once +# this is implemented. +# pylint: disable=too-few-public-methods class User(db.Model): """ The User object, interact with the User. It both calls to Kratos as to @@ -30,41 +28,6 @@ class User(db.Model): def __repr__(self): return f"{self.id} <{self.email}>" - # Pylint complains about app not used. That is correct, but we will use that - # in the future. Ignore this error - # pylint: disable=unused-argument - def get_claims(self, app, mapping = None) -> Dict[str, Dict[str, str]]: - """Create openID Connect token - Use the userdata stored in the user object to create an OpenID Connect token. - The token returned by this function can be passed to Hydra, - which will store it and serve it to OpenID Connect Clients to retrieve user information. - If you need to relabel a field pass an array of tuples to rename_fields. - Example: getClaims('nextcloud', [("name", "username"),("roles", "groups")]) - - Attributes: - appname - Name or ID of app to connect to - mapping - Mapping of the fields - - Returns: - OpenID Connect token of type dict - """ - - token = { - "name": self.email, - "preferred_username": self.email, - "email": self.email, - "roles": '', - } - - - # Relabel field names - if mapping: - for old_field_name, new_field_name in mapping: - token[new_field_name] = token[old_field_name] - del token[old_field_name] - - return dict(id_token=token) - # Pylint complains about too-few-public-methods. Methods will be added once # this is implemented. -- GitLab