Merge pull request #52 from dmcken/add-module-bay

Add module bays and module types.
This commit is contained in:
minitriga 2022-09-09 17:04:38 +01:00 committed by GitHub
commit 730c8d2f8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 412 additions and 3 deletions

View File

@ -12,8 +12,36 @@ import sys
import re
counter = Counter(added=0, updated=0, manufacturer=0)
counter = Counter(
added=0,
updated=0,
manufacturer=0,
module_added=0,
module_port_added=0,
)
def determine_features(nb):
'''Automatically determine the netbox features available.
Currently only checks for existence of module-types.
Args:
nb: pynetbox API instance
Returns:
None
Raises:
None
'''
# nb.version should be the version in the form '3.2'
nb_ver = [int(x) for x in nb.version.split('.')]
# Later than 3.2
# Might want to check for the module-types entry as well?
if nb_ver[0] > 3 or (nb_ver[0] == 3 and nb_ver[1] >= 2):
settings.NETBOX_FEATURES['modules'] = True
def update_package(path: str, branch: str):
try:
@ -55,6 +83,45 @@ def getFiles(vendors=None):
files.extend(glob.glob(base_path + f'[!Testing]*/*.{extension}'))
return files, discoveredVendors
def get_files_modules(vendors=None):
'''Get files list for modules.
Args:
vendors: List of vendors to sync or None to sync all vendors.
Returns:
A 2-tuple of:
- list of filenames found
- list of vendors found
'''
files = []
discoveredVendors = []
base_path = './repo/module-types/'
if vendors:
for r, d, f in os.walk(base_path):
for folder in d:
for vendor in vendors:
if vendor.lower() == folder.lower():
discoveredVendors.append({'name': folder,
'slug': slugFormat(folder)})
for extension in YAML_EXTENSIONS:
files.extend(
glob.glob(
base_path + folder + f'/*.{extension}'
)
)
else:
for r, d, f in os.walk(base_path):
for folder in d:
if folder.lower() != "Testing":
discoveredVendors.append({'name': folder,
'slug': slugFormat(folder)})
for extension in YAML_EXTENSIONS:
files.extend(glob.glob(base_path + f'[!Testing]*/*.{extension}'))
return files, discoveredVendors
def readYAMl(files, **kwargs):
slugs = kwargs.get('slugs', None)
@ -80,6 +147,30 @@ def readYAMl(files, **kwargs):
manufacturers.append(manufacturer)
return deviceTypes
def read_yaml_modules(files, **kwargs):
slugs = kwargs.get('slugs', None)
module_types = []
manufacturers = []
for file in files:
with open(file, 'r') as stream:
try:
data = yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
continue
manufacturer = data['manufacturer']
data['manufacturer'] = {}
data['manufacturer']['name'] = manufacturer
data['manufacturer']['slug'] = slugFormat(manufacturer)
if slugs and data['slug'] not in slugs:
print(f"Skipping {data['model']}")
continue
module_types.append(data)
manufacturers.append(manufacturer)
return module_types
def createManufacturers(vendors, nb):
all_manufacturers = {str(item): item for item in nb.dcim.manufacturers.all()}
@ -129,7 +220,30 @@ def createInterfaces(interfaces, deviceType, nb):
except pynetbox.RequestError as e:
print(e.error)
def create_module_interfaces(interfaces, module_type, nb):
all_interfaces = {str(item): item for item in nb.dcim.interface_templates.filter(moduletype_id=module_type)}
need_interfaces = []
for interface in interfaces:
try:
if_res = all_interfaces[interface["name"]]
print(f'Interface Template Exists: {if_res.name} - {if_res.type}'
+ f' - {if_res.module_type.id} - {if_res.id}')
except KeyError:
interface['module_type'] = module_type
need_interfaces.append(interface)
if not need_interfaces:
return
try:
ifSuccess = nb.dcim.interface_templates.create(need_interfaces)
for intf in ifSuccess:
print(f'Interface Template Created: {intf.name} - '
+ f'{intf.type} - {intf.module_type.id} - '
+ f'{intf.id}')
counter.update({'module_port_added': 1})
except pynetbox.RequestError as e:
print(e.error)
def createConsolePorts(consoleports, deviceType, nb):
all_consoleports = {str(item): item for item in nb.dcim.console_port_templates.filter(devicetype_id=deviceType)}
@ -156,6 +270,30 @@ def createConsolePorts(consoleports, deviceType, nb):
except pynetbox.RequestError as e:
print(e.error)
def create_module_console_ports(consoleports, module_type, nb):
all_consoleports = {str(item): item for item in nb.dcim.console_port_templates.filter(moduletype_id=module_type)}
need_consoleports = []
for consoleport in consoleports:
try:
cpGet = all_consoleports[consoleport["name"]]
print(f'Console Port Template Exists: {cpGet.name} - '
+ f'{cpGet.type} - {cpGet.module_type.id} - {cpGet.id}')
except KeyError:
consoleport['module_type'] = module_type
need_consoleports.append(consoleport)
if not need_consoleports:
return
try:
cpSuccess = nb.dcim.console_port_templates.create(need_consoleports)
for port in cpSuccess:
print(f'Console Port Created: {port.name} - {port.type} - ' +
f'{port.module_type.id} - {port.id}')
counter.update({'module_port_added': 1})
except pynetbox.RequestError as e:
print(f"Error creating module console port: {e.error}")
def createPowerPorts(powerports, deviceType, nb):
all_power_ports = {str(item): item for item in nb.dcim.power_port_templates.filter(devicetype_id=deviceType)}
@ -164,7 +302,7 @@ def createPowerPorts(powerports, deviceType, nb):
try:
ppGet = all_power_ports[powerport["name"]]
print(f'Power Port Template Exists: {ppGet.name} - '
+ f'{ppGet.type} - {ppGet.device_type.id} - {ppGet.id}')
+ f'{ppGet.type} - {ppGet.device_type.id} - {ppGet.id}')
except KeyError:
powerport['device_type'] = deviceType
need_power_ports.append(powerport)
@ -182,6 +320,29 @@ def createPowerPorts(powerports, deviceType, nb):
except pynetbox.RequestError as e:
print(e.error)
def create_module_power_ports(powerports, module_type, nb):
all_power_ports = {str(item): item for item in nb.dcim.power_port_templates.filter(moduletype_id=module_type)}
need_power_ports = []
for powerport in powerports:
try:
ppGet = all_power_ports[powerport["name"]]
print(f'Power Port Template Exists: {ppGet.name} - ' +
f'{ppGet.type} - {ppGet.module_type.id} - {ppGet.id}')
except KeyError:
powerport['module_type'] = module_type
need_power_ports.append(powerport)
if not need_power_ports:
return
try:
ppSuccess = nb.dcim.power_port_templates.create(need_power_ports)
for pp in ppSuccess:
print(f'Power port template created: {pp.name} - '
+ f'{pp.type} - {pp.module_type.id} - {pp.id}')
counter.update({'module_port_added': 1})
except pynetbox.RequestError as e:
print(e.error)
def createConsoleServerPorts(consoleserverports, deviceType, nb):
all_consoleserverports = {str(item): item for item in nb.dcim.console_server_port_templates.filter(devicetype_id=deviceType)}
@ -210,6 +371,32 @@ def createConsoleServerPorts(consoleserverports, deviceType, nb):
except pynetbox.RequestError as e:
print(e.error)
def create_module_console_server_ports(consoleserverports, module_type, nb):
all_consoleserverports = {str(item): item for item in nb.dcim.console_server_port_templates.filter(moduletype_id=module_type)}
need_consoleserverports = []
for csport in consoleserverports:
try:
cspGet = all_consoleserverports[csport["name"]]
print(f'Console Server Port Template Exists: {cspGet.name} - '
+ f'{cspGet.type} - {cspGet.module_type.id} - '
+ f'{cspGet.id}')
except KeyError:
csport['module_type'] = module_type
need_consoleserverports.append(csport)
if not need_consoleserverports:
return
try:
cspSuccess = nb.dcim.console_server_port_templates.create(
need_consoleserverports)
for csp in cspSuccess:
print(f'Console Server Port Created: {csp.name} - '
+ f'{csp.type} - {csp.module_type.id} - '
+ f'{csp.id}')
counter.update({'module_port_added': 1})
except pynetbox.RequestError as e:
print(e.error)
def createFrontPorts(frontports, deviceType, nb):
all_frontports = {str(item): item for item in nb.dcim.front_port_templates.filter(devicetype_id=deviceType)}
@ -245,6 +432,39 @@ def createFrontPorts(frontports, deviceType, nb):
except pynetbox.RequestError as e:
print(e.error)
def create_module_front_ports(frontports, module_type, nb):
all_frontports = {str(item): item for item in nb.dcim.front_port_templates.filter(moduletype_id=module_type)}
need_frontports = []
for frontport in frontports:
try:
fpGet = all_frontports[frontport["name"]]
print(f'Front Port Template Exists: {fpGet.name} - '
+ f'{fpGet.type} - {fpGet.module_type.id} - {fpGet.id}')
except KeyError:
frontport['module_type'] = module_type
need_frontports.append(frontport)
if not need_frontports:
return
all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(moduletype_id=module_type)}
for port in need_frontports:
try:
rpGet = all_rearports[port["rear_port"]]
port['rear_port'] = rpGet.id
except KeyError:
print(f'Could not find Rear Port for Front Port: {port["name"]} - '
+ f'{port["type"]} - {module_type}')
try:
fpSuccess = nb.dcim.front_port_templates.create(need_frontports)
for fp in fpSuccess:
print(f'Front Port Created: {fp.name} - '
+ f'{fp.type} - {fp.module_type.id} - '
+ f'{fp.id}')
counter.update({'module_port_added': 1})
except pynetbox.RequestError as e:
print(e.error)
def createRearPorts(rearports, deviceType, nb):
all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(devicetype_id=deviceType)}
@ -271,6 +491,30 @@ def createRearPorts(rearports, deviceType, nb):
except pynetbox.RequestError as e:
print(e.error)
def create_module_rear_ports(rearports, module_type, nb):
all_rearports = {str(item): item for item in nb.dcim.rear_port_templates.filter(moduletype_id=module_type)}
need_rearports = []
for rearport in rearports:
try:
rpGet = all_rearports[rearport["name"]]
print(f'Rear Port Template Exists: {rpGet.name} - {rpGet.type}'
+ f' - {rpGet.module_type.id} - {rpGet.id}')
except KeyError:
rearport['module_type'] = module_type
need_rearports.append(rearport)
if not need_rearports:
return
try:
rpSuccess = nb.dcim.rear_port_templates.create(
need_rearports)
for rp in rpSuccess:
print(f'Rear Port Created: {rp.name} - {rp.type}'
+ f' - {rp.module_type.id} - {rp.id}')
counter.update({'module_port_added': 1})
except pynetbox.RequestError as e:
print(e.error)
def createDeviceBays(devicebays, deviceType, nb):
all_devicebays = {str(item): item for item in nb.dcim.device_bay_templates.filter(devicetype_id=deviceType)}
@ -297,6 +541,42 @@ def createDeviceBays(devicebays, deviceType, nb):
print(e.error)
def create_module_bays(module_bays, device_type, nb):
'''Create module bays.
Args:
module_bays: parsed YAML module_bays section.
device_type: the device type instance from netbox.
nb: Netbox API instance
'''
all_module_bays = {
str(item): item for item in nb.dcim.module_bay_templates.filter(
devicetype_id=device_type
)
}
need_module_bays = []
for module_bay in module_bays:
try:
dbGet = all_module_bays[module_bay["name"]]
print(f'Module Bay Template Exists: {dbGet.name} - '
+ f'{dbGet.device_type.id} - {dbGet.id}')
except KeyError:
module_bay['device_type'] = device_type
need_module_bays.append(module_bay)
if not need_module_bays:
return
try:
module_bay_res = nb.dcim.module_bay_templates.create(need_module_bays)
for module_bay in module_bay_res:
print(f'Module Bay Created: {module_bay.name} - '
+ f'{module_bay.device_type.id} - {module_bay.id}')
counter.update({'updated': 1})
except pynetbox.RequestError as e:
print(e.error)
def createPowerOutlets(poweroutlets, deviceType, nb):
all_poweroutlets = {str(item): item for item in nb.dcim.power_outlet_templates.filter(devicetype_id=deviceType)}
need_poweroutlets = []
@ -331,6 +611,53 @@ def createPowerOutlets(poweroutlets, deviceType, nb):
except pynetbox.RequestError as e:
print(e.error)
def create_module_power_outlets(poweroutlets, module_type, nb):
'''Create missing module power outlets.
Args:
poweroutlets: YAML power outlet data.
module_type: Netbox module_type instance.
nb: pynetbox API instance.
Returns:
None
Raises:
None
'''
all_poweroutlets = {str(item): item for item in nb.dcim.power_outlet_templates.filter(moduletype_id=module_type)}
need_poweroutlets = []
for poweroutlet in poweroutlets:
try:
poGet = all_poweroutlets[poweroutlet["name"]]
print(f'Power Outlet Template Exists: {poGet.name} - {poGet.type}'
+ f' - {poGet.module_type.id} - {poGet.id}')
except KeyError:
poweroutlet["module_type"] = module_type
need_poweroutlets.append(poweroutlet)
if not need_poweroutlets:
return
all_power_ports = {str(item): item for item in nb.dcim.\
power_port_templates.filter(moduletype_id=module_type)}
for outlet in need_poweroutlets:
try:
ppGet = all_power_ports[outlet["power_port"]]
outlet['power_port'] = ppGet.id
except KeyError:
pass
try:
poSuccess = nb.dcim.power_outlet_templates.create(
need_poweroutlets)
for po in poSuccess:
print(f'Power Outlet Created: {po.name} - '
+ f'{po.type} - {po.module_type.id} - '
+ f'{po.id}')
counter.update({'module_port_added': 1})
except pynetbox.RequestError as e:
print(e.error)
def createDeviceTypes(deviceTypes, nb):
all_device_types = {str(item): item for item in nb.dcim.device_types.all()}
@ -375,7 +702,67 @@ def createDeviceTypes(deviceTypes, nb):
if "device-bays" in deviceType:
createDeviceBays(deviceType["device-bays"],
dt.id, nb)
if settings.NETBOX_FEATURES['modules'] and 'module-bays' in deviceType:
create_module_bays(deviceType['module-bays'],
dt.id, nb)
def create_module_types(module_types, nb):
'''Create missing module types.
Args:
module_types: yaml data from repo.
nb: pynetbox API instance
Returns:
None
'''
all_module_types = {}
for curr_nb_mt in nb.dcim.module_types.all():
if curr_nb_mt.manufacturer.slug not in all_module_types:
all_module_types[curr_nb_mt.manufacturer.slug] = {}
all_module_types[curr_nb_mt.manufacturer.slug][curr_nb_mt.model] = curr_nb_mt
for curr_mt in module_types:
try:
module_type_res = all_module_types[curr_mt['manufacturer']['slug']][curr_mt["model"]]
print(f'Module Type Exists: {module_type_res.manufacturer.name} - '
+ f'{module_type_res.model} - {module_type_res.id}')
except KeyError:
try:
module_type_res = nb.dcim.module_types.create(curr_mt)
counter.update({'module_added': 1})
print(f'Module Type Created: {module_type_res.manufacturer.name} - '
+ f'{module_type_res.model} - {module_type_res.id}')
except pynetbox.RequestError as exce:
print(f"Error '{exce.error}' creating module type: " +
f"{curr_mt}")
#module_type_res = all_module_types[curr_mt['manufacturer']['slug']][curr_mt["model"]]
if "interfaces" in curr_mt:
create_module_interfaces(curr_mt["interfaces"],
module_type_res.id, nb)
if "power-ports" in curr_mt:
create_module_power_ports(curr_mt["power-ports"],
module_type_res.id, nb)
if "console-ports" in curr_mt:
create_module_console_ports(curr_mt["console-ports"],
module_type_res.id, nb)
if "power-outlets" in curr_mt: # No current entries to test
create_module_power_outlets(curr_mt["power-outlets"],
module_type_res.id, nb)
if "console-server-ports" in curr_mt: # No current entries to test
create_module_console_server_ports(curr_mt["console-server-ports"],
module_type_res.id, nb)
if "rear-ports" in curr_mt:
create_module_rear_ports(curr_mt["rear-ports"],
module_type_res.id, nb)
if "front-ports" in curr_mt:
create_module_front_ports(curr_mt["front-ports"],
module_type_res.id, nb)
def main():
@ -386,6 +773,8 @@ def main():
nbToken = settings.NETBOX_TOKEN
nb = pynetbox.api(nbUrl, token=nbToken)
determine_features(nb)
if settings.IGNORE_SSL_ERRORS:
import requests
requests.packages.urllib3.disable_warnings()
@ -436,12 +825,29 @@ def main():
createManufacturers(vendors, nb)
createDeviceTypes(deviceTypes, nb)
if settings.NETBOX_FEATURES['modules']:
if not args.vendors:
print("No Vendors Specified, Gathering All Module-Types")
files, vendors = get_files_modules()
else:
print("Vendor Specified, Gathering All Matching Module-Types")
files, vendors = get_files_modules(args.vendors)
print(str(len(vendors)) + " Module Vendors Found")
module_types = read_yaml_modules(files, slugs=args.slugs)
print(str(len(module_types)) + " Module-Types Found")
createManufacturers(vendors, nb)
create_module_types(module_types, nb)
print('---')
print('Script took {} to run'.format(datetime.now() - startTime))
print('{} devices created'.format(counter['added']))
print('{} interfaces/ports updated'.format(counter['updated']))
print('{} manufacturers created'.format(counter['manufacturer']))
if settings.NETBOX_FEATURES['modules']:
print(f"{counter['module_added']} modules created")
print(f"{counter['module_port_added']} module interface / ports created")
if __name__ == "__main__":
main()

View File

@ -14,6 +14,9 @@ VENDORS = list(filter(None, os.getenv("VENDORS", "").split(",")))
# optionally load device types through a space separated list as env var
SLUGS = os.getenv("SLUGS", "").split()
NETBOX_FEATURES = {
'modules': False,
}
MANDATORY_ENV_VARS = ["REPO_URL", "NETBOX_URL", "NETBOX_TOKEN"]