Compare commits

...

10 Commits

Author SHA1 Message Date
IluaAir
338b9e69a7 fix cached property 2025-06-30 21:56:43 +03:00
IluaAir
e1eeefd42d fix base vlan pattern 2025-06-30 21:06:03 +03:00
IluaAir
c30729ec63 create vlans 2025-06-30 20:33:10 +03:00
IluaAir
2bdbad096d add type for get_devices_filtered 2025-06-30 20:06:57 +03:00
IluaAir
08eef97ec3 update directory 2025-06-30 20:03:47 +03:00
IluaAir
8715f2223d simple add bew interfaces and ipaddresses 2025-06-26 12:16:25 +03:00
IluaAir
8bb5ae94df add ipaddress for base class 2025-06-25 17:22:57 +03:00
IluaAir
2e54749159 add exception 2025-06-25 16:18:48 +03:00
IluaAir
7f3119a99d add checker 2025-06-25 10:51:02 +03:00
IluaAir
1eb0ff1eca add oxiApi 2025-06-25 10:32:51 +03:00
17 changed files with 229 additions and 76 deletions

7
app.py Normal file
View File

@@ -0,0 +1,7 @@
from crud import get_devices_filtered, create_vlans
from type import vimpelcomSWType
devices_vimpelcom = get_devices_filtered(vimpelcomSWType)
vlans_vimpelcom = create_vlans(*devices_vimpelcom)

84
crud.py Normal file
View File

@@ -0,0 +1,84 @@
import json
import logging
import os
from typing import TYPE_CHECKING
from init import netbox, oxi
from type import vimpelcomIPaddress374, vimpelcomIPaddressSSPD, vimpelcomIPaddressTEST
if TYPE_CHECKING:
from pynetbox.core.response import RecordSet
logging.basicConfig()
log = logging.getLogger()
def get_devices_filtered(filters: dict) -> tuple['RecordSet', list]:
devices = netbox.dcim.devices.filter(**filters)
device_list = [item.name for item in devices]
devices = netbox.dcim.devices.filter(**filters)
if not os.path.exists(f'devices_{filters.get('tenant')}.json'):
with open(f'devices_{filters.get('tenant')}.json', 'w') as file:
json.dump(device_list, file, ensure_ascii=False, indent=3)
return devices, device_list
def create_interfaces(devices: 'RecordSet', device_list: list):
for device in devices:
if device.name not in device_list:
continue
ex_interface = netbox.dcim.interfaces.filter(device=device.name)
log.debug("device: %r", device.name)
try:
oxidized_device = oxi.node(device.name.replace("(1)", ""))
except ValueError:
log.warning("%r no exist in Oxidized", device.name)
continue
log.debug("Vlan-interfaces: %r", oxidized_device.config.vlaninterfaces())
if oxidized_device.config.vlaninterfaces():
for vlan_interface in oxidized_device.config.vlaninterfaces():
log.debug("IPaddress: %r", vlan_interface.ip_address)
checker = netbox.dcim.interfaces.get(
name=vlan_interface.interface,
device=device.name
)
if checker:
continue
child_interface = netbox.dcim.interfaces.create(
name=vlan_interface.interface,
type='virtual',
device=device.id
)
print(device)
print("description: ", vlan_interface.description)
if vlan_interface.description:
child_interface.description = vlan_interface.description
child_interface.save()
print("child_interface: ", child_interface)
print("ip address:", vlan_interface.ip_address)
if vlan_interface.ip_address:
netbox_ip = netbox.ipam.ip_addresses.create(
address=str(vlan_interface.ip_address),
tenant=device.tenant.id,
)
if vlan_interface.ip_address in vimpelcomIPaddress374.prefix:
netbox_ip.vrf = netbox.ipam.vrfs.get(name=vimpelcomIPaddress374.vrf).id
elif vlan_interface.ip_address in vimpelcomIPaddressSSPD.prefix:
netbox_ip.vrf = netbox.ipam.vrfs.get(name=vimpelcomIPaddressSSPD.vrf).id
else:
netbox_ip.vrf = netbox.ipam.vrfs.get(name=vimpelcomIPaddressTEST.vrf).id
netbox_ip.assigned_object = child_interface
netbox_ip.assigned_object_id = child_interface.id
netbox_ip.assigned_object_type = 'dcim.interface'
netbox_ip.save()
device_list.remove(device.name)
with open('devices.json', 'w') as file:
json.dump(device_list, file, ensure_ascii=False, indent=3)
def create_vlans(devices: 'RecordSet', device_list: list):
for device in devices:
print(device.name.replace("(1)", ""))
oxidized_device = oxi.node(device.name.replace("(1)", ""))
print(oxidized_device.config.l3interfaces())
print(oxidized_device.config.vlans())
break

14
init.py Normal file
View File

@@ -0,0 +1,14 @@
import pynetbox
from oxi.manager import OxidizedAPI
from settings import settings
netbox = pynetbox.api(
settings.nb_url,
token=settings.nb_token)
netbox.http_session.verify = False
oxi = OxidizedAPI(
username=settings.oxi_username,
password=settings.oxi_password,
verify=False)

0
oxi/__init__.py Normal file
View File

View File

@@ -1,10 +1,6 @@
from .vrp import Vrp from oxi.interface.registry import register_parser, device_registry
from .qtech import Qtech
from .mikrotik import Mikrotik
from .bdcom import BDcom
__all__ = [ __all__ = [
'Vrp', 'register_parser',
'Qtech', 'device_registry'
'Mikrotik',
'BDcom'
] ]

View File

@@ -1,6 +1,7 @@
import ipaddress
import re import re
from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from typing import Protocol
@dataclass @dataclass
@@ -25,7 +26,7 @@ class ParsedDeviceData:
vlans: list[Vlan] vlans: list[Vlan]
class BaseDevice(ABC): class BaseDevice(Protocol):
anchor_pattern: str = '!' anchor_pattern: str = '!'
hostname_pattern: str = 'hostname' hostname_pattern: str = 'hostname'
@@ -35,7 +36,7 @@ class BaseDevice(ABC):
@property @property
def vlan_parse_pattern(self): def vlan_parse_pattern(self):
return rf"^vlan\s+(\d{{1,4}})\n(.*?)(?=^{self.anchor_pattern}|\Z)" return rf"^vlan\s+(\d{{1,4}})\r?\n(.*?)(?=^{self.anchor_pattern}|\Z)"
unamed_vlan_splitter: str = ',' unamed_vlan_splitter: str = ','
unamed_vlan_counter = '-' unamed_vlan_counter = '-'
@@ -81,7 +82,12 @@ class BaseDevice(ABC):
ip_address_match = re.search(r'\s?ip address\s(.+)$', interface_block, re.MULTILINE) ip_address_match = re.search(r'\s?ip address\s(.+)$', interface_block, re.MULTILINE)
if not description_match and not ip_address_match: if not description_match and not ip_address_match:
continue continue
ip_address = ip_address_match.group(1) if ip_address_match else None ip_address = ip_address_match.group(1).replace(' ', '/').replace('\r', '') if ip_address_match else None
try:
if ip_address is not None:
ip_address = ipaddress.ip_interface(ip_address)
except ValueError:
continue
description = description_match.group(1) if description_match else None description = description_match.group(1) if description_match else None
interfaces.append(L3Interface( interfaces.append(L3Interface(
interface=interface_name, interface=interface_name,

View File

@@ -1,5 +0,0 @@
from oxi.interface import Qtech
class BDcom(Qtech):
pass

View File

@@ -0,0 +1,7 @@
import importlib
import pkgutil
package = __package__
for loader, module_name, is_pkg in pkgutil.iter_modules(__path__):
importlib.import_module(f"{package}.{module_name}")

View File

@@ -0,0 +1,7 @@
from oxi.interface import register_parser
from oxi.interface.models.qtech import Qtech
@register_parser("BDCOM")
class BDcom(Qtech):
pass

View File

@@ -1,5 +1,7 @@
from oxi.interface import register_parser
from oxi.interface.base import BaseDevice from oxi.interface.base import BaseDevice
@register_parser("Mikrotik")
class Mikrotik(BaseDevice): class Mikrotik(BaseDevice):
... ...

View File

@@ -1,8 +1,10 @@
import re import re
from oxi.interface import register_parser
from oxi.interface.base import BaseDevice from oxi.interface.base import BaseDevice
@register_parser("QTECH")
class Qtech(BaseDevice): class Qtech(BaseDevice):
def __init__(self, config): def __init__(self, config):
@@ -12,12 +14,3 @@ class Qtech(BaseDevice):
pattern = r"Pending configurations.*" pattern = r"Pending configurations.*"
cleaned_text = re.sub(pattern, "", config, flags=re.DOTALL) cleaned_text = re.sub(pattern, "", config, flags=re.DOTALL)
return cleaned_text return cleaned_text
with open('../../core_switch.txt', 'r') as file:
data = file.read()
result = Qtech(data).parse_config()
print(result.vlans)
print(result.l3interfaces)
print(result.vlaninterfaces)

View File

@@ -1,8 +1,10 @@
import re import re
from oxi.interface import register_parser
from oxi.interface.base import BaseDevice, Vlan from oxi.interface.base import BaseDevice, Vlan
@register_parser("VRP")
class Vrp(BaseDevice): class Vrp(BaseDevice):
anchor_pattern: str = '#' anchor_pattern: str = '#'
hostname_pattern = 'sysname' hostname_pattern = 'sysname'
@@ -26,14 +28,3 @@ class Vrp(BaseDevice):
vlans.append(Vlan(vlan=str(tokens[i]), name=None, description=None)) vlans.append(Vlan(vlan=str(tokens[i]), name=None, description=None))
i += 1 i += 1
return vlans return vlans
with open('../../vrp_switch.txt', 'r') as file:
data = file.read()
result = Vrp(data).parse_config()
print(result.vlans)
print(result.l3interfaces)
print(result.vlaninterfaces)

12
oxi/interface/registry.py Normal file
View File

@@ -0,0 +1,12 @@
from typing import Callable, Type
from oxi.interface.base import BaseDevice
device_registry = {}
def register_parser(name: str) -> Callable[[Type[BaseDevice]], Type[BaseDevice]]:
def wrapper(cls):
device_registry[name.lower()] = cls
return cls
return wrapper

View File

@@ -1,9 +1,13 @@
import logging import logging
from functools import cached_property
import requests import requests
from typing import Optional from typing import Optional
from oxi.interface.base import BaseDevice
from settings import settings from settings import settings
import oxi.interface.models # noqa
from oxi.interface import device_registry
log = logging.getLogger() log = logging.getLogger()
@@ -21,10 +25,7 @@ class OxidizedAPI:
self._session.verify = verify self._session.verify = verify
if username and password: if username and password:
self._session.auth = (username, password) self._session.auth = (username, password)
self.node = Node(self._session, self.base_url)
@property
def node(self):
return Node(self._session, self.base_url)
def __enter__(self): def __enter__(self):
return self return self
@@ -48,15 +49,18 @@ class Node:
self._base_url = base_url self._base_url = base_url
self._data = None self._data = None
def show(self, name: str) -> 'NodeView': def __call__(self, name: str) -> 'NodeView':
url = f"{self._base_url}/node/show/{name}" url = f"{self._base_url}/node/show/{name}"
if not url.endswith('.json'): if not url.endswith('.json'):
url += '.json' url += '.json'
self._data = self._session.get(url) response = self._session.get(url)
if response.status_code == 500:
log.warning("Oxidized response: %r , %r not found", response.status_code, url)
raise ValueError(f'page {url} not found')
return NodeView( return NodeView(
session=self._session, session=self._session,
base_url=self._base_url, base_url=self._base_url,
data=self._data.json() data=response.json()
) )
@@ -82,28 +86,45 @@ class NodeView:
def model(self): def model(self):
return self._data.get('model') return self._data.get('model')
@property @cached_property
def config(self): def config(self):
return NodeConfig(self._session, self.full_name, self._base_url) return NodeConfig(self._session, self.full_name, self.model, self._base_url)
class NodeConfig: class NodeConfig:
def __init__(self, session: requests.Session, full_name: str, base_url: str): def __init__(self, session: requests.Session, full_name: str, model: str, base_url: str):
self._session = session self._session = session
self._full_name = full_name self._full_name = full_name
self._model = model
self._url = f"{base_url}/node/fetch/{full_name}" self._url = f"{base_url}/node/fetch/{full_name}"
self._device: type[BaseDevice] = device_registry.get(self._model.lower())
if self._device is None:
raise ValueError(f"Device model '{self._model}' not found in registry")
self._parsed_data = self._device(self.text).parse_config()
def text(self) -> str: @cached_property
return self._session.get(self._url).text def _response(self):
log.debug(f"Fetching config from {self._url}")
response = self._session.get(self._url)
response.raise_for_status()
return response
def json(self) -> dict: @property
return self._session.get(self._url).json() def text(self):
return self._response.text
def __str__(self) -> str: @property
return self.text() def json(self):
return self._response.json()
def __str__(self):
return self.text
oxi = OxidizedAPI(username=settings.oxi_username, password=settings.oxi_password, verify=False) def vlans(self):
print(oxi.node.show('Novok_HOME').config) return self._parsed_data.vlans
mikrotik = oxi.node.show('Novok_HOME').model
print(mikrotik.model) def l3interfaces(self):
return self._parsed_data.l3interfaces
def vlaninterfaces(self):
return self._parsed_data.vlaninterfaces

17
oxid.py Normal file
View File

@@ -0,0 +1,17 @@
from oxi.manager import OxidizedAPI
from settings import settings
oxi = OxidizedAPI(username=settings.oxi_username, password=settings.oxi_password, verify=False)
user_input = input('введите название оборудования: ')
result = oxi.node(user_input)
print("Model:", result.model)
print("IP:", result.ip)
print("Group:", result.group)
print("Full Name:", result.full_name)
print("-" * 15)
print("Configurations:")
print(result.config.vlans())
print(result.config.l3interfaces())
print(result.config.vlaninterfaces())

View File

@@ -1,18 +0,0 @@
import pynetbox
from settings import settings
netbox = pynetbox.api(
settings.nb_url,
token=settings.nb_token)
netbox.http_session.verify = False
filters = {
"has_primary_ip": "true",
"tenant": "vimpelcom",
"role": "Kommutator",
}
devices = netbox.dcim.devices.filter(**filters)
for device in devices:
print(f"{device.name} (IP: {device.primary_ip})")

19
type.py
View File

@@ -1,5 +1,24 @@
from dataclasses import dataclass
from ipaddress import IPv4Network
vimpelcomSWType = { vimpelcomSWType = {
"has_primary_ip": "true", "has_primary_ip": "true",
"tenant": "vimpelcom", "tenant": "vimpelcom",
"role": "Kommutator", "role": "Kommutator",
} }
@dataclass
class vimpelcomIPaddress374:
prefix: IPv4Network = IPv4Network('15.0.0.0/8')
vrf: str = 'VK374'
@dataclass
class vimpelcomIPaddressSSPD:
prefix: IPv4Network = IPv4Network('12.0.0.0/8')
vrf: str = 'SORM_BB'
@dataclass
class vimpelcomIPaddressTEST:
prefix: IPv4Network = IPv4Network('192.168.0.0/16')
vrf: str = 'TEST'