D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
proc
/
self
/
root
/
opt
/
imunify360
/
venv
/
share
/
imunify360
/
scripts
/
Filename :
remote_iplist.py
back
Copy
#!/opt/imunify360/venv/bin/python3
"""
Usage: /opt/imunify360/venv/bin/python3 remote_iplist.py [-j] [-s]

Two modes, selected by ``-s/--sync-request``:

* with ``-s``: request a sync from the server and pickle the result to
  ``REMOTE_IPLIST_RESPONSE``;
* without ``-s``: restore that pickled result and apply it through
  ``RemoteIPListPlugin.do_single_update``, then print the returned delay
  (preceded by a JSON summary when ``-j/--json`` is given).

On failure either mode steps the persisted retry timeout up through
``TIMEOUT_RANGE``, prints the resulting timeout and exits with status 2.
"""
import argparse
import asyncio
import json
import os
import sys
import pickle
from logging import getLogger
from pathlib import Path
from typing import Optional

from defence360agent.contracts.config import Model
from defence360agent.internals import logger as lg
from defence360agent.model import instance, tls_check
from defence360agent.utils.benchmark import Benchmark
from im360.api.server.remote_iplist import IpListAPIRequestException
from im360.contracts.config import IPSET_LISTS_PATH
from im360.files import WHITELISTS, Index
from im360.internals.core import ip_versions
from im360.plugins.resident.remote_iplist import RemoteIPListPlugin

logger = getLogger("remote-iplist")

# File holding the pickled RemoteIPListResponse between the two modes.
REMOTE_IPLIST_RESPONSE = Path("/var/imunify360/.remote_iplist.response")
# File holding the last retry timeout as plain text.
TIMOUT_PERSIST_FILE = Path("/var/imunify360/.remote_iplist_timeout.lock")

MINUTE_TIMEOUT = 60
TEN_MINUTES_TIMEOUT = MINUTE_TIMEOUT * 10
HOUR_TIMEOUT = MINUTE_TIMEOUT * 60
ONE_DAY_TIMEOUT = HOUR_TIMEOUT * 24
# Back-off ladder walked by increase_timeout(), smallest to largest.
TIMEOUT_RANGE = [
    MINUTE_TIMEOUT,
    TEN_MINUTES_TIMEOUT,
    HOUR_TIMEOUT,
    ONE_DAY_TIMEOUT,
]


class RemoteIPListResponse:
    """Container for one sync result, pickled by save_state().

    The class name and attribute names are part of the on-disk pickle
    format, so they must stay stable.
    """

    def __init__(self, sync_point, delay, responses):
        self.sync_point = sync_point
        self.delay = delay
        self.responses = responses

    def unpack(self):
        """Return ``(sync_point, delay, responses)``."""
        return self.sync_point, self.delay, self.responses


def save_timeout(timeout: float):
    """Save last timeout value to TIMOUT_PERSIST_FILE."""
    # exist_ok makes this a no-op when the directory is already there.
    TIMOUT_PERSIST_FILE.parent.mkdir(parents=True, exist_ok=True)
    TIMOUT_PERSIST_FILE.write_text(str(timeout))


def get_timeout():
    """Get last timeout value, in seconds, as an int.

    When the persist file is missing, or its content cannot be parsed as
    a number, MINUTE_TIMEOUT is written to the file and returned.
    """
    try:
        raw = TIMOUT_PERSIST_FILE.read_text()
    except FileNotFoundError:
        raw = None
    if raw is not None:
        try:
            # save_timeout() accepts floats, so tolerate text like "60.0".
            return int(float(raw))
        except (ValueError, OverflowError):
            # A corrupt file must not crash the callers, which are mostly
            # error handlers that still need to print a timeout.
            logger.warning(
                "Ignoring invalid persisted timeout value %r", raw
            )
    save_timeout(MINUTE_TIMEOUT)
    return MINUTE_TIMEOUT


def reset_timeout():
    """Reset the persisted timeout to MINUTE_TIMEOUT and return it."""
    previous_timeout = get_timeout()
    logger.debug(
        "Remote IP lists API request timeout reset from %s to %s seconds",
        previous_timeout,
        MINUTE_TIMEOUT,
    )
    save_timeout(MINUTE_TIMEOUT)
    return MINUTE_TIMEOUT


def increase_timeout():
    """Advance the persisted timeout one step along TIMEOUT_RANGE.

    Returns the new timeout.  The current value is returned unchanged
    when it is already ONE_DAY_TIMEOUT or is not a member of
    TIMEOUT_RANGE.
    """
    current_timeout = get_timeout()
    if (
        current_timeout == ONE_DAY_TIMEOUT
        or current_timeout not in TIMEOUT_RANGE
    ):
        return current_timeout
    next_value = TIMEOUT_RANGE[TIMEOUT_RANGE.index(current_timeout) + 1]
    save_timeout(next_value)
    return next_value


def save_state(rips: RemoteIPListResponse):
    """Pickle *rips* to REMOTE_IPLIST_RESPONSE."""
    # Context manager guarantees the data is flushed and the handle closed.
    with REMOTE_IPLIST_RESPONSE.open("wb") as fh:
        pickle.dump(rips, fh)


def restore_state() -> Optional[RemoteIPListResponse]:
    """Unpickle the object stored in REMOTE_IPLIST_RESPONSE.

    Returns None when the file does not exist or cannot be loaded (the
    load failure is logged).
    """
    if not REMOTE_IPLIST_RESPONSE.exists():
        return None
    try:
        # NOTE(review): pickle.load executes whatever the file encodes, so
        # this relies on the response file being writable only by trusted
        # users - confirm the permissions of its directory.
        with REMOTE_IPLIST_RESPONSE.open("rb") as fh:
            return pickle.load(fh)
    except Exception as e:
        logger.error("Failed to restore remote-ipslist response: %s", e)
        return None


def parse_args() -> argparse.Namespace:
    """Parse the command line (``-j/--json``, ``-s/--sync-request``)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-j",
        "--json",
        action="store_true",
        help="output more information in JSON format",
    )
    parser.add_argument(
        "-s",
        "--sync-request",
        action="store_true",
        help=(
            "send sync request to the server and save the response to the file"
        ),
    )
    return parser.parse_args()


def setup_environment():
    """Reconfigure logging, init IP versions, open and attach databases."""
    lg.reconfigure()
    ip_versions.init()
    instance.db.init(Model.PATH)
    instance.db.execute_sql("ATTACH ? AS resident", (Model.RESIDENT_PATH,))
    instance.db.execute_sql("ATTACH ? AS ipsetlists", (IPSET_LISTS_PATH,))
    Index.add_type(WHITELISTS, "whitelist/v2", 0o770, 0o660, all_zip=True)


def _back_off_and_exit(exc) -> None:
    """Increase the timeout, log and print it, then exit with status 2."""
    current_timeout = increase_timeout()
    logger.warning(
        f"Remote IP lists API request timeout increased to {current_timeout} seconds"
        f" by exception {exc}"
    )
    print(current_timeout)
    sys.exit(2)


async def run_sync_request() -> None:
    """Fetch a sync from the server and save it for a later apply run.

    On success the timeout is reset; on any exception the timeout is
    increased and printed, and the process exits with status 2.
    """
    try:
        # remove previous response file if it exists
        REMOTE_IPLIST_RESPONSE.unlink(missing_ok=True)
        plugin = RemoteIPListPlugin()
        sync_point, delay, responses = await plugin.get_sync_from_server(
            lazy_responses=False, raise_exception=True
        )
        logger.info(f"Sync point: {sync_point}, delay: {delay}")
        save_state(RemoteIPListResponse(sync_point, delay, responses))
        reset_timeout()
    except IpListAPIRequestException as e:
        _back_off_and_exit(e)
    except Exception as ex:
        logger.error("Failed to get sync from server: %s", ex)
        _back_off_and_exit(ex)


async def run(args: argparse.Namespace) -> None:
    """Apply the previously saved sync response.

    Prints the delay returned by the update (after a JSON summary when
    ``args.json`` is set).  When no saved response can be restored, or
    the update raises, the increased timeout is printed and the process
    exits with status 2.
    """
    _response = restore_state()
    if _response is None:
        logger.error("response file not found")
        current_timeout = increase_timeout()
        print(current_timeout)
        sys.exit(2)
    try:
        sync_point, delay, responses = _response.unpack()
        logger.info(
            "Restored response: %s", (sync_point, delay, len(responses))
        )
        plugin = RemoteIPListPlugin()
        with Benchmark() as bench:
            (
                total_blocked_count,
                total_unblocked_count,
                sync_point,
                delay_s,
            ) = await plugin.do_single_update(
                sync=(sync_point, delay, responses)
            )
        if args.json:
            print(
                json.dumps(
                    {
                        "unblocked": total_unblocked_count,
                        "blocked": total_blocked_count,
                        "sync_point": sync_point,
                        "elapsed": bench.elapsed_time_ms,
                        "delay": delay_s,
                    },
                    indent=True,
                )
            )
        reset_timeout()
        print(delay_s)
    except Exception as ex:
        logger.error(ex)
        _back_off_and_exit(ex)


def main():
    """Script entry point: set up the environment and run one mode."""
    args = parse_args()
    # NOTE(review): asyncio.get_event_loop() with no running loop is
    # deprecated in recent Python releases - confirm against the venv's
    # interpreter version before changing it.
    loop = asyncio.get_event_loop()
    tls_check.reset()
    setup_environment()
    logger.info("Starting remote_iplist with args %s", args)
    if args.sync_request:
        loop.run_until_complete(run_sync_request())
    else:
        loop.run_until_complete(run(args))


if __name__ == "__main__":
    main()