Skip to content
Snippets Groups Projects
generate_secrets.py 7.73 KiB
Newer Older
Generates Kubernetes secrets based on a provided app name.
If the `templates` directory contains a secret called `stackspin-{app}-variables`, it
will check if that secret already exists in the cluster, and if not: generate
it. It does the same for an `stackspin-{app}-basic-auth` secret that will contain a
See https://open.greenhost.net/stackspin/stackspin/-/issues/891 for the
context why we use this script and not a helm chart to generate secrets.

usage: python generate_secrets.py template_filename.j2
"""

import crypt
import os
import secrets
import string
import sys

import jinja2
import jinja2_base64_filters  # pylint: disable=unused-import
from kubernetes import client, config
from kubernetes.client.exceptions import ApiException
from kubernetes.utils import create_from_yaml

# This script gets called with an app name as argument. Most of them need an
# oauth client in Hydra, but some don't. This list contains the ones that
# don't.
APPS_WITHOUT_OAUTH = [
    "single-sign-on",
    "prometheus",
    "alertmanager",
]

def main():
    """Run everything"""
    # Add jinja filters we want to use
    env = jinja2.Environment(extensions=["jinja2_base64_filters.Base64Filters"])
    env.filters["generate_password"] = generate_password

    if len(sys.argv) < 2:
        print("Please provide an app name as an argument")
        sys.exit(1)
    app_name = sys.argv[1]

    # Create app variables secret
    create_variables_secret(app_name, f"stackspin-{app_name}-variables.yaml.jinja", env)
    # Create a secret that contains the oauth variables for Hydra Maester
    if app_name not in APPS_WITHOUT_OAUTH:
        create_variables_secret(app_name, "stackspin-oauth-variables.yaml.jinja", env)
    create_basic_auth_secret(app_name, env)


def get_templates_dir():
    """Returns directory that contains the Jinja templates used to create app
    secrets"""
    return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'templates')


def create_variables_secret(app_name, variables_filename, env):
    """Checks if a variables secret for app_name already exists, generates it if necessary"""
    variables_filepath = \
        os.path.join(get_templates_dir(), variables_filename)
    if os.path.exists(variables_filepath):
        # Check if k8s secret already exists, if not, generate it
        with open(variables_filepath) as template_file:
            lines = template_file.read()
            secret_name, secret_namespace = get_secret_metadata(lines)
            new_secret_dict = yaml.safe_load(
                env.from_string(
                    lines,
                    globals={"app": app_name}
                ).render())
            current_secret_data = get_kubernetes_secret_data(secret_name,
                    secret_namespace)
            if current_secret_data is None:
                # Create new secret
                update_secret = False
            elif current_secret_data.keys() != new_secret_dict['data'].keys():
                # Update current secret with new keys
                update_secret = True
                print(f"Secret {secret_name} in namespace {secret_namespace}"
                      " already exists. Merging...")
                # Merge dicts. Values from current_secret_data take precedence
                new_secret_dict['data'] |= current_secret_data
            else:
                # Do Nothing
                print(f"Secret {secret_name} in namespace {secret_namespace}"
                      " is already in a good state, doing nothing.")
                return
            print(f"Storing secret {secret_name} in namespace"
                  f" {secret_namespace} in cluster.")
            store_kubernetes_secret(new_secret_dict, secret_namespace,
                                    update=update_secret)
        print(f'Template {variables_filename} does not exist, no action needed')


def create_basic_auth_secret(app_name, env):
    """Checks if a basic auth secret for app_name already exists, generates it if necessary"""
    basic_auth_filename = \
        os.path.join(get_templates_dir(), f"stackspin-{app_name}-basic-auth.yaml.jinja")
    if os.path.exists(basic_auth_filename):
        with open(basic_auth_filename) as template_file:
            lines = template_file.read()
            secret_name, secret_namespace = get_secret_metadata(lines)

            if get_kubernetes_secret_data(secret_name, secret_namespace) is None:
                basic_auth_username = 'admin'
                basic_auth_password = generate_password(32)
                basic_auth_htpasswd = gen_htpasswd(
                    basic_auth_username,
                    basic_auth_password)
                print(f"Adding secret {secret_name} in namespace"
                      f" {secret_namespace} to cluster.")
                template = env.from_string(
                        lines,
                        globals={
                            'pass': basic_auth_password,
                            'htpasswd': basic_auth_htpasswd
                })
                secret_dict = yaml.safe_load(template.render())
                store_kubernetes_secret(secret_dict, secret_namespace)
            else:
                print(f"Secret {secret_name} in namespace {secret_namespace}"
                      " already exists. Not generating new secrets.")
        print(f'File {basic_auth_filename} does not exist, no action needed')

def get_secret_metadata(yaml_string):
    """Returns secret name and namespace from metadata field in a yaml string"""
    secret_dict = yaml.safe_load(yaml_string)
    secret_name = secret_dict['metadata']['name']
    # default namespace is flux-system, but other namespace can be
    # provided in secret metadata
    if 'namespace' in secret_dict['metadata']:
        secret_namespace = secret_dict['metadata']['namespace']
    else:
        secret_namespace = 'flux-system'
    return secret_name, secret_namespace


def get_kubernetes_secret_data(secret_name, namespace):
    """Returns the contents of a kubernetes secret or None if the secret does not exist."""
    try:
        secret = API.read_namespaced_secret(secret_name, namespace).data
    except ApiException as ex:
        # 404 is expected when the optional secret does not exist.
        if ex.status != 404:
            raise ex
        return None
    return secret

def store_kubernetes_secret(secret_dict, namespace, update=False):
    """Stores either a new secret in the cluster, or updates an existing one"""
    api_client = client.api_client.ApiClient()
    if update:
        verb = "updated"
        api_response = patch_kubernetes_secret(secret_dict, namespace)
    else:
        verb = "created"
        api_response = create_from_yaml(
                api_client,
                yaml_objects=[secret_dict],
                namespace=namespace)
    print(f"Secret {verb} with api response: {api_response}")

def patch_kubernetes_secret(secret_dict, namespace):
    """Patches secret in the cluster with new data"""
    api_instance = client.CoreV1Api(api_client)
    name = secret_dict['metadata']['name']
    body = {}
    body['data'] = secret_dict['data']
    return api_instance.patch_namespaced_secret(name, namespace, body)

def generate_password(length):
    """Generates a password of "length" characters"""
    length = int(length)
    password = ''.join((secrets.choice(string.ascii_letters) for i in range(length)))
    return password


def gen_htpasswd(user, password):
    """generate htpasswd entry for user with password"""
    return "{}:{}".format(user, crypt.crypt(
                password, crypt.mksalt(crypt.METHOD_SHA512)
            ),
        )

if __name__ == "__main__":
    config.load_kube_config()
    API = client.CoreV1Api()
    main()