Skip to content
Snippets Groups Projects
Verified Commit 92f39e07 authored by Mark's avatar Mark
Browse files

Migrate consent provider to GRAPHQL

parent 18deafc4
No related branches found
No related tags found
1 merge request!4Change consent provider backend to graphql
Pipeline #296 passed with stages
in 5 minutes and 20 seconds
from flask import abort, Flask, redirect, render_template, request from flask import abort, Flask, redirect, request
from flask.views import View from flask.views import View
from os import urandom, environ from os import urandom, environ
from hydra_client import HydraAdmin from hydra_client import HydraAdmin
from flask_wtf import FlaskForm from db import User
from wtforms import SubmitField, HiddenField
from flask_wtf.csrf import CSRFProtect
from wtforms.validators import DataRequired
HYDRA_ADMIN_URL = environ['HYDRA_ADMIN_URL'] HYDRA_ADMIN_URL = environ['HYDRA_ADMIN_URL']
class ConsentForm(FlaskForm):
accept = SubmitField("accept")
challenge = HiddenField("challenge")
class ConsentView(View):
methods = "GET", "POST"
def render_form(self, form, **context):
return render_template("consent.html", form=form, **context)
def dispatch_request(self):
hydra = HydraAdmin(HYDRA_ADMIN_URL)
form = ConsentForm()
challenge = request.args.get("consent_challenge") or form.challenge.data
if not challenge:
abort(400)
consent_request = hydra.consent_request(challenge)
session = {
"access_token": {},
"id_token": {
"sub": "248289761010",
"name": "Example User",
"given_name": "Example",
"family_name": "User",
"preferred_username": "example",
"email": "example@oas.example.com",
"picture": "",
},
}
if request.method == "GET":
return self.get(form, consent_request, session)
elif request.method == "POST":
return self.post(form, consent_request, session)
abort(405)
def get(self, form, consent_request, session):
if consent_request.skip:
redirect_to = consent_request.accept(
grant_scope=consent_request.requested_scope,
grant_access_token_audience=consent_request.requested_access_token_audience,
session=session,
)
return redirect(redirect_to)
else:
form.challenge.data = consent_request.challenge
return self.render_form(
form, user=consent_request.subject, client=consent_request.client
)
def post(self, form, consent_request, session):
if form.validate():
if form.accept.data:
redirect_to = consent_request.accept(
grant_scope=consent_request.requested_scope,
grant_access_token_audience=consent_request.requested_access_token_audience,
session=session,
remember=False
)
else:
redirect_to = consent_request.reject(error="user_decline")
return redirect(redirect_to)
else:
# TODO: show error message
pass
return self.render_form(form)
app = Flask(__name__) app = Flask(__name__)
app.secret_key = urandom(16)
csrf = CSRFProtect(app)
app.add_url_rule("/consent", view_func=ConsentView.as_view("consent")) @app.route('/', methods=['GET'])
def home():
hydra = HydraAdmin(HYDRA_ADMIN_URL)
challenge = request.args.get("consent_challenge") or form.challenge.data
if not challenge:
abort(400)
consent_request = hydra.consent_request(challenge)
username = consent_request.client.client_name
app_name = consent_request.client.owner
user = User(username)
access_granted = user.has_app_permission(app_name)
if access_granted:
session = user.get_oauth_session()
return redirect(consent_request.accept(
grant_scope=consent_request.requested_scope,
grant_access_token_audience=consent_request.requested_access_token_audience,
session=session,
))
abort(400)
if __name__ == '__main__':
app.run()
from os import environ
from hydra_client import HydraAdmin
from graphqlclient import GraphQLClient
from json import loads
GRAPHQL_URL = environ['GRAPHQL_URL']
graphql_client = GraphQLClient(GRAPHQL_URL)
class User():
def __init__(self, username):
self.username = username
self._load_remote_user_info()
def _load_remote_user_info(self):
querystring = '''{{
getUser(username: "{0}"){{
email,
applications{{
edges{{
node{{
name
}}
}}
}}
}}}}'''.format(self.username).strip()
result = loads(graphql_client.execute(querystring))
if "data" in result:
data = result["data"]["getUser"]
self.applications = list(map(lambda x: x["node"]["name"],
data["applications"]["edges"]))
self.email = data["email"]
def has_app_permission(self, appname):
return appname in self.applicaions
def get_oauth_session(self):
return {
"access_token": {},
"id_token": {
"name": self.username,
"preferred_username": self.username,
"email" : self.email,
"picture": ""}
}
Flask==1.1.1 Flask
flask-wtf==0.14.2 hydra-client
hydra-client==0.4.0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment