mirror of
https://github.com/netbox-community/Device-Type-Library-Import.git
synced 2025-01-07 14:09:21 +01:00
Beta Release Merge (#87)
* Delete gitcmd.py * Delete nb-dt-import.py * Add files via upload * Logging cleanup (#78) * Removed multiple imports of settings.py * stating to abstract the netbox api calls to their own class * Abstracted away determine features from main script, implemented as part of class initialization * added helper functions to get repos relative & absolute path * renaming gitcmd to repo * starting to abstract away the get_files * fixed issue where spaces and commas in vendor list with/without spaces breaks matching * Added prelim fix for slugs if same issue vendors arg was facing exists. untested * Finished abstracting the get_files function. Reduced fors and ifs to be cleaner and more efficent * abstracted getFiles to repo class. Fixed slug issue not matching because of new slug format. added non-halting log function and renamed exception handler to log handler. * utilized new logging class throughout script to reduce excess logging * Abstracted and optimized create manufacturers * Abstracted the create interfaces for devices to the netbox api class * Fixed regression where check manufactuerers did not have the latest list * Fixed regression caused by externally calling script. Discovered from https://github.com/netbox-community/Device-Type-Library-Import/pull/76 * abstracted all device interfaces to the devicetype class. optimized function calls to reduce duplicate code and reduce extra log calls * Ran against all devices and passed with flying colors * formatting settings.py * formatting repo.py * formatting main file * formatting log_handler.py * added back executable on file (#79) * Add more info to failed device_type creations (#81) --------- Co-authored-by: Philipp Rintz <13933258+p-rintz@users.noreply.github.com>
This commit is contained in:
parent
8832370aba
commit
98857fc649
@ -1,27 +0,0 @@
|
||||
from sys import exit as system_exit
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class ExceptionHandler:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
return super().__new__(cls)
|
||||
|
||||
def __init__(self, args):
|
||||
self.args = args
|
||||
|
||||
def exception(self, exception_type, exception, stack_trace=None):
|
||||
exception_dict = {
|
||||
"EnvironmentError": f'Environment variable "{exception}" is not set.',
|
||||
"SSLError": f'SSL verification failed. IGNORE_SSL_ERRORS is {exception}. Set IGNORE_SSL_ERRORS to True if you want to ignore this error. EXITING.',
|
||||
"GitCommandError": f'The repo "{exception}" is not a valid git repo.',
|
||||
"GitInvalidRepositoryError": f'The repo "{exception}" is not a valid git repo.',
|
||||
"Exception": f'An unknown error occurred: "{exception}"'
|
||||
}
|
||||
|
||||
if self.args.verbose and stack_trace:
|
||||
print(stack_trace)
|
||||
print(exception_dict[exception_type])
|
||||
system_exit(1)
|
||||
|
45
gitcmd.py
45
gitcmd.py
@ -1,45 +0,0 @@
|
||||
from git import Repo, exc
|
||||
from sys import exit as system_exit
|
||||
import settings
|
||||
import os
|
||||
|
||||
class GitCMD:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
return super().__new__(cls)
|
||||
|
||||
def __init__(self, args, repo_path):
|
||||
self.url = args.url
|
||||
self.repo_path = repo_path
|
||||
self.branch = args.branch
|
||||
# self.repo = Repo()
|
||||
self.cwd = os.getcwd()
|
||||
|
||||
if os.path.isdir(self.repo_path):
|
||||
self.pull_repo()
|
||||
else:
|
||||
self.clone_repo()
|
||||
|
||||
|
||||
def pull_repo(self):
|
||||
try:
|
||||
print("Package devicetype-library is already installed, "
|
||||
+ f"updating {os.path.join(self.cwd, self.repo_path)}")
|
||||
self.repo = Repo(self.repo_path)
|
||||
if not self.repo.remotes.origin.url.endswith('.git'):
|
||||
settings.handle.exception("GitInvalidRepositoryError", self.repo.remotes.origin.url, f"Origin URL {self.repo.remotes.origin.url} does not end with .git")
|
||||
self.repo.remotes.origin.pull()
|
||||
self.repo.git.checkout(self.branch)
|
||||
print(f"Pulled Repo {self.repo.remotes.origin.url}")
|
||||
except exc.GitCommandError as git_error:
|
||||
settings.handle.exception("GitCommandError", self.repo.remotes.origin.url, git_error)
|
||||
except Exception as git_error:
|
||||
settings.handle.exception("Exception", 'Git Repository Error', git_error)
|
||||
|
||||
def clone_repo(self):
|
||||
try:
|
||||
self.repo = Repo.clone_from(self.url, os.path.join(self.cwd, self.repo_path), branch=self.branch)
|
||||
print(f"Package Installed {self.repo.remotes.origin.url}")
|
||||
except exc.GitCommandError as git_error:
|
||||
settings.handle.exception("GitCommandError", self.url, git_error)
|
||||
except Exception as git_error:
|
||||
settings.handle.exception("Exception", 'Git Repository Error', git_error)
|
44
log_handler.py
Normal file
44
log_handler.py
Normal file
@ -0,0 +1,44 @@
|
||||
from sys import exit as system_exit
|
||||
|
||||
|
||||
class LogHandler:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
return super().__new__(cls)
|
||||
|
||||
def __init__(self, args):
|
||||
self.args = args
|
||||
|
||||
def exception(self, exception_type, exception, stack_trace=None):
|
||||
exception_dict = {
|
||||
"EnvironmentError": f'Environment variable "{exception}" is not set.',
|
||||
"SSLError": f'SSL verification failed. IGNORE_SSL_ERRORS is {exception}. Set IGNORE_SSL_ERRORS to True if you want to ignore this error. EXITING.',
|
||||
"GitCommandError": f'The repo "{exception}" is not a valid git repo.',
|
||||
"GitInvalidRepositoryError": f'The repo "{exception}" is not a valid git repo.',
|
||||
"Exception": f'An unknown error occurred: "{exception}"'
|
||||
}
|
||||
|
||||
if self.args.verbose and stack_trace:
|
||||
print(stack_trace)
|
||||
print(exception_dict[exception_type])
|
||||
system_exit(1)
|
||||
|
||||
def verbose_log(self, message):
|
||||
if self.args.verbose:
|
||||
print(message)
|
||||
|
||||
def log(self, message):
|
||||
print(message)
|
||||
|
||||
def log_device_ports_created(self, created_ports: list = [], port_type: str = "port"):
|
||||
for port in created_ports:
|
||||
self.verbose_log(f'{port_type} Template Created: {port.name} - '
|
||||
+ f'{port.type if hasattr(port, "type") else ""} - {port.device_type.id} - '
|
||||
+ f'{port.id}')
|
||||
return len(created_ports)
|
||||
|
||||
def log_module_ports_created(self, created_ports: list = [], port_type: str = "port"):
|
||||
for port in created_ports:
|
||||
self.verbose_log(f'{port_type} Template Created: {port.name} - '
|
||||
+ f'{port.type if hasattr(port, "type") else ""} - {port.module_type.id} - '
|
||||
+ f'{port.id}')
|
||||
return len(created_ports)
|
825
nb-dt-import.py
825
nb-dt-import.py
@ -3,814 +3,51 @@ from collections import Counter
|
||||
from datetime import datetime
|
||||
import yaml
|
||||
import pynetbox
|
||||
import glob
|
||||
from glob import glob
|
||||
import os
|
||||
import re
|
||||
import requests
|
||||
|
||||
import settings
|
||||
from netbox_api import NetBox
|
||||
|
||||
counter = Counter(
|
||||
added=0,
|
||||
updated=0,
|
||||
manufacturer=0,
|
||||
module_added=0,
|
||||
module_port_added=0,
|
||||
)
|
||||
|
||||
def determine_features(nb):
|
||||
'''Automatically determine the netbox features available.
|
||||
|
||||
Currently only checks for existence of module-types.
|
||||
|
||||
Args:
|
||||
nb: pynetbox API instance
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
Raises:
|
||||
None
|
||||
'''
|
||||
|
||||
# nb.version should be the version in the form '3.2'
|
||||
nb_ver = [int(x) for x in nb.version.split('.')]
|
||||
|
||||
# Later than 3.2
|
||||
# Might want to check for the module-types entry as well?
|
||||
if nb_ver[0] > 3 or (nb_ver[0] == 3 and nb_ver[1] >= 2):
|
||||
settings.NETBOX_FEATURES['modules'] = True
|
||||
|
||||
|
||||
|
||||
|
||||
def slugFormat(name):
|
||||
return re.sub('\W+','-', name.lower())
|
||||
|
||||
YAML_EXTENSIONS = ['yml', 'yaml']
|
||||
|
||||
def getFiles(vendors=None):
|
||||
|
||||
files = []
|
||||
discoveredVendors = []
|
||||
base_path = f'{settings.REPO_PATH}/device-types/'
|
||||
if vendors:
|
||||
for r, d, f in os.walk(base_path):
|
||||
for folder in d:
|
||||
for vendor in vendors:
|
||||
if vendor.lower() == folder.lower():
|
||||
discoveredVendors.append({'name': folder,
|
||||
'slug': slugFormat(folder)})
|
||||
for extension in YAML_EXTENSIONS:
|
||||
files.extend(glob.glob(base_path + folder + f'/*.{extension}'))
|
||||
else:
|
||||
for r, d, f in os.walk(base_path):
|
||||
for folder in d:
|
||||
if folder.lower() != "Testing":
|
||||
discoveredVendors.append({'name': folder,
|
||||
'slug': slugFormat(folder)})
|
||||
for extension in YAML_EXTENSIONS:
|
||||
files.extend(glob.glob(base_path + f'[!Testing]*/*.{extension}'))
|
||||
return files, discoveredVendors
|
||||
|
||||
def get_files_modules(vendors=None):
|
||||
'''Get files list for modules.
|
||||
|
||||
Args:
|
||||
vendors: List of vendors to sync or None to sync all vendors.
|
||||
|
||||
Returns:
|
||||
A 2-tuple of:
|
||||
- list of filenames found
|
||||
- list of vendors found
|
||||
|
||||
'''
|
||||
|
||||
files = []
|
||||
discoveredVendors = []
|
||||
base_path = f'{settings.REPO_PATH}/module-types/'
|
||||
if vendors:
|
||||
for r, d, f in os.walk(base_path):
|
||||
for folder in d:
|
||||
for vendor in vendors:
|
||||
if vendor.lower() == folder.lower():
|
||||
discoveredVendors.append({'name': folder,
|
||||
'slug': slugFormat(folder)})
|
||||
for extension in YAML_EXTENSIONS:
|
||||
files.extend(
|
||||
glob.glob(
|
||||
base_path + folder + f'/*.{extension}'
|
||||
)
|
||||
)
|
||||
else:
|
||||
for r, d, f in os.walk(base_path):
|
||||
for folder in d:
|
||||
if folder.lower() != "Testing":
|
||||
discoveredVendors.append({'name': folder,
|
||||
'slug': slugFormat(folder)})
|
||||
for extension in YAML_EXTENSIONS:
|
||||
files.extend(glob.glob(base_path + f'[!Testing]*/*.{extension}'))
|
||||
|
||||
return files, discoveredVendors
|
||||
|
||||
def readYAMl(files, **kwargs):
|
||||
slugs = kwargs.get('slugs', None)
|
||||
deviceTypes = []
|
||||
manufacturers = []
|
||||
for file in files:
|
||||
with open(file, 'r') as stream:
|
||||
try:
|
||||
data = yaml.safe_load(stream)
|
||||
except yaml.YAMLError as exc:
|
||||
print(exc)
|
||||
continue
|
||||
manufacturer = data['manufacturer']
|
||||
data['manufacturer'] = {}
|
||||
data['manufacturer']['name'] = manufacturer
|
||||
data['manufacturer']['slug'] = slugFormat(manufacturer)
|
||||
|
||||
if slugs and data['slug'] not in slugs:
|
||||
print(f"Skipping {data['model']}")
|
||||
continue
|
||||
|
||||
deviceTypes.append(data)
|
||||
manufacturers.append(manufacturer)
|
||||
return deviceTypes
|
||||
|
||||
def read_yaml_modules(files, **kwargs):
|
||||
|
||||
slugs = kwargs.get('slugs', None)
|
||||
module_types = []
|
||||
manufacturers = []
|
||||
for file in files:
|
||||
with open(file, 'r') as stream:
|
||||
try:
|
||||
data = yaml.safe_load(stream)
|
||||
except yaml.YAMLError as exc:
|
||||
print(exc)
|
||||
continue
|
||||
manufacturer = data['manufacturer']
|
||||
data['manufacturer'] = {}
|
||||
data['manufacturer']['name'] = manufacturer
|
||||
data['manufacturer']['slug'] = slugFormat(manufacturer)
|
||||
|
||||
if slugs and data['slug'] not in slugs:
|
||||
print(f"Skipping {data['model']}")
|
||||
continue
|
||||
|
||||
module_types.append(data)
|
||||
manufacturers.append(manufacturer)
|
||||
return module_types
|
||||
|
||||
def createManufacturers(vendors, nb):
|
||||
all_manufacturers = {str(item): item for item in nb.dcim.manufacturers.all()}
|
||||
need_manufacturers = []
|
||||
for vendor in vendors:
|
||||
try:
|
||||
manGet = all_manufacturers[vendor["name"]]
|
||||
print(f'Manufacturer Exists: {manGet.name} - {manGet.id}')
|
||||
except KeyError:
|
||||
need_manufacturers.append(vendor)
|
||||
|
||||
if not need_manufacturers:
|
||||
return
|
||||
|
||||
try:
|
||||
manSuccess = nb.dcim.manufacturers.create(need_manufacturers)
|
||||
for man in manSuccess:
|
||||
print(f'Manufacturer Created: {man.name} - '
|
||||
+ f'{man.id}')
|
||||
counter.update({'manufacturer': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
|
||||
def createInterfaces(interfaces, deviceType, nb):
|
||||
all_interfaces = {str(item): item for item in nb.dcim.interface_templates.filter(devicetype_id=deviceType)}
|
||||
need_interfaces = []
|
||||
for interface in interfaces:
|
||||
try:
|
||||
ifGet = all_interfaces[interface["name"]]
|
||||
print(f'Interface Template Exists: {ifGet.name} - {ifGet.type}'
|
||||
+ f' - {ifGet.device_type.id} - {ifGet.id}')
|
||||
except KeyError:
|
||||
interface['device_type'] = deviceType
|
||||
need_interfaces.append(interface)
|
||||
|
||||
if not need_interfaces:
|
||||
return
|
||||
|
||||
try:
|
||||
ifSuccess = nb.dcim.interface_templates.create(need_interfaces)
|
||||
for intf in ifSuccess:
|
||||
print(f'Interface Template Created: {intf.name} - '
|
||||
+ f'{intf.type} - {intf.device_type.id} - '
|
||||
+ f'{intf.id}')
|
||||
counter.update({'updated': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def create_module_interfaces(interfaces, module_type, nb):
|
||||
all_interfaces = {str(item): item for item in nb.dcim.interface_templates.filter(moduletype_id=module_type)}
|
||||
need_interfaces = []
|
||||
for interface in interfaces:
|
||||
try:
|
||||
if_res = all_interfaces[interface["name"]]
|
||||
print(f'Interface Template Exists: {if_res.name} - {if_res.type}'
|
||||
+ f' - {if_res.module_type.id} - {if_res.id}')
|
||||
except KeyError:
|
||||
interface['module_type'] = module_type
|
||||
need_interfaces.append(interface)
|
||||
|
||||
if not need_interfaces:
|
||||
return
|
||||
|
||||
try:
|
||||
ifSuccess = nb.dcim.interface_templates.create(need_interfaces)
|
||||
for intf in ifSuccess:
|
||||
print(f'Interface Template Created: {intf.name} - '
|
||||
+ f'{intf.type} - {intf.module_type.id} - '
|
||||
+ f'{intf.id}')
|
||||
counter.update({'module_port_added': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def createConsolePorts(consoleports, deviceType, nb):
|
||||
all_consoleports = {str(item): item for item in nb.dcim.console_port_templates.filter(devicetype_id=deviceType)}
|
||||
need_consoleports = []
|
||||
for consoleport in consoleports:
|
||||
try:
|
||||
cpGet = all_consoleports[consoleport["name"]]
|
||||
print(f'Console Port Template Exists: {cpGet.name} - '
|
||||
+ f'{cpGet.type} - {cpGet.device_type.id} - {cpGet.id}')
|
||||
except KeyError:
|
||||
consoleport['device_type'] = deviceType
|
||||
need_consoleports.append(consoleport)
|
||||
|
||||
if not need_consoleports:
|
||||
return
|
||||
|
||||
try:
|
||||
cpSuccess = nb.dcim.console_port_templates.create(need_consoleports)
|
||||
for port in cpSuccess:
|
||||
print(f'Console Port Created: {port.name} - '
|
||||
+ f'{port.type} - {port.device_type.id} - '
|
||||
+ f'{port.id}')
|
||||
counter.update({'updated': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def create_module_console_ports(consoleports, module_type, nb):
|
||||
|
||||
all_consoleports = {str(item): item for item in nb.dcim.console_port_templates.filter(moduletype_id=module_type)}
|
||||
need_consoleports = []
|
||||
for consoleport in consoleports:
|
||||
try:
|
||||
cpGet = all_consoleports[consoleport["name"]]
|
||||
print(f'Console Port Template Exists: {cpGet.name} - '
|
||||
+ f'{cpGet.type} - {cpGet.module_type.id} - {cpGet.id}')
|
||||
except KeyError:
|
||||
consoleport['module_type'] = module_type
|
||||
need_consoleports.append(consoleport)
|
||||
|
||||
if not need_consoleports:
|
||||
return
|
||||
|
||||
try:
|
||||
cpSuccess = nb.dcim.console_port_templates.create(need_consoleports)
|
||||
for port in cpSuccess:
|
||||
print(f'Console Port Created: {port.name} - {port.type} - ' +
|
||||
f'{port.module_type.id} - {port.id}')
|
||||
counter.update({'module_port_added': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(f"Error creating module console port: {e.error}")
|
||||
|
||||
def createPowerPorts(powerports, deviceType, nb):
|
||||
all_power_ports = {str(item): item for item in nb.dcim.power_port_templates.filter(devicetype_id=deviceType)}
|
||||
need_power_ports = []
|
||||
for powerport in powerports:
|
||||
try:
|
||||
ppGet = all_power_ports[powerport["name"]]
|
||||
print(f'Power Port Template Exists: {ppGet.name} - '
|
||||
+ f'{ppGet.type} - {ppGet.device_type.id} - {ppGet.id}')
|
||||
except KeyError:
|
||||
powerport['device_type'] = deviceType
|
||||
need_power_ports.append(powerport)
|
||||
|
||||
if not need_power_ports:
|
||||
return
|
||||
|
||||
try:
|
||||
ppSuccess = nb.dcim.power_port_templates.create(need_power_ports)
|
||||
for pp in ppSuccess:
|
||||
print(f'Interface Template Created: {pp.name} - '
|
||||
+ f'{pp.type} - {pp.device_type.id} - '
|
||||
+ f'{pp.id}')
|
||||
counter.update({'updated': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def create_module_power_ports(powerports, module_type, nb):
|
||||
all_power_ports = {str(item): item for item in nb.dcim.power_port_templates.filter(moduletype_id=module_type)}
|
||||
need_power_ports = []
|
||||
for powerport in powerports:
|
||||
try:
|
||||
ppGet = all_power_ports[powerport["name"]]
|
||||
print(f'Power Port Template Exists: {ppGet.name} - ' +
|
||||
f'{ppGet.type} - {ppGet.module_type.id} - {ppGet.id}')
|
||||
except KeyError:
|
||||
powerport['module_type'] = module_type
|
||||
need_power_ports.append(powerport)
|
||||
|
||||
if not need_power_ports:
|
||||
return
|
||||
|
||||
try:
|
||||
ppSuccess = nb.dcim.power_port_templates.create(need_power_ports)
|
||||
for pp in ppSuccess:
|
||||
print(f'Power port template created: {pp.name} - '
|
||||
+ f'{pp.type} - {pp.module_type.id} - {pp.id}')
|
||||
counter.update({'module_port_added': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def createConsoleServerPorts(consoleserverports, deviceType, nb):
|
||||
all_consoleserverports = {str(item): item for item in nb.dcim.console_server_port_templates.filter(devicetype_id=deviceType)}
|
||||
need_consoleserverports = []
|
||||
for csport in consoleserverports:
|
||||
try:
|
||||
cspGet = all_consoleserverports[csport["name"]]
|
||||
print(f'Console Server Port Template Exists: {cspGet.name} - '
|
||||
+ f'{cspGet.type} - {cspGet.device_type.id} - '
|
||||
+ f'{cspGet.id}')
|
||||
except KeyError:
|
||||
csport['device_type'] = deviceType
|
||||
need_consoleserverports.append(csport)
|
||||
|
||||
if not need_consoleserverports:
|
||||
return
|
||||
|
||||
try:
|
||||
cspSuccess = nb.dcim.console_server_port_templates.create(
|
||||
need_consoleserverports)
|
||||
for csp in cspSuccess:
|
||||
print(f'Console Server Port Created: {csp.name} - '
|
||||
+ f'{csp.type} - {csp.device_type.id} - '
|
||||
+ f'{csp.id}')
|
||||
counter.update({'updated': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def create_module_console_server_ports(consoleserverports, module_type, nb):
|
||||
all_consoleserverports = {str(item): item for item in nb.dcim.console_server_port_templates.filter(moduletype_id=module_type)}
|
||||
need_consoleserverports = []
|
||||
for csport in consoleserverports:
|
||||
try:
|
||||
cspGet = all_consoleserverports[csport["name"]]
|
||||
print(f'Console Server Port Template Exists: {cspGet.name} - '
|
||||
+ f'{cspGet.type} - {cspGet.module_type.id} - '
|
||||
+ f'{cspGet.id}')
|
||||
except KeyError:
|
||||
csport['module_type'] = module_type
|
||||
need_consoleserverports.append(csport)
|
||||
|
||||
if not need_consoleserverports:
|
||||
return
|
||||
|
||||
try:
|
||||
cspSuccess = nb.dcim.console_server_port_templates.create(
|
||||
need_consoleserverports)
|
||||
for csp in cspSuccess:
|
||||
print(f'Console Server Port Created: {csp.name} - '
|
||||
+ f'{csp.type} - {csp.module_type.id} - '
|
||||
+ f'{csp.id}')
|
||||
counter.update({'module_port_added': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def createFrontPorts(frontports, deviceType, nb):
|
||||
all_frontports = {str(item): item for item in nb.dcim.front_port_templates.filter(devicetype_id=deviceType)}
|
||||
need_frontports = []
|
||||
for frontport in frontports:
|
||||
try:
|
||||
fpGet = all_frontports[frontport["name"]]
|
||||
print(f'Front Port Template Exists: {fpGet.name} - '
|
||||
+ f'{fpGet.type} - {fpGet.device_type.id} - {fpGet.id}')
|
||||
except KeyError:
|
||||
frontport['device_type'] = deviceType
|
||||
need_frontports.append(frontport)
|
||||
|
||||
if not need_frontports:
|
||||
return
|
||||
|
||||
all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(devicetype_id=deviceType)}
|
||||
for port in need_frontports:
|
||||
try:
|
||||
rpGet = all_rearports[port["rear_port"]]
|
||||
port['rear_port'] = rpGet.id
|
||||
except KeyError:
|
||||
print(f'Could not find Rear Port for Front Port: {port["name"]} - '
|
||||
+ f'{port["type"]} - {deviceType}')
|
||||
|
||||
try:
|
||||
fpSuccess = nb.dcim.front_port_templates.create(need_frontports)
|
||||
for fp in fpSuccess:
|
||||
print(f'Front Port Created: {fp.name} - '
|
||||
+ f'{fp.type} - {fp.device_type.id} - '
|
||||
+ f'{fp.id}')
|
||||
counter.update({'updated': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def create_module_front_ports(frontports, module_type, nb):
|
||||
all_frontports = {str(item): item for item in nb.dcim.front_port_templates.filter(moduletype_id=module_type)}
|
||||
need_frontports = []
|
||||
for frontport in frontports:
|
||||
try:
|
||||
fpGet = all_frontports[frontport["name"]]
|
||||
print(f'Front Port Template Exists: {fpGet.name} - '
|
||||
+ f'{fpGet.type} - {fpGet.module_type.id} - {fpGet.id}')
|
||||
except KeyError:
|
||||
frontport['module_type'] = module_type
|
||||
need_frontports.append(frontport)
|
||||
|
||||
if not need_frontports:
|
||||
return
|
||||
|
||||
all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(moduletype_id=module_type)}
|
||||
for port in need_frontports:
|
||||
try:
|
||||
rpGet = all_rearports[port["rear_port"]]
|
||||
port['rear_port'] = rpGet.id
|
||||
except KeyError:
|
||||
print(f'Could not find Rear Port for Front Port: {port["name"]} - '
|
||||
+ f'{port["type"]} - {module_type}')
|
||||
|
||||
try:
|
||||
fpSuccess = nb.dcim.front_port_templates.create(need_frontports)
|
||||
for fp in fpSuccess:
|
||||
print(f'Front Port Created: {fp.name} - '
|
||||
+ f'{fp.type} - {fp.module_type.id} - '
|
||||
+ f'{fp.id}')
|
||||
counter.update({'module_port_added': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def createRearPorts(rearports, deviceType, nb):
|
||||
all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(devicetype_id=deviceType)}
|
||||
need_rearports = []
|
||||
for rearport in rearports:
|
||||
try:
|
||||
rpGet = all_rearports[rearport["name"]]
|
||||
print(f'Rear Port Template Exists: {rpGet.name} - {rpGet.type}'
|
||||
+ f' - {rpGet.device_type.id} - {rpGet.id}')
|
||||
except KeyError:
|
||||
rearport['device_type'] = deviceType
|
||||
need_rearports.append(rearport)
|
||||
|
||||
if not need_rearports:
|
||||
return
|
||||
|
||||
try:
|
||||
rpSuccess = nb.dcim.rear_port_templates.create(
|
||||
need_rearports)
|
||||
for rp in rpSuccess:
|
||||
print(f'Rear Port Created: {rp.name} - {rp.type}'
|
||||
+ f' - {rp.device_type.id} - {rp.id}')
|
||||
counter.update({'updated': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def create_module_rear_ports(rearports, module_type, nb):
|
||||
all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(moduletype_id=module_type)}
|
||||
need_rearports = []
|
||||
for rearport in rearports:
|
||||
try:
|
||||
rpGet = all_rearports[rearport["name"]]
|
||||
print(f'Rear Port Template Exists: {rpGet.name} - {rpGet.type}'
|
||||
+ f' - {rpGet.module_type.id} - {rpGet.id}')
|
||||
except KeyError:
|
||||
rearport['module_type'] = module_type
|
||||
need_rearports.append(rearport)
|
||||
|
||||
if not need_rearports:
|
||||
return
|
||||
|
||||
try:
|
||||
rpSuccess = nb.dcim.rear_port_templates.create(
|
||||
need_rearports)
|
||||
for rp in rpSuccess:
|
||||
print(f'Rear Port Created: {rp.name} - {rp.type}'
|
||||
+ f' - {rp.module_type.id} - {rp.id}')
|
||||
counter.update({'module_port_added': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def createDeviceBays(devicebays, deviceType, nb):
|
||||
all_devicebays = {str(item): item for item in nb.dcim.device_bay_templates.filter(devicetype_id=deviceType)}
|
||||
need_devicebays = []
|
||||
for devicebay in devicebays:
|
||||
try:
|
||||
dbGet = all_devicebays[devicebay["name"]]
|
||||
print(f'Device Bay Template Exists: {dbGet.name} - '
|
||||
+ f'{dbGet.device_type.id} - {dbGet.id}')
|
||||
except KeyError:
|
||||
devicebay['device_type'] = deviceType
|
||||
need_devicebays.append(devicebay)
|
||||
|
||||
if not need_devicebays:
|
||||
return
|
||||
|
||||
try:
|
||||
dbSuccess = nb.dcim.device_bay_templates.create(need_devicebays)
|
||||
for db in dbSuccess:
|
||||
print(f'Device Bay Created: {db.name} - '
|
||||
+ f'{db.device_type.id} - {db.id}')
|
||||
counter.update({'updated': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
|
||||
def create_module_bays(module_bays, device_type, nb):
|
||||
'''Create module bays.
|
||||
|
||||
Args:
|
||||
module_bays: parsed YAML module_bays section.
|
||||
device_type: the device type instance from netbox.
|
||||
nb: Netbox API instance
|
||||
'''
|
||||
all_module_bays = {
|
||||
str(item): item for item in nb.dcim.module_bay_templates.filter(
|
||||
devicetype_id=device_type
|
||||
)
|
||||
}
|
||||
need_module_bays = []
|
||||
for module_bay in module_bays:
|
||||
try:
|
||||
dbGet = all_module_bays[module_bay["name"]]
|
||||
print(f'Module Bay Template Exists: {dbGet.name} - '
|
||||
+ f'{dbGet.device_type.id} - {dbGet.id}')
|
||||
except KeyError:
|
||||
module_bay['device_type'] = device_type
|
||||
need_module_bays.append(module_bay)
|
||||
|
||||
if not need_module_bays:
|
||||
return
|
||||
|
||||
try:
|
||||
module_bay_res = nb.dcim.module_bay_templates.create(need_module_bays)
|
||||
for module_bay in module_bay_res:
|
||||
print(f'Module Bay Created: {module_bay.name} - '
|
||||
+ f'{module_bay.device_type.id} - {module_bay.id}')
|
||||
counter.update({'updated': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
|
||||
def createPowerOutlets(poweroutlets, deviceType, nb):
|
||||
all_poweroutlets = {str(item): item for item in nb.dcim.power_outlet_templates.filter(devicetype_id=deviceType)}
|
||||
need_poweroutlets = []
|
||||
for poweroutlet in poweroutlets:
|
||||
try:
|
||||
poGet = all_poweroutlets[poweroutlet["name"]]
|
||||
print(f'Power Outlet Template Exists: {poGet.name} - '
|
||||
+ f'{poGet.type} - {poGet.device_type.id} - {poGet.id}')
|
||||
except KeyError:
|
||||
poweroutlet["device_type"] = deviceType
|
||||
need_poweroutlets.append(poweroutlet)
|
||||
|
||||
if not need_poweroutlets:
|
||||
return
|
||||
|
||||
all_power_ports = {str(item): item for item in nb.dcim.power_port_templates.filter(devicetype_id=deviceType)}
|
||||
for outlet in need_poweroutlets:
|
||||
try:
|
||||
ppGet = all_power_ports[outlet["power_port"]]
|
||||
outlet['power_port'] = ppGet.id
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
poSuccess = nb.dcim.power_outlet_templates.create(
|
||||
need_poweroutlets)
|
||||
for po in poSuccess:
|
||||
print(f'Power Outlet Created: {po.name} - '
|
||||
+ f'{po.type} - {po.device_type.id} - '
|
||||
+ f'{po.id}')
|
||||
counter.update({'updated': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def create_module_power_outlets(poweroutlets, module_type, nb):
|
||||
'''Create missing module power outlets.
|
||||
|
||||
Args:
|
||||
poweroutlets: YAML power outlet data.
|
||||
module_type: Netbox module_type instance.
|
||||
nb: pynetbox API instance.
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
Raises:
|
||||
None
|
||||
'''
|
||||
all_poweroutlets = {str(item): item for item in nb.dcim.power_outlet_templates.filter(moduletype_id=module_type)}
|
||||
need_poweroutlets = []
|
||||
for poweroutlet in poweroutlets:
|
||||
try:
|
||||
poGet = all_poweroutlets[poweroutlet["name"]]
|
||||
print(f'Power Outlet Template Exists: {poGet.name} - {poGet.type}'
|
||||
+ f' - {poGet.module_type.id} - {poGet.id}')
|
||||
except KeyError:
|
||||
poweroutlet["module_type"] = module_type
|
||||
need_poweroutlets.append(poweroutlet)
|
||||
|
||||
if not need_poweroutlets:
|
||||
return
|
||||
|
||||
all_power_ports = {str(item): item for item in nb.dcim.\
|
||||
power_port_templates.filter(moduletype_id=module_type)}
|
||||
for outlet in need_poweroutlets:
|
||||
try:
|
||||
ppGet = all_power_ports[outlet["power_port"]]
|
||||
outlet['power_port'] = ppGet.id
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
poSuccess = nb.dcim.power_outlet_templates.create(
|
||||
need_poweroutlets)
|
||||
for po in poSuccess:
|
||||
print(f'Power Outlet Created: {po.name} - '
|
||||
+ f'{po.type} - {po.module_type.id} - '
|
||||
+ f'{po.id}')
|
||||
counter.update({'module_port_added': 1})
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
def createDeviceTypes(deviceTypes, nb):
|
||||
all_device_types = {str(item): item for item in nb.dcim.device_types.all()}
|
||||
for deviceType in deviceTypes:
|
||||
try:
|
||||
dt = all_device_types[deviceType["model"]]
|
||||
print(f'Device Type Exists: {dt.manufacturer.name} - '
|
||||
+ f'{dt.model} - {dt.id}')
|
||||
except KeyError:
|
||||
try:
|
||||
dt = nb.dcim.device_types.create(deviceType)
|
||||
counter.update({'added': 1})
|
||||
print(f'Device Type Created: {dt.manufacturer.name} - '
|
||||
+ f'{dt.model} - {dt.id}')
|
||||
except pynetbox.RequestError as e:
|
||||
print(e.error)
|
||||
|
||||
if "interfaces" in deviceType:
|
||||
createInterfaces(deviceType["interfaces"],
|
||||
dt.id, nb)
|
||||
if "power-ports" in deviceType:
|
||||
createPowerPorts(deviceType["power-ports"],
|
||||
dt.id, nb)
|
||||
if "power-port" in deviceType:
|
||||
createPowerPorts(deviceType["power-port"],
|
||||
dt.id, nb)
|
||||
if "console-ports" in deviceType:
|
||||
createConsolePorts(deviceType["console-ports"],
|
||||
dt.id, nb)
|
||||
if "power-outlets" in deviceType:
|
||||
createPowerOutlets(deviceType["power-outlets"],
|
||||
dt.id, nb)
|
||||
if "console-server-ports" in deviceType:
|
||||
createConsoleServerPorts(
|
||||
deviceType["console-server-ports"], dt.id, nb)
|
||||
if "rear-ports" in deviceType:
|
||||
createRearPorts(deviceType["rear-ports"],
|
||||
dt.id, nb)
|
||||
if "front-ports" in deviceType:
|
||||
createFrontPorts(deviceType["front-ports"],
|
||||
dt.id, nb)
|
||||
if "device-bays" in deviceType:
|
||||
createDeviceBays(deviceType["device-bays"],
|
||||
dt.id, nb)
|
||||
if settings.NETBOX_FEATURES['modules'] and 'module-bays' in deviceType:
|
||||
create_module_bays(deviceType['module-bays'],
|
||||
dt.id, nb)
|
||||
|
||||
def create_module_types(module_types, nb):
|
||||
'''Create missing module types.
|
||||
|
||||
Args:
|
||||
module_types: yaml data from repo.
|
||||
nb: pynetbox API instance
|
||||
|
||||
Returns:
|
||||
None
|
||||
'''
|
||||
|
||||
all_module_types = {}
|
||||
for curr_nb_mt in nb.dcim.module_types.all():
|
||||
if curr_nb_mt.manufacturer.slug not in all_module_types:
|
||||
all_module_types[curr_nb_mt.manufacturer.slug] = {}
|
||||
|
||||
all_module_types[curr_nb_mt.manufacturer.slug][curr_nb_mt.model] = curr_nb_mt
|
||||
|
||||
|
||||
for curr_mt in module_types:
|
||||
try:
|
||||
module_type_res = all_module_types[curr_mt['manufacturer']['slug']][curr_mt["model"]]
|
||||
print(f'Module Type Exists: {module_type_res.manufacturer.name} - '
|
||||
+ f'{module_type_res.model} - {module_type_res.id}')
|
||||
except KeyError:
|
||||
try:
|
||||
module_type_res = nb.dcim.module_types.create(curr_mt)
|
||||
counter.update({'module_added': 1})
|
||||
print(f'Module Type Created: {module_type_res.manufacturer.name} - '
|
||||
+ f'{module_type_res.model} - {module_type_res.id}')
|
||||
except pynetbox.RequestError as exce:
|
||||
print(f"Error '{exce.error}' creating module type: " +
|
||||
f"{curr_mt}")
|
||||
|
||||
#module_type_res = all_module_types[curr_mt['manufacturer']['slug']][curr_mt["model"]]
|
||||
|
||||
if "interfaces" in curr_mt:
|
||||
create_module_interfaces(curr_mt["interfaces"],
|
||||
module_type_res.id, nb)
|
||||
if "power-ports" in curr_mt:
|
||||
create_module_power_ports(curr_mt["power-ports"],
|
||||
module_type_res.id, nb)
|
||||
if "console-ports" in curr_mt:
|
||||
create_module_console_ports(curr_mt["console-ports"],
|
||||
module_type_res.id, nb)
|
||||
if "power-outlets" in curr_mt: # No current entries to test
|
||||
create_module_power_outlets(curr_mt["power-outlets"],
|
||||
module_type_res.id, nb)
|
||||
if "console-server-ports" in curr_mt: # No current entries to test
|
||||
create_module_console_server_ports(curr_mt["console-server-ports"],
|
||||
module_type_res.id, nb)
|
||||
if "rear-ports" in curr_mt:
|
||||
create_module_rear_ports(curr_mt["rear-ports"],
|
||||
module_type_res.id, nb)
|
||||
if "front-ports" in curr_mt:
|
||||
create_module_front_ports(curr_mt["front-ports"],
|
||||
module_type_res.id, nb)
|
||||
|
||||
def main():
|
||||
|
||||
cwd = os.getcwd()
|
||||
startTime = datetime.now()
|
||||
args = settings.args
|
||||
|
||||
nbUrl = settings.NETBOX_URL
|
||||
nbToken = settings.NETBOX_TOKEN
|
||||
nb = pynetbox.api(nbUrl, token=nbToken)
|
||||
|
||||
try:
|
||||
determine_features(nb)
|
||||
except requests.exceptions.SSLError as ssl_exception:
|
||||
if not settings.IGNORE_SSL_ERRORS:
|
||||
settings.handle.exception("SSLError", settings.IGNORE_SSL_ERRORS, ssl_exception)
|
||||
print("IGNORE_SSL_ERRORS is True, catching exception and disabling SSL verification.")
|
||||
requests.packages.urllib3.disable_warnings()
|
||||
nb.http_session.verify = False
|
||||
determine_features(nb)
|
||||
netbox = NetBox(settings)
|
||||
files, vendors = settings.dtl_repo.get_devices(
|
||||
f'{settings.dtl_repo.repo_path}/device-types/', args.vendors)
|
||||
|
||||
if not args.vendors:
|
||||
print("No Vendors Specified, Gathering All Device-Types")
|
||||
files, vendors = getFiles()
|
||||
else:
|
||||
print("Vendor Specified, Gathering All Matching Device-Types")
|
||||
files, vendors = getFiles(args.vendors)
|
||||
settings.handle.log(f'{len(vendors)} Vendors Found')
|
||||
device_types = settings.dtl_repo.parse_files(files, slugs=args.slugs)
|
||||
settings.handle.log(f'{len(device_types)} Device-Types Found')
|
||||
netbox.create_manufacturers(vendors)
|
||||
netbox.create_device_types(device_types)
|
||||
|
||||
if netbox.modules:
|
||||
settings.handle.log("Modules Enabled. Creating Modules...")
|
||||
files, vendors = settings.dtl_repo.get_devices(
|
||||
f'{settings.dtl_repo.repo_path}/module-types/', args.vendors)
|
||||
settings.handle.log(f'{len(vendors)} Module Vendors Found')
|
||||
module_types = settings.dtl_repo.parse_files(files, slugs=args.slugs)
|
||||
settings.handle.log(f'{len(module_types)} Module-Types Found')
|
||||
netbox.create_manufacturers(vendors)
|
||||
netbox.create_module_types(module_types)
|
||||
|
||||
print(str(len(vendors)) + " Vendors Found")
|
||||
deviceTypes = readYAMl(files, slugs=args.slugs)
|
||||
print(str(len(deviceTypes)) + " Device-Types Found")
|
||||
createManufacturers(vendors, nb)
|
||||
createDeviceTypes(deviceTypes, nb)
|
||||
|
||||
settings.handle.log('---')
|
||||
settings.handle.verbose_log(
|
||||
f'Script took {(datetime.now() - startTime)} to run')
|
||||
settings.handle.log(f'{netbox.counter["added"]} devices created')
|
||||
settings.handle.log(
|
||||
f'{netbox.counter["updated"]} interfaces/ports updated')
|
||||
settings.handle.log(
|
||||
f'{netbox.counter["manufacturer"]} manufacturers created')
|
||||
if settings.NETBOX_FEATURES['modules']:
|
||||
if not args.vendors:
|
||||
print("No Vendors Specified, Gathering All Module-Types")
|
||||
files, vendors = get_files_modules()
|
||||
else:
|
||||
print("Vendor Specified, Gathering All Matching Module-Types")
|
||||
files, vendors = get_files_modules(args.vendors)
|
||||
settings.handle.log(
|
||||
f'{netbox.counter["module_added"]} modules created')
|
||||
settings.handle.log(
|
||||
f'{netbox.counter["module_port_added"]} module interface / ports created')
|
||||
|
||||
|
||||
print(str(len(vendors)) + " Module Vendors Found")
|
||||
module_types = read_yaml_modules(files, slugs=args.slugs)
|
||||
print(str(len(module_types)) + " Module-Types Found")
|
||||
createManufacturers(vendors, nb)
|
||||
create_module_types(module_types, nb)
|
||||
|
||||
print('---')
|
||||
print('Script took {} to run'.format(datetime.now() - startTime))
|
||||
print('{} devices created'.format(counter['added']))
|
||||
print('{} interfaces/ports updated'.format(counter['updated']))
|
||||
print('{} manufacturers created'.format(counter['manufacturer']))
|
||||
if settings.NETBOX_FEATURES['modules']:
|
||||
print(f"{counter['module_added']} modules created")
|
||||
print(f"{counter['module_port_added']} module interface / ports created")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
437
netbox_api.py
Normal file
437
netbox_api.py
Normal file
@ -0,0 +1,437 @@
|
||||
from collections import Counter
|
||||
import pynetbox
|
||||
import requests
|
||||
# from pynetbox import RequestError as APIRequestError
|
||||
|
||||
class NetBox:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
return super().__new__(cls)
|
||||
|
||||
def __init__(self, settings):
|
||||
self.counter = Counter(
|
||||
added=0,
|
||||
updated=0,
|
||||
manufacturer=0,
|
||||
module_added=0,
|
||||
module_port_added=0,
|
||||
)
|
||||
self.url = settings.NETBOX_URL
|
||||
self.token = settings.NETBOX_TOKEN
|
||||
self.handle = settings.handle
|
||||
self.netbox = None
|
||||
self.ignore_ssl = settings.IGNORE_SSL_ERRORS
|
||||
self.modules = False
|
||||
self.connect_api()
|
||||
self.verify_compatibility()
|
||||
self.existing_manufacturers = self.get_manufacturers()
|
||||
self.device_types = DeviceTypes(self.netbox, self.handle, self.counter)
|
||||
|
||||
def connect_api(self):
|
||||
try:
|
||||
self.netbox = pynetbox.api(self.url, token=self.token)
|
||||
if self.ignore_ssl:
|
||||
self.handle.verbose_log("IGNORE_SSL_ERRORS is True, catching exception and disabling SSL verification.")
|
||||
requests.packages.urllib3.disable_warnings()
|
||||
self.netbox.http_session.verify = False
|
||||
except Exception as e:
|
||||
self.handle.exception("Exception", 'NetBox API Error', e)
|
||||
|
||||
def get_api(self):
|
||||
return self.netbox
|
||||
|
||||
def get_counter(self):
|
||||
return self.counter
|
||||
|
||||
def verify_compatibility(self):
|
||||
# nb.version should be the version in the form '3.2'
|
||||
version_split = [int(x) for x in self.netbox.version.split('.')]
|
||||
|
||||
# Later than 3.2
|
||||
# Might want to check for the module-types entry as well?
|
||||
if version_split[0] > 3 or (version_split[0] == 3 and version_split[1] >= 2):
|
||||
self.modules = True
|
||||
|
||||
def get_manufacturers(self):
|
||||
return {str(item): item for item in self.netbox.dcim.manufacturers.all()}
|
||||
|
||||
def create_manufacturers(self, vendors):
|
||||
to_create = []
|
||||
self.existing_manufacturers = self.get_manufacturers()
|
||||
for vendor in vendors:
|
||||
try:
|
||||
manGet = self.existing_manufacturers[vendor["name"]]
|
||||
self.handle.verbose_log(f'Manufacturer Exists: {manGet.name} - {manGet.id}')
|
||||
except KeyError:
|
||||
to_create.append(vendor)
|
||||
self.handle.verbose_log(f"Manufacturer queued for addition: {vendor['name']}")
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
created_manufacturers = self.netbox.dcim.manufacturers.create(to_create)
|
||||
for manufacturer in created_manufacturers:
|
||||
self.handle.verbose_log(f'Manufacturer Created: {manufacturer.name} - '
|
||||
+ f'{manufacturer.id}')
|
||||
self.counter.update({'manufacturer': 1})
|
||||
except pynetbox.RequestError as request_error:
|
||||
self.handle.log("Error creating manufacturers")
|
||||
self.handle.verbose_log(f"Error during manufacturer creation. - {request_error.error}")
|
||||
|
||||
def create_device_types(self, device_types_to_add):
|
||||
for device_type in device_types_to_add:
|
||||
try:
|
||||
dt = self.device_types.existing_device_types[device_type["model"]]
|
||||
self.handle.verbose_log(f'Device Type Exists: {dt.manufacturer.name} - '
|
||||
+ f'{dt.model} - {dt.id}')
|
||||
except KeyError:
|
||||
try:
|
||||
dt = self.netbox.dcim.device_types.create(device_type)
|
||||
self.counter.update({'added': 1})
|
||||
self.handle.verbose_log(f'Device Type Created: {dt.manufacturer.name} - '
|
||||
+ f'{dt.model} - {dt.id}')
|
||||
except pynetbox.RequestError as e:
|
||||
self.handle.log(f'Error {e.error} creating device type:'
|
||||
f' {device_type["manufacturer"]["name"]} {device_type["model"]}')
|
||||
continue
|
||||
|
||||
if "interfaces" in device_type:
|
||||
self.device_types.create_interfaces(device_type["interfaces"], dt.id)
|
||||
if "power-ports" in device_type:
|
||||
self.device_types.create_power_ports(device_type["power-ports"], dt.id)
|
||||
if "power-port" in device_type:
|
||||
self.device_types.create_power_ports(device_type["power-port"], dt.id)
|
||||
if "console-ports" in device_type:
|
||||
self.device_types.create_console_ports(device_type["console-ports"], dt.id)
|
||||
if "power-outlets" in device_type:
|
||||
self.device_types.create_power_outlets(device_type["power-outlets"], dt.id)
|
||||
if "console-server-ports" in device_type:
|
||||
self.device_types.create_console_server_ports(device_type["console-server-ports"], dt.id)
|
||||
if "rear-ports" in device_type:
|
||||
self.device_types.create_rear_ports(device_type["rear-ports"], dt.id)
|
||||
if "front-ports" in device_type:
|
||||
self.device_types.create_front_ports(device_type["front-ports"], dt.id)
|
||||
if "device-bays" in device_type:
|
||||
self.device_types.create_device_bays(device_type["device-bays"], dt.id)
|
||||
if self.modules and 'module-bays' in device_type:
|
||||
self.device_types.create_module_bays(device_type['module-bays'], dt.id)
|
||||
|
||||
def create_module_types(self, module_types):
|
||||
all_module_types = {}
|
||||
for curr_nb_mt in self.netbox.dcim.module_types.all():
|
||||
if curr_nb_mt.manufacturer.slug not in all_module_types:
|
||||
all_module_types[curr_nb_mt.manufacturer.slug] = {}
|
||||
|
||||
all_module_types[curr_nb_mt.manufacturer.slug][curr_nb_mt.model] = curr_nb_mt
|
||||
|
||||
|
||||
for curr_mt in module_types:
|
||||
try:
|
||||
module_type_res = all_module_types[curr_mt['manufacturer']['slug']][curr_mt["model"]]
|
||||
self.handle.verbose_log(f'Module Type Exists: {module_type_res.manufacturer.name} - '
|
||||
+ f'{module_type_res.model} - {module_type_res.id}')
|
||||
except KeyError:
|
||||
try:
|
||||
module_type_res = self.netbox.dcim.module_types.create(curr_mt)
|
||||
self.counter.update({'module_added': 1})
|
||||
self.handle.verbose_log(f'Module Type Created: {module_type_res.manufacturer.name} - '
|
||||
+ f'{module_type_res.model} - {module_type_res.id}')
|
||||
except pynetbox.RequestError as exce:
|
||||
self.handle.log(f"Error '{exce.error}' creating module type: " +
|
||||
f"{curr_mt}")
|
||||
|
||||
if "interfaces" in curr_mt:
|
||||
self.device_types.create_module_interfaces(curr_mt["interfaces"], module_type_res.id)
|
||||
if "power-ports" in curr_mt:
|
||||
self.device_types.create_module_power_ports(curr_mt["power-ports"], module_type_res.id)
|
||||
if "console-ports" in curr_mt:
|
||||
self.device_types.create_module_console_ports(curr_mt["console-ports"], module_type_res.id)
|
||||
if "power-outlets" in curr_mt:
|
||||
self.device_types.create_module_power_outlets(curr_mt["power-outlets"], module_type_res.id)
|
||||
if "console-server-ports" in curr_mt:
|
||||
self.device_types.create_module_console_server_ports(curr_mt["console-server-ports"], module_type_res.id)
|
||||
if "rear-ports" in curr_mt:
|
||||
self.device_types.create_module_rear_ports(curr_mt["rear-ports"], module_type_res.id)
|
||||
if "front-ports" in curr_mt:
|
||||
self.device_types.create_module_front_ports(curr_mt["front-ports"], module_type_res.id)
|
||||
|
||||
class DeviceTypes:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
return super().__new__(cls)
|
||||
|
||||
def __init__(self, netbox, handle, counter):
|
||||
self.netbox = netbox
|
||||
self.handle = handle
|
||||
self.counter = counter
|
||||
self.existing_device_types = self.get_device_types()
|
||||
|
||||
def get_device_types(self):
|
||||
return {str(item): item for item in self.netbox.dcim.device_types.all()}
|
||||
|
||||
def get_power_ports(self, device_type):
|
||||
return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(devicetype_id=device_type)}
|
||||
|
||||
def get_rear_ports(self, device_type):
|
||||
return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(devicetype_id=device_type)}
|
||||
|
||||
def get_module_power_ports(self, module_type):
|
||||
return {str(item): item for item in self.netbox.dcim.power_port_templates.filter(moduletype_id=module_type)}
|
||||
|
||||
def get_module_rear_ports(self, module_type):
|
||||
return {str(item): item for item in self.netbox.dcim.rear_port_templates.filter(moduletype_id=module_type)}
|
||||
|
||||
def get_device_type_ports_to_create(self, dcim_ports, device_type, existing_ports):
|
||||
to_create = [port for port in dcim_ports if port['name'] not in existing_ports]
|
||||
for port in to_create:
|
||||
port['device_type'] = device_type
|
||||
|
||||
return to_create
|
||||
|
||||
def get_module_type_ports_to_create(self, module_ports, module_type, existing_ports):
|
||||
to_create = [port for port in module_ports if port['name'] not in existing_ports]
|
||||
for port in to_create:
|
||||
port['module_type'] = module_type
|
||||
|
||||
return to_create
|
||||
|
||||
def create_interfaces(self, interfaces, device_type):
|
||||
existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(
|
||||
devicetype_id=device_type)}
|
||||
to_create = self.get_device_type_ports_to_create(
|
||||
interfaces, device_type, existing_interfaces)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_device_ports_created(
|
||||
self.netbox.dcim.interface_templates.create(to_create), "Interface")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Interface")
|
||||
|
||||
def create_power_ports(self, power_ports, device_type):
|
||||
existing_power_ports = self.get_power_ports(device_type)
|
||||
to_create = self.get_device_type_ports_to_create(power_ports, device_type, existing_power_ports)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_device_ports_created(
|
||||
self.netbox.dcim.power_port_templates.create(to_create), "Power Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Power Port")
|
||||
|
||||
def create_console_ports(self, console_ports, device_type):
|
||||
existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(devicetype_id=device_type)}
|
||||
to_create = self.get_device_type_ports_to_create(console_ports, device_type, existing_console_ports)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_device_ports_created(
|
||||
self.netbox.dcim.console_port_templates.create(to_create), "Console Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Console Port")
|
||||
|
||||
def create_power_outlets(self, power_outlets, device_type):
|
||||
existing_power_outlets = {str(item): item for item in self.netbox.dcim.power_outlet_templates.filter(devicetype_id=device_type)}
|
||||
to_create = self.get_device_type_ports_to_create(power_outlets, device_type, existing_power_outlets)
|
||||
|
||||
if to_create:
|
||||
existing_power_ports = self.get_power_ports(device_type)
|
||||
for outlet in to_create:
|
||||
try:
|
||||
power_port = existing_power_ports[outlet["power_port"]]
|
||||
outlet['power_port'] = power_port.id
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_device_ports_created(
|
||||
self.netbox.dcim.power_outlet_templates.create(to_create), "Power Outlet")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Power Outlet")
|
||||
|
||||
def create_console_server_ports(self, console_server_ports, device_type):
|
||||
existing_console_server_ports = {str(item): item for item in self.netbox.dcim.console_server_port_templates.filter(devicetype_id=device_type)}
|
||||
to_create = self.get_device_type_ports_to_create(console_server_ports, device_type, existing_console_server_ports)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_device_ports_created(
|
||||
self.netbox.dcim.console_server_port_templates.create(to_create), "Console Server Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Console Server Port")
|
||||
|
||||
def create_rear_ports(self, rear_ports, device_type):
|
||||
existing_rear_ports = self.get_rear_ports(device_type)
|
||||
to_create = self.get_device_type_ports_to_create(rear_ports, device_type, existing_rear_ports)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_device_ports_created(
|
||||
self.netbox.dcim.rear_port_templates.create(to_create), "Rear Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Rear Port")
|
||||
|
||||
def create_front_ports(self, front_ports, device_type):
|
||||
existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(devicetype_id=device_type)}
|
||||
to_create = self.get_device_type_ports_to_create(front_ports, device_type, existing_front_ports)
|
||||
|
||||
if to_create:
|
||||
all_rearports = self.get_rear_ports(device_type)
|
||||
for port in to_create:
|
||||
try:
|
||||
rear_port = all_rearports[port["rear_port"]]
|
||||
port['rear_port'] = rear_port.id
|
||||
except KeyError:
|
||||
self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - '
|
||||
+ f'{port["type"]} - {device_type}')
|
||||
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_device_ports_created(
|
||||
self.netbox.dcim.front_port_templates.create(to_create), "Front Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Front Port")
|
||||
|
||||
def create_device_bays(self, device_bays, device_type):
|
||||
existing_device_bays = {str(item): item for item in self.netbox.dcim.device_bay_templates.filter(devicetype_id=device_type)}
|
||||
to_create = self.get_device_type_ports_to_create(device_bays, device_type, existing_device_bays)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_device_ports_created(
|
||||
self.netbox.dcim.device_bay_templates.create(to_create), "Device Bay")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Device Bay")
|
||||
|
||||
def create_module_bays(self, module_bays, device_type):
|
||||
existing_module_bays = {str(item): item for item in self.netbox.dcim.module_bay_templates.filter(devicetype_id=device_type)}
|
||||
to_create = self.get_device_type_ports_to_create(module_bays, device_type, existing_module_bays)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_device_ports_created(
|
||||
self.netbox.dcim.module_bay_templates.create(to_create), "Module Bay")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Module Bay")
|
||||
|
||||
def create_module_interfaces(self, module_interfaces, module_type):
|
||||
existing_interfaces = {str(item): item for item in self.netbox.dcim.interface_templates.filter(moduletype_id=module_type)}
|
||||
to_create = self.get_module_type_ports_to_create(module_interfaces, module_type, existing_interfaces)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_module_ports_created(
|
||||
self.netbox.dcim.interface_templates.create(to_create), "Module Interface")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Module Interface")
|
||||
|
||||
def create_module_power_ports(self, power_ports, module_type):
|
||||
existing_power_ports = self.get_module_power_ports(module_type)
|
||||
to_create = self.get_module_type_ports_to_create(power_ports, module_type, existing_power_ports)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_module_ports_created(
|
||||
self.netbox.dcim.power_port_templates.create(to_create), "Module Power Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Module Power Port")
|
||||
|
||||
def create_module_console_ports(self, console_ports, module_type):
|
||||
existing_console_ports = {str(item): item for item in self.netbox.dcim.console_port_templates.filter(moduletype_id=module_type)}
|
||||
to_create = self.get_module_type_ports_to_create(console_ports, module_type, existing_console_ports)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_module_ports_created(
|
||||
self.netbox.dcim.console_port_templates.create(to_create), "Module Console Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Module Console Port")
|
||||
|
||||
def create_module_power_outlets(self, power_outlets, module_type):
|
||||
existing_power_outlets = {str(item): item for item in self.netbox.dcim.power_outlet_templates.filter(moduletype_id=module_type)}
|
||||
to_create = self.get_module_type_ports_to_create(power_outlets, module_type, existing_power_outlets)
|
||||
|
||||
if to_create:
|
||||
existing_power_ports = self.get_module_power_ports(module_type)
|
||||
for outlet in to_create:
|
||||
try:
|
||||
power_port = existing_power_ports[outlet["power_port"]]
|
||||
outlet['power_port'] = power_port.id
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_module_ports_created(
|
||||
self.netbox.dcim.power_outlet_templates.create(to_create), "Module Power Outlet")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Module Power Outlet")
|
||||
|
||||
def create_module_console_server_ports(self, console_server_ports, module_type):
|
||||
existing_console_server_ports = {str(item): item for item in self.netbox.dcim.console_server_port_templates.filter(moduletype_id=module_type)}
|
||||
to_create = self.get_module_type_ports_to_create(console_server_ports, module_type, existing_console_server_ports)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_module_ports_created(
|
||||
self.netbox.dcim.console_server_port_templates.create(to_create), "Module Console Server Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Module Console Server Port")
|
||||
|
||||
def create_module_rear_ports(self, rear_ports, module_type):
|
||||
existing_rear_ports = self.get_module_rear_ports(module_type)
|
||||
to_create = self.get_module_type_ports_to_create(rear_ports, module_type, existing_rear_ports)
|
||||
|
||||
if to_create:
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_module_ports_created(
|
||||
self.netbox.dcim.rear_port_templates.create(to_create), "Module Rear Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Module Rear Port")
|
||||
|
||||
def create_module_front_ports(self, front_ports, module_type):
|
||||
existing_front_ports = {str(item): item for item in self.netbox.dcim.front_port_templates.filter(moduletype_id=module_type)}
|
||||
to_create = self.get_module_type_ports_to_create(front_ports, module_type, existing_front_ports)
|
||||
|
||||
if to_create:
|
||||
existing_rear_ports = self.get_module_rear_ports(module_type)
|
||||
for port in to_create:
|
||||
try:
|
||||
rear_port = existing_rear_ports[port["rear_port"]]
|
||||
port['rear_port'] = rear_port.id
|
||||
except KeyError:
|
||||
self.handle.log(f'Could not find Rear Port for Front Port: {port["name"]} - '
|
||||
+ f'{port["type"]} - {module_type}')
|
||||
|
||||
try:
|
||||
self.counter.update({'updated':
|
||||
self.handle.log_module_ports_created(
|
||||
self.netbox.dcim.front_port_templates.create(to_create), "Module Front Port")
|
||||
})
|
||||
except pynetbox.RequestError as excep:
|
||||
self.handle.log(f"Error '{excep.error}' creating Module Front Port")
|
103
repo.py
Normal file
103
repo.py
Normal file
@ -0,0 +1,103 @@
|
||||
import os
|
||||
from glob import glob
|
||||
from re import sub as re_sub
|
||||
from git import Repo, exc
|
||||
import yaml
|
||||
|
||||
|
||||
class DTLRepo:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
return super().__new__(cls)
|
||||
|
||||
def __init__(self, args, repo_path, exception_handler):
|
||||
self.handle = exception_handler
|
||||
self.yaml_extensions = ['yaml', 'yml']
|
||||
self.url = args.url
|
||||
self.repo_path = repo_path
|
||||
self.branch = args.branch
|
||||
self.repo = None
|
||||
self.cwd = os.getcwd()
|
||||
|
||||
if os.path.isdir(self.repo_path):
|
||||
self.pull_repo()
|
||||
else:
|
||||
self.clone_repo()
|
||||
|
||||
def get_relative_path(self):
|
||||
return self.repo_path
|
||||
|
||||
def get_absolute_path(self):
|
||||
return os.path.join(self.cwd, self.repo_path)
|
||||
|
||||
def get_devices_path(self):
|
||||
return os.path.join(self.get_absolute_path(), 'device-types')
|
||||
|
||||
def get_modules_path(self):
|
||||
return os.path.join(self.get_absolute_path(), 'module-types')
|
||||
|
||||
def slug_format(self, name):
|
||||
return re_sub('\W+', '-', name.lower())
|
||||
|
||||
def pull_repo(self):
|
||||
try:
|
||||
self.handle.log("Package devicetype-library is already installed, "
|
||||
+ f"updating {self.get_absolute_path()}")
|
||||
self.repo = Repo(self.repo_path)
|
||||
if not self.repo.remotes.origin.url.endswith('.git'):
|
||||
self.handle.exception("GitInvalidRepositoryError", self.repo.remotes.origin.url,
|
||||
f"Origin URL {self.repo.remotes.origin.url} does not end with .git")
|
||||
self.repo.remotes.origin.pull()
|
||||
self.repo.git.checkout(self.branch)
|
||||
self.handle.verbose_log(
|
||||
f"Pulled Repo {self.repo.remotes.origin.url}")
|
||||
except exc.GitCommandError as git_error:
|
||||
self.handle.exception(
|
||||
"GitCommandError", self.repo.remotes.origin.url, git_error)
|
||||
except Exception as git_error:
|
||||
self.handle.exception(
|
||||
"Exception", 'Git Repository Error', git_error)
|
||||
|
||||
def clone_repo(self):
|
||||
try:
|
||||
self.repo = Repo.clone_from(
|
||||
self.url, self.get_absolute_path(), branch=self.branch)
|
||||
self.handle.log(
|
||||
f"Package Installed {self.repo.remotes.origin.url}")
|
||||
except exc.GitCommandError as git_error:
|
||||
self.handle.exception("GitCommandError", self.url, git_error)
|
||||
except Exception as git_error:
|
||||
self.handle.exception(
|
||||
"Exception", 'Git Repository Error', git_error)
|
||||
|
||||
def get_devices(self, base_path, vendors: list = None):
|
||||
files = []
|
||||
discovered_vendors = []
|
||||
vendor_dirs = os.listdir(base_path)
|
||||
|
||||
for folder in [vendor for vendor in vendor_dirs if not vendors or vendor.casefold() in vendors]:
|
||||
if folder.casefold() != "testing":
|
||||
discovered_vendors.append({'name': folder,
|
||||
'slug': self.slug_format(folder)})
|
||||
for extension in self.yaml_extensions:
|
||||
files.extend(glob(base_path + folder + f'/*.{extension}'))
|
||||
return files, discovered_vendors
|
||||
|
||||
def parse_files(self, files: list, slugs: list = None):
|
||||
deviceTypes = []
|
||||
for file in files:
|
||||
with open(file, 'r') as stream:
|
||||
try:
|
||||
data = yaml.safe_load(stream)
|
||||
except yaml.YAMLError as excep:
|
||||
self.handle.verbose_log(excep)
|
||||
continue
|
||||
manufacturer = data['manufacturer']
|
||||
data['manufacturer'] = {
|
||||
'name': manufacturer, 'slug': self.slug_format(manufacturer)}
|
||||
|
||||
if slugs and True not in [True if s.casefold() in data['slug'].casefold() else False for s in slugs]:
|
||||
self.handle.verbose_log(f"Skipping {data['model']}")
|
||||
continue
|
||||
|
||||
deviceTypes.append(data)
|
||||
return deviceTypes
|
16
settings.py
16
settings.py
@ -1,8 +1,7 @@
|
||||
from argparse import ArgumentParser
|
||||
from sys import exit as system_exit
|
||||
import os
|
||||
from exception_handler import ExceptionHandler
|
||||
from gitcmd import GitCMD
|
||||
from log_handler import LogHandler
|
||||
from repo import DTLRepo
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
@ -38,11 +37,16 @@ parser.add_argument('--verbose', action='store_true', default=False,
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
handle = ExceptionHandler(args)
|
||||
args.vendors = [v.casefold()
|
||||
for vendor in args.vendors for v in vendor.split(",") if v.strip()]
|
||||
args.slugs = [s for slug in args.slugs for s in slug.split(",") if s.strip()]
|
||||
|
||||
handle = LogHandler(args)
|
||||
# Evaluate environment variables and exit if one of the mandatory ones are not set
|
||||
MANDATORY_ENV_VARS = ["REPO_URL", "NETBOX_URL", "NETBOX_TOKEN"]
|
||||
for var in MANDATORY_ENV_VARS:
|
||||
if var not in os.environ:
|
||||
handle.exception("EnvironmentError", var, f'Environment variable "{var}" is not set.\n\nMANDATORY_ENV_VARS: {str(MANDATORY_ENV_VARS)}.\n\nCURRENT_ENV_VARS: {str(os.environ)}')
|
||||
handle.exception("EnvironmentError", var,
|
||||
f'Environment variable "{var}" is not set.\n\nMANDATORY_ENV_VARS: {str(MANDATORY_ENV_VARS)}.\n\nCURRENT_ENV_VARS: {str(os.environ)}')
|
||||
|
||||
git_repo = GitCMD(args, REPO_PATH)
|
||||
dtl_repo = DTLRepo(args, REPO_PATH, handle)
|
||||
|
Loading…
Reference in New Issue
Block a user