devicetype-library/tests/device_types.py

265 lines
12 KiB
Python
Raw Normal View History

2024-03-22 21:36:57 +01:00
import os
class DeviceType:
    """A device-type definition together with the checks run against it.

    The constructor derives slug fragments from the definition's
    ``manufacturer``, ``model`` and ``part_number`` values. The ``verify_*`` /
    ``validate_*`` / ``ensure_*`` methods return ``True`` on success; on
    failure they return ``False`` and store an explanation in
    ``failureMessage``.
    """

    # Ordered (old, new) substitutions applied after case-folding. The order
    # is significant: "sfp+" / "poe+" / "-+" must be rewritten before the
    # bare "+" rule sees them.
    _MANUFACTURER_REPLACEMENTS = (
        (" ", "-"),
        ("sfp+", "sfpp"),
        ("poe+", "poep"),
        ("-+", "-plus-"),
        ("+", "-plus"),
        ("_", "-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
        ("&", "and"),
    )
    _MODEL_REPLACEMENTS = (
        (" ", "-"),
        ("sfp+", "sfpp"),
        ("poe+", "poep"),
        ("-+", "-plus"),
        ("+", "-plus-"),
        ("_", "-"),
        ("&", "-and-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
        ("(", ""),
        (")", ""),
        (";", ""),
    )
    _PART_NUMBER_REPLACEMENTS = (
        (" ", "-"),
        ("-+", "-plus"),
        ("+", "-plus-"),
        ("_", "-"),
        ("&", "-and-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
        ("(", ""),
        (")", ""),
        (";", ""),
    )

    def __init__(self, definition, file_path, change_type):
        """Store the definition and pre-compute its slug fragments.

        Args:
            definition: Mapping holding the parsed definition; read with
                ``.get()`` for 'manufacturer', 'slug', 'model' and
                'part_number'.
            file_path: Path of the definition file, used in failure messages.
            change_type: Change marker; ``verify_slug`` only tests whether it
                contains 'R'.
        """
        self.file_path = file_path
        self.isDevice = True
        self.definition = definition
        self.manufacturer = definition.get('manufacturer')
        self._slug_manufacturer = self._slugify_manufacturer()
        self.slug = definition.get('slug')
        self.model = definition.get('model')
        self._slug_model = self._slugify_model()
        self.part_number = definition.get('part_number', "")
        self._slug_part_number = self._slugify_part_number()
        self.failureMessage = None
        self.change_type = change_type

    @staticmethod
    def _apply_replacements(text, replacements, strip_trailing_dash):
        """Case-fold ``text``, apply ``replacements`` in order, optionally drop one trailing '-'."""
        slugified = text.casefold()
        for old, new in replacements:
            slugified = slugified.replace(old, new)
        if strip_trailing_dash and slugified.endswith("-"):
            slugified = slugified[:-1]
        return slugified

    def _slugify_manufacturer(self):
        """Return the slug form of the manufacturer (a trailing '-' is kept)."""
        return self._apply_replacements(self.manufacturer, self._MANUFACTURER_REPLACEMENTS, False)

    def get_slug(self):
        """Return the ``slug`` attribute, or ``None`` if it is not set."""
        return getattr(self, "slug", None)

    def _slugify_model(self):
        """Return the slug form of the model, without a single trailing '-'."""
        return self._apply_replacements(self.model, self._MODEL_REPLACEMENTS, True)

    def _slugify_part_number(self):
        """Return the slug form of the part number, without a single trailing '-'."""
        return self._apply_replacements(self.part_number, self._PART_NUMBER_REPLACEMENTS, True)

    def get_filepath(self):
        """Return the path of the definition file."""
        return self.file_path

    def verify_slug(self, KNOWN_SLUGS):
        """Check the slug for uniqueness and for the expected prefix/suffix.

        Args:
            KNOWN_SLUGS: Set of ``(slug, file_path)`` pairs. The pair for this
                device is added when the slug is new and well-formed.

        Returns:
            ``True`` if the slug is accepted, otherwise ``False`` with
            ``failureMessage`` set.
        """
        same_slug = [(slug, file_path) for slug, file_path in KNOWN_SLUGS if slug == self.slug]

        if len(same_slug) > 1:
            self.failureMessage = f'{self.file_path} has a duplicate slug "{self.slug}"'
            return False

        if len(same_slug) == 1:
            # One existing entry is tolerated when it refers to this file
            # (substring test on the recorded path) or when the change type
            # contains 'R'. In that case no further checks are run.
            if self.file_path not in same_slug[0][1] and 'R' not in self.change_type:
                self.failureMessage = f'{self.file_path} has a duplicate slug: "{self.slug}"'
                return False
            return True

        # The slug must begin with the slugified manufacturer. The message
        # reports that same slugified value so it matches what is checked.
        if not self.slug.startswith(self._slug_manufacturer):
            self.failureMessage = f'{self.file_path} contains slug "{self.slug}". Does not start with manufacturer: "{self._slug_manufacturer}-"'
            return False

        # The slug must end with either the model or the part number.
        if not self.slug.endswith((self._slug_model, self._slug_part_number)):
            self.failureMessage = f'{self.file_path} has slug "{self.slug}". Does not end with the model "{self._slug_model}" or part_number "{self._slug_part_number}"'
            return False

        KNOWN_SLUGS.add((self.slug, self.file_path))
        return True

    def validate_power(self):
        """Check that the definition describes a plausible power source.

        Returns:
            ``True`` when a power source is found or the definition is
            consistently unpowered, otherwise ``False`` with
            ``failureMessage`` set.
        """
        definition = self.definition
        is_powered = definition.get('is_powered', True)  # is_powered defaults to True

        if definition.get('power-ports', False):
            # Power ports contradict an explicit is_powered: False.
            if not is_powered:
                self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-ports" are defined.'
                return False
            return True

        # An interface whose poe_mode is "pd" counts as a power source.
        for interface in definition.get('interfaces', False) or ():
            if interface.get('poe_mode', "") == "pd":
                return True

        # A console port or rear port with a truthy 'poe' value also counts.
        for section in ('console-ports', 'rear-ports'):
            for port in definition.get(section, False) or ():
                if port.get('poe', False):
                    return True

        # A child device is assumed to be powered by its parent.
        if definition.get('subdevice_role', False) == "child":
            return True

        # There is no standardized way to define PSUs that are module bays,
        # so any module bays are accepted.
        if definition.get('module-bays', False):
            return True

        if not is_powered:
            # An unpowered device must not define power outlets.
            if definition.get('power-outlets', False):
                self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-outlets" are defined.'
                return False
            return True

        self.failureMessage = f'{self.file_path} does not appear to have a valid power source. Ensure either "power-ports" or "interfaces" with "poe_mode" is defined.'
        return False

    def ensure_no_vga(self):
        """Reject definitions with "vga" in a component's name or label.

        Returns:
            ``True`` when no such component exists, otherwise ``False`` with
            ``failureMessage`` set.
        """
        NO_VGA_COMPONENTS = [
            'console-ports',
            'console-server-ports',
            'interfaces',
            'front-ports',
            'rear-ports',
        ]

        for component_to_test in NO_VGA_COMPONENTS:
            for component in self.definition.get(component_to_test, False) or ():
                name = component.get('name', "")
                label = component.get('label', "")
                if "vga" in name.casefold() or "vga" in label.casefold():
                    self.failureMessage = f'{self.file_path} has a VGA component defined. VGA is not a valid definition at this time.'
                    return False

        return True
2024-03-22 21:36:57 +01:00
class ModuleType:
    """A module-type definition with the slug fragments derived from it.

    Unlike the device slug rules elsewhere in this file, the module rules
    leave parentheses and semicolons untouched.
    """

    # Ordered (old, new) substitutions applied after case-folding. The order
    # is significant: "sfp+" / "poe+" / "-+" must be rewritten before the
    # bare "+" rule sees them.
    _MODEL_REPLACEMENTS = (
        (" ", "-"),
        ("sfp+", "sfpp"),
        ("poe+", "poep"),
        ("-+", "-plus"),
        ("+", "-plus-"),
        ("_", "-"),
        ("&", "-and-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
    )
    _PART_NUMBER_REPLACEMENTS = (
        (" ", "-"),
        ("-+", "-plus"),
        ("+", "-plus-"),
        ("_", "-"),
        ("&", "-and-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
    )

    def __init__(self, definition, file_path, change_type):
        """Store the definition and pre-compute its slug fragments.

        Args:
            definition: Mapping holding the parsed definition; read with
                ``.get()`` for 'manufacturer', 'model' and 'part_number'.
            file_path: Path of the definition file.
            change_type: Change marker, stored unchanged.
        """
        self.file_path = file_path
        self.isDevice = False
        self.definition = definition
        self.manufacturer = definition.get('manufacturer')
        self.model = definition.get('model')
        self._slug_model = self._slugify_model()
        self.part_number = definition.get('part_number', "")
        self._slug_part_number = self._slugify_part_number()
        # Always present so callers can read it before any check has failed.
        self.failureMessage = None
        self.change_type = change_type

    def get_filepath(self):
        """Return the path of the definition file."""
        return self.file_path

    @staticmethod
    def _apply_replacements(text, replacements):
        """Case-fold ``text``, apply ``replacements`` in order, then drop one trailing '-'."""
        slugified = text.casefold()
        for old, new in replacements:
            slugified = slugified.replace(old, new)
        if slugified.endswith("-"):
            slugified = slugified[:-1]
        return slugified

    def _slugify_model(self):
        """Return the slug form of the model."""
        return self._apply_replacements(self.model, self._MODEL_REPLACEMENTS)

    def _slugify_part_number(self):
        """Return the slug form of the part number."""
        return self._apply_replacements(self.part_number, self._PART_NUMBER_REPLACEMENTS)
def validate_component_names(component_names: "list[str] | None"):
    """Decide whether names that share a position look like distinct components.

    Every name after the first is compared with the first one only. A pair is
    treated as a clash when the two names have more than two distinct
    characters in common and each of those characters also appears within the
    leading slice (as long as the number of shared characters) of both names.

    Args:
        component_names: Indexable sequence of names. ``None``, an empty
            sequence or a single name means there is nothing to compare.

    Returns:
        ``False`` if any name clashes with the first one, otherwise ``True``.
    """
    # The annotation is a string so it is never evaluated at import time.
    if not component_names or len(component_names) < 2:
        return True

    reference = list(component_names[0])
    reference_chars = set(reference)

    for name in component_names[1:]:
        candidate = list(name)
        shared_count = len(reference_chars & set(candidate))
        # Only the sizes of the overlaps matter, so no ordering is needed.
        leading_overlap = set(reference[:shared_count]) & set(candidate[:shared_count])
        if shared_count > 2 and len(leading_overlap) == shared_count:
            return False

    return True
def verify_filename(device: "DeviceType | ModuleType", KNOWN_MODULES: "set | None"):
    """Check that a definition's file name matches its model or part number.

    The file name (extension removed, case-folded) must equal the slugified
    model, the slugified part number, or the case-folded raw part number. For
    modules (``device.isDevice`` is false) the name must additionally match at
    most one entry of ``KNOWN_MODULES``.

    Args:
        device: Object exposing ``get_filepath()``, ``file_path``,
            ``_slug_model``, ``_slug_part_number``, ``part_number`` and
            ``isDevice``.
        KNOWN_MODULES: Iterable of ``(file_name, file_path)`` pairs, consulted
            only for modules. ``None`` is treated as empty.

    Returns:
        ``True`` if the file name is acceptable, otherwise ``False`` with
        ``device.failureMessage`` set.
    """
    # The annotations are strings so they are never evaluated at import time.
    base_name = os.path.basename(device.get_filepath())
    filename = base_name.rsplit(".", 1)[0].casefold()

    accepted_names = (device._slug_model, device._slug_part_number, device.part_number.casefold())
    if filename not in accepted_names:
        device.failureMessage = f'{device.file_path} file name is invalid. Must be either the model "{device._slug_model}" or part_number "{device.part_number} / {device._slug_part_number}"'
        return False

    if not device.isDevice:
        matches = [file_name for file_name, file_path in (KNOWN_MODULES or ()) if file_name.casefold() == filename]
        if len(matches) > 1:
            device.failureMessage = f'{device.file_path} appears to be duplicated. Found {len(matches)} matches: {", ".join(matches)}'
            return False

    return True
def validate_components(component_types, device_or_module):
    """Validate component names and positions for each component type.

    For every key in ``component_types`` the matching entry of
    ``device_or_module.definition`` must be a list of dicts, each with a
    string ``name`` that is unique within the list. Components that share a
    non-``None`` ``position`` are additionally passed to
    ``validate_component_names``.

    Args:
        component_types: Iterable of definition keys to inspect.
        device_or_module: Object exposing ``definition`` and ``file_path``;
            ``failureMessage`` is written on failure.

    Returns:
        ``True`` if every component type passes, otherwise ``False`` with
        ``device_or_module.failureMessage`` set.
    """
    for component_type in component_types:
        known_names = set()
        known_components = []
        defined_components = device_or_module.definition.get(component_type, [])
        if not isinstance(defined_components, list):
            device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type}.'
            return False

        for idx, component in enumerate(defined_components):
            if not isinstance(component, dict):
                device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} ({idx}).'
                return False

            name = component.get('name')
            position = component.get('position')
            if not isinstance(name, str):
                device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} name ({idx}).'
                return False

            if name in known_names:
                device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated names within {component_type} ({name}).'
                return False

            known_components.append((name, position))
            known_names.add(name)

        # Adding check for duplicate positions within a component type
        # Stems from https://github.com/netbox-community/devicetype-library/pull/1586
        # and from https://github.com/netbox-community/devicetype-library/issues/1584
        #
        # Names are grouped per position in definition order. The name check
        # compares every name against the first one, so an unordered
        # container here would make the verdict depend on set iteration order.
        names_by_position = {}
        for name, position in known_components:
            if position is not None:
                names_by_position.setdefault(position, []).append(name)

        for position, component_names in names_by_position.items():
            if len(component_names) > 1 and not validate_component_names(component_names):
                device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated positions within {component_type} ({position}).'
                return False

    return True