mirror of
https://github.com/netbox-community/devicetype-library.git
synced 2025-01-13 17:58:48 +01:00
6aae04ac3b
* Updating tests to solve the issue in PR#1667 which fails on a renamed file (with a valid slug) * preserving old functionality
242 lines
12 KiB
Python
242 lines
12 KiB
Python
import os
|
|
|
|
class DeviceType:
    """One device-type definition together with the checks run against it.

    Wraps the parsed definition mapping, the path of the file it was read
    from and the change-type flag supplied by the caller. The ``verify_*`` /
    ``validate_*`` methods return ``True`` or ``False``; on failure they store
    a human-readable reason in ``failureMessage``.
    """

    # Ordered (old, new) substitutions applied to the casefolded text. They are
    # applied one after another, so order matters: "sfp+", "poe+" and "-+" must
    # be rewritten before the generic "+" rule sees them.
    _MANUFACTURER_REPLACEMENTS = (
        (" ", "-"),
        ("sfp+", "sfpp"),
        ("poe+", "poep"),
        ("-+", "-plus-"),
        ("+", "-plus"),
        ("_", "-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
        ("&", "and"),
    )
    _MODEL_REPLACEMENTS = (
        (" ", "-"),
        ("sfp+", "sfpp"),
        ("poe+", "poep"),
        ("-+", "-plus"),
        ("+", "-plus-"),
        ("_", "-"),
        ("&", "-and-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
    )
    # Same as the model table, minus the "sfp+" / "poe+" special cases.
    _PART_NUMBER_REPLACEMENTS = (
        (" ", "-"),
        ("-+", "-plus"),
        ("+", "-plus-"),
        ("_", "-"),
        ("&", "-and-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
    )

    def __new__(cls, *args, **kwargs):
        # Constructor arguments are consumed by __init__ only.
        return super().__new__(cls)

    def __init__(self, definition, file_path, change_type):
        """Store the definition and pre-compute the slug fragments.

        Args:
            definition: Mapping read with ``.get``; the keys used here are
                ``manufacturer``, ``slug``, ``model`` and the optional
                ``part_number`` (defaults to ``""``).
            file_path: Path of the file the definition came from.
            change_type: Change flag for the file; ``verify_slug`` tests it
                for the letter ``'R'`` (presumably a git rename status --
                confirm against the caller).
        """
        self.file_path = file_path
        self.isDevice = True
        self.definition = definition
        self.manufacturer = definition.get('manufacturer')
        self._slug_manufacturer = self._slugify_manufacturer()
        self.slug = definition.get('slug')
        self.model = definition.get('model')
        self._slug_model = self._slugify_model()
        self.part_number = definition.get('part_number', "")
        self._slug_part_number = self._slugify_part_number()
        self.failureMessage = None
        self.change_type = change_type

    @staticmethod
    def _apply_replacements(text, replacements):
        """Casefold ``text`` and apply each ``(old, new)`` pair in order."""
        slug = text.casefold()
        for old, new in replacements:
            slug = slug.replace(old, new)
        return slug

    @staticmethod
    def _strip_one_trailing_dash(slug):
        """Drop a single trailing ``-`` (left behind by e.g. a final ``+``)."""
        return slug[:-1] if slug.endswith("-") else slug

    def _slugify_manufacturer(self):
        """Return the slug form of ``self.manufacturer``."""
        return self._apply_replacements(self.manufacturer, self._MANUFACTURER_REPLACEMENTS)

    def get_slug(self):
        """Return the ``slug`` attribute, or ``None`` if it was never set."""
        return getattr(self, "slug", None)

    def _slugify_model(self):
        """Return the slug form of ``self.model`` without a trailing dash."""
        return self._strip_one_trailing_dash(
            self._apply_replacements(self.model, self._MODEL_REPLACEMENTS)
        )

    def _slugify_part_number(self):
        """Return the slug form of ``self.part_number`` without a trailing dash."""
        return self._strip_one_trailing_dash(
            self._apply_replacements(self.part_number, self._PART_NUMBER_REPLACEMENTS)
        )

    def get_filepath(self):
        """Return the path of the file this definition was read from."""
        return self.file_path

    def verify_slug(self, KNOWN_SLUGS):
        """Check that the slug is unique and follows the naming convention.

        Args:
            KNOWN_SLUGS: Collection of ``(slug, file_path)`` pairs that
                supports iteration and ``.add``. A newly accepted slug is
                added to it.

        Returns:
            ``True`` when the slug is acceptable, ``False`` otherwise (with
            ``failureMessage`` set).
        """
        # Verify the slug is unique, and not already known
        same_slug = [(slug, file_path) for slug, file_path in KNOWN_SLUGS if slug == self.slug]

        if len(same_slug) > 1:
            self.failureMessage = f'{self.file_path} has a duplicate slug "{self.slug}"'
            return False

        if len(same_slug) == 1:
            # One existing entry: fine when it is recorded for this file, or
            # when the change type contains 'R'. Note that this branch returns
            # without running the prefix/suffix checks below.
            recorded_path = same_slug[0][1]
            if self.file_path not in recorded_path and 'R' not in self.change_type:
                self.failureMessage = f'{self.file_path} has a duplicate slug: "{self.slug}"'
                return False
            return True

        # Verify the manufacturer is prepended to the slug. The message shows
        # the slugified manufacturer because that is what is compared.
        if not self.slug.startswith(self._slug_manufacturer):
            self.failureMessage = f'{self.file_path} contains slug "{self.slug}". Does not start with manufacturer: "{self._slug_manufacturer}-"'
            return False

        # Verify the slug ends with either the model or part number. Empty
        # fragments are skipped: str.endswith("") is always True, which would
        # silently disable this check for definitions without a part_number.
        accepted_suffixes = tuple(
            suffix for suffix in (self._slug_model, self._slug_part_number) if suffix
        )
        if not self.slug.endswith(accepted_suffixes):
            self.failureMessage = f'{self.file_path} has slug "{self.slug}". Does not end with the model "{self._slug_model}" or part_number "{self._slug_part_number}"'
            return False

        # Add the slug to the list of known slugs
        KNOWN_SLUGS.add((self.slug, self.file_path))
        return True

    def validate_power(self):
        """Check that the definition describes a plausible power source.

        The checks run in this order and the first one that applies decides:
        ``power-ports`` (must not be combined with ``is_powered: False``), an
        interface with ``poe_mode == "pd"``, a console or rear port with a
        truthy ``poe`` entry, ``subdevice_role == "child"``, any
        ``module-bays``, and finally ``is_powered: False`` (which must not be
        combined with ``power-outlets``).

        Returns:
            ``True`` when one of the checks accepts the definition, ``False``
            otherwise (with ``failureMessage`` set).
        """
        definition = self.definition

        # Check if power-ports exists
        if definition.get('power-ports', False):
            # is_powered must not be False when power-ports are defined
            if not definition.get('is_powered', True):
                self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-ports" are defined.'
                return False
            return True

        # Check if any interface is a PoE powered device
        for interface in definition.get('interfaces', False) or ():
            if interface.get('poe_mode', "") == "pd":
                return True

        # Console ports and rear ports count when they carry a truthy "poe" entry
        for section in ('console-ports', 'rear-ports'):
            for port in definition.get(section, False) or ():
                if port.get('poe', False):
                    return True

        # A child device is assumed to be powered by its parent
        if definition.get('subdevice_role', False) == "child":
            return True

        # There is not a standardized way to define PSUs that are module bays,
        # so any module-bays entry is accepted
        if definition.get('module-bays', False):
            return True

        # As the very last case, check is_powered (defaults to True)
        if not definition.get('is_powered', True):
            # An unpowered device must not define power-outlets
            if definition.get('power-outlets', False):
                self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-outlets" are defined.'
                return False
            return True

        self.failureMessage = f'{self.file_path} has does not appear to have a valid power source. Ensure either "power-ports" or "interfaces" with "poe_mode" is defined.'
        return False
|
|
|
|
class ModuleType:
    """One module-type definition together with its pre-computed slug fragments.

    Wraps the parsed definition mapping, the path of the file it was read
    from and the change-type flag supplied by the caller. Validators that
    receive this object report problems through ``failureMessage``.
    """

    # Ordered (old, new) substitutions applied to the casefolded text. They are
    # applied one after another, so order matters: "sfp+", "poe+" and "-+" must
    # be rewritten before the generic "+" rule sees them.
    _MODEL_REPLACEMENTS = (
        (" ", "-"),
        ("sfp+", "sfpp"),
        ("poe+", "poep"),
        ("-+", "-plus"),
        ("+", "-plus-"),
        ("_", "-"),
        ("&", "-and-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
    )
    # Same as the model table, minus the "sfp+" / "poe+" special cases.
    _PART_NUMBER_REPLACEMENTS = (
        (" ", "-"),
        ("-+", "-plus"),
        ("+", "-plus-"),
        ("_", "-"),
        ("&", "-and-"),
        ("!", ""),
        ("/", "-"),
        (",", ""),
        ("'", ""),
        ("*", "-"),
    )

    def __new__(cls, *args, **kwargs):
        # Constructor arguments are consumed by __init__ only.
        return super().__new__(cls)

    def __init__(self, definition, file_path, change_type):
        """Store the definition and pre-compute the slug fragments.

        Args:
            definition: Mapping read with ``.get``; the keys used here are
                ``manufacturer``, ``model`` and the optional ``part_number``
                (defaults to ``""``).
            file_path: Path of the file the definition came from.
            change_type: Change flag for the file; stored as given.
        """
        self.file_path = file_path
        self.isDevice = False
        self.definition = definition
        self.manufacturer = definition.get('manufacturer')
        self.model = definition.get('model')
        self._slug_model = self._slugify_model()
        self.part_number = definition.get('part_number', "")
        self._slug_part_number = self._slugify_part_number()
        # Initialised so the attribute can be read before any validator has
        # failed, matching DeviceType.
        self.failureMessage = None
        self.change_type = change_type

    def get_filepath(self):
        """Return the path of the file this definition was read from."""
        return self.file_path

    @staticmethod
    def _slugify(text, replacements):
        """Casefold ``text``, apply the pairs in order, drop one trailing dash."""
        slug = text.casefold()
        for old, new in replacements:
            slug = slug.replace(old, new)
        # A trailing "+" or similar leaves a dangling "-"; remove a single one.
        return slug[:-1] if slug.endswith("-") else slug

    def _slugify_model(self):
        """Return the slug form of ``self.model``."""
        return self._slugify(self.model, self._MODEL_REPLACEMENTS)

    def _slugify_part_number(self):
        """Return the slug form of ``self.part_number``."""
        return self._slugify(self.part_number, self._PART_NUMBER_REPLACEMENTS)
|
|
|
|
def validate_component_names(component_names: "list[str] | set[str] | None"):
    """Decide whether names that share a position look like distinct components.

    The first name is the reference and every other name is compared with it.
    For a pair, let ``k`` be the number of distinct characters the two names
    have in common. The pair is rejected when ``k > 2`` and the first ``k``
    characters of both names are built from the same ``k`` distinct
    characters (for example ``"PSU1"`` and ``"PSU2"``).

    Args:
        component_names: Names to compare. ``None``, an empty collection or a
            single name is accepted as-is. Any iterable is allowed; for
            unordered input the choice of reference name is arbitrary.

    Returns:
        ``False`` as soon as one name is rejected against the reference,
        otherwise ``True``.
    """
    if component_names is None:
        return True

    # Materialise once so sets and other non-indexable iterables work too.
    names = list(component_names)
    if len(names) < 2:
        return True

    reference = list(names[0])
    reference_chars = set(reference)

    for candidate in names[1:]:
        candidate_chars = list(candidate)
        shared_count = len(reference_chars & set(candidate_chars))
        if shared_count <= 2:
            continue

        # Compare the leading `shared_count` characters of both names.
        prefix_overlap = set(reference[:shared_count]) & set(candidate_chars[:shared_count])
        if len(prefix_overlap) == shared_count:
            return False

    return True
|
|
|
|
def verify_filename(device: "DeviceType | ModuleType", KNOWN_MODULES: "set | None"):
    """Check that a definition's file name matches its model or part number.

    The file name (without directory and without the text after the last
    ``.``) is casefolded and must equal the slugified model, the slugified
    part number, or the casefolded raw part number. For module types
    (``device.isDevice`` is false) the name must additionally occur at most
    once among ``KNOWN_MODULES``.

    Args:
        device: Object exposing ``get_filepath()``, ``_slug_model``,
            ``_slug_part_number``, ``part_number``, ``isDevice`` and
            ``file_path``.
        KNOWN_MODULES: Iterable of ``(file_name, file_path)`` pairs, or
            ``None`` (treated as empty). Only consulted for module types.

    Returns:
        ``True`` when the file name is acceptable, ``False`` otherwise (with
        ``device.failureMessage`` set).
    """
    base_name = os.path.split(device.get_filepath())[1]
    filename = base_name.rsplit(".", 1)[0].casefold()

    accepted_names = (
        device._slug_model,
        device._slug_part_number,
        device.part_number.casefold(),
    )
    if filename not in accepted_names:
        device.failureMessage = f'{device.file_path} file name is invalid. Must be either the model "{device._slug_model}" or part_number "{device.part_number} / {device._slug_part_number}"'
        return False

    if not device.isDevice:
        # A missing collection means there is nothing to collide with.
        matches = [
            file_name
            for file_name, _file_path in (KNOWN_MODULES or ())
            if file_name.casefold() == filename
        ]
        if len(matches) > 1:
            device.failureMessage = f'{device.file_path} appears to be duplicated. Found {len(matches)} matches: {", ".join(matches)}'
            return False

    return True
|
|
|
|
def validate_components(component_types, device_or_module):
    """Validate the component lists of a device or module definition.

    For every component type the definition entry (default: empty list) must
    be a list of dicts, each with a string ``name`` that is unique within that
    component type. Components that share a non-``None`` ``position`` are then
    handed to ``validate_component_names`` to decide whether the shared
    position is a duplicate.

    Args:
        component_types: Iterable of keys to look up in the definition.
        device_or_module: Object exposing ``definition`` (a mapping) and
            ``file_path``; ``failureMessage`` is set on it when a check fails.

    Returns:
        ``True`` when every component type passes, ``False`` at the first
        failure.
    """
    for component_type in component_types:
        defined_components = device_or_module.definition.get(component_type, [])
        if not isinstance(defined_components, list):
            device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type}.'
            return False

        seen_names = set()
        # (name, position) pairs in definition order
        known_components = []
        for idx, component in enumerate(defined_components):
            if not isinstance(component, dict):
                device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} ({idx}).'
                return False

            name = component.get('name')
            if not isinstance(name, str):
                device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} name ({idx}).'
                return False
            if name in seen_names:
                device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated names within {component_type} ({name}).'
                return False

            seen_names.add(name)
            known_components.append((name, component.get('position')))

        # Check for duplicate positions within a component type
        # Stems from https://github.com/netbox-community/devicetype-library/pull/1586
        # and from https://github.com/netbox-community/devicetype-library/issues/1584
        #
        # Names are grouped in lists, not sets: validate_component_names uses
        # the first name as its reference, so the order has to be the stable
        # definition order rather than a hash-dependent one. Names are already
        # known to be unique here, so no de-duplication is lost.
        names_by_position = {}
        for name, position in known_components:
            if position is not None:
                names_by_position.setdefault(position, []).append(name)

        for position, names in names_by_position.items():
            if len(names) > 1 and not validate_component_names(names):
                device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated positions within {component_type} ({position}).'
                return False

    return True
|