Newer
Older
import ory_kratos_client
from ory_kratos_client.model.update_recovery_flow_body \
import UpdateRecoveryFlowBody
from ory_kratos_client.api import frontend_api, identity_api
from config import KRATOS_ADMIN_URL
from database import db
from areas.apps import App, AppRole, AppsService
from helpers import KratosApi
from helpers.error_handler import KratosError
kratos_admin_api_configuration = \
ory_kratos_client.Configuration(host=KRATOS_ADMIN_URL, discard_unknown_keys=True)
kratos_client = ory_kratos_client.ApiClient(kratos_admin_api_configuration)
kratos_frontend_api = frontend_api.FrontendApi(kratos_client)
kratos_identity_api = identity_api.IdentityApi(kratos_client)
class UserService:
@classmethod
def get_users(cls):
userList = []
dashboardRoles = cls.__getDashboardRoles()
while page >= 0:
if page == 0:
res = KratosApi.get("/admin/identities?per_page=1000").json()
else:
res = KratosApi.get("/admin/identities?per_page=1000&page={}".format(page)).json()
# Inject information from the `stackspin` database that's associated to this user.
# In particular, the dashboard role (admin or regular user).
stackspinData = {}
dashboardRole = dashboardRoles.get(r["id"])
if dashboardRole is not None:
stackspinData["stackspin_admin"] = dashboardRole == Role.ADMIN_ROLE_ID
r["stackspin_data"] = stackspinData
userList.append(r)
if len(res) == 0:
page = -1
else:
page = page + 1
return userList
@classmethod
def get_user(cls, id):
res = KratosApi.get("/admin/identities/{}".format(id)).json()
return cls.__insertAppRoleToUser(id, res)
def create_recovery_link(id):
kratos_data = {
"identity_id": id
}
res = KratosApi.post("/admin/recovery/link", kratos_data).json()
return res
@classmethod
def post_user(cls, data):
kratos_data = {
"schema_id": "default",
"traits": {
"name": data["name"],
"email": data["email"],
},
res = KratosApi.post("/admin/identities", kratos_data).json()
if data["app_roles"]:
app_roles = data["app_roles"]
for ar in app_roles:
app = App.query.filter_by(slug=ar["name"]).first()
app_role = AppRole(
user_id=res["id"],
role_id=ar["role_id"] if "role_id" in ar else Role.NO_ACCESS_ROLE_ID,
app_id=app.id,
)
db.session.add(app_role)
db.session.commit()
else:
all_apps = AppsService.get_all_apps()
for app in all_apps:
app_role = AppRole(
user_id=res["id"],
# We start a recovery flow immediately after creating the
# user, so the user can set their initial password.
cls.__start_recovery_flow(data["email"])
return UserService.get_user(res["id"])
@staticmethod
def reset_2fa(id):
KratosApi.delete("/admin/identities/{}/credentials/totp".format(id))
@staticmethod
def __start_recovery_flow(email):
"""
Start a Kratos recovery flow for the user's email address.
This sends out an email to the user that explains to them how they can
set their password. Make sure the user exists inside Kratos before you
use this function.
:param email: Email to send recovery link to
:type email: str
"""
api_response = kratos_frontend_api.create_native_recovery_flow()
flow = api_response['id']
# Submit the recovery flow to send an email to the new user.
update_recovery_flow_body = \
UpdateRecoveryFlowBody(method="link", email=email)
api_response = kratos_frontend_api.update_recovery_flow(flow,
@classmethod
def put_user(cls, id, user_editing_id, data):
kratos_data = {
"schema_id": "default",
"traits": {"email": data["email"], "name": data["name"]},
}
KratosApi.put("/admin/identities/{}".format(id), kratos_data)
app_roles = data["app_roles"]
for ar in app_roles:
app = App.query.filter_by(slug=ar["name"]).first()
app_role = AppRole.query.filter_by(
user_id=id, app_id=app.id).first()
if app_role:
app_role.role_id = ar["role_id"] if "role_id" in ar else None
db.session.commit()
else:
appRole = AppRole(
user_id=id,
role_id=ar["role_id"] if "role_id" in ar else None,
app_id=app.id,
)
db.session.add(appRole)
db.session.commit()
return cls.get_user(id)
@classmethod
def put_multiple_users(cls, user_editing_id, data):
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
for user_data in data["users"]:
kratos_data = {
# "schema_id": "default",
"traits": {"email": user_data["email"]},
}
KratosApi.put("/admin/identities/{}".format(user_data["id"]), kratos_data)
is_admin = RoleService.is_user_admin(user_editing_id)
if is_admin and user_data["app_roles"]:
app_roles = user_data["app_roles"]
for ar in app_roles:
app = App.query.filter_by(slug=ar["name"]).first()
app_role = AppRole.query.filter_by(
user_id=user_data["id"], app_id=app.id).first()
if app_role:
app_role.role_id = ar["role_id"] if "role_id" in ar else None
db.session.commit()
else:
appRole = AppRole(
user_id=user_Data["id"],
role_id=ar["role_id"] if "role_id" in ar else None,
app_id=app.id,
)
db.session.add(appRole)
db.session.commit()
return cls.get_user(user_data["id"])
@staticmethod
def delete_user(id):
app_role = AppRole.query.filter_by(user_id=id).all()
for ar in app_role:
db.session.delete(ar)
db.session.commit()
@staticmethod
def post_multiple_users(data):
# check if data is array
existing_users = []
creation_failed_users = []
user_email = user_data["email"]
if not user_email:
UserService.post_user(user_data)
current_app.logger.info(f"Batch create user: {user_email}")
created_users.append(user_email)
except KratosError as err:
status_code = err.args[1]
if status_code == 409:
existing_users.append(user_email)
elif status_code == 400:
creation_failed_users.append(user_email)
current_app.logger.error(
f"Exception calling Kratos: {err} on creating user: {user_email} {status_code}")
current_app.logger.error(
f"Exception: {error} on creating user: {user_email}")
creation_failed_users.append(user_email)
success_response = {}
existing_response = {}
failed_response = {}
if created_users:
success_response = {"users": created_users,
"message": f"{len(created_users)} users created"}
if existing_users:
existing_response = {
"users": existing_users, "message": f"{len(existing_users)} users already exist: {', '.join(existing_users)}"}
if creation_failed_users:
failed_response = {"users": creation_failed_users,
"message": f"{len(creation_failed_users)} users failed to create: {', '.join(creation_failed_users)}"}
return {"success": success_response, "existing": existing_response, "failed": failed_response}
@staticmethod
def __insertAppRoleToUser(userId, userRes):
apps = App.query.all()
app_roles = []
for app in apps:
# Only show role when installed
app_status = app.get_status()
if app_status.installed:
tmp_app_role = AppRole.query.filter_by(
user_id=userId, app_id=app.id
).first()
app_roles.append(
{
"name": app.slug,
"role_id": tmp_app_role.role_id if tmp_app_role else None,
}
)
return userRes
@staticmethod
def __getDashboardRoles():
dashboardRoles = {}
for appRole, app in (
db.session.query(AppRole, App)
.filter(AppRole.app_id == App.id)
.filter(App.slug == "dashboard")
.all()
):
dashboardRoles[appRole.user_id] = appRole.role_id
return dashboardRoles