Files
netbox-audit/crud.py
2025-06-30 20:33:10 +03:00

85 lines
3.7 KiB
Python

import json
import logging
import os
from typing import TYPE_CHECKING
from init import netbox, oxi
from type import vimpelcomIPaddress374, vimpelcomIPaddressSSPD, vimpelcomIPaddressTEST
if TYPE_CHECKING:
from pynetbox.core.response import RecordSet
logging.basicConfig()
log = logging.getLogger()
def get_devices_filtered(filters: dict) -> tuple['RecordSet', list]:
devices = netbox.dcim.devices.filter(**filters)
device_list = [item.name for item in devices]
devices = netbox.dcim.devices.filter(**filters)
if not os.path.exists(f'devices_{filters.get('tenant')}.json'):
with open(f'devices_{filters.get('tenant')}.json', 'w') as file:
json.dump(device_list, file, ensure_ascii=False, indent=3)
return devices, device_list
def create_interfaces(devices: 'RecordSet', device_list: list):
for device in devices:
if device.name not in device_list:
continue
ex_interface = netbox.dcim.interfaces.filter(device=device.name)
log.debug("device: %r", device.name)
try:
oxidized_device = oxi.node(device.name.replace("(1)", ""))
except ValueError:
log.warning("%r no exist in Oxidized", device.name)
continue
log.debug("Vlan-interfaces: %r", oxidized_device.config.vlaninterfaces())
if oxidized_device.config.vlaninterfaces():
for vlan_interface in oxidized_device.config.vlaninterfaces():
log.debug("IPaddress: %r", vlan_interface.ip_address)
checker = netbox.dcim.interfaces.get(
name=vlan_interface.interface,
device=device.name
)
if checker:
continue
child_interface = netbox.dcim.interfaces.create(
name=vlan_interface.interface,
type='virtual',
device=device.id
)
print(device)
print("description: ", vlan_interface.description)
if vlan_interface.description:
child_interface.description = vlan_interface.description
child_interface.save()
print("child_interface: ", child_interface)
print("ip address:", vlan_interface.ip_address)
if vlan_interface.ip_address:
netbox_ip = netbox.ipam.ip_addresses.create(
address=str(vlan_interface.ip_address),
tenant=device.tenant.id,
)
if vlan_interface.ip_address in vimpelcomIPaddress374.prefix:
netbox_ip.vrf = netbox.ipam.vrfs.get(name=vimpelcomIPaddress374.vrf).id
elif vlan_interface.ip_address in vimpelcomIPaddressSSPD.prefix:
netbox_ip.vrf = netbox.ipam.vrfs.get(name=vimpelcomIPaddressSSPD.vrf).id
else:
netbox_ip.vrf = netbox.ipam.vrfs.get(name=vimpelcomIPaddressTEST.vrf).id
netbox_ip.assigned_object = child_interface
netbox_ip.assigned_object_id = child_interface.id
netbox_ip.assigned_object_type = 'dcim.interface'
netbox_ip.save()
device_list.remove(device.name)
with open('devices.json', 'w') as file:
json.dump(device_list, file, ensure_ascii=False, indent=3)
def create_vlans(devices: 'RecordSet', device_list: list):
for device in devices:
print(device.name.replace("(1)", ""))
oxidized_device = oxi.node(device.name.replace("(1)", ""))
print(oxidized_device.config.l3interfaces())
print(oxidized_device.config.vlans())
break