Push tests to new branch

This commit is contained in:
Daniel Sheppard 2024-03-22 14:50:17 -05:00
commit 546103d334
8 changed files with 647 additions and 0 deletions

definitions_test.py Normal file
View File

@ -0,0 +1,224 @@
import pickle_operations
from yaml_loader import DecimalSafeLoader
from device_types import DeviceType, ModuleType, verify_filename, validate_components
import decimal
import glob
import json
import os
import tempfile
import psutil
from urllib.request import urlopen
import pytest
import yaml
from jsonschema import Draft4Validator, RefResolver
from jsonschema.exceptions import ValidationError
from git import Repo
def _get_definition_files():
Return a list of all definition files within the specified path.
file_list = []
for path, schema in SCHEMAS:
# Initialize the schema
with open(f"schema/{schema}") as schema_file:
schema = json.loads(schema_file.read(), parse_float=decimal.Decimal)
# Validate that the schema exists
assert schema, f"Schema definition for {path} is empty!"
# Map each definition file to its schema as a tuple (file, schema)
for file in sorted(glob.glob(f"{path}/*/*", recursive=True)):
file_list.append((file, schema, 'skip'))
return file_list
def _get_diff_from_upstream():
file_list = []
repo = Repo(f"{os.path.dirname(os.path.abspath(__file__))}/../")
commits_list = list(repo.iter_commits())
if "upstream" not in repo.remotes:
repo.create_remote("upstream", NETBOX_DT_LIBRARY_URL)
upstream = repo.remotes.upstream
changes = upstream.refs.master.commit.diff(repo.head)
changes = changes + repo.index.diff("HEAD")
for path, schema in SCHEMAS:
# Initialize the schema
with open(f"schema/{schema}") as schema_file:
schema = json.loads(schema_file.read(), parse_float=decimal.Decimal)
# Validate that the schema exists
assert schema, f"Schema definition for {path} is empty!"
# Ensure files are either added, renamed, modified or type changed (do not get deleted files)
CHANGE_TYPE_LIST = ['A', 'R', 'M', 'T']
# Iterate through changed files
for file in changes:
# Ensure the files are modified or added, this will disclude deleted files
if file.change_type in CHANGE_TYPE_LIST:
# If the file is renamed, ensure we are picking the right schema
if 'R' in file.change_type and path in file.rename_to:
file_list.append((file.rename_to, schema, file.change_type))
elif path in file.a_path:
file_list.append((file.a_path, schema, file.change_type))
elif path in file.b_path:
file_list.append((file.b_path, schema, file.change_type))
return file_list
def _get_image_files():
Return a list of all image files within the specified path and manufacturer.
file_list = []
# Map each image file to its manufacturer
for file in sorted(glob.glob(f"elevation-images{os.path.sep}*{os.path.sep}*", recursive=True)):
# Validate that the file extension is valid
assert file.split(os.path.sep)[2].split('.')[-1] in IMAGE_FILETYPES, f"Invalid file extension: {file}"
# Map each image file to its manufacturer as a tuple (manufacturer, file)
file_list.append((file.split(os.path.sep)[1], file))
return file_list
def _decimal_file_handler(uri):
Handler to work with floating decimals that fail normal validation.
with urlopen(uri) as url:
result = json.loads(url.read().decode("utf-8"), parse_float=decimal.Decimal)
return result
def test_environment():
Run basic sanity checks on the environment to ensure tests are running correctly.
# Validate that definition files exist
if definition_files:
pytest.skip("No changes to definition files found.")
if any(x in PRECOMMIT_ALL_SWITCHES for x in psutil.Process(os.getppid()).cmdline()):
definition_files = _get_diff_from_upstream()
definition_files = _get_definition_files()
image_files = _get_image_files()
KNOWN_SLUGS = pickle_operations.read_pickle_data(f'{ROOT_DIR}/tests/known-slugs.pickle')
KNOWN_MODULES = pickle_operations.read_pickle_data(f'{ROOT_DIR}/tests/known-modules.pickle')
temp_dir = tempfile.TemporaryDirectory()
repo = Repo.clone_from(url=NETBOX_DT_LIBRARY_URL, to_path=temp_dir.name)
KNOWN_SLUGS = pickle_operations.read_pickle_data(f'{temp_dir.name}/tests/known-slugs.pickle')
KNOWN_MODULES = pickle_operations.read_pickle_data(f'{temp_dir.name}/tests/known-modules.pickle')
@pytest.mark.parametrize(('file_path', 'schema', 'change_type'), definition_files)
def test_definitions(file_path, schema, change_type):
Validate each definition file using the provided JSON schema and check for duplicate entries.
# Check file extension. Only .yml or .yaml files are supported.
assert file_path.split('.')[-1] in ('yaml', 'yml'), f"Invalid file extension: {file_path}"
# Read file
with open(file_path) as definition_file:
content = definition_file.read()
# Check for trailing newline. YAML files must end with an emtpy newline.
assert content.endswith('\n'), "Missing trailing newline"
# Load YAML data from file
definition = yaml.load(content, Loader=DecimalSafeLoader)
# Validate YAML definition against the supplied schema
resolver = RefResolver(
handlers={"file": _decimal_file_handler},
# Validate definition against schema
Draft4Validator(schema, resolver=resolver).validate(definition)
except ValidationError as e:
# Schema validation failure. Ensure you are following the proper format.
pytest.fail(f"{file_path} failed validation: {e}", False)
# Identify if the definition is for a Device or Module
if "device-types" in file_path:
# A device
this_device = DeviceType(definition, file_path, change_type)
# A module
this_device = ModuleType(definition, file_path, change_type)
# Verify the slug is valid, only if the definition type is a Device
if this_device.isDevice:
assert this_device.verify_slug(KNOWN_SLUGS), pytest.fail(this_device.failureMessage, False)
# Verify the filename is valid. Must either be the model or part_number.
assert verify_filename(this_device, (KNOWN_MODULES if not this_device.isDevice else None)), pytest.fail(this_device.failureMessage, False)
# Check for duplicate components within the definition
assert validate_components(COMPONENT_TYPES, this_device), pytest.fail(this_device.failureMessage, False)
# Check for empty quotes and fail if found
def iterdict(var):
for dict_value in var.values():
if isinstance(dict_value, dict):
if isinstance(dict_value, list):
if(isinstance(dict_value, str) and not dict_value):
pytest.fail(f'{file_path} has empty quotes', False)
def iterlist(var):
for list_value in var:
if isinstance(list_value, dict):
elif isinstance(list_value, list):
# Check for valid power definitions
if this_device.isDevice:
assert this_device.validate_power(), pytest.fail(this_device.failureMessage, False)
# Check for images if front_image or rear_image is True
if (definition.get('front_image') or definition.get('rear_image')):
# Find images for given manufacturer, with matching device slug (exact match including case)
manufacturer_images = [image[1] for image in image_files if image[0] == file_path.split('/')[1] and os.path.basename(image[1]).split('.')[0] == this_device.get_slug()]
if not manufacturer_images:
pytest.fail(f'{file_path} has Front or Rear Image set to True but no images found for manufacturer/device (slug={this_device.get_slug()})', False)
elif len(manufacturer_images)>2:
pytest.fail(f'More than 2 images found for device with slug {this_device.get_slug()}: {manufacturer_images}', False)
# If front_image is True, verify that a front image exists
front_image = [image_path.split('/')[2] for image_path in manufacturer_images if os.path.basename(image_path).split('.')[1] == 'front']
if not front_image:
pytest.fail(f'{file_path} has front_image set to True but no matching image found for device ({manufacturer_images})', False)
# If rear_image is True, verify that a front image exists
rear_image = [image_path.split('/')[2] for image_path in manufacturer_images if os.path.basename(image_path).split('.')[1] == 'rear']
if not rear_image:
pytest.fail(f'{file_path} has rear_image set to True but no images found for device', False)

device_types.py Normal file
View File

@ -0,0 +1,241 @@
import os
class DeviceType:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
def __init__(self, definition, file_path, change_type):
self.file_path = file_path
self.isDevice = True
self.definition = definition
self.manufacturer = definition.get('manufacturer')
self._slug_manufacturer = self._slugify_manufacturer()
self.slug = definition.get('slug')
self.model = definition.get('model')
self._slug_model = self._slugify_model()
self.part_number = definition.get('part_number', "")
self._slug_part_number = self._slugify_part_number()
self.failureMessage = None
self.change_type = change_type
def _slugify_manufacturer(self):
return self.manufacturer.casefold().replace(" ", "-").replace("sfp+", "sfpp").replace("poe+", "poep").replace("-+", "-plus-").replace("+", "-plus").replace("_", "-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-").replace("&", "and")
def get_slug(self):
if hasattr(self, "slug"):
return self.slug
return None
def _slugify_model(self):
slugified = self.model.casefold().replace(" ", "-").replace("sfp+", "sfpp").replace("poe+", "poep").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-")
if slugified.endswith("-"):
slugified = slugified[:-1]
return slugified
def _slugify_part_number(self):
slugified = self.part_number.casefold().replace(" ", "-").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-")
if slugified.endswith("-"):
slugified = slugified[:-1]
return slugified
def get_filepath(self):
return self.file_path
def verify_slug(self, KNOWN_SLUGS):
# Verify the slug is unique, and not already known
known_slug_list_intersect = [(slug, file_path) for slug, file_path in KNOWN_SLUGS if slug == self.slug]
if len(known_slug_list_intersect) == 0:
elif len(known_slug_list_intersect) == 1:
if self.file_path not in known_slug_list_intersect[0][1]:
if 'R' not in self.change_type:
self.failureMessage = f'{self.file_path} has a duplicate slug: "{self.slug}"'
return False
return True
self.failureMessage = f'{self.file_path} has a duplicate slug "{self.slug}"'
return False
# Verify the manufacturer is appended to the slug
if not self.slug.startswith(self._slug_manufacturer):
self.failureMessage = f'{self.file_path} contains slug "{self.slug}". Does not start with manufacturer: "{self.manufacturer.casefold()}-"'
return False
# Verify the slug ends with either the model or part number
if not (self.slug.endswith(self._slug_model) or self.slug.endswith(self._slug_part_number)):
self.failureMessage = f'{self.file_path} has slug "{self.slug}". Does not end with the model "{self._slug_model}" or part_number "{self._slug_part_number}"'
return False
# Add the slug to the list of known slugs
KNOWN_SLUGS.add((self.slug, self.file_path))
return True
def validate_power(self):
# Check if power-ports exists
if self.definition.get('power-ports', False):
# Verify that is_powered is not set to False. If so, there should not be any power-ports defined
if not self.definition.get('is_powered', True):
self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-ports" are defined.'
return False
return True
# Lastly, check if interfaces exists and has a poe_mode defined
interfaces = self.definition.get('interfaces', False)
if interfaces:
for interface in interfaces:
poe_mode = interface.get('poe_mode', "")
if poe_mode != "" and poe_mode == "pd":
return True
console_ports = self.definition.get('console-ports', False)
if console_ports:
for console_port in console_ports:
poe = console_port.get('poe', False)
if poe:
return True
rear_ports = self.definition.get('rear-ports', False)
if rear_ports:
for rear_port in rear_ports:
poe = rear_port.get('poe', False)
if poe:
return True
# Check if the device is a child device, and if so, assume it has a valid power source from the parent
subdevice_role = self.definition.get('subdevice_role', False)
if subdevice_role:
if subdevice_role == "child":
return True
# Check if module-bays exists
if self.definition.get('module-bays', False):
# There is not a standardized way to define PSUs that are module bays, so we will just assume they are valid
return True
# As the very last case, check if is_powered is defined and is False. Otherwise assume the device is powered
if not self.definition.get('is_powered', True): # is_powered defaults to True
# Arriving here means is_powered is set to False, so verify that there are no power-outlets defined
if self.definition.get('power-outlets', False):
self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-outlets" are defined.'
return False
return True
self.failureMessage = f'{self.file_path} has does not appear to have a valid power source. Ensure either "power-ports" or "interfaces" with "poe_mode" is defined.'
return False
class ModuleType:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
def __init__(self, definition, file_path, change_type):
self.file_path = file_path
self.isDevice = False
self.definition = definition
self.manufacturer = definition.get('manufacturer')
self.model = definition.get('model')
self._slug_model = self._slugify_model()
self.part_number = definition.get('part_number', "")
self._slug_part_number = self._slugify_part_number()
self.change_type = change_type
def get_filepath(self):
return self.file_path
def _slugify_model(self):
slugified = self.model.casefold().replace(" ", "-").replace("sfp+", "sfpp").replace("poe+", "poep").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-")
if slugified.endswith("-"):
slugified = slugified[:-1]
return slugified
def _slugify_part_number(self):
slugified = self.part_number.casefold().replace(" ", "-").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-")
if slugified.endswith("-"):
slugified = slugified[:-1]
return slugified
def validate_component_names(component_names: (set or None)):
if len(component_names) > 1:
verify_name = list(component_names[0])
for index, name in enumerate(component_names):
if index == 0:
intersection = sorted(set(verify_name) & set(list(name)), key = verify_name.index)
intersection_len = len(intersection)
verify_subset = verify_name[:intersection_len]
name_subset = list(name)[:intersection_len]
subset_match = sorted(set(verify_subset) & set(name_subset), key = name_subset.index)
if len(intersection) > 2 and len(subset_match) == len(intersection):
return False
return True
def verify_filename(device: (DeviceType or ModuleType), KNOWN_MODULES: (set or None)):
head, tail = os.path.split(device.get_filepath())
filename = tail.rsplit(".", 1)[0].casefold()
if not (filename == device._slug_model or filename == device._slug_part_number or filename == device.part_number.casefold()):
device.failureMessage = f'{device.file_path} file name is invalid. Must be either the model "{device._slug_model}" or part_number "{device.part_number} / {device._slug_part_number}"'
return False
if not device.isDevice:
matches = [file_name for file_name, file_path in KNOWN_MODULES if file_name.casefold() == filename.casefold()]
if len(matches) > 1:
device.failureMessage = f'{device.file_path} appears to be duplicated. Found {len(matches)} matches: {", ".join(matches)}'
return False
return True
def validate_components(component_types, device_or_module):
for component_type in component_types:
known_names = set()
known_components = []
defined_components = device_or_module.definition.get(component_type, [])
if not isinstance(defined_components, list):
device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type}.'
return False
for idx, component in enumerate(defined_components):
if not isinstance(component, dict):
device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} ({idx}).'
return False
name = component.get('name')
position = component.get('position')
eval_component = (name, position)
if not isinstance(name, str):
device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} name ({idx}).'
return False
if eval_component[0] in known_names:
device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated names within {component_type} ({name}).'
return False
# Adding check for duplicate positions within a component type
# Stems from https://github.com/netbox-community/devicetype-library/pull/1586
# and from https://github.com/netbox-community/devicetype-library/issues/1584
position_set = {}
index = 0
for name, position in known_components:
if position is not None:
match = []
if len(position_set) > 0:
match = [key for key,val in position_set.items() if key == position]
if len(match) == 0:
if len(position_set) == 0:
position_set = {position: {known_components[index]}}
position_set.update({position: {known_components[index]}})
index = index + 1
for position in position_set:
if len(position_set[position]) > 1:
component_names = [name for name,pos in position_set[position]]
if not validate_component_names(component_names):
device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated positions within {component_type} ({position}).'
return False
return True

generate-slug-list.py Normal file
View File

@ -0,0 +1,96 @@
import os
import json
import glob
import yaml
import decimal
from yaml_loader import DecimalSafeLoader
from jsonschema import Draft4Validator, RefResolver
from jsonschema.exceptions import ValidationError
from test_configuration import SCHEMAS, KNOWN_SLUGS, ROOT_DIR, KNOWN_MODULES
from urllib.request import urlopen
import pickle_operations
def _get_type_files(device_or_module):
Return a list of all definition files within the specified path.
file_list = []
for path, schema in SCHEMAS:
if path == f'{device_or_module}-types':
# Initialize the schema
with open(f"{ROOT_DIR}/schema/{schema}") as schema_file:
schema = json.loads(schema_file.read(),
# Validate that the schema exists
if not schema:
print(f"Schema definition for {path} is empty!")
# Map each definition file to its schema as a tuple (file, schema)
for file in sorted(glob.glob(f"{path}/*/*", recursive=True)):
file_list.append((f'{file}', schema))
return file_list
def _decimal_file_handler(uri):
Handler to work with floating decimals that fail normal validation.
with urlopen(uri) as url:
result = json.loads(url.read().decode("utf-8"), parse_float=decimal.Decimal)
return result
def load_file(file_path, schema):
# Read file
with open(file_path) as definition_file:
content = definition_file.read()
except Exception as exc:
return (False, f'Error opening "{file_path}". stderr: {exc}')
# Check for trailing newline. YAML files must end with an emtpy newline.
if not content.endswith('\n'):
return (False, f'{file_path} is missing trailing newline')
# Load YAML data from file
definition = yaml.load(content, Loader=DecimalSafeLoader)
except Exception as exc:
return (False, f'Error during yaml.load "{file_path}". stderr: {exc}')
# Validate YAML definition against the supplied schema
resolver = RefResolver(
handlers={"file": _decimal_file_handler},
# Validate definition against schema
Draft4Validator(schema, resolver=resolver).validate(definition)
except ValidationError as exc:
# Schema validation failure. Ensure you are following the proper format.
return (False, f'{file_path} failed validation: {exc}')
return (True, definition)
def _generate_knowns(device_or_module):
all_files = _get_type_files(device_or_module)
for file_path, schema in all_files:
definition_status, definition = load_file(file_path, schema)
if not definition_status:
if device_or_module == 'device':
KNOWN_SLUGS.add((definition.get('slug'), file_path))
KNOWN_MODULES.add((os.path.splitext(os.path.basename(file_path))[0], os.path.dirname(file_path)))
pickle_operations.write_pickle_data(KNOWN_SLUGS, f'{ROOT_DIR}/tests/known-slugs.pickle')
pickle_operations.write_pickle_data(KNOWN_MODULES, f'{ROOT_DIR}/tests/known-modules.pickle')

known-modules.pickle Normal file

Binary file not shown.

known-slugs.pickle Normal file

Binary file not shown.

pickle_operations.py Normal file
View File

@ -0,0 +1,14 @@
import pickle
def write_pickle_data(data, file_path):
with open(file_path, 'wb') as pickle_file:
pickle.dump(data, pickle_file)
def read_pickle_data(file_path):
with open(file_path, 'rb') as pickle_file:
data = pickle.load(pickle_file)
return data

test_configuration.py Normal file
View File

@ -0,0 +1,38 @@
import os
('device-types', 'devicetype.json'),
('module-types', 'moduletype.json'),
'bmp', 'gif', 'pjp', 'jpg', 'pjpeg', 'jpeg', 'jfif', 'png', 'tif', 'tiff', 'webp'
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..'))
NETBOX_DT_LIBRARY_URL = "https://github.com/netbox-community/devicetype-library.git"

yaml_loader.py Normal file
View File

@ -0,0 +1,34 @@
import decimal
from yaml.composer import Composer
from yaml.constructor import SafeConstructor
from yaml.parser import Parser
from yaml.reader import Reader
from yaml.resolver import Resolver
from yaml.scanner import Scanner
class DecimalSafeConstructor(SafeConstructor):
"""Special constructor to override construct_yaml_float() in order to cast "Decimal" types to the value"""
def construct_yaml_float(self, node):
value = super().construct_yaml_float(node)
# We force the string representation of the float here to avoid things like:
# In [11]: decimal.Decimal(10.11)
# Out[11]: Decimal('10.1099999999999994315658113919198513031005859375')
return decimal.Decimal(f"{value}")
"tag:yaml.org,2002:float", DecimalSafeConstructor.construct_yaml_float
class DecimalSafeLoader(Reader, Scanner, Parser, Composer, DecimalSafeConstructor, Resolver):
def __init__(self, stream):
Reader.__init__(self, stream)