Skip to content
Snippets Groups Projects
app.py 15.35 KiB


# Basic system imports
import pprint
import logging
import os
import click
import urllib.parse

# Flask
from flask import abort, Flask, redirect, request, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask.cli import AppGroup


# Import modules for external APIs

# Hydra, OIDC Identity Provider
import hydra_client

# Kratos, Identity manager
import ory_kratos_client
from ory_kratos_client.api import metadata_api
from ory_kratos_client.api import v0alpha2_api as kratos_api
from ory_kratos_client.model.generic_error import GenericError
from ory_kratos_client.model.inline_response200 import InlineResponse200
from ory_kratos_client.model.inline_response2001 import InlineResponse2001
from ory_kratos_client.model.inline_response503 import InlineResponse503

from ory_kratos_client.model.admin_create_identity_body import AdminCreateIdentityBody
from ory_kratos_client.model.admin_create_self_service_recovery_link_body import AdminCreateSelfServiceRecoveryLinkBody

from ory_kratos_client.model.identity_state import IdentityState



# Initialize the Flask app; static files are served from /static
app = Flask(__name__,
             static_url_path='/static')

# Load config from the object named by the APP_SETTINGS environment variable.
# os.environ[...] raises KeyError when APP_SETTINGS is not set.
app.config.from_object(os.environ['APP_SETTINGS'])


# Set right logging level based on config DEBUG flag:
# INFO when DEBUG is truthy, ERROR otherwise.
if app.config['DEBUG']:
    app.logger.setLevel(logging.INFO)
else:
    app.logger.setLevel(logging.ERROR)



# Create HYDRA & KRATOS API interfaces
HYDRA = hydra_client.HydraAdmin(app.config["HYDRA_ADMIN_URL"])

# Kratos has an admin and a public endpoint. We create an API client for
# both of them. The Kratos implementation has bugs, which forces us to set
# discard_unknown_keys to True.
# NOTE: `tmp` is reused for both configurations and stays defined at module
# level after this section.
tmp = ory_kratos_client.Configuration(host=app.config["KRATOS_ADMIN_URL"],
                                        discard_unknown_keys= True)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))

tmp = ory_kratos_client.Configuration(host=app.config["KRATOS_PUBLIC_URL"],
                                      discard_unknown_keys = True)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))


# Kratos API Example:
# A minimal use of the KRATOS is for example getting all identities as an admin
#
# ids = KRATOS_ADMIN.admin_list_identities()
# print (ids)
#
# Kratos API documentation for python can be found here:
# https://github.com/ory/sdk/blob/master/clients/kratos/python/README.md
#


# Create DB & migrate interface, both bound to the Flask app
db = SQLAlchemy(app)
migrate = Migrate(app, db)

# Import models
# NOTE(review): imported here instead of at the top of the file, presumably
# because the models module needs `db` to exist first -- confirm in models.py.
from models import User, App, AppRole

#
# WARNING:
#
# Below are very minimalistic calls to interfaces for development and testing
# purposes. Eventually these need to be moved to separate files and replaced
# by more sophisticated calls with try/except clauses etc.
#

##############################################################################
# CLI INTERFACE                                                              #
##############################################################################
# Define Flask CLI command groups and commands.
# Commands are attached to these groups with the @user_cli.command(...) and
# @app_cli.command(...) decorators further down in this file.
user_cli = AppGroup('user')
app_cli = AppGroup('app')

## CLI APP COMMANDS

@app_cli.command('create')
@click.argument('slug')
@click.argument('name')
def create_app(slug, name):
    """Store a new app definition in the database.

    :param slug: slug to store on the new app
    :param name: name to store on the new app
    """
    app.logger.info(f"Creating app definition: {name} ({slug})")

    new_app = App()
    new_app.name = name
    new_app.slug = slug

    # Persist the new record
    session = db.session
    session.add(new_app)
    session.commit()



@app_cli.command('list')
def list_app():
    """Print the name and slug of every app stored in the database."""
    app.logger.info("Listing configured apps")

    for entry in App.query.all():
        print("App name: %s \t Slug: %s" % (entry.name, entry.slug))


@app_cli.command('delete')
@click.argument('slug')
def delete_app(slug):
    """Delete the app definition with the given slug.

    Logs "Not found" and changes nothing when no app has this slug.

    :param slug: slug of the app to delete
    """
    app.logger.info("Trying to delete app: {0}".format(slug))
    target = App.query.filter_by(slug=slug).first()

    if target:
        # TODO: Deleting will (probably) fail if there are still roles
        #       attached; these probably need to be cleaned up first.
        db.session.delete(target)
        db.session.commit()
        app.logger.info("Success")
    else:
        app.logger.info("Not found")


# Register the 'app' command group with the Flask CLI
app.cli.add_command(app_cli)


## CLI USER COMMANDS

@user_cli.command('create')
@click.argument('email')
def create_user(email):
    """Create a Kratos identity and a matching database user.

    The identity is created with the "default" schema and the email as its
    only trait. The id of the created identity is stored on the database
    record before the record is committed.

    :param email: email address of the new user
    """
    app.logger.info("Creating user with email: ({0})".format(email))

    user = User()
    user.email = email

    # TODO:
    #  - Should check if the database entry already exists. If so, double
    #    check if the Kratos user exists. If not, create that one.
    #
    #  - If there is no DB user, should check with Kratos if the user already
    #    exists. If so, throw a warning, but still create the DB entry.
    #
    #  - After creating the Kratos user, check for success, otherwise throw
    #    a warning.

    # NOTE: the identity state is not set explicitly here
    #       (state=IdentityState("active") is left out).
    identity_body = AdminCreateIdentityBody(
        schema_id="default",
        traits={'email': email},
    )
    identity = KRATOS_ADMIN.admin_create_identity(
        admin_create_identity_body=identity_body)

    # Link the database record to the Kratos identity before persisting it
    user.kratos_id = identity.id

    db.session.add(user)
    db.session.commit()


@user_cli.command('list')
def list_user():
    """Print the email and admin flag of every user in the database."""
    app.logger.info("Listing users")

    for entry in User.query.all():
        print("Email: %s (admin: %s)" % (entry.email, entry.admin))


@user_cli.command('delete')
@click.argument('email')
def delete_user(email):
    """Delete the database user with the given email.

    Only the database record is removed; the Kratos identity is left alone.
    Logs "Not found" and changes nothing when no user has this email.

    :param email: email address of the user to delete
    """
    app.logger.info("Trying to delete user: {0}".format(email))
    target = User.query.filter_by(email=email).first()

    if target:
        db.session.delete(target)

        # TODO:
        #  - if the delete is successful, also delete the Kratos user, if it
        #    exists
        #  - user roles probably need to be deleted before the user can be
        #    deleted

        db.session.commit()
        app.logger.info("Success")
    else:
        app.logger.info("Not found")

@user_cli.command('setadmin')
@click.argument('email')
def setadmin_user(email):
    """Set the admin flag of the user with the given email to True.

    Logs "Not found" and changes nothing when no user has this email.

    :param email: email address of the user to change
    """
    app.logger.info("Trying to make user into admin: {0}".format(email))
    target = User.query.filter_by(email=email).first()

    if target:
        target.admin = True
        db.session.commit()
        app.logger.info("Success")
    else:
        app.logger.info("Not found")


@user_cli.command('unsetadmin')
@click.argument('email')
def unsetadmin_user(email):
    """Set the admin flag of the user with the given email to False.

    Logs "Not found" and changes nothing when no user has this email.

    :param email: email address of the user to change
    """
    app.logger.info("Trying to make user into normal user: {0}".format(email))
    target = User.query.filter_by(email=email).first()

    if target:
        target.admin = False
        db.session.commit()
        app.logger.info("Success")
    else:
        app.logger.info("Not found")



@user_cli.command('recover')
@click.argument('email')
def recover_user(email):
    """Create a Kratos recovery link for a user and print the response.

    The user is looked up in the database by email. The command stops with a
    log message when the user does not exist or has no Kratos id. Otherwise a
    recovery link that expires in one hour is requested from the Kratos admin
    API and the API response is pretty-printed.

    :param email: email address of the user to recover
    """
    app.logger.info("Trying to send recover email for user: {0}".format(email))
    obj = User.query.filter_by(email=email).first()

    if not obj:
        app.logger.info("Not found")
        return

    if not obj.kratos_id:
        app.logger.info("User found, but no kratos ID")
        return

    body = AdminCreateSelfServiceRecoveryLinkBody(
        expires_in="1h",
        identity_id=obj.kratos_id,
    )

    try:
        # Create a Recovery Link
        api_response = KRATOS_ADMIN.admin_create_self_service_recovery_link(
            admin_create_self_service_recovery_link_body=body)
    except ory_kratos_client.ApiException as error:
        # The two message parts are joined with a space so the log reads
        # "... when calling V0alpha2Api->..." instead of running together.
        app.logger.error(
            "Exception when calling "
            "V0alpha2Api->admin_create_self_service_recovery_link: %s\n",
            error)
        return

    pprint.pprint(api_response)


# Register the 'user' command group with the Flask CLI
app.cli.add_command(user_cli)




##############################################################################
# WEB ROUTES                                                                 #
##############################################################################

@app.route('/recovery', methods=['GET', 'POST'])
def recovery():
    """Start recovery flow

    Without a `flow` query parameter the browser is redirected to Kratos to
    create a recovery flow. With one, the recovery template is rendered.

    :param flow: flow as given by Kratos
    :return: redirect or recovery page
    """
    kratos_url = app.config["KRATOS_PUBLIC_URL"]

    if request.args.get("flow"):
        return render_template('recover.html', api_url=kratos_url)

    # No active flow yet: let Kratos create one
    return redirect(kratos_url + "self-service/recovery/browser")


@app.route('/settings', methods=['GET', 'POST'])
def settings():
    """Start settings flow

    Without a `flow` query parameter the browser is redirected to Kratos to
    create a settings flow. With one, the settings template is rendered.

    :param flow: flow as given by Kratos
    :return: redirect or settings page
    """
    kratos_url = app.config["KRATOS_PUBLIC_URL"]

    if request.args.get("flow"):
        return render_template('settings.html', api_url=kratos_url)

    # No active flow yet: let Kratos create one
    return redirect(kratos_url + "self-service/settings/browser")


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Start login flow

    If already logged in, shows the loggedin template. Otherwise, when the
    request has no `flow` query parameter, redirects to Kratos to create a
    login flow; with a flow the login template is rendered.

    :param flow: flow as given by Kratos
    :return: redirect or login page
    """

    # Check if we are logged in:
    profile = getAuth()
    kratos_url = app.config["KRATOS_PUBLIC_URL"]

    if profile:
        # The template variable `id` receives the profile of the logged in
        # user, not the Python builtin function `id`.
        # NOTE(review): confirm against loggedin.html which value it expects.
        return render_template(
            'loggedin.html',
            api_url=kratos_url,
            id=profile)

    flow = request.args.get("flow")

    # If we do not have a flow, get one.
    if not flow:
        return redirect(kratos_url + "self-service/login/browser")

    return render_template('login.html', api_url=kratos_url)


@app.route('/auth', methods=['GET', 'POST'])
def auth():
    """Authorize a user for an application

    Entry point for authorization requests coming from Hydra. The login
    challenge is read from the request. If no user is logged in, the browser
    is redirected to the login page, with cookies telling the UI to come back
    here afterwards. If a user is logged in, the login request is accepted
    with Hydra and the browser is redirected to the URL Hydra returns.

    :param login_challenge: challenge as given by Hydra (query parameter on
        GET, form field on POST)
    :return: redirect to login or application/idp
    """

    challenge = None

    # Retrieve the challenge id from the request. Depending on the method it
    # is saved in the form (POST) or in a GET variable. If this variable is
    # not set we can not continue.
    if request.method == 'GET':
        challenge = request.args.get("login_challenge")
    elif request.method == 'POST':
        # Form data lives in request.form; request.args only holds the query
        # string and has no `post` method.
        challenge = request.form.get("login_challenge")

    if not challenge:
        # TODO: Use local error page?
        app.logger.error("No challange given. Error in request")
        abort(404)

    # Check if we are logged in:
    profile = getAuth()

    # If the user is not logged in yet, we redirect to the login page
    # but before we do that, we set the "flow_state" cookie to auth,
    # so the UI knows it has to redirect after a successful login.
    # The redirect URL is back to this page (auth) with the same challenge
    # so we can pick up the flow where we left off.
    if not profile:
        url = app.config["PUBLIC_URL"] + "/auth?login_challenge=" + challenge
        url = urllib.parse.quote_plus(url)

        app.logger.info("Redirecting to login. Setting flow_state cookies")
        app.logger.info("auth_url: " + url)

        response = redirect("login")
        response.set_cookie('flow_state', 'auth')
        response.set_cookie('auth_url', url)
        return response

    app.logger.info("User is logged in. We can authorize the user")

    try:
        login_request = HYDRA.login_request(challenge)
    except hydra_client.exceptions.NotFound:
        app.logger.error("Not Found. Login request not found. challenge={0}".format(challenge))
        # TODO: Different error page?
        abort(404)
    except hydra_client.exceptions.HTTPError:
        app.logger.error("Conflict. Login request has been used already. challenge={0}".format(challenge))
        # TODO: Different error page?
        abort(503)

    # Authorize the user
    redirect_to = login_request.accept(
        profile['email'],
        remember=True,
        # Remember session for 7d
        remember_for=60*60*24*7)

    return redirect(redirect_to)


@app.route('/consent', methods=['GET', 'POST'])
def consent():
    """Get consent

    For now, it just allows every user that exists in the database.
    Eventually this function should check the roles and settings of a user
    and provide that information to the application.

    :param consent_challenge: challenge as given by Hydra
    :return: redirect to the URL returned by Hydra, or an HTTP error
    """

    challenge = request.args.get("consent_challenge")
    if not challenge:
        # TODO: Better error handling/display?
        abort(403)

    try:
        pending = HYDRA.consent_request(challenge)
    except hydra_client.exceptions.NotFound:
        app.logger.error("Not Found. Consent request not found. challenge={0}".format(challenge))
        # TODO: Better error handling/display?
        abort(404)
    except hydra_client.exceptions.HTTPError:
        app.logger.error("Conflict. Consent request has been used already. challenge={0}".format(challenge))
        # TODO: Better error handling/display?
        abort(503)

    # Who asks for consent, and for which user?
    app_name = pending.client.client_name
    username = pending.subject

    # The database record of the user is needed to get roles/access rights
    user = User.query.filter_by(email=username).first()
    if not user:
        app.logger.error("User not found in database: {0}".format(username))
        # TODO: Better error handling?
        abort(401)

    # Claims of this user for the requesting app
    claims = user.getClaims(app_name)

    # TODO: Check access / claims?

    app.logger.info("Providing consent to %s for %s" % (app_name, username))
    app.logger.info("{0} was granted access to {1}".format(username, app_name))

    # Grant everything that was requested; the claims are passed as session
    redirect_to = pending.accept(
        grant_scope=pending.requested_scope,
        grant_access_token_audience=pending.requested_access_token_audience,
        session=claims,
    )
    return redirect(redirect_to)



@app.route('/status', methods=['GET', 'POST'])
def status():
    """Get status of current session

    :return: the email of the logged in user, or "not-auth" when nobody is
        logged in
    """
    profile = getAuth()
    return profile['email'] if profile else "not-auth"



def getAuth():
    """Checks if user is logged in

    Reads the `ory_kratos_session` cookie from the current request. If the
    cookie is present, Kratos is asked for the session that belongs to it.
    On success the traits of the session's identity are returned.

    :return: Profile (identity traits) or False if there is no session
        cookie or the Kratos call raised an ApiException
    """

    session_token = request.cookies.get('ory_kratos_session')
    if session_token is None:
        app.logger.info("User not logged in or cookie corrupted")
        return False

    # Kratos expects the cookie in "name=value" form
    cookie = "ory_kratos_session=" + session_token

    # Given a cookie, check if it is valid and get the profile
    try:
        api_response = KRATOS_PUBLIC.to_session(cookie=cookie)
    except ory_kratos_client.ApiException as error:
        app.logger.error(
            "Exception when calling V0alpha2Api->to_session(): %s\n", error)
        return False

    # Get all traits from ID
    return api_response.identity.traits


# Start the Flask server with its default settings when this file is run
# directly as a script.
if __name__ == '__main__':
    app.run()