Newer
Older
import logging
import os
import click
from flask import abort, Flask, redirect, request, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask.cli import AppGroup
# Import modules for external APS
# Hydra, OIDC Identity Provider
import hydra_client
import ory_kratos_client
from ory_kratos_client.api import metadata_api
from ory_kratos_client.api import v0alpha2_api as kratos_api
from ory_kratos_client.model.generic_error import GenericError
from ory_kratos_client.model.inline_response200 import InlineResponse200
from ory_kratos_client.model.inline_response2001 import InlineResponse2001
from ory_kratos_client.model.inline_response503 import InlineResponse503
from ory_kratos_client.model.admin_create_identity_body import AdminCreateIdentityBody
from ory_kratos_client.model.admin_create_self_service_recovery_link_body import AdminCreateSelfServiceRecoveryLinkBody
from ory_kratos_client.model.identity_state import IdentityState
# Initaliaze the FLASK app
app = Flask(__name__,
static_url_path='/static')
# Load config
app.config.from_object(os.environ['APP_SETTINGS'])
# Set right logging level based on config DEBUG flag
if (app.config['DEBUG']):
app.logger.setLevel(logging.INFO)
app.logger.setLevel(logging.ERROR)
# Create HYDRA & KRATOS API interfaces
HYDRA = hydra_client.HydraAdmin(app.config["HYDRA_ADMIN_URL"])
# Kratos has an admin and public end-point. We create an API for them
# both. The kratos implementation has bugs, which forces us to set
# the discard_unkonwn_keys to True.
tmp = ory_kratos_client.Configuration(host=app.config["KRATOS_ADMIN_URL"],
discard_unknown_keys= True)
KRATOS_ADMIN = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
tmp = ory_kratos_client.Configuration(host=app.config["KRATOS_PUBLIC_URL"],
discard_unknown_keys = True)
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
# Kratos API Example:
# A minimal use of the KRATOS is for example getting all identities as an admin
# ids = KRATOS_ADMIN.admin_list_identities()
# print (ids)
#
# Kratos API documentation for python can be found here:
# https://github.com/ory/sdk/blob/master/clients/kratos/python/README.md
db = SQLAlchemy(app)
migrate = Migrate(app, db)
# Import models
from models import User, App, AppRole
#
# WARNING:
#
# Below are very minimalistic calls to interfaces for development and testing
# purposed. Eventually this need to be moved to seperate files and more
# sophisticated calls with try{} catch{} claused etc.
#
##############################################################################
# CLI INTERFACE #
##############################################################################
# Define Flask CLI command groups and commands
user_cli = AppGroup('user')
app_cli = AppGroup('app')
@app_cli.command('create')
@click.argument('slug')
@click.argument('name')
def create_app(slug, name):
app.logger.info("Creating app definition: {1} ({0})".format(slug, name))
obj = App()
obj.name = name
obj.slug = slug
db.session.add(obj)
db.session.commit()
@app_cli.command('list')
def list_app():
app.logger.info("Listing configured apps")
apps = App.query.all()
for obj in apps:
print("App name: %s \t Slug: %s" %(obj.name, obj.slug))
@app_cli.command('delete',)
@click.argument('slug')
def delete_app(slug):
app.logger.info("Trying to delete app: {0}".format(slug))
obj = App.query.filter_by(slug=slug).first()
if not obj:
app.logger.info("Not found")
return
# TODO: Deleting will (propably) fail is there are still roles attached
# these probobly need to be cleaned up first.
db.session.delete(obj)
db.session.commit()
app.logger.info("Success")
return
app.cli.add_command(app_cli)
@user_cli.command('create')
@click.argument('email')
def create_user(email):
app.logger.info("Creating user with email: ({0})".format(email))
obj = User()
obj.email = email
# TODO:
# - Should check if database entry already exists. If so, double check if
# kratos user exists. If not, create that one.
#
# - If no DB user, should check with kratos of user already exists. If so,
# throw warning, but still create DB entrie
#
# - After creating kratos user, check if success, otherwise throw warning.
body = AdminCreateIdentityBody(
schema_id="default",
traits={'email':email},
) # AdminCreateIdentityBody | (optional)
#state=IdentityState("active"),
kratos_obj = KRATOS_ADMIN.admin_create_identity(admin_create_identity_body=body)
db.session.commit()
@user_cli.command('list')
def list_user():
app.logger.info("Listing users")
users = User.query.all()
for obj in users:
print("Email: %s (admin: %s)" %(obj.email, obj.admin))
@user_cli.command('delete',)
@click.argument('email')
def delete_user(email):
app.logger.info("Trying to delete user: {0}".format(email))
obj = User.query.filter_by(email=email).first()
if not obj:
app.logger.info("Not found")
return
db.session.delete(obj)
# TODO:
# - if delete succesfull, also delete kratos user, if exists
# - probably user roles need to be deleted before user can be deleted
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
db.session.commit()
app.logger.info("Success")
return
@user_cli.command('setadmin')
@click.argument('email')
def setadmin_user(email):
app.logger.info("Trying to make user into admin: {0}".format(email))
obj = User.query.filter_by(email=email).first()
if not obj:
app.logger.info("Not found")
return
obj.admin = True
db.session.commit()
app.logger.info("Success")
return
@user_cli.command('unsetadmin')
@click.argument('email')
def unsetadmin_user(email):
app.logger.info("Trying to make user into normal user: {0}".format(email))
obj = User.query.filter_by(email=email).first()
if not obj:
app.logger.info("Not found")
return
obj.admin = False
db.session.commit()
app.logger.info("Success")
return
@user_cli.command('recover')
@click.argument('email')
def recover_user(email):
app.logger.info("Trying to sent recover email for user: {0}".format(email))
obj = User.query.filter_by(email=email).first()
if not obj:
app.logger.info("Not found")
return
if not obj.kratos_id:
app.logger.info("User found, but no kratos ID")
return
body = AdminCreateSelfServiceRecoveryLinkBody(
expires_in="1h",
identity_id=obj.kratos_id,
)
# example passing only required values which don't have defaults set
# and optional values
try:
# Create a Recovery Link
api_response = KRATOS_ADMIN.admin_create_self_service_recovery_link(
admin_create_self_service_recovery_link_body=body)
pprint.pprint(api_response)
except ory_kratos_client.ApiException as e:
print("Exception when calling V0alpha2Api->admin_create_self_service_recovery_link: %s\n" % e)
app.cli.add_command(user_cli)
##############################################################################
# WEB ROUTES #
##############################################################################
@app.route('/recover', methods=['GET', 'POST'])
def recover():
return render_template('recover.html')
#login_form=login_form, logo=login_request.client.logo_uri, application_name=login_request.client.client_name)
if __name__ == '__main__':
app.run()