Add OxiAdapter for enhanced HTTP request handling in OxiAPI

- Introduced a new `OxiAdapter` class that extends `HTTPAdapter` to manage timeouts and retries for HTTP requests.
- Integrated the `OxiAdapter` into the `OxiAPI` class, setting a default timeout and enabling retry logic for both HTTP and HTTPS requests.
This commit is contained in:
IluaAir
2026-03-26 00:31:13 +03:00
parent a107662e99
commit 8cebbf743a
2 changed files with 27 additions and 2 deletions

21
oxi/adapter.py Normal file
View File

@@ -0,0 +1,21 @@
from typing import Optional
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
class OxiAdapter(HTTPAdapter):
def __init__(
self,
timeout: Optional[int] = None,
max_retries: int = 3,
*args,
**kwargs,
):
self.timeout = timeout
retry = Retry(total=max_retries, backoff_factor=0.3)
super().__init__(*args, max_retries=retry, **kwargs)
def send(self, request, **kwargs):
if kwargs.get("timeout") is None:
kwargs["timeout"] = self.timeout
return super().send(request, **kwargs)

View File

@@ -1,9 +1,10 @@
from typing import Optional from typing import Optional
from requests import Session from requests import Session
from oxi.adapter import OxiAdapter
from .node import Node from .node import Node
# TODO: Add custom adapter for Oxi
class OxiAPI: class OxiAPI:
def __init__( def __init__(
self, self,
@@ -14,8 +15,11 @@ class OxiAPI:
): ):
self.base_url = url.rstrip("/") self.base_url = url.rstrip("/")
self._session = Session() self._session = Session()
self._adapter = OxiAdapter(timeout=10, max_retries=3)
self._session.mount("https://", self._adapter)
self._session.mount("http://", self._adapter)
self._session.verify = verify self._session.verify = verify
if username is not None and password is not None: if username and password:
self._session.auth = (username, password) self._session.auth = (username, password)
self.node = Node(self._session, self.base_url) self.node = Node(self._session, self.base_url)