diff --git a/backend/app.py b/backend/app.py
index 9bc3812072ee48c795b29d1392e36c33f6ec8e34..e07a39d99e92b62375dae26cef1a1ad13dad8883 100644
--- a/backend/app.py
+++ b/backend/app.py
@@ -5,6 +5,8 @@ from flask_jwt_extended import JWTManager
 import flask_migrate
 from jsonschema.exceptions import ValidationError
 from NamedAtomicLock import NamedAtomicLock
+import threading
+import traceback
 from werkzeug.exceptions import BadRequest
 
 # These imports are required
@@ -21,6 +23,7 @@ from areas import tags
 from cliapp import cliapp
 import helpers.kubernetes
 import helpers.provision
+import helpers.threads
 from web import login
 
 from database import db
@@ -117,26 +120,39 @@ def init_routines():
         cluster_config.populate_oauthclients()
         # Load per-app scim config if present.
         cluster_config.populate_scim_config(app_slugs)
-    reload()
+    # We could call `reload` here manually, but actually the watch also at its
+    # start creates events for existing secrets so we don't need to.
     with app.app_context():
-        app.logger.info("Setting watch for dashboard config.")
+        # Set watch for dashboard SCIM config secrets. Any time those change,
+        # we reload so we can do SCIM for newly installed apps.
         try:
             helpers.kubernetes.watch_dashboard_config(app, reload)
         except Exception as e:
-            app.logger.error(f"Error watching: {e}")
+            app.logger.error(f"Error watching dashboard config: {e}")
 
-    # if provisioner.enabled:
-    # We define this wrapper because the SCIM provisioning code needs to access the
-    # database, which needs a flask app context.
-    def provision():
-        with app.app_context():
-            provisioner.reconcile()
     # Set up a generic task scheduler (cron-like).
     scheduler = BackgroundScheduler()
     scheduler.start()
     # Add a job to run the provisioning reconciliation routine regularly.
-    # TODO: decrease the frequency once development settles.
-    scheduler.add_job(provision, 'interval', minutes=1)
+    # We'll also run it when we make changes that should be propagated
+    # immediately.
+    scheduler.add_job(helpers.threads.request_provision, 'interval', id='provision', minutes=5)
+    # We'll run this in a separate thread so it can be done in the background.
+    # We have this single "provisioning worker" so there will be only one
+    # provisioning operation at a time. We use an Event to signal a
+    # provisioning request.
+    def wait_for_provision_event():
+        while True:
+            helpers.threads.provision_event.wait()
+            helpers.threads.provision_event.clear()
+            app.logger.info("Starting provisioning.")
+            with app.app_context():
+                try:
+                    provisioner.reconcile()
+                except Exception as e:
+                    app.logger.warning("Exception in user provisioning:")
+                    app.logger.warning(traceback.format_exc())
+    threading.Thread(target=wait_for_provision_event).start()
 
 # `init_routines` should only run once per dashboard instance. To enforce this
 # we have different behaviour for production and development mode:
diff --git a/backend/areas/users/user_service.py b/backend/areas/users/user_service.py
index af7e60fd37cbc8bfb4c0067533109e73a2e94e98..d0ad44fca940200bca4b5892320cb4089fe5c428 100644
--- a/backend/areas/users/user_service.py
+++ b/backend/areas/users/user_service.py
@@ -20,6 +20,7 @@ from areas.roles import Role, RoleService
 from areas.tags import TagUser
 from helpers import KratosApi
 from helpers.error_handler import KratosError
+from helpers.threads import request_provision
 
 
 kratos_admin_api_configuration = \
@@ -106,6 +107,9 @@ class UserService:
         # Commit all changes to the stackspin database.
         db.session.commit()
 
+        # Provision user in all apps.
+        request_provision()
+
         # We start a recovery flow immediately after creating the
         # user, so the user can set their initial password.
         cls.__start_recovery_flow(data["email"])
@@ -155,6 +159,7 @@ class UserService:
             UserStackspinData.setTags(id, data["tags"])
 
         db.session.commit()
+        request_provision()
 
         return cls.get_user(id)
 
@@ -177,6 +182,8 @@ class UserService:
             app_id=app_id,
         )
         db.session.add(appRole)
+        current_app.logger.info(f"Requesting user {user_id} to be provisioned.")
+        request_provision()
 
     @classmethod
     def put_multiple_users(cls, user_editing_id, data):
diff --git a/backend/helpers/kubernetes.py b/backend/helpers/kubernetes.py
index 4e8ad96405a2e67c96ce302cefc15014af11fe1d..4cea526239d102a978f62c2d444dc81d442d02ea 100644
--- a/backend/helpers/kubernetes.py
+++ b/backend/helpers/kubernetes.py
@@ -436,23 +436,24 @@ def debounce(timeout: float):
     return decorator
 
 def watch_dashboard_config(app, reload):
-    current_app.logger.info("Creating watch.")
     w = watch.Watch()
-    current_app.logger.info("Creating api instance.")
-    api_client_instance = api_client.ApiClient()
-    api_instance = client.CoreV1Api(api_client_instance)
+    api_instance = client.CoreV1Api(api_client.ApiClient())
     def p():
         with app.app_context():
+            # Number of seconds to wait before reloading in case more secrets show up.
+            # In particular this prevents us from reloading once for every
+            # secret that exists at startup in succession.
+            debounce_timeout = 1
+            @debounce(debounce_timeout)
+            def debounced_reload():
+                reload()
             for event in w.stream(
                     api_instance.list_namespaced_secret,
                     'flux-system',
                     label_selector="stackspin.net/scim-config=1",
                     watch=True
                     ):
-                current_app.logger.info(f"{event['type']}: {event['object'].metadata.name}")
-                debounce(1)(reload)()
-    current_app.logger.info("Creating thread.")
+                current_app.logger.info(f"{event['type']} SCIM config secret: {event['object'].metadata.name}")
+                debounced_reload()
     thread = threading.Thread(target=p)
-    current_app.logger.info("Starting thread.")
     thread.start()
-    current_app.logger.info("watch_dashboard-config finished")
diff --git a/backend/helpers/provision.py b/backend/helpers/provision.py
index 200c11cceebfde38878df0b0d91d874c8979deec..161b563a7f322590a6266ebef0a6b5998796f4e8 100644
--- a/backend/helpers/provision.py
+++ b/backend/helpers/provision.py
@@ -181,10 +181,11 @@ class Provision:
                 db.session.commit()
             else:
                 logging.info(f"Error returned by SCIM deletion: {response.content}")
+                raise ProvisionError("App cannot delete user via SCIM.")
             return
 
         # Get the related user object
-        logging.info(f"Info: Getting user data from Kratos.")
+        logging.info(f"Getting user data from Kratos.")
         kratos_user = KratosUser(self.kratos_identity_api, app_role.user_id)
         if app_role.role_id == Role.NO_ACCESS_ROLE_ID:
             active = False
@@ -224,16 +225,18 @@ class Provision:
         # role as a field on the user object.
         if app_role.role_id == Role.ADMIN_ROLE_ID:
             data['role'] = 'owner'
+        else:
+            data['role'] = 'member'
 
         # Now format the URL and make the SCIM request.
         if existing_user is None:
             url = f"{app.scim_url}/Users"
             response = requests.post(url, headers=scim_headers, json=data)
+            logging.info(f"Post SCIM user: {url} with data: {data} getting status: {response.status_code}")
         else:
             url = f"{app.scim_url}/Users/{existing_user.scim_id}"
             response = requests.put(url, headers=scim_headers, json=data)
-        logging.info(f"SCIM url: {url}")
-        logging.info(f"SCIM http status: {response.status_code}")
+            logging.info(f"Put SCIM user: {url} with data: {data} getting status: {response.status_code}")
         try:
             response_json = response.json()
         except json.decoder.JSONDecodeError as e:
@@ -241,6 +244,11 @@
             logging.info(response.content)
             raise ProvisionError("App returned non-json data in SCIM user put/post.")
         logging.info(f"got: {response_json}")
+        if existing_user is None:
+            # Because this is a new user for the app, we should read off its
+            # SCIM ID and store that in the Stackspin database.
+            app_role.scim_id = response_json['id']
+            db.session.commit()
         user = User(app_role.user_id, response_json['id'], kratos_user.name)
         if app.scim_group_support:
             if app_role.role_id == Role.ADMIN_ROLE_ID:
@@ -266,7 +274,7 @@ class Provision:
         except json.decoder.JSONDecodeError as e:
             logging.info("SCIM result was not json")
             logging.info(response.content)
-            raise ProvisionError("Failed to get existing users from {app.slug}")
+            raise ProvisionError(f"Failed to get existing users from {app.slug}")
         logging.info(f"All existing users for {app.slug}: {response_json}")
         # Make a dictionary of the users, using their externalId as key, which
         # is the kratos user ID.
@@ -288,10 +296,44 @@ class Provision:
                 scim_id=u['id']
             ).first()
             if app_role is None:
-                # This is a user that was apparently not created by
-                # Stackspin (either SSO or SCIM), so we'll ignore it.
                 logging.info(f"  SCIM ID {u['id']} not listed in database.")
-                continue
+                # We can't find this app user in our Stackspin
+                # database, at least based on the SCIM ID. It could be
+                # that it was created before the introduction of SCIM,
+                # or was created on-the-fly on login by the app before
+                # SCIM got a chance to create it. To cover that case,
+                # we try to find the matching Stackspin user by email
+                # address.
+                try:
+                    if app.slug == 'zulip':
+                        email_address = u['userName']
+                    else:
+                        email_address = u['emails'][0]['value']
+                    kratos_user = KratosUser.find_by_email(self.kratos_identity_api, email_address)
+                except KeyError:
+                    # The `emails` field is not set, so give up.
+                    kratos_user = None
+                except IndexError:
+                    # The list of email addresses is empty, so give up.
+                    kratos_user = None
+                if kratos_user is None:
+                    # This user is not known at all by Stackspin, so
+                    # we'll ignore it.
+                    logging.info(f"  SCIM user unknown, ignoring.")
+                    continue
+                # We found the user based on email address. We'll
+                # store the SCIM ID for this user in the Stackspin
+                # database so we don't need to do this email
+                # address matching again next time.
+                app_role = db.session.query(AppRole).filter_by(
+                    app_id=app.id,
+                    user_id=kratos_user.uuid
+                ).first()
+                if app_role is not None:
+                    app_role.scim_id = u['id']
+                    db.session.commit()
+                    logging.info(f"  Stored SCIM ID {u['id']} for user {kratos_user.uuid} for app {app.slug}")
+                kratos_id = kratos_user.uuid
             else:
                 kratos_id = app_role.user_id
             users[kratos_id] = User(kratos_id, u['id'], u['displayName'])
diff --git a/backend/helpers/threads.py b/backend/helpers/threads.py
new file mode 100644
index 0000000000000000000000000000000000000000..4ee12c880b31b0ccaca20df2f2f600a52bfafc32
--- /dev/null
+++ b/backend/helpers/threads.py
@@ -0,0 +1,20 @@
+import functools
+import threading
+
+# Signal to provisioning loop that we want to provision now.
+provision_event = threading.Event()
+
+def debounce(timeout: float):
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            wrapper.func.cancel()
+            wrapper.func = threading.Timer(timeout, func, args, kwargs)
+            wrapper.func.start()
+        wrapper.func = threading.Timer(timeout, lambda: None)
+        return wrapper
+    return decorator
+
+@debounce(1)
+def request_provision():
+    provision_event.set()
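Reviewer note, not part of the patch: a minimal sketch of how the new pieces are meant to fit together. Repeated calls to request_provision() from the new backend/helpers/threads.py are debounced into a single provision_event.set(), which the wait_for_provision_event worker started in init_routines picks up to run one reconciliation for the whole burst. The stand-in worker and the burst loop below are illustrative only, and the snippet assumes it runs with the backend package on the import path.

import threading
import time

from helpers.threads import provision_event, request_provision

def fake_worker():
    # Stand-in for wait_for_provision_event() in app.py: wait for the signal,
    # clear it, then (in the real code) run provisioner.reconcile().
    provision_event.wait()
    provision_event.clear()
    print("provisioning once for the whole burst")

threading.Thread(target=fake_worker, daemon=True).start()

# A burst of changes, e.g. several role edits in quick succession: every call
# cancels the previous 1-second debounce timer and starts a new one, so only
# one event is set for the whole burst.
for _ in range(5):
    request_provision()

time.sleep(2)  # long enough for the debounce timer to fire and the worker to run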
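Reviewer note, also not part of the patch: scim_email_address below is a hypothetical helper that merely restates the lookup-address extraction added to Provision for app users whose SCIM ID is not yet stored, so the Zulip special case and the KeyError/IndexError handling can be checked in isolation. The app slugs and addresses are made up for illustration.

def scim_email_address(app_slug, scim_user):
    # Zulip reports the address in `userName`; other apps use the standard
    # SCIM `emails` list. Missing or empty data means we cannot match.
    try:
        if app_slug == 'zulip':
            return scim_user['userName']
        return scim_user['emails'][0]['value']
    except (KeyError, IndexError):
        return None

assert scim_email_address('zulip', {'userName': 'admin@example.org'}) == 'admin@example.org'
assert scim_email_address('wekan', {'emails': [{'value': 'admin@example.org'}]}) == 'admin@example.org'
assert scim_email_address('wekan', {'emails': []}) is None
assert scim_email_address('wekan', {}) is None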