File: //opt/imunify360/venv/lib/python3.11/site-packages/imav/api/cleanup_revert.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import logging
from dataclasses import dataclass, fields
from typing import Any, Self
from urllib.parse import urljoin
from urllib.request import Request
from defence360agent.api.server import API, APIError
from defence360agent.internals.iaid import (
IAIDTokenError,
IndependentAgentIDAPI,
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass(frozen=True, eq=True)
class RemoteRevertHitInfo:
sig_id: str
app_root_path: str
app_name: str
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Self:
allowed_keys = {f.name for f in fields(cls)}
filtered_data = {k: v for k, v in data.items() if k in allowed_keys}
return cls(**filtered_data)
class CleanupRevertAPI(API):
    """Client for the ``/api/cleanup/revert`` endpoint."""

    URL = urljoin(API._BASE_URL, "/api/cleanup/revert")

    @classmethod
    async def paths(cls) -> tuple[list[str], list[RemoteRevertHitInfo]]:
        """Fetch the cleanup revert data from the endpoint.

        The request is authenticated with the IAID token sent in the
        ``X-Auth`` header.

        Returns:
            A ``(file_reverts, db_reverts)`` tuple built from the ``"paths"``
            and ``"dbs"`` keys of the response. Both lists are empty when
            the token cannot be obtained (``IAIDTokenError``) or the request
            fails with ``APIError``. A key that is absent or does not hold
            a list yields an empty list for that part.
        """
        try:
            token = await IndependentAgentIDAPI.get_token()
        except IAIDTokenError:
            return ([], [])

        request = Request(cls.URL, headers={"X-Auth": token})
        try:
            result = await cls.async_request(request)
        except APIError as exc:
            logger.warning("Failed to fetch cleanup revert data: %s", exc)
            return ([], [])

        # Both keys are validated the same way: anything that is not a list
        # (missing key, null, scalar) is treated as "nothing to revert".
        raw_dbs = (
            result["dbs"]
            if "dbs" in result and isinstance(result["dbs"], list)
            else []
        )
        file_reverts = (
            result["paths"]
            if "paths" in result and isinstance(result["paths"], list)
            else []
        )

        db_reverts: list[RemoteRevertHitInfo] = []
        for entry in raw_dbs:
            if not isinstance(entry, dict):
                logger.warning(
                    "Skipping malformed cleanup revert database entry: %r",
                    entry,
                )
                continue
            try:
                # from_dict drops keys that are not dataclass fields, so
                # extra fields in the response do not break the constructor.
                db_reverts.append(RemoteRevertHitInfo.from_dict(entry))
            except TypeError as exc:
                # Raised by the dataclass constructor when a field is missing.
                logger.warning(
                    "Skipping malformed cleanup revert database entry %r: %s",
                    entry,
                    exc,
                )

        logger.info(
            "Cleanup revert data fetched: %d files, %d databases",
            len(file_reverts),
            len(db_reverts),
        )
        return (file_reverts, db_reverts)