Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • xeruf/dashboard
  • stackspin/dashboard
2 results
Show changes
Showing
with 1588 additions and 0 deletions
# from config import KRATOS_ADMIN_URL
# from database import db
# from kubernetes.client import CustomObjectsApi
# import re
import requests
from requests.exceptions import ConnectionError
from flask import current_app
# import helpers.kubernetes
class ResourcesService:
@classmethod
def get_resources(cls):
# custom = CustomObjectsApi()
# raw_nodes = custom.list_cluster_custom_object('metrics.k8s.io', 'v1beta1', 'nodes')
# raw_pods = custom.list_cluster_custom_object('metrics.k8s.io', 'v1beta1', 'pods')
# nodes = []
# for node in raw_nodes["items"]:
# nodes.append({
# "name": node["metadata"]["name"],
# "cpu_raw": node["usage"]["cpu"],
# "cpu": cls.parse_cpu(node["usage"]["cpu"]),
# "memory_raw": node["usage"]["memory"],
# "memory_used": cls.parse_memory(node["usage"]["memory"]),
# })
try:
cores = cls.get_prometheus('machine_cpu_cores')
current_app.logger.info(f"Number of cores: {cores}")
result = {
"success": True,
# Number of cores in use. So average usage times number of cores.
"cpu": cores * cls.get_prometheus('1 - (avg by(instance) (rate(node_cpu_seconds_total{mode="idle"}[8m])))', 'float'),
"cpu_total": cores,
# "memory_raw": node["usage"]["memory"],
# "memory_used": cls.parse_memory(node["usage"]["memory"]),
"memory_total": cls.get_prometheus('machine_memory_bytes'),
"memory_available": cls.get_prometheus('node_memory_MemAvailable_bytes'),
"disk_free": cls.get_prometheus('node_filesystem_free_bytes{mountpoint="/"}'),
"disk_total": cls.get_prometheus('node_filesystem_size_bytes{mountpoint="/"}'),
}
except ConnectionError:
return {
"success": False,
"message": "Could not contact prometheus; perhaps monitoring is not enabled.",
}
return result
# @staticmethod
# def parse_cpu(s):
# result = re.match(r"^(\d+)([mun]?)$", s)
# if result is None:
# raise Exception("cpu data does not match known patterns")
# number = result.group(1)
# suffix = result.group(2)
# multipliers = {"": 1, "m": 1e-3, "u": 1e-6, "n": 1e-9}
# return (int(number) * multipliers[suffix])
# @staticmethod
# def parse_memory(s):
# result = re.match(r"^(\d+)(|Ki|Mi|Gi)$", s)
# if result is None:
# raise Exception("memory data does not match known patterns")
# number = result.group(1)
# suffix = result.group(2)
# multipliers = {"": 1, "Ki": 1024, "Mi": 1024*1024, "Gi": 1024*1024*1024}
# return (int(number) * multipliers[suffix])
@staticmethod
def get_prometheus(query, cast='int'):
try:
params = {
"query": query,
}
result = requests.get("http://kube-prometheus-stack-prometheus:9090/api/v1/query", params=params)
current_app.logger.info(query)
current_app.logger.info(result.json())
value = result.json()["data"]["result"][0]["value"][1]
except AttributeError:
return None
if cast == 'float':
converted = float(value)
else:
converted = int(value)
return converted
# "pods": {
# "apiVersion": "metrics.k8s.io/v1beta1",
# "items": [
# {
# "containers": [
# {
# "name": "traffic-manager",
# "usage": {
# "cpu": "2839360n",
# "memory": "24696Ki"
# }
# }
# ],
# "metadata": {
# "creationTimestamp": "2023-11-30T15:10:10Z",
# "labels": {
# "app": "traffic-manager",
# "pod-template-hash": "5cd7cc7fd6",
# "telepresence": "manager"
# },
# "name": "traffic-manager-5cd7cc7fd6-mp7td",
# "namespace": "ambassador"
# },
# "timestamp": "2023-11-30T15:10:00Z",
# "window": "12.942s"
# },
from .roles import *
from .models import *
from sqlalchemy import Integer, String
from database import db
class Role(db.Model):
ADMIN_ROLE_ID = 1
USER_ROLE_ID = 2
NO_ACCESS_ROLE_ID = 3
id = db.Column(Integer, primary_key=True)
name = db.Column(String(length=64))
def __str__(self):
return f"Role {self.name}"
from areas.apps.models import App, AppRole
from .models import Role
class RoleService:
@staticmethod
def get_roles():
roles = Role.query.all()
return [{"id": r.id, "name": r.name} for r in roles]
@staticmethod
def get_role_by_id(role_id):
return Role.query.filter_by(id=role_id).first()
@staticmethod
def is_user_admin(userId):
dashboard_app_id = App.query.filter_by(slug='dashboard').first().id
dashboard_role_id = AppRole.query.filter_by(user_id=userId, app_id=dashboard_app_id).first().role_id
return dashboard_role_id == 1
from flask import jsonify, request
from flask_jwt_extended import jwt_required
from flask_cors import cross_origin
from areas import api_v1
from .role_service import RoleService
@api_v1.route("/roles", methods=["GET"])
@jwt_required()
@cross_origin()
def get_roles():
roles = RoleService.get_roles()
return jsonify(roles)
from .models import *
from .tags import *
from sqlalchemy import Integer, String
from database import db
class Tag(db.Model):
id = db.Column(Integer, primary_key=True)
name = db.Column(String(length=256))
colour = db.Column(String(length=64))
def __str__(self):
return f"Tag {self.slug}"
class TagUser(db.Model):
__tablename__ = "tag_user"
user_id = db.Column(String(length=64), primary_key=True)
tag_id = db.Column(Integer, primary_key=True)
def __str__(self):
return f"TagUser, with tag_id {self.tag_id}, user_id {self.user_id}"
from database import db
from .models import Tag
class TagService:
@staticmethod
def get_tags():
tags = Tag.query.all()
return [{"id": t.id, "name": t.name, "colour": t.colour} for t in tags]
@staticmethod
def create_tag(data):
tag = Tag(name=data["name"], colour=data.get("colour"))
db.session.add(tag)
db.session.commit()
return {"id": tag.id, "name": tag.name, "colour": tag.colour}
@staticmethod
def update_tag(id, data):
tag = Tag.query.get(id)
for key, value in data.items():
if hasattr(tag, key):
setattr(tag, key, value)
db.session.commit()
@staticmethod
def delete_tag(id):
tag = Tag.query.get(id)
db.session.delete(tag)
db.session.commit()
return {"message": "Tag deleted successfully."}
from flask import jsonify, request
from flask_cors import cross_origin
from flask_expects_json import expects_json
from flask_jwt_extended import jwt_required
from areas import api_v1
from helpers.auth_guard import admin_required
from .tag_service import TagService
from .validation import schema
@api_v1.route("/tags", methods=["GET"])
@jwt_required()
@cross_origin()
@admin_required()
def get_tags():
result = TagService.get_tags()
return jsonify(result)
@api_v1.route("/tags", methods=["POST"])
@jwt_required()
@cross_origin()
@expects_json(schema)
@admin_required()
def post_tag():
data = request.get_json()
tag = TagService.create_tag(data)
return jsonify(tag)
@api_v1.route("/tags/<int:id>", methods=["PUT"])
@jwt_required()
@cross_origin()
@expects_json(schema)
@admin_required()
def put_tag(id):
data = request.get_json()
result = TagService.update_tag(id, data)
return jsonify(message="Tag updated successfully.")
@api_v1.route("/tags/<int:id>", methods=["DELETE"])
@jwt_required()
@cross_origin()
@admin_required()
def delete_tag(id):
TagService.delete_tag(id)
return jsonify(message="Tag deleted successfully.")
import re
schema = {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name describing the tag.",
"minLength": 1,
},
"colour": {
"type": "string",
"description": "Colour that may be used when displaying the tag.",
},
},
"required": ["name"],
}
from .users import *
from .user_service import *
from areas.apps.models import App, AppRole
from areas.roles.models import Role
from areas.tags.models import TagUser
from database import db
from helpers import KratosApi
class User():
@staticmethod
def get_all():
page = 0
userList = []
# Get all associated user data (Stackspin roles, tags).
stackspinData = UserStackspinData()
while page >= 0:
if page == 0:
res = KratosApi.get("/admin/identities?per_page=1000").json()
else:
res = KratosApi.get("/admin/identities?per_page=1000&page={}".format(page)).json()
for r in res:
# Inject information from the `stackspin` database that's associated to this user.
r["stackspin_data"] = stackspinData.getData(r["id"])
userList.append(r)
if len(res) == 0:
page = -1
else:
page = page + 1
return userList
class UserStackspinData():
# TODO: we currently ignore the userID parameter, so we always get all
# associated information even if we only need it for a single user.
# That should be changed.
def __init__(self, userID=None):
self.dashboardRoles = self.__getDashboardRoles()
self.userTags = self.__getUserTags()
def getData(self, userID):
stackspinData = {}
dashboardRole = self.dashboardRoles.get(userID)
if dashboardRole is not None:
stackspinData["stackspin_admin"] = dashboardRole == Role.ADMIN_ROLE_ID
# Also, user tags.
stackspinData["tags"] = self.userTags.get(userID, [])
return stackspinData
@staticmethod
def setTags(userID, tags):
# Delete all existing tags, because the new set of tags is interpreted
# to overwrite the previous set.
db.session.query(TagUser).filter(TagUser.user_id == userID).delete()
# Now create an entry for every tag in the new list.
for tagID in tags:
tagUser = TagUser(user_id=userID, tag_id=tagID)
db.session.add(tagUser)
@staticmethod
def __getDashboardRoles():
dashboardRoles = {}
for appRole, app in (
db.session.query(AppRole, App)
.filter(AppRole.app_id == App.id)
.filter(App.slug == "dashboard")
.all()
):
dashboardRoles[appRole.user_id] = appRole.role_id
return dashboardRoles
@staticmethod
def __getUserTags():
userTags = {}
for tagUser in db.session.query(TagUser).all():
if tagUser.user_id in userTags:
userTags[tagUser.user_id].append(tagUser.tag_id)
else:
userTags[tagUser.user_id] = [tagUser.tag_id]
return userTags
import ory_kratos_client
from ory_kratos_client.models.json_patch \
import JsonPatch
from ory_kratos_client.models.update_recovery_flow_body \
import UpdateRecoveryFlowBody
from ory_kratos_client.models.update_recovery_flow_with_link_method \
import UpdateRecoveryFlowWithLinkMethod
from ory_kratos_client.api import frontend_api, identity_api
from datetime import datetime
import time
from flask import current_app
from .models import User, UserStackspinData
from config import KRATOS_ADMIN_URL
from database import db
from areas.apps.models import App, AppRole, ProvisionStatus
from areas.apps.apps_service import AppsService
from areas.roles import Role
from helpers import KratosApi
from helpers.error_handler import KratosError
from helpers.provision import Provision
from helpers.threads import request_provision
kratos_admin_api_configuration = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL)
kratos_client = ory_kratos_client.ApiClient(kratos_admin_api_configuration)
kratos_frontend_api = frontend_api.FrontendApi(kratos_client)
kratos_identity_api = identity_api.IdentityApi(kratos_client)
class UserService:
@classmethod
def get_users(cls):
return User.get_all()
@classmethod
def get_user(cls, id):
res = KratosApi.get("/admin/identities/{}".format(id)).json()
res["stackspin_data"] = UserStackspinData(id).getData(id)
# TODO: this adds app roles to the kratos `traits` field, but I think
# that belongs under `stackspin_data`.
return cls.__insertAppRoleToUser(id, res)
@staticmethod
def create_recovery_link(id):
kratos_data = {
"identity_id": id
}
res = KratosApi.post("/admin/recovery/link", kratos_data).json()
return res
@classmethod
def post_user(cls, data):
kratos_data = {
"schema_id": "default",
"traits": {
"name": data["name"],
"email": data["email"],
},
}
res = KratosApi.post("/admin/identities", kratos_data).json()
if data["app_roles"]:
app_roles = data["app_roles"]
for ar in app_roles:
app = App.query.filter_by(slug=ar["name"]).first()
app_role = AppRole(
user_id=res["id"],
role_id=ar["role_id"] if "role_id" in ar else Role.NO_ACCESS_ROLE_ID,
app_id=app.id,
)
db.session.add(app_role)
else:
all_apps = AppsService.get_all_apps()
for app in all_apps:
app_role = AppRole(
user_id=res["id"],
role_id=Role.NO_ACCESS_ROLE_ID,
app_id=app.id,
)
db.session.add(app_role)
if data.get("tags") is not None:
UserStackspinData.setTags(res["id"], data["tags"])
# Commit all changes to the stackspin database.
db.session.commit()
request_provision()
# We start a recovery flow immediately after creating the
# user, so the user can set their initial password.
cls.__start_recovery_flow(data["email"])
return UserService.get_user(res["id"])
@staticmethod
def reset_totp(id):
KratosApi.delete("/admin/identities/{}/credentials/totp".format(id))
@staticmethod
def reset_webauthn(id):
KratosApi.delete("/admin/identities/{}/credentials/webauthn".format(id))
@staticmethod
def __start_recovery_flow(email):
"""
Start a Kratos recovery flow for the user's email address.
This sends out an email to the user that explains to them how they can
set their password. Make sure the user exists inside Kratos before you
use this function.
:param email: Email to send recovery link to
:type email: str
"""
api_response = kratos_frontend_api.create_native_recovery_flow()
flow = api_response.id
# Submit the recovery flow to send an email to the new user.
update_recovery_flow_body = \
UpdateRecoveryFlowBody(UpdateRecoveryFlowWithLinkMethod(method="link", email=email))
api_response = kratos_frontend_api.update_recovery_flow(flow,
update_recovery_flow_body=update_recovery_flow_body)
@classmethod
def put_user(cls, id, data):
# Get the old version of the identity. We need that for comparison to
# see if some attributes are changed by our update.
old_user = KratosApi.get("/admin/identities/{}".format(id)).json()
old_name = old_user["traits"].get("name", "")
new_name = data.get("name", "")
# Create list of patches with our changes.
patches = []
patches.append(JsonPatch(op="replace", path="/traits/email", value=data['email']))
patches.append(JsonPatch(op="replace", path="/traits/name", value=new_name))
# Determine whether we're really changing the name, and if so record
# that fact in the database in the form of a ScimAttribute. We'll use
# that information later during provisioning via SCIM.
if old_name != new_name:
current_app.logger.info(f"Name changed for: {data['email']}")
current_app.logger.info(f" old name: {old_name}")
current_app.logger.info(f" new name: {new_name}")
Provision.store_attribute(attribute='name', user_id=id)
# We used a PUT before, but that deletes any attributes that we don't
# specify, which is not so convenient. So we PATCH just the attributes
# we're changing instead.
kratos_identity_api.patch_identity(id, json_patch=patches)
if data["app_roles"]:
app_roles = data["app_roles"]
for ar in app_roles:
app = App.query.filter_by(slug=ar["name"]).first()
cls.set_user_role(id, app.id, ar["role_id"] if "role_id" in ar else None)
if data.get("tags") is not None:
UserStackspinData.setTags(id, data["tags"])
db.session.commit()
request_provision()
return cls.get_user(id)
@classmethod
def set_user_role(cls, user_id, app_id, role_id):
app_role = AppRole.query.filter_by(user_id=user_id, app_id=app_id).first()
if app_role:
# There is already a role set for this user and app, so we
# edit it.
app_role.role_id = role_id
# Mark the app role so the SCIM routine will pick it up at
# its next run.
app_role.provision_status = ProvisionStatus.SyncNeeded
else:
# There is no role set yet for this user and app, so we
# create a new one.
appRole = AppRole(
user_id=user_id,
role_id=role_id,
app_id=app_id,
)
db.session.add(appRole)
request_provision()
@staticmethod
def delete_user(id):
app_role = AppRole.query.filter_by(user_id=id).all()
for ar in app_role:
ar.provision_status = ProvisionStatus.ToDelete
db.session.commit()
request_provision()
@classmethod
def post_multiple_users(cls, data):
# check if data is array
# for every item in array call Kratos
created_users = []
existing_users = []
creation_failed_users = []
for user_data in data['users']:
user_email = user_data["email"]
if not user_email:
return
try:
cls.post_user(user_data)
current_app.logger.info(f"Batch create user: {user_email}")
created_users.append(user_email)
except KratosError as err:
status_code = err.args[1]
if status_code == 409:
existing_users.append(user_email)
elif status_code == 400:
creation_failed_users.append(user_email)
current_app.logger.error(
f"Exception calling Kratos: {err} on creating user: {user_email} {status_code}")
except Exception as error:
current_app.logger.error(
f"Exception: {error} on creating user: {user_email}")
creation_failed_users.append(user_email)
request_provision()
success_response = {}
existing_response = {}
failed_response = {}
if created_users:
success_response = {"users": created_users,
"message": f"{len(created_users)} users created"}
if existing_users:
existing_response = {
"users": existing_users, "message": f"{len(existing_users)} users already exist: {', '.join(existing_users)}"}
if creation_failed_users:
failed_response = {"users": creation_failed_users,
"message": f"{len(creation_failed_users)} users failed to create: {', '.join(creation_failed_users)}"}
return {"success": success_response, "existing": existing_response, "failed": failed_response}
@staticmethod
def recovery_complete(userID):
# Current unix time.
now = int(datetime.today().timestamp())
current_app.logger.info(f"Set last_recovery for {userID} to {now}")
patch = JsonPatch(op="replace", path="/metadata_admin/last_recovery", value=now)
kratos_identity_api.patch_identity(userID, json_patch=[patch])
@staticmethod
def login_complete(userID):
# Current unix time.
now = int(datetime.today().timestamp())
current_app.logger.info(f"Set last_login for {userID} to {now}")
patch = JsonPatch(op="replace", path="/metadata_admin/last_login", value=now)
kratos_identity_api.patch_identity(userID, json_patch=[patch])
@staticmethod
def after_user_registration(userID, registration_method):
dashboard_app = App.query.filter_by(slug="dashboard").first()
existing_app_role = AppRole.query.filter_by(app_id=dashboard_app.id, user_id=userID).first()
if existing_app_role is not None:
current_app.logger.info("Dashboard role already set, done.")
return
current_app.logger.info(f"Setting user dashboard role for {userID}.")
app_role = AppRole(
user_id=userID,
role_id=Role.USER_ROLE_ID,
app_id=dashboard_app.id,
)
db.session.add(app_role)
db.session.commit()
current_app.logger.info(f"Set registration_method for {userID} to {registration_method}")
patch = JsonPatch(op="replace", path="/metadata_admin/registration_method", value=registration_method)
kratos_identity_api.patch_identity(userID, json_patch=[patch])
@staticmethod
def __insertAppRoleToUser(userId, userRes):
apps = App.query.all()
app_roles = []
for app in apps:
# Only show role when installed
app_status = app.get_status()
if app_status.installed:
tmp_app_role = AppRole.query.filter_by(
user_id=userId, app_id=app.id
).first()
app_roles.append(
{
"name": app.slug,
"role_id": tmp_app_role.role_id if tmp_app_role else None,
}
)
userRes["traits"]["app_roles"] = app_roles
return userRes
from flask import current_app, jsonify, request
from flask_cors import cross_origin
from flask_expects_json import expects_json
from flask_jwt_extended import get_jwt, jwt_required
from areas import api_v1
from helpers import KratosApi
from helpers.auth_guard import admin_required, kratos_webhook
from .validation import schema, schema_multiple, schema_multi_edit, schema_recovery_complete
from .user_service import UserService
@api_v1.route("/users", methods=["GET"])
@jwt_required()
@cross_origin()
@admin_required()
def get_users():
res = UserService.get_users()
return jsonify(res)
@api_v1.route("/users/<string:id>", methods=["GET"])
@jwt_required()
@cross_origin()
@admin_required()
def get_user(id):
res = UserService.get_user(id)
return jsonify(res)
@api_v1.route("/users/<string:id>/recovery", methods=["POST"])
@jwt_required()
@cross_origin()
@admin_required()
def get_user_recovery(id):
res = UserService.create_recovery_link(id)
return jsonify(res)
@api_v1.route("/users/<string:id>/reset_totp", methods=["POST"])
@jwt_required()
@cross_origin()
@admin_required()
def reset_totp(id):
res = UserService.reset_totp(id)
return jsonify(res)
@api_v1.route("/users/<string:id>/reset_webauthn", methods=["POST"])
@jwt_required()
@cross_origin()
@admin_required()
def reset_webauthn(id):
res = UserService.reset_webauthn(id)
return jsonify(res)
# This is supposed to be called by Kratos as a webhook after a user has
# successfully recovered their account.
@api_v1.route("/users/recovery_complete", methods=["POST"])
@expects_json(schema_recovery_complete)
@kratos_webhook()
def recovery_complete():
data = request.get_json()
try:
UserService.recovery_complete(data["user_id"])
except Exception as e:
current_app.logger.warn(f"Exception in /users/recovery_complete: {e}")
raise
return jsonify(message="Updated last recovery time.")
# This is supposed to be called by Kratos as a webhook after a user has
# successfully logged in to their account.
@api_v1.route("/users/login_complete", methods=["POST"])
@expects_json(schema_recovery_complete)
@kratos_webhook()
def login_complete():
data = request.get_json()
try:
UserService.login_complete(data["user_id"])
except Exception as e:
current_app.logger.warn(f"Exception in /users/login_complete: {e}")
raise
return jsonify(message="Updated last login time.")
# This is supposed to be called by Kratos as a webhook after a user has
# been created. We can't add this to the dashboard code for creating users,
# because users can be created directly by Kratos as well in case of "social
# login" (upstream OIDC).
@api_v1.route("/users/after_registration", methods=["POST"])
@expects_json(schema_recovery_complete)
@kratos_webhook()
def after_registration():
data = request.get_json()
current_app.logger.warn(f"\"after registration\" webhook called with user id {data['user_id']}")
current_app.logger.warn(f"full webhook request: {data}")
UserService.after_user_registration(data["user_id"], data["registration_method"])
return jsonify(message="Performed post-registration tasks.")
@api_v1.route("/users", methods=["POST"])
@jwt_required()
@cross_origin()
@expects_json(schema)
@admin_required()
def post_user():
data = request.get_json()
res = UserService.post_user(data)
return jsonify(res)
@api_v1.route("/users/<string:id>", methods=["PUT"])
@jwt_required()
@cross_origin()
@expects_json(schema)
@admin_required()
def put_user(id):
data = request.get_json()
res = UserService.put_user(id, data)
return jsonify(res)
@api_v1.route("/users/<string:id>", methods=["DELETE"])
@jwt_required()
@cross_origin()
@admin_required()
def delete_user(id):
res = KratosApi.delete("/identities/{}".format(id))
if res.status_code == 204:
UserService.delete_user(id)
return jsonify(), res.status_code
return jsonify(res.json()), res.status_code
@api_v1.route("/users-batch", methods=["POST"])
@jwt_required()
@cross_origin()
@expects_json(schema_multiple)
@admin_required()
def post_multiple_users():
"""Expects an array of user JSON schema in request body."""
data = request.get_json()
res = UserService.post_multiple_users(data)
return jsonify(res)
@api_v1.route("/me", methods=["GET"])
@jwt_required()
@cross_origin()
def get_personal_info():
user_id = __get_user_id_from_jwt()
res = UserService.get_user(user_id)
return jsonify(res)
@api_v1.route("/me", methods=["PUT"])
@jwt_required()
@cross_origin()
@expects_json(schema)
def update_personal_info():
data = request.get_json()
user_id = __get_user_id_from_jwt()
res = UserService.put_user(user_id, data)
return jsonify(res)
def __get_user_id_from_jwt():
claims = get_jwt()
return claims["user_id"]
import re
schema = {
"type": "object",
"properties": {
"email": {
"type": "string",
"description": "Email of the user",
"pattern": r"(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|\"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])",
"minLength": 1,
},
"app_roles": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the app",
"minLength": 1,
},
"role_id": {
"type": ["integer", "null"],
"description": "Role of the user",
"minimum": 1,
},
},
"required": ["name", "role_id"],
},
},
"tags": {
"type": "array",
"items": {
"type": "integer",
},
},
},
"required": ["email", "app_roles"],
}
schema_multiple = {
"users": {
"type": "array",
"items": {
"$ref": schema
}
}
}
# Multiple app role edit of existing users
schema_multi_edit = {
"users": {
"type": "array",
"items" : {
"type": "object",
"properties": {
"email": {
"type": "string",
"description": "Email of the user",
"minLength": 1,
},
"id": {
"type": "string",
"description": "UUID of the user",
"minLength": 1,
},
"app_roles": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the app",
"minLength": 1,
},
"role_id": {
"type": ["integer", "null"],
"description": "Role of the user",
"minimum": 1,
},
},
# "required": ["name", "role_id"],
},
},
"tags": {
"type": "array",
"items": {
"type": "integer",
},
},
},
# "required": ["email", "app_roles"],
}
}
}
schema_recovery_complete = {
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "Kratos ID of the user",
"minLength": 1,
},
},
"required": ["user_id"],
}
from flask import Blueprint
cli = Blueprint("cli", __name__)
from .cli import *
"""Flask application which provides the interface of a login panel. The
application interacts with different backend, like the Kratos backend for users,
Hydra for OIDC sessions and MariaDB for application and role specifications.
The application provides also several command line options to interact with
the user entries in the database(s)"""
import sys
import click
import datetime
import ory_kratos_client
from flask import current_app
from flask.cli import AppGroup
from ory_kratos_client.api import identity_api
from sqlalchemy import func
from config import HYDRA_ADMIN_URL, KRATOS_ADMIN_URL, KRATOS_PUBLIC_URL
from helpers import KratosUser
from cliapp import cli
from areas.apps.apps_service import AppsService
from areas.apps.models import AppRole, App
from areas.roles import Role
from areas.users import UserService
from database import db
# APIs
# Kratos has an admin and public end-point. We create an API for the admin one.
kratos_admin_api_configuration = ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL)
kratos_admin_client = ory_kratos_client.ApiClient(kratos_admin_api_configuration)
kratos_identity_api = identity_api.IdentityApi(kratos_admin_client)
##############################################################################
# CLI INTERFACE #
##############################################################################
# Define Flask CLI command groups and commands
user_cli = AppGroup("user")
app_cli = AppGroup("app")
## CLI APP COMMANDS
@app_cli.command("create")
@click.argument("slug")
@click.argument("name")
@click.argument("external-url", required=False)
def create_app(slug, name, external_url = None):
"""Adds an app into the database
:param slug: str short name of the app
:param name: str name of the application
:param extenal-url: if set, it marks this as an external app and
configures the url
"""
current_app.logger.info(f"Creating app definition: {name} ({slug})")
obj = App(name=name, slug=slug)
app_obj = App.query.filter_by(slug=slug).first()
if app_obj:
current_app.logger.info(f"App definition: {name} ({slug}) already exists in database")
sys.exit(1)
if (external_url):
obj.external = True
obj.url = external_url
db.session.add(obj)
db.session.commit()
current_app.logger.info(f"App definition: {name} ({slug}) created")
@app_cli.command("list")
def list_app():
"""List all apps found in the database"""
current_app.logger.info("Listing configured apps")
apps = App.query.all()
for obj in apps:
print(f"App name: {obj.name}\tSlug: {obj.slug},\tURL: {obj.get_url()}\tStatus: {obj.get_status()}")
@user_cli.command("cleanup")
@click.option("--dry-run", is_flag=True, default=False)
def cleanup_users(dry_run):
"""
Remove users that have never been active and are at least six weeks old.
"""
current_app.logger.info("Listing inactive users")
if dry_run:
print("Dry run, so not deleting anything.")
users = KratosUser.find_all(kratos_identity_api)
number_users = 0
number_inactive_users = 0
for user in users:
number_users = number_users + 1
try:
last_recovery = user.metadata_admin.get('last_recovery')
except (KeyError, AttributeError):
last_recovery = None
if last_recovery is not None:
continue
print(user)
print(f" Created at: {user.created_at}")
# For this long period we ignore any timezone difference.
age = datetime.datetime.now(datetime.timezone.utc) - user.created_at
if age > datetime.timedelta(weeks=6):
print("That's more than 6 weeks ago.")
number_inactive_users = number_inactive_users + 1
if not dry_run:
print("Deleting.")
user.delete()
UserService.delete(user.uuid)
if dry_run:
print(f"Would delete {number_inactive_users} users out of {number_users} total.")
else:
print(f"Deleted {number_inactive_users} users out of {number_users} total.")
@app_cli.command(
"delete",
)
@click.argument("slug")
def delete_app(slug):
"""Removes app from database
:param slug: str Slug of app to remove
"""
current_app.logger.info(f"Trying to delete app: {slug}")
app_obj = App.query.filter_by(slug=slug).first()
if not app_obj:
current_app.logger.info("Not found")
sys.exit(1)
app_status = app_obj.get_status()
if app_status.installed and not app_obj.external:
current_app.logger.info("Can not delete installed application, run"
" 'uninstall' first")
sys.exit(1)
app_obj.delete()
current_app.logger.info("Success.")
@app_cli.command(
"uninstall",
)
@click.argument("slug")
def uninstall_app(slug):
"""Uninstalls the app from the cluster
:param slug: str Slug of app to remove
"""
current_app.logger.info(f"Trying to uninstall app: {slug}")
app_obj = App.query.filter_by(slug=slug).first()
if not app_obj:
current_app.logger.info("Not found")
sys.exit(1)
app_obj.uninstall()
current_app.logger.info("Success.")
@app_cli.command("status")
@click.argument("slug")
def status_app(slug):
"""Gets the current app status from the Kubernetes cluster
:param slug: str Slug of app to remove
"""
current_app.logger.info(f"Getting status for app: {slug}")
app = App.query.filter_by(slug=slug).first()
if not app:
current_app.logger.error(f"App {slug} does not exist")
sys.exit(1)
current_app.logger.info(app.get_status())
@app_cli.command("install")
@click.argument("slug")
def install_app(slug):
"""Installs app into Kubernetes cluster
:param slug: str Slug of app to install
"""
current_app.logger.info(f"Installing app: {slug}")
app = App.query.filter_by(slug=slug).first()
if not app:
current_app.logger.error(f"App {slug} does not exist")
sys.exit(1)
if app.external:
current_app.logger.info(
f"App {slug} is an external app and cannot be provisioned automatically")
sys.exit(1)
current_status = app.get_status()
if not current_status.installed:
AppsService.install_app(app)
current_app.logger.info(
f"App {slug} installing...")
else:
current_app.logger.error(f"App {slug} is already installed")
@app_cli.command("roles")
@click.argument("slug")
def roles_app(slug):
"""Gets a list of roles for this app
:param slug: str Slug of app queried
"""
current_app.logger.info(f"Getting roles for app: {slug}")
app = App.query.filter_by(slug=slug).first()
if not app:
current_app.logger.error(f"App {slug} does not exist")
sys.exit(1)
current_app.logger.info("Roles: ")
for role in app.roles:
current_app.logger.info(role)
cli.cli.add_command(app_cli)
## CLI USER COMMANDS
@user_cli.command("setrole")
@click.argument("email")
@click.argument("app_slug")
@click.argument("role")
def setrole(email, app_slug, role):
"""Set role for a user
:param email: Email address of user to assign role
:param app_slug: Slug name of the app, for example 'nextcloud'
:param role: Role to assign. Currently only 'admin', 'user', 'none'/'no
access'.
"""
current_app.logger.info(f"Assigning role {role} to {email} for app {app_slug}")
# Find user
user = KratosUser.find_by_email(kratos_identity_api, email)
if not user:
print("User not found. Abort")
sys.exit(1)
app = db.session.query(App).filter(App.slug == app_slug).first()
if not app:
print("App not found. Abort.")
sys.exit(1)
if role == "none":
role = "no access"
role = Role.query.filter(func.lower(Role.name) == func.lower(role)).first()
if not role:
print("Role not found. Abort.")
sys.exit(1)
UserService.set_user_role(user.uuid, app.id, role.id)
db.session.commit()
@user_cli.command("show")
@click.argument("email")
def show_user(email):
"""Show user details. Output a table with the user and details about the
internal state/values of the user object
:param email: Email address of the user to show
"""
user = KratosUser.find_by_email(kratos_identity_api, email)
if user is None:
print(f"User with email address '{email}' was not found")
return
print(user)
print("")
print(f"UUID: {user.uuid}")
print(f"Username: {user.username}")
print(f"Updated: {user.updated_at}")
print(f"Created: {user.created_at}")
print(f"State: {user.state}")
print(f"Roles:")
results = db.session.query(AppRole)\
.filter_by(user_id=user.uuid)\
.join(App).join(Role)\
.add_entity(App).add_entity(Role)
for entry in results:
app = entry[-2]
role = entry[-1]
print(f" {role.name: >9} on {app.name}")
@user_cli.command("update")
@click.argument("email")
@click.argument("field")
@click.argument("value")
def update_user(email, field, value):
"""Update an user object. It can modify email and name currently
:param email: Email address of user to update
:param field: Field to update, supported [name|email]
:param value: The value to set the field with
"""
current_app.logger.info(f"Looking for user with email: {email}")
user = KratosUser.find_by_email(kratos_identity_api, email)
if not user:
current_app.logger.error(f"User with email {email} not found.")
sys.exit(1)
if field == "name":
user.name = value
elif field == "email":
user.email = value
else:
current_app.logger.error(f"Field not found: {field}")
# TODO: this currently deletes the last_recovery and last_login because
# `save` uses a simple PUT and is not aware of those fields. We should
# switch to PATCH instead, or refactor so `save` uses the same code as
# `put_user`.
user.save()
@user_cli.command("delete")
@click.argument("email")
def delete_user(email):
"""Delete an user from the database
:param email: Email address of user to delete
"""
current_app.logger.info(f"Looking for user with email: {email}")
user = KratosUser.find_by_email(kratos_identity_api, email)
if not user:
current_app.logger.error(f"User with email {email} not found.")
sys.exit(1)
UserService.delete_user(user.uuid)
@user_cli.command("create")
@click.argument("email")
def create_user(email):
"""Create a user in the kratos database. The argument must be an unique
email address
:param email: string Email address of user to add
"""
current_app.logger.info(f"Creating user with email: ({email})")
# Create a user
user = KratosUser.find_by_email(kratos_identity_api, email)
if user:
current_app.logger.info("User already exists. Not recreating")
return
user = KratosUser(kratos_identity_api)
user.email = email
user.save()
dashboard_app = db.session.query(App).filter(App.slug == 'dashboard').first()
if not dashboard_app:
print("Dashboard app not found. Aborting.")
sys.exit(1)
user_role = Role.query.filter(func.lower(Role.name) == 'user').first()
if not user_role:
print("User role not found. Aborting.")
sys.exit(1)
UserService.set_user_role(user.uuid, dashboard_app.id, user_role.id)
db.session.commit()
@user_cli.command("setpassword")
@click.argument("email")
@click.argument("password")
def setpassword_user(email, password):
"""Set a password for an account
:param email: email address of account to set a password for
:param password: password to be set
:raise: exception if unexepted error happens
"""
current_app.logger.info(f"Setting password for: ({email})")
# Kratos does not provide an interface to set a password directly. However
# we still want to be able to set a password. So we have to hack our way
# a bit around this. We do this by creating a recovery link though the
# admin interface (which is not e-mailed) and then follow the recovery
# flow in the public facing pages of kratos
try:
# Get the ID of the user
kratos_user = KratosUser.find_by_email(kratos_identity_api, email)
if kratos_user is None:
current_app.logger.error(f"User with email '{email}' not found")
sys.exit(1)
# Get a recovery URL
url = kratos_user.get_recovery_link()
# Execute UI sequence to set password, given we have a recovery URL
result = kratos_user.ui_set_password(KRATOS_PUBLIC_URL, url, password)
except Exception as error: # pylint: disable=broad-except
current_app.logger.error(f"Error while setting password: {error}")
sys.exit(1)
if result:
current_app.logger.info("Success setting password")
else:
current_app.logger.error("Failed to set password. Password too weak?")
@user_cli.command("list")
def list_user():
"""Show a list of users in the database"""
current_app.logger.info("Listing users")
users = KratosUser.find_all(kratos_identity_api)
for obj in users:
print(obj)
@user_cli.command("recover")
@click.argument("email")
def recover_user(email):
"""Get recovery link for a user, to manual update the user/use
:param email: Email address of the user
"""
current_app.logger.info(f"Trying to send recover email for user: {email}")
try:
# Get the ID of the user
kratos_user = KratosUser.find_by_email(kratos_identity_api, email)
# Get a recovery URL
url = kratos_user.get_recovery_link()
print(url)
except Exception as error: # pylint: disable=broad-except
current_app.logger.error(f"Error while getting reset link: {error}")
@user_cli.command("reset_totp")
@click.argument("email")
def reset_totp(email):
"""Remove configured totp second factor for a user.
:param email: Email address of the user
"""
current_app.logger.info(f"Removing totp second factor for user: {email}")
try:
# Get the ID of the user
kratos_user = KratosUser.find_by_email(kratos_identity_api, email)
# Get a recovery URL
UserService.reset_totp(kratos_user.uuid)
except Exception as error: # pylint: disable=broad-except
current_app.logger.error(f"Error while removing totp second factor: {error}")
@user_cli.command("reset_webauthn")
@click.argument("email")
def reset_webauthn(email):
"""Remove configured second factor for a user.
:param email: Email address of the user
"""
current_app.logger.info(f"Removing second factor for user: {email}")
try:
# Get the ID of the user
kratos_user = KratosUser.find_by_email(kratos_identity_api, email)
# Get a recovery URL
UserService.reset_webauthn(kratos_user.uuid)
except Exception as error: # pylint: disable=broad-except
current_app.logger.error(f"Error while removing webauthn second factor: {error}")
cli.cli.add_command(user_cli)
import base64
import jwt
import logging
from string import Template
import yaml
from database import db
from areas.apps.models import App, OAuthClientApp
import helpers.kubernetes as k8s
# Read in two configmaps from the cluster, which specify which apps should be
# present in the database. Returns the list of app slugs.
def populate_apps():
logging.debug("cluster_config: populating apps")
database_apps = {}
for app in App.query.all():
slug = app.slug
database_apps[slug] = app
logging.debug(f"database app: {slug}")
core_apps = _populate_apps_from(database_apps, "stackspin-apps")
custom_apps = _populate_apps_from(database_apps, "stackspin-apps-custom")
return (core_apps + custom_apps)
# Read a list of apps from a configmap. Check if they are already present in
# the database, and if not, add missing ones there. Properties `name`,
# `external` and `url` can be specified in yaml format in the configmap value
# contents. Returns the list of app slugs found.
def _populate_apps_from(database_apps, configmap_name):
slugs = []
cm_apps = k8s.get_kubernetes_config_map_data(configmap_name, "flux-system")
if cm_apps is None:
logging.debug(f"Could not find configmap '{configmap_name}' in namespace 'flux-system'; ignoring.")
else:
for app_slug, app_data in cm_apps.items():
logging.debug(f"configmap app: {app_slug}")
slugs.append(app_slug)
if app_slug in database_apps:
logging.debug(f" already present in database")
else:
logging.debug(f" not present in database, adding!")
data = yaml.safe_load(app_data)
name = data["name"]
logging.debug(f" name: {name}")
external = data.get("external", False)
logging.debug(f" type external: {type(external)}")
logging.debug(f" external: {external}")
url = data.get("url", None)
logging.debug(f" url: {url}")
new_app = App(slug=app_slug, name=name, external=external, url=url)
db.session.add(new_app)
db.session.commit()
return slugs
# Read in two configmaps from the cluster, which specify which oauthclients
# should be present in the database.
def populate_oauthclients():
logging.debug("cluster_config: populating oauthclients")
database_oauthclients = {}
for client in OAuthClientApp.query.all():
id = client.oauthclient_id
database_oauthclients[id] = client
logging.debug(f"database oauthclient: {id}")
_populate_oauthclients_from(database_oauthclients, "stackspin-oauthclients")
_populate_oauthclients_from(database_oauthclients, "stackspin-oauthclients-custom")
# Read a list of oauthclients from a configmap. Check if they are already
# present in the database, and if not, add missing ones there. The value of the
# mapping is taken to be the slug of the app the oauthclient belongs to.
def _populate_oauthclients_from(database_oauthclients, configmap_name):
cm_oauthclients = k8s.get_kubernetes_config_map_data(configmap_name, "flux-system")
if cm_oauthclients is None:
logging.debug(f"Could not find configmap '{configmap_name}' in namespace 'flux-system'; ignoring.")
else:
for client_id, client_app in cm_oauthclients.items():
logging.debug(f"configmap oauthclient: {client_id}")
if client_id in database_oauthclients:
logging.debug(f" already present in database")
else:
logging.debug(f" not present in database, adding!")
# Take the value of the configmap mapping (`client_app`) and
# interpret it as the slug of the app that this oauthclient
# belongs to.
app = App.query.filter_by(slug=client_app).first()
if not app:
logging.error(f" could not find app with slug {client_app}")
continue
new_client = OAuthClientApp(oauthclient_id=client_id, app_id=app.id)
logging.debug(f" new oauth client: {new_client}")
db.session.add(new_client)
db.session.commit()
# Read optional per-app SCIM configuration (URL and token) from secrets.
# Store the results in the database. Needs the list of app slugs to be passed.
def populate_scim_config(apps):
for app in apps:
secret_name = f"stackspin-scim-{app}"
scim_config = k8s.get_kubernetes_secret_data(secret_name, "flux-system")
if scim_config is None:
logging.debug(f"Could not find secret '{secret_name}' in namespace 'flux-system'; ignoring.")
continue
logging.debug(f"Processing secret stackspin-scim-{app}")
app = App.query.filter_by(slug=app).first()
if not app:
logging.error(f" could not find app with slug {app}")
continue
scim_url = scim_config.get("scim_url")
if scim_url is None:
logging.error(f" 'scim_url' is not set")
continue
scim_token = scim_config.get("scim_token")
if scim_token is None:
logging.error(f" 'scim_token' is not set")
continue
scim_url = base64.b64decode(scim_url).decode()
# We substitute the string `$BASE` or `${BASE}` in the `scim_url` by
# the app's base url.
scim_url = Template(scim_url).substitute(BASE=app.get_url())
app.scim_url = scim_url
scim_token = base64.b64decode(scim_token).decode()
scim_jwt = scim_config.get("scim_jwt")
if scim_jwt is not None:
scim_jwt = base64.b64decode(scim_jwt).decode()
if scim_jwt == "nextcloud":
# Nextcloud wants a JWT token containing the username of an existing user.
scim_token = jwt.encode({"sub":"admin"}, scim_token, algorithm="HS256")
else:
logging.error(f" 'jwt' has unknown value {scim_jwt}")
continue
app.scim_token = scim_token
scim_group_support = scim_config.get("scim_group_support")
if scim_group_support is None:
app.scim_group_support = False
else:
scim_group_support = base64.b64decode(scim_group_support).decode()
app.scim_group_support = scim_group_support.lower() in ['1', 'true', 'yes']
logging.info(f"Configuring SCIM for {app} with url {app.scim_url}.")
db.session.commit()
import os
LOG_LEVEL = os.environ.get("LOG_LEVEL")
SECRET_KEY = os.environ.get("SECRET_KEY")
HYDRA_CLIENT_ID = os.environ.get("HYDRA_CLIENT_ID")
HYDRA_CLIENT_SECRET = os.environ.get("HYDRA_CLIENT_SECRET")
HYDRA_AUTHORIZATION_BASE_URL = os.environ.get("HYDRA_AUTHORIZATION_BASE_URL")
TOKEN_URL = os.environ.get("TOKEN_URL")
DASHBOARD_URL = os.environ.get("DASHBOARD_URL")
LOGIN_PANEL_URL = os.environ.get("LOGIN_PANEL_URL")
HYDRA_PUBLIC_URL = os.environ.get("HYDRA_PUBLIC_URL")
HYDRA_ADMIN_URL = os.environ.get("HYDRA_ADMIN_URL")
KRATOS_ADMIN_URL = os.environ.get("KRATOS_ADMIN_URL")
KRATOS_PUBLIC_URL = str(os.environ.get("KRATOS_PUBLIC_URL")) + "/"
SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL")
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Set this to "true" to load the config from a Kubernetes serviceaccount
# running in a Kubernetes pod. Set it to "false" to load the config from the
# `KUBECONFIG` environment variable.
LOAD_INCLUSTER_CONFIG = os.environ.get("LOAD_INCLUSTER_CONFIG", "").lower() == "true"
RUN_BY_GUNICORN = "gunicorn" in os.environ.get("SERVER_SOFTWARE", "")
if os.environ.get("TELEPRESENCE_ROOT"):
TELEPRESENCE = True
TELEPRESENCE_MODE = os.environ.get("TELEPRESENCE_MODE")
print(f"TELEPRESENCE_MODE: {TELEPRESENCE_MODE}")
if TELEPRESENCE_MODE == "docker":
KUBECONFIG = os.environ["TELEPRESENCE_MOUNTS"]
else:
KUBECONFIG = os.environ.get("TELEPRESENCE_ROOT") + os.environ["TELEPRESENCE_MOUNTS"]
print(f"KUBECONFIG from telepresence: {KUBECONFIG}")
print(os.stat(KUBECONFIG))
else:
TELEPRESENCE = False
KUBECONFIG = None
DEMO_INSTANCE = os.environ.get("DASHBOARD_DEMO_INSTANCE", "False").lower() in ('true', '1')
ENFORCE_2FA = os.environ.get("DASHBOARD_ENFORCE_2FA", "False").lower() in ('true', '1')