This commit is contained in:
ilya
2025-06-17 15:47:44 +03:00
commit d5428044d1
7 changed files with 210 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/.env
/.idea
/.venv

9
oxi/models/__init__.py Normal file
View File

@@ -0,0 +1,9 @@
from .vrp import Vrp
from .qtech import Qtech
from .mikrotik import Mikrotik
__all__ = [
'Vrp',
'Qtech',
'Mikrotik'
]

131
oxi/models/base.py Normal file
View File

@@ -0,0 +1,131 @@
import re
from abc import ABC
from dataclasses import dataclass
@dataclass
class L3Interface:
interface: str
description: str | None = None
ip_address: str | None = None
@dataclass
class Vlan:
vlan: int
name: str | None
description: str | None
@dataclass
class ParsedDeviceData:
hostname: str
l3interfaces: list[L3Interface]
vlaninterfaces: list[L3Interface]
vlans: list[Vlan]
class BaseDevice(ABC):
anchor_pattern: str = '!'
hostname_pattern: str = 'hostname'
@property
def l3interface_parse_pattern(self):
return rf"^interface([^\n]*)\n(.*?)(?=^{self.anchor_pattern})"
@property
def vlan_parse_pattern(self):
return rf"^vlan\s+(\d{{1,4}})\n(.*?)(?=^{self.anchor_pattern}|\Z)"
unamed_vlan_splitter: str = ','
unamed_vlan_counter = '-'
unamed_vlans_parse_pattern = r"^vlan\s+(?:\w+\s+)?([\d,-]*[ ,][\d (to),-]*)$"
def __init__(self, config):
self.config: str = config
def parse_config(self) -> ParsedDeviceData:
"""Парсит конфигурацию и возвращает структурированные данные."""
hostname = self._parse_hostname()
l3interfaces = self._parse_l3_interfaces()
vlaninterfaces = self._parse_vlan_interfaces(l3interfaces)
vlans = self._parse_vlans()
unamed_vlans = self._parse_unamed_vlans()
vlan_map: dict[int, Vlan] = {}
for v in unamed_vlans:
vlan_map[v.vlan] = v
for v in vlans:
vlan_map[v.vlan] = v
vlans = list(vlan_map.values())
return ParsedDeviceData(
hostname=hostname,
l3interfaces=l3interfaces,
vlaninterfaces=vlaninterfaces,
vlans=vlans
)
def _parse_hostname(self) -> str:
"""Извлекает hostname из конфигурации."""
pattern = self.hostname_pattern
match = re.search(rf"^{pattern}\s+(\S+)", self.config, re.MULTILINE)
return match.group(1) if match else "unknown"
def _parse_l3_interfaces(self) -> list[L3Interface]:
"""Парсит L3 интерфейсы"""
interfaces = []
pattern = self.l3interface_parse_pattern
for match in re.finditer(pattern, self.config, re.MULTILINE | re.DOTALL):
interface_name = match.group(1).replace(' ', '')
interface_block = match.group(2)
description_match = re.search(r'\s?description\s(.+)$', interface_block, re.MULTILINE)
ip_address_match = re.search(r'\s?ip address\s(.+)$', interface_block, re.MULTILINE)
if not description_match and not ip_address_match:
continue
ip_address = ip_address_match.group(1) if ip_address_match else None
description = description_match.group(1) if description_match else None
interfaces.append(L3Interface(
interface=interface_name,
description=description,
ip_address=ip_address
))
return interfaces
@staticmethod
def _parse_vlan_interfaces(l3_interfaces: list[L3Interface]):
'''Парсит Vlan интерфейсы от уже собранный l3 интерфейсов'''
interfaces = []
for interface in l3_interfaces:
if interface.interface.lower().startswith('vlan'):
interfaces.append(interface)
return interfaces
def _parse_vlans(self):
'''Парсит Vlan с naming'''
vlans = []
pattern = self.vlan_parse_pattern
for match in re.finditer(pattern, self.config, re.MULTILINE | re.DOTALL):
if match:
vlan = match.group(1)
vlan_block = match.group(2)
description_match = re.search(r'\s?description\s(.+)$', vlan_block, re.MULTILINE)
name_match = re.search(r'\s?name\s(.+)$', vlan_block, re.MULTILINE)
description = description_match.group(1) if description_match else None
name = name_match.group(1) if name_match else None
vlans.append(Vlan(vlan=vlan, description=description, name=name))
return vlans
def _parse_unamed_vlans(self) -> list[Vlan]:
'''Парсит строчку с перечислением vlan'ов'''
vlans = []
pattern = self.unamed_vlans_parse_pattern
for match in re.finditer(pattern, self.config, re.MULTILINE):
if match:
_iter = match.group(1).split(self.unamed_vlan_splitter)
for part in _iter:
if self.unamed_vlan_counter in part:
start, end = map(int, part.split(self.unamed_vlan_counter))
for vlan_id in range(start, end + 1):
vlans.append(Vlan(vlan=str(vlan_id), name=None, description=None))
else:
vlans.append(Vlan(vlan=str(part), name=None, description=None))
return vlans

5
oxi/models/mikrotik.py Normal file
View File

@@ -0,0 +1,5 @@
from oxi.models.base import BaseDevice
class Mikrotik(BaseDevice):
...

23
oxi/models/qtech.py Normal file
View File

@@ -0,0 +1,23 @@
import re
from oxi.models.base import BaseDevice
class Qtech(BaseDevice):
def __init__(self, config):
self.config: str = self._fix_config(config)
def _fix_config(self, config):
pattern = r"Pending configurations.*"
cleaned_text = re.sub(pattern, "", config, flags=re.DOTALL)
return cleaned_text
with open('../../core_switch.txt', 'r') as file:
data = file.read()
result = Qtech(data).parse_config()
print(result.vlans)
print(result.l3interfaces)
print(result.vlaninterfaces)

39
oxi/models/vrp.py Normal file
View File

@@ -0,0 +1,39 @@
import re
from oxi.models.base import BaseDevice, Vlan
class Vrp(BaseDevice):
anchor_pattern: str = '#'
hostname_pattern = 'sysname'
unamed_vlan_splitter = ' '
unamed_vlan_counter = 'to'
def _parse_unamed_vlans(self) -> list[Vlan]:
vlans = []
pattern = self.unamed_vlans_parse_pattern
for match in re.finditer(pattern, self.config, re.MULTILINE):
tokens = match.group(1).split(self.unamed_vlan_splitter)
i = 0
while i < len(tokens):
if i + 2 < len(tokens) and tokens[i + 1].lower() == 'to':
start = int(tokens[i])
end = int(tokens[i + 2])
for vlan_id in range(start, end + 1):
vlans.append(Vlan(vlan=str(vlan_id), name=None, description=None))
i += 3 # пропустить X, 'to', Y
else:
vlans.append(Vlan(vlan=str(tokens[i]), name=None, description=None))
i += 1
return vlans
with open('../../vrp_switch.txt', 'r') as file:
data = file.read()
result = Vrp(data).parse_config()
print(result.vlans)
print(result.l3interfaces)
print(result.vlaninterfaces)

BIN
requirements.txt Normal file

Binary file not shown.