From 6cde93c90e716942bca7d5af400d3075bf4c4ff1 Mon Sep 17 00:00:00 2001 From: Mart van Santen <mart@greenhost.nl> Date: Fri, 3 Dec 2021 10:30:11 +0100 Subject: [PATCH] Code cleanups --- login/app.py | 194 +++++++++++++++++++----------------------------- login/kratos.py | 18 +---- login/models.py | 17 ----- 3 files changed, 78 insertions(+), 151 deletions(-) diff --git a/login/app.py b/login/app.py index ebb56e6..dbf99c3 100644 --- a/login/app.py +++ b/login/app.py @@ -153,6 +153,10 @@ app.cli.add_command(app_cli) @user_cli.command('show') @click.argument('email') def show_user(email): + """Show user details. Output a table with the user and details about the + internal state/values of the user object + :param email: Email address of the user to show + """ user = KratosUser.find_by_email(KRATOS_ADMIN, email) print(user) print("") @@ -161,27 +165,43 @@ def show_user(email): print(f"Created: {user.created_at}") print(f"State: {user.state}") -@user_cli.command('updatename') +@user_cli.command('update') @click.argument('email') -@click.argument('name') -def update_name_user(email, name): +@click.argument('field') +@click.argument('value') +def update_user(email, field, value): + """Update an user object. It can modify email and name currently + :param email: Email address of user to update + :param field: Field to update, supported [name|email] + :param value: The value to set the field with + """ + app.logger.info(f"Looking for user with email: {email}") user = KratosUser.find_by_email(KRATOS_ADMIN, email) - user.name = name - user.save() + if not user: + app.logger.error(f"User with email {email} not found.") + return + + if field == 'name': + user.name = value + elif field == 'email': + user.email = value + else: + app.logger.error(f"Field not found: {field}") -@user_cli.command('updateemail') -@click.argument('email') -@click.argument('newemail') -def update_email_user(email, newemail): - user = KratosUser.find_by_email(KRATOS_ADMIN, email) - user.email = newemail user.save() @user_cli.command('delete') @click.argument('email') -def show_user(email): +def delete_user(email): + """Delete an user from the database + :param email: Email address of user to delete + """ + app.logger.info(f"Looking for user with email: {email}") user = KratosUser.find_by_email(KRATOS_ADMIN, email) + if not user: + app.logger.error(f"User with email {email} not found.") + return user.delete() @@ -191,43 +211,12 @@ def show_user(email): def create_user(email): app.logger.info("Creating user with email: ({0})".format(email)) - obj = User() - obj.email = email - - # Load kratos object + # Create a user user = KratosUser(KRATOS_ADMIN) user.email = email user.save() user = KratosUser(email) -# # Trying to create idenity -# try: -# body = AdminCreateIdentityBody( -# schema_id="default", -# traits={'email':email}, -# ) # AdminCreateIdentityBody | (optional) -# kratos_obj = KRATOS_ADMIN.admin_create_identity(admin_create_identity_body=body) -# except ory_kratos_client.exceptions.ApiException as err: -# if err.status == 409: -# print("Conflict during creation of user. User already exists?") - - - - # TODO: - # - Should check if database entry already exists. If so, double check if - # kratos user exists. If not, create that one. - # - # - If no DB user, should check with kratos of user already exists. If so, - # throw warning, but still create DB entrie - # - # - After creating kratos user, check if success, otherwise throw warning. - - - obj.kratos_id = user.uuid - - db.session.add(obj) - - db.session.commit() @user_cli.command('setpassword') @@ -275,6 +264,7 @@ def setpassword_user(email, password): @user_cli.command('list') def list_user(): + """Show a list of users in the database""" app.logger.info("Listing users") users = KratosUser.find_all(KRATOS_ADMIN) @@ -282,93 +272,59 @@ def list_user(): print(obj) -@user_cli.command('delete2',) -@click.argument('email') -def delete2_user(email): - app.logger.info("Trying to delete user: {0}".format(email)) - obj = User.query.filter_by(email=email).first() - - if not obj: - app.logger.info("Not found") - return - - db.session.delete(obj) - - # TODO: - # - if delete succesfull, also delete kratos user, if exists - # - probably user roles need to be deleted before user can be deleted - - db.session.commit() - app.logger.info("Success") - return - -@user_cli.command('setadmin') -@click.argument('email') -def setadmin_user(email): - app.logger.info("Trying to make user into admin: {0}".format(email)) - obj = User.query.filter_by(email=email).first() - - if not obj: - app.logger.info("Not found") - return - - obj.admin = True - db.session.commit() - app.logger.info("Success") - return - - -@user_cli.command('unsetadmin') -@click.argument('email') -def unsetadmin_user(email): - app.logger.info("Trying to make user into normal user: {0}".format(email)) - obj = User.query.filter_by(email=email).first() - - if not obj: - app.logger.info("Not found") - return - - obj.admin = False - db.session.commit() - app.logger.info("Success") - return - - - @user_cli.command('recover') @click.argument('email') def recover_user(email): + """Get recovery link for a user, to manual update the user/use + :param email: Email address of the user + """ + app.logger.info("Trying to send recover email for user: {0}".format(email)) - obj = User.query.filter_by(email=email).first() + try: + # Get the ID of the user + kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email) - if not obj: - app.logger.info("Not found") - return + # Get a recovery URL + url = kratos_user.get_recovery_link() - if not obj.kratos_id: - app.logger.info("User found, but no kratos ID") - return + print(url) + except BackendError as error: + app.logger.error(f"Error while getting reset link: {error}") - body = AdminCreateSelfServiceRecoveryLinkBody( - expires_in="1h", - identity_id=obj.kratos_id, - ) - # example passing only required values which don't have defaults set - # and optional values - try: - # Create a Recovery Link - api_response = KRATOS_ADMIN.admin_create_self_service_recovery_link( - admin_create_self_service_recovery_link_body=body) +#@user_cli.command('setadmin') +#@click.argument('email') +#def setadmin_user(email): +# app.logger.info("Trying to make user into admin: {0}".format(email)) +# obj = User.query.filter_by(email=email).first() +# +# if not obj: +# app.logger.info("Not found") +# return +# +# obj.admin = True +# db.session.commit() +# app.logger.info("Success") +# return + +#@user_cli.command('unsetadmin') +#@click.argument('email') +#def unsetadmin_user(email): +# app.logger.info("Trying to make user into normal user: {0}".format(email)) +# obj = User.query.filter_by(email=email).first() +# +# if not obj: +# app.logger.info("Not found") +# return +# +# obj.admin = False +# db.session.commit() +# app.logger.info("Success") +# return - pprint.pprint(api_response) - except ory_kratos_client.ApiException as error: - app.logger.error("Exception when calling" + - "V0alpha2Api->admin_create_self_service_recovery_link: %s\n" % error) - return app.cli.add_command(user_cli) diff --git a/login/kratos.py b/login/kratos.py index 5774385..e612326 100644 --- a/login/kratos.py +++ b/login/kratos.py @@ -10,22 +10,13 @@ import re from typing import Dict from urllib.request import Request -# We need this import at some point to hook up roles and users -#from sqlalchemy.orm import relationship -from sqlalchemy import Integer, String, ForeignKey, Boolean # Some imports commented out to satisfy pylint. They will be used once more # functions are migrated to this model -# from ory_kratos_client.model.admin_create_identity_body \ import AdminCreateIdentityBody from ory_kratos_client.model.admin_update_identity_body \ import AdminUpdateIdentityBody - -#from ory_kratos_client.model.submit_self_service_recovery_flow_body \ -# import SubmitSelfServiceRecoveryFlowBody -#from ory_kratos_client.model.self_service_recovery_flow \ -# import SelfServiceRecoveryFlow from ory_kratos_client.model.identity_state \ import IdentityState from ory_kratos_client.model.identity import Identity @@ -66,8 +57,8 @@ class KratosUser(): self.state = obj.state self.created_at = obj.created_at self.updated_at = obj.updated_at - except ory_kratos_client.ApiException as e: - print("Exception when calling V0alpha2Api->admin_get_identity: %s\n" % e) + except KratosApiException as error: + print("Exception when calling V0alpha2Api->admin_get_identity: %s\n" % error) def __repr__(self): @@ -77,7 +68,7 @@ class KratosUser(): def save(self): # Kratos API expect for state an IdentifyState object, while - # during updating, it expects a string. + # during updating, it expects a string. if self.uuid: body = AdminUpdateIdentityBody( @@ -300,6 +291,3 @@ class KratosUser(): # underlying error return False raise BackendError("Unable to set password by submitting form") - - - diff --git a/login/models.py b/login/models.py index 933c270..bacffe3 100644 --- a/login/models.py +++ b/login/models.py @@ -14,23 +14,6 @@ from urllib.request import Request #from sqlalchemy.orm import relationship from sqlalchemy import Integer, String, ForeignKey, Boolean -# Some imports commented out to satisfy pylint. They will be used once more -# functions are migrated to this model -# -#from ory_kratos_client.model.admin_create_identity_body \ -# import AdminCreateIdentityBody -#from ory_kratos_client.model.submit_self_service_recovery_flow_body \ -# import SubmitSelfServiceRecoveryFlowBody -#from ory_kratos_client.model.self_service_recovery_flow \ -# import SelfServiceRecoveryFlow -#from ory_kratos_client.model.identity_state \ -# import IdentityState -from ory_kratos_client.model.admin_create_self_service_recovery_link_body \ - import AdminCreateSelfServiceRecoveryLinkBody -from ory_kratos_client.rest import ApiException as KratosApiException - -from exceptions import BackendError -from classes import RedirectFilter from app import db class User(db.Model): -- GitLab