Skip to content
Snippets Groups Projects
Commit e3d416f4 authored by Mart van Santen's avatar Mart van Santen
Browse files

Removed webroutes and added comments

parent 65dc0bbf
No related branches found
No related tags found
1 merge request!50Implemente basic flask + database APIs
# Basic system imports
import logging
import os
import click
......@@ -12,13 +12,13 @@ from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask.cli import AppGroup
# Hydra admin
import hydra_client
# Kratos ?
#from kratos import User, KratosError
#from forms import LoginForm
# Import modules for external APS
# Hydra, OIDC Identity Provider
import hydra_client
# Kratos, Identity manager
import ory_kratos_client
from ory_kratos_client.api import metadata_api
from ory_kratos_client.api import v0alpha2_api as kratos_api
......@@ -32,18 +32,23 @@ from ory_kratos_client.model.admin_create_self_service_recovery_link_body import
from ory_kratos_client.model.identity_state import IdentityState
#from ory_kratos_client.api.public_api import PublicApi as KratosPublicApi
##from ory_kratos_client.api.admin_api import AdminApi as KratosAdminApi
# Initaliaze the FLASK app
app = Flask(__name__)
app = Flask(__name__,
static_url_path='/static')
# Load config
app.config.from_object(os.environ['APP_SETTINGS'])
# Move to config?
app.logger.setLevel(logging.INFO)
# Set right logging level based on config DEBUG flag
if (app.config['DEBUG']):
app.logger.setLevel(logging.INFO)
else
app.logger.setLevel(logging.ERROR)
# Create HYDRA & KRATOS API interfaces
HYDRA = hydra_client.HydraAdmin(app.config["HYDRA_ADMIN_URL"])
......@@ -60,11 +65,15 @@ tmp = ory_kratos_client.Configuration(host=app.config["KRATOS_PUBLIC_URL"],
KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp))
# Kratos API Example:
# A minimal use of the KRATOS is for example getting all identities as an admin
#
#ids = KRATOS_ADMIN.admin_list_identities()
#print (ids)
# ids = KRATOS_ADMIN.admin_list_identities()
# print (ids)
#
# Kratos API documentation for python can be found here:
# https://github.com/ory/sdk/blob/master/clients/kratos/python/README.md
#
# Create DB & migrate interface
......@@ -74,6 +83,13 @@ migrate = Migrate(app, db)
# Import models
from models import User, App, AppRole
#
# WARNING:
#
# Below are very minimalistic calls to interfaces for development and testing
# purposed. Eventually this need to be moved to seperate files and more
# sophisticated calls with try{} catch{} claused etc.
#
##############################################################################
# CLI INTERFACE #
......@@ -98,9 +114,10 @@ def create_app(slug, name):
db.session.commit()
@app_cli.command('list')
def list_app():
app.logger.info("Listing apps")
app.logger.info("Listing configured apps")
apps = App.query.all()
for obj in apps:
......@@ -117,6 +134,9 @@ def delete_app(slug):
app.logger.info("Not found")
return
# TODO: Deleting will (propably) fail is there are still roles attached
# these probobly need to be cleaned up first.
db.session.delete(obj)
db.session.commit()
app.logger.info("Success")
......@@ -137,6 +157,15 @@ def create_user(email):
obj.email = email
# TODO:
# - Should check if database entry already exists. If so, double check if
# kratos user exists. If not, create that one.
#
# - If no DB user, should check with kratos of user already exists. If so,
# throw warning, but still create DB entrie
#
# - After creating kratos user, check if success, otherwise throw warning.
body = AdminCreateIdentityBody(
schema_id="default",
traits={'email':email},
......@@ -146,7 +175,10 @@ def create_user(email):
kratos_obj = KRATOS_ADMIN.admin_create_identity(admin_create_identity_body=body)
obj.kratos_id = kratos_obj.id
db.session.add(obj)
db.session.commit()
......@@ -173,9 +205,9 @@ def delete_user(email):
db.session.delete(obj)
# if obj.kratos_id:
#
#
# TODO:
# - if delete succesfull, also delete kratos user, if exists
# - probably user roles need to be deleted before user can be deleted
db.session.commit()
app.logger.info("Success")
......@@ -258,182 +290,8 @@ app.cli.add_command(user_cli)
# WEB ROUTES #
##############################################################################
@app.route('/login', methods=['GET', 'POST'])
def login():
"""Provides login form and handles login attempt
Args:
login_form: contains login data submitted by a user (POST)
challenge: id that identifies the request from oauth client. passed by hydra
Returns:
Error page if no challenge id is present
or Login Form if user hasn't authenticated
or redirect to callback url provided by hydra if login was successful
"""
login_form = LoginForm()
redirect_to = None
# TODO: Empty/short passwors is a form Error
challenge = None
# Retrieve the challenge id from the request. Depending on the method it is
# saved in the form (POST) or in a GET variable. If this variable is not set
# we can not continue.
if request.method == 'GET':
challenge = request.args.get("login_challenge")
if request.method == 'POST':
challenge = request.args.post("login_challenge")
if not challenge:
app.logger.error("No challange given. Error in request")
abort(404)
# Now that we have the challenge id, we can request the challenge object from the hydra
# admin API
try:
login_request = HYDRA.login_request(challenge)
except hydra_client.exceptions.NotFound:
app.logger.error("Not Found. Login request not found. challenge={0}".format(challenge))
abort(404)
except hydra_client.exceptions.HTTPError:
app.logger.error("Conflict. Login request has been used already. challenge={0}".format(challenge))
abort(503)
# Skip, if true, let's us know that Hydra has already successfully authenticated
# the user. we don't need to check anything and we can accept the request right away.
if login_request.skip:
app.logger.info("{0} is already logged in. Skip authentication".format(login_request.subject))
return redirect(login_request.accept(login_request.subject))
#skip = request.args.get("skip")
#logout = request.args.get("logout")
#if skip:
# app.logger.info("{0} is already logged in. Skip authentication".format(login_request.subject))
# return redirect(login_request.accept(login_request.subject))
#elif logout:
# login_form.challenge.data = challenge
# HYDRA.invalidate_login_sessions(login_request.subject);
# return redirect(login_request.reject(
# "Login cancelled",
# error_description="Login was cancelled and user session was terminated"))
#else:
# return render_template('skip.html', challenge=challenge, logo=login_request.client.logo_uri, application_name=login_request.client.client_name, username=login_request.subject)
else:
app.logger.info("Not yet logged in. Requesting login credentials")
if "ory_kratos_session" not in request.cookies:
app.logger.info("No kratos cookie, get kratos cookie")
kratos = ory_kratos_client.ApiClient(app.config["KRATOS_PUBLIC_URL"])
login_flow = LoginFlow(app.config["KRATOS_PUBLIC_URL"])
return_to = f"{app.config['PUBLIC_URL']}/login?login_challenge={challenge}"
# We need to decide here whether we want to accept or decline the login request.
# if a login form was submitted, we need to confirm that the userdata, the agent
# send us via POST is valid
if login_form.validate_on_submit():
# Load users
try:
user = User(login_form.username.data)
except KratosError as error:
app.logger.error(
"User does not exists {0}".format(error))
return redirect(login_request.reject(
"Login denied",
error_description="Login request was denied due to an internal server error"))
# Authenticate / TOTP?
if user.authenticate(login_form.password.data):
redirect_to = login_request.accept(
user.username,
remember=login_form.remember.data,
# Remember session for 12h
remember_for=60*60*12)
app.logger.info("{0} logged in successfully".format(user.username))
else:
# Should be internal
redirect_to = login_request.reject(
"Login denied.",
error_description="Invalid username or password")
app.logger.info("{0} failed to login".format(user.username))
return redirect(redirect_to)
# If Skip is not true and the user has not submitted any data via a form, we need
# to display a login form for the user to type in their username and password.
# as a reference we save the challenge id in a hidden field of the form.
else:
login_form.challenge.data = challenge
return render_template('login.html', login_form=login_form, logo=login_request.client.logo_uri, application_name=login_request.client.client_name)
@app.route('/consent', methods=['GET', 'POST'])
def consent():
"""Checks user app permission
Checks user app permission by loading a consent object via the Hydra admin API and
validating that the user triggering the request has sufficient permissions by querying
the GraphQL API. If the user is allowed to use the app the request is accepted and openID
claims are sent to Hydra.
Args:
consent_challenge: Reference to a consent challenge object in form of an alphanumeric
String. Can be used to retrieve the consent challenge object via the Hydra Admin API (GET)
Returns:
Redirect to the url that is provided by the consent challenge object.
"""
challenge = request.args.get("consent_challenge")
if not challenge:
abort(403)
try:
consent_request = HYDRA.consent_request(challenge)
except hydra_client.exceptions.NotFound:
app.logger.error("Not Found. Consent request not found. challenge={0}".format(challenge))
abort(404)
except hydra_client.exceptions.HTTPError:
app.logger.error("Conflict. Consent request has been used already. challenge={0}".format(challenge))
abort(503)
app_name = consent_request.client.client_name
username = consent_request.subject
app.logger.info("Providing consent to %s for %s", (app_name, username))
# TODO: Create webroutes
try:
user = User(username)
except KarosError as error:
app.logger.error(
"Retrieving user object from GraphQL server failed {0}".format(error))
return redirect(consent_request.reject(
"Permission denied",
error_description="Login request was denied due to an internal server error"))
#access_granted = user.has_app_permission(app_name)
if True:
app.logger.info("{0} was granted access to {1}".format(username, app_name))
session = user.get_oauth_session()
return redirect(consent_request.accept(
grant_scope=consent_request.requested_scope,
grant_access_token_audience=consent_request.requested_access_token_audience,
session=session,
))
app.logger.info("{0} was denied access to {1}".format(username, app_name))
return redirect(consent_request.reject(
"Permission denied",
error_description="Login request was denied due to missing application permission"))
if __name__ == '__main__':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment