Skip to content
Snippets Groups Projects
Unverified Commit 1de303e8 authored by Varac's avatar Varac
Browse files

Properly test kustomizations in CI

parent 554b57b1
No related branches found
No related tags found
No related merge requests found
"""
This module contains tests for:
- test_helmreleases: if all the helmreleases are ready
- test_apps_running: if all the applications are running without problems
(i.e., condition = Ready for all the related pods)
- test_kustomizations: Test if all kustomizations are ready
- test_helmreleases: Test if all the helmreleases are ready
- test_apps_running: Test if all deployments are ready
Documentation for the kubernetes client:
* https://github.com/kubernetes-client/python/blob/master/kubernetes/README.md
......@@ -16,9 +16,9 @@ from kubernetes.client.rest import ApiException
import pytest
def get_kustomization_status(name, api, namespace="flux-system"):
def get_kustomization_status(api, name, namespace='flux-system'):
"""Returns status contition for kustomization `name` in `namespace`"""
print('Testing %s in namespace %s ...' % (name, namespace), end='')
print('Testing kustomization %s in namespace %s ...' % (name, namespace), end='')
try:
kustomization = api.get_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
......@@ -28,6 +28,7 @@ def get_kustomization_status(name, api, namespace="flux-system"):
namespace=namespace
)
ks_status = kustomization['status']['conditions'][0]['status']
print(ks_status)
except ApiException as ex:
if ex.status == 404:
ks_status = 'Not found'
......@@ -40,7 +41,7 @@ def get_kustomization_status(name, api, namespace="flux-system"):
print(kustomization)
return ks_status
def get_release_status(name, namespace, api):
def get_release_status(api, name, namespace):
"""Returns release status for release `name` in `namespace`"""
print('Testing helmrelease %s in namespace %s ...' % (name, namespace), end='')
try:
......@@ -95,33 +96,54 @@ def run_around_tests():
@pytest.mark.app
@pytest.mark.kustomizations
def test_kustomizations(app):
def test_kustomizations(app, namespace):
"""
Checks if all desired Kustomizations installed by weave flux are in
Checks if all kustomizations installed by weave flux are in
'Ready' state.
"""
if app == 'base':
kustomizations = EXPECTED_KUSTOMIZATIONS_BASE
else:
kustomizations = [app]
custom_objects = client.CustomObjectsApi()
failed = 0
failed_apps = []
print('\n')
for kustomization in kustomizations:
ks_status = get_kustomization_status(kustomization, custom_objects)
if ks_status != 'True':
if app != 'all':
print(f"Testing single kustomization: {app}")
release_status = get_kustomization_status(
custom_objects, app, namespace)
if release_status != 'True':
failed += 1
failed_apps.append(app)
else:
# we can't get a list of custom objects from all namespaces,
# so we have to iterate over all namespaces. See
# https://github.com/kubernetes-client/python/issues/1377
core_api = client.CoreV1Api()
namespaces = core_api.list_namespace(watch=False)
for namespace_object in namespaces.items:
namespace_name = namespace_object.metadata.name
kustomizations = custom_objects.list_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta1",
plural="kustomizations",
namespace=namespace_name,
watch=False
)
for kustomization_object in kustomizations['items']:
kustomization_name = kustomization_object['metadata']['name']
kustomization_status = get_kustomization_status(
custom_objects, kustomization_name, namespace_name)
if kustomization_status != 'True':
failed += 1
failed_apps.append(kustomization_name)
assert failed == 0, f"Error: {failed} kustomizations not ready ({failed_apps})!"
assert failed == 0, f"Error: {failed} kustomizations not 'ready'!"
@pytest.mark.app
@pytest.mark.helmreleases
def test_helmreleases(app, namespace):
"""
Checks if all desired HelmReleases installed by weave flux are in
Checks if all HelmReleases installed by weave flux are in
'Ready' state.
"""
......@@ -132,7 +154,7 @@ def test_helmreleases(app, namespace):
if app != 'all':
release_status = get_release_status(
app, namespace, custom_objects)
custom_objects, app, namespace)
if release_status != 'True':
failed += 1
failed_apps.append(app)
......@@ -142,8 +164,8 @@ def test_helmreleases(app, namespace):
# https://github.com/kubernetes-client/python/issues/1377
core_api = client.CoreV1Api()
namespaces = core_api.list_namespace(watch=False)
for namespace in namespaces.items:
namespace_name = namespace.metadata.name
for namespace_object in namespaces.items:
namespace_name = namespace_object.metadata.name
releases = custom_objects.list_namespaced_custom_object(
group="helm.toolkit.fluxcd.io",
version="v2beta1",
......@@ -151,10 +173,10 @@ def test_helmreleases(app, namespace):
namespace=namespace_name,
watch=False
)
for release in releases['items']:
release_name = release['metadata']['name']
for release_object in releases['items']:
release_name = release_object['metadata']['name']
release_status = get_release_status(
release_name, namespace_name, custom_objects)
custom_objects, release_name, namespace_name)
if release_status != 'True':
failed += 1
failed_apps.append(release_name)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment