mirror of
https://github.com/netbox-community/devicetype-library.git
synced 2024-12-22 15:13:06 +01:00
Revert submodule test
This commit is contained in:
parent
d153e20bd1
commit
6efd71e4ce
224
tests/definitions_test.py
Normal file
224
tests/definitions_test.py
Normal file
@ -0,0 +1,224 @@
|
||||
from test_configuration import COMPONENT_TYPES, IMAGE_FILETYPES, SCHEMAS, KNOWN_SLUGS, ROOT_DIR, USE_LOCAL_KNOWN_SLUGS, NETBOX_DT_LIBRARY_URL, KNOWN_MODULES, USE_UPSTREAM_DIFF, PRECOMMIT_ALL_SWITCHES
|
||||
import pickle_operations
|
||||
from yaml_loader import DecimalSafeLoader
|
||||
from device_types import DeviceType, ModuleType, verify_filename, validate_components
|
||||
import decimal
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import psutil
|
||||
from urllib.request import urlopen
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
from jsonschema import Draft4Validator, RefResolver
|
||||
from jsonschema.exceptions import ValidationError
|
||||
from git import Repo
|
||||
|
||||
def _get_definition_files():
|
||||
"""
|
||||
Return a list of all definition files within the specified path.
|
||||
"""
|
||||
file_list = []
|
||||
|
||||
for path, schema in SCHEMAS:
|
||||
# Initialize the schema
|
||||
with open(f"schema/{schema}") as schema_file:
|
||||
schema = json.loads(schema_file.read(), parse_float=decimal.Decimal)
|
||||
|
||||
# Validate that the schema exists
|
||||
assert schema, f"Schema definition for {path} is empty!"
|
||||
|
||||
# Map each definition file to its schema as a tuple (file, schema)
|
||||
for file in sorted(glob.glob(f"{path}/*/*", recursive=True)):
|
||||
file_list.append((file, schema, 'skip'))
|
||||
|
||||
return file_list
|
||||
|
||||
def _get_diff_from_upstream():
|
||||
file_list = []
|
||||
|
||||
repo = Repo(f"{os.path.dirname(os.path.abspath(__file__))}/../")
|
||||
commits_list = list(repo.iter_commits())
|
||||
|
||||
if "upstream" not in repo.remotes:
|
||||
repo.create_remote("upstream", NETBOX_DT_LIBRARY_URL)
|
||||
|
||||
upstream = repo.remotes.upstream
|
||||
upstream.fetch()
|
||||
changes = upstream.refs.master.commit.diff(repo.head)
|
||||
changes = changes + repo.index.diff("HEAD")
|
||||
|
||||
for path, schema in SCHEMAS:
|
||||
# Initialize the schema
|
||||
with open(f"schema/{schema}") as schema_file:
|
||||
schema = json.loads(schema_file.read(), parse_float=decimal.Decimal)
|
||||
|
||||
# Validate that the schema exists
|
||||
assert schema, f"Schema definition for {path} is empty!"
|
||||
|
||||
# Ensure files are either added, renamed, modified or type changed (do not get deleted files)
|
||||
CHANGE_TYPE_LIST = ['A', 'R', 'M', 'T']
|
||||
|
||||
# Iterate through changed files
|
||||
for file in changes:
|
||||
# Ensure the files are modified or added, this will disclude deleted files
|
||||
if file.change_type in CHANGE_TYPE_LIST:
|
||||
# If the file is renamed, ensure we are picking the right schema
|
||||
if 'R' in file.change_type and path in file.rename_to:
|
||||
file_list.append((file.rename_to, schema, file.change_type))
|
||||
elif path in file.a_path:
|
||||
file_list.append((file.a_path, schema, file.change_type))
|
||||
elif path in file.b_path:
|
||||
file_list.append((file.b_path, schema, file.change_type))
|
||||
|
||||
return file_list
|
||||
|
||||
def _get_image_files():
|
||||
"""
|
||||
Return a list of all image files within the specified path and manufacturer.
|
||||
"""
|
||||
file_list = []
|
||||
|
||||
# Map each image file to its manufacturer
|
||||
for file in sorted(glob.glob(f"elevation-images{os.path.sep}*{os.path.sep}*", recursive=True)):
|
||||
# Validate that the file extension is valid
|
||||
assert file.split(os.path.sep)[2].split('.')[-1] in IMAGE_FILETYPES, f"Invalid file extension: {file}"
|
||||
|
||||
# Map each image file to its manufacturer as a tuple (manufacturer, file)
|
||||
file_list.append((file.split(os.path.sep)[1], file))
|
||||
|
||||
return file_list
|
||||
|
||||
def _decimal_file_handler(uri):
|
||||
"""
|
||||
Handler to work with floating decimals that fail normal validation.
|
||||
"""
|
||||
with urlopen(uri) as url:
|
||||
result = json.loads(url.read().decode("utf-8"), parse_float=decimal.Decimal)
|
||||
return result
|
||||
|
||||
def test_environment():
|
||||
"""
|
||||
Run basic sanity checks on the environment to ensure tests are running correctly.
|
||||
"""
|
||||
# Validate that definition files exist
|
||||
if definition_files:
|
||||
pytest.skip("No changes to definition files found.")
|
||||
|
||||
EVALUATE_ALL = False
|
||||
if any(x in PRECOMMIT_ALL_SWITCHES for x in psutil.Process(os.getppid()).cmdline()):
|
||||
EVALUATE_ALL = True
|
||||
|
||||
if USE_UPSTREAM_DIFF and not EVALUATE_ALL:
|
||||
definition_files = _get_diff_from_upstream()
|
||||
else:
|
||||
definition_files = _get_definition_files()
|
||||
image_files = _get_image_files()
|
||||
|
||||
if USE_LOCAL_KNOWN_SLUGS:
|
||||
KNOWN_SLUGS = pickle_operations.read_pickle_data(f'{ROOT_DIR}/tests/known-slugs.pickle')
|
||||
KNOWN_MODULES = pickle_operations.read_pickle_data(f'{ROOT_DIR}/tests/known-modules.pickle')
|
||||
else:
|
||||
temp_dir = tempfile.TemporaryDirectory()
|
||||
repo = Repo.clone_from(url=NETBOX_DT_LIBRARY_URL, to_path=temp_dir.name)
|
||||
KNOWN_SLUGS = pickle_operations.read_pickle_data(f'{temp_dir.name}/tests/known-slugs.pickle')
|
||||
KNOWN_MODULES = pickle_operations.read_pickle_data(f'{temp_dir.name}/tests/known-modules.pickle')
|
||||
|
||||
|
||||
@pytest.mark.parametrize(('file_path', 'schema', 'change_type'), definition_files)
|
||||
def test_definitions(file_path, schema, change_type):
|
||||
"""
|
||||
Validate each definition file using the provided JSON schema and check for duplicate entries.
|
||||
"""
|
||||
# Check file extension. Only .yml or .yaml files are supported.
|
||||
assert file_path.split('.')[-1] in ('yaml', 'yml'), f"Invalid file extension: {file_path}"
|
||||
|
||||
# Read file
|
||||
with open(file_path) as definition_file:
|
||||
content = definition_file.read()
|
||||
|
||||
# Check for trailing newline. YAML files must end with an emtpy newline.
|
||||
assert content.endswith('\n'), "Missing trailing newline"
|
||||
|
||||
# Load YAML data from file
|
||||
definition = yaml.load(content, Loader=DecimalSafeLoader)
|
||||
|
||||
# Validate YAML definition against the supplied schema
|
||||
try:
|
||||
resolver = RefResolver(
|
||||
f"file://{os.getcwd()}/schema/devicetype.json",
|
||||
schema,
|
||||
handlers={"file": _decimal_file_handler},
|
||||
)
|
||||
# Validate definition against schema
|
||||
Draft4Validator(schema, resolver=resolver).validate(definition)
|
||||
except ValidationError as e:
|
||||
# Schema validation failure. Ensure you are following the proper format.
|
||||
pytest.fail(f"{file_path} failed validation: {e}", False)
|
||||
|
||||
# Identify if the definition is for a Device or Module
|
||||
if "device-types" in file_path:
|
||||
# A device
|
||||
this_device = DeviceType(definition, file_path, change_type)
|
||||
else:
|
||||
# A module
|
||||
this_device = ModuleType(definition, file_path, change_type)
|
||||
|
||||
# Verify the slug is valid, only if the definition type is a Device
|
||||
if this_device.isDevice:
|
||||
assert this_device.verify_slug(KNOWN_SLUGS), pytest.fail(this_device.failureMessage, False)
|
||||
|
||||
# Verify the filename is valid. Must either be the model or part_number.
|
||||
assert verify_filename(this_device, (KNOWN_MODULES if not this_device.isDevice else None)), pytest.fail(this_device.failureMessage, False)
|
||||
|
||||
# Check for duplicate components within the definition
|
||||
assert validate_components(COMPONENT_TYPES, this_device), pytest.fail(this_device.failureMessage, False)
|
||||
|
||||
# Check for empty quotes and fail if found
|
||||
def iterdict(var):
|
||||
for dict_value in var.values():
|
||||
if isinstance(dict_value, dict):
|
||||
iterdict(dict_value)
|
||||
if isinstance(dict_value, list):
|
||||
iterlist(dict_value)
|
||||
else:
|
||||
if(isinstance(dict_value, str) and not dict_value):
|
||||
pytest.fail(f'{file_path} has empty quotes', False)
|
||||
|
||||
def iterlist(var):
|
||||
for list_value in var:
|
||||
if isinstance(list_value, dict):
|
||||
iterdict(list_value)
|
||||
elif isinstance(list_value, list):
|
||||
iterlist(list_value)
|
||||
|
||||
# Check for valid power definitions
|
||||
if this_device.isDevice:
|
||||
assert this_device.validate_power(), pytest.fail(this_device.failureMessage, False)
|
||||
|
||||
# Check for images if front_image or rear_image is True
|
||||
if (definition.get('front_image') or definition.get('rear_image')):
|
||||
# Find images for given manufacturer, with matching device slug (exact match including case)
|
||||
manufacturer_images = [image[1] for image in image_files if image[0] == file_path.split('/')[1] and os.path.basename(image[1]).split('.')[0] == this_device.get_slug()]
|
||||
if not manufacturer_images:
|
||||
pytest.fail(f'{file_path} has Front or Rear Image set to True but no images found for manufacturer/device (slug={this_device.get_slug()})', False)
|
||||
elif len(manufacturer_images)>2:
|
||||
pytest.fail(f'More than 2 images found for device with slug {this_device.get_slug()}: {manufacturer_images}', False)
|
||||
|
||||
# If front_image is True, verify that a front image exists
|
||||
if(definition.get('front_image')):
|
||||
front_image = [image_path.split('/')[2] for image_path in manufacturer_images if os.path.basename(image_path).split('.')[1] == 'front']
|
||||
|
||||
if not front_image:
|
||||
pytest.fail(f'{file_path} has front_image set to True but no matching image found for device ({manufacturer_images})', False)
|
||||
|
||||
# If rear_image is True, verify that a front image exists
|
||||
if(definition.get('rear_image')):
|
||||
rear_image = [image_path.split('/')[2] for image_path in manufacturer_images if os.path.basename(image_path).split('.')[1] == 'rear']
|
||||
|
||||
if not rear_image:
|
||||
pytest.fail(f'{file_path} has rear_image set to True but no images found for device', False)
|
||||
|
||||
iterdict(definition)
|
242
tests/device_types.py
Normal file
242
tests/device_types.py
Normal file
@ -0,0 +1,242 @@
|
||||
import os
|
||||
|
||||
|
||||
class DeviceType:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
return super().__new__(cls)
|
||||
|
||||
def __init__(self, definition, file_path, change_type):
|
||||
self.file_path = file_path
|
||||
self.isDevice = True
|
||||
self.definition = definition
|
||||
self.manufacturer = definition.get('manufacturer')
|
||||
self._slug_manufacturer = self._slugify_manufacturer()
|
||||
self.slug = definition.get('slug')
|
||||
self.model = definition.get('model')
|
||||
self._slug_model = self._slugify_model()
|
||||
self.part_number = definition.get('part_number', "")
|
||||
self._slug_part_number = self._slugify_part_number()
|
||||
self.failureMessage = None
|
||||
self.change_type = change_type
|
||||
|
||||
def _slugify_manufacturer(self):
|
||||
return self.manufacturer.casefold().replace(" ", "-").replace("sfp+", "sfpp").replace("poe+", "poep").replace("-+", "-plus-").replace("+", "-plus").replace("_", "-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-").replace("&", "and")
|
||||
|
||||
def get_slug(self):
|
||||
if hasattr(self, "slug"):
|
||||
return self.slug
|
||||
return None
|
||||
|
||||
def _slugify_model(self):
|
||||
slugified = self.model.casefold().replace(" ", "-").replace("sfp+", "sfpp").replace("poe+", "poep").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-")
|
||||
if slugified.endswith("-"):
|
||||
slugified = slugified[:-1]
|
||||
return slugified
|
||||
|
||||
def _slugify_part_number(self):
|
||||
slugified = self.part_number.casefold().replace(" ", "-").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-")
|
||||
if slugified.endswith("-"):
|
||||
slugified = slugified[:-1]
|
||||
return slugified
|
||||
|
||||
def get_filepath(self):
|
||||
return self.file_path
|
||||
|
||||
def verify_slug(self, KNOWN_SLUGS):
|
||||
# Verify the slug is unique, and not already known
|
||||
known_slug_list_intersect = [(slug, file_path) for slug, file_path in KNOWN_SLUGS if slug == self.slug]
|
||||
|
||||
if len(known_slug_list_intersect) == 0:
|
||||
pass
|
||||
elif len(known_slug_list_intersect) == 1:
|
||||
if self.file_path not in known_slug_list_intersect[0][1]:
|
||||
if 'R' not in self.change_type:
|
||||
self.failureMessage = f'{self.file_path} has a duplicate slug: "{self.slug}"'
|
||||
return False
|
||||
return True
|
||||
else:
|
||||
self.failureMessage = f'{self.file_path} has a duplicate slug "{self.slug}"'
|
||||
return False
|
||||
|
||||
# Verify the manufacturer is appended to the slug
|
||||
if not self.slug.startswith(self._slug_manufacturer):
|
||||
self.failureMessage = f'{self.file_path} contains slug "{self.slug}". Does not start with manufacturer: "{self.manufacturer.casefold()}-"'
|
||||
return False
|
||||
|
||||
# Verify the slug ends with either the model or part number
|
||||
if not (self.slug.endswith(self._slug_model) or self.slug.endswith(self._slug_part_number)):
|
||||
self.failureMessage = f'{self.file_path} has slug "{self.slug}". Does not end with the model "{self._slug_model}" or part_number "{self._slug_part_number}"'
|
||||
return False
|
||||
|
||||
# Add the slug to the list of known slugs
|
||||
KNOWN_SLUGS.add((self.slug, self.file_path))
|
||||
return True
|
||||
|
||||
def validate_power(self):
|
||||
# Check if power-ports exists
|
||||
if self.definition.get('power-ports', False):
|
||||
# Verify that is_powered is not set to False. If so, there should not be any power-ports defined
|
||||
if not self.definition.get('is_powered', True):
|
||||
self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-ports" are defined.'
|
||||
return False
|
||||
return True
|
||||
|
||||
# Lastly, check if interfaces exists and has a poe_mode defined
|
||||
interfaces = self.definition.get('interfaces', False)
|
||||
if interfaces:
|
||||
for interface in interfaces:
|
||||
poe_mode = interface.get('poe_mode', "")
|
||||
if poe_mode != "" and poe_mode == "pd":
|
||||
return True
|
||||
|
||||
console_ports = self.definition.get('console-ports', False)
|
||||
if console_ports:
|
||||
for console_port in console_ports:
|
||||
poe = console_port.get('poe', False)
|
||||
if poe:
|
||||
return True
|
||||
|
||||
rear_ports = self.definition.get('rear-ports', False)
|
||||
if rear_ports:
|
||||
for rear_port in rear_ports:
|
||||
poe = rear_port.get('poe', False)
|
||||
if poe:
|
||||
return True
|
||||
|
||||
# Check if the device is a child device, and if so, assume it has a valid power source from the parent
|
||||
subdevice_role = self.definition.get('subdevice_role', False)
|
||||
if subdevice_role:
|
||||
if subdevice_role == "child":
|
||||
return True
|
||||
|
||||
# Check if module-bays exists
|
||||
if self.definition.get('module-bays', False):
|
||||
# There is not a standardized way to define PSUs that are module bays, so we will just assume they are valid
|
||||
return True
|
||||
|
||||
# As the very last case, check if is_powered is defined and is False. Otherwise assume the device is powered
|
||||
if not self.definition.get('is_powered', True): # is_powered defaults to True
|
||||
# Arriving here means is_powered is set to False, so verify that there are no power-outlets defined
|
||||
if self.definition.get('power-outlets', False):
|
||||
self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-outlets" are defined.'
|
||||
return False
|
||||
return True
|
||||
|
||||
self.failureMessage = f'{self.file_path} has does not appear to have a valid power source. Ensure either "power-ports" or "interfaces" with "poe_mode" is defined.'
|
||||
return False
|
||||
|
||||
class ModuleType:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
return super().__new__(cls)
|
||||
|
||||
def __init__(self, definition, file_path, change_type):
|
||||
self.file_path = file_path
|
||||
self.isDevice = False
|
||||
self.definition = definition
|
||||
self.manufacturer = definition.get('manufacturer')
|
||||
self.model = definition.get('model')
|
||||
self._slug_model = self._slugify_model()
|
||||
self.part_number = definition.get('part_number', "")
|
||||
self._slug_part_number = self._slugify_part_number()
|
||||
self.change_type = change_type
|
||||
|
||||
def get_filepath(self):
|
||||
return self.file_path
|
||||
|
||||
def _slugify_model(self):
|
||||
slugified = self.model.casefold().replace(" ", "-").replace("sfp+", "sfpp").replace("poe+", "poep").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-")
|
||||
if slugified.endswith("-"):
|
||||
slugified = slugified[:-1]
|
||||
return slugified
|
||||
|
||||
def _slugify_part_number(self):
|
||||
slugified = self.part_number.casefold().replace(" ", "-").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-")
|
||||
if slugified.endswith("-"):
|
||||
slugified = slugified[:-1]
|
||||
return slugified
|
||||
|
||||
def validate_component_names(component_names: (set or None)):
|
||||
if len(component_names) > 1:
|
||||
verify_name = list(component_names[0])
|
||||
for index, name in enumerate(component_names):
|
||||
if index == 0:
|
||||
continue
|
||||
|
||||
intersection = sorted(set(verify_name) & set(list(name)), key = verify_name.index)
|
||||
|
||||
intersection_len = len(intersection)
|
||||
verify_subset = verify_name[:intersection_len]
|
||||
name_subset = list(name)[:intersection_len]
|
||||
subset_match = sorted(set(verify_subset) & set(name_subset), key = name_subset.index)
|
||||
|
||||
if len(intersection) > 2 and len(subset_match) == len(intersection):
|
||||
return False
|
||||
return True
|
||||
|
||||
def verify_filename(device: (DeviceType or ModuleType), KNOWN_MODULES: (set or None)):
|
||||
head, tail = os.path.split(device.get_filepath())
|
||||
filename = tail.rsplit(".", 1)[0].casefold()
|
||||
|
||||
if not (filename == device._slug_model or filename == device._slug_part_number or filename == device.part_number.casefold()):
|
||||
device.failureMessage = f'{device.file_path} file name is invalid. Must be either the model "{device._slug_model}" or part_number "{device.part_number} / {device._slug_part_number}"'
|
||||
return False
|
||||
|
||||
if not device.isDevice:
|
||||
matches = [file_name for file_name, file_path in KNOWN_MODULES if file_name.casefold() == filename.casefold()]
|
||||
if len(matches) > 1:
|
||||
device.failureMessage = f'{device.file_path} appears to be duplicated. Found {len(matches)} matches: {", ".join(matches)}'
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def validate_components(component_types, device_or_module):
|
||||
for component_type in component_types:
|
||||
known_names = set()
|
||||
known_components = []
|
||||
defined_components = device_or_module.definition.get(component_type, [])
|
||||
if not isinstance(defined_components, list):
|
||||
device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type}.'
|
||||
return False
|
||||
for idx, component in enumerate(defined_components):
|
||||
if not isinstance(component, dict):
|
||||
device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} ({idx}).'
|
||||
return False
|
||||
name = component.get('name')
|
||||
position = component.get('position')
|
||||
eval_component = (name, position)
|
||||
if not isinstance(name, str):
|
||||
device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} name ({idx}).'
|
||||
return False
|
||||
if eval_component[0] in known_names:
|
||||
device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated names within {component_type} ({name}).'
|
||||
return False
|
||||
known_components.append(eval_component)
|
||||
known_names.add(name)
|
||||
|
||||
# Adding check for duplicate positions within a component type
|
||||
# Stems from https://github.com/netbox-community/devicetype-library/pull/1586
|
||||
# and from https://github.com/netbox-community/devicetype-library/issues/1584
|
||||
position_set = {}
|
||||
index = 0
|
||||
for name, position in known_components:
|
||||
if position is not None:
|
||||
match = []
|
||||
if len(position_set) > 0:
|
||||
match = [key for key,val in position_set.items() if key == position]
|
||||
if len(match) == 0:
|
||||
if len(position_set) == 0:
|
||||
position_set = {position: {known_components[index]}}
|
||||
else:
|
||||
position_set.update({position: {known_components[index]}})
|
||||
else:
|
||||
position_set[position].add(known_components[index])
|
||||
index = index + 1
|
||||
|
||||
for position in position_set:
|
||||
if len(position_set[position]) > 1:
|
||||
component_names = [name for name,pos in position_set[position]]
|
||||
if not validate_component_names(component_names):
|
||||
device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated positions within {component_type} ({position}).'
|
||||
return False
|
||||
|
||||
return True
|
96
tests/generate-slug-list.py
Normal file
96
tests/generate-slug-list.py
Normal file
@ -0,0 +1,96 @@
|
||||
import os
|
||||
import json
|
||||
import glob
|
||||
import yaml
|
||||
import decimal
|
||||
from yaml_loader import DecimalSafeLoader
|
||||
from jsonschema import Draft4Validator, RefResolver
|
||||
from jsonschema.exceptions import ValidationError
|
||||
from test_configuration import SCHEMAS, KNOWN_SLUGS, ROOT_DIR, KNOWN_MODULES
|
||||
from urllib.request import urlopen
|
||||
import pickle_operations
|
||||
|
||||
def _get_type_files(device_or_module):
|
||||
"""
|
||||
Return a list of all definition files within the specified path.
|
||||
"""
|
||||
file_list = []
|
||||
|
||||
for path, schema in SCHEMAS:
|
||||
if path == f'{device_or_module}-types':
|
||||
# Initialize the schema
|
||||
with open(f"{ROOT_DIR}/schema/{schema}") as schema_file:
|
||||
schema = json.loads(schema_file.read(),
|
||||
parse_float=decimal.Decimal)
|
||||
|
||||
# Validate that the schema exists
|
||||
if not schema:
|
||||
print(f"Schema definition for {path} is empty!")
|
||||
exit(1)
|
||||
|
||||
# Map each definition file to its schema as a tuple (file, schema)
|
||||
for file in sorted(glob.glob(f"{path}/*/*", recursive=True)):
|
||||
file_list.append((f'{file}', schema))
|
||||
|
||||
return file_list
|
||||
|
||||
def _decimal_file_handler(uri):
|
||||
"""
|
||||
Handler to work with floating decimals that fail normal validation.
|
||||
"""
|
||||
with urlopen(uri) as url:
|
||||
result = json.loads(url.read().decode("utf-8"), parse_float=decimal.Decimal)
|
||||
return result
|
||||
|
||||
def load_file(file_path, schema):
|
||||
# Read file
|
||||
try:
|
||||
with open(file_path) as definition_file:
|
||||
content = definition_file.read()
|
||||
except Exception as exc:
|
||||
return (False, f'Error opening "{file_path}". stderr: {exc}')
|
||||
|
||||
# Check for trailing newline. YAML files must end with an emtpy newline.
|
||||
if not content.endswith('\n'):
|
||||
return (False, f'{file_path} is missing trailing newline')
|
||||
|
||||
# Load YAML data from file
|
||||
try:
|
||||
definition = yaml.load(content, Loader=DecimalSafeLoader)
|
||||
except Exception as exc:
|
||||
return (False, f'Error during yaml.load "{file_path}". stderr: {exc}')
|
||||
|
||||
# Validate YAML definition against the supplied schema
|
||||
try:
|
||||
resolver = RefResolver(
|
||||
f"file://{os.getcwd()}/schema/devicetype.json",
|
||||
schema,
|
||||
handlers={"file": _decimal_file_handler},
|
||||
)
|
||||
# Validate definition against schema
|
||||
Draft4Validator(schema, resolver=resolver).validate(definition)
|
||||
except ValidationError as exc:
|
||||
# Schema validation failure. Ensure you are following the proper format.
|
||||
return (False, f'{file_path} failed validation: {exc}')
|
||||
|
||||
return (True, definition)
|
||||
|
||||
def _generate_knowns(device_or_module):
|
||||
all_files = _get_type_files(device_or_module)
|
||||
|
||||
for file_path, schema in all_files:
|
||||
definition_status, definition = load_file(file_path, schema)
|
||||
if not definition_status:
|
||||
print(definition)
|
||||
exit(1)
|
||||
|
||||
if device_or_module == 'device':
|
||||
KNOWN_SLUGS.add((definition.get('slug'), file_path))
|
||||
else:
|
||||
KNOWN_MODULES.add((os.path.splitext(os.path.basename(file_path))[0], os.path.dirname(file_path)))
|
||||
|
||||
_generate_knowns('device')
|
||||
pickle_operations.write_pickle_data(KNOWN_SLUGS, f'{ROOT_DIR}/tests/known-slugs.pickle')
|
||||
|
||||
_generate_knowns('module')
|
||||
pickle_operations.write_pickle_data(KNOWN_MODULES, f'{ROOT_DIR}/tests/known-modules.pickle')
|
BIN
tests/known-modules.pickle
Normal file
BIN
tests/known-modules.pickle
Normal file
Binary file not shown.
BIN
tests/known-slugs.pickle
Normal file
BIN
tests/known-slugs.pickle
Normal file
Binary file not shown.
14
tests/pickle_operations.py
Normal file
14
tests/pickle_operations.py
Normal file
@ -0,0 +1,14 @@
|
||||
import pickle
|
||||
|
||||
def write_pickle_data(data, file_path):
|
||||
with open(file_path, 'wb') as pickle_file:
|
||||
pickle.dump(data, pickle_file)
|
||||
pickle_file.close()
|
||||
|
||||
|
||||
def read_pickle_data(file_path):
|
||||
with open(file_path, 'rb') as pickle_file:
|
||||
data = pickle.load(pickle_file)
|
||||
pickle_file.close()
|
||||
|
||||
return data
|
38
tests/test_configuration.py
Normal file
38
tests/test_configuration.py
Normal file
@ -0,0 +1,38 @@
|
||||
import os
|
||||
|
||||
SCHEMAS = (
|
||||
('device-types', 'devicetype.json'),
|
||||
('module-types', 'moduletype.json'),
|
||||
)
|
||||
|
||||
IMAGE_FILETYPES = (
|
||||
'bmp', 'gif', 'pjp', 'jpg', 'pjpeg', 'jpeg', 'jfif', 'png', 'tif', 'tiff', 'webp'
|
||||
)
|
||||
|
||||
COMPONENT_TYPES = (
|
||||
'console-ports',
|
||||
'console-server-ports',
|
||||
'power-ports',
|
||||
'power-outlets',
|
||||
'interfaces',
|
||||
'front-ports',
|
||||
'rear-ports',
|
||||
'device-bays',
|
||||
'module-bays',
|
||||
)
|
||||
|
||||
PRECOMMIT_ALL_SWITCHES = [
|
||||
'-a',
|
||||
'--all-files',
|
||||
'--all'
|
||||
]
|
||||
|
||||
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..'))
|
||||
|
||||
KNOWN_SLUGS = set()
|
||||
KNOWN_MODULES = set()
|
||||
|
||||
USE_LOCAL_KNOWN_SLUGS = False
|
||||
USE_UPSTREAM_DIFF = True
|
||||
|
||||
NETBOX_DT_LIBRARY_URL = "https://github.com/netbox-community/devicetype-library.git"
|
34
tests/yaml_loader.py
Normal file
34
tests/yaml_loader.py
Normal file
@ -0,0 +1,34 @@
|
||||
import decimal
|
||||
|
||||
from yaml.composer import Composer
|
||||
from yaml.constructor import SafeConstructor
|
||||
from yaml.parser import Parser
|
||||
from yaml.reader import Reader
|
||||
from yaml.resolver import Resolver
|
||||
from yaml.scanner import Scanner
|
||||
|
||||
|
||||
class DecimalSafeConstructor(SafeConstructor):
|
||||
"""Special constructor to override construct_yaml_float() in order to cast "Decimal" types to the value"""
|
||||
|
||||
def construct_yaml_float(self, node):
|
||||
value = super().construct_yaml_float(node)
|
||||
# We force the string representation of the float here to avoid things like:
|
||||
# In [11]: decimal.Decimal(10.11)
|
||||
# Out[11]: Decimal('10.1099999999999994315658113919198513031005859375')
|
||||
return decimal.Decimal(f"{value}")
|
||||
|
||||
|
||||
DecimalSafeConstructor.add_constructor(
|
||||
"tag:yaml.org,2002:float", DecimalSafeConstructor.construct_yaml_float
|
||||
)
|
||||
|
||||
|
||||
class DecimalSafeLoader(Reader, Scanner, Parser, Composer, DecimalSafeConstructor, Resolver):
|
||||
def __init__(self, stream):
|
||||
Reader.__init__(self, stream)
|
||||
Scanner.__init__(self)
|
||||
Parser.__init__(self)
|
||||
Composer.__init__(self)
|
||||
DecimalSafeConstructor.__init__(self)
|
||||
Resolver.__init__(self)
|
Loading…
Reference in New Issue
Block a user