Compare commits
10 Commits
009f007a38
...
338b9e69a7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
338b9e69a7 | ||
|
|
e1eeefd42d | ||
|
|
c30729ec63 | ||
|
|
2bdbad096d | ||
|
|
08eef97ec3 | ||
|
|
8715f2223d | ||
|
|
8bb5ae94df | ||
|
|
2e54749159 | ||
|
|
7f3119a99d | ||
|
|
1eb0ff1eca |
7
app.py
Normal file
7
app.py
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
from crud import get_devices_filtered, create_vlans
|
||||||
|
from type import vimpelcomSWType
|
||||||
|
|
||||||
|
devices_vimpelcom = get_devices_filtered(vimpelcomSWType)
|
||||||
|
|
||||||
|
vlans_vimpelcom = create_vlans(*devices_vimpelcom)
|
||||||
|
|
||||||
84
crud.py
Normal file
84
crud.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
from init import netbox, oxi
|
||||||
|
from type import vimpelcomIPaddress374, vimpelcomIPaddressSSPD, vimpelcomIPaddressTEST
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from pynetbox.core.response import RecordSet
|
||||||
|
|
||||||
|
logging.basicConfig()
|
||||||
|
log = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
def get_devices_filtered(filters: dict) -> tuple['RecordSet', list]:
|
||||||
|
devices = netbox.dcim.devices.filter(**filters)
|
||||||
|
device_list = [item.name for item in devices]
|
||||||
|
devices = netbox.dcim.devices.filter(**filters)
|
||||||
|
if not os.path.exists(f'devices_{filters.get('tenant')}.json'):
|
||||||
|
with open(f'devices_{filters.get('tenant')}.json', 'w') as file:
|
||||||
|
json.dump(device_list, file, ensure_ascii=False, indent=3)
|
||||||
|
return devices, device_list
|
||||||
|
|
||||||
|
|
||||||
|
def create_interfaces(devices: 'RecordSet', device_list: list):
|
||||||
|
for device in devices:
|
||||||
|
if device.name not in device_list:
|
||||||
|
continue
|
||||||
|
ex_interface = netbox.dcim.interfaces.filter(device=device.name)
|
||||||
|
log.debug("device: %r", device.name)
|
||||||
|
try:
|
||||||
|
oxidized_device = oxi.node(device.name.replace("(1)", ""))
|
||||||
|
except ValueError:
|
||||||
|
log.warning("%r no exist in Oxidized", device.name)
|
||||||
|
continue
|
||||||
|
log.debug("Vlan-interfaces: %r", oxidized_device.config.vlaninterfaces())
|
||||||
|
if oxidized_device.config.vlaninterfaces():
|
||||||
|
for vlan_interface in oxidized_device.config.vlaninterfaces():
|
||||||
|
log.debug("IPaddress: %r", vlan_interface.ip_address)
|
||||||
|
checker = netbox.dcim.interfaces.get(
|
||||||
|
name=vlan_interface.interface,
|
||||||
|
device=device.name
|
||||||
|
)
|
||||||
|
if checker:
|
||||||
|
continue
|
||||||
|
child_interface = netbox.dcim.interfaces.create(
|
||||||
|
name=vlan_interface.interface,
|
||||||
|
type='virtual',
|
||||||
|
device=device.id
|
||||||
|
)
|
||||||
|
print(device)
|
||||||
|
print("description: ", vlan_interface.description)
|
||||||
|
if vlan_interface.description:
|
||||||
|
child_interface.description = vlan_interface.description
|
||||||
|
child_interface.save()
|
||||||
|
print("child_interface: ", child_interface)
|
||||||
|
print("ip address:", vlan_interface.ip_address)
|
||||||
|
if vlan_interface.ip_address:
|
||||||
|
netbox_ip = netbox.ipam.ip_addresses.create(
|
||||||
|
address=str(vlan_interface.ip_address),
|
||||||
|
tenant=device.tenant.id,
|
||||||
|
)
|
||||||
|
if vlan_interface.ip_address in vimpelcomIPaddress374.prefix:
|
||||||
|
netbox_ip.vrf = netbox.ipam.vrfs.get(name=vimpelcomIPaddress374.vrf).id
|
||||||
|
elif vlan_interface.ip_address in vimpelcomIPaddressSSPD.prefix:
|
||||||
|
netbox_ip.vrf = netbox.ipam.vrfs.get(name=vimpelcomIPaddressSSPD.vrf).id
|
||||||
|
else:
|
||||||
|
netbox_ip.vrf = netbox.ipam.vrfs.get(name=vimpelcomIPaddressTEST.vrf).id
|
||||||
|
netbox_ip.assigned_object = child_interface
|
||||||
|
netbox_ip.assigned_object_id = child_interface.id
|
||||||
|
netbox_ip.assigned_object_type = 'dcim.interface'
|
||||||
|
netbox_ip.save()
|
||||||
|
device_list.remove(device.name)
|
||||||
|
with open('devices.json', 'w') as file:
|
||||||
|
json.dump(device_list, file, ensure_ascii=False, indent=3)
|
||||||
|
|
||||||
|
|
||||||
|
def create_vlans(devices: 'RecordSet', device_list: list):
|
||||||
|
for device in devices:
|
||||||
|
print(device.name.replace("(1)", ""))
|
||||||
|
oxidized_device = oxi.node(device.name.replace("(1)", ""))
|
||||||
|
print(oxidized_device.config.l3interfaces())
|
||||||
|
print(oxidized_device.config.vlans())
|
||||||
|
break
|
||||||
14
init.py
Normal file
14
init.py
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
import pynetbox
|
||||||
|
|
||||||
|
from oxi.manager import OxidizedAPI
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
netbox = pynetbox.api(
|
||||||
|
settings.nb_url,
|
||||||
|
token=settings.nb_token)
|
||||||
|
netbox.http_session.verify = False
|
||||||
|
|
||||||
|
oxi = OxidizedAPI(
|
||||||
|
username=settings.oxi_username,
|
||||||
|
password=settings.oxi_password,
|
||||||
|
verify=False)
|
||||||
0
oxi/__init__.py
Normal file
0
oxi/__init__.py
Normal file
@@ -1,10 +1,6 @@
|
|||||||
from .vrp import Vrp
|
from oxi.interface.registry import register_parser, device_registry
|
||||||
from .qtech import Qtech
|
|
||||||
from .mikrotik import Mikrotik
|
|
||||||
from .bdcom import BDcom
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'Vrp',
|
'register_parser',
|
||||||
'Qtech',
|
'device_registry'
|
||||||
'Mikrotik',
|
|
||||||
'BDcom'
|
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
|
import ipaddress
|
||||||
import re
|
import re
|
||||||
from abc import ABC
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from typing import Protocol
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -25,7 +26,7 @@ class ParsedDeviceData:
|
|||||||
vlans: list[Vlan]
|
vlans: list[Vlan]
|
||||||
|
|
||||||
|
|
||||||
class BaseDevice(ABC):
|
class BaseDevice(Protocol):
|
||||||
anchor_pattern: str = '!'
|
anchor_pattern: str = '!'
|
||||||
hostname_pattern: str = 'hostname'
|
hostname_pattern: str = 'hostname'
|
||||||
|
|
||||||
@@ -35,7 +36,7 @@ class BaseDevice(ABC):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def vlan_parse_pattern(self):
|
def vlan_parse_pattern(self):
|
||||||
return rf"^vlan\s+(\d{{1,4}})\n(.*?)(?=^{self.anchor_pattern}|\Z)"
|
return rf"^vlan\s+(\d{{1,4}})\r?\n(.*?)(?=^{self.anchor_pattern}|\Z)"
|
||||||
|
|
||||||
unamed_vlan_splitter: str = ','
|
unamed_vlan_splitter: str = ','
|
||||||
unamed_vlan_counter = '-'
|
unamed_vlan_counter = '-'
|
||||||
@@ -81,7 +82,12 @@ class BaseDevice(ABC):
|
|||||||
ip_address_match = re.search(r'\s?ip address\s(.+)$', interface_block, re.MULTILINE)
|
ip_address_match = re.search(r'\s?ip address\s(.+)$', interface_block, re.MULTILINE)
|
||||||
if not description_match and not ip_address_match:
|
if not description_match and not ip_address_match:
|
||||||
continue
|
continue
|
||||||
ip_address = ip_address_match.group(1) if ip_address_match else None
|
ip_address = ip_address_match.group(1).replace(' ', '/').replace('\r', '') if ip_address_match else None
|
||||||
|
try:
|
||||||
|
if ip_address is not None:
|
||||||
|
ip_address = ipaddress.ip_interface(ip_address)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
description = description_match.group(1) if description_match else None
|
description = description_match.group(1) if description_match else None
|
||||||
interfaces.append(L3Interface(
|
interfaces.append(L3Interface(
|
||||||
interface=interface_name,
|
interface=interface_name,
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
from oxi.interface import Qtech
|
|
||||||
|
|
||||||
|
|
||||||
class BDcom(Qtech):
|
|
||||||
pass
|
|
||||||
7
oxi/interface/models/__init__.py
Normal file
7
oxi/interface/models/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
import importlib
|
||||||
|
import pkgutil
|
||||||
|
|
||||||
|
package = __package__
|
||||||
|
|
||||||
|
for loader, module_name, is_pkg in pkgutil.iter_modules(__path__):
|
||||||
|
importlib.import_module(f"{package}.{module_name}")
|
||||||
7
oxi/interface/models/bdcom.py
Normal file
7
oxi/interface/models/bdcom.py
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
from oxi.interface import register_parser
|
||||||
|
from oxi.interface.models.qtech import Qtech
|
||||||
|
|
||||||
|
|
||||||
|
@register_parser("BDCOM")
|
||||||
|
class BDcom(Qtech):
|
||||||
|
pass
|
||||||
@@ -1,5 +1,7 @@
|
|||||||
|
from oxi.interface import register_parser
|
||||||
from oxi.interface.base import BaseDevice
|
from oxi.interface.base import BaseDevice
|
||||||
|
|
||||||
|
|
||||||
|
@register_parser("Mikrotik")
|
||||||
class Mikrotik(BaseDevice):
|
class Mikrotik(BaseDevice):
|
||||||
...
|
...
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
import re
|
import re
|
||||||
|
|
||||||
|
from oxi.interface import register_parser
|
||||||
from oxi.interface.base import BaseDevice
|
from oxi.interface.base import BaseDevice
|
||||||
|
|
||||||
|
|
||||||
|
@register_parser("QTECH")
|
||||||
class Qtech(BaseDevice):
|
class Qtech(BaseDevice):
|
||||||
|
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
@@ -12,12 +14,3 @@ class Qtech(BaseDevice):
|
|||||||
pattern = r"Pending configurations.*"
|
pattern = r"Pending configurations.*"
|
||||||
cleaned_text = re.sub(pattern, "", config, flags=re.DOTALL)
|
cleaned_text = re.sub(pattern, "", config, flags=re.DOTALL)
|
||||||
return cleaned_text
|
return cleaned_text
|
||||||
|
|
||||||
|
|
||||||
with open('../../core_switch.txt', 'r') as file:
|
|
||||||
data = file.read()
|
|
||||||
|
|
||||||
result = Qtech(data).parse_config()
|
|
||||||
print(result.vlans)
|
|
||||||
print(result.l3interfaces)
|
|
||||||
print(result.vlaninterfaces)
|
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
import re
|
import re
|
||||||
|
|
||||||
|
from oxi.interface import register_parser
|
||||||
from oxi.interface.base import BaseDevice, Vlan
|
from oxi.interface.base import BaseDevice, Vlan
|
||||||
|
|
||||||
|
|
||||||
|
@register_parser("VRP")
|
||||||
class Vrp(BaseDevice):
|
class Vrp(BaseDevice):
|
||||||
anchor_pattern: str = '#'
|
anchor_pattern: str = '#'
|
||||||
hostname_pattern = 'sysname'
|
hostname_pattern = 'sysname'
|
||||||
@@ -26,14 +28,3 @@ class Vrp(BaseDevice):
|
|||||||
vlans.append(Vlan(vlan=str(tokens[i]), name=None, description=None))
|
vlans.append(Vlan(vlan=str(tokens[i]), name=None, description=None))
|
||||||
i += 1
|
i += 1
|
||||||
return vlans
|
return vlans
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
with open('../../vrp_switch.txt', 'r') as file:
|
|
||||||
data = file.read()
|
|
||||||
|
|
||||||
|
|
||||||
result = Vrp(data).parse_config()
|
|
||||||
print(result.vlans)
|
|
||||||
print(result.l3interfaces)
|
|
||||||
print(result.vlaninterfaces)
|
|
||||||
12
oxi/interface/registry.py
Normal file
12
oxi/interface/registry.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
from typing import Callable, Type
|
||||||
|
|
||||||
|
from oxi.interface.base import BaseDevice
|
||||||
|
|
||||||
|
device_registry = {}
|
||||||
|
|
||||||
|
|
||||||
|
def register_parser(name: str) -> Callable[[Type[BaseDevice]], Type[BaseDevice]]:
|
||||||
|
def wrapper(cls):
|
||||||
|
device_registry[name.lower()] = cls
|
||||||
|
return cls
|
||||||
|
return wrapper
|
||||||
@@ -1,9 +1,13 @@
|
|||||||
import logging
|
import logging
|
||||||
|
from functools import cached_property
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
from oxi.interface.base import BaseDevice
|
||||||
from settings import settings
|
from settings import settings
|
||||||
|
import oxi.interface.models # noqa
|
||||||
|
from oxi.interface import device_registry
|
||||||
|
|
||||||
log = logging.getLogger()
|
log = logging.getLogger()
|
||||||
|
|
||||||
@@ -21,10 +25,7 @@ class OxidizedAPI:
|
|||||||
self._session.verify = verify
|
self._session.verify = verify
|
||||||
if username and password:
|
if username and password:
|
||||||
self._session.auth = (username, password)
|
self._session.auth = (username, password)
|
||||||
|
self.node = Node(self._session, self.base_url)
|
||||||
@property
|
|
||||||
def node(self):
|
|
||||||
return Node(self._session, self.base_url)
|
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
return self
|
return self
|
||||||
@@ -48,15 +49,18 @@ class Node:
|
|||||||
self._base_url = base_url
|
self._base_url = base_url
|
||||||
self._data = None
|
self._data = None
|
||||||
|
|
||||||
def show(self, name: str) -> 'NodeView':
|
def __call__(self, name: str) -> 'NodeView':
|
||||||
url = f"{self._base_url}/node/show/{name}"
|
url = f"{self._base_url}/node/show/{name}"
|
||||||
if not url.endswith('.json'):
|
if not url.endswith('.json'):
|
||||||
url += '.json'
|
url += '.json'
|
||||||
self._data = self._session.get(url)
|
response = self._session.get(url)
|
||||||
|
if response.status_code == 500:
|
||||||
|
log.warning("Oxidized response: %r , %r not found", response.status_code, url)
|
||||||
|
raise ValueError(f'page {url} not found')
|
||||||
return NodeView(
|
return NodeView(
|
||||||
session=self._session,
|
session=self._session,
|
||||||
base_url=self._base_url,
|
base_url=self._base_url,
|
||||||
data=self._data.json()
|
data=response.json()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -82,28 +86,45 @@ class NodeView:
|
|||||||
def model(self):
|
def model(self):
|
||||||
return self._data.get('model')
|
return self._data.get('model')
|
||||||
|
|
||||||
@property
|
@cached_property
|
||||||
def config(self):
|
def config(self):
|
||||||
return NodeConfig(self._session, self.full_name, self._base_url)
|
return NodeConfig(self._session, self.full_name, self.model, self._base_url)
|
||||||
|
|
||||||
|
|
||||||
class NodeConfig:
|
class NodeConfig:
|
||||||
def __init__(self, session: requests.Session, full_name: str, base_url: str):
|
def __init__(self, session: requests.Session, full_name: str, model: str, base_url: str):
|
||||||
self._session = session
|
self._session = session
|
||||||
self._full_name = full_name
|
self._full_name = full_name
|
||||||
|
self._model = model
|
||||||
self._url = f"{base_url}/node/fetch/{full_name}"
|
self._url = f"{base_url}/node/fetch/{full_name}"
|
||||||
|
self._device: type[BaseDevice] = device_registry.get(self._model.lower())
|
||||||
|
if self._device is None:
|
||||||
|
raise ValueError(f"Device model '{self._model}' not found in registry")
|
||||||
|
self._parsed_data = self._device(self.text).parse_config()
|
||||||
|
|
||||||
def text(self) -> str:
|
@cached_property
|
||||||
return self._session.get(self._url).text
|
def _response(self):
|
||||||
|
log.debug(f"Fetching config from {self._url}")
|
||||||
|
response = self._session.get(self._url)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response
|
||||||
|
|
||||||
def json(self) -> dict:
|
@property
|
||||||
return self._session.get(self._url).json()
|
def text(self):
|
||||||
|
return self._response.text
|
||||||
|
|
||||||
def __str__(self) -> str:
|
@property
|
||||||
return self.text()
|
def json(self):
|
||||||
|
return self._response.json()
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.text
|
||||||
|
|
||||||
oxi = OxidizedAPI(username=settings.oxi_username, password=settings.oxi_password, verify=False)
|
def vlans(self):
|
||||||
print(oxi.node.show('Novok_HOME').config)
|
return self._parsed_data.vlans
|
||||||
mikrotik = oxi.node.show('Novok_HOME').model
|
|
||||||
print(mikrotik.model)
|
def l3interfaces(self):
|
||||||
|
return self._parsed_data.l3interfaces
|
||||||
|
|
||||||
|
def vlaninterfaces(self):
|
||||||
|
return self._parsed_data.vlaninterfaces
|
||||||
|
|||||||
17
oxid.py
Normal file
17
oxid.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
from oxi.manager import OxidizedAPI
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
oxi = OxidizedAPI(username=settings.oxi_username, password=settings.oxi_password, verify=False)
|
||||||
|
|
||||||
|
user_input = input('введите название оборудования: ')
|
||||||
|
result = oxi.node(user_input)
|
||||||
|
|
||||||
|
print("Model:", result.model)
|
||||||
|
print("IP:", result.ip)
|
||||||
|
print("Group:", result.group)
|
||||||
|
print("Full Name:", result.full_name)
|
||||||
|
print("-" * 15)
|
||||||
|
print("Configurations:")
|
||||||
|
print(result.config.vlans())
|
||||||
|
print(result.config.l3interfaces())
|
||||||
|
print(result.config.vlaninterfaces())
|
||||||
18
pynet.py
18
pynet.py
@@ -1,18 +0,0 @@
|
|||||||
import pynetbox
|
|
||||||
|
|
||||||
from settings import settings
|
|
||||||
|
|
||||||
netbox = pynetbox.api(
|
|
||||||
settings.nb_url,
|
|
||||||
token=settings.nb_token)
|
|
||||||
netbox.http_session.verify = False
|
|
||||||
|
|
||||||
filters = {
|
|
||||||
"has_primary_ip": "true",
|
|
||||||
"tenant": "vimpelcom",
|
|
||||||
"role": "Kommutator",
|
|
||||||
}
|
|
||||||
|
|
||||||
devices = netbox.dcim.devices.filter(**filters)
|
|
||||||
for device in devices:
|
|
||||||
print(f"{device.name} (IP: {device.primary_ip})")
|
|
||||||
21
type.py
21
type.py
@@ -1,5 +1,24 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from ipaddress import IPv4Network
|
||||||
|
|
||||||
vimpelcomSWType = {
|
vimpelcomSWType = {
|
||||||
"has_primary_ip": "true",
|
"has_primary_ip": "true",
|
||||||
"tenant": "vimpelcom",
|
"tenant": "vimpelcom",
|
||||||
"role": "Kommutator",
|
"role": "Kommutator",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class vimpelcomIPaddress374:
|
||||||
|
prefix: IPv4Network = IPv4Network('15.0.0.0/8')
|
||||||
|
vrf: str = 'VK374'
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class vimpelcomIPaddressSSPD:
|
||||||
|
prefix: IPv4Network = IPv4Network('12.0.0.0/8')
|
||||||
|
vrf: str = 'SORM_BB'
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class vimpelcomIPaddressTEST:
|
||||||
|
prefix: IPv4Network = IPv4Network('192.168.0.0/16')
|
||||||
|
vrf: str = 'TEST'
|
||||||
|
|||||||
Reference in New Issue
Block a user