from flask import abort, Flask, redirect, request, render_template from os import urandom, environ from hydra_client import HydraAdmin import hydra_client from db import User, BackendConnectionError from forms import LoginForm import logging HYDRA_ADMIN_URL = environ['HYDRA_ADMIN_URL'] HYDRA = HydraAdmin(HYDRA_ADMIN_URL) app = Flask(__name__) app.config['SECRET_KEY'] = urandom(16) app.debug = True if "FLASK_ENV" in environ and environ["FLASK_ENV"] == "development" else False app.logger.setLevel(logging.INFO) @app.route('/login', methods=['GET', 'POST']) def login(): """Provides login form and handles login attempt Args: login_form: contains login data submitted by a user (POST) challenge: id that identifies the request from oauth client. passed by hydra Returns: Error page if no challenge id is present or Login Form if user hasn't authenticated or redirect to callback url provided by hydra if login was successful """ login_form = LoginForm() redirect_to = None # Retrieve the challenge id from the request. Depending on the method it is saved in the # form (POST) or in a GET variable. if request.method == 'GET': challenge = request.args.get("login_challenge") if not challenge: return abort(400) elif login_form.validate_on_submit(): challenge = login_form.challenge.data # Now that we have the challenge id, we can request the challenge object from the hydra # admin API try: login_request = HYDRA.login_request(challenge) except hydra_client.exceptions.NotFound: app.logger.error("Not Found. Login request not found. challenge={0}".format(challenge)) abort(404) except hydra_client.exceptions.HTTPError: app.logger.error("Conflict. Login request has been used already. challenge={0}".format(challenge)) abort(503) # We need to decide here whether we want to accept or decline the login request. # if a login form was submitted, we need to confirm that the userdata, the agent # send us via POST is valid if login_form.validate_on_submit(): try: user = User(login_form.username.data) except BackendConnectionError as error: app.logger.error( "Retrieving user object from GraphQL server failed {0}".format(error)) return redirect(login_request.reject( "Login denied", error_description="Login request was denied due to an internal server error")) if user.authenticate(login_form.password.data): redirect_to = login_request.accept( user.username, remember=login_form.remember.data, # Remember session for 12h remember_for=60*60*12) app.logger.info("{0} logged in successfully".format(user.username)) else: redirect_to = login_request.reject( "Login denied", error_description="Invalid username or password") app.logger.info("{0} failed to login".format(user.username)) return redirect(redirect_to) # Skip, if true, let's us know that Hydra has already successfully authenticated # the user. we don't need to check anything and we can accept the request right away. elif login_request.skip: skip = request.args.get("skip") logout = request.args.get("logout") if skip: app.logger.info("{0} is already logged in. Skip authentication".format(login_request.subject)) return redirect(login_request.accept(login_request.subject)) elif logout: login_form.challenge.data = challenge HYDRA.invalidate_login_sessions(login_request.subject); return redirect(login_request.reject( "Login cancelled", error_description="Login was cancelled and user session was terminated")) else: return render_template('skip.html', challenge=challenge, logo=login_request.client.logo_uri, application_name=login_request.client.client_name, username=login_request.subject) # If Skip is not true and the user has not submitted any data via a form, we need # to display a login form for the user to type in their username and password. # as a reference we save the challenge id in a hidden field of the form. else: login_form.challenge.data = challenge return render_template('login.html', login_form=login_form, logo=login_request.client.logo_uri, application_name=login_request.client.client_name) if __name__ == '__main__': app.run()