Skip to content
Snippets Groups Projects
Verified Commit 2e55e2fa authored by Maarten de Waard's avatar Maarten de Waard :angel:
Browse files

Process lots of feedback

- Add a lot of docstrings
- Add AppStatus class
- Remove unused code
parent 8e41705d
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
......@@ -24,8 +24,6 @@ APPS_DATA = [
APP_DATA = {"id": 1, "name": "Nextcloud", "selected": True, "status": "ON for everyone", "config": CONFIG_DATA},
APP_NOT_INSTALLED_STATUS = "Not installed"
@api_v1.route('/apps', methods=['GET'])
@jwt_required()
@cross_origin()
......
......@@ -6,7 +6,6 @@ from sqlalchemy import ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from database import db
import helpers.kubernetes as k8s
from .apps import APP_NOT_INSTALLED_STATUS
class App(db.Model):
......@@ -23,34 +22,8 @@ class App(db.Model):
return f"{self.id} <{self.name}>"
def get_status(self):
"""Returns a string that describes the app state in the cluster"""
kustomization = self.kustomization
if kustomization is not None and "status" in kustomization:
ks_ready, ks_message = App.check_condition(kustomization['status'])
else:
ks_ready = None
for helmrelease in self.helmreleases['items']:
hr_status = helmrelease['status']
hr_ready, hr_message = App.check_condition(hr_status)
# For now, only show the message of the first HR that isn't ready
if not hr_ready:
break
if ks_ready is None:
return APP_NOT_INSTALLED_STATUS
# *Should* not happen, but just in case:
if (ks_ready is None and hr_ready is not None) or \
(hr_ready is None and ks_ready is not None):
return ("This app is in a strange state. Contact a Stackspin"
" administrator if this status stays for longer than 5 minutes")
if ks_ready and hr_ready:
return "App installed and running"
if not hr_ready:
return f"App HelmRelease status: {hr_message}"
if not ks_ready:
return f"App Kustomization status: {ks_message}"
return "App is installing..."
"""Returns an AppStatus object that describes the current cluster state"""
return AppStatus(self.kustomization, self.helmreleases)
def install(self):
......@@ -139,7 +112,7 @@ class App(db.Model):
@property
def helmreleases(self):
"""Returns the helmreleases associated with the kustomization for this app"""
return k8s.list_helmreleases(self.namespace,
return k8s.get_all_helmreleases(self.namespace,
f"kustomize.toolkit.fluxcd.io/name={self.slug}")
@staticmethod
......@@ -147,6 +120,74 @@ class App(db.Model):
"""Returns directory that contains the Jinja templates used to create app secrets."""
return os.path.join(os.path.dirname(os.path.realpath(__file__)), "templates")
class AppRole(db.Model): # pylint: disable=too-few-public-methods
"""
The AppRole object, stores the roles Users have on Apps
"""
user_id = db.Column(String(length=64), primary_key=True)
app_id = db.Column(Integer, ForeignKey("app.id"), primary_key=True)
role_id = db.Column(Integer, ForeignKey("role.id"))
role = relationship("Role")
def __repr__(self):
return (f"role_id: {self.role_id}, user_id: {self.user_id},"
f" app_id: {self.app_id}, role: {self.role}")
class AppStatus(): # pylint: disable=too-few-public-methods
"""
Represents the status of an app in the Kubernetes cluster.
This class can answer a few questions, like "is the app installed?", but
can also return raw status messages from Kustomizations and HelmReleases
This constructor sets three variables:
self.installed (bool): Whether the app should be installed
self.ready (bool): Whether the app is installed correctly
self.message (str): Information about the status
:param kustomization_status: The status of the Kustomization of this app:
:type kustomization_status: str
:param helmrelease_status: The status of the helmreleases of this app
:type helmrelease_status: str[]
"""
def __init__(self, kustomization, helmreleases):
self.helmreleases = {}
if kustomization is not None and "status" in kustomization:
ks_ready, ks_message = AppStatus.check_condition(kustomization['status'])
self.installed = True
else:
ks_ready = None
ks_message = "Kustomization does not exist"
self.installed = False
self.ready = False
self.message = "Not installed"
for helmrelease in helmreleases:
hr_status = helmrelease['status']
hr_ready, hr_message = AppStatus.check_condition(hr_status)
# For now, only show the message of the first HR that isn't ready
if not hr_ready:
self.ready = False
self.message = f"HelmRelease {helmrelease['metadata']['name']} status: {hr_message}"
return
# If we end up here, all HRs are ready
if ks_ready:
self.ready = True
self.message = "Installed"
else:
self.ready = False
self.message = f"App Kustomization status: {ks_message}"
def __repr__(self):
return f"Installed: {self.installed}\tReady: {self.ready}\tMessage: {self.message}"
@staticmethod
def check_condition(status):
"""
......@@ -166,19 +207,3 @@ class App(db.Model):
if condition["type"] == "Ready":
return condition["status"] == "True", condition["message"]
return False, "Condition with type 'Ready' not found"
class AppRole(db.Model): # pylint: disable=too-few-public-methods
"""
The AppRole object, stores the roles Users have on Apps
"""
user_id = db.Column(String(length=64), primary_key=True)
app_id = db.Column(Integer, ForeignKey("app.id"), primary_key=True)
role_id = db.Column(Integer, ForeignKey("role.id"))
role = relationship("Role")
def __repr__(self):
return (f"role_id: {self.role_id}, user_id: {self.user_id},"
f" app_id: {self.app_id}, role: {self.role}")
......@@ -17,7 +17,7 @@ from config import HYDRA_ADMIN_URL, KRATOS_ADMIN_URL, KRATOS_PUBLIC_URL
from helpers import KratosUser
from cliapp import cli
from areas.roles import Role
from areas.apps import AppRole, App, APP_NOT_INSTALLED_STATUS
from areas.apps import AppRole, App
from database import db
# APIs
......@@ -135,7 +135,7 @@ def status_app(slug):
current_app.logger.error(f"App {slug} does not exist")
return
current_app.logger.info(f"Status: {app.get_status()}")
current_app.logger.info(app.get_status())
@app_cli.command("install")
@click.argument("slug")
......@@ -152,13 +152,12 @@ def install_app(slug):
return
current_status = app.get_status()
if current_status == APP_NOT_INSTALLED_STATUS:
if current_status.installed == False:
app.install()
current_app.logger.info(
f"App {slug} installing... use `status` to see status")
else:
current_app.logger.error("App {slug} should have status"
f" {APP_NOT_INSTALLED_STATUS} but has status: {current_status}")
current_app.logger.error(f"App {slug} is already installed")
@app_cli.command("roles")
@click.argument("slug")
......
......@@ -12,17 +12,28 @@ from kubernetes.client import api_client
from kubernetes.client.exceptions import ApiException
from kubernetes.utils import create_from_yaml
from kubernetes.utils.create_from_yaml import FailToCreateError
from flask import current_app
# Load the kube config once
#
# By default this loads whatever we define in the `KUBECONFIG` env variable,
# otherwise loads the config from default locations, similar to what kubectl
# does.
config.load_kube_config()
def create_variables_secret(app_slug, variables_filepath):
"""Checks if a variables secret for app_name already exists, generates it if necessary.
If a secret already exists, loops through keys from the template, and adds
values for keys that miss in the Kubernetes secret, but are available in
the template.
:param app_slug: The slug of the app, used in the oauth secrets
:type app_slug: string
:param variables_filepath: The path to an existing jinja2 template
:type variables_filepath: string
:return: returns True, unless an exception gets raised by the Kubernetes API
:rtype: boolean
"""
new_secret_dict = read_template_to_dict(
variables_filepath,
......@@ -37,7 +48,7 @@ def create_variables_secret(app_slug, variables_filepath):
elif current_secret_data.keys() != new_secret_dict["data"].keys():
# Update current secret with new keys
update_secret = True
print(
current_app.logger.info(
f"Secret {secret_name} in namespace {secret_namespace}"
" already exists. Merging..."
)
......@@ -45,12 +56,12 @@ def create_variables_secret(app_slug, variables_filepath):
new_secret_dict["data"] |= current_secret_data
else:
# Do Nothing
print(
current_app.logger.info(
f"Secret {secret_name} in namespace {secret_namespace}"
" is already in a good state, doing nothing."
)
return True
print(
current_app.logger.info(
f"Storing secret {secret_name} in namespace"
f" {secret_namespace} in cluster."
)
......@@ -61,7 +72,14 @@ def create_variables_secret(app_slug, variables_filepath):
def get_secret_metadata(secret_dict):
"""Returns secret name and namespace from metadata field in a yaml string."""
"""
Returns secret name and namespace from metadata field in a yaml string.
:param secret_dict: Dictionary of the secret as returned by read_namespaced_secret
:type secret_dict: dict
:return: Tuple containing secret name and secret namespace
:rtype: tuple
"""
secret_name = secret_dict["metadata"]["name"]
# default namespace is flux-system, but other namespace can be
# provided in secret metadata
......@@ -73,7 +91,17 @@ def get_secret_metadata(secret_dict):
def get_kubernetes_secret_data(secret_name, namespace):
"""Returns the contents of a kubernetes secret or None if the secret does not exist."""
"""
Get secret from Kubernetes
:param secret_name: Name of the secret
:type secret_name: string
:param namespace: Namespace of the secret
:type namespace: string
:return: The contents of a kubernetes secret or None if the secret does not exist.
:rtype: dict or None
"""
api_client_instance = api_client.ApiClient()
api_instance = client.CoreV1Api(api_client_instance)
try:
......@@ -87,7 +115,20 @@ def get_kubernetes_secret_data(secret_name, namespace):
def store_kubernetes_secret(secret_dict, namespace, update=False):
"""Stores either a new secret in the cluster, or updates an existing one."""
"""
Stores either a new secret in the cluster, or updates an existing one.
:param secret_dict: Dictionary of the secret as returned by read_namespaced_secret
:type secret_dict: dict
:param namespace: Namespace of the secret
:type namespace: string
:param update: If True, use `patch_kubernetes_secret`,
otherwise use `create_from_yaml` (default: False)
:type update: boolean
:return: None
:rtype: None
"""
api_client_instance = api_client.ApiClient()
if update:
verb = "updated"
......@@ -101,13 +142,23 @@ def store_kubernetes_secret(secret_dict, namespace, update=False):
namespace=namespace
)
except FailToCreateError as ex:
print(f"Secret not {verb} because of exception {ex}")
current_app.logger.info(f"Secret not created because of exception {ex}")
return
print(f"Secret {verb} with api response: {api_response}")
current_app.logger.info(f"Secret {verb} with api response: {api_response}")
def store_kustomization(kustomization_template_filepath, app_slug):
"""Add a kustomization that installs app {app_slug} to the cluster"""
"""
Add a kustomization that installs app {app_slug} to the cluster.
:param kustomization_template_filepath: Path to the template that describes
the kustomization. The template should have an `{{ app }}` entry.
:type kustomization_template_filepath: string
:param app_slug: Slug for the app, used to replace `{{ app }}` in the
template
:return: True on success
:rtype: boolean
"""
kustomization_dict = read_template_to_dict(kustomization_template_filepath,
{"app": app_slug})
custom_objects_api = client.CustomObjectsApi()
......@@ -119,14 +170,25 @@ def store_kustomization(kustomization_template_filepath, app_slug):
plural="kustomizations",
body=kustomization_dict)
except FailToCreateError as ex:
print(f"Could not create {app_slug} Kustomization because of exception {ex}")
return
print(f"Kustomization created with api response: {api_response}")
current_app.logger.info(
f"Could not create {app_slug} Kustomization because of exception {ex}")
return False
current_app.logger.debug(f"Kustomization created with api response: {api_response}")
return True
def delete_kustomization(kustomization_name):
"""Deletes kustomization for an app_slug. Should also result in the
deletion of the app's HelmReleases, PVCs, OAuth2Client, etc. Nothing will
remain"""
"""
Deletes a kustomization.
Note that this can also result in the deletion of an app's HelmReleases,
PVCs (user data!), OAuth2Client, etc. Nothing will remain
:param kustomization_name: name of the kustomization to delete
:type kustomization_name: string
:return: Response of delete API call
:rtype: dict
"""
custom_objects_api = client.CustomObjectsApi()
body = client.V1DeleteOptions()
try:
......@@ -138,14 +200,16 @@ def delete_kustomization(kustomization_name):
name=kustomization_name,
body=body)
except ApiException as ex:
print(f"Could not delete {kustomization_name} Kustomization because of exception {ex}")
current_app.logger.info(
f"Could not delete {kustomization_name} Kustomization because of exception {ex}")
return False
print(f"Kustomization deleted with api response: {api_response}")
current_app.logger.debug(f"Kustomization deleted with api response: {api_response}")
return api_response
def read_template_to_dict(template_filepath, template_globals):
"""Reads a Jinja2 template that contains yaml and turns it into a dict
"""
Reads a Jinja2 template that contains yaml and turns it into a dict.
:param template_filepath: The path to an existing Jinja2 template
:type template_filepath: string
......@@ -167,7 +231,17 @@ def read_template_to_dict(template_filepath, template_globals):
def patch_kubernetes_secret(secret_dict, namespace):
"""Patches secret in the cluster with new data."""
"""
Patches secret in the cluster with new data.
Warning: currently ignores everything that's not in secret_dict["data"]
:param secret_dict: Dictionary of the secret as returned by read_namespaced_secret
:type secret_dict: dict
:param namespace: Namespace of the secret
:type namespace: string
:return: Response of the patch API call
"""
api_client_instance = api_client.ApiClient()
api_instance = client.CoreV1Api(api_client_instance)
name = secret_dict["metadata"]["name"]
......@@ -177,30 +251,32 @@ def patch_kubernetes_secret(secret_dict, namespace):
def generate_password(length):
"""Generates a password of "length" characters."""
"""
Generates a password with letters and digits.
:param length: The amount of characters in the password
:type length: int
:return: Generated password
:rtype: string
"""
length = int(length)
password = "".join((secrets.choice(string.ascii_letters)
password = "".join((secrets.choice(string.ascii_letters + string.digits)
for i in range(length)))
return password
def gen_htpasswd(user, password):
"""Generate htpasswd entry for user with password."""
return f"{user}:{crypt.crypt(password, crypt.mksalt(crypt.METHOD_SHA512))}"
def get_all_kustomization_names(namespace='flux-system'):
"""
Returns all flux kustomizations in a namespace.
:param namespace: namespace that contains kustomizations. Default: `flux-system`
:type namespace: str
:return: List of names for kustomizations in namespace
:rtype: list
Generate htpasswd entry for user with password.
:param user: Username used in the htpasswd entry
:type user: string
:param password: Password for the user, will get encrypted.
:type password: string
:return: htpassword line entry
:rtype: string
"""
kustomizations = get_all_kustomizations(namespace)
return_kustomizations = []
for kustomization in kustomizations['items']:
return_kustomizations.append(kustomization['metadata']['name'])
return return_kustomizations
return f"{user}:{crypt.crypt(password, crypt.mksalt(crypt.METHOD_SHA512))}"
def get_all_kustomizations(namespace='flux-system'):
......@@ -208,8 +284,8 @@ def get_all_kustomizations(namespace='flux-system'):
Returns all flux kustomizations in a namespace.
:param namespace: namespace that contains kustomizations. Default: `flux-system`
:type namespace: str
:return: Kustomizations as returned by CustomObjectsApi.list_namespaced_custom_object()
:rtype: object
:return: 'items' in dict returned by CustomObjectsApi.list_namespaced_custom_object()
:rtype: dict[]
"""
api = client.CustomObjectsApi()
api_response = api.list_namespaced_custom_object(
......@@ -221,107 +297,58 @@ def get_all_kustomizations(namespace='flux-system'):
return api_response
def get_all_helmrelease_names(namespace='stackspin'):
"""
Returns names of all helmreleases in a namespace.
:param namespace: namespace that contains kustomizations. Default: `stackspin`
:type namespace: str
:return: List of names for helmreleases in namespace
:rtype: list
def get_all_helmreleases(namespace='stackspin', label_selector=""):
"""
helmreleases = get_all_helmreleases(namespace)
return_helmreleases = []
for helmrelease in helmreleases['items']:
return_helmreleases.append(helmrelease['metadata']['name'])
return return_helmreleases
Lists all helmreleases in a certain namespace (stackspin by default)
def get_all_helmreleases(namespace='stackspin'):
"""
Returns all helmreleases in a namespace.
:param namespace: namespace that contains kustomizations. Default: `stackspin`
:param namespace: namespace that contains helmreleases. Default: `stackspin-apps`
:type namespace: str
:return: Helmreleases as returned by CustomObjectsApi.list_namespaced_custom_object()
:rtype: object
"""
api = client.CustomObjectsApi()
api_response = api.list_namespaced_custom_object(
group="helm.toolkit.fluxcd.io",
version="v2beta1",
plural="helmreleases",
namespace=namespace,
)
return api_response
:param label_selector: a label selector to limit the list (optional)
:type label_selector: str
:return: List of helmreleases
:rtype: dict[]
"""
api_instance = client.CustomObjectsApi()
def get_kustomization(name, namespace='flux-system'):
"""Returns all info of a Flux kustomization with name 'name'"""
api = client.CustomObjectsApi()
try:
resource = api.get_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta1",
name=name,
namespace=namespace,
plural="kustomizations",
)
except client.exceptions.ApiException as error:
api_response = api_instance.list_namespaced_custom_object(
group="helm.toolkit.fluxcd.io",
version="v2beta1",
namespace=namespace,
plural="helmreleases",
label_selector=label_selector)
except ApiException as error:
if error.status == 404:
return None
# Raise all non-404 errors
raise error
return resource
return api_response['items']
def get_helmrelease(name, namespace='stackspin-apps'):
"""Returns all info of a Flux helmrelease with name 'name'"""
def get_kustomization(name, namespace='flux-system'):
"""
Returns all info of a Flux kustomization with name 'name'
:param name: Name of the kustomizatoin
:type name: string
:param namespace: Namespace of the kustomization
:type namespace: string
:return: kustomization as returned by the API
:rtype: dict
"""
api = client.CustomObjectsApi()
try:
resource = api.get_namespaced_custom_object(
group="helm.toolkit.fluxcd.io",
version="v2beta1",
group="kustomize.toolkit.fluxcd.io",
version="v1beta1",
name=name,
namespace=namespace,
plural="helmreleases",
plural="kustomizations",
)
except client.exceptions.ApiException as error:
if error.status == 404:
return None
# Raise all non-404 errors
raise error
return resource
def list_helmreleases(namespace='stackspin-apps', label_selector=""):
"""
Lists all helmreleases in a certain namespace (stackspin-apps by default)
Optionally takes a label selector to limit the list.
"""
api_instance = client.CustomObjectsApi()
try:
api_response = api_instance.list_namespaced_custom_object(
group="helm.toolkit.fluxcd.io",
version="v2beta1",
namespace=namespace,
plural="helmreleases",
label_selector=label_selector)
except ApiException as error:
if error.status == 404:
return None
# Raise all non-404 errors
raise error
return api_response
def get_readiness(app_status):
"""
Parses an app status's 'conditions' to find a type field called 'Ready' and
returns its status. Works for Kustomizations as well as Helmreleases.
"""
for condition in app_status['conditions']:
if condition['type'] == 'Ready':
return condition['status']
# If this point is reached, no condition "Ready" exists, so the application
# is not ready.
return False
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment