Logging cleanup (#78)

* Removed multiple imports of settings.py

* stating to abstract the netbox api calls to their own class

* Abstracted away determine features from main script, implemented as part of class initialization

* added helper functions to get repos relative & absolute path

* renaming gitcmd to repo

* starting to abstract away the get_files

* fixed issue where spaces and commas in vendor list with/without spaces breaks matching

* Added prelim fix for slugs if same issue vendors arg was facing exists. untested

* Finished abstracting the get_files function. Reduced fors and ifs to be cleaner and more efficent

* abstracted getFiles to repo class. Fixed slug issue not matching because of new slug format. added non-halting log function and renamed exception handler to log handler.

* utilized new logging class throughout script to reduce excess logging

* Abstracted and optimized create manufacturers

* Abstracted the create interfaces for devices to the netbox api class

* Fixed regression where check manufactuerers did not have the latest list

* Fixed regression caused by externally calling script. Discovered from https://github.com/netbox-community/Device-Type-Library-Import/pull/76

* abstracted all device interfaces to the devicetype class. optimized function calls to reduce duplicate code and reduce extra log calls

* Ran against all devices and passed with flying colors

* formatting settings.py

* formatting repo.py

* formatting main file

* formatting log_handler.py
This commit is contained in:
Daniel W. Anner 2023-03-09 00:45:32 -05:00 committed by GitHub
parent 1872ac577f
commit cef5c992d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 592 additions and 33 deletions

View File

@ -1,27 +0,0 @@
from sys import exit as system_exit
class ExceptionHandler:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
def __init__(self, args):
self.args = args
def exception(self, exception_type, exception, stack_trace=None):
exception_dict = {
"EnvironmentError": f'Environment variable "{exception}" is not set.',
"SSLError": f'SSL verification failed. IGNORE_SSL_ERRORS is {exception}. Set IGNORE_SSL_ERRORS to True if you want to ignore this error. EXITING.',
"GitCommandError": f'The repo "{exception}" is not a valid git repo.',
"GitInvalidRepositoryError": f'The repo "{exception}" is not a valid git repo.',
"Exception": f'An unknown error occurred: "{exception}"'
}
if self.args.verbose and stack_trace:
print(stack_trace)
print(exception_dict[exception_type])
system_exit(1)

44
log_handler.py Normal file
View File

@ -0,0 +1,44 @@
from sys import exit as system_exit
class LogHandler:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
def __init__(self, args):
self.args = args
def exception(self, exception_type, exception, stack_trace=None):
exception_dict = {
"EnvironmentError": f'Environment variable "{exception}" is not set.',
"SSLError": f'SSL verification failed. IGNORE_SSL_ERRORS is {exception}. Set IGNORE_SSL_ERRORS to True if you want to ignore this error. EXITING.',
"GitCommandError": f'The repo "{exception}" is not a valid git repo.',
"GitInvalidRepositoryError": f'The repo "{exception}" is not a valid git repo.',
"Exception": f'An unknown error occurred: "{exception}"'
}
if self.args.verbose and stack_trace:
print(stack_trace)
print(exception_dict[exception_type])
system_exit(1)
def verbose_log(self, message):
if self.args.verbose:
print(message)
def log(self, message):
print(message)
def log_device_ports_created(self, created_ports: list = [], port_type: str = "port"):
for port in created_ports:
self.verbose_log(f'{port_type} Template Created: {port.name} - '
+ f'{port.type if hasattr(port, "type") else ""} - {port.device_type.id} - '
+ f'{port.id}')
return len(created_ports)
def log_module_ports_created(self, created_ports: list = [], port_type: str = "port"):
for port in created_ports:
self.verbose_log(f'{port_type} Template Created: {port.name} - '
+ f'{port.type if hasattr(port, "type") else ""} - {port.module_type.id} - '
+ f'{port.id}')
return len(created_ports)

435
netbox_api.py Normal file
View File

@ -0,0 +1,435 @@
from collections import Counter
import pynetbox
import requests
# from pynetbox import RequestError as APIRequestError
class NetBox:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
def __init__(self, settings):
self.counter = Counter(
added=0,
updated=0,
manufacturer=0,
module_added=0,
module_port_added=0,
)
self.url = settings.NETBOX_URL
self.token = settings.NETBOX_TOKEN
self.handle = settings.handle
self.netbox = None
self.ignore_ssl = settings.IGNORE_SSL_ERRORS
self.modules = False
self.connect_api()
self.verify_compatibility()
self.existing_manufacturers = self.get_manufacturers()
self.device_types = DeviceTypes(self.netbox, self.handle, self.counter)
def connect_api(self):
try:
self.netbox = pynetbox.api(self.url, token=self.token)
if self.ignore_ssl:
self.handle.verbose_log("IGNORE_SSL_ERRORS is True, catching exception and disabling SSL verification.")
requests.packages.urllib3.disable_warnings()
self.netbox.http_session.verify = False
except Exception as e:
self.handle.exception("Exception", 'NetBox API Error', e)
def get_api(self):
return self.netbox
def get_counter(self):
return self.counter
def verify_compatibility(self):
# nb.version should be the version in the form '3.2'
version_split = [int(x) for x in self.netbox.version.split('.')]
# Later than 3.2
# Might want to check for the module-types entry as well?
if version_split[0] > 3 or (version_split[0] == 3 and version_split[1] >= 2):
self.modules = True
def get_manufacturers(self):
return {str(item): item for item in self.netbox.dcim.manufacturers.all()}
def create_manufacturers(self, vendors):
to_create = []
self.existing_manufacturers = self.get_manufacturers()
for vendor in vendors:
try:
manGet = self.existing_manufacturers[vendor["name"]]
self.handle.verbose_log(f'Manufacturer Exists: {manGet.name} - {manGet.id}')
except KeyError:
to_create.append(vendor)
self.handle.verbose_log(f"Manufacturer queued for addition: {vendor['name']}")
if to_create:
try:
created_manufacturers = self.netbox.dcim.manufacturers.create(to_create)
for manufacturer in created_manufacturers:
self.handle.verbose_log(f'Manufacturer Created: {manufacturer.name} - '
+ f'{manufacturer.id}')
self.counter.update({'manufacturer': 1})
except pynetbox.RequestError as request_error:
self.handle.log("Error creating manufacturers")
self.handle.verbose_log(f"Error during manufacturer creation. - {request_error.error}")
def create_device_types(self, device_types_to_add):
for device_type in device_types_to_add:
try:
dt = self.device_types.existing_device_types[device_type["model"]]
self.handle.verbose_log(f'Device Type Exists: {dt.manufacturer.name} - '
+ f'{dt.model} - {dt.id}')
except KeyError:
try:
dt = self.netbox.dcim.device_types.create(device_type)
self.counter.update({'added': 1})
self.handle.verbose_log(f'Device Type Created: {dt.manufacturer.name} - '
+ f'{dt.model} - {dt.id}')
except pynetbox.RequestError as e:
self.handle.log(e.error)
if "interfaces" in device_type:
self.device_types.create_interfaces(device_type["interfaces"], dt.id)
if "power-ports" in device_type:
self.device_types.create_power_ports(device_type["power-ports"], dt.id)
if "power-port" in device_type:
self.device_types.create_power_ports(device_type["power-port"], dt.id)
if "console-ports" in device_type:
self.device_types.create_console_ports(device_type["console-ports"], dt.id)
if "power-outlets" in device_type:
self.device_types.create_power_outlets(device_type["power-outlets"], dt.id)
if "console-server-ports" in device_type:
self.device_types.create_console_server_ports(device_type["console-server-ports"], dt.id)
if "rear-ports" in device_type:
self.device_types.create_rear_ports(device_type["rear-ports"], dt.id)
if "front-ports" in device_type:
self.device_types.create_front_ports(device_type["front-ports"], dt.id)
if "device-bays" in device_type:
self.device_types.create_device_bays(device_type["device-bays"], dt.id)
if self.modules and 'module-bays' in device_type:
self.device_types.create_module_bays(device_type['module-bays'], dt.id)
def create_module_types(self, module_types):
all_module_types = {}
for curr_nb_mt in self.netbox.dcim.module_types.all():
if curr_nb_mt.manufacturer.slug not in all_module_types:
all_module_types[curr_nb_mt.manufacturer.slug] = {}
all_module_types[curr_nb_mt.manufacturer.slug][curr_nb_mt.model] = curr_nb_mt
for curr_mt in module_types:
try:
module_type_res = all_module_types[curr_mt['manufacturer']['slug']][curr_mt["model"]]
self.handle.verbose_log(f'Module Type Exists: {module_type_res.manufacturer.name} - '
+ f'{module_type_res.model} - {module_type_res.id}')
except KeyError:
try:
module_type_res = self.netbox.dcim.module_types.create(curr_mt)
self.counter.update({'module_added': 1})
self.handle.verbose_log(f'Module Type Created: {module_type_res.manufacturer.name} - '
+ f'{module_type_res.model} - {module_type_res.id}')
except pynetbox.RequestError as exce:
self.handle.log(f"Error '{exce.error}' creating module type: " +
f"{curr_mt}")
if "interfaces" in curr_mt:
self.device_types.create_module_interfaces(curr_mt["interfaces"], module_type_res.id)
if "power-ports" in curr_mt:
self.device_types.create_module_power_ports(curr_mt["power-ports"], module_type_res.id)
if "console-ports" in curr_mt:
self.device_types.create_module_console_ports(curr_mt["console-ports"], module_type_res.id)
if "power-outlets" in curr_mt:
self.device_types.create_module_power_outlets(curr_mt["power-outlets"], module_type_res.id)
if "console-server-ports" in curr_mt:
self.device_types.create_module_console_server_ports(curr_mt["console-server-ports"], module_type_res.id)
if "rear-ports" in curr_mt:
self.device_types.create_module_rear_ports(curr_mt["rear-ports"], module_type_res.id)
if "front-ports" in curr_mt:
self.device_types.create_module_front_ports(curr_mt["front-ports"], module_type_res.id)
class DeviceTypes:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
def __init__(self, netbox, handle, counter):
self.netbox = netbox
self.handle = handle
self.counter = counter
self.existing_device_types = self.get_device_types()
def get_device_types(self):
return {str(item): item for item in self.netbox.dcim.device_types.all()}
def get_power_ports(self, device_type):
return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(devicetype_id=device_type)}
def get_rear_ports(self, device_type):
return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(devicetype_id=device_type)}
def get_module_power_ports(self, module_type):
return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(moduletype_id=module_type)}
def get_module_rear_ports(self, module_type):
return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(moduletype_id=module_type)}
def get_device_type_ports_to_create(self, dcim_ports, device_type, existing_ports):
to_create = [port for port in dcim_ports if port['name'] not in existing_ports]
for port in to_create:
port['device_type'] = device_type
return to_create
def get_module_type_ports_to_create(self, module_ports, module_type, existing_ports):
to_create = [port for port in module_ports if port['name'] not in existing_ports]
for port in to_create:
port['module_type'] = module_type
return to_create
def create_interfaces(self, interfaces, device_type):
existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(
devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(
interfaces, device_type, existing_interfaces)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
self.netbox.dcim.interface_templates.create(to_create), "Interface")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Interface")
def create_power_ports(self, power_ports, device_type):
existing_power_ports = self.get_power_ports(device_type)
to_create = self.get_device_type_ports_to_create(power_ports, device_type, existing_power_ports)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
self.netbox.dcim.power_port_templates.create(to_create), "Power Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Power Port")
def create_console_ports(self, console_ports, device_type):
existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(console_ports, device_type, existing_console_ports)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
self.netbox.dcim.console_port_templates.create(to_create), "Console Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Console Port")
def create_power_outlets(self, power_outlets, device_type):
existing_power_outlets = {str(item): item for item in self.netbox.dcim.power_outlet_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(power_outlets, device_type, existing_power_outlets)
if to_create:
existing_power_ports = self.get_power_ports(device_type)
for outlet in to_create:
try:
power_port = existing_power_ports[outlet["power_port"]]
outlet['power_port'] = power_port.id
except KeyError:
pass
try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
self.netbox.dcim.power_outlet_templates.create(to_create), "Power Outlet")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Power Outlet")
def create_console_server_ports(self, console_server_ports, device_type):
existing_console_server_ports = {str(item): item for item in self.netbox.dcim.console_server_port_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(console_server_ports, device_type, existing_console_server_ports)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
self.netbox.dcim.console_server_port_templates.create(to_create), "Console Server Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Console Server Port")
def create_rear_ports(self, rear_ports, device_type):
existing_rear_ports = self.get_rear_ports(device_type)
to_create = self.get_device_type_ports_to_create(rear_ports, device_type, existing_rear_ports)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
self.netbox.dcim.rear_port_templates.create(to_create), "Rear Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Rear Port")
def create_front_ports(self, front_ports, device_type):
existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(front_ports, device_type, existing_front_ports)
if to_create:
all_rearports = self.get_rear_ports(device_type)
for port in to_create:
try:
rear_port = all_rearports[port["rear_port"]]
port['rear_port'] = rear_port.id
except KeyError:
self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - '
+ f'{port["type"]} - {device_type}')
try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
self.netbox.dcim.front_port_templates.create(to_create), "Front Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Front Port")
def create_device_bays(self, device_bays, device_type):
existing_device_bays = {str(item): item for item in self.netbox.dcim.device_bay_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(device_bays, device_type, existing_device_bays)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
self.netbox.dcim.device_bay_templates.create(to_create), "Device Bay")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Device Bay")
def create_module_bays(self, module_bays, device_type):
existing_module_bays = {str(item): item for item in self.netbox.dcim.module_bay_templates.filter(devicetype_id=device_type)}
to_create = self.get_device_type_ports_to_create(module_bays, device_type, existing_module_bays)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_device_ports_created(
self.netbox.dcim.module_bay_templates.create(to_create), "Module Bay")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Bay")
def create_module_interfaces(self, module_interfaces, module_type):
existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(module_interfaces, module_type, existing_interfaces)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.interface_templates.create(to_create), "Module Interface")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Interface")
def create_module_power_ports(self, power_ports, module_type):
existing_power_ports = self.get_module_power_ports(module_type)
to_create = self.get_module_type_ports_to_create(power_ports, module_type, existing_power_ports)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.power_port_templates.create(to_create), "Module Power Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Power Port")
def create_module_console_ports(self, console_ports, module_type):
existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(console_ports, module_type, existing_console_ports)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.console_port_templates.create(to_create), "Module Console Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Console Port")
def create_module_power_outlets(self, power_outlets, module_type):
existing_power_outlets = {str(item): item for item in self.netbox.dcim.power_outlet_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(power_outlets, module_type, existing_power_outlets)
if to_create:
existing_power_ports = self.get_module_power_ports(module_type)
for outlet in to_create:
try:
power_port = existing_power_ports[outlet["power_port"]]
outlet['power_port'] = power_port.id
except KeyError:
pass
try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.power_outlet_templates.create(to_create), "Module Power Outlet")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Power Outlet")
def create_module_console_server_ports(self, console_server_ports, module_type):
existing_console_server_ports = {str(item): item for item in self.netbox.dcim.console_server_port_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(console_server_ports, module_type, existing_console_server_ports)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.console_server_port_templates.create(to_create), "Module Console Server Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Console Server Port")
def create_module_rear_ports(self, rear_ports, module_type):
existing_rear_ports = self.get_module_rear_ports(module_type)
to_create = self.get_module_type_ports_to_create(rear_ports, module_type, existing_rear_ports)
if to_create:
try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.rear_port_templates.create(to_create), "Module Rear Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Rear Port")
def create_module_front_ports(self, front_ports, module_type):
existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(moduletype_id=module_type)}
to_create = self.get_module_type_ports_to_create(front_ports, module_type, existing_front_ports)
if to_create:
existing_rear_ports = self.get_module_rear_ports(module_type)
for port in to_create:
try:
rear_port = existing_rear_ports[port["rear_port"]]
port['rear_port'] = rear_port.id
except KeyError:
self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - '
+ f'{port["type"]} - {module_type}')
try:
self.counter.update({'updated':
self.handle.log_module_ports_created(
self.netbox.dcim.front_port_templates.create(to_create), "Module Front Port")
})
except pynetbox.RequestError as excep:
self.handle.log(f"Error '{excep.error}' creating Module Front Port")

103
repo.py Normal file
View File

@ -0,0 +1,103 @@
import os
from glob import glob
from re import sub as re_sub
from git import Repo, exc
import yaml
class DTLRepo:
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
def __init__(self, args, repo_path, exception_handler):
self.handle = exception_handler
self.yaml_extensions = ['yaml', 'yml']
self.url = args.url
self.repo_path = repo_path
self.branch = args.branch
self.repo = None
self.cwd = os.getcwd()
if os.path.isdir(self.repo_path):
self.pull_repo()
else:
self.clone_repo()
def get_relative_path(self):
return self.repo_path
def get_absolute_path(self):
return os.path.join(self.cwd, self.repo_path)
def get_devices_path(self):
return os.path.join(self.get_absolute_path(), 'device-types')
def get_modules_path(self):
return os.path.join(self.get_absolute_path(), 'module-types')
def slug_format(self, name):
return re_sub('\W+', '-', name.lower())
def pull_repo(self):
try:
self.handle.log("Package devicetype-library is already installed, "
+ f"updating {self.get_absolute_path()}")
self.repo = Repo(self.repo_path)
if not self.repo.remotes.origin.url.endswith('.git'):
self.handle.exception("GitInvalidRepositoryError", self.repo.remotes.origin.url,
f"Origin URL {self.repo.remotes.origin.url} does not end with .git")
self.repo.remotes.origin.pull()
self.repo.git.checkout(self.branch)
self.handle.verbose_log(
f"Pulled Repo {self.repo.remotes.origin.url}")
except exc.GitCommandError as git_error:
self.handle.exception(
"GitCommandError", self.repo.remotes.origin.url, git_error)
except Exception as git_error:
self.handle.exception(
"Exception", 'Git Repository Error', git_error)
def clone_repo(self):
try:
self.repo = Repo.clone_from(
self.url, self.get_absolute_path(), branch=self.branch)
self.handle.log(
f"Package Installed {self.repo.remotes.origin.url}")
except exc.GitCommandError as git_error:
self.handle.exception("GitCommandError", self.url, git_error)
except Exception as git_error:
self.handle.exception(
"Exception", 'Git Repository Error', git_error)
def get_devices(self, base_path, vendors: list = None):
files = []
discovered_vendors = []
vendor_dirs = os.listdir(base_path)
for folder in [vendor for vendor in vendor_dirs if not vendors or vendor.casefold() in vendors]:
if folder.casefold() is not "testing":
discovered_vendors.append({'name': folder,
'slug': self.slug_format(folder)})
for extension in self.yaml_extensions:
files.extend(glob(base_path + folder + f'/*.{extension}'))
return files, discovered_vendors
def parse_files(self, files: list, slugs: list = None):
deviceTypes = []
for file in files:
with open(file, 'r') as stream:
try:
data = yaml.safe_load(stream)
except yaml.YAMLError as excep:
self.handle.verbose_log(excep)
continue
manufacturer = data['manufacturer']
data['manufacturer'] = {
'name': manufacturer, 'slug': self.slug_format(manufacturer)}
if slugs and True not in [True if s.casefold() in data['slug'].casefold() else False for s in slugs]:
self.handle.verbose_log(f"Skipping {data['model']}")
continue
deviceTypes.append(data)
return deviceTypes

View File

@ -1,8 +1,7 @@
from argparse import ArgumentParser
from sys import exit as system_exit
import os
from exception_handler import ExceptionHandler
from gitcmd import GitCMD
from log_handler import LogHandler
from repo import DTLRepo
from dotenv import load_dotenv
load_dotenv()
@ -38,11 +37,16 @@ parser.add_argument('--verbose', action='store_true', default=False,
args = parser.parse_args()
handle = ExceptionHandler(args)
args.vendors = [v.casefold()
for vendor in args.vendors for v in vendor.split(",") if v.strip()]
args.slugs = [s for slug in args.slugs for s in slug.split(",") if s.strip()]
handle = LogHandler(args)
# Evaluate environment variables and exit if one of the mandatory ones are not set
MANDATORY_ENV_VARS = ["REPO_URL", "NETBOX_URL", "NETBOX_TOKEN"]
for var in MANDATORY_ENV_VARS:
if var not in os.environ:
handle.exception("EnvironmentError", var, f'Environment variable "{var}" is not set.\n\nMANDATORY_ENV_VARS: {str(MANDATORY_ENV_VARS)}.\n\nCURRENT_ENV_VARS: {str(os.environ)}')
handle.exception("EnvironmentError", var,
f'Environment variable "{var}" is not set.\n\nMANDATORY_ENV_VARS: {str(MANDATORY_ENV_VARS)}.\n\nCURRENT_ENV_VARS: {str(os.environ)}')
git_repo = GitCMD(args, REPO_PATH)
dtl_repo = DTLRepo(args, REPO_PATH, handle)