Skip to content
Snippets Groups Projects
app.py 7.9 KiB
Newer Older
# Basic system imports
import logging
import os
import click
import pprint
from flask import abort, Flask, redirect, request, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask.cli import AppGroup

# Import modules for external APS

# Hydra, OIDC Identity Provider
import hydra_client
# Kratos, Identity manager
import ory_kratos_client 
from ory_kratos_client.api import metadata_api
from ory_kratos_client.api import v0alpha2_api as kratos_api
from ory_kratos_client.model.generic_error import GenericError
from ory_kratos_client.model.inline_response200 import InlineResponse200
from ory_kratos_client.model.inline_response2001 import InlineResponse2001
from ory_kratos_client.model.inline_response503 import InlineResponse503

from ory_kratos_client.model.admin_create_identity_body import AdminCreateIdentityBody
from ory_kratos_client.model.admin_create_self_service_recovery_link_body import AdminCreateSelfServiceRecoveryLinkBody

from ory_kratos_client.model.identity_state import IdentityState

# Initaliaze the FLASK app
app = Flask(__name__,
             static_url_path='/static')
# Load config
app.config.from_object(os.environ['APP_SETTINGS'])

# Set right logging level based on config DEBUG flag
if (app.config['DEBUG']):
    app.logger.setLevel(logging.INFO)
Mart van Santen's avatar
Mart van Santen committed
else:
    app.logger.setLevel(logging.ERROR)


# Create HYDRA & KRATOS API interfaces
HYDRA = hydra_client.HydraAdmin(app.config["HYDRA_ADMIN_URL"])

# Kratos has an admin and public end-point. We create an API for them
# both. The kratos implementation has bugs, which forces us to set
# the discard_unkonwn_keys to True.
tmp = ory_kratos_client.Configuration(host=app.config["KRATOS_ADMIN_URL"],
                                        discard_unknown_keys= True)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))

tmp = ory_kratos_client.Configuration(host=app.config["KRATOS_PUBLIC_URL"],
                                      discard_unknown_keys = True)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))


# Kratos API Example:
# A minimal use of the KRATOS is for example getting all identities as an admin
# ids = KRATOS_ADMIN.admin_list_identities()
# print (ids)
#
# Kratos API documentation for python can be found here:
# https://github.com/ory/sdk/blob/master/clients/kratos/python/README.md
Mart van Santen's avatar
Mart van Santen committed
# Create DB & migrate interface
db = SQLAlchemy(app)
migrate = Migrate(app, db)

# Import models
from models import User, App, AppRole

#
# WARNING:
#
# Below are very minimalistic calls to interfaces for development and testing 
# purposed. Eventually this need to be moved to seperate files and more
# sophisticated calls with try{} catch{} claused etc.
#
Mart van Santen's avatar
Mart van Santen committed
##############################################################################
# CLI INTERFACE                                                              #
##############################################################################
# Define Flask CLI command groups and commands
user_cli = AppGroup('user')
app_cli = AppGroup('app')

Mart van Santen's avatar
Mart van Santen committed
## CLI APP COMMANDS

@app_cli.command('create')
@click.argument('slug')
@click.argument('name')
def create_app(slug, name):
    app.logger.info("Creating app definition: {1} ({0})".format(slug, name))

    obj = App()
    obj.name = name
    obj.slug = slug

    db.session.add(obj)
    db.session.commit()
@app_cli.command('list')
def list_app():
    app.logger.info("Listing configured apps")
    apps = App.query.all()
    for obj in apps:
        print("App name: %s \t Slug: %s" %(obj.name, obj.slug))
@app_cli.command('delete',)
@click.argument('slug')
def delete_app(slug):
    app.logger.info("Trying to delete app: {0}".format(slug))
    obj = App.query.filter_by(slug=slug).first()

    if not obj:
        app.logger.info("Not found")
        return


    # TODO: Deleting will (propably) fail is there are still roles attached
    #       these probobly need to be cleaned up first.
    db.session.delete(obj)
    db.session.commit()
    app.logger.info("Success")
    return


app.cli.add_command(app_cli)


Mart van Santen's avatar
Mart van Santen committed
## CLI USER COMMANDS

@user_cli.command('create')
@click.argument('email')
def create_user(email):
Mart van Santen's avatar
Mart van Santen committed
    app.logger.info("Creating user with email: ({0})".format(email))

    obj = User()
    obj.email = email

    # TODO:
    #  - Should check if database entry already exists. If so, double check if
    #    kratos user exists. If not, create that one.
    #
    #  - If no DB user, should check with kratos of user already exists. If so, 
    #    throw warning, but still create DB entrie
    #
    #  - After creating kratos user, check if success, otherwise throw warning.

    body = AdminCreateIdentityBody(
        schema_id="default",
        traits={'email':email},
    ) # AdminCreateIdentityBody |  (optional)

        #state=IdentityState("active"),

    kratos_obj = KRATOS_ADMIN.admin_create_identity(admin_create_identity_body=body)
    obj.kratos_id = kratos_obj.id
    db.session.add(obj)
Mart van Santen's avatar
Mart van Santen committed
    db.session.commit()


@user_cli.command('list')
def list_user():
    app.logger.info("Listing users")
    users = User.query.all()

    for obj in users:
        print("Email: %s (admin: %s)" %(obj.email, obj.admin))


@user_cli.command('delete',)
@click.argument('email')
def delete_user(email):
    app.logger.info("Trying to delete user: {0}".format(email))
    obj = User.query.filter_by(email=email).first()

    if not obj:
        app.logger.info("Not found")
        return

    db.session.delete(obj)
    # TODO: 
    #  - if delete succesfull, also delete kratos user, if exists
    #  - probably user roles need to be deleted before user can be deleted
Mart van Santen's avatar
Mart van Santen committed
    db.session.commit()
    app.logger.info("Success")
    return

@user_cli.command('setadmin')
@click.argument('email')
def setadmin_user(email):
    app.logger.info("Trying to make user into admin: {0}".format(email))
    obj = User.query.filter_by(email=email).first()

    if not obj:
        app.logger.info("Not found")
        return

    obj.admin = True
    db.session.commit()
    app.logger.info("Success")
    return


@user_cli.command('unsetadmin')
@click.argument('email')
def unsetadmin_user(email):
    app.logger.info("Trying to make user into normal user: {0}".format(email))
    obj = User.query.filter_by(email=email).first()

    if not obj:
        app.logger.info("Not found")
        return

    obj.admin = False
    db.session.commit()
    app.logger.info("Success")
    return



@user_cli.command('recover')
@click.argument('email')
def recover_user(email):
    app.logger.info("Trying to sent recover email for user: {0}".format(email))
    obj = User.query.filter_by(email=email).first()

Mart van Santen's avatar
Mart van Santen committed
    if not obj:
        app.logger.info("Not found")
        return

    if not obj.kratos_id:
        app.logger.info("User found, but no kratos ID")
        return

    body = AdminCreateSelfServiceRecoveryLinkBody(
        expires_in="1h",
        identity_id=obj.kratos_id,
    ) 

    # example passing only required values which don't have defaults set
    # and optional values
    try:
        # Create a Recovery Link
        api_response = KRATOS_ADMIN.admin_create_self_service_recovery_link(
                admin_create_self_service_recovery_link_body=body)

        pprint.pprint(api_response)
    except ory_kratos_client.ApiException as e:
        print("Exception when calling V0alpha2Api->admin_create_self_service_recovery_link: %s\n" % e)    


app.cli.add_command(user_cli)

Mart van Santen's avatar
Mart van Santen committed



##############################################################################
# WEB ROUTES                                                                 #
##############################################################################

# -> recovery

# TODO: Create webroutes
@app.route('/recover', methods=['GET', 'POST'])
def recover():

    return render_template('recover.html')


    #login_form=login_form, logo=login_request.client.logo_uri, application_name=login_request.client.client_name)