"""Flask application which provides the interface of a login panel. The
application interacts with different backends, like the Kratos backend for
users, Hydra for OIDC sessions and Postgres for application and role
specifications.

The application also provides several command line options to interact with
the user entries in the database(s)"""

# Basic system imports
import logging
import os
import urllib.parse
import urllib.request

import click
from flask import abort, Flask, redirect, request, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
# False positive: pylint: disable=ungrouped-imports
from flask.cli import AppGroup

# Import modules for external APIs

# Hydra, OIDC Identity Provider
import hydra_client
# Kratos, Identity manager
import ory_kratos_client
from ory_kratos_client.api import v0alpha2_api as kratos_api

from exceptions import BackendError
from kratos import KratosUser


# Initialize the FLASK app
app = Flask(__name__,
            static_url_path='/static')
# Load config. The APP_SETTINGS environment variable is mandatory: when it is
# not set, the lookup below raises KeyError at start-up.
app.config.from_object(os.environ['APP_SETTINGS'])

# Set right logging level based on config DEBUG flag
if app.config['DEBUG']:
    app.logger.setLevel(logging.INFO)
else:
    app.logger.setLevel(logging.ERROR)


# API interface towards the Hydra admin endpoint
HYDRA = hydra_client.HydraAdmin(app.config["HYDRA_ADMIN_URL"])

# Kratos exposes an admin and a public end-point; one API interface is built
# for each of them. Bugs in the kratos implementation force us to set
# discard_unknown_keys to True on both configurations.
tmp = ory_kratos_client.Configuration(
    host=app.config["KRATOS_ADMIN_URL"],
    discard_unknown_keys=True,
)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))

tmp = ory_kratos_client.Configuration(
    host=app.config["KRATOS_PUBLIC_URL"],
    discard_unknown_keys=True,
)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))


# Kratos API Example:
# A minimal use of the KRATOS is for example getting all identities as an admin
# ids = KRATOS_ADMIN.admin_list_identities()
# print (ids)
#
# Kratos API documentation for python can be found here:
# https://github.com/ory/sdk/blob/master/clients/kratos/python/README.md
# Create DB & migrate interface
# `db` is the handle every command and route below uses for its session.
db = SQLAlchemy(app)
migrate = Migrate(app, db)

# Import models
# Import needs to happen after the database interface is created, which is why
# it is not at the top of the file.
# pylint: disable=wrong-import-position unused-import
from models import App, AppRole
# Below are very minimalistic calls to interfaces for development and testing
# purposes. Eventually these need to be moved to separate files, with more
# sophisticated calls wrapped in try/except clauses etc.
##############################################################################
# CLI INTERFACE                                                              #
##############################################################################
# Flask CLI command groups; the individual commands are attached to these
# groups by the decorators further down.
app_cli = AppGroup('app')
user_cli = AppGroup('user')

## CLI APP COMMANDS

@app_cli.command('create')
@click.argument('slug')
@click.argument('name')
def create_app(slug, name):
    """Add an app to the database.

    :param slug: str short name of the app
    :param name: str name of the application
    """
    app.logger.info(f"Creating app definition: {name} ({slug})")

    obj = App()
    obj.name = name
    obj.slug = slug

    db.session.add(obj)
    db.session.commit()
@app_cli.command('list')
def list_app():
    """List all apps found in the database.

    Prints one line per app with its name and slug.
    """
    app.logger.info("Listing configured apps")
    apps = App.query.all()

    for obj in apps:
        print(f"App name: {obj.name} \t Slug: {obj.slug}")
@app_cli.command('delete')
@click.argument('slug')
def delete_app(slug):
    """Remove an app from the database.

    Logs "Not found" and does nothing when no app has the given slug.

    :param slug: str Slug of app to remove
    """
    app.logger.info(f"Trying to delete app: {slug}")
    obj = App.query.filter_by(slug=slug).first()

    if not obj:
        app.logger.info("Not found")
        return

    # Deleting will (probably) fail if there are still roles attached. This is
    # a PoC implementation only. Actual management of apps and roles will be
    # done by the backend application
    db.session.delete(obj)
    db.session.commit()
    app.logger.info("Success")


# Register the 'app' command group with the Flask CLI
app.cli.add_command(app_cli)


## CLI USER COMMANDS

@user_cli.command('setrole')
@click.argument('email')
@click.argument('app_slug')
@click.argument('role')
def setrole(email, app_slug, role):
    """Set the role of a user for an app, replacing any role already set.

    Exits with status 1 when the role is not accepted, or when the user or
    the app cannot be found.

    :param email: Email address of the user
    :param app_slug: Slug of the app to set the role for
    :param role: Role to assign, supported [admin|user]
    """
    app.logger.info(f"Assiging role {role} to {email} for app {app_slug}")

    # Validate the role before anything else: it is a purely local check, so
    # there is no reason to query a backend for input that is rejected anyway.
    if role not in ('admin', 'user'):
        print("At this point only the roles 'admin' and 'user' are accepted")
        raise SystemExit(1)

    # Find user
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if not user:
        print("User not found. Abort")
        raise SystemExit(1)

    app_obj = db.session.query(App).filter(App.slug == app_slug).first()
    if not app_obj:
        print("App not found. Abort.")
        raise SystemExit(1)

    role_obj = db.session.query(AppRole).\
        filter(AppRole.app_id == app_obj.id).\
        filter(AppRole.user_id == user.uuid).first()

    # Drop the current role (if any) so the new one replaces it
    if role_obj:
        db.session.delete(role_obj)

    obj = AppRole()
    obj.user_id = user.uuid
    obj.app_id = app_obj.id
    obj.role = role

    db.session.add(obj)
    db.session.commit()



@user_cli.command('show')
@click.argument('email')
def show_user(email):
    """Show user details. Prints the user object followed by details about
    the internal state/values of the user object.

    Logs an error and prints nothing when the user does not exist.

    :param email: Email address of the user to show
    """
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)

    # Same guard as the update/delete commands: without it a lookup miss
    # fails with an AttributeError on the attribute accesses below.
    if not user:
        app.logger.error(f"User with email {email} not found.")
        return

    print(user)
    print("")
    print(f"UUID:     {user.uuid}")
    print(f"Username: {user.username}")
    print(f"Updated:  {user.updated_at}")
    print(f"Created:  {user.created_at}")
    print(f"State:    {user.state}")
@user_cli.command('update')
@click.argument('email')
@click.argument('field')
@click.argument('value')
def update_user(email, field, value):
    """Update a user object. It can modify email and name currently.

    Nothing is saved when the user does not exist or the field is not
    supported; both cases are logged as errors.

    :param email: Email address of user to update
    :param field: Field to update, supported [name|email]
    :param value: The value to set the field with
    """
    app.logger.info(f"Looking for user with email: {email}")
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)

    if not user:
        app.logger.error(f"User with email {email} not found.")
        return

    if field == 'name':
        user.name = value
    elif field == 'email':
        user.email = value
    else:
        app.logger.error(f"Field not found: {field}")
        # Nothing was changed, so do not write the user back
        return

    user.save()


@user_cli.command('delete')
@click.argument('email')
def delete_user(email):
    """Delete a user from the database.

    Logs an error and does nothing when the user does not exist.

    :param email: Email address of user to delete
    """
    app.logger.info(f"Looking for user with email: {email}")
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)

    if not user:
        app.logger.error(f"User with email {email} not found.")
        return

    # NOTE(review): assumes KratosUser offers delete() next to save() --
    # confirm against kratos.py. Without this call the command found the user
    # but never removed it.
    user.delete()
@user_cli.command('create')
@click.argument('email')
def create_user(email):
    """Create a user in the kratos database. The argument must be a unique
    email address. When a user with this address already exists, this is
    logged and no new user is created.

    :param email: string Email address of user to add
    """
    app.logger.info(f"Creating user with email: ({email})")

    # Refuse to create a second user for the same address
    user = KratosUser.find_by_email(KRATOS_ADMIN, email)
    if user:
        app.logger.info("User already exists. Not recreating")
        return

    # Create a user
    user = KratosUser(KRATOS_ADMIN)
    user.email = email
    user.save()
@user_cli.command('setpassword')
@click.argument('email')
@click.argument('password')
def setpassword_user(email, password):
    """Set a password for an account

    :param email:    email address of account to set a password for
    :param password:  password to be set
    :return:         true on success, false if not set (unknown user,
                     backend error or password too weak)
    :rtype: boolean
    """

    app.logger.info(f"Setting password for: ({email})")

    # Kratos does not provide an interface to set a password directly. However
    # we still want to be able to set a password. So we have to hack our way
    # a bit around this. We do this by creating a recovery link through the
    # admin interface (which is not e-mailed) and then follow the recovery
    # flow in the public facing pages of kratos

    try:
        # Get the ID of the user
        kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)

        # Same guard as the other user commands; without it an unknown
        # address fails with an AttributeError instead of a clear message.
        if not kratos_user:
            app.logger.error(f"User with email {email} not found.")
            return False

        # Get a recovery URL
        url = kratos_user.get_recovery_link()

        # Execute UI sequence to set password, given we have a recovery URL
        result = kratos_user.ui_set_password(app.config["KRATOS_PUBLIC_URL"], url, password)

    except BackendError as error:
        app.logger.error(f"Error while setting password: {error}")
        return False

    if result:
        app.logger.info("Success setting password")
    else:
        app.logger.error("Failed to set password. Password too weak?")

    return result



@user_cli.command('list')
def list_user():
    """Show a list of users in the database"""
    app.logger.info("Listing users")
    users = KratosUser.find_all(KRATOS_ADMIN)

    # User objects are printed directly, the same way show_user prints one
    for obj in users:
        print(obj)


@user_cli.command('recover')
@click.argument('email')
def recover_user(email):
    """Get a recovery link for a user and print it, so the user can be
    updated/used manually.

    Logs an error when the user does not exist or the backend fails.

    :param email: Email address of the user
    """

    app.logger.info(f"Trying to send recover email for user: {email}")

    try:
        # Get the ID of the user
        kratos_user = KratosUser.find_by_email(KRATOS_ADMIN, email)

        # Same guard as the other user commands; without it an unknown
        # address fails with an AttributeError instead of a clear message.
        if not kratos_user:
            app.logger.error(f"User with email {email} not found.")
            return

        # Get a recovery URL
        url = kratos_user.get_recovery_link()

        print(url)
    except BackendError as error:
        app.logger.error(f"Error while getting reset link: {error}")
# Register the 'user' command group with the Flask CLI
app.cli.add_command(user_cli)

##############################################################################
# WEB ROUTES                                                                 #
##############################################################################

@app.route('/recovery', methods=['GET', 'POST'])
def recovery():
    """Entry point of the recovery flow.

    Without a ``flow`` query argument the browser is redirected to kratos,
    which creates a flow; with one, the recovery template is rendered.

    :param flow: flow as given by Kratos
    :return: redirect or recovery page
    """
    kratos_url = app.config["KRATOS_PUBLIC_URL"]

    if request.args.get("flow"):
        return render_template('recover.html', api_url=kratos_url)

    return redirect(kratos_url + "self-service/recovery/browser")
@app.route('/settings', methods=['GET', 'POST'])
def settings():
    """Entry point of the settings flow.

    Without a ``flow`` query argument the browser is redirected to kratos,
    which creates a flow; with one, the settings template is rendered.

    :param flow: flow as given by Kratos
    :return: redirect or settings page
    """
    kratos_url = app.config["KRATOS_PUBLIC_URL"]

    if request.args.get("flow"):
        return render_template('settings.html', api_url=kratos_url)

    return redirect(kratos_url + "self-service/settings/browser")


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Start login flow

    If already logged in, shows the loggedin template. Otherwise creates a login
    flow, if no active flow will redirect to kratos to create a flow.

    :param flow: flow as given by Kratos
    :return: redirect or login page
    """

    # Check if we are logged in:
    identity = get_auth()

    if identity:
        return render_template(
            'loggedin.html',
            api_url=app.config["KRATOS_PUBLIC_URL"],
            # NOTE(review): this used to pass the builtin function `id`;
            # the identifier of the logged-in identity is presumably what
            # the template expects -- confirm against loggedin.html.
            id=identity.id)

    flow = request.args.get("flow")
    # If we do not have a flow, get one.
    if not flow:
        return redirect(app.config["KRATOS_PUBLIC_URL"] + "self-service/login/browser")

    return render_template(
        'login.html',
        api_url=app.config["KRATOS_PUBLIC_URL"]
        )


@app.route('/auth', methods=['GET', 'POST'])
def auth():
    """Authorize a user for an application

    When an application authenticates against the IdP (Identity Provider)
    and there is no active session, the user is forwarded to the login page.
    This is the entry point for those authorization requests. The challenge
    as provided, is verified. If an active user is logged in, the request
    is accepted and the user is returned to the application. If the user is not
    logged in yet, it redirects to the login page

    :param challenge: challenge as given by Hydra
    :return: redirect to login or application/idp
    """
    # Retrieve the challenge id from the request. Depending on the method it is
    # saved in the form (POST) or in a GET variable. If this variable is not set
    # we can not continue.
    if request.method == 'POST':
        challenge = request.form.get("login_challenge")
    else:
        challenge = request.args.get("login_challenge")

    if not challenge:
        app.logger.error("No challenge given. Error in request")
        abort(400, description="Challenge required when requesting authorization")

    # Check if we are logged in:
    identity = get_auth()

    # If the user is not logged in yet, we redirect to the login page
    # but before we do that, we set the "flow_state" cookie to auth.
    # so the UI knows it has to redirect after a successful login.
    # The redirect URL is back to this page (auth) with the same challenge
    # so we can pickup the flow where we left off.
    if not identity:
        url = app.config["PUBLIC_URL"] + "/auth?login_challenge=" + challenge
        url = urllib.parse.quote_plus(url)

        app.logger.info("Redirecting to login. Setting flow_state cookies")
        app.logger.info("auth_url: " + url)

        response = redirect(app.config["PUBLIC_URL"] + "/login")
        response.set_cookie('flow_state', 'auth')
        response.set_cookie('auth_url', url)
        return response

    app.logger.info("User is logged in. We can authorize the user")

    try:
        login_request = HYDRA.login_request(challenge)
    except hydra_client.exceptions.NotFound:
        app.logger.error(f"Not Found. Login request not found. challenge={challenge}")
        abort(404, description="Login request not found. Please try again.")
    except hydra_client.exceptions.HTTPError:
        app.logger.error(f"Conflict. Login request has been used already. challenge={challenge}")
        abort(503, description="Login request already used. Please try again.")

    # Authorize the user. The identity id is handed to Hydra as the subject;
    # consent() later reads it back to look up the user.
    # False positive: pylint: disable=no-member
    redirect_to = login_request.accept(
        identity.id,
        remember=True,
        # Remember session for 7d
        remember_for=60*60*24*7)

    return redirect(redirect_to)


@app.route('/consent', methods=['GET', 'POST'])
def consent():
    """Get consent

    For now, it just allows every user. Eventually this function should check
    the roles and settings of a user and provide that information to the
    application.

    :param consent_challenge: challenge as given by Hydra
    :return: redirect to login or render error
    """

    challenge = request.args.get("consent_challenge")
    if not challenge:
        abort(403, description="Consent request required. Do not call this page directly")
    try:
        consent_request = HYDRA.consent_request(challenge)
    except hydra_client.exceptions.NotFound:
        app.logger.error(f"Not Found. Consent request {challenge} not found")
        abort(404, description="Consent request does not exist. Please try again")
    except hydra_client.exceptions.HTTPError:
        app.logger.error(f"Conflict. Consent request {challenge} already used")
        abort(503, description="Consent request already used. Please try again")

    # Get information about this consent request:
    # False positive: pylint: disable=no-member
    app_name = consent_request.client.client_name
    # The subject of the consent request is the id accepted in auth().
    # NOTE(review): assumes the consent request exposes `subject` -- confirm
    # against the hydra_client version in use.
    # False positive: pylint: disable=no-member
    kratos_id = consent_request.subject

    # Get the related user object
    user = KratosUser(KRATOS_ADMIN, kratos_id)
    if not user:
        app.logger.error(f"User not found in database: {kratos_id}")
        abort(401, description="User not found. Please try again.")

    # Get role on this app
    app_obj = db.session.query(App).filter(App.slug == app_name).first()

    # Default access level
    role = ''
    if app_obj:
        role_obj = db.session.query(AppRole).\
            filter(AppRole.app_id == app_obj.id).\
            filter(AppRole.user_id == user.uuid).first()
        if role_obj:
            role = role_obj.role

    # Get claims for this user, provided the current app
    claims = user.get_claims(app_name, None, role)

    # pylint: disable=fixme
    # TODO: Need to implement checking claims here, once the backend for that is
    #       developed
    app.logger.info(f"Providing consent to {app_name} for {kratos_id}")
    app.logger.info(f"{kratos_id} was granted access to {app_name}")

    # False positive: pylint: disable=no-member
    return redirect(consent_request.accept(
        grant_scope=consent_request.requested_scope,
        grant_access_token_audience=consent_request.requested_access_token_audience,
        session=claims,
    ))


@app.route('/status', methods=['GET', 'POST'])
def status():
    """Get status of current session

    Shows the id of the logged-in identity. If nobody is logged in it
    shows: not-auth
    """

    identity = get_auth()

    if identity:
        return identity.id
    return "not-auth"


def get_auth():
    """Checks if user is logged in

    Queries the cookies. If an authentication cookie is found, it
    checks with Kratos if the cookie is still valid. If so,
    the profile is returned. Otherwise False is returned.

    :return: Profile or False if not logged in
    """
    cookie = request.cookies.get('ory_kratos_session')
    if not cookie:
        app.logger.info("User not logged in or cookie corrupted")
        return False
    cookie = "ory_kratos_session=" + cookie

    # Given a cookie, check if it is valid and get the profile
    try:
        api_response = KRATOS_PUBLIC.to_session(
            cookie=cookie)
    except ory_kratos_client.ApiException as error:
        app.logger.error(f"Exception when calling V0alpha2Api->to_session(): {error}\n")
        return False

    # NOTE(review): assumes the session response carries the profile in
    # `identity` -- confirm against the ory_kratos_client version in use.
    return api_response.identity

# Run the Flask application when this file is executed directly as a script
if __name__ == '__main__':
    app.run()