From c2826a862eff20a1eefdfda8d694aaacb43302e1 Mon Sep 17 00:00:00 2001
From: Mart van Santen <>
Date: Fri, 3 Dec 2021 08:53:56 +0100
Subject: [PATCH] Initial kratos object

 login/ | 295 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 295 insertions(+)
 create mode 100644 login/

diff --git a/login/ b/login/
new file mode 100644
index 0000000..8e83eb8
--- /dev/null
+++ b/login/
@@ -0,0 +1,295 @@
+Implement the Kratos model to interact with kratos users
+import urllib.parse
+import urllib.request
+import json
+import re
+from typing import Dict
+from urllib.request import Request
+# We need this import at some point to hook up roles and users
+#from sqlalchemy.orm import relationship
+from sqlalchemy import Integer, String, ForeignKey, Boolean
+# Some imports commented out to satisfy pylint. They will be used once more
+# functions are migrated to this model
+#from ory_kratos_client.model.admin_create_identity_body \
+#    import AdminCreateIdentityBody
+from ory_kratos_client.model.admin_update_identity_body \
+    import AdminUpdateIdentityBody
+#from ory_kratos_client.model.submit_self_service_recovery_flow_body \
+#    import SubmitSelfServiceRecoveryFlowBody
+#from ory_kratos_client.model.self_service_recovery_flow \
+#    import SelfServiceRecoveryFlow
+from ory_kratos_client.model.identity_state \
+    import IdentityState
+from ory_kratos_client.model.identity import Identity
+from ory_kratos_client.model.admin_create_self_service_recovery_link_body \
+    import AdminCreateSelfServiceRecoveryLinkBody
+from import ApiException as KratosApiException
+from exceptions import BackendError
+from classes import RedirectFilter
+class KratosUser():
+    """
+    The User object, interact with the User. It both calls to Kratos as to
+    the database for storing and retrieving data.
+    """
+    api = None
+    uuid = None
+    email = None
+    name = None
+    state = None
+    created_at = None
+    updated_at = None
+    def __init__(self, api, uuid = None):
+        self.api = api
+        self.state = 'active'
+        if uuid:
+            try:
+                obj = api.admin_get_identity(uuid)
+                if obj:
+                    self.uuid = uuid
+                    try:
+               = obj.traits['name']
+                    except KeyError:
+               = ""
+           = obj.traits['email']
+                    self.state = obj.state
+                    self.created_at = obj.created_at
+                    self.updated_at = obj.updated_at
+            except ory_kratos_client.ApiException as e:
+                print("Exception when calling V0alpha2Api->admin_get_identity: %s\n" % e)
+    def __repr__(self):
+        return f"\"{}\" <{}>"
+    def save(self):
+        if self.uuid:
+            body = AdminUpdateIdentityBody(
+                schema_id="default",
+                state=self.state,
+                traits={'email', 'name'},
+            )
+            try:
+                # Update an Identity
+                api_response = self.api.admin_update_identity(self.uuid,
+                        admin_update_identity_body=body)
+            except ory_kratos_client.ApiException as e:
+                print("Exception when calling V0alpha2Api->admin_update_identity: %s\n" % e)
+        else:
+            body = AdminCreateIdentityBody(
+                schema_id="default",
+                state=self.state,
+                traits={'email', 'name'},
+            )
+            try:
+                # Update an Identity
+                api_response = self.api.admin_create_identity(self.uuid,
+                        admin_create_identity_body=body)
+            except ory_kratos_client.ApiException as e:
+                print("Exception when calling V0alpha2Api->admin_update_identity: %s\n" % e)
+    def delete(self):
+        if self.uuid:
+            self.api.admin_delete_identity(self.uuid)
+    @staticmethod
+    def find_by_email(api, email):
+        """Queries Kratos to find kratos ID for this given identifier
+        :param: api Kratos ADMIN API Object
+        :param: email Identifier to look for
+        :return: Return none or string with ID
+        """
+        kratos_id = None
+        # Get out user ID by iterating over all available IDs
+        data = api.admin_list_identities()
+        for kratos_obj in data.value:
+            # Unique identifier we use
+            if kratos_obj.traits['email'] == email:
+                kratos_id = str(
+                return KratosUser(api, kratos_id)
+        return None
+    @staticmethod
+    def find_all(api):
+        """Queries Kratos to find all kratos users and return them
+        as a list of KratosUser objects
+        :return: Return list
+        """
+        kratos_id = None
+        return_list = []
+        # Get out user ID by iterating over all available IDs
+        data = api.admin_list_identities()
+        for kratos_obj in data.value:
+            kratos_id = str(
+            return_list.append(KratosUser(api, kratos_id))
+        return return_list
+    @staticmethod
+    def extract_cookies(cookies):
+        """Extract session and CSRF cookie from a list of cookies.
+        Iterate over a list of cookies and extract the session
+        cookies required for Kratos User Panel UI
+        :param: cookies List of cookies
+        :return: string Cookies as string
+        """
+        # Find kratos session cookie & csrf
+        cookie_csrf = None
+        cookie_session = None
+        for cookie in cookies:
+            search = re.match(r'ory_kratos_session=([^;]*);.*$', cookie)
+            if search:
+                cookie_session = "ory_kratos_session=" +
+            search = re.match(r'(csrf_token[^;]*);.*$', cookie)
+            if search:
+                cookie_csrf =
+        if not cookie_csrf or not cookie_session:
+            raise BackendError("Flow started, but expected cookies not found")
+        # Combined the relevant cookies
+        cookie = cookie_csrf + "; " + cookie_session
+        return cookie
+    def get_recovery_link(self):
+        """Call the kratos API to create a recovery URL for a kratos ID
+        :param: api Kratos ADMIN API Object
+        :param: kratos_id UUID of kratos object
+        :return: Return none or string with recovery URL
+        """
+        try:
+            # Create body request to get recovery link with admin API
+            body = AdminCreateSelfServiceRecoveryLinkBody(
+                expires_in="15m",
+                identity_id=self.uuid
+            )
+            # Get recovery link from admin API
+            call = self.api.admin_create_self_service_recovery_link(
+                admin_create_self_service_recovery_link_body=body)
+            url = call.recovery_link
+        except KratosApiException:
+            return None
+        return url
+    def ui_set_password(self, api_url, recovery_url, password):
+        """Follow a Kratos UI sequence to set password
+        Kratos does not provide an interface to set a password directly. However
+        we still can set a password by following the UI sequence. To so so we
+        to follow the steps which are normally done in a browser once someone
+        clicks the recovery link.
+        :param: api_url      URL to public endpoint of API
+        :param: recovery_url Recovery URL as generated by Kratos
+        :param: password     Password
+        :raise:              Exception with error message as first argument
+        :return: boolen      True on success, False on failure (usualy password
+                             to simple)
+        """
+        # Step 1: Open the recovery link and extract the cookies, as we need them
+        #         for the next steps
+        try:
+            # We override the default Redirect handler with our custom handler to
+            # be able to catch the cookies.
+            opener = urllib.request.build_opener(RedirectFilter)
+        # If we do not have a 2xx status, urllib throws an error, as we "stopped"
+        # at our redirect, we expect a 3xx status
+        except urllib.error.HTTPError as req:
+            if req.status == 302:
+                cookies = req.headers.get_all('Set-Cookie')
+                url =  req.headers.get('Location')
+            else:
+                raise BackendError('Unable to fetch recovery link') from req
+        else:
+            raise BackendError('Unable to fetch recovery link')
+        # Step 2: Extract cookies and data for next step. We expect to have an
+        #         authorized session now. We need the cookies for followup calls
+        #         to make changes to the account (set password)
+        # Get flow id
+        search = re.match(r'.*\?flow=(.*)', url)
+        if search:
+            flow =
+        else:
+            raise BackendError('No Flow ID found for recovery sequence')
+        # Extract cookies with helper function
+        cookie = self.extract_cookies(cookies)
+        # Step 3: Get the "UI", kratos expect us to call the API to get the UI
+        #         elements which inclused the CSRF token, which is needed when
+        #         posting the password data
+        try:
+            url = api_url + "/self-service/settings/flows?id=" + flow
+            req = Request(url, headers={'Cookie':cookie})
+            opener = urllib.request.build_opener()
+            # Execute the request, read the data, decode the JSON, get the
+            # right CSRF token out of the decoded JSON
+            obj = json.loads(
+            token = obj['ui']['nodes'][0]['attributes']['value']
+        except Exception as error:
+            raise BackendError("Unable to get password reset UI") from error
+        # Step 4: Post out password
+        url = api_url + "self-service/settings?flow=" + flow
+        # Create POST data as form data
+        data = {
+            'method': 'password',
+            'password': password,
+            'csrf_token': token
+        }
+        data = urllib.parse.urlencode(data)
+        data = data.encode('ascii')
+        # POST the new password
+        try:
+            req = Request(url, data = data, headers={'Cookie':cookie}, method="POST")
+            opener = urllib.request.build_opener(RedirectFilter)
+            # If we do not have a 2xx status, urllib throws an error, as we "stopped"
+            # at our redirect, we expect a 3xx status
+        except urllib.error.HTTPError as req:
+            if req.status == 302:
+                return True
+            if req.status == 303:
+                # Something went wrong, usuall because the password is too
+                # simple. Kratos does not give a proper hint about the
+                # underlying error
+                return False
+        raise BackendError("Unable to set password by submitting form")