Skip to content
Snippets Groups Projects
user_service.py 9.94 KiB
Newer Older
import ory_kratos_client
Arie Peterson's avatar
Arie Peterson committed
from ory_kratos_client.model.update_recovery_flow_body \
    import UpdateRecoveryFlowBody
from ory_kratos_client.api import frontend_api, identity_api
from config import KRATOS_ADMIN_URL

from areas.apps import App, AppRole, AppsService
Davor's avatar
Davor committed
from areas.roles import Role, RoleService
Davor's avatar
Davor committed
from flask import current_app

from helpers.error_handler import KratosError

kratos_admin_api_configuration = \
    ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL, discard_unknown_keys=True)
Arie Peterson's avatar
Arie Peterson committed
kratos_client = ory_kratos_client.ApiClient(kratos_admin_api_configuration)
kratos_frontend_api = frontend_api.FrontendApi(kratos_client)
Arie Peterson's avatar
Arie Peterson committed
kratos_identity_api = identity_api.IdentityApi(kratos_client)
    @classmethod
    def get_users(cls):
        dashboardRoles = cls.__getDashboardRoles()
        while page >= 0:
            if page == 0:
                res = KratosApi.get("/admin/identities?per_page=1000").json()
            else:
                res = KratosApi.get("/admin/identities?per_page=1000&page={}".format(page)).json()
            for r in res:
                # Inject information from the `stackspin` database that's associated to this user.
                # In particular, the dashboard role (admin or regular user).
                stackspinData = {}
                dashboardRole = dashboardRoles.get(r["id"])
                if dashboardRole is not None:
                    stackspinData["stackspin_admin"] = dashboardRole == Role.ADMIN_ROLE_ID
                    r["stackspin_data"] = stackspinData
                userList.append(r)
            if len(res) == 0:
                page = -1
            else:
                page = page + 1
    @classmethod
    def get_user(cls, id):
        res = KratosApi.get("/admin/identities/{}".format(id)).json()
        return cls.__insertAppRoleToUser(id, res)
    def create_recovery_link(id):
        kratos_data = {
            "identity_id": id
        }
        res = KratosApi.post("/admin/recovery/link", kratos_data).json()
        return res

    @classmethod
    def post_user(cls, data):
        kratos_data = {
            "schema_id": "default",
            "traits": {
                "name": data["name"],
                "email": data["email"],
            },
        res = KratosApi.post("/admin/identities", kratos_data).json()
Luka's avatar
Luka committed
        if data["app_roles"]:
            app_roles = data["app_roles"]
            for ar in app_roles:
                app = App.query.filter_by(slug=ar["name"]).first()
                app_role = AppRole(
                    user_id=res["id"],
Davor's avatar
Davor committed
                    role_id=ar["role_id"] if "role_id" in ar else Role.NO_ACCESS_ROLE_ID,
                    app_id=app.id,
                )

                db.session.add(app_role)
            db.session.commit()
        else:
            all_apps = AppsService.get_all_apps()
            for app in all_apps:
                app_role = AppRole(
                    user_id=res["id"],
Davor's avatar
Davor committed
                    role_id=Role.NO_ACCESS_ROLE_ID,
Luka's avatar
Luka committed
                    app_id=app.id,
                )
Luka's avatar
Luka committed
                db.session.add(app_role)
            db.session.commit()
Arie Peterson's avatar
Arie Peterson committed
        # We start a recovery flow immediately after creating the
        # user, so the user can set their initial password.
        cls.__start_recovery_flow(data["email"])
        return UserService.get_user(res["id"])

    @staticmethod
    def reset_2fa(id):
        KratosApi.delete("/admin/identities/{}/credentials/totp".format(id))

    def __start_recovery_flow(email):
        """
        Start a Kratos recovery flow for the user's email address.

        This sends out an email to the user that explains to them how they can
        set their password. Make sure the user exists inside Kratos before you
        use this function.

        :param email: Email to send recovery link to
        :type email: str
        """
        api_response = kratos_frontend_api.create_native_recovery_flow()
        flow = api_response['id']
        # Submit the recovery flow to send an email to the new user.
Arie Peterson's avatar
Arie Peterson committed
        update_recovery_flow_body = \
            UpdateRecoveryFlowBody(method="link", email=email)
        api_response = kratos_frontend_api.update_recovery_flow(flow,
Arie Peterson's avatar
Arie Peterson committed
                update_recovery_flow_body=update_recovery_flow_body)
    @classmethod
    def put_user(cls, id, user_editing_id, data):
        kratos_data = {
            "schema_id": "default",
            "traits": {"email": data["email"], "name": data["name"]},
        }
        KratosApi.put("/admin/identities/{}".format(id), kratos_data)
Davor's avatar
Davor committed
        is_admin = RoleService.is_user_admin(user_editing_id)
Davor's avatar
Davor committed

Davor's avatar
Davor committed
        if is_admin and data["app_roles"]:
Luka's avatar
Luka committed
            app_roles = data["app_roles"]
            for ar in app_roles:
                app = App.query.filter_by(slug=ar["name"]).first()
Davor's avatar
Davor committed
                app_role = AppRole.query.filter_by(
                    user_id=id, app_id=app.id).first()
Luka's avatar
Luka committed

                if app_role:
                    app_role.role_id = ar["role_id"] if "role_id" in ar else None
                    db.session.commit()
                else:
                    appRole = AppRole(
                        user_id=id,
                        role_id=ar["role_id"] if "role_id" in ar else None,
                        app_id=app.id,
                    )
                    db.session.add(appRole)
                    db.session.commit()
        return cls.get_user(id)
    @classmethod
    def put_multiple_users(cls, user_editing_id, data):
        for user_data in data["users"]:
            kratos_data = {
                # "schema_id": "default",
                "traits": {"email": user_data["email"]},
            }
            KratosApi.put("/admin/identities/{}".format(user_data["id"]), kratos_data)

            is_admin = RoleService.is_user_admin(user_editing_id)

            if is_admin and user_data["app_roles"]:
                app_roles = user_data["app_roles"]
                for ar in app_roles:
                    app = App.query.filter_by(slug=ar["name"]).first()
                    app_role = AppRole.query.filter_by(
                        user_id=user_data["id"], app_id=app.id).first()

                    if app_role:
                        app_role.role_id = ar["role_id"] if "role_id" in ar else None
                        db.session.commit()
                    else:
                        appRole = AppRole(
                            user_id=user_Data["id"],
                            role_id=ar["role_id"] if "role_id" in ar else None,
                            app_id=app.id,
                        )
                        db.session.add(appRole)
                        db.session.commit()

            return cls.get_user(user_data["id"])
Luka's avatar
Luka committed
    @staticmethod
    def delete_user(id):
        app_role = AppRole.query.filter_by(user_id=id).all()
Davor's avatar
Davor committed
        for ar in app_role:
            db.session.delete(ar)
        db.session.commit()
Luka's avatar
Luka committed

Davor's avatar
Davor committed
    @staticmethod
    def post_multiple_users(data):
        # check if data is array
Davor's avatar
Davor committed
        # for every item in array call Kratos
        created_users = []
        existing_users = []
        creation_failed_users = []
Davor's avatar
Davor committed
        for user_data in data['users']:
            user_email = user_data["email"]
            if not user_email:
Davor's avatar
Davor committed
                return
Davor's avatar
Davor committed
            try:
                UserService.post_user(user_data)
                current_app.logger.info(f"Batch create user: {user_email}")
                created_users.append(user_email)
            except KratosError as err:
                status_code = err.args[1]
                if status_code == 409:
                    existing_users.append(user_email)
                elif status_code == 400:
                    creation_failed_users.append(user_email)
                current_app.logger.error(
                    f"Exception calling Kratos: {err} on creating user: {user_email} {status_code}")
Davor's avatar
Davor committed
            except Exception as error:
                current_app.logger.error(
                    f"Exception: {error} on creating user: {user_email}")
                creation_failed_users.append(user_email)

        success_response = {}
        existing_response = {}
        failed_response = {}
        if created_users:
            success_response = {"users": created_users,
                                "message": f"{len(created_users)} users created"}
        if existing_users:
            existing_response = {
                "users": existing_users, "message": f"{len(existing_users)} users already exist: {', '.join(existing_users)}"}
        if creation_failed_users:
            failed_response = {"users": creation_failed_users,
                               "message": f"{len(creation_failed_users)} users failed to create: {', '.join(creation_failed_users)}"}

        return {"success": success_response, "existing": existing_response, "failed": failed_response}
    @staticmethod
    def __insertAppRoleToUser(userId, userRes):
Luka's avatar
Luka committed
        apps = App.query.all()
        app_roles = []
        for app in apps:
            # Only show role when installed
            app_status = app.get_status()
            if app_status.installed:

                tmp_app_role = AppRole.query.filter_by(
                    user_id=userId, app_id=app.id
                ).first()
                app_roles.append(
                        {
                            "name": app.slug,
                            "role_id": tmp_app_role.role_id if tmp_app_role else None,
                        }
                    )
Luka's avatar
Luka committed
        userRes["traits"]["app_roles"] = app_roles

    @staticmethod
    def __getDashboardRoles():
        dashboardRoles = {}
        for appRole, app in (
            db.session.query(AppRole, App)
            .filter(AppRole.app_id == App.id)
            .filter(App.slug == "dashboard")
            .all()
        ):
            dashboardRoles[appRole.user_id] = appRole.role_id
        return dashboardRoles