add oxi manager
This commit is contained in:
33
oxi/manager.py
Normal file
33
oxi/manager.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import requests
|
||||
from typing import Optional
|
||||
from settings import settings
|
||||
|
||||
|
||||
class OxiManager:
|
||||
def __init__(
|
||||
self,
|
||||
session: Optional[requests.Session] = None
|
||||
):
|
||||
self.base_url = settings.oxi_url
|
||||
self._session = session or requests.Session()
|
||||
self._session.auth = (settings.oxi_username, settings.oxi_password)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self._session.close()
|
||||
|
||||
def get(self, endpoint: str, **kwargs) -> requests.Response:
|
||||
url = f"{self.base_url}/{endpoint.lstrip('/')}"
|
||||
if not url.endswith('.json'):
|
||||
url += '.json'
|
||||
result = self._session.get(url, **kwargs)
|
||||
if result.status_code == 500:
|
||||
raise ValueError(f'page {url} not found')
|
||||
return result.json()
|
||||
|
||||
|
||||
oxi = OxiManager()
|
||||
oxi.get('node/show/AKD-MSK30-AGG_S-01')
|
||||
|
||||
Reference in New Issue
Block a user