FQDN Field Validation with Pydantic

The Problem

Customers were configuring egress entries for external services with incorrect syntax. Istio rejected the resulting resources when microservice CRDs were deployed, so the controller that manages the CRD's owned resources began generating errors each time it tried to update resources that Istio's validating webhook would reject:

microserviceDeployment.yaml
...
DeploymentStrategy:
  ServiceDependencies:
    egress:
    - hosts:
      - external/some-external-serviceentry # (1)
...
  1. When the CRD is applied, various Istio resources are generated. Istio's DestinationRule resource expects host values that reference ServiceEntry resources to be formatted as namespace/service-entry-name.namespace.svc.cluster.local.
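To make the expected shape concrete, here is a minimal sketch that builds a host value in the format described in the annotation above. The helper name `service_entry_host` is hypothetical and not part of the pipeline, and the sketch assumes the same namespace is used for both the prefix and the DNS name.

```python
def service_entry_host(namespace: str, name: str) -> str:
    """Build 'namespace/name.namespace.svc.cluster.local' (hypothetical helper)."""
    return f"{namespace}/{name}.{namespace}.svc.cluster.local"


# The misconfigured value from the YAML above, rewritten in the expected format.
host = service_entry_host("external", "some-external-serviceentry")
print(host)
```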

The microservice pipeline would validate that the field contained a list of strings:

src/service_dependency_schema.py
from typing import Optional

from faker import Faker
from faker.providers import python
from pydantic import BaseModel, parse_obj_as
import pytest

class Egress(BaseModel, extra="forbid"):
    port: Optional[Port]
    hosts: list[str]


fake = Faker()
fake.add_provider(python)


@pytest.fixture
def egress_data():
    return {"hosts": [fake.pystr()]}


def test_egress_schema(egress_data):
    # when
    parsed = parse_obj_as(Egress, egress_data)

    # then
    assert isinstance(parsed.hosts, list)
    assert isinstance(parsed.hosts[0], str)

The pipeline should be able to fail builds with invalid configurations, so the hosts field validation required a refactor.

The Solution

Validation of the hosts field is multi-faceted:

  1. The FQDN should conform to RFC 1035
  2. Istio expects namespace/ to prefix each FQDN
  3. hosts values that represent ServiceEntry resources need to be fully qualified

Typically, this can be done using regex:

# This regex pattern validates FQDNs according to RFC1035:
^(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)

# This regex pattern validates that the namespace is prefixed to the FQDN:
external\/.+.svc.cluster.local

# This regex combines the two:
^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local
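As a quick check of what the combined pattern accepts, the sketch below runs it against a few sample hosts using only the standard `re` module. The sample values and the `COMBINED` and `results` names are made up for illustration; `re.match` is used because it anchors at the start of the string.

```python
import re

# Combined pattern copied from the section above, split across two raw strings.
COMBINED = re.compile(
    r"^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)"
    r"|external\/.+.svc.cluster.local"
)

samples = [
    "external/some-external-serviceentry",               # the original misconfiguration
    "external/my-entry.my-namespace.svc.cluster.local",  # fully qualified
    "*/example.com",                                     # wildcard namespace prefix
    "example.com",                                       # no namespace prefix
    "external/my-entry.svc.cluster.local",               # namespace label missing
]

results = {host: bool(COMBINED.match(host)) for host in samples}
for host, matched in results.items():
    print(f"{matched!s:5}  {host}")
```

Note that the last sample matches even though it lacks a namespace label, because `.+` and the unescaped dots are permissive. A single combined pattern makes gaps like this hard to spot.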

Pydantic supports string constraints through constr(), including regex matching:

src/service_dependency_schema.py
from typing import Optional

from faker import Faker
from faker.providers import python
from pydantic import BaseModel, constr, ValidationError, parse_obj_as
import pytest

class Egress(BaseModel, extra="forbid"):
    # Add regex
    regex_pattern = "^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local"
    port: Optional[Port]
    hosts: list[constr(regex=regex_pattern)] # (1)


fake = Faker()
fake.add_provider(python)


@pytest.fixture
def egress_data(request): # (2)
    # Valid data by default; indirect parametrization with False selects invalid data.
    if getattr(request, "param", True):
        return {"hosts": [f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster.local"]} # (3)
    else:
        return {"hosts": [ # (4)
                f"external/{fake.pystr()}",
                f"external/{fake.pystr()}.{fake.pystr()}",
                f"external/{fake.pystr()}.{fake.pystr()}.svc",
                f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster",
                f"external/{fake.pystr()}.svc.cluster.local"
            ]
        }


def test_egress_schema(egress_data):
    # when
    parsed = parse_obj_as(Egress, egress_data)

    # then
    assert isinstance(parsed.hosts, list)
    assert isinstance(parsed.hosts[0], str)

@pytest.mark.parametrize("egress_data", [False], indirect=True) # (5)
def test_egress_hosts_fqdn_raises_validation_error_on_invalid_entries(egress_data):
    # then
    with pytest.raises(ValidationError, match="string does not match regex"):
        parse_obj_as(Egress, egress_data)
  1. Update the hosts type to use constr() with regex pattern matching
  2. Update egress_data() fixture to output either valid or invalid data
  3. Valid output that will pass Istio validation
  4. Invalid output that Istio would reject
  5. Test case added to test ValidationError is raised on invalid hosts

However, the ValidationError message is miserable to parse:

Expected Validation Error: 'string does not match regex "^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local" (type=value_error.str.regex; pattern=^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local

Furthermore, flake8 validation fails when Pydantic's constr() function is used in the annotation, so constr() cannot be used to enforce the constraints.

Flake8 Validation

Fixing flake8 validation is relatively simple:

src/service_dependency_schema.py
...

class Egress(BaseModel, extra="forbid"):
    regex_pattern = "^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local"
    port: Optional[Port]
    hosts: list[str] = Field(regex=regex_pattern) # (1)


...
  1. Instead of constr(), using Field() allows flake8 validations to pass successfully.

The next step is to tackle the ValidationError message.

Breaking Up the Regex

The regex pattern being used does the following:

  1. ^\*\/ - If the FQDN is */ prefixed:
    1. (?!:\/\/) - Disallow a URL scheme prefix
    2. (?=.{1,255}$) - The FQDN must be between 1 and 255 characters long
    3. ((.{1,63}\.) - Each domain label must be between 1 and 63 characters long
    4. {1,127} - The FQDN must contain between 1 and 127 domain labels
    5. (?![0-9]*$)[a-z0-9-]+ - The TLD cannot consist only of numbers
    6. \.? - At most one optional trailing dot
  2. external\/.+.svc.cluster.local - If the FQDN is external/ prefixed, ensure the name is fully qualified with the local cluster DNS suffix
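The pieces listed above can be tried in isolation with the standard `re` module. The sketch below is illustrative only: the constant names are made up, anchors were added so each piece can run on its own, and the patterns are applied to the FQDN portion without the namespace prefix.

```python
import re

# Whole value must be 1 to 255 characters (lookahead from the list above).
TOTAL_LENGTH = re.compile(r"^(?=.{1,255}$).+$")
# A dot must appear within the first 64 characters, bounding the first label at 63.
FIRST_LABEL = re.compile(r"^.{1,63}\.")
# The final label must not consist only of digits.
NON_NUMERIC_TLD = re.compile(r"^.+\.(?![0-9]*$)[a-z0-9-]+$")

print(bool(TOTAL_LENGTH.match("a" * 255)))            # within the limit
print(bool(TOTAL_LENGTH.match("a" * 256)))            # one character too long
print(bool(FIRST_LABEL.match("a" * 63 + ".com")))     # 63-character label
print(bool(FIRST_LABEL.match("a" * 64 + ".com")))     # 64-character label
print(bool(NON_NUMERIC_TLD.match("example.com")))     # alphabetic TLD
print(bool(NON_NUMERIC_TLD.match("example.123")))     # numeric-only TLD
```

Testing each piece separately makes it easier to see which rule rejects a given value, which is the idea behind the refactor that follows.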

In its current form, the regex pattern produces a single error message that only says the field value did not match the pattern. If the regex is broken up and each piece is used for its own validation, the reason for a validation failure becomes far more actionable:

src/service_dependency_schema.py
...
class Egress(BaseModel, extra="forbid"):
    port: Optional[Port]
    hosts: list[str] # (1)

    @validator("hosts")
    def hosts_fqdn_must_be_valid(cls, v): # (2)

        for host in v: # (3)
            no_slash_prefix_regex_pattern = re.compile("^(\*|external)\/.+$")
            if not no_slash_prefix_regex_pattern.match(host):
                raise ValueError("Invalid FQDN.  Must include '*/' or 'external/' prefix.")

            no_http_schema_regex_pattern = re.compile("^(\*|external)\/((?!.+:\/\/).+)$")
            if not no_http_schema_regex_pattern.match(host):
                raise ValueError(
                    f"Host FQDN cannot begin with '://' schema."
                )

            host_min_max_domain_levels_regex_pattern = re.compile(
                "^(\*|external)\/([A-Za-z0-9-]+\.){1,127}[A-Za-z0-9-]+\.?$"
            )
            if not host_min_max_domain_levels_regex_pattern.match(host):
                raise ValueError(
                    f"Valid number of domain labels for host FQDN between 1 and 127."
                )

            host_min_max_characters_regex_pattern = re.compile("^(\*|external)\/((?=.{1,255}$).+$)")
            if not host_min_max_characters_regex_pattern.match(host):
                raise ValueError(
                    f"Valid character length for host FQDN between 1 and 255."
                )

            host_min_max_domain_label_regex_pattern = re.compile("^(\*|external)\/((.{1,63}\.).+$)")
            if not host_min_max_domain_label_regex_pattern.match(host):
                raise ValueError(
                    f"Valid character length for each domain label between 1-63."
                )

            tld_does_not_begin_with_or_contain_only_numerics_regex_pattern = re.compile(
                "^(\*|external)\/(.+\.(?![0-9])[A-Za-z0-9-]+)\.?$"
            )
            if not tld_does_not_begin_with_or_contain_only_numerics_regex_pattern.match(host):
                raise ValueError(
                    f"TLD must not begin with a numeric, or contain only numerics."
                )

            external_slashed_fqdn_regex_pattern = re.compile("^external\/.+$")
            if external_slashed_fqdn_regex_pattern.match(host):
                external_slashed_fqdn_must_contain_service_and_namespace_domain_labels_regex_pattern = re.compile(
                    "^external\/[A-Za-z0-9-]+\.(?=svc\.cluster\.local$)"
                )
                if external_slashed_fqdn_must_contain_service_and_namespace_domain_labels_regex_pattern.match(
                    host
                ):
                    raise ValueError(
                        f"'external/' host FQDNs must include service and namespace domain labels."
                    )

                external_slashed_fqdn_must_have_cluster_dns_suffix_regex_pattern = re.compile(
                    "^external\/.+\.svc\.cluster\.local$"
                )
                if not external_slashed_fqdn_must_have_cluster_dns_suffix_regex_pattern.match(host):
                    raise ValueError(
                        f"'external/' host FQDNs must contain the cluster DNS suffix."
                    )

        return v
...
  1. Since the Field class constraint capabilities do not meet the solution requirements, it is removed and normal type validation is used instead.
  2. The hosts_fqdn_must_be_valid method uses the validator decorator, which pydantic runs each time it performs schema validation.
  3. The hosts_fqdn_must_be_valid validator method loops through each hosts value and performs regex matches through nested if statements, which is never great and should be refactored as soon as pydantic's validation capabilities are better understood.
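One thing worth noting about the label-length pattern in the validator above: `(.{1,63}\.)` only constrains where the first dot appears, so an over-long label later in the name slips through. A minimal sketch of a per-label check, assuming a hypothetical helper named `labels_within_limit` that is not part of the original code, could look like this:

```python
import re

# Label-length pattern copied from the validator above.
FIRST_LABEL_ONLY = re.compile(r"^(\*|external)\/((.{1,63}\.).+$)")


def labels_within_limit(host: str, max_len: int = 63) -> bool:
    """Return True when every label after the 'namespace/' prefix has 1..max_len characters."""
    _, _, fqdn = host.partition("/")
    if fqdn.endswith("."):
        fqdn = fqdn[:-1]  # allow a single trailing dot
    return all(1 <= len(label) <= max_len for label in fqdn.split("."))


# The second label is 64 characters long.
long_second_label = "external/svc-name." + "b" * 64 + ".svc.cluster.local"
print(bool(FIRST_LABEL_ONLY.match(long_second_label)))  # the regex accepts it
print(labels_within_limit(long_second_label))           # the per-label check rejects it
```

Splitting on dots trades the single regex for a few lines of plain Python, which may be easier to reason about than a repeated group.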

Additionally, the unit tests are dramatically expanded:

tests/unit/test_service_dependency_schema.py
from pydantic import ValidationError, parse_obj_as
from src.service_dependency_schema import Egress
from Faker import Faker
from faker.providers import python, internet
import pytest

fake = Faker()
fake.add_provider(python)
fake.add_provider(internet)


@pytest.fixture
def egress_data(valid=True):
    if valid:
        return {"hosts": [f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster.local"]}
    else:
        return {"hosts": [
                f"external/{fake.pystr()}",
                f"external/{fake.pystr()}.{fake.pystr()}",
                f"external/{fake.pystr()}.{fake.pystr()}.svc",
                f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster",
                f"external/{fake.pystr()}.svc.cluster.local"
            ]
        }


def test_egress_schema(egress_data):
    # when
    parsed = parse_obj_as(Egress, egress_data)

    # then
    assert isinstance(parsed.hosts, list)
    assert isinstance(parsed.hosts[0], str)

@pytest.mark.parametrize(
    "fqdn_schema_data",
    [f"{fake.pystr()}.{fake.pystr()}.svc.cluster.local", fake.domain_name()],
)
def test_egress_disallow_hosts_without_slash_prefix(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}

    # then
    with pytest.raises(
        ValidationError, match=r"Invalid FQDN.  Must include '\*\/' or 'external\/' prefix."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"*/https://{fake.domain_name()}",
        f"*/http://{fake.domain_name()}",
        f"*/tcp://{fake.domain_name()}",
        f"*/{fake.pystr()}://{fake.domain_name()}",
        f"external/http://{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
        f"external/https://{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
        f"external/tcp://{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
        f"external/{fake.pystr()}://{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
    ],
)
def test_egress_disallow_hosts_with_url_schema(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}

    # then
    with pytest.raises(ValidationError, match="Host FQDN cannot begin with '://' schema."):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"external/{fake.pystr(min_chars=128, max_chars=128)}.{fake.pystr(min_chars=128, max_chars=128)}.svc.cluster.local",
        f"*/{fake.pystr(min_chars=128, max_chars=128)}.{fake.pystr(min_chars=128, max_chars=128)}.org",
    ],
)
def test_egress_disallow_hosts_above_max_characters(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}

    # then
    with pytest.raises(
        ValidationError, match="Valid character length for host FQDN between 1 and 255."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"external/{fake.pystr(min_chars=65, max_chars=65)}.{fake.pystr()}.svc.cluster.local",
        f"*/{fake.pystr(min_chars=65, max_chars=65)}.{fake.domain_name()}",
    ],
)
def test_egress_disallow_hosts_with_domain_labels_above_maximum_characters(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}

    # then
    with pytest.raises(
        ValidationError, match="Valid character length for each domain label between 1-63."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [f"*/{(fake.pystr(max_chars=1) + '.') * 129 + fake.pystr(max_chars=1)}"],
)
def test_egress_disallow_hosts_with_domain_labels_above_maximum(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}

    # then
    with pytest.raises(
        ValidationError, match="Valid number of domain labels for host FQDN between 1 and 127."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"*/{fake.pystr()}.{fake.pyint(min_value=0)}",
        f"*/{fake.pystr()}.{fake.pyint(min_value=0)}{fake.pystr()}",
        f"external/{fake.pystr()}.{fake.pyint(min_value=0)}",
        f"external/{fake.pystr()}.{fake.pyint(min_value=0)}{fake.pystr()}",
    ],
)
def test_egress_disallow_hosts_if_tld_begins_with_numeric_or_contains_only_numerics(
    fqdn_schema_data,
):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}

    # then
    with pytest.raises(ValidationError, match="TLD must not begin with a numeric."):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize("fqdn_schema_data", [f"external/{fake.pystr()}.svc.cluster.local"])
def test_egress_disallow_external_hosts_with_missing_service_or_namespace_domain_labels(
    fqdn_schema_data,
):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}

    # then
    with pytest.raises(
        ValidationError,
        match="'external/' host FQDNs must include service and namespace domain labels",
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"external/{fake.pystr()}.{fake.pystr()}",
        f"external/{fake.pystr()}.{fake.pystr()}.svc",
        f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster",
    ],
)
def test_egress_disallow_external_hosts_with_missing_cluster_dns_suffix(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}

    # then
    with pytest.raises(
        ValidationError, match="'external/' host FQDNs must contain the cluster DNS suffix."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "egress_schema_data",
    [
        f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
        f"external/{(fake.pystr(max_chars=63, min_chars=63) + '.') * 2}svc.cluster.local",
        f"external/{(fake.pystr(max_chars=1) + '.') * 2}svc.cluster.local",
        f"external/{(fake.pystr(max_chars=1) + '.') * 50}svc.cluster.local",
        f"*/{fake.domain_name()}",
        f"*/{fake.pystr(max_chars=63, min_chars=63)}.com",
        f"*/{(fake.pystr(max_chars=1) + '.') * 50 + fake.pystr(max_chars=1)}",
        f"*/{fake.domain_name()}{fake.pyint()}",
        f"*/{fake.domain_name()}.",
    ],
)
def test_egress_valid_hosts_fqdn(egress_schema_data):
    # when

    egress_data = {"hosts": [egress_schema_data]}
    parsed = parse_obj_as(Egress, egress_data)

    # then
    assert isinstance(parsed.hosts[0], str)

The test cases are in a good enough state that refactoring the code should be relatively simple.

Refactoring

There are a number of issues with the code:

  1. Nested if statements introduce unnecessary complexity. They can be eliminated by creating a function that validates each scenario, with the additional benefit of eliminating duplicate code.
  2. A for loop iterates through each value in hosts. If the validator method could validate each item in the field directly, code complexity would drop drastically.
  3. The scenarios are not standardized. If each scenario's inputs and outputs can be standardized during the refactor, the code becomes more readable.

Standardizing each function is the first step in the refactor. Every scenario has an expected input and output, although some assert that a pattern matches and others assert that it does not. Each scenario takes the FQDN, checks it against a regex pattern, and raises a ValueError with a scenario-specific error message if the check fails:

src/service_dependency_schema.py
...
class Egress(BaseModel, extra="forbid"):
    port: Optional[Port]
    hosts: list[str]

    @staticmethod   # (1)
    def fqdn_has_slash_prefix(fqdn):
        message = f"Invalid FQDN.  Must include '*/' or 'external/' prefix."
        try:
            assert re.match("^(\*|external)\/.+$", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def disallow_url_schema(fqdn):
        message = f"Host FQDN cannot begin with '://' schema."
        try:
            assert re.match("^(\*|external)\/((?!.+:\/\/).+)$", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def fqdn_domain_label_constraints(fqdn):
        message = f"Valid number of domain labels for host FQDN between 1 and 127."
        try:
            assert re.match("^(\*|external)\/([A-Za-z0-9-]+\.){1,127}[A-Za-z0-9-]+\.?$", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def fqdn_character_length(fqdn):
        message = f"Valid character length for host FQDN between 1 and 255."
        try:
            assert re.match("^(\*|external)\/((?=.{1,255}$).+$)", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def fqdn_domain_label_character_length(fqdn):
        message = f"Valid character length for each domain label between 1-63."
        try:
            assert re.match("^(\*|external)\/((.{1,63}\.).+$)", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def tld_numerics_constraints(fqdn):
        message = f"TLD must not begin with a numeric, or contain only numerics."
        try:
            assert re.match("^(\*|external)\/(.+\.(?![0-9])[A-Za-z0-9-]+)\.?$", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def external_slashed_fqdn_has_service_and_namespace_domain_labels(fqdn):
        message = f"'external/' host FQDNs must include service and namespace domain labels."
        try:
            assert not re.match("^external\/[A-Za-z0-9-]+\.(?=svc\.cluster\.local$)", fqdn)
        except AssertionError:
            raise ValueError(message) from None

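    @staticmethod
    def external_slashed_fqdn_has_cluster_dns_suffix(fqdn):
        message = f"'external/' host FQDNs must contain the cluster DNS suffix."
        try:
            assert re.match("^external\/.+\.svc\.cluster\.local$", fqdn)
        except AssertionError:
            raise ValueError(message) from None
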
    @validator("hosts", each_item=True) # (2)
    def hosts_fqdn_must_be_valid(cls, v):
        cls.fqdn_has_slash_prefix(v)
        cls.disallow_url_schema(v)
        cls.fqdn_domain_label_constraints(v)
        cls.fqdn_character_length(v)
        cls.fqdn_domain_label_character_length(v)
        cls.tld_numerics_constraints(v)
        if re.match("^external\/.+$", v):
            cls.external_slashed_fqdn_has_service_and_namespace_domain_labels(v)
            cls.external_slashed_fqdn_has_cluster_dns_suffix(v)

        return v
...
  1. Static methods are created for each validation scenario to reduce duplicate code
  2. The hosts_fqdn_must_be_valid validator method now validates against each item in the hosts field. Additionally, since the static methods are being called, nested statements are nearly eliminated

Additionally, the validator decorator allows validation of iterable fields with the each_item parameter. Setting each_item=True eliminates the for loop from the first iteration of code. The next step now is to improve the way that input data for each scenario is handled. This can be handled like so:

src/constants.py
# Each constant contains the following:
# (regex_pattern, error_message)
FQDN_HAS_SLASH_PREFIX = (
    r"^(\*|external)\/.+$",
    "Invalid FQDN.  Must include '*/' or 'external/' prefix.",
)
DISALLOW_URL_SCHEMA = (
    r"^(\*|external)\/((?!.+:\/\/).+)$",
    "Host FQDN cannot begin with '://' schema.",
)
FQDN_DOMAIN_LABEL_CONSTRAINTS = (
    r"^(\*|external)\/([A-Za-z0-9-]+\.){1,127}[A-Za-z0-9-]+\.?$",
    "Valid number of domain labels for host FQDN between 1 and 127.",
)
FQDN_DOMAIN_LABEL_CHARACTER_LENGTH = (
    r"^(\*|external)\/((.{1,63}\.).+$)",
    "Valid character length for each domain label between 1-63.",
)
FQDN_CHARACTER_LENGTH = (
    r"^(\*|external)\/((?=.{1,255}$).+$)",
    "Valid character length for host FQDN between 1 and 255.",
)
TLD_NUMERIC_CONSTRAINTS = (
    r"^(\*|external)\/(.+\.(?![0-9])[A-Za-z0-9-]+)\.?$",
    "TLD must not begin with a numeric, or contain only numerics.",
)
EXTERNAL_SLASHED_FQDN_HAS_SERVICE_AND_NAMESPACE_DOMAIN_LABELS = (
    r"^external\/[A-Za-z0-9-]+\.(?=svc\.cluster\.local$)",
    "'external/' host FQDNs must include service and namespace domain labels.",
)
EXTERNAL_SLASHED_FQDN_HAS_CLUSTER_DNS_SUFFIX = (
    r"^external\/.+\.svc\.cluster\.local$",
    "'external/' host FQDNs must contain the cluster DNS suffix.",
)

Adding clarity to the constant names makes the code more readable. Reading fqdn_has_slash_prefix(v), one can guess that the function checks that an FQDN has a slash prefix. With a single function called validate_hosts_fqdn, the well-named constants carry that context instead: validate_hosts_fqdn(v, FQDN_HAS_SLASH_PREFIX) states which scenario is being checked.
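If named fields are preferred over positional tuple access, one possible variation is to wrap each constant in a `typing.NamedTuple`. This is only a sketch: the `Scenario` class is hypothetical, and the standalone `validate_hosts_fqdn` function here is a simplified stand-in rather than the method used in this project.

```python
import re
from typing import NamedTuple


class Scenario(NamedTuple):
    """Hypothetical container pairing a regex pattern with its error message."""

    pattern: str
    message: str


FQDN_HAS_SLASH_PREFIX = Scenario(
    r"^(\*|external)\/.+$",
    "Invalid FQDN.  Must include '*/' or 'external/' prefix.",
)


def validate_hosts_fqdn(fqdn: str, scenario: Scenario) -> None:
    # Named fields document themselves; positional indexing still works.
    if not re.match(scenario.pattern, fqdn):
        raise ValueError(scenario.message)


validate_hosts_fqdn("*/example.com", FQDN_HAS_SLASH_PREFIX)  # passes silently
```

Because a NamedTuple is still a tuple, code that unpacks or indexes the constant keeps working unchanged.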

The Final Result

Reducing the amount of duplicate code to a single static method greatly improves readability:

src/service_dependency_schema.py
import re
from typing import Optional

from pydantic import BaseModel, ValidationError, parse_obj_as, validator

from constants import (
    DISALLOW_URL_SCHEMA,
    EXTERNAL_SLASHED_FQDN_HAS_CLUSTER_DNS_SUFFIX,
    EXTERNAL_SLASHED_FQDN_HAS_SERVICE_AND_NAMESPACE_DOMAIN_LABELS,
    FQDN_CHARACTER_LENGTH,
    FQDN_DOMAIN_LABEL_CHARACTER_LENGTH,
    FQDN_DOMAIN_LABEL_CONSTRAINTS,
    FQDN_HAS_SLASH_PREFIX,
    LINK_TO_DOCUMENTATION,
    TLD_NUMERIC_CONSTRAINTS,
)


class Egress(BaseModel, extra="forbid"):
    port: Optional[Port]
    hosts: list[str]

    @staticmethod
    def validate_hosts_fqdn(fqdn, scenario, assert_not=False): # (1)
        pattern = scenario[0]
        message = scenario[1]
        try:
            if assert_not:
                assert not re.match(pattern, fqdn)
            else:
                assert re.match(pattern, fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @validator("hosts", each_item=True)
    def hosts_fqdn_must_be_valid(cls, v):                       # (2)
        cls.validate_hosts_fqdn(v, FQDN_HAS_SLASH_PREFIX)
        cls.validate_hosts_fqdn(v, DISALLOW_URL_SCHEMA)
        cls.validate_hosts_fqdn(v, FQDN_DOMAIN_LABEL_CONSTRAINTS)
        cls.validate_hosts_fqdn(v, FQDN_CHARACTER_LENGTH)
        cls.validate_hosts_fqdn(v, FQDN_DOMAIN_LABEL_CHARACTER_LENGTH)
        cls.validate_hosts_fqdn(v, TLD_NUMERIC_CONSTRAINTS)
        if re.match(r"^external\/.+$", v):
            cls.validate_hosts_fqdn(
                v, EXTERNAL_SLASHED_FQDN_HAS_SERVICE_AND_NAMESPACE_DOMAIN_LABELS, assert_not=True
            )
            cls.validate_hosts_fqdn(v, EXTERNAL_SLASHED_FQDN_HAS_CLUSTER_DNS_SUFFIX)

        return v
  1. The validate_hosts_fqdn static method receives the scenario as input and asserts that the regex pattern matches (or, with assert_not=True, that it does not match). If the assertion fails, it raises a ValueError. Pydantic converts that into a ValidationError, so the unit test cases must expect a ValidationError.
  2. The hosts_fqdn_must_be_valid validator method runs through the scenarios for each hosts entry and stops at the first one that fails. Pydantic reports the failure for every invalid entry, so multiple invalid entries produce multiple errors in the ValidationError.
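One caveat with the assert-based approach: Python removes assert statements when run with the -O flag, so the checks above would silently stop validating in that mode. A minimal sketch of an equivalent helper that uses a plain conditional follows; the names `check_scenario` and `expect_match` are hypothetical and chosen only for this example.

```python
import re

# Same (pattern, message) shape as the constants in src/constants.py.
EXTERNAL_SLASHED_FQDN_HAS_CLUSTER_DNS_SUFFIX = (
    r"^external\/.+\.svc\.cluster\.local$",
    "'external/' host FQDNs must contain the cluster DNS suffix.",
)


def check_scenario(fqdn, scenario, expect_match=True):
    """Raise ValueError when the match result differs from what the scenario expects."""
    pattern, message = scenario
    if bool(re.match(pattern, fqdn)) != expect_match:
        raise ValueError(message)


# A fully qualified name passes without raising.
check_scenario("external/my-entry.my-namespace.svc.cluster.local",
               EXTERNAL_SLASHED_FQDN_HAS_CLUSTER_DNS_SUFFIX)
```

Passing expect_match=False plays the same role as assert_not=True in the method above.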

Custom resources with hosts entries will now be validated, and the CRD controller error messages should now be eliminated.

Conclusion

While primarily used with OpenAPI-compliant web frameworks like FastAPI, Pydantic is a fantastic tool for CRD schema validation as well.