Refactor BaseDevice methods for improved data handling and validation

- Updated the `BaseDevice` class to replace `_raw` with `raw` for consistency in data access.
- Enhanced the `vlans`, `interfaces`, and `system` methods to utilize the new `raw` attribute.
- Introduced a `_validate_contract` method to streamline the validation of parsed data into structured models.
- Adjusted the `Keenetic` and `Mikrotik` models to align with the updated data handling approach, ensuring proper parsing and decoding of interface and VLAN data.
This commit is contained in:
IluaAir
2026-02-22 09:52:17 +03:00
parent 2394296f5b
commit 3635a07b27
4 changed files with 44 additions and 50 deletions

View File

@@ -30,7 +30,7 @@ class NodeConfig:
@property @property
def json(self): def json(self):
return self._response.json() return self._parsed_data.json()
def __str__(self): def __str__(self):
return self.text return self.text

View File

@@ -1,12 +1,9 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
from ttp import ttp from ttp import ttp
from oxi.interfaces.contract import Device from oxi.interfaces.contract import Device
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from oxi.interfaces.contract import Interfaces, System, Vlans
if TYPE_CHECKING:
from oxi.interfaces.contract import Interfaces, System, Vlans
class BaseDevice(ABC): class BaseDevice(ABC):
@@ -17,19 +14,19 @@ class BaseDevice(ABC):
self.config: str = config self.config: str = config
self._loaded_template = self._load_template() self._loaded_template = self._load_template()
self._validate_template_groups() self._validate_template_groups()
self._raw: dict = self._run_ttp() self.raw: dict = self._run_ttp()
@property @property
@abstractmethod @abstractmethod
def template(self) -> str: def template(self) -> str:
""" """
:return: Returns:
Название файла с парсером ttp
""" """
@abstractmethod def vlans(self) -> list[dict]:
def vlans(self) -> list["Vlans"]:
""" """
Parse VLAN configuration from self._raw['vlans']. Parse VLAN configuration from self.raw['vlans'].
Expected raw structure: Expected raw structure:
[{"id": 10, "description": "MGMT"}, {"id": 15, "name": "SSH"}, ...] [{"id": 10, "description": "MGMT"}, {"id": 15, "name": "SSH"}, ...]
@@ -39,42 +36,48 @@ class BaseDevice(ABC):
пустой список если секция отсутствует. пустой список если секция отсутствует.
Raises: Raises:
ValueError: если _raw содержит некорректные данные. ValueError: если raw содержит некорректные данные.
""" """
... return self.raw.get("vlans", [])
@abstractmethod def interfaces(self) -> list[dict]:
def interfaces(self) -> list["Interfaces"]:
""" """
Parse Interface configuration from self._raw['interfaces']. Parse Interface configuration from self.raw['interfaces'].
Expected raw structure: Expected raw structure:
[{"name": "GEthernet1/0/1", "ip_address": "192.168.1.1", "mask": "24", "description": "IPBB interface"}] [{"name": "GEthernet1/0/1", "ip_address": "192.168.1.1", "mask": "24", "description": "IPBB interface"}]
Raises:
ValueError: если _raw содержит некорректные данные.
"""
...
@abstractmethod Raises:
def system(self) -> "System": ValueError: если raw содержит некорректные данные.
""" """
Parse System configuration from self._raw['system']. return self.raw.get("interfaces", [])
def system(self) -> dict:
"""
Parse System configuration from self.raw['system'].
Expected raw structure: Expected raw structure:
{"model":"RB951Ui-2nD", serial_number: "B88C0B31117B", "version": "7.12.1"} {"model":"RB951Ui-2nD", serial_number: "B88C0B31117B", "version": "7.12.1"}
Raises: Raises:
ValueError: если _raw содержит некорректные данные. ValueError: если raw содержит некорректные данные.
""" """
... return self.raw.get("system", None)
def _validate_contract(self):
optional_vlans = self.vlans()
if optional_vlans:
optional_vlans = [Vlans(**item) for item in optional_vlans]
return {
"system": System(**self.system()),
"interfaces": [Interfaces(**items) for items in self.interfaces()],
"vlans": optional_vlans,
}
def _load_template(self): def _load_template(self):
"""Подгрузка темплейтов из папки models/templates""" """Подгрузка темплейтов из папки models/templates"""
path = Path(__file__).parent / "models" / "templates" / self.template path = Path(__file__).parent / "models" / "templates" / self.template
if not path.exists(): if not path.exists():
print("-" * 12)
print(path)
raise FileNotFoundError(f"Template {self.template} not found") raise FileNotFoundError(f"Template {self.template} not found")
return path.read_text(encoding="utf-8") return path.read_text(encoding="utf-8")
@@ -97,7 +100,7 @@ class BaseDevice(ABC):
) )
def _run_ttp(self) -> dict: def _run_ttp(self) -> dict:
""" Основной парсер """ """Основной парсер"""
p = ttp(data=self.config, template=self._loaded_template) p = ttp(data=self.config, template=self._loaded_template)
p.parse() p.parse()
raw: dict = p.result()[0][0] raw: dict = p.result()[0][0]
@@ -111,8 +114,4 @@ class BaseDevice(ABC):
return raw return raw
def parse(self) -> Device: def parse(self) -> Device:
return Device( return Device(**self._validate_contract())
system=self.system(),
interfaces=self.interfaces(),
vlans=self.vlans(),
)

View File

@@ -1,17 +1,13 @@
from ipaddress import ip_interface from ipaddress import ip_interface
from pprint import pprint
from oxi.interfaces import register_parser from oxi.interfaces import register_parser
from oxi.interfaces.base import BaseDevice from oxi.interfaces.base import BaseDevice
from oxi.interfaces.contract import Interfaces, System, Vlans from oxi.interfaces.contract import Interfaces, Vlans
@register_parser(["NDMS", "keenetic", "KeeneticOS"]) @register_parser(["NDMS", "keenetic", "KeeneticOS"])
class Keenetic(BaseDevice): class Keenetic(BaseDevice):
template = "keenetic.ttp" template = "keenetic.ttp"
def system(self):
return System(**self._raw["system"])
def _decode_utf(self, text: str): def _decode_utf(self, text: str):
if "\\x" in text: if "\\x" in text:
desc = text.strip('"') desc = text.strip('"')
@@ -25,7 +21,7 @@ class Keenetic(BaseDevice):
return text return text
def interfaces(self): def interfaces(self):
interfaces: list[dict] = self._raw["interfaces"] interfaces: list[dict] = self.raw["interfaces"]
for item in interfaces: for item in interfaces:
if item.get("ip_address") and item.get("netmask"): if item.get("ip_address") and item.get("netmask"):
ipaddress = ip_interface( ipaddress = ip_interface(
@@ -36,15 +32,15 @@ class Keenetic(BaseDevice):
if item.get("description"): if item.get("description"):
decoded = self._decode_utf(item.get("description", "")) decoded = self._decode_utf(item.get("description", ""))
item["description"] = decoded item["description"] = decoded
return [Interfaces(**item) for item in interfaces] return interfaces
def vlans(self): def vlans(self):
vlans = self._raw["vlans"] vlans = self.raw["vlans"]
for item in vlans: for item in vlans:
if item.get("description"): if item.get("description"):
decoded = self._decode_utf(item.get("description", "")) decoded = self._decode_utf(item.get("description", ""))
item["description"] = decoded item["description"] = decoded
return [Vlans(**item) for item in vlans] return vlans
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,22 +1,21 @@
import os import os
from oxi.interfaces import register_parser from oxi.interfaces import register_parser
from oxi.interfaces.base import BaseDevice from oxi.interfaces.base import BaseDevice
from oxi.interfaces.contract import Interfaces, System, Vlans
@register_parser(["routeros", "ros", "mikrotik"]) @register_parser(["routeros", "ros", "mikrotik"])
class Mikrotik(BaseDevice): class Mikrotik(BaseDevice):
template = "mikrotik.ttp" template = "mikrotik.ttp"
def system(self) -> "System": # def system(self) -> "System":
systems = self._raw.get("system") # systems = self._raw.get("system")
return System(**systems) # return System(**systems)
def interfaces(self) -> "Interfaces": # def interfaces(self) -> "Interfaces":
return [Interfaces(**item) for item in self._raw.get("interfaces")] # return [Interfaces(**item) for item in self._raw.get("interfaces")]
def vlans(self) -> list["Vlans"]: # def vlans(self) -> list["Vlans"]:
return [Vlans(**item) for item in self._raw.get("vlans")] # return [Vlans(**item) for item in self._raw.get("vlans")]
if __name__ == "__main__": if __name__ == "__main__":