From 98857fc64988223cf0cf2d582b4f392a2566a706 Mon Sep 17 00:00:00 2001 From: "Daniel W. Anner" Date: Fri, 24 Mar 2023 15:13:38 -0400 Subject: [PATCH] Beta Release Merge (#87) * Delete gitcmd.py * Delete nb-dt-import.py * Add files via upload * Logging cleanup (#78) * Removed multiple imports of settings.py * stating to abstract the netbox api calls to their own class * Abstracted away determine features from main script, implemented as part of class initialization * added helper functions to get repos relative & absolute path * renaming gitcmd to repo * starting to abstract away the get_files * fixed issue where spaces and commas in vendor list with/without spaces breaks matching * Added prelim fix for slugs if same issue vendors arg was facing exists. untested * Finished abstracting the get_files function. Reduced fors and ifs to be cleaner and more efficent * abstracted getFiles to repo class. Fixed slug issue not matching because of new slug format. added non-halting log function and renamed exception handler to log handler. * utilized new logging class throughout script to reduce excess logging * Abstracted and optimized create manufacturers * Abstracted the create interfaces for devices to the netbox api class * Fixed regression where check manufactuerers did not have the latest list * Fixed regression caused by externally calling script. Discovered from https://github.com/netbox-community/Device-Type-Library-Import/pull/76 * abstracted all device interfaces to the devicetype class. optimized function calls to reduce duplicate code and reduce extra log calls * Ran against all devices and passed with flying colors * formatting settings.py * formatting repo.py * formatting main file * formatting log_handler.py * added back executable on file (#79) * Add more info to failed device_type creations (#81) --------- Co-authored-by: Philipp Rintz <13933258+p-rintz@users.noreply.github.com> --- exception_handler.py | 27 -- gitcmd.py | 45 --- log_handler.py | 44 +++ nb-dt-import.py | 825 ++----------------------------------------- netbox_api.py | 437 +++++++++++++++++++++++ repo.py | 103 ++++++ settings.py | 16 +- 7 files changed, 625 insertions(+), 872 deletions(-) delete mode 100644 exception_handler.py delete mode 100644 gitcmd.py create mode 100644 log_handler.py create mode 100644 netbox_api.py create mode 100644 repo.py diff --git a/exception_handler.py b/exception_handler.py deleted file mode 100644 index 3bc0da4..0000000 --- a/exception_handler.py +++ /dev/null @@ -1,27 +0,0 @@ -from sys import exit as system_exit - - - - - -class ExceptionHandler: - def __new__(cls, *args, **kwargs): - return super().__new__(cls) - - def __init__(self, args): - self.args = args - - def exception(self, exception_type, exception, stack_trace=None): - exception_dict = { - "EnvironmentError": f'Environment variable "{exception}" is not set.', - "SSLError": f'SSL verification failed. IGNORE_SSL_ERRORS is {exception}. Set IGNORE_SSL_ERRORS to True if you want to ignore this error. EXITING.', - "GitCommandError": f'The repo "{exception}" is not a valid git repo.', - "GitInvalidRepositoryError": f'The repo "{exception}" is not a valid git repo.', - "Exception": f'An unknown error occurred: "{exception}"' - } - - if self.args.verbose and stack_trace: - print(stack_trace) - print(exception_dict[exception_type]) - system_exit(1) - diff --git a/gitcmd.py b/gitcmd.py deleted file mode 100644 index c192469..0000000 --- a/gitcmd.py +++ /dev/null @@ -1,45 +0,0 @@ -from git import Repo, exc -from sys import exit as system_exit -import settings -import os - -class GitCMD: - def __new__(cls, *args, **kwargs): - return super().__new__(cls) - - def __init__(self, args, repo_path): - self.url = args.url - self.repo_path = repo_path - self.branch = args.branch -# self.repo = Repo() - self.cwd = os.getcwd() - - if os.path.isdir(self.repo_path): - self.pull_repo() - else: - self.clone_repo() - - - def pull_repo(self): - try: - print("Package devicetype-library is already installed, " - + f"updating {os.path.join(self.cwd, self.repo_path)}") - self.repo = Repo(self.repo_path) - if not self.repo.remotes.origin.url.endswith('.git'): - settings.handle.exception("GitInvalidRepositoryError", self.repo.remotes.origin.url, f"Origin URL {self.repo.remotes.origin.url} does not end with .git") - self.repo.remotes.origin.pull() - self.repo.git.checkout(self.branch) - print(f"Pulled Repo {self.repo.remotes.origin.url}") - except exc.GitCommandError as git_error: - settings.handle.exception("GitCommandError", self.repo.remotes.origin.url, git_error) - except Exception as git_error: - settings.handle.exception("Exception", 'Git Repository Error', git_error) - - def clone_repo(self): - try: - self.repo = Repo.clone_from(self.url, os.path.join(self.cwd, self.repo_path), branch=self.branch) - print(f"Package Installed {self.repo.remotes.origin.url}") - except exc.GitCommandError as git_error: - settings.handle.exception("GitCommandError", self.url, git_error) - except Exception as git_error: - settings.handle.exception("Exception", 'Git Repository Error', git_error) diff --git a/log_handler.py b/log_handler.py new file mode 100644 index 0000000..46cdf0b --- /dev/null +++ b/log_handler.py @@ -0,0 +1,44 @@ +from sys import exit as system_exit + + +class LogHandler: + def __new__(cls, *args, **kwargs): + return super().__new__(cls) + + def __init__(self, args): + self.args = args + + def exception(self, exception_type, exception, stack_trace=None): + exception_dict = { + "EnvironmentError": f'Environment variable "{exception}" is not set.', + "SSLError": f'SSL verification failed. IGNORE_SSL_ERRORS is {exception}. Set IGNORE_SSL_ERRORS to True if you want to ignore this error. EXITING.', + "GitCommandError": f'The repo "{exception}" is not a valid git repo.', + "GitInvalidRepositoryError": f'The repo "{exception}" is not a valid git repo.', + "Exception": f'An unknown error occurred: "{exception}"' + } + + if self.args.verbose and stack_trace: + print(stack_trace) + print(exception_dict[exception_type]) + system_exit(1) + + def verbose_log(self, message): + if self.args.verbose: + print(message) + + def log(self, message): + print(message) + + def log_device_ports_created(self, created_ports: list = [], port_type: str = "port"): + for port in created_ports: + self.verbose_log(f'{port_type} Template Created: {port.name} - ' + + f'{port.type if hasattr(port, "type") else ""} - {port.device_type.id} - ' + + f'{port.id}') + return len(created_ports) + + def log_module_ports_created(self, created_ports: list = [], port_type: str = "port"): + for port in created_ports: + self.verbose_log(f'{port_type} Template Created: {port.name} - ' + + f'{port.type if hasattr(port, "type") else ""} - {port.module_type.id} - ' + + f'{port.id}') + return len(created_ports) diff --git a/nb-dt-import.py b/nb-dt-import.py index 7e55e17..34b2728 100755 --- a/nb-dt-import.py +++ b/nb-dt-import.py @@ -3,814 +3,51 @@ from collections import Counter from datetime import datetime import yaml import pynetbox -import glob +from glob import glob import os -import re -import requests import settings +from netbox_api import NetBox -counter = Counter( - added=0, - updated=0, - manufacturer=0, - module_added=0, - module_port_added=0, -) - -def determine_features(nb): - '''Automatically determine the netbox features available. - - Currently only checks for existence of module-types. - - Args: - nb: pynetbox API instance - - Returns: - None - - Raises: - None - ''' - - # nb.version should be the version in the form '3.2' - nb_ver = [int(x) for x in nb.version.split('.')] - - # Later than 3.2 - # Might want to check for the module-types entry as well? - if nb_ver[0] > 3 or (nb_ver[0] == 3 and nb_ver[1] >= 2): - settings.NETBOX_FEATURES['modules'] = True - - - - -def slugFormat(name): - return re.sub('\W+','-', name.lower()) - -YAML_EXTENSIONS = ['yml', 'yaml'] - -def getFiles(vendors=None): - - files = [] - discoveredVendors = [] - base_path = f'{settings.REPO_PATH}/device-types/' - if vendors: - for r, d, f in os.walk(base_path): - for folder in d: - for vendor in vendors: - if vendor.lower() == folder.lower(): - discoveredVendors.append({'name': folder, - 'slug': slugFormat(folder)}) - for extension in YAML_EXTENSIONS: - files.extend(glob.glob(base_path + folder + f'/*.{extension}')) - else: - for r, d, f in os.walk(base_path): - for folder in d: - if folder.lower() != "Testing": - discoveredVendors.append({'name': folder, - 'slug': slugFormat(folder)}) - for extension in YAML_EXTENSIONS: - files.extend(glob.glob(base_path + f'[!Testing]*/*.{extension}')) - return files, discoveredVendors - -def get_files_modules(vendors=None): - '''Get files list for modules. - - Args: - vendors: List of vendors to sync or None to sync all vendors. - - Returns: - A 2-tuple of: - - list of filenames found - - list of vendors found - - ''' - - files = [] - discoveredVendors = [] - base_path = f'{settings.REPO_PATH}/module-types/' - if vendors: - for r, d, f in os.walk(base_path): - for folder in d: - for vendor in vendors: - if vendor.lower() == folder.lower(): - discoveredVendors.append({'name': folder, - 'slug': slugFormat(folder)}) - for extension in YAML_EXTENSIONS: - files.extend( - glob.glob( - base_path + folder + f'/*.{extension}' - ) - ) - else: - for r, d, f in os.walk(base_path): - for folder in d: - if folder.lower() != "Testing": - discoveredVendors.append({'name': folder, - 'slug': slugFormat(folder)}) - for extension in YAML_EXTENSIONS: - files.extend(glob.glob(base_path + f'[!Testing]*/*.{extension}')) - - return files, discoveredVendors - -def readYAMl(files, **kwargs): - slugs = kwargs.get('slugs', None) - deviceTypes = [] - manufacturers = [] - for file in files: - with open(file, 'r') as stream: - try: - data = yaml.safe_load(stream) - except yaml.YAMLError as exc: - print(exc) - continue - manufacturer = data['manufacturer'] - data['manufacturer'] = {} - data['manufacturer']['name'] = manufacturer - data['manufacturer']['slug'] = slugFormat(manufacturer) - - if slugs and data['slug'] not in slugs: - print(f"Skipping {data['model']}") - continue - - deviceTypes.append(data) - manufacturers.append(manufacturer) - return deviceTypes - -def read_yaml_modules(files, **kwargs): - - slugs = kwargs.get('slugs', None) - module_types = [] - manufacturers = [] - for file in files: - with open(file, 'r') as stream: - try: - data = yaml.safe_load(stream) - except yaml.YAMLError as exc: - print(exc) - continue - manufacturer = data['manufacturer'] - data['manufacturer'] = {} - data['manufacturer']['name'] = manufacturer - data['manufacturer']['slug'] = slugFormat(manufacturer) - - if slugs and data['slug'] not in slugs: - print(f"Skipping {data['model']}") - continue - - module_types.append(data) - manufacturers.append(manufacturer) - return module_types - -def createManufacturers(vendors, nb): - all_manufacturers = {str(item): item for item in nb.dcim.manufacturers.all()} - need_manufacturers = [] - for vendor in vendors: - try: - manGet = all_manufacturers[vendor["name"]] - print(f'Manufacturer Exists: {manGet.name} - {manGet.id}') - except KeyError: - need_manufacturers.append(vendor) - - if not need_manufacturers: - return - - try: - manSuccess = nb.dcim.manufacturers.create(need_manufacturers) - for man in manSuccess: - print(f'Manufacturer Created: {man.name} - ' - + f'{man.id}') - counter.update({'manufacturer': 1}) - except pynetbox.RequestError as e: - print(e.error) - - -def createInterfaces(interfaces, deviceType, nb): - all_interfaces = {str(item): item for item in nb.dcim.interface_templates.filter(devicetype_id=deviceType)} - need_interfaces = [] - for interface in interfaces: - try: - ifGet = all_interfaces[interface["name"]] - print(f'Interface Template Exists: {ifGet.name} - {ifGet.type}' - + f' - {ifGet.device_type.id} - {ifGet.id}') - except KeyError: - interface['device_type'] = deviceType - need_interfaces.append(interface) - - if not need_interfaces: - return - - try: - ifSuccess = nb.dcim.interface_templates.create(need_interfaces) - for intf in ifSuccess: - print(f'Interface Template Created: {intf.name} - ' - + f'{intf.type} - {intf.device_type.id} - ' - + f'{intf.id}') - counter.update({'updated': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def create_module_interfaces(interfaces, module_type, nb): - all_interfaces = {str(item): item for item in nb.dcim.interface_templates.filter(moduletype_id=module_type)} - need_interfaces = [] - for interface in interfaces: - try: - if_res = all_interfaces[interface["name"]] - print(f'Interface Template Exists: {if_res.name} - {if_res.type}' - + f' - {if_res.module_type.id} - {if_res.id}') - except KeyError: - interface['module_type'] = module_type - need_interfaces.append(interface) - - if not need_interfaces: - return - - try: - ifSuccess = nb.dcim.interface_templates.create(need_interfaces) - for intf in ifSuccess: - print(f'Interface Template Created: {intf.name} - ' - + f'{intf.type} - {intf.module_type.id} - ' - + f'{intf.id}') - counter.update({'module_port_added': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def createConsolePorts(consoleports, deviceType, nb): - all_consoleports = {str(item): item for item in nb.dcim.console_port_templates.filter(devicetype_id=deviceType)} - need_consoleports = [] - for consoleport in consoleports: - try: - cpGet = all_consoleports[consoleport["name"]] - print(f'Console Port Template Exists: {cpGet.name} - ' - + f'{cpGet.type} - {cpGet.device_type.id} - {cpGet.id}') - except KeyError: - consoleport['device_type'] = deviceType - need_consoleports.append(consoleport) - - if not need_consoleports: - return - - try: - cpSuccess = nb.dcim.console_port_templates.create(need_consoleports) - for port in cpSuccess: - print(f'Console Port Created: {port.name} - ' - + f'{port.type} - {port.device_type.id} - ' - + f'{port.id}') - counter.update({'updated': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def create_module_console_ports(consoleports, module_type, nb): - - all_consoleports = {str(item): item for item in nb.dcim.console_port_templates.filter(moduletype_id=module_type)} - need_consoleports = [] - for consoleport in consoleports: - try: - cpGet = all_consoleports[consoleport["name"]] - print(f'Console Port Template Exists: {cpGet.name} - ' - + f'{cpGet.type} - {cpGet.module_type.id} - {cpGet.id}') - except KeyError: - consoleport['module_type'] = module_type - need_consoleports.append(consoleport) - - if not need_consoleports: - return - - try: - cpSuccess = nb.dcim.console_port_templates.create(need_consoleports) - for port in cpSuccess: - print(f'Console Port Created: {port.name} - {port.type} - ' + - f'{port.module_type.id} - {port.id}') - counter.update({'module_port_added': 1}) - except pynetbox.RequestError as e: - print(f"Error creating module console port: {e.error}") - -def createPowerPorts(powerports, deviceType, nb): - all_power_ports = {str(item): item for item in nb.dcim.power_port_templates.filter(devicetype_id=deviceType)} - need_power_ports = [] - for powerport in powerports: - try: - ppGet = all_power_ports[powerport["name"]] - print(f'Power Port Template Exists: {ppGet.name} - ' - + f'{ppGet.type} - {ppGet.device_type.id} - {ppGet.id}') - except KeyError: - powerport['device_type'] = deviceType - need_power_ports.append(powerport) - - if not need_power_ports: - return - - try: - ppSuccess = nb.dcim.power_port_templates.create(need_power_ports) - for pp in ppSuccess: - print(f'Interface Template Created: {pp.name} - ' - + f'{pp.type} - {pp.device_type.id} - ' - + f'{pp.id}') - counter.update({'updated': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def create_module_power_ports(powerports, module_type, nb): - all_power_ports = {str(item): item for item in nb.dcim.power_port_templates.filter(moduletype_id=module_type)} - need_power_ports = [] - for powerport in powerports: - try: - ppGet = all_power_ports[powerport["name"]] - print(f'Power Port Template Exists: {ppGet.name} - ' + - f'{ppGet.type} - {ppGet.module_type.id} - {ppGet.id}') - except KeyError: - powerport['module_type'] = module_type - need_power_ports.append(powerport) - - if not need_power_ports: - return - - try: - ppSuccess = nb.dcim.power_port_templates.create(need_power_ports) - for pp in ppSuccess: - print(f'Power port template created: {pp.name} - ' - + f'{pp.type} - {pp.module_type.id} - {pp.id}') - counter.update({'module_port_added': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def createConsoleServerPorts(consoleserverports, deviceType, nb): - all_consoleserverports = {str(item): item for item in nb.dcim.console_server_port_templates.filter(devicetype_id=deviceType)} - need_consoleserverports = [] - for csport in consoleserverports: - try: - cspGet = all_consoleserverports[csport["name"]] - print(f'Console Server Port Template Exists: {cspGet.name} - ' - + f'{cspGet.type} - {cspGet.device_type.id} - ' - + f'{cspGet.id}') - except KeyError: - csport['device_type'] = deviceType - need_consoleserverports.append(csport) - - if not need_consoleserverports: - return - - try: - cspSuccess = nb.dcim.console_server_port_templates.create( - need_consoleserverports) - for csp in cspSuccess: - print(f'Console Server Port Created: {csp.name} - ' - + f'{csp.type} - {csp.device_type.id} - ' - + f'{csp.id}') - counter.update({'updated': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def create_module_console_server_ports(consoleserverports, module_type, nb): - all_consoleserverports = {str(item): item for item in nb.dcim.console_server_port_templates.filter(moduletype_id=module_type)} - need_consoleserverports = [] - for csport in consoleserverports: - try: - cspGet = all_consoleserverports[csport["name"]] - print(f'Console Server Port Template Exists: {cspGet.name} - ' - + f'{cspGet.type} - {cspGet.module_type.id} - ' - + f'{cspGet.id}') - except KeyError: - csport['module_type'] = module_type - need_consoleserverports.append(csport) - - if not need_consoleserverports: - return - - try: - cspSuccess = nb.dcim.console_server_port_templates.create( - need_consoleserverports) - for csp in cspSuccess: - print(f'Console Server Port Created: {csp.name} - ' - + f'{csp.type} - {csp.module_type.id} - ' - + f'{csp.id}') - counter.update({'module_port_added': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def createFrontPorts(frontports, deviceType, nb): - all_frontports = {str(item): item for item in nb.dcim.front_port_templates.filter(devicetype_id=deviceType)} - need_frontports = [] - for frontport in frontports: - try: - fpGet = all_frontports[frontport["name"]] - print(f'Front Port Template Exists: {fpGet.name} - ' - + f'{fpGet.type} - {fpGet.device_type.id} - {fpGet.id}') - except KeyError: - frontport['device_type'] = deviceType - need_frontports.append(frontport) - - if not need_frontports: - return - - all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(devicetype_id=deviceType)} - for port in need_frontports: - try: - rpGet = all_rearports[port["rear_port"]] - port['rear_port'] = rpGet.id - except KeyError: - print(f'Could not find Rear Port for Front Port: {port["name"]} - ' - + f'{port["type"]} - {deviceType}') - - try: - fpSuccess = nb.dcim.front_port_templates.create(need_frontports) - for fp in fpSuccess: - print(f'Front Port Created: {fp.name} - ' - + f'{fp.type} - {fp.device_type.id} - ' - + f'{fp.id}') - counter.update({'updated': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def create_module_front_ports(frontports, module_type, nb): - all_frontports = {str(item): item for item in nb.dcim.front_port_templates.filter(moduletype_id=module_type)} - need_frontports = [] - for frontport in frontports: - try: - fpGet = all_frontports[frontport["name"]] - print(f'Front Port Template Exists: {fpGet.name} - ' - + f'{fpGet.type} - {fpGet.module_type.id} - {fpGet.id}') - except KeyError: - frontport['module_type'] = module_type - need_frontports.append(frontport) - - if not need_frontports: - return - - all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(moduletype_id=module_type)} - for port in need_frontports: - try: - rpGet = all_rearports[port["rear_port"]] - port['rear_port'] = rpGet.id - except KeyError: - print(f'Could not find Rear Port for Front Port: {port["name"]} - ' - + f'{port["type"]} - {module_type}') - - try: - fpSuccess = nb.dcim.front_port_templates.create(need_frontports) - for fp in fpSuccess: - print(f'Front Port Created: {fp.name} - ' - + f'{fp.type} - {fp.module_type.id} - ' - + f'{fp.id}') - counter.update({'module_port_added': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def createRearPorts(rearports, deviceType, nb): - all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(devicetype_id=deviceType)} - need_rearports = [] - for rearport in rearports: - try: - rpGet = all_rearports[rearport["name"]] - print(f'Rear Port Template Exists: {rpGet.name} - {rpGet.type}' - + f' - {rpGet.device_type.id} - {rpGet.id}') - except KeyError: - rearport['device_type'] = deviceType - need_rearports.append(rearport) - - if not need_rearports: - return - - try: - rpSuccess = nb.dcim.rear_port_templates.create( - need_rearports) - for rp in rpSuccess: - print(f'Rear Port Created: {rp.name} - {rp.type}' - + f' - {rp.device_type.id} - {rp.id}') - counter.update({'updated': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def create_module_rear_ports(rearports, module_type, nb): - all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(moduletype_id=module_type)} - need_rearports = [] - for rearport in rearports: - try: - rpGet = all_rearports[rearport["name"]] - print(f'Rear Port Template Exists: {rpGet.name} - {rpGet.type}' - + f' - {rpGet.module_type.id} - {rpGet.id}') - except KeyError: - rearport['module_type'] = module_type - need_rearports.append(rearport) - - if not need_rearports: - return - - try: - rpSuccess = nb.dcim.rear_port_templates.create( - need_rearports) - for rp in rpSuccess: - print(f'Rear Port Created: {rp.name} - {rp.type}' - + f' - {rp.module_type.id} - {rp.id}') - counter.update({'module_port_added': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def createDeviceBays(devicebays, deviceType, nb): - all_devicebays = {str(item): item for item in nb.dcim.device_bay_templates.filter(devicetype_id=deviceType)} - need_devicebays = [] - for devicebay in devicebays: - try: - dbGet = all_devicebays[devicebay["name"]] - print(f'Device Bay Template Exists: {dbGet.name} - ' - + f'{dbGet.device_type.id} - {dbGet.id}') - except KeyError: - devicebay['device_type'] = deviceType - need_devicebays.append(devicebay) - - if not need_devicebays: - return - - try: - dbSuccess = nb.dcim.device_bay_templates.create(need_devicebays) - for db in dbSuccess: - print(f'Device Bay Created: {db.name} - ' - + f'{db.device_type.id} - {db.id}') - counter.update({'updated': 1}) - except pynetbox.RequestError as e: - print(e.error) - - -def create_module_bays(module_bays, device_type, nb): - '''Create module bays. - - Args: - module_bays: parsed YAML module_bays section. - device_type: the device type instance from netbox. - nb: Netbox API instance - ''' - all_module_bays = { - str(item): item for item in nb.dcim.module_bay_templates.filter( - devicetype_id=device_type - ) - } - need_module_bays = [] - for module_bay in module_bays: - try: - dbGet = all_module_bays[module_bay["name"]] - print(f'Module Bay Template Exists: {dbGet.name} - ' - + f'{dbGet.device_type.id} - {dbGet.id}') - except KeyError: - module_bay['device_type'] = device_type - need_module_bays.append(module_bay) - - if not need_module_bays: - return - - try: - module_bay_res = nb.dcim.module_bay_templates.create(need_module_bays) - for module_bay in module_bay_res: - print(f'Module Bay Created: {module_bay.name} - ' - + f'{module_bay.device_type.id} - {module_bay.id}') - counter.update({'updated': 1}) - except pynetbox.RequestError as e: - print(e.error) - - -def createPowerOutlets(poweroutlets, deviceType, nb): - all_poweroutlets = {str(item): item for item in nb.dcim.power_outlet_templates.filter(devicetype_id=deviceType)} - need_poweroutlets = [] - for poweroutlet in poweroutlets: - try: - poGet = all_poweroutlets[poweroutlet["name"]] - print(f'Power Outlet Template Exists: {poGet.name} - ' - + f'{poGet.type} - {poGet.device_type.id} - {poGet.id}') - except KeyError: - poweroutlet["device_type"] = deviceType - need_poweroutlets.append(poweroutlet) - - if not need_poweroutlets: - return - - all_power_ports = {str(item): item for item in nb.dcim.power_port_templates.filter(devicetype_id=deviceType)} - for outlet in need_poweroutlets: - try: - ppGet = all_power_ports[outlet["power_port"]] - outlet['power_port'] = ppGet.id - except KeyError: - pass - - try: - poSuccess = nb.dcim.power_outlet_templates.create( - need_poweroutlets) - for po in poSuccess: - print(f'Power Outlet Created: {po.name} - ' - + f'{po.type} - {po.device_type.id} - ' - + f'{po.id}') - counter.update({'updated': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def create_module_power_outlets(poweroutlets, module_type, nb): - '''Create missing module power outlets. - - Args: - poweroutlets: YAML power outlet data. - module_type: Netbox module_type instance. - nb: pynetbox API instance. - - Returns: - None - - Raises: - None - ''' - all_poweroutlets = {str(item): item for item in nb.dcim.power_outlet_templates.filter(moduletype_id=module_type)} - need_poweroutlets = [] - for poweroutlet in poweroutlets: - try: - poGet = all_poweroutlets[poweroutlet["name"]] - print(f'Power Outlet Template Exists: {poGet.name} - {poGet.type}' - + f' - {poGet.module_type.id} - {poGet.id}') - except KeyError: - poweroutlet["module_type"] = module_type - need_poweroutlets.append(poweroutlet) - - if not need_poweroutlets: - return - - all_power_ports = {str(item): item for item in nb.dcim.\ - power_port_templates.filter(moduletype_id=module_type)} - for outlet in need_poweroutlets: - try: - ppGet = all_power_ports[outlet["power_port"]] - outlet['power_port'] = ppGet.id - except KeyError: - pass - - try: - poSuccess = nb.dcim.power_outlet_templates.create( - need_poweroutlets) - for po in poSuccess: - print(f'Power Outlet Created: {po.name} - ' - + f'{po.type} - {po.module_type.id} - ' - + f'{po.id}') - counter.update({'module_port_added': 1}) - except pynetbox.RequestError as e: - print(e.error) - -def createDeviceTypes(deviceTypes, nb): - all_device_types = {str(item): item for item in nb.dcim.device_types.all()} - for deviceType in deviceTypes: - try: - dt = all_device_types[deviceType["model"]] - print(f'Device Type Exists: {dt.manufacturer.name} - ' - + f'{dt.model} - {dt.id}') - except KeyError: - try: - dt = nb.dcim.device_types.create(deviceType) - counter.update({'added': 1}) - print(f'Device Type Created: {dt.manufacturer.name} - ' - + f'{dt.model} - {dt.id}') - except pynetbox.RequestError as e: - print(e.error) - - if "interfaces" in deviceType: - createInterfaces(deviceType["interfaces"], - dt.id, nb) - if "power-ports" in deviceType: - createPowerPorts(deviceType["power-ports"], - dt.id, nb) - if "power-port" in deviceType: - createPowerPorts(deviceType["power-port"], - dt.id, nb) - if "console-ports" in deviceType: - createConsolePorts(deviceType["console-ports"], - dt.id, nb) - if "power-outlets" in deviceType: - createPowerOutlets(deviceType["power-outlets"], - dt.id, nb) - if "console-server-ports" in deviceType: - createConsoleServerPorts( - deviceType["console-server-ports"], dt.id, nb) - if "rear-ports" in deviceType: - createRearPorts(deviceType["rear-ports"], - dt.id, nb) - if "front-ports" in deviceType: - createFrontPorts(deviceType["front-ports"], - dt.id, nb) - if "device-bays" in deviceType: - createDeviceBays(deviceType["device-bays"], - dt.id, nb) - if settings.NETBOX_FEATURES['modules'] and 'module-bays' in deviceType: - create_module_bays(deviceType['module-bays'], - dt.id, nb) - -def create_module_types(module_types, nb): - '''Create missing module types. - - Args: - module_types: yaml data from repo. - nb: pynetbox API instance - - Returns: - None - ''' - - all_module_types = {} - for curr_nb_mt in nb.dcim.module_types.all(): - if curr_nb_mt.manufacturer.slug not in all_module_types: - all_module_types[curr_nb_mt.manufacturer.slug] = {} - - all_module_types[curr_nb_mt.manufacturer.slug][curr_nb_mt.model] = curr_nb_mt - - - for curr_mt in module_types: - try: - module_type_res = all_module_types[curr_mt['manufacturer']['slug']][curr_mt["model"]] - print(f'Module Type Exists: {module_type_res.manufacturer.name} - ' - + f'{module_type_res.model} - {module_type_res.id}') - except KeyError: - try: - module_type_res = nb.dcim.module_types.create(curr_mt) - counter.update({'module_added': 1}) - print(f'Module Type Created: {module_type_res.manufacturer.name} - ' - + f'{module_type_res.model} - {module_type_res.id}') - except pynetbox.RequestError as exce: - print(f"Error '{exce.error}' creating module type: " + - f"{curr_mt}") - - #module_type_res = all_module_types[curr_mt['manufacturer']['slug']][curr_mt["model"]] - - if "interfaces" in curr_mt: - create_module_interfaces(curr_mt["interfaces"], - module_type_res.id, nb) - if "power-ports" in curr_mt: - create_module_power_ports(curr_mt["power-ports"], - module_type_res.id, nb) - if "console-ports" in curr_mt: - create_module_console_ports(curr_mt["console-ports"], - module_type_res.id, nb) - if "power-outlets" in curr_mt: # No current entries to test - create_module_power_outlets(curr_mt["power-outlets"], - module_type_res.id, nb) - if "console-server-ports" in curr_mt: # No current entries to test - create_module_console_server_ports(curr_mt["console-server-ports"], - module_type_res.id, nb) - if "rear-ports" in curr_mt: - create_module_rear_ports(curr_mt["rear-ports"], - module_type_res.id, nb) - if "front-ports" in curr_mt: - create_module_front_ports(curr_mt["front-ports"], - module_type_res.id, nb) def main(): - - cwd = os.getcwd() startTime = datetime.now() args = settings.args - nbUrl = settings.NETBOX_URL - nbToken = settings.NETBOX_TOKEN - nb = pynetbox.api(nbUrl, token=nbToken) - - try: - determine_features(nb) - except requests.exceptions.SSLError as ssl_exception: - if not settings.IGNORE_SSL_ERRORS: - settings.handle.exception("SSLError", settings.IGNORE_SSL_ERRORS, ssl_exception) - print("IGNORE_SSL_ERRORS is True, catching exception and disabling SSL verification.") - requests.packages.urllib3.disable_warnings() - nb.http_session.verify = False - determine_features(nb) + netbox = NetBox(settings) + files, vendors = settings.dtl_repo.get_devices( + f'{settings.dtl_repo.repo_path}/device-types/', args.vendors) - if not args.vendors: - print("No Vendors Specified, Gathering All Device-Types") - files, vendors = getFiles() - else: - print("Vendor Specified, Gathering All Matching Device-Types") - files, vendors = getFiles(args.vendors) + settings.handle.log(f'{len(vendors)} Vendors Found') + device_types = settings.dtl_repo.parse_files(files, slugs=args.slugs) + settings.handle.log(f'{len(device_types)} Device-Types Found') + netbox.create_manufacturers(vendors) + netbox.create_device_types(device_types) + if netbox.modules: + settings.handle.log("Modules Enabled. Creating Modules...") + files, vendors = settings.dtl_repo.get_devices( + f'{settings.dtl_repo.repo_path}/module-types/', args.vendors) + settings.handle.log(f'{len(vendors)} Module Vendors Found') + module_types = settings.dtl_repo.parse_files(files, slugs=args.slugs) + settings.handle.log(f'{len(module_types)} Module-Types Found') + netbox.create_manufacturers(vendors) + netbox.create_module_types(module_types) - print(str(len(vendors)) + " Vendors Found") - deviceTypes = readYAMl(files, slugs=args.slugs) - print(str(len(deviceTypes)) + " Device-Types Found") - createManufacturers(vendors, nb) - createDeviceTypes(deviceTypes, nb) - + settings.handle.log('---') + settings.handle.verbose_log( + f'Script took {(datetime.now() - startTime)} to run') + settings.handle.log(f'{netbox.counter["added"]} devices created') + settings.handle.log( + f'{netbox.counter["updated"]} interfaces/ports updated') + settings.handle.log( + f'{netbox.counter["manufacturer"]} manufacturers created') if settings.NETBOX_FEATURES['modules']: - if not args.vendors: - print("No Vendors Specified, Gathering All Module-Types") - files, vendors = get_files_modules() - else: - print("Vendor Specified, Gathering All Matching Module-Types") - files, vendors = get_files_modules(args.vendors) + settings.handle.log( + f'{netbox.counter["module_added"]} modules created') + settings.handle.log( + f'{netbox.counter["module_port_added"]} module interface / ports created') - print(str(len(vendors)) + " Module Vendors Found") - module_types = read_yaml_modules(files, slugs=args.slugs) - print(str(len(module_types)) + " Module-Types Found") - createManufacturers(vendors, nb) - create_module_types(module_types, nb) - - print('---') - print('Script took {} to run'.format(datetime.now() - startTime)) - print('{} devices created'.format(counter['added'])) - print('{} interfaces/ports updated'.format(counter['updated'])) - print('{} manufacturers created'.format(counter['manufacturer'])) - if settings.NETBOX_FEATURES['modules']: - print(f"{counter['module_added']} modules created") - print(f"{counter['module_port_added']} module interface / ports created") - if __name__ == "__main__": main() diff --git a/netbox_api.py b/netbox_api.py new file mode 100644 index 0000000..09d048c --- /dev/null +++ b/netbox_api.py @@ -0,0 +1,437 @@ +from collections import Counter +import pynetbox +import requests +# from pynetbox import RequestError as APIRequestError + +class NetBox: + def __new__(cls, *args, **kwargs): + return super().__new__(cls) + + def __init__(self, settings): + self.counter = Counter( + added=0, + updated=0, + manufacturer=0, + module_added=0, + module_port_added=0, + ) + self.url = settings.NETBOX_URL + self.token = settings.NETBOX_TOKEN + self.handle = settings.handle + self.netbox = None + self.ignore_ssl = settings.IGNORE_SSL_ERRORS + self.modules = False + self.connect_api() + self.verify_compatibility() + self.existing_manufacturers = self.get_manufacturers() + self.device_types = DeviceTypes(self.netbox, self.handle, self.counter) + + def connect_api(self): + try: + self.netbox = pynetbox.api(self.url, token=self.token) + if self.ignore_ssl: + self.handle.verbose_log("IGNORE_SSL_ERRORS is True, catching exception and disabling SSL verification.") + requests.packages.urllib3.disable_warnings() + self.netbox.http_session.verify = False + except Exception as e: + self.handle.exception("Exception", 'NetBox API Error', e) + + def get_api(self): + return self.netbox + + def get_counter(self): + return self.counter + + def verify_compatibility(self): + # nb.version should be the version in the form '3.2' + version_split = [int(x) for x in self.netbox.version.split('.')] + + # Later than 3.2 + # Might want to check for the module-types entry as well? + if version_split[0] > 3 or (version_split[0] == 3 and version_split[1] >= 2): + self.modules = True + + def get_manufacturers(self): + return {str(item): item for item in self.netbox.dcim.manufacturers.all()} + + def create_manufacturers(self, vendors): + to_create = [] + self.existing_manufacturers = self.get_manufacturers() + for vendor in vendors: + try: + manGet = self.existing_manufacturers[vendor["name"]] + self.handle.verbose_log(f'Manufacturer Exists: {manGet.name} - {manGet.id}') + except KeyError: + to_create.append(vendor) + self.handle.verbose_log(f"Manufacturer queued for addition: {vendor['name']}") + + if to_create: + try: + created_manufacturers = self.netbox.dcim.manufacturers.create(to_create) + for manufacturer in created_manufacturers: + self.handle.verbose_log(f'Manufacturer Created: {manufacturer.name} - ' + + f'{manufacturer.id}') + self.counter.update({'manufacturer': 1}) + except pynetbox.RequestError as request_error: + self.handle.log("Error creating manufacturers") + self.handle.verbose_log(f"Error during manufacturer creation. - {request_error.error}") + + def create_device_types(self, device_types_to_add): + for device_type in device_types_to_add: + try: + dt = self.device_types.existing_device_types[device_type["model"]] + self.handle.verbose_log(f'Device Type Exists: {dt.manufacturer.name} - ' + + f'{dt.model} - {dt.id}') + except KeyError: + try: + dt = self.netbox.dcim.device_types.create(device_type) + self.counter.update({'added': 1}) + self.handle.verbose_log(f'Device Type Created: {dt.manufacturer.name} - ' + + f'{dt.model} - {dt.id}') + except pynetbox.RequestError as e: + self.handle.log(f'Error {e.error} creating device type:' + f' {device_type["manufacturer"]["name"]} {device_type["model"]}') + continue + + if "interfaces" in device_type: + self.device_types.create_interfaces(device_type["interfaces"], dt.id) + if "power-ports" in device_type: + self.device_types.create_power_ports(device_type["power-ports"], dt.id) + if "power-port" in device_type: + self.device_types.create_power_ports(device_type["power-port"], dt.id) + if "console-ports" in device_type: + self.device_types.create_console_ports(device_type["console-ports"], dt.id) + if "power-outlets" in device_type: + self.device_types.create_power_outlets(device_type["power-outlets"], dt.id) + if "console-server-ports" in device_type: + self.device_types.create_console_server_ports(device_type["console-server-ports"], dt.id) + if "rear-ports" in device_type: + self.device_types.create_rear_ports(device_type["rear-ports"], dt.id) + if "front-ports" in device_type: + self.device_types.create_front_ports(device_type["front-ports"], dt.id) + if "device-bays" in device_type: + self.device_types.create_device_bays(device_type["device-bays"], dt.id) + if self.modules and 'module-bays' in device_type: + self.device_types.create_module_bays(device_type['module-bays'], dt.id) + + def create_module_types(self, module_types): + all_module_types = {} + for curr_nb_mt in self.netbox.dcim.module_types.all(): + if curr_nb_mt.manufacturer.slug not in all_module_types: + all_module_types[curr_nb_mt.manufacturer.slug] = {} + + all_module_types[curr_nb_mt.manufacturer.slug][curr_nb_mt.model] = curr_nb_mt + + + for curr_mt in module_types: + try: + module_type_res = all_module_types[curr_mt['manufacturer']['slug']][curr_mt["model"]] + self.handle.verbose_log(f'Module Type Exists: {module_type_res.manufacturer.name} - ' + + f'{module_type_res.model} - {module_type_res.id}') + except KeyError: + try: + module_type_res = self.netbox.dcim.module_types.create(curr_mt) + self.counter.update({'module_added': 1}) + self.handle.verbose_log(f'Module Type Created: {module_type_res.manufacturer.name} - ' + + f'{module_type_res.model} - {module_type_res.id}') + except pynetbox.RequestError as exce: + self.handle.log(f"Error '{exce.error}' creating module type: " + + f"{curr_mt}") + + if "interfaces" in curr_mt: + self.device_types.create_module_interfaces(curr_mt["interfaces"], module_type_res.id) + if "power-ports" in curr_mt: + self.device_types.create_module_power_ports(curr_mt["power-ports"], module_type_res.id) + if "console-ports" in curr_mt: + self.device_types.create_module_console_ports(curr_mt["console-ports"], module_type_res.id) + if "power-outlets" in curr_mt: + self.device_types.create_module_power_outlets(curr_mt["power-outlets"], module_type_res.id) + if "console-server-ports" in curr_mt: + self.device_types.create_module_console_server_ports(curr_mt["console-server-ports"], module_type_res.id) + if "rear-ports" in curr_mt: + self.device_types.create_module_rear_ports(curr_mt["rear-ports"], module_type_res.id) + if "front-ports" in curr_mt: + self.device_types.create_module_front_ports(curr_mt["front-ports"], module_type_res.id) + +class DeviceTypes: + def __new__(cls, *args, **kwargs): + return super().__new__(cls) + + def __init__(self, netbox, handle, counter): + self.netbox = netbox + self.handle = handle + self.counter = counter + self.existing_device_types = self.get_device_types() + + def get_device_types(self): + return {str(item): item for item in self.netbox.dcim.device_types.all()} + + def get_power_ports(self, device_type): + return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(devicetype_id=device_type)} + + def get_rear_ports(self, device_type): + return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(devicetype_id=device_type)} + + def get_module_power_ports(self, module_type): + return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(moduletype_id=module_type)} + + def get_module_rear_ports(self, module_type): + return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(moduletype_id=module_type)} + + def get_device_type_ports_to_create(self, dcim_ports, device_type, existing_ports): + to_create = [port for port in dcim_ports if port['name'] not in existing_ports] + for port in to_create: + port['device_type'] = device_type + + return to_create + + def get_module_type_ports_to_create(self, module_ports, module_type, existing_ports): + to_create = [port for port in module_ports if port['name'] not in existing_ports] + for port in to_create: + port['module_type'] = module_type + + return to_create + + def create_interfaces(self, interfaces, device_type): + existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter( + devicetype_id=device_type)} + to_create = self.get_device_type_ports_to_create( + interfaces, device_type, existing_interfaces) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_device_ports_created( + self.netbox.dcim.interface_templates.create(to_create), "Interface") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Interface") + + def create_power_ports(self, power_ports, device_type): + existing_power_ports = self.get_power_ports(device_type) + to_create = self.get_device_type_ports_to_create(power_ports, device_type, existing_power_ports) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_device_ports_created( + self.netbox.dcim.power_port_templates.create(to_create), "Power Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Power Port") + + def create_console_ports(self, console_ports, device_type): + existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(devicetype_id=device_type)} + to_create = self.get_device_type_ports_to_create(console_ports, device_type, existing_console_ports) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_device_ports_created( + self.netbox.dcim.console_port_templates.create(to_create), "Console Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Console Port") + + def create_power_outlets(self, power_outlets, device_type): + existing_power_outlets = {str(item): item for item in self.netbox.dcim.power_outlet_templates.filter(devicetype_id=device_type)} + to_create = self.get_device_type_ports_to_create(power_outlets, device_type, existing_power_outlets) + + if to_create: + existing_power_ports = self.get_power_ports(device_type) + for outlet in to_create: + try: + power_port = existing_power_ports[outlet["power_port"]] + outlet['power_port'] = power_port.id + except KeyError: + pass + + try: + self.counter.update({'updated': + self.handle.log_device_ports_created( + self.netbox.dcim.power_outlet_templates.create(to_create), "Power Outlet") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Power Outlet") + + def create_console_server_ports(self, console_server_ports, device_type): + existing_console_server_ports = {str(item): item for item in self.netbox.dcim.console_server_port_templates.filter(devicetype_id=device_type)} + to_create = self.get_device_type_ports_to_create(console_server_ports, device_type, existing_console_server_ports) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_device_ports_created( + self.netbox.dcim.console_server_port_templates.create(to_create), "Console Server Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Console Server Port") + + def create_rear_ports(self, rear_ports, device_type): + existing_rear_ports = self.get_rear_ports(device_type) + to_create = self.get_device_type_ports_to_create(rear_ports, device_type, existing_rear_ports) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_device_ports_created( + self.netbox.dcim.rear_port_templates.create(to_create), "Rear Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Rear Port") + + def create_front_ports(self, front_ports, device_type): + existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(devicetype_id=device_type)} + to_create = self.get_device_type_ports_to_create(front_ports, device_type, existing_front_ports) + + if to_create: + all_rearports = self.get_rear_ports(device_type) + for port in to_create: + try: + rear_port = all_rearports[port["rear_port"]] + port['rear_port'] = rear_port.id + except KeyError: + self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - ' + + f'{port["type"]} - {device_type}') + + try: + self.counter.update({'updated': + self.handle.log_device_ports_created( + self.netbox.dcim.front_port_templates.create(to_create), "Front Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Front Port") + + def create_device_bays(self, device_bays, device_type): + existing_device_bays = {str(item): item for item in self.netbox.dcim.device_bay_templates.filter(devicetype_id=device_type)} + to_create = self.get_device_type_ports_to_create(device_bays, device_type, existing_device_bays) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_device_ports_created( + self.netbox.dcim.device_bay_templates.create(to_create), "Device Bay") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Device Bay") + + def create_module_bays(self, module_bays, device_type): + existing_module_bays = {str(item): item for item in self.netbox.dcim.module_bay_templates.filter(devicetype_id=device_type)} + to_create = self.get_device_type_ports_to_create(module_bays, device_type, existing_module_bays) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_device_ports_created( + self.netbox.dcim.module_bay_templates.create(to_create), "Module Bay") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Module Bay") + + def create_module_interfaces(self, module_interfaces, module_type): + existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(moduletype_id=module_type)} + to_create = self.get_module_type_ports_to_create(module_interfaces, module_type, existing_interfaces) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_module_ports_created( + self.netbox.dcim.interface_templates.create(to_create), "Module Interface") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Module Interface") + + def create_module_power_ports(self, power_ports, module_type): + existing_power_ports = self.get_module_power_ports(module_type) + to_create = self.get_module_type_ports_to_create(power_ports, module_type, existing_power_ports) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_module_ports_created( + self.netbox.dcim.power_port_templates.create(to_create), "Module Power Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Module Power Port") + + def create_module_console_ports(self, console_ports, module_type): + existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(moduletype_id=module_type)} + to_create = self.get_module_type_ports_to_create(console_ports, module_type, existing_console_ports) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_module_ports_created( + self.netbox.dcim.console_port_templates.create(to_create), "Module Console Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Module Console Port") + + def create_module_power_outlets(self, power_outlets, module_type): + existing_power_outlets = {str(item): item for item in self.netbox.dcim.power_outlet_templates.filter(moduletype_id=module_type)} + to_create = self.get_module_type_ports_to_create(power_outlets, module_type, existing_power_outlets) + + if to_create: + existing_power_ports = self.get_module_power_ports(module_type) + for outlet in to_create: + try: + power_port = existing_power_ports[outlet["power_port"]] + outlet['power_port'] = power_port.id + except KeyError: + pass + + try: + self.counter.update({'updated': + self.handle.log_module_ports_created( + self.netbox.dcim.power_outlet_templates.create(to_create), "Module Power Outlet") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Module Power Outlet") + + def create_module_console_server_ports(self, console_server_ports, module_type): + existing_console_server_ports = {str(item): item for item in self.netbox.dcim.console_server_port_templates.filter(moduletype_id=module_type)} + to_create = self.get_module_type_ports_to_create(console_server_ports, module_type, existing_console_server_ports) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_module_ports_created( + self.netbox.dcim.console_server_port_templates.create(to_create), "Module Console Server Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Module Console Server Port") + + def create_module_rear_ports(self, rear_ports, module_type): + existing_rear_ports = self.get_module_rear_ports(module_type) + to_create = self.get_module_type_ports_to_create(rear_ports, module_type, existing_rear_ports) + + if to_create: + try: + self.counter.update({'updated': + self.handle.log_module_ports_created( + self.netbox.dcim.rear_port_templates.create(to_create), "Module Rear Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Module Rear Port") + + def create_module_front_ports(self, front_ports, module_type): + existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(moduletype_id=module_type)} + to_create = self.get_module_type_ports_to_create(front_ports, module_type, existing_front_ports) + + if to_create: + existing_rear_ports = self.get_module_rear_ports(module_type) + for port in to_create: + try: + rear_port = existing_rear_ports[port["rear_port"]] + port['rear_port'] = rear_port.id + except KeyError: + self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - ' + + f'{port["type"]} - {module_type}') + + try: + self.counter.update({'updated': + self.handle.log_module_ports_created( + self.netbox.dcim.front_port_templates.create(to_create), "Module Front Port") + }) + except pynetbox.RequestError as excep: + self.handle.log(f"Error '{excep.error}' creating Module Front Port") \ No newline at end of file diff --git a/repo.py b/repo.py new file mode 100644 index 0000000..65cda96 --- /dev/null +++ b/repo.py @@ -0,0 +1,103 @@ +import os +from glob import glob +from re import sub as re_sub +from git import Repo, exc +import yaml + + +class DTLRepo: + def __new__(cls, *args, **kwargs): + return super().__new__(cls) + + def __init__(self, args, repo_path, exception_handler): + self.handle = exception_handler + self.yaml_extensions = ['yaml', 'yml'] + self.url = args.url + self.repo_path = repo_path + self.branch = args.branch + self.repo = None + self.cwd = os.getcwd() + + if os.path.isdir(self.repo_path): + self.pull_repo() + else: + self.clone_repo() + + def get_relative_path(self): + return self.repo_path + + def get_absolute_path(self): + return os.path.join(self.cwd, self.repo_path) + + def get_devices_path(self): + return os.path.join(self.get_absolute_path(), 'device-types') + + def get_modules_path(self): + return os.path.join(self.get_absolute_path(), 'module-types') + + def slug_format(self, name): + return re_sub('\W+', '-', name.lower()) + + def pull_repo(self): + try: + self.handle.log("Package devicetype-library is already installed, " + + f"updating {self.get_absolute_path()}") + self.repo = Repo(self.repo_path) + if not self.repo.remotes.origin.url.endswith('.git'): + self.handle.exception("GitInvalidRepositoryError", self.repo.remotes.origin.url, + f"Origin URL {self.repo.remotes.origin.url} does not end with .git") + self.repo.remotes.origin.pull() + self.repo.git.checkout(self.branch) + self.handle.verbose_log( + f"Pulled Repo {self.repo.remotes.origin.url}") + except exc.GitCommandError as git_error: + self.handle.exception( + "GitCommandError", self.repo.remotes.origin.url, git_error) + except Exception as git_error: + self.handle.exception( + "Exception", 'Git Repository Error', git_error) + + def clone_repo(self): + try: + self.repo = Repo.clone_from( + self.url, self.get_absolute_path(), branch=self.branch) + self.handle.log( + f"Package Installed {self.repo.remotes.origin.url}") + except exc.GitCommandError as git_error: + self.handle.exception("GitCommandError", self.url, git_error) + except Exception as git_error: + self.handle.exception( + "Exception", 'Git Repository Error', git_error) + + def get_devices(self, base_path, vendors: list = None): + files = [] + discovered_vendors = [] + vendor_dirs = os.listdir(base_path) + + for folder in [vendor for vendor in vendor_dirs if not vendors or vendor.casefold() in vendors]: + if folder.casefold() != "testing": + discovered_vendors.append({'name': folder, + 'slug': self.slug_format(folder)}) + for extension in self.yaml_extensions: + files.extend(glob(base_path + folder + f'/*.{extension}')) + return files, discovered_vendors + + def parse_files(self, files: list, slugs: list = None): + deviceTypes = [] + for file in files: + with open(file, 'r') as stream: + try: + data = yaml.safe_load(stream) + except yaml.YAMLError as excep: + self.handle.verbose_log(excep) + continue + manufacturer = data['manufacturer'] + data['manufacturer'] = { + 'name': manufacturer, 'slug': self.slug_format(manufacturer)} + + if slugs and True not in [True if s.casefold() in data['slug'].casefold() else False for s in slugs]: + self.handle.verbose_log(f"Skipping {data['model']}") + continue + + deviceTypes.append(data) + return deviceTypes diff --git a/settings.py b/settings.py index 22902a6..797f77f 100644 --- a/settings.py +++ b/settings.py @@ -1,8 +1,7 @@ from argparse import ArgumentParser -from sys import exit as system_exit import os -from exception_handler import ExceptionHandler -from gitcmd import GitCMD +from log_handler import LogHandler +from repo import DTLRepo from dotenv import load_dotenv load_dotenv() @@ -38,11 +37,16 @@ parser.add_argument('--verbose', action='store_true', default=False, args = parser.parse_args() -handle = ExceptionHandler(args) +args.vendors = [v.casefold() + for vendor in args.vendors for v in vendor.split(",") if v.strip()] +args.slugs = [s for slug in args.slugs for s in slug.split(",") if s.strip()] + +handle = LogHandler(args) # Evaluate environment variables and exit if one of the mandatory ones are not set MANDATORY_ENV_VARS = ["REPO_URL", "NETBOX_URL", "NETBOX_TOKEN"] for var in MANDATORY_ENV_VARS: if var not in os.environ: - handle.exception("EnvironmentError", var, f'Environment variable "{var}" is not set.\n\nMANDATORY_ENV_VARS: {str(MANDATORY_ENV_VARS)}.\n\nCURRENT_ENV_VARS: {str(os.environ)}') + handle.exception("EnvironmentError", var, + f'Environment variable "{var}" is not set.\n\nMANDATORY_ENV_VARS: {str(MANDATORY_ENV_VARS)}.\n\nCURRENT_ENV_VARS: {str(os.environ)}') -git_repo = GitCMD(args, REPO_PATH) +dtl_repo = DTLRepo(args, REPO_PATH, handle)