Commit 9a5a7666 authored by Mark's avatar Mark
Browse files

Merge branch '10-refactoring-work-to-make-development-faster' into 'main'

Resolve "Refactoring work to make development faster"

Closes #6 and #10

See merge request !5
parents 55733bc8 fe919de3
Pipeline #6043 passed with stage
in 1 minute and 10 seconds
[run]
omit = isitup/__main__.py
\ No newline at end of file
......@@ -11,7 +11,14 @@
"module": "debug",
"jinja": true,
"justMyCode": false,
"console": "externalTerminal"
},
{
"name": "Pytest",
"type": "python",
"request": "launch",
"module": "pytest",
"jinja": true,
"justMyCode": false,
}
]
}
\ No newline at end of file
}
""" Initialise the FastAPI app. """
import asyncio
from typing import Union
from pydantic import AnyHttpUrl, BaseModel, Field
from fastapi import FastAPI, HTTPException
from isitup import http, dns
from isitup.types import CheckDomain
from isitup.checks import register_check, retrieve_check
__version__ = "0.1.0"
http.register(register_check)
dns.register(register_check)
......@@ -25,7 +27,7 @@ tags_metadata = [
app = FastAPI(
title="Is it up?",
description="Check whether web services are configured correctly and are available.",
version="0.1.0",
version=__version__,
openapi_tags=tags_metadata,
)
......@@ -36,10 +38,19 @@ app = FastAPI(
response_model=http.HttpCheckResult,
tags=["http"],
)
async def http_checks(url: http.CheckUrl):
async def http_checks(domain: CheckDomain):
checker = retrieve_check("http")
return await checker(url)
return await checker(domain)
@app.post(
"/check/https/",
responses=http.responses,
response_model=http.HttpCheckResult,
tags=["http", "https"],
)
async def https_checks(domain: CheckDomain):
checker = retrieve_check("https")
return await checker(domain)
@app.post(
"/check/dns/{record_type}",
......@@ -47,14 +58,14 @@ async def http_checks(url: http.CheckUrl):
response_model=dns.DnsCheckResult,
tags=["dns"],
)
async def dns_checks(domain: dns.CheckDomain, record_type: dns.DnsRecordTypesEnum):
async def dns_checks(domain: CheckDomain, record_type: dns.DnsRecordTypesEnum):
record_type = record_type.lower()
checker = retrieve_check(f"dns_{record_type}")
return await checker(domain)
# @app.post("/check/", tags=["http", "dns"])
# async def all_url_checks(check: Union[http.CheckUrl, dns.CheckDomain]) -> list[dict]:
# async def all_url_checks(check: CheckDomain]) -> list[dict]:
# checks = ["http", "dns_a", "dns_aaaa", "dns_soa"]
# async def coro_wrapper(check_name, coro, check):
......
from isitup import app
import uvicorn
......
from pydantic import BaseModel
from fastapi import HTTPException
_functions = {}
......
""" Implement DNS based checks. """
import logging
from enum import Enum
from typing import Union
import dns
import dns.resolver as resolver
import dns.resolver
import pydantic
from isitup.checks import CheckErrorException
from isitup.http import CheckUrl
class CheckDomain(pydantic.BaseModel):
domain: pydantic.AnyUrl = pydantic.Field(
..., title="Domain", description="Domain to check the DNS records of."
)
class Config:
schema_extra = {
"example": {
"domain": "https://example.com",
}
}
from isitup.types import CheckDomain
LOG = logging.getLogger(F"{__name__}-check")
class DnsRecordTypesEnum(str, Enum):
a = "a"
......@@ -51,26 +39,42 @@ responses = {
"example": {"error": "No AAAA record found for example.com"}
}
},
}
},
598: {
"description": "Network timeout",
"content": {
"application/json": {
"example": {"error": "Fetching the A for example.com timed out."}
}
},
},
}
async def check_record_by_type(
domain: Union[CheckDomain, CheckUrl], record_type: str
domain: CheckDomain, record_type: str
) -> list[dict]:
try:
result = []
for record in resolver.query(domain, record_type):
for record in dns.resolver.query(domain, record_type):
data = {}
for attr in record.__slots__:
data[attr] = str(getattr(record, attr))
result.append(data)
return result
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN) as exc:
raise CheckErrorException(
404, f"No {record_type} record found for {domain.host}"
404, f"No {record_type} record found for {domain}, reason: {exc}"
)
except (dns.exception.Timeout) as exc:
raise CheckErrorException(
598, f"Fetching the {record_type} for {domain} timed out."
)
except dns.resolver.NoNameservers as exc:
LOG.fatal("No nameservers configured!")
raise CheckErrorException(
500, "Internal server error."
)
def register(register_callback: callable) -> None:
def wrap_check_record_by_type(record_type: str) -> callable:
......
......@@ -4,27 +4,7 @@ import httpx
import pydantic
from httpx import AsyncClient
from isitup.checks import CheckErrorException
def get_client():
return AsyncClient()
class CheckUrl(pydantic.BaseModel):
url: pydantic.AnyHttpUrl = pydantic.Field(
..., title="URL", description="Url to check the status of."
)
@property
def domain(self):
return self.url.host
class Config:
schema_extra = {
"example": {
"url": "https://example.com",
}
}
from isitup.types import CheckDomain
class HttpCheckResult(pydantic.BaseModel):
......@@ -50,14 +30,23 @@ responses = {
}
async def http_check(url: CheckUrl) -> HttpCheckResult:
async with get_client() as client:
async def _http_check(url: pydantic.HttpUrl) -> HttpCheckResult:
async with AsyncClient() as client:
try:
response = await client.get(url.url)
response = await client.get(url)
return {"status_code": response.status_code}
except httpx.ConnectError as exc:
raise CheckErrorException(521, f"Can't connect to {exc.request.url}")
async def http_check(domain: CheckDomain) -> HttpCheckResult:
return await _http_check(f"http://{domain.domain}")
async def https_check(domain: CheckDomain) -> HttpCheckResult:
return await _http_check(f"https://{domain.domain}")
def register(register_callback: callable) -> None:
register_callback(http_check, "http")
register_callback(https_check, "https")
import re
import urllib
from typing import Union
import pydantic
class DomainName(str):
"""
Domain name validation type, URLs can be passed, they will be parsed to domains.
"""
# regex to check whether there is a scheme
__url_regex = re.compile(r"^\w+://", re.IGNORECASE)
__domain_regex = re.compile(
r"^(?:[a-z\u00a1-\uffff0-9]-*)*[a-z\u00a1-\uffff0-9]+(?:\.(?:[a-z\u00a1-\uffff0-9]-*)*[a-z\u00a1-\uffff0-9]+)*$",
re.IGNORECASE,
)
@classmethod
def __get_validators__(cls):
# one or more validators may be yielded which will be called in the
# order to validate the input, each validator will receive as an input
# the value returned from the previous validator
yield cls.validate
@classmethod
def __modify_schema__(cls, field_schema):
# __modify_schema__ should mutate the dict it receives in place,
# the returned value will be ignored
field_schema.update(
# simplified pattern for brevity
pattern="[subdomain].domain.tld",
# some example domains
examples=["example.com", "site.example.com"],
)
@classmethod
def validate(cls, domain):
if not isinstance(domain, str):
raise TypeError("string required")
domain = cls.parse_domain(domain)
m = cls.__domain_regex.fullmatch(domain)
if not m:
raise ValueError("Invalid format for domain name")
# you could also return a string here which would mean model.domain
# would be a string, pydantic won't care but you could end up with some
# confusion since the value's type won't match the type annotation
# exactly
return cls(domain)
@classmethod
def parse_domain(cls, domain):
"""
Transform URLs to domains so they can pass validation.
"""
if not cls.__url_regex.match(domain):
# Assume it is a domain, the domain validator will check it.
return domain
domain = urllib.parse.urlparse(domain).hostname
if domain is None:
raise ValueError("Invalid format for domain name or URL")
return domain
def __repr__(self):
return f"DomainName({super().__repr__()})"
class CheckDomain(pydantic.BaseModel):
"""
Generic domain-like data model for posts to the API, accepts both domains and URLs.
"""
domain: DomainName = pydantic.Field(
..., title="Domain", description="Domain to check the DNS records of."
)
class Config:
schema_extra = {
"example": {
"domain": "example.com",
}
}
\ No newline at end of file
......@@ -3,6 +3,8 @@ black==20.8b1
asgi_lifespan
pytest==6.2.1
pytest-asyncio
pytest-mock
pytest-raises
pre-commit
pytest-watch
pytest-cov
......
from unittest import mock
import pytest
from asgi_lifespan import LifespanManager
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient
from httpx import AsyncClient
from isitup import app
......@@ -15,36 +15,3 @@ async def api() -> AsyncClient:
base_url="http://localhost:9000",
) as client:
yield client
mock_app = FastAPI()
@mock_app.get("/{status_code}/")
async def status(status_code: int):
if status_code < 200 or status_code > 599:
raise HTTPException(status_code=418)
raise HTTPException(status_code=status_code)
@pytest.fixture
def base_url() -> str:
return "https://testserver/"
@pytest.fixture
def _get_client(base_url: str) -> AsyncClient:
def get_client():
""" Make requests in our tests. """
return AsyncClient(
app=mock_app,
base_url=base_url,
)
return get_client
@pytest.fixture(autouse=True)
def mock_client(_get_client: callable, monkeypatch):
"""Remove requests.sessions.Session.request for all tests."""
monkeypatch.setattr("isitup.http.get_client", _get_client)
import pytest
from unittest import mock
from httpx import AsyncClient
from starlette.status import (
HTTP_404_NOT_FOUND,
HTTP_422_UNPROCESSABLE_ENTITY,
HTTP_200_OK,
)
class TestHttpChecks:
@pytest.mark.asyncio
async def test_post_empty(self, api: AsyncClient) -> None:
res = await api.post("/check/http/", json={})
assert res.status_code == HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_post_invalid(self, api: AsyncClient) -> None:
res = await api.post("/check/http/", json={"url": "ftp://greenhost.net"})
assert res.status_code == HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_post_to_200_url(self, api: AsyncClient, base_url: str) -> None:
res = await api.post("/check/http/", json={"url": f"{base_url}200"})
assert res.status_code == 200
assert res.json()["status_code"] == 200
@pytest.mark.asyncio
async def test_post_to_404_url(self, api: AsyncClient, base_url: str) -> None:
res = await api.post("/check/http/", json={"url": f"{base_url}404"})
assert res.status_code == 200
assert res.json()["status_code"] == 404
@pytest.mark.asyncio
async def test_post_to_500_url(self, api: AsyncClient, base_url: str) -> None:
res = await api.post("/check/http/", json={"url": f"{base_url}500"})
assert res.status_code == 200
assert res.json()["status_code"] == 500
class TestDnsChecks:
@pytest.mark.asyncio
async def test_post_empty(self, api: AsyncClient) -> None:
res = await api.post("/check/dns/a/", json={})
assert res.status_code == HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_post_invalid(self, api: AsyncClient) -> None:
res = await api.post("/check/dns/a/", json={"url": "ftp://greenhost.net"})
assert res.status_code == HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_post_url(self, api: AsyncClient, base_url: str) -> None:
res = await api.post("/check/dns/a/", json={"url": f"{base_url}404"})
assert res.status_code == 200
assert res.json()['status_code'] == 404
res = await api.post("/check/dns/", json={"url": f"{base_url}500"})
assert res.status_code == 200
assert res.json()['status_code'] == 500
\ No newline at end of file
import pytest
from isitup.checks import retrieve_check
def test_retrieve_non_existing_test() -> None:
with pytest.raises(ValueError):
retrieve_check("does-not-exist")
\ No newline at end of file
import pytest
from httpx import AsyncClient
from starlette import status
from isitup import __version__
@pytest.mark.asyncio
async def test_api_generation(api: AsyncClient) -> None:
res = await api.get("/openapi.json")
assert res.status_code == status.HTTP_200_OK
res.json()["info"]["version"] == __version__
from unittest import mock
import pytest
from pytest_mock import MockerFixture
from httpx import AsyncClient
from starlette import status
import dns
import dns.resolver
import isitup.dns
@pytest.fixture
def mock_dns_query_response():
def wrapped_mock_query_response(*args, **kwargs):
response = mock.Mock()
response.__slots__ = ["address"]
response.address = "127.0.0.1"
return [
response,
]
return wrapped_mock_query_response
@pytest.mark.asyncio
async def test_post_empty(api: AsyncClient) -> None:
res = await api.post("/check/dns/a/", json={})
assert res.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_post_not_domain(api: AsyncClient) -> None:
res = await api.post("/check/dns/a/", json={"domain": "!"})
assert res.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_post_query_timeout(api: AsyncClient) -> None:
with mock.patch("dns.resolver.query") as query:
query.side_effect = dns.exception.Timeout()
res = await api.post("/check/dns/a/", json={"domain": "example.com"})
# Custom status code used by large organisations unoffically
assert res.status_code == 598
assert "Fetching the A for example.com timed out." in res.json()["detail"]
@pytest.mark.asyncio
async def test_post_non_existent_domain(api: AsyncClient) -> None:
with mock.patch("dns.resolver.query") as query:
query.side_effect = dns.resolver.NXDOMAIN()
res = await api.post("/check/dns/a/", json={"domain": "nonexistentdomain.tld"})
assert res.status_code == status.HTTP_404_NOT_FOUND
assert "No A record found for nonexistentdomain.tld" in res.json()["detail"]
query.side_effect = dns.resolver.NoAnswer()
res = await api.post("/check/dns/a/", json={"domain": "nonexistentdomain.tld"})
assert res.status_code == status.HTTP_404_NOT_FOUND
assert "No A record found for nonexistentdomain.tld" in res.json()["detail"]
@pytest.mark.asyncio
async def test_post_no_nameserver(api: AsyncClient, mocker: MockerFixture) -> None:
mocker.patch("isitup.dns.LOG.fatal")
with mock.patch("dns.resolver.query") as query:
query.side_effect = dns.resolver.NoNameservers()
res = await api.post("/check/dns/a/", json={"domain": "nonexistentdomain.tld"})
isitup.dns.LOG.fatal.assert_called_once_with("No nameservers configured!")
assert res.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert "Internal server error." in res.json()["detail"]
@pytest.mark.asyncio
async def test_post_mocked_existing_domain(api: AsyncClient) -> None:
def mock_dns_query_response(*args, **kwargs):
response = mock.Mock()
response.__slots__ = ["address"]
response.address = "127.0.0.1"
return [
response,
]
with mock.patch("dns.resolver.query") as query:
query.side_effect = mock_dns_query_response
res = await api.post("/check/dns/a/", json={"domain": "existingdomain.tld"})
query.assert_called_once_with("existingdomain.tld", "A")
assert res.status_code == status.HTTP_200_OK
assert res.json()["record_type"] == "a"
assert res.json()["value"][0]["address"] == "127.0.0.1"
@pytest.mark.asyncio
async def test_post_mocked_bad_url(
api: AsyncClient, mock_dns_query_response: callable
) -> None:
with mock.patch("dns.resolver.query") as query:
query.side_effect = mock_dns_query_response
res = await api.post("/check/dns/a/", json={"domain": "https://"})
query.assert_not_called
assert res.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
assert "Invalid format for domain name or URL" in res.json()["detail"][0]["msg"]
@pytest.mark.asyncio
async def test_post_mocked_existing_url(
api: AsyncClient, mock_dns_query_response: callable
) -> None:
with mock.patch("dns.resolver.query") as query:
query.side_effect = mock_dns_query_response
res = await api.post(
"/check/dns/a/", json={"domain": "https://existingdomain.tld"}
)
query.assert_called_once_with("existingdomain.tld", "A")
assert res.status_code == status.HTTP_200_OK
assert res.json()["record_type"] == "a"
assert res.json()["value"][0]["address"] == "127.0.0.1"
import pytest
from httpx import AsyncClient, ConnectError
from pytest_mock import MockerFixture
from starlette import status
from fastapi import FastAPI, HTTPException
from unittest import mock
mock_app = FastAPI()
# Hack to be able to tell the mock app the expected status code during a test.
mock_app.expected_status_code = None
@mock_app.get("/")
async def status_code():
raise HTTPException(status_code=mock_app.expected_status_code)
@pytest.fixture
def base_url() -> str:
return "https://testserver/"
@pytest.fixture
def mock_client(base_url: str) -> callable:
""" Make requests in our tests. """
def MockAsyncClient():
return AsyncClient(
app=mock_app,
base_url=base_url,
)
return MockAsyncClient
@pytest.mark.asyncio
async def test_post_empty(api: AsyncClient) -> None:
res = await api.post("/check/http/", json={})
assert res.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
@pytest.mark.asyncio
async def test_post_invalid(api: AsyncClient) -> None:
res = await api.post("/check/http/", json={"domain": "!@#$%"})
assert res.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY