diff --git a/login/app.py b/login/app.py index 72621bc13a544f65c903412af576f6eaef8358a1..8dd6440bfe75ed94f146924dd2d007d7caaffdc6 100644 --- a/login/app.py +++ b/login/app.py @@ -1,6 +1,6 @@ - +# Basic system imports import logging import os import click @@ -12,13 +12,13 @@ from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate from flask.cli import AppGroup -# Hydra admin -import hydra_client -# Kratos ? -#from kratos import User, KratosError -#from forms import LoginForm +# Import modules for external APS + +# Hydra, OIDC Identity Provider +import hydra_client +# Kratos, Identity manager import ory_kratos_client from ory_kratos_client.api import metadata_api from ory_kratos_client.api import v0alpha2_api as kratos_api @@ -32,18 +32,23 @@ from ory_kratos_client.model.admin_create_self_service_recovery_link_body import from ory_kratos_client.model.identity_state import IdentityState -#from ory_kratos_client.api.public_api import PublicApi as KratosPublicApi -##from ory_kratos_client.api.admin_api import AdminApi as KratosAdminApi # Initaliaze the FLASK app -app = Flask(__name__) +app = Flask(__name__, + static_url_path='/static') # Load config app.config.from_object(os.environ['APP_SETTINGS']) -# Move to config? -app.logger.setLevel(logging.INFO) + +# Set right logging level based on config DEBUG flag +if (app.config['DEBUG']): + app.logger.setLevel(logging.INFO) +else + app.logger.setLevel(logging.ERROR) + + # Create HYDRA & KRATOS API interfaces HYDRA = hydra_client.HydraAdmin(app.config["HYDRA_ADMIN_URL"]) @@ -60,11 +65,15 @@ tmp = ory_kratos_client.Configuration(host=app.config["KRATOS_PUBLIC_URL"], KRATOS_PUBLIC = kratos_api.V0alpha2Api(ory_kratos_client.ApiClient(tmp)) +# Kratos API Example: +# A minimal use of the KRATOS is for example getting all identities as an admin # -#ids = KRATOS_ADMIN.admin_list_identities() -#print (ids) +# ids = KRATOS_ADMIN.admin_list_identities() +# print (ids) +# +# Kratos API documentation for python can be found here: +# https://github.com/ory/sdk/blob/master/clients/kratos/python/README.md # - # Create DB & migrate interface @@ -74,6 +83,13 @@ migrate = Migrate(app, db) # Import models from models import User, App, AppRole +# +# WARNING: +# +# Below are very minimalistic calls to interfaces for development and testing +# purposed. Eventually this need to be moved to seperate files and more +# sophisticated calls with try{} catch{} claused etc. +# ############################################################################## # CLI INTERFACE # @@ -98,9 +114,10 @@ def create_app(slug, name): db.session.commit() + @app_cli.command('list') def list_app(): - app.logger.info("Listing apps") + app.logger.info("Listing configured apps") apps = App.query.all() for obj in apps: @@ -117,6 +134,9 @@ def delete_app(slug): app.logger.info("Not found") return + + # TODO: Deleting will (propably) fail is there are still roles attached + # these probobly need to be cleaned up first. db.session.delete(obj) db.session.commit() app.logger.info("Success") @@ -137,6 +157,15 @@ def create_user(email): obj.email = email + # TODO: + # - Should check if database entry already exists. If so, double check if + # kratos user exists. If not, create that one. + # + # - If no DB user, should check with kratos of user already exists. If so, + # throw warning, but still create DB entrie + # + # - After creating kratos user, check if success, otherwise throw warning. + body = AdminCreateIdentityBody( schema_id="default", traits={'email':email}, @@ -146,7 +175,10 @@ def create_user(email): kratos_obj = KRATOS_ADMIN.admin_create_identity(admin_create_identity_body=body) + obj.kratos_id = kratos_obj.id + + db.session.add(obj) db.session.commit() @@ -173,9 +205,9 @@ def delete_user(email): db.session.delete(obj) -# if obj.kratos_id: -# -# + # TODO: + # - if delete succesfull, also delete kratos user, if exists + # - probably user roles need to be deleted before user can be deleted db.session.commit() app.logger.info("Success") @@ -258,182 +290,8 @@ app.cli.add_command(user_cli) # WEB ROUTES # ############################################################################## -@app.route('/login', methods=['GET', 'POST']) -def login(): - """Provides login form and handles login attempt - - Args: - login_form: contains login data submitted by a user (POST) - challenge: id that identifies the request from oauth client. passed by hydra - - Returns: - Error page if no challenge id is present - or Login Form if user hasn't authenticated - or redirect to callback url provided by hydra if login was successful - """ - login_form = LoginForm() - redirect_to = None - - - # TODO: Empty/short passwors is a form Error - challenge = None - - # Retrieve the challenge id from the request. Depending on the method it is - # saved in the form (POST) or in a GET variable. If this variable is not set - # we can not continue. - if request.method == 'GET': - challenge = request.args.get("login_challenge") - if request.method == 'POST': - challenge = request.args.post("login_challenge") - - if not challenge: - app.logger.error("No challange given. Error in request") - abort(404) - - - # Now that we have the challenge id, we can request the challenge object from the hydra - # admin API - try: - login_request = HYDRA.login_request(challenge) - except hydra_client.exceptions.NotFound: - app.logger.error("Not Found. Login request not found. challenge={0}".format(challenge)) - abort(404) - except hydra_client.exceptions.HTTPError: - app.logger.error("Conflict. Login request has been used already. challenge={0}".format(challenge)) - abort(503) - - - - # Skip, if true, let's us know that Hydra has already successfully authenticated - # the user. we don't need to check anything and we can accept the request right away. - if login_request.skip: - app.logger.info("{0} is already logged in. Skip authentication".format(login_request.subject)) - return redirect(login_request.accept(login_request.subject)) - - #skip = request.args.get("skip") - #logout = request.args.get("logout") - #if skip: - # app.logger.info("{0} is already logged in. Skip authentication".format(login_request.subject)) - # return redirect(login_request.accept(login_request.subject)) - #elif logout: - # login_form.challenge.data = challenge - # HYDRA.invalidate_login_sessions(login_request.subject); - # return redirect(login_request.reject( - # "Login cancelled", - # error_description="Login was cancelled and user session was terminated")) - #else: - # return render_template('skip.html', challenge=challenge, logo=login_request.client.logo_uri, application_name=login_request.client.client_name, username=login_request.subject) - else: - app.logger.info("Not yet logged in. Requesting login credentials") - - - - - if "ory_kratos_session" not in request.cookies: - app.logger.info("No kratos cookie, get kratos cookie") - kratos = ory_kratos_client.ApiClient(app.config["KRATOS_PUBLIC_URL"]) - - - login_flow = LoginFlow(app.config["KRATOS_PUBLIC_URL"]) - return_to = f"{app.config['PUBLIC_URL']}/login?login_challenge={challenge}" - - # We need to decide here whether we want to accept or decline the login request. - # if a login form was submitted, we need to confirm that the userdata, the agent - # send us via POST is valid - if login_form.validate_on_submit(): - - # Load users - try: - user = User(login_form.username.data) - except KratosError as error: - app.logger.error( - "User does not exists {0}".format(error)) - return redirect(login_request.reject( - "Login denied", - error_description="Login request was denied due to an internal server error")) - - # Authenticate / TOTP? - if user.authenticate(login_form.password.data): - redirect_to = login_request.accept( - user.username, - remember=login_form.remember.data, - # Remember session for 12h - remember_for=60*60*12) - app.logger.info("{0} logged in successfully".format(user.username)) - else: - # Should be internal - redirect_to = login_request.reject( - "Login denied.", - error_description="Invalid username or password") - app.logger.info("{0} failed to login".format(user.username)) - return redirect(redirect_to) - - - # If Skip is not true and the user has not submitted any data via a form, we need - # to display a login form for the user to type in their username and password. - # as a reference we save the challenge id in a hidden field of the form. - else: - login_form.challenge.data = challenge - return render_template('login.html', login_form=login_form, logo=login_request.client.logo_uri, application_name=login_request.client.client_name) - - - -@app.route('/consent', methods=['GET', 'POST']) -def consent(): - """Checks user app permission - - Checks user app permission by loading a consent object via the Hydra admin API and - validating that the user triggering the request has sufficient permissions by querying - the GraphQL API. If the user is allowed to use the app the request is accepted and openID - claims are sent to Hydra. - - Args: - consent_challenge: Reference to a consent challenge object in form of an alphanumeric - String. Can be used to retrieve the consent challenge object via the Hydra Admin API (GET) - - Returns: - Redirect to the url that is provided by the consent challenge object. - """ - challenge = request.args.get("consent_challenge") - if not challenge: - abort(403) - try: - consent_request = HYDRA.consent_request(challenge) - except hydra_client.exceptions.NotFound: - app.logger.error("Not Found. Consent request not found. challenge={0}".format(challenge)) - abort(404) - except hydra_client.exceptions.HTTPError: - app.logger.error("Conflict. Consent request has been used already. challenge={0}".format(challenge)) - abort(503) - app_name = consent_request.client.client_name - username = consent_request.subject - - app.logger.info("Providing consent to %s for %s", (app_name, username)) +# TODO: Create webroutes - try: - user = User(username) - except KarosError as error: - app.logger.error( - "Retrieving user object from GraphQL server failed {0}".format(error)) - return redirect(consent_request.reject( - "Permission denied", - error_description="Login request was denied due to an internal server error")) - - - #access_granted = user.has_app_permission(app_name) - - if True: - app.logger.info("{0} was granted access to {1}".format(username, app_name)) - session = user.get_oauth_session() - return redirect(consent_request.accept( - grant_scope=consent_request.requested_scope, - grant_access_token_audience=consent_request.requested_access_token_audience, - session=session, - )) - app.logger.info("{0} was denied access to {1}".format(username, app_name)) - return redirect(consent_request.reject( - "Permission denied", - error_description="Login request was denied due to missing application permission")) if __name__ == '__main__':