FQDN Field Validation with Pydantic
The Problem
Customers were configuring egress entries for external services using incorrect syntax. This led to validation errors from Istio when deploying microservice CRDs: the controller that manages the CRD's owned resources would generate errors when attempting to update resources that Istio's validating webhook would reject:
microserviceDeployment.yaml |
---|
| ...
DeploymentStrategy:
  ServiceDependencies:
    egress:
      - hosts:
          - external/some-external-serviceentry # (1)
...
|
- When the CRD is applied, various Istio resources are generated. Istio's DestinationRule resource expects host values that reference ServiceEntry resources to be formatted as namespace/service-entry-name.namespace.svc.cluster.local.
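To make the expected format concrete, here is a minimal sketch that builds a fully qualified host from its parts. The helper name qualify_host is hypothetical, and the sketch assumes the ServiceEntry lives in the same namespace that prefixes the host (external in the example above):

```python
# Cluster DNS suffix taken from the format described above
CLUSTER_DNS_SUFFIX = "svc.cluster.local"


def qualify_host(namespace: str, service_entry_name: str) -> str:
    """Build namespace/service-entry-name.namespace.svc.cluster.local (hypothetical helper)."""
    return f"{namespace}/{service_entry_name}.{namespace}.{CLUSTER_DNS_SUFFIX}"


print(qualify_host("external", "some-external-serviceentry"))
# external/some-external-serviceentry.external.svc.cluster.local
```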
The microservice pipeline would validate that the field contained a list of strings:
src/service_dependency_schema.py |
---|
| from typing import Optional

import pytest
from faker import Faker
from faker.providers import python
from pydantic import BaseModel, parse_obj_as


class Egress(BaseModel, extra="forbid"):
    port: Optional[Port]
    hosts: list[str]


fake = Faker()
fake.add_provider(python)


@pytest.fixture
def egress_data():
    # Any random string is accepted at this stage
    return {"hosts": [fake.pystr()]}


def test_egress_schema(egress_data):
    # when
    parsed = parse_obj_as(Egress, egress_data)
    # then
    assert isinstance(parsed.hosts, list)
    assert isinstance(parsed.hosts[0], str)
|
The pipeline should be able to fail builds with invalid configurations, so the hosts field validation required a refactor.
The Solution
Validation of the hosts field is multi-faceted:
- The FQDN should meet RFC 1035 standards
- Istio expects namespace/ to prefix each FQDN
- hosts values that represent ServiceEntry resources need to be fully qualified
Typically, this can be done using regex:
# This regex pattern validates FQDNs according to RFC1035:
^(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)
# This regex pattern validates that the namespace is prefixed to the FQDN:
external\/.+.svc.cluster.local
# This regex combines the two:
^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local
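To get a feel for what the combined pattern accepts, here is a small sketch using only Python's re module. The pattern is copied verbatim from above; the sample hosts are made up for illustration. re.match is used because it anchors at the start of the string, which is an assumption about how the pattern ends up being applied:

```python
import re

# The combined pattern, copied verbatim from the text above
COMBINED = re.compile(
    r"^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)"
    r"|external\/.+.svc.cluster.local"
)


def matches(host: str) -> bool:
    """Return True when the pattern matches at the start of the host."""
    return COMBINED.match(host) is not None


# Hypothetical sample hosts
print(matches("*/example.com"))                                       # True
print(matches("external/my-service.my-namespace.svc.cluster.local"))  # True
print(matches("external/some-external-serviceentry"))                 # False
# The dots in the second alternative are unescaped, so they match any character
print(matches("external/fooXsvcYclusterZlocal"))                      # True
# A host with only one label before the suffix is also accepted
print(matches("external/my-service.svc.cluster.local"))               # True
```

The last two results suggest that the combined pattern is looser than it looks, which is worth keeping in mind when it is used as the only line of defense.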
Pydantic provides string constraints through constr(), which include regex matching:
src/service_dependency_schema.py |
---|
| from typing import Optional

import pytest
from faker import Faker
from faker.providers import python
from pydantic import BaseModel, ValidationError, constr, parse_obj_as


class Egress(BaseModel, extra="forbid"):
    # Add regex
    regex_pattern = r"^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local"
    port: Optional[Port]
    hosts: list[constr(regex=regex_pattern)]  # (1)


fake = Faker()
fake.add_provider(python)


@pytest.fixture
def egress_data(request):  # (2)
    # Valid data by default; indirect parametrization can request invalid data
    valid = getattr(request, "param", True)
    if valid:
        return {"hosts": [f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster.local"]}  # (3)
    return {
        "hosts": [  # (4)
            f"external/{fake.pystr()}",
            f"external/{fake.pystr()}.{fake.pystr()}",
            f"external/{fake.pystr()}.{fake.pystr()}.svc",
            f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster",
            f"external/{fake.pystr()}.svc.cluster.local",
        ]
    }


def test_egress_schema(egress_data):
    # when
    parsed = parse_obj_as(Egress, egress_data)
    # then
    assert isinstance(parsed.hosts, list)
    assert isinstance(parsed.hosts[0], str)


@pytest.mark.parametrize("egress_data", [False], indirect=True)  # (5)
def test_egress_hosts_fqdn_raises_validation_error_on_invalid_entries(egress_data):
    # then
    with pytest.raises(ValidationError, match="string does not match regex"):
        # when
        parse_obj_as(Egress, egress_data)
|
- Update the hosts type to use constr() with regex pattern matching
- Update the egress_data() fixture to output either valid or invalid data
- Valid output will pass Istio validation
- Invalid output added
- Test case added to verify that a ValidationError is raised on invalid hosts
However, the ValidationError message is miserable to parse:
Expected Validation Error: 'string does not match regex "^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local" (type=value_error.str.regex; pattern=^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local
Furthermore, flake8 validation fails when Pydantic's constr() function is used in the type annotation, so the constraint has to be expressed another way.
Flake8 Validation
Fixing flake8 validation is relatively simple:
src/service_dependency_schema.py |
---|
| ...
class Egress(BaseModel, extra="forbid"):
    regex_pattern = r"^\*\/(?!:\/\/)(?=.{1,255}$)((.{1,63}\.){1,127}(?![0-9]*$)[a-z0-9-]+\.?)|external\/.+.svc.cluster.local"
    port: Optional[Port]
    hosts: list[str] = Field(regex=regex_pattern)  # (1)
...
|
- Instead of constr(), using Field() allows flake8 validations to pass successfully.
The next step is to tackle the ValidationError message.
Breaking Up the Regex
The regex pattern being used does the following:
- ^\*\/ - If the FQDN is */ prefixed:
  - (?!:\/\/) - Disallow a URL scheme prefix
  - (?=.{1,255}$) - FQDN length between 1 and 255 characters
  - ((.{1,63}\.) - Domain label length between 1 and 63 characters
  - {1,127} - Between 1 and 127 domain labels
  - (?![0-9]*$)[a-z0-9-]+ - TLD cannot consist only of digits
  - \.? - At most one optional trailing dot
- external\/.+.svc.cluster.local - If the FQDN is external/ prefixed, ensure the name is fully qualified with the local cluster DNS suffix
In its current form, the regex pattern raises a single error message that says only that the field value was not matched by the pattern. If the regex were broken up and used to perform individual validations, the reasons for a validation failure could be far more actionable:
src/service_dependency_schema.py |
---|
| ...
class Egress(BaseModel, extra="forbid"):
    port: Optional[Port]
    hosts: list[str]  # (1)

    @validator("hosts")
    def hosts_fqdn_must_be_valid(cls, v):  # (2)
        for host in v:  # (3)
            slash_prefix_regex_pattern = re.compile(r"^(\*|external)\/.+$")
            if not slash_prefix_regex_pattern.match(host):
                raise ValueError("Invalid FQDN. Must include '*/' or 'external/' prefix.")
            no_http_schema_regex_pattern = re.compile(r"^(\*|external)\/((?!.+:\/\/).+)$")
            if not no_http_schema_regex_pattern.match(host):
                raise ValueError("Host FQDN cannot begin with '://' schema.")
            host_min_max_domain_levels_regex_pattern = re.compile(
                r"^(\*|external)\/([A-Za-z0-9-]+\.){1,127}[A-Za-z0-9-]+\.?$"
            )
            if not host_min_max_domain_levels_regex_pattern.match(host):
                raise ValueError("Valid number of domain labels for host FQDN between 1 and 127.")
            host_min_max_characters_regex_pattern = re.compile(
                r"^(\*|external)\/((?=.{1,255}$).+$)"
            )
            if not host_min_max_characters_regex_pattern.match(host):
                raise ValueError("Valid character length for host FQDN between 1 and 255.")
            host_min_max_domain_label_regex_pattern = re.compile(
                r"^(\*|external)\/((.{1,63}\.).+$)"
            )
            if not host_min_max_domain_label_regex_pattern.match(host):
                raise ValueError("Valid character length for each domain label between 1-63.")
            tld_does_not_begin_with_or_contain_only_numerics_regex_pattern = re.compile(
                r"^(\*|external)\/(.+\.(?![0-9])[A-Za-z0-9-]+)\.?$"
            )
            if not tld_does_not_begin_with_or_contain_only_numerics_regex_pattern.match(host):
                raise ValueError("TLD must not begin with a numeric, or contain only numerics.")
            # The remaining checks only apply to 'external/' hosts
            external_slashed_fqdn_regex_pattern = re.compile(r"^external\/.+$")
            if external_slashed_fqdn_regex_pattern.match(host):
                external_slashed_fqdn_must_contain_service_and_namespace_domain_labels_regex_pattern = re.compile(
                    r"^external\/[A-Za-z0-9-]+\.(?=svc\.cluster\.local$)"
                )
                # A match here means only one label precedes the cluster DNS suffix
                if external_slashed_fqdn_must_contain_service_and_namespace_domain_labels_regex_pattern.match(
                    host
                ):
                    raise ValueError(
                        "'external/' host FQDNs must include service and namespace domain labels."
                    )
                external_slashed_fqdn_must_have_cluster_dns_suffix_regex_pattern = re.compile(
                    r"^external\/.+\.svc\.cluster\.local$"
                )
                if not external_slashed_fqdn_must_have_cluster_dns_suffix_regex_pattern.match(host):
                    raise ValueError("'external/' host FQDNs must contain the cluster DNS suffix.")
        return v
...
|
- Since the Field class constraint capabilities do not meet the solution requirements, it is removed and normal type validation is used instead.
- The hosts_fqdn_must_be_valid method uses the validator decorator, which pydantic will run each time it performs schema validation.
- The hosts_fqdn_must_be_valid validator method loops through each hosts value and performs regex matches through nested if statements, which is never great, and should be refactored as soon as pydantic's validation capabilities are better understood.
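As an aside, the length checks do not have to be regex-based. The label pattern above, (.{1,63}\.), appears to constrain only where an early dot falls, so a host such as "*/a." followed by a 100-character label would likely slip past it. The following is a minimal sketch of a split-based alternative, not the approach used in this post. The helper name check_fqdn_lengths is hypothetical, and it assumes the host already carries a '*/' or 'external/' prefix:

```python
MAX_LABEL_LENGTH = 63  # RFC 1035 limit for a single label
MAX_NAME_LENGTH = 255  # RFC 1035 limit for the whole name


def check_fqdn_lengths(host: str) -> None:
    """Raise ValueError when the name after the prefix breaks a length limit.

    Hypothetical helper; assumes a '*/' or 'external/' prefix is present.
    """
    _, _, fqdn = host.partition("/")
    if not 1 <= len(fqdn) <= MAX_NAME_LENGTH:
        raise ValueError("Valid character length for host FQDN between 1 and 255.")
    # Ignore a single optional trailing dot before splitting into labels
    name = fqdn[:-1] if fqdn.endswith(".") else fqdn
    for label in name.split("."):
        if not 1 <= len(label) <= MAX_LABEL_LENGTH:
            raise ValueError("Valid character length for each domain label between 1-63.")


check_fqdn_lengths("*/example.com")  # passes silently
try:
    check_fqdn_lengths("*/a." + "b" * 64 + ".com")
except ValueError as exc:
    print(exc)
```

Because every label is inspected, a long label is reported regardless of its position in the name.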
Additionally, the unit tests are dramatically expanded:
tests/unit/test_service_dependency_schema.py |
---|
| import pytest
from faker import Faker
from faker.providers import internet, python
from pydantic import ValidationError, parse_obj_as

from src.service_dependency_schema import Egress

fake = Faker()
fake.add_provider(python)
fake.add_provider(internet)


@pytest.fixture
def egress_data(valid=True):
    if valid:
        return {"hosts": [f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster.local"]}
    else:
        return {
            "hosts": [
                f"external/{fake.pystr()}",
                f"external/{fake.pystr()}.{fake.pystr()}",
                f"external/{fake.pystr()}.{fake.pystr()}.svc",
                f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster",
                f"external/{fake.pystr()}.svc.cluster.local",
            ]
        }


def test_egress_schema(egress_data):
    # when
    parsed = parse_obj_as(Egress, egress_data)
    # then
    assert isinstance(parsed.hosts, list)
    assert isinstance(parsed.hosts[0], str)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [f"{fake.pystr()}.{fake.pystr()}.svc.cluster.local", fake.domain_name()],
)
def test_egress_disallow_hosts_without_slash_prefix(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}
    # then
    with pytest.raises(
        ValidationError, match=r"Invalid FQDN. Must include '\*\/' or 'external\/' prefix."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"*/https://{fake.domain_name()}",
        f"*/http://{fake.domain_name()}",
        f"*/tcp://{fake.domain_name()}",
        f"*/{fake.pystr()}://{fake.domain_name()}",
        f"external/http://{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
        f"external/https://{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
        f"external/tcp://{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
        f"external/{fake.pystr()}://{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
    ],
)
def test_egress_disallow_hosts_with_url_schema(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}
    # then
    with pytest.raises(ValidationError, match="Host FQDN cannot begin with '://' schema."):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"external/{fake.pystr(min_chars=128, max_chars=128)}.{fake.pystr(min_chars=128, max_chars=128)}.svc.cluster.local",
        f"*/{fake.pystr(min_chars=128, max_chars=128)}.{fake.pystr(min_chars=128, max_chars=128)}.org",
    ],
)
def test_egress_disallow_hosts_above_max_characters(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}
    # then
    with pytest.raises(
        ValidationError, match="Valid character length for host FQDN between 1 and 255."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"external/{fake.pystr(min_chars=65, max_chars=65)}.{fake.pystr()}.svc.cluster.local",
        f"*/{fake.pystr(min_chars=65, max_chars=65)}.{fake.domain_name()}",
    ],
)
def test_egress_disallow_hosts_with_domain_labels_above_maximum_characters(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}
    # then
    with pytest.raises(
        ValidationError, match="Valid character length for each domain label between 1-63."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [f"*/{(fake.pystr(max_chars=1) + '.') * 129 + fake.pystr(max_chars=1)}"],
)
def test_egress_disallow_hosts_with_domain_labels_above_maximum(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}
    # then
    with pytest.raises(
        ValidationError, match="Valid number of domain labels for host FQDN between 1 and 127."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"*/{fake.pystr()}.{fake.pyint(min_value=0)}",
        f"*/{fake.pystr()}.{fake.pyint(min_value=0)}{fake.pystr()}",
        f"external/{fake.pystr()}.{fake.pyint(min_value=0)}",
        f"external/{fake.pystr()}.{fake.pyint(min_value=0)}{fake.pystr()}",
    ],
)
def test_egress_disallow_hosts_if_tld_begins_with_numeric_or_contains_only_numerics(
    fqdn_schema_data,
):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}
    # then
    with pytest.raises(ValidationError, match="TLD must not begin with a numeric"):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize("fqdn_schema_data", [f"external/{fake.pystr()}.svc.cluster.local"])
def test_egress_disallow_external_hosts_with_missing_service_or_namespace_domain_labels(
    fqdn_schema_data,
):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}
    # then
    with pytest.raises(
        ValidationError,
        match="'external/' host FQDNs must include service and namespace domain labels",
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "fqdn_schema_data",
    [
        f"external/{fake.pystr()}.{fake.pystr()}",
        f"external/{fake.pystr()}.{fake.pystr()}.svc",
        f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster",
    ],
)
def test_egress_disallow_external_hosts_with_missing_cluster_dns_suffix(fqdn_schema_data):
    # when
    egress_data = {"hosts": [fqdn_schema_data]}
    # then
    with pytest.raises(
        ValidationError, match="'external/' host FQDNs must contain the cluster DNS suffix."
    ):
        parse_obj_as(Egress, egress_data)


@pytest.mark.parametrize(
    "egress_schema_data",
    [
        f"external/{fake.pystr()}.{fake.pystr()}.svc.cluster.local",
        f"external/{(fake.pystr(max_chars=63, min_chars=63) + '.') * 2}svc.cluster.local",
        f"external/{(fake.pystr(max_chars=1) + '.') * 2}svc.cluster.local",
        f"external/{(fake.pystr(max_chars=1) + '.') * 50}svc.cluster.local",
        f"*/{fake.domain_name()}",
        f"*/{fake.pystr(max_chars=63, min_chars=63)}.com",
        f"*/{(fake.pystr(max_chars=1) + '.') * 50 + fake.pystr(max_chars=1)}",
        f"*/{fake.domain_name()}{fake.pyint()}",
        f"*/{fake.domain_name()}.",
    ],
)
def test_egress_valid_hosts_fqdn(egress_schema_data):
    # when
    egress_data = {"hosts": [egress_schema_data]}
    parsed = parse_obj_as(Egress, egress_data)
    # then
    assert isinstance(parsed.hosts[0], str)
|
The test cases are in a good enough state that refactoring the code should be relatively simple.
Refactoring
There are a number of issues with the code:
- Nested if statements introduce unnecessary complexity. The nested statements can be eliminated by creating a function that validates each scenario, with the additional benefit of eliminating duplicate code.
- The for loop iterates through each value in hosts. If the validator method could validate each item in the field directly, it would drastically reduce code complexity.
- Each scenario is not standardized. During the refactor, if each scenario's inputs and outputs can be standardized, this would make the code a bit more readable.
Standardizing each function is the first step in the refactor. Each scenario has expected inputs and outputs, although some require assert statements, and others require assert not statements. Each scenario pairs a regex pattern with an error message, and will raise a ValueError if the scenario fails:
src/service_dependency_schema.py |
---|
| ...
class Egress(BaseModel, extra="forbid"):
    port: Optional[Port]
    hosts: list[str]

    @staticmethod  # (1)
    def fqdn_has_slash_prefix(fqdn):
        message = "Invalid FQDN. Must include '*/' or 'external/' prefix."
        try:
            assert re.match(r"^(\*|external)\/.+$", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def disallow_url_schema(fqdn):
        message = "Host FQDN cannot begin with '://' schema."
        try:
            assert re.match(r"^(\*|external)\/((?!.+:\/\/).+)$", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def fqdn_domain_label_constraints(fqdn):
        message = "Valid number of domain labels for host FQDN between 1 and 127."
        try:
            assert re.match(r"^(\*|external)\/([A-Za-z0-9-]+\.){1,127}[A-Za-z0-9-]+\.?$", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def fqdn_character_length(fqdn):
        message = "Valid character length for host FQDN between 1 and 255."
        try:
            assert re.match(r"^(\*|external)\/((?=.{1,255}$).+$)", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def fqdn_domain_label_character_length(fqdn):
        message = "Valid character length for each domain label between 1-63."
        try:
            assert re.match(r"^(\*|external)\/((.{1,63}\.).+$)", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def tld_numerics_constraints(fqdn):
        message = "TLD must not begin with a numeric, or contain only numerics."
        try:
            assert re.match(r"^(\*|external)\/(.+\.(?![0-9])[A-Za-z0-9-]+)\.?$", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def external_slashed_fqdn_has_service_and_namespace_domain_labels(fqdn):
        message = "'external/' host FQDNs must include service and namespace domain labels."
        try:
            # A match means only one label precedes the cluster DNS suffix
            assert not re.match(r"^external\/[A-Za-z0-9-]+\.(?=svc\.cluster\.local$)", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @staticmethod
    def external_slashed_fqdn_has_cluster_dns_suffix(fqdn):
        message = "'external/' host FQDNs must contain the cluster DNS suffix."
        try:
            assert re.match(r"^external\/.+\.svc\.cluster\.local$", fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @validator("hosts", each_item=True)  # (2)
    def hosts_fqdn_must_be_valid(cls, v):
        cls.fqdn_has_slash_prefix(v)
        cls.disallow_url_schema(v)
        cls.fqdn_domain_label_constraints(v)
        cls.fqdn_character_length(v)
        cls.fqdn_domain_label_character_length(v)
        cls.tld_numerics_constraints(v)
        if re.match(r"^external\/.+$", v):
            cls.external_slashed_fqdn_has_service_and_namespace_domain_labels(v)
            cls.external_slashed_fqdn_has_cluster_dns_suffix(v)
        return v
|
- Static methods are created for each validation scenario to reduce duplicate code
- The hosts_fqdn_must_be_valid validator method now validates against each item in the hosts field. Additionally, since the static methods are being called, nested statements are nearly eliminated
Additionally, the validator decorator allows validation of iterable fields with the each_item parameter. Setting each_item=True eliminates the for loop from the first iteration of the code. The next step is to improve the way that input data for each scenario is handled. This can be handled like so:
src/constants.py |
---|
| # Each constant contains the following:
# (regex_pattern, error_message)
FQDN_HAS_SLASH_PREFIX = (
    r"^(\*|external)\/.+$",
    "Invalid FQDN. Must include '*/' or 'external/' prefix.",
)
DISALLOW_URL_SCHEMA = (
    r"^(\*|external)\/((?!.+:\/\/).+)$",
    "Host FQDN cannot begin with '://' schema.",
)
FQDN_DOMAIN_LABEL_CONSTRAINTS = (
    r"^(\*|external)\/([A-Za-z0-9-]+\.){1,127}[A-Za-z0-9-]+\.?$",
    "Valid number of domain labels for host FQDN between 1 and 127.",
)
FQDN_DOMAIN_LABEL_CHARACTER_LENGTH = (
    r"^(\*|external)\/((.{1,63}\.).+$)",
    "Valid character length for each domain label between 1-63.",
)
FQDN_CHARACTER_LENGTH = (
    r"^(\*|external)\/((?=.{1,255}$).+$)",
    "Valid character length for host FQDN between 1 and 255.",
)
TLD_NUMERIC_CONSTRAINTS = (
    r"^(\*|external)\/(.+\.(?![0-9])[A-Za-z0-9-]+)\.?$",
    "TLD must not begin with a numeric, or contain only numerics.",
)
EXTERNAL_SLASHED_FQDN_HAS_SERVICE_AND_NAMESPACE_DOMAIN_LABELS = (
    r"^external\/[A-Za-z0-9-]+\.(?=svc\.cluster\.local$)",
    "'external/' host FQDNs must include service and namespace domain labels.",
)
EXTERNAL_SLASHED_FQDN_HAS_CLUSTER_DNS_SUFFIX = (
    r"^external\/.+\.svc\.cluster\.local$",
    "'external/' host FQDNs must contain the cluster DNS suffix.",
)
|
Adding clarity to the constant variable names allows for more readable code. When reviewing the function fqdn_has_slash_prefix(v), one can guess that the function checks that an FQDN has a slash prefix. But if we ran a single function called validate_hosts_fqdn, the code readability increases when used in conjunction with well-named variables. Therefore, validate_hosts_fqdn(v, FQDN_HAS_SLASH_PREFIX) adds an extra bit of context to what it is doing.
The Final Result
Reducing the amount of duplicate code to a single static method greatly improves readability:
src/service_dependency_schema.py |
---|
| import re
from typing import Optional

from pydantic import BaseModel, validator

from constants import (
    DISALLOW_URL_SCHEMA,
    EXTERNAL_SLASHED_FQDN_HAS_CLUSTER_DNS_SUFFIX,
    EXTERNAL_SLASHED_FQDN_HAS_SERVICE_AND_NAMESPACE_DOMAIN_LABELS,
    FQDN_CHARACTER_LENGTH,
    FQDN_DOMAIN_LABEL_CHARACTER_LENGTH,
    FQDN_DOMAIN_LABEL_CONSTRAINTS,
    FQDN_HAS_SLASH_PREFIX,
    TLD_NUMERIC_CONSTRAINTS,
)


class Egress(BaseModel, extra="forbid"):
    port: Optional[Port]
    hosts: list[str]

    @staticmethod
    def validate_hosts_fqdn(fqdn, scenario, assert_not=False):  # (1)
        # Each scenario is a (regex_pattern, error_message) tuple
        pattern = scenario[0]
        message = scenario[1]
        try:
            if assert_not:
                assert not re.match(pattern, fqdn)
            else:
                assert re.match(pattern, fqdn)
        except AssertionError:
            raise ValueError(message) from None

    @validator("hosts", each_item=True)
    def hosts_fqdn_must_be_valid(cls, v):  # (2)
        cls.validate_hosts_fqdn(v, FQDN_HAS_SLASH_PREFIX)
        cls.validate_hosts_fqdn(v, DISALLOW_URL_SCHEMA)
        cls.validate_hosts_fqdn(v, FQDN_DOMAIN_LABEL_CONSTRAINTS)
        cls.validate_hosts_fqdn(v, FQDN_CHARACTER_LENGTH)
        cls.validate_hosts_fqdn(v, FQDN_DOMAIN_LABEL_CHARACTER_LENGTH)
        cls.validate_hosts_fqdn(v, TLD_NUMERIC_CONSTRAINTS)
        if re.match(r"^external\/.+$", v):
            cls.validate_hosts_fqdn(
                v, EXTERNAL_SLASHED_FQDN_HAS_SERVICE_AND_NAMESPACE_DOMAIN_LABELS, assert_not=True
            )
            cls.validate_hosts_fqdn(v, EXTERNAL_SLASHED_FQDN_HAS_CLUSTER_DNS_SUFFIX)
        return v
|
- The validate_hosts_fqdn static method receives the scenario as input, then asserts whether the regex pattern matches, and if it doesn't, it raises a ValueError. During processing, pydantic raises a ValidationError, so the unit test cases must reflect that.
- The hosts_fqdn_must_be_valid validator method runs through all scenarios and reports an error for each invalid hosts entry, so multiple invalid entries are all listed in the resulting ValidationError
Custom resources with hosts entries will now be validated, and the CRD controller error messages should now be eliminated.
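One caveat with assert-based validation is that Python strips assert statements when it runs with the -O flag, in which case the checks would silently pass. The following is a minimal sketch of the same helper without assert, written as a plain function rather than a static method so it runs on its own. The two scenarios are copied from constants.py, and the sample hosts are made up:

```python
import re

# Scenarios copied from constants.py: (regex_pattern, error_message)
FQDN_HAS_SLASH_PREFIX = (
    r"^(\*|external)\/.+$",
    "Invalid FQDN. Must include '*/' or 'external/' prefix.",
)
EXTERNAL_SLASHED_FQDN_HAS_SERVICE_AND_NAMESPACE_DOMAIN_LABELS = (
    r"^external\/[A-Za-z0-9-]+\.(?=svc\.cluster\.local$)",
    "'external/' host FQDNs must include service and namespace domain labels.",
)


def validate_hosts_fqdn(fqdn, scenario, assert_not=False):
    """Raise ValueError with the scenario's message when the check fails."""
    pattern, message = scenario
    matched = re.match(pattern, fqdn) is not None
    # With assert_not=False a match is required; with assert_not=True a match is an error
    if matched == assert_not:
        raise ValueError(message)


validate_hosts_fqdn("*/example.com", FQDN_HAS_SLASH_PREFIX)  # passes silently
try:
    validate_hosts_fqdn(
        "external/my-service.svc.cluster.local",
        EXTERNAL_SLASHED_FQDN_HAS_SERVICE_AND_NAMESPACE_DOMAIN_LABELS,
        assert_not=True,
    )
except ValueError as exc:
    print(exc)
```

The behaviour is intended to be identical to the assert-based version under normal execution; only the dependency on assertions being enabled is removed.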
Conclusion
While primarily used to integrate with OpenAPI-compliant web frameworks like FastAPI, Pydantic is a fantastic tool to use for CRD schema validation as well.