From d5428044d17ab405989fde36339bff2b2c56fc64 Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 17 Jun 2025 15:47:44 +0300 Subject: [PATCH] init --- .gitignore | 3 + oxi/models/__init__.py | 9 +++ oxi/models/base.py | 131 +++++++++++++++++++++++++++++++++++++++++ oxi/models/mikrotik.py | 5 ++ oxi/models/qtech.py | 23 ++++++++ oxi/models/vrp.py | 39 ++++++++++++ requirements.txt | Bin 0 -> 256 bytes 7 files changed, 210 insertions(+) create mode 100644 .gitignore create mode 100644 oxi/models/__init__.py create mode 100644 oxi/models/base.py create mode 100644 oxi/models/mikrotik.py create mode 100644 oxi/models/qtech.py create mode 100644 oxi/models/vrp.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8d53c15 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/.env +/.idea +/.venv diff --git a/oxi/models/__init__.py b/oxi/models/__init__.py new file mode 100644 index 0000000..31364fe --- /dev/null +++ b/oxi/models/__init__.py @@ -0,0 +1,9 @@ +from .vrp import Vrp +from .qtech import Qtech +from .mikrotik import Mikrotik + +__all__ = [ + 'Vrp', + 'Qtech', + 'Mikrotik' +] diff --git a/oxi/models/base.py b/oxi/models/base.py new file mode 100644 index 0000000..26dbfc8 --- /dev/null +++ b/oxi/models/base.py @@ -0,0 +1,131 @@ +import re +from abc import ABC +from dataclasses import dataclass + + +@dataclass +class L3Interface: + interface: str + description: str | None = None + ip_address: str | None = None + + +@dataclass +class Vlan: + vlan: int + name: str | None + description: str | None + + +@dataclass +class ParsedDeviceData: + hostname: str + l3interfaces: list[L3Interface] + vlaninterfaces: list[L3Interface] + vlans: list[Vlan] + + +class BaseDevice(ABC): + anchor_pattern: str = '!' + hostname_pattern: str = 'hostname' + + @property + def l3interface_parse_pattern(self): + return rf"^interface([^\n]*)\n(.*?)(?=^{self.anchor_pattern})" + + @property + def vlan_parse_pattern(self): + return rf"^vlan\s+(\d{{1,4}})\n(.*?)(?=^{self.anchor_pattern}|\Z)" + + unamed_vlan_splitter: str = ',' + unamed_vlan_counter = '-' + unamed_vlans_parse_pattern = r"^vlan\s+(?:\w+\s+)?([\d,-]*[ ,][\d (to),-]*)$" + + def __init__(self, config): + self.config: str = config + + def parse_config(self) -> ParsedDeviceData: + """Парсит конфигурацию и возвращает структурированные данные.""" + hostname = self._parse_hostname() + l3interfaces = self._parse_l3_interfaces() + vlaninterfaces = self._parse_vlan_interfaces(l3interfaces) + vlans = self._parse_vlans() + unamed_vlans = self._parse_unamed_vlans() + vlan_map: dict[int, Vlan] = {} + for v in unamed_vlans: + vlan_map[v.vlan] = v + for v in vlans: + vlan_map[v.vlan] = v + vlans = list(vlan_map.values()) + return ParsedDeviceData( + hostname=hostname, + l3interfaces=l3interfaces, + vlaninterfaces=vlaninterfaces, + vlans=vlans + ) + + def _parse_hostname(self) -> str: + """Извлекает hostname из конфигурации.""" + pattern = self.hostname_pattern + match = re.search(rf"^{pattern}\s+(\S+)", self.config, re.MULTILINE) + return match.group(1) if match else "unknown" + + def _parse_l3_interfaces(self) -> list[L3Interface]: + """Парсит L3 интерфейсы""" + interfaces = [] + pattern = self.l3interface_parse_pattern + for match in re.finditer(pattern, self.config, re.MULTILINE | re.DOTALL): + interface_name = match.group(1).replace(' ', '') + interface_block = match.group(2) + description_match = re.search(r'\s?description\s(.+)$', interface_block, re.MULTILINE) + ip_address_match = re.search(r'\s?ip address\s(.+)$', interface_block, re.MULTILINE) + if not description_match and not ip_address_match: + continue + ip_address = ip_address_match.group(1) if ip_address_match else None + description = description_match.group(1) if description_match else None + interfaces.append(L3Interface( + interface=interface_name, + description=description, + ip_address=ip_address + )) + return interfaces + + @staticmethod + def _parse_vlan_interfaces(l3_interfaces: list[L3Interface]): + '''Парсит Vlan интерфейсы от уже собранный l3 интерфейсов''' + interfaces = [] + for interface in l3_interfaces: + if interface.interface.lower().startswith('vlan'): + interfaces.append(interface) + return interfaces + + def _parse_vlans(self): + '''Парсит Vlan с naming''' + vlans = [] + pattern = self.vlan_parse_pattern + for match in re.finditer(pattern, self.config, re.MULTILINE | re.DOTALL): + if match: + vlan = match.group(1) + vlan_block = match.group(2) + description_match = re.search(r'\s?description\s(.+)$', vlan_block, re.MULTILINE) + name_match = re.search(r'\s?name\s(.+)$', vlan_block, re.MULTILINE) + description = description_match.group(1) if description_match else None + name = name_match.group(1) if name_match else None + vlans.append(Vlan(vlan=vlan, description=description, name=name)) + return vlans + + def _parse_unamed_vlans(self) -> list[Vlan]: + '''Парсит строчку с перечислением vlan'ов''' + vlans = [] + pattern = self.unamed_vlans_parse_pattern + for match in re.finditer(pattern, self.config, re.MULTILINE): + if match: + _iter = match.group(1).split(self.unamed_vlan_splitter) + for part in _iter: + if self.unamed_vlan_counter in part: + start, end = map(int, part.split(self.unamed_vlan_counter)) + for vlan_id in range(start, end + 1): + vlans.append(Vlan(vlan=str(vlan_id), name=None, description=None)) + else: + vlans.append(Vlan(vlan=str(part), name=None, description=None)) + return vlans diff --git a/oxi/models/mikrotik.py b/oxi/models/mikrotik.py new file mode 100644 index 0000000..ac5b820 --- /dev/null +++ b/oxi/models/mikrotik.py @@ -0,0 +1,5 @@ +from oxi.models.base import BaseDevice + + +class Mikrotik(BaseDevice): + ... diff --git a/oxi/models/qtech.py b/oxi/models/qtech.py new file mode 100644 index 0000000..92ec4fb --- /dev/null +++ b/oxi/models/qtech.py @@ -0,0 +1,23 @@ +import re + +from oxi.models.base import BaseDevice + + +class Qtech(BaseDevice): + + def __init__(self, config): + self.config: str = self._fix_config(config) + + def _fix_config(self, config): + pattern = r"Pending configurations.*" + cleaned_text = re.sub(pattern, "", config, flags=re.DOTALL) + return cleaned_text + + +with open('../../core_switch.txt', 'r') as file: + data = file.read() + +result = Qtech(data).parse_config() +print(result.vlans) +print(result.l3interfaces) +print(result.vlaninterfaces) diff --git a/oxi/models/vrp.py b/oxi/models/vrp.py new file mode 100644 index 0000000..0667c8e --- /dev/null +++ b/oxi/models/vrp.py @@ -0,0 +1,39 @@ +import re + +from oxi.models.base import BaseDevice, Vlan + + +class Vrp(BaseDevice): + anchor_pattern: str = '#' + hostname_pattern = 'sysname' + unamed_vlan_splitter = ' ' + unamed_vlan_counter = 'to' + + def _parse_unamed_vlans(self) -> list[Vlan]: + vlans = [] + pattern = self.unamed_vlans_parse_pattern + for match in re.finditer(pattern, self.config, re.MULTILINE): + tokens = match.group(1).split(self.unamed_vlan_splitter) + i = 0 + while i < len(tokens): + if i + 2 < len(tokens) and tokens[i + 1].lower() == 'to': + start = int(tokens[i]) + end = int(tokens[i + 2]) + for vlan_id in range(start, end + 1): + vlans.append(Vlan(vlan=str(vlan_id), name=None, description=None)) + i += 3 # пропустить X, 'to', Y + else: + vlans.append(Vlan(vlan=str(tokens[i]), name=None, description=None)) + i += 1 + return vlans + + + +with open('../../vrp_switch.txt', 'r') as file: + data = file.read() + + +result = Vrp(data).parse_config() +print(result.vlans) +print(result.l3interfaces) +print(result.vlaninterfaces) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..922dbe42e481a0ccc5f00608c29536d0020ee5c1 GIT binary patch literal 256 zcmYL^Sq_3g5Jcv45&mzWgtNhufCpf3F%Izx;9=w9!-(c^#F*4Dfr+)at_P4A!7~L>E&pe>zjuDq-f)*DedZ4ZnhUo!AFBEjuM! t>X&f`x2%GsG2XXa|BR07O#?f?3Ll4z1}=86oPmx_Y*>J&jPzeA