diff --git a/login/kratos.py b/login/kratos.py new file mode 100644 index 0000000000000000000000000000000000000000..8e83eb839552e62e95b46510cd06e733700b4b91 --- /dev/null +++ b/login/kratos.py @@ -0,0 +1,295 @@ +""" +Implement the Kratos model to interact with kratos users +""" + +import urllib.parse +import urllib.request +import json +import re + +from typing import Dict +from urllib.request import Request + +# We need this import at some point to hook up roles and users +#from sqlalchemy.orm import relationship +from sqlalchemy import Integer, String, ForeignKey, Boolean + +# Some imports commented out to satisfy pylint. They will be used once more +# functions are migrated to this model +# +#from ory_kratos_client.model.admin_create_identity_body \ +# import AdminCreateIdentityBody +from ory_kratos_client.model.admin_update_identity_body \ + import AdminUpdateIdentityBody + +#from ory_kratos_client.model.submit_self_service_recovery_flow_body \ +# import SubmitSelfServiceRecoveryFlowBody +#from ory_kratos_client.model.self_service_recovery_flow \ +# import SelfServiceRecoveryFlow +from ory_kratos_client.model.identity_state \ + import IdentityState +from ory_kratos_client.model.identity import Identity +from ory_kratos_client.model.admin_create_self_service_recovery_link_body \ + import AdminCreateSelfServiceRecoveryLinkBody +from ory_kratos_client.rest import ApiException as KratosApiException + +from exceptions import BackendError +from classes import RedirectFilter + +class KratosUser(): + """ + The User object, interact with the User. It both calls to Kratos as to + the database for storing and retrieving data. + """ + + api = None + uuid = None + email = None + name = None + state = None + created_at = None + updated_at = None + + def __init__(self, api, uuid = None): + self.api = api + self.state = 'active' + if uuid: + try: + obj = api.admin_get_identity(uuid) + if obj: + self.uuid = uuid + try: + self.name = obj.traits['name'] + except KeyError: + self.name = "" + self.email = obj.traits['email'] + self.state = obj.state + self.created_at = obj.created_at + self.updated_at = obj.updated_at + except ory_kratos_client.ApiException as e: + print("Exception when calling V0alpha2Api->admin_get_identity: %s\n" % e) + + + def __repr__(self): + return f"\"{self.name}\" <{self.email}>" + + + def save(self): + if self.uuid: + body = AdminUpdateIdentityBody( + schema_id="default", + state=self.state, + traits={'email':self.email, 'name':self.name}, + ) + try: + # Update an Identity + api_response = self.api.admin_update_identity(self.uuid, + admin_update_identity_body=body) + except ory_kratos_client.ApiException as e: + print("Exception when calling V0alpha2Api->admin_update_identity: %s\n" % e) + else: + body = AdminCreateIdentityBody( + schema_id="default", + state=self.state, + traits={'email':self.email, 'name':self.name}, + ) + try: + # Update an Identity + api_response = self.api.admin_create_identity(self.uuid, + admin_create_identity_body=body) + except ory_kratos_client.ApiException as e: + print("Exception when calling V0alpha2Api->admin_update_identity: %s\n" % e) + + def delete(self): + if self.uuid: + self.api.admin_delete_identity(self.uuid) + + + @staticmethod + def find_by_email(api, email): + """Queries Kratos to find kratos ID for this given identifier + :param: api Kratos ADMIN API Object + :param: email Identifier to look for + :return: Return none or string with ID + """ + + kratos_id = None + + # Get out user ID by iterating over all available IDs + data = api.admin_list_identities() + for kratos_obj in data.value: + # Unique identifier we use + if kratos_obj.traits['email'] == email: + kratos_id = str(kratos_obj.id) + return KratosUser(api, kratos_id) + + return None + + @staticmethod + def find_all(api): + """Queries Kratos to find all kratos users and return them + as a list of KratosUser objects + :return: Return list + """ + + kratos_id = None + return_list = [] + # Get out user ID by iterating over all available IDs + data = api.admin_list_identities() + for kratos_obj in data.value: + kratos_id = str(kratos_obj.id) + return_list.append(KratosUser(api, kratos_id)) + + return return_list + + + @staticmethod + def extract_cookies(cookies): + """Extract session and CSRF cookie from a list of cookies. + + Iterate over a list of cookies and extract the session + cookies required for Kratos User Panel UI + + :param: cookies List of cookies + :return: string Cookies as string + """ + + # Find kratos session cookie & csrf + cookie_csrf = None + cookie_session = None + for cookie in cookies: + search = re.match(r'ory_kratos_session=([^;]*);.*$', cookie) + if search: + cookie_session = "ory_kratos_session=" + search.group(1) + search = re.match(r'(csrf_token[^;]*);.*$', cookie) + if search: + cookie_csrf = search.group(1) + + if not cookie_csrf or not cookie_session: + raise BackendError("Flow started, but expected cookies not found") + + # Combined the relevant cookies + cookie = cookie_csrf + "; " + cookie_session + return cookie + + + def get_recovery_link(self): + """Call the kratos API to create a recovery URL for a kratos ID + :param: api Kratos ADMIN API Object + :param: kratos_id UUID of kratos object + :return: Return none or string with recovery URL + """ + + try: + # Create body request to get recovery link with admin API + body = AdminCreateSelfServiceRecoveryLinkBody( + expires_in="15m", + identity_id=self.uuid + ) + + # Get recovery link from admin API + call = self.api.admin_create_self_service_recovery_link( + admin_create_self_service_recovery_link_body=body) + + url = call.recovery_link + except KratosApiException: + return None + return url + + + + def ui_set_password(self, api_url, recovery_url, password): + """Follow a Kratos UI sequence to set password + Kratos does not provide an interface to set a password directly. However + we still can set a password by following the UI sequence. To so so we + to follow the steps which are normally done in a browser once someone + clicks the recovery link. + :param: api_url URL to public endpoint of API + :param: recovery_url Recovery URL as generated by Kratos + :param: password Password + :raise: Exception with error message as first argument + :return: boolen True on success, False on failure (usualy password + to simple) + """ + + # Step 1: Open the recovery link and extract the cookies, as we need them + # for the next steps + try: + # We override the default Redirect handler with our custom handler to + # be able to catch the cookies. + opener = urllib.request.build_opener(RedirectFilter) + opener.open(recovery_url) + # If we do not have a 2xx status, urllib throws an error, as we "stopped" + # at our redirect, we expect a 3xx status + except urllib.error.HTTPError as req: + if req.status == 302: + cookies = req.headers.get_all('Set-Cookie') + url = req.headers.get('Location') + else: + raise BackendError('Unable to fetch recovery link') from req + else: + raise BackendError('Unable to fetch recovery link') + + # Step 2: Extract cookies and data for next step. We expect to have an + # authorized session now. We need the cookies for followup calls + # to make changes to the account (set password) + + # Get flow id + search = re.match(r'.*\?flow=(.*)', url) + if search: + flow = search.group(1) + else: + raise BackendError('No Flow ID found for recovery sequence') + + # Extract cookies with helper function + cookie = self.extract_cookies(cookies) + + # Step 3: Get the "UI", kratos expect us to call the API to get the UI + # elements which inclused the CSRF token, which is needed when + # posting the password data + try: + url = api_url + "/self-service/settings/flows?id=" + flow + + req = Request(url, headers={'Cookie':cookie}) + opener = urllib.request.build_opener() + + # Execute the request, read the data, decode the JSON, get the + # right CSRF token out of the decoded JSON + obj = json.loads(opener.open(req).read()) + token = obj['ui']['nodes'][0]['attributes']['value'] + + except Exception as error: + raise BackendError("Unable to get password reset UI") from error + + + # Step 4: Post out password + url = api_url + "self-service/settings?flow=" + flow + + # Create POST data as form data + data = { + 'method': 'password', + 'password': password, + 'csrf_token': token + } + data = urllib.parse.urlencode(data) + data = data.encode('ascii') + + # POST the new password + try: + req = Request(url, data = data, headers={'Cookie':cookie}, method="POST") + opener = urllib.request.build_opener(RedirectFilter) + opener.open(req) + # If we do not have a 2xx status, urllib throws an error, as we "stopped" + # at our redirect, we expect a 3xx status + except urllib.error.HTTPError as req: + if req.status == 302: + return True + if req.status == 303: + # Something went wrong, usuall because the password is too + # simple. Kratos does not give a proper hint about the + # underlying error + return False + raise BackendError("Unable to set password by submitting form") + + +