Skip to content
Snippets Groups Projects
app.py 4.58 KiB
Newer Older
Mark's avatar
Mark committed
from flask import abort, Flask, redirect, request, render_template
Mark's avatar
Mark committed
from os import urandom, environ
Mark's avatar
Mark committed
from hydra_client import HydraAdmin
Mark's avatar
Mark committed
import hydra_client
Mark's avatar
Mark committed
from db import User, BackendConnectionError
Mark's avatar
Mark committed
from forms import LoginForm
import logging
Mark's avatar
Mark committed

# Base URL of the Hydra admin API. Indexing environ directly makes a missing
# variable fail at import time with a KeyError.
HYDRA_ADMIN_URL = environ['HYDRA_ADMIN_URL']
# Shared admin client used by the request handlers in this module.
HYDRA = HydraAdmin(HYDRA_ADMIN_URL)

app = Flask(__name__)
# NOTE(review): the key is 16 fresh random bytes on every import, so anything
# signed with it presumably becomes invalid after a restart or across several
# worker processes -- confirm this is intended.
app.config['SECRET_KEY'] = urandom(16)
# Debug mode is on only when FLASK_ENV is set to exactly "development".
app.debug = environ.get("FLASK_ENV") == "development"
app.logger.setLevel(logging.INFO)

@app.route('/login', methods=['GET', 'POST'])
def login():
    """Provide the login form and handle login attempts.

    Request data:
        login_challenge (GET argument): id that identifies the request from
            the oauth client; passed by hydra.
        challenge (hidden form field, POST): the same id, carried through the
            submitted login form.
        skip / logout (GET arguments): the user's choice when hydra reports
            that the login may be skipped.

    Returns:
        Error page if no challenge id is present or hydra cannot resolve it,
        or the login form if the user hasn't authenticated (including a POST
        whose form did not validate),
        or the skip page if hydra reports the login may be skipped,
        or a redirect to the callback url provided by hydra once the login
        request was accepted or rejected.
    """
    login_form = LoginForm()

    # Retrieve the challenge id from the request. Depending on the method it is
    # saved in a GET variable or in the hidden field of the form (POST). The
    # form field is read regardless of whether the form validates, so an
    # incomplete submission still has a challenge to work with instead of
    # failing on an unbound variable.
    if request.method == 'GET':
        challenge = request.args.get("login_challenge")
    else:
        challenge = login_form.challenge.data
    if not challenge:
        abort(400)

    # Now that we have the challenge id, we can request the challenge object
    # from the hydra admin API.
    try:
        login_request = HYDRA.login_request(challenge)
    except hydra_client.exceptions.NotFound:
        app.logger.error(
            "Not Found. Login request not found. challenge=%s", challenge)
        abort(404)
    except hydra_client.exceptions.HTTPError:
        # NOTE(review): every other HTTP error from the admin API ends up here
        # and is answered with 503, although the log text only describes the
        # "already used" case -- confirm that this mapping is intended.
        app.logger.error(
            "Conflict. Login request has been used already. challenge=%s",
            challenge)
        abort(503)

    # We need to decide here whether we want to accept or decline the login
    # request. If a login form was submitted, we need to confirm that the
    # user data the agent sent us via POST is valid.
    if login_form.validate_on_submit():
        try:
            user = User(login_form.username.data)
        except BackendConnectionError as error:
            app.logger.error(
                "Retrieving user object from GraphQL server failed %s", error)
            return redirect(login_request.reject(
                "Login denied",
                error_description="Login request was denied due to an internal server error"))

        if user.authenticate(login_form.password.data):
            redirect_to = login_request.accept(
                user.username,
                remember=login_form.remember.data,
                # Remember session for 12h
                remember_for=60*60*12)
            app.logger.info("%s logged in successfully", user.username)
        else:
            redirect_to = login_request.reject(
                "Login denied",
                error_description="Invalid username or password")
            app.logger.info("%s failed to login", user.username)
        return redirect(redirect_to)

    # Skip, if true, lets us know that Hydra has already successfully
    # authenticated the user. The user then chooses (via GET arguments) to
    # continue with that session or to log out; without a choice the skip
    # page is shown.
    if login_request.skip:
        if request.args.get("skip"):
            app.logger.info(
                "%s is already logged in. Skip authentication",
                login_request.subject)
            return redirect(login_request.accept(login_request.subject))
        if request.args.get("logout"):
            HYDRA.invalidate_login_sessions(login_request.subject)
            return redirect(login_request.reject(
                "Login cancelled",
                error_description="Login was cancelled and user session was terminated"))
        return render_template(
            'skip.html',
            challenge=challenge,
            logo=login_request.client.logo_uri,
            application_name=login_request.client.client_name,
            username=login_request.subject)

    # If Skip is not true and the user has not submitted valid data via the
    # form, we need to display a login form for the user to type in their
    # username and password. As a reference we save the challenge id in a
    # hidden field of the form.
    login_form.challenge.data = challenge
    return render_template(
        'login.html',
        login_form=login_form,
        logo=login_request.client.logo_uri,
        application_name=login_request.client.client_name)

# Start the application with app.run() only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    app.run()