Files
netbox-audit/oxi/manager.py
2025-06-24 11:37:45 +03:00

109 lines
2.8 KiB
Python

import logging
import requests
from typing import Optional
from settings import settings
# Module-level logger. getLogger() is called without a name, so this is the
# root logger rather than a logger specific to this module.
log = logging.getLogger()
class OxidizedAPI:
    """Small HTTP client for an Oxidized instance.

    The base URL is taken from ``settings.oxi_url`` with trailing slashes
    removed. Instances work as context managers: leaving the ``with`` block
    closes the underlying session.
    """

    def __init__(
        self,
        session: Optional[requests.Session] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        verify: bool = True
    ):
        """Prepare the HTTP session used for every request.

        Args:
            session: Session to reuse; a new ``requests.Session`` is created
                when none is supplied.
            username: Login placed in the session's ``auth`` tuple.
            password: Password placed in the session's ``auth`` tuple.
            verify: Value assigned to the session's ``verify`` attribute
                (a supplied session is overwritten as well).
        """
        self.base_url = settings.oxi_url.rstrip('/')
        http = session if session else requests.Session()
        http.verify = verify
        credentials = (username, password)
        # Credentials are installed only when both parts are non-empty.
        if all(credentials):
            http.auth = credentials
        self._session = http

    @property
    def node(self):
        """Return a new ``Node`` helper sharing this client's session and base URL."""
        return Node(session=self._session, base_url=self.base_url)

    def __enter__(self):
        """Enter the context; the client itself is the context value."""
        return self

    def __exit__(self, *args):
        """Close the underlying session when the context is left."""
        self._session.close()

    def get(self, endpoint: str, **kwargs) -> dict:
        """GET ``endpoint`` relative to the base URL and return the decoded JSON.

        Leading slashes of ``endpoint`` are dropped and ``.json`` is appended
        unless the resulting URL already ends with it.

        Args:
            endpoint: Path below the base URL.
            **kwargs: Forwarded unchanged to the session's ``get``.

        Returns:
            The decoded JSON body of the response.

        Raises:
            ValueError: If the server answers with HTTP status 500.
        """
        target = "/".join((self.base_url, endpoint.lstrip('/')))
        target = target if target.endswith('.json') else target + '.json'
        response = self._session.get(target, **kwargs)
        if response.status_code != 500:
            return response.json()
        raise ValueError(f'page {target} not found')
class Node:
    """Entry point for the ``/node`` endpoints of the Oxidized API."""

    def __init__(self, session: requests.Session, base_url: str):
        """Store the shared HTTP session and the API base URL.

        Args:
            session: Session used to perform the requests.
            base_url: Base URL of the API; used as-is when building URLs.
        """
        self._session = session
        self._base_url = base_url
        # Raw response of the most recent ``show`` call; None before the first.
        self._data = None

    def show(self, name: str) -> 'NodeView':
        """Fetch ``/node/show/<name>`` as JSON and wrap the decoded body.

        Args:
            name: Node name; ``.json`` is appended to the URL unless it is
                already present.

        Returns:
            A ``NodeView`` built from the decoded JSON body.

        Raises:
            ValueError: If the server answers with HTTP status 500. This is
                the same condition and message that ``OxidizedAPI.get`` uses,
                so both entry points report the failure the same way instead
                of passing the error page to the JSON decoder.
        """
        url = f"{self._base_url}/node/show/{name}"
        if not url.endswith('.json'):
            url += '.json'
        response = self._session.get(url)
        self._data = response
        if response.status_code == 500:
            raise ValueError(f'page {url} not found')
        return NodeView(
            session=self._session,
            base_url=self._base_url,
            data=response.json()
        )
class NodeView:
    """Read-only accessors over the decoded JSON payload of a single node."""

    def __init__(self, session: requests.Session, base_url: str, data: dict):
        """Keep the session, base URL and payload for later lookups.

        Args:
            session: Session handed on to ``NodeConfig``.
            base_url: Base URL handed on to ``NodeConfig``.
            data: Mapping holding the node's fields.
        """
        self._session, self._base_url, self._data = session, base_url, data

    def _field(self, key):
        """Look up ``key`` in the payload; missing keys yield ``None``."""
        return self._data.get(key)

    @property
    def ip(self):
        """Value stored under ``'ip'``, or ``None``."""
        return self._field('ip')

    @property
    def full_name(self):
        """Value stored under ``'full_name'``, or ``None``."""
        return self._field('full_name')

    @property
    def group(self):
        """Value stored under ``'group'``, or ``None``."""
        return self._field('group')

    @property
    def model(self):
        """Value stored under ``'model'``, or ``None``."""
        return self._field('model')

    @property
    def config(self):
        """Return a new ``NodeConfig`` addressed by this node's ``full_name``."""
        return NodeConfig(
            session=self._session,
            full_name=self.full_name,
            base_url=self._base_url,
        )
class NodeConfig:
    """Handle on a node's configuration served at ``<base_url>/node/fetch/<full_name>``.

    Nothing is requested at construction time; every call to ``text`` or
    ``json`` (and therefore ``str()``) issues a new GET.
    """

    def __init__(self, session: requests.Session, full_name: str, base_url: str):
        """Remember the session and node name and precompute the fetch URL."""
        self._full_name = full_name
        self._session = session
        self._url = "{}/node/fetch/{}".format(base_url, full_name)

    def _fetch(self):
        """Perform the GET against the fetch URL and return the response."""
        return self._session.get(self._url)

    def text(self) -> str:
        """Return the response body as text."""
        response = self._fetch()
        return response.text

    def json(self) -> dict:
        """Return the response body decoded as JSON."""
        response = self._fetch()
        return response.json()

    def __str__(self) -> str:
        """Render as the configuration text (triggers a request)."""
        return self.text()
# Shared client instance built from the credentials in ``settings``.
# Constructing it only prepares the session; no request is sent here.
# NOTE(review): verify=False turns off TLS certificate verification for every
# request made through this client -- confirm this is intended.
oxi = OxidizedAPI(
    username=settings.oxi_username,
    password=settings.oxi_password,
    verify=False,
)

if __name__ == "__main__":
    # Manual smoke test, run only when the file is executed as a script so
    # that importing the module performs no network requests.
    mikrotik = oxi.node.show('Novok_HOME')
    # Printing the NodeConfig goes through __str__, which downloads the config.
    print(mikrotik.config)
    # ``model`` is read from the NodeView; it is a plain value from the JSON
    # payload and has no ``.model`` attribute of its own.
    print(mikrotik.model)