Skip to content
Snippets Groups Projects

Resolve "Allow CLI to set password"

Compare and Show latest version
18 files
+ 569
684
Compare changes
  • Side-by-side
  • Inline
Files
18
+ 149
200
"""Flask application which provides the interface of a login panel. The
application interacts with different backend, like the Kratos backend for users,
Hydra for OIDC sessions and Postgres for application and role specifications.
The application provides also several command line options to interact with
the user entries in the database(s)"""
# Basic system imports
import pprint
import logging
import os
import urllib.parse
import urllib.request
import json
import re
import click
from urllib.request import Request
from classes import RedirectFilter
# Flask
from flask import abort, Flask, redirect, request, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
# False positive: pylint: disable=ungrouped-imports
from flask.cli import AppGroup
@@ -26,23 +28,15 @@ import hydra_client
# Kratos, Identity manager
import ory_kratos_client
from ory_kratos_client.api import metadata_api
from ory_kratos_client.api import v0alpha2_api as kratos_api
from ory_kratos_client.model.generic_error import GenericError
from ory_kratos_client.model.inline_response200 import InlineResponse200
from ory_kratos_client.model.inline_response2001 import InlineResponse2001
from ory_kratos_client.model.inline_response503 import InlineResponse503
from ory_kratos_client.model.admin_create_identity_body import AdminCreateIdentityBody
from ory_kratos_client.model.admin_create_self_service_recovery_link_body import AdminCreateSelfServiceRecoveryLinkBody
from ory_kratos_client.model.submit_self_service_recovery_flow_body import SubmitSelfServiceRecoveryFlowBody
from ory_kratos_client.model.self_service_recovery_flow import SelfServiceRecoveryFlow
from ory_kratos_client.model.identity_state import IdentityState
from exceptions import BackendError
from kratos import KratosUser
from exceptions import *
# Initaliaze the FLASK app
# Initialize the FLASK app
app = Flask(__name__,
static_url_path='/static')
@@ -86,17 +80,19 @@ KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
# Create DB & migrate interface
db = SQLAlchemy(app)
migrate = Migrate(app, db)
Migrate(app, db)
# Import models
# Import needs to happen after the database interface is created. AppRole is
# imported for future usage
# pylint: disable=wrong-import-position unused-import
from models import User, App, AppRole
#
# WARNING:
#
# Below are very minimalistic calls to interfaces for development and testing
# purposed. Eventually this need to be moved to seperate files and more
# sophisticated calls with try{} catch{} claused etc.
# purposed. Eventually this need to be moved to separate files and more
# sophisticated calls with try{} catch{} clauses etc.
#
##############################################################################
@@ -112,6 +108,10 @@ app_cli = AppGroup('app')
@click.argument('slug')
@click.argument('name')
def create_app(slug, name):
"""Adds an app into the database
:param slug: str short name of the app
:param name: str name of the application
"""
app.logger.info(f"Creating app definition: {name} ({slug})")
obj = App()
@@ -125,17 +125,21 @@ def create_app(slug, name):
@app_cli.command('list')
def list_app():
"""List all apps found in the database"""
app.logger.info("Listing configured apps")
apps = App.query.all()
for obj in apps:
print("App name: %s \t Slug: %s" %(obj.name, obj.slug))
print(f"App name: {obj.name} \t Slug: {obj.slug}")
@app_cli.command('delete',)
@click.argument('slug')
def delete_app(slug):
app.logger.info("Trying to delete app: {0}".format(slug))
"""Removes app from database
:param slug: str Slug of app to remove
"""
app.logger.info(f"Trying to delete app: {slug}")
obj = App.query.filter_by(slug=slug).first()
if not obj:
@@ -143,8 +147,9 @@ def delete_app(slug):
return
# TODO: Deleting will (propably) fail is there are still roles attached
# these probobly need to be cleaned up first.
# Deleting will (probably) fail is there are still roles attached. This is a
# PoC implementation only. Actually management of apps and roles will be
# done by the backend application
db.session.delete(obj)
db.session.commit()
app.logger.info("Success")
@@ -155,85 +160,106 @@ app.cli.add_command(app_cli)
## CLI USER COMMANDS
@user_cli.command('create')
@user_cli.command('show')
@click.argument('email')
def create_user(email):
app.logger.info("Creating user with email: ({0})".format(email))
obj = User()
obj.email = email
# Trying to create idenity
try:
body = AdminCreateIdentityBody(
schema_id="default",
traits={'email':email},
) # AdminCreateIdentityBody | (optional)
kratos_obj = KRATOS_ADMIN.admin_create_identity(admin_create_identity_body=body)
except ory_kratos_client.exceptions.ApiException as err:
if err.status == 409:
print("Conflict during creation of user. User already exists?")
# TODO:
# - Should check if database entry already exists. If so, double check if
# kratos user exists. If not, create that one.
#
# - If no DB user, should check with kratos of user already exists. If so,
# throw warning, but still create DB entrie
#
# - After creating kratos user, check if success, otherwise throw warning.
body = AdminCreateIdentityBody(
schema_id="default",
traits={'email':email},
) # AdminCreateIdentityBody | (optional)
def show_user(email):
"""Show user details. Output a table with the user and details about the
internal state/values of the user object
:param email: Email address of the user to show
"""
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
print(user)
print("")
print(f"UUID: {user.uuid}")
print(f"Updated: {user.updated_at}")
print(f"Created: {user.created_at}")
print(f"State: {user.state}")
@user_cli.command('update')
@click.argument('email')
@click.argument('field')
@click.argument('value')
def update_user(email, field, value):
"""Update an user object. It can modify email and name currently
:param email: Email address of user to update
:param field: Field to update, supported [name|email]
:param value: The value to set the field with
"""
app.logger.info(f"Looking for user with email: {email}")
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
if not user:
app.logger.error(f"User with email {email} not found.")
return
#state=IdentityState("active"),
if field == 'name':
user.name = value
elif field == 'email':
user.email = value
else:
app.logger.error(f"Field not found: {field}")
kratos_obj = KRATOS_ADMIN.admin_create_identity(admin_create_identity_body=body)
user.save()
obj.kratos_id = kratos_obj.id
@user_cli.command('delete')
@click.argument('email')
def delete_user(email):
"""Delete an user from the database
:param email: Email address of user to delete
"""
app.logger.info(f"Looking for user with email: {email}")
user = KratosUser.find_by_email(KRATOS_ADMIN, email)
if not user:
app.logger.error(f"User with email {email} not found.")
return
user.delete()
db.session.add(obj)
db.session.commit()
@user_cli.command('create')
@click.argument('email')
def create_user(email):
"""Create a user in the kratos database. The argument must be an unique
email address
:param email: string Email address of user to add
"""
app.logger.info(f"Creating user with email: ({email})")
# Create a user
user = KratosUser(KRATOS_ADMIN)
user.email = email
user.save()
@user_cli.command('setpassword')
@click.argument('email')
@click.argument('password')
def setpassword_user(email, password):
"""Set a password for an account
:param: email email address of account to set a password for
:param: password password to be set
:return: boolean true on success, if not set (too weak)
:param email: email address of account to set a password for
:param password: password to be set
:return: true on success, false if not set (too weak)
:rtype: boolean
:raise: exception if unexepted error happens
"""
app.logger.info("Creating user with email: ({0})".format(email))
app.logger.info(f"Creating user with email: ({email})")
# Kratos does not provide an interface to set a password directly. However
# we still want to be able to set a password. So we have to hack our way
# a bit arround this. We do this by creating a recovery link though the
# a bit around this. We do this by creating a recovery link though the
# admin interface (which is not e-mailed) and then follow the recovery
# flow in the public facing pages of kratos
obj = User()
try:
# Get the ID of the user
kratos_id = obj.find_kratos_id(KRATOS_ADMIN, email)
kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
# Get a recovery URL
url = obj.get_recovery_link(KRATOS_ADMIN, kratos_id)
url = kratos_user.get_recovery_link()
# Execute UI sequence to set password, given we have a recovery URL
result = obj.ui_set_password(app.config["KRATOS_PUBLIC_URL"], url, password)
result = kratos_user.ui_set_password(app.config["KRATOS_PUBLIC_URL"], url, password)
except BackendError as error:
app.logger.error(f"Error while setting password: {error}")
return False
@@ -249,107 +275,38 @@ def setpassword_user(email, password):
@user_cli.command('list')
def list_user():
"""Show a list of users in the database"""
app.logger.info("Listing users")
users = User.query.all()
users = KratosUser.find_all(KRATOS_ADMIN)
for obj in users:
print("Email: %s (admin: %s)" %(obj.email, obj.admin))
@user_cli.command('delete',)
@click.argument('email')
def delete_user(email):
app.logger.info("Trying to delete user: {0}".format(email))
obj = User.query.filter_by(email=email).first()
if not obj:
app.logger.info("Not found")
return
db.session.delete(obj)
# TODO:
# - if delete succesfull, also delete kratos user, if exists
# - probably user roles need to be deleted before user can be deleted
db.session.commit()
app.logger.info("Success")
return
@user_cli.command('setadmin')
@click.argument('email')
def setadmin_user(email):
app.logger.info("Trying to make user into admin: {0}".format(email))
obj = User.query.filter_by(email=email).first()
if not obj:
app.logger.info("Not found")
return
obj.admin = True
db.session.commit()
app.logger.info("Success")
return
@user_cli.command('unsetadmin')
@click.argument('email')
def unsetadmin_user(email):
app.logger.info("Trying to make user into normal user: {0}".format(email))
obj = User.query.filter_by(email=email).first()
if not obj:
app.logger.info("Not found")
return
obj.admin = False
db.session.commit()
app.logger.info("Success")
return
print(obj)
@user_cli.command('recover')
@click.argument('email')
def recover_user(email):
app.logger.info("Trying to send recover email for user: {0}".format(email))
obj = User.query.filter_by(email=email).first()
if not obj:
app.logger.info("Not found")
return
if not obj.kratos_id:
app.logger.info("User found, but no kratos ID")
return
"""Get recovery link for a user, to manual update the user/use
:param email: Email address of the user
"""
body = AdminCreateSelfServiceRecoveryLinkBody(
expires_in="1h",
identity_id=obj.kratos_id,
)
app.logger.info(f"Trying to send recover email for user: {email}")
# example passing only required values which don't have defaults set
# and optional values
try:
# Create a Recovery Link
api_response = KRATOS_ADMIN.admin_create_self_service_recovery_link(
admin_create_self_service_recovery_link_body=body)
pprint.pprint(api_response)
except ory_kratos_client.ApiException as error:
app.logger.error("Exception when calling" +
"V0alpha2Api->admin_create_self_service_recovery_link: %s\n" % error)
# Get the ID of the user
kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)
# Get a recovery URL
url = kratos_user.get_recovery_link()
return
print(url)
except BackendError as error:
app.logger.error(f"Error while getting reset link: {error}")
app.cli.add_command(user_cli)
##############################################################################
# WEB ROUTES #
##############################################################################
@@ -403,7 +360,7 @@ def login():
"""
# Check if we are logged in:
profile = getAuth()
profile = get_auth()
if profile:
return render_template(
@@ -438,7 +395,7 @@ def auth():
challenge = None
# Retrieve the challenge id from the request. Depending on the method it is
# Retrieve the challenge id from the request. Depending on the method it is
# saved in the form (POST) or in a GET variable. If this variable is not set
# we can not continue.
if request.method == 'GET':
@@ -447,13 +404,12 @@ def auth():
challenge = request.args.post("login_challenge")
if not challenge:
# TODO: Use local error page?
app.logger.error("No challange given. Error in request")
abort(400)
app.logger.error("No challenge given. Error in request")
abort(400, description="Challenge required when requesting authorization")
# Check if we are logged in:
profile = getAuth()
profile = get_auth()
# If the user is not logged in yet, we redirect to the login page
@@ -462,7 +418,7 @@ def auth():
# The redirect URL is back to this page (auth) with the same challenge
# so we can pickup the flow where we left off.
if not profile:
url = app.config["PUBLIC_URL"] + "/auth?login_challenge=" + challenge;
url = app.config["PUBLIC_URL"] + "/auth?login_challenge=" + challenge
url = urllib.parse.quote_plus(url)
app.logger.info("Redirecting to login. Setting flow_state cookies")
@@ -480,15 +436,14 @@ def auth():
try:
login_request = HYDRA.login_request(challenge)
except hydra_client.exceptions.NotFound:
app.logger.error("Not Found. Login request not found. challenge={0}".format(challenge))
# TODO: Different error page?
abort(404)
app.logger.error(f"Not Found. Login request not found. challenge={challenge}")
abort(404, description="Login request not found. Please try again.")
except hydra_client.exceptions.HTTPError:
app.logger.error("Conflict. Login request has been used already. challenge={0}".format(challenge))
# TODO: Different error page?
abort(503)
app.logger.error(f"Conflict. Login request has been used already. challenge={challenge}")
abort(503, description="Login request already used. Please try again.")
# Authorize the user
# False positive: pylint: disable=no-member
redirect_to = login_request.accept(
profile['email'],
remember=True,
@@ -510,40 +465,38 @@ def consent():
challenge = request.args.get("consent_challenge")
if not challenge:
# TODO: Better error handling/display?
abort(403)
abort(403, description="Consent request required. Do not call this page directly")
try:
consent_request = HYDRA.consent_request(challenge)
except hydra_client.exceptions.NotFound:
app.logger.error("Not Found. Consent request not found. challenge={0}".format(challenge))
# TODO: Better error handling/display?
abort(404)
app.logger.error(f"Not Found. Consent request {challenge} not found")
abort(404, description="Consent request does not exist. Please try again")
except hydra_client.exceptions.HTTPError:
app.logger.error("Conflict. Consent request has been used already. challenge={0}".format(challenge))
# TODO: Better error handling/display?
abort(503)
app.logger.error(f"Conflict. Consent request {challenge} already used")
abort(503, description="Consent request already used. Please try again")
# Get information about this consent request:
# False positive: pylint: disable=no-member
app_name = consent_request.client.client_name
# False positive: pylint: disable=no-member
username = consent_request.subject
# Get the related database object to get roles/access rights
user = User.query.filter_by(email=username).first()
# Get the related user object
user = KratosUser.find_by_email(KRATOS_ADMIN, username)
if not user:
app.logger.error("User not found in database: {0}".format(username))
# TODO: Better error handling?
abort(401)
app.logger.error(f"User not found in database: {username}")
abort(401, description="User not found. Please try again.")
# Get claims for this user, provided the current app
claims = user.get_claims(app_name)
# TODO: Check access / claims?
app.logger.info("Providing consent to %s for %s" % (app_name, username))
app.logger.info("{0} was granted access to {1}".format(username, app_name))
# pylint: disable=fixme
# TODO: Need to implement checking claims here, once the backend for that is
# developed
app.logger.info(f"Providing consent to {app_name} for {username}")
app.logger.info(f"{username} was granted access to {app_name}")
# False positive: pylint: disable=no-member
return redirect(consent_request.accept(
grant_scope=consent_request.requested_scope,
grant_access_token_audience=consent_request.requested_access_token_audience,
@@ -558,16 +511,15 @@ def status():
Show if there is an user is logged in. If not shows: not-auth
"""
auth = getAuth()
auth_status = get_auth()
if auth:
return auth['email']
else:
return "not-auth"
if auth_status:
return auth_status['email']
return "not-auth"
def getAuth():
def get_auth():
"""Checks if user is logged in
Queries the cookies. If an authentication cookie is found, it
checks with Kratos if the cookie is still valid. If so,
@@ -576,7 +528,7 @@ def getAuth():
"""
try:
cookie = request.cookies.get('ory_kratos_session');
cookie = request.cookies.get('ory_kratos_session')
cookie = "ory_kratos_session=" + cookie
except TypeError:
app.logger.info("User not logged in or cookie corrupted")
@@ -592,13 +544,10 @@ def getAuth():
return profile
except ory_kratos_client.ApiException as error:
app.logger.error("Exception when calling" +
"V0alpha2Api->to_session(): %s\n" % error)
app.logger.error(f"Exception when calling V0alpha2Api->to_session(): {error}\n")
return False
if __name__ == '__main__':
app.run()
Loading