add oxiApi

This commit is contained in:
IluaAir
2025-06-25 10:32:51 +03:00
parent 009f007a38
commit 1eb0ff1eca
11 changed files with 129 additions and 33 deletions

0
oxi/__init__.py Normal file
View File

View File

@@ -1,10 +1,6 @@
from .vrp import Vrp from oxi.interface.registry import register_parser, device_registry
from .qtech import Qtech
from .mikrotik import Mikrotik
from .bdcom import BDcom
__all__ = [ __all__ = [
'Vrp', 'register_parser',
'Qtech', 'device_registry'
'Mikrotik',
'BDcom'
] ]

View File

@@ -1,6 +1,6 @@
import re import re
from abc import ABC
from dataclasses import dataclass from dataclasses import dataclass
from typing import Protocol
@dataclass @dataclass
@@ -25,7 +25,7 @@ class ParsedDeviceData:
vlans: list[Vlan] vlans: list[Vlan]
class BaseDevice(ABC): class BaseDevice(Protocol):
anchor_pattern: str = '!' anchor_pattern: str = '!'
hostname_pattern: str = 'hostname' hostname_pattern: str = 'hostname'
@@ -81,7 +81,7 @@ class BaseDevice(ABC):
ip_address_match = re.search(r'\s?ip address\s(.+)$', interface_block, re.MULTILINE) ip_address_match = re.search(r'\s?ip address\s(.+)$', interface_block, re.MULTILINE)
if not description_match and not ip_address_match: if not description_match and not ip_address_match:
continue continue
ip_address = ip_address_match.group(1) if ip_address_match else None ip_address = ip_address_match.group(1).replace(' ', '/') if ip_address_match else None
description = description_match.group(1) if description_match else None description = description_match.group(1) if description_match else None
interfaces.append(L3Interface( interfaces.append(L3Interface(
interface=interface_name, interface=interface_name,

View File

@@ -0,0 +1,7 @@
import importlib
import pkgutil
package = __package__
for loader, module_name, is_pkg in pkgutil.iter_modules(__path__):
importlib.import_module(f"{package}.{module_name}")

View File

@@ -0,0 +1,7 @@
from oxi.interface import register_parser
from oxi.interface.models.qtech import Qtech
@register_parser("BDCOM")
class BDcom(Qtech):
pass

View File

@@ -0,0 +1,7 @@
from oxi.interface import register_parser
from oxi.interface.base import BaseDevice
@register_parser("Mikrotik")
class Mikrotik(BaseDevice):
...

View File

@@ -0,0 +1,16 @@
import re
from oxi.interface import register_parser
from oxi.interface.base import BaseDevice
@register_parser("QTECH")
class Qtech(BaseDevice):
def __init__(self, config):
self.config: str = self._fix_config(config)
def _fix_config(self, config):
pattern = r"Pending configurations.*"
cleaned_text = re.sub(pattern, "", config, flags=re.DOTALL)
return cleaned_text

View File

@@ -0,0 +1,30 @@
import re
from oxi.interface import register_parser
from oxi.interface.base import BaseDevice, Vlan
@register_parser("VRP")
class Vrp(BaseDevice):
anchor_pattern: str = '#'
hostname_pattern = 'sysname'
unamed_vlan_splitter = ' '
unamed_vlan_counter = 'to'
def _parse_unamed_vlans(self) -> list[Vlan]:
vlans = []
pattern = self.unamed_vlans_parse_pattern
for match in re.finditer(pattern, self.config, re.MULTILINE):
tokens = match.group(1).split(self.unamed_vlan_splitter)
i = 0
while i < len(tokens):
if i + 2 < len(tokens) and tokens[i + 1].lower() == 'to':
start = int(tokens[i])
end = int(tokens[i + 2])
for vlan_id in range(start, end + 1):
vlans.append(Vlan(vlan=str(vlan_id), name=None, description=None))
i += 3 # пропустить X, 'to', Y
else:
vlans.append(Vlan(vlan=str(tokens[i]), name=None, description=None))
i += 1
return vlans

12
oxi/interface/registry.py Normal file
View File

@@ -0,0 +1,12 @@
from typing import Callable, Type
from oxi.interface.base import BaseDevice
device_registry = {}
def register_parser(name: str) -> Callable[[Type[BaseDevice]], Type[BaseDevice]]:
def wrapper(cls):
device_registry[name.lower()] = cls
return cls
return wrapper

View File

@@ -1,9 +1,13 @@
import logging import logging
from functools import cached_property
import requests import requests
from typing import Optional from typing import Optional
from oxi.interface.base import BaseDevice
from settings import settings from settings import settings
import oxi.interface.models # noqa
from oxi.interface import device_registry
log = logging.getLogger() log = logging.getLogger()
@@ -21,10 +25,7 @@ class OxidizedAPI:
self._session.verify = verify self._session.verify = verify
if username and password: if username and password:
self._session.auth = (username, password) self._session.auth = (username, password)
self.node = Node(self._session, self.base_url)
@property
def node(self):
return Node(self._session, self.base_url)
def __enter__(self): def __enter__(self):
return self return self
@@ -48,15 +49,15 @@ class Node:
self._base_url = base_url self._base_url = base_url
self._data = None self._data = None
def show(self, name: str) -> 'NodeView': def __call__(self, name: str) -> 'NodeView':
url = f"{self._base_url}/node/show/{name}" url = f"{self._base_url}/node/show/{name}"
if not url.endswith('.json'): if not url.endswith('.json'):
url += '.json' url += '.json'
self._data = self._session.get(url) response = self._session.get(url)
return NodeView( return NodeView(
session=self._session, session=self._session,
base_url=self._base_url, base_url=self._base_url,
data=self._data.json() data=response.json()
) )
@@ -84,26 +85,43 @@ class NodeView:
@property @property
def config(self): def config(self):
return NodeConfig(self._session, self.full_name, self._base_url) return NodeConfig(self._session, self.full_name, self.model, self._base_url)
class NodeConfig: class NodeConfig:
def __init__(self, session: requests.Session, full_name: str, base_url: str): def __init__(self, session: requests.Session, full_name: str, model: str, base_url: str):
self._session = session self._session = session
self._full_name = full_name self._full_name = full_name
self._model = model
self._url = f"{base_url}/node/fetch/{full_name}" self._url = f"{base_url}/node/fetch/{full_name}"
self._device: type[BaseDevice] = device_registry.get(self._model.lower())
if self._device is None:
raise ValueError(f"Device model '{self._model}' not found in registry")
self._data = None
def text(self) -> str: @cached_property
return self._session.get(self._url).text def _response(self):
log.debug(f"Fetching config from {self._url}")
response = self._session.get(self._url)
response.raise_for_status()
return response
def json(self) -> dict: @property
return self._session.get(self._url).json() def text(self):
return self._response.text
def __str__(self) -> str: @property
return self.text() def json(self):
return self._response.json()
def __str__(self):
return self.text
oxi = OxidizedAPI(username=settings.oxi_username, password=settings.oxi_password, verify=False) def vlans(self):
print(oxi.node.show('Novok_HOME').config) return self._device(self.text).parse_config().vlans
mikrotik = oxi.node.show('Novok_HOME').model
print(mikrotik.model) def l3interfaces(self):
return self._device(self.text).parse_config().l3interfaces
def vlaninterfaces(self):
return self._device(self.text).parse_config().vlaninterfaces

View File

@@ -1,7 +1,9 @@
import pynetbox import pynetbox
from oxi.manager import OxidizedAPI
from settings import settings from settings import settings
netbox = pynetbox.api( netbox = pynetbox.api(
settings.nb_url, settings.nb_url,
token=settings.nb_token) token=settings.nb_token)
@@ -13,6 +15,7 @@ filters = {
"role": "Kommutator", "role": "Kommutator",
} }
devices = netbox.dcim.devices.filter(**filters)
for device in devices: # devices = netbox.dcim.devices.filter(**filters)
print(f"{device.name} (IP: {device.primary_ip})") # for device in devices:
# print(f"{device.name} (IP: {device.primary_ip})")