D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
plugins
/
resident
/
Filename :
ttl_graylist.py
back
Copy
""" Append timeout to graylist ip """ import time from datetime import timedelta from logging import getLogger from peewee import DoesNotExist from defence360agent.contracts.plugins import expect, MessageSink from defence360agent.model.simplification import run_in_executor from defence360agent.contracts.messages import MessageType from im360.internals.core.ipset.ip import IPSetGray from im360.model.firewall import IPList logger = getLogger(__name__) class GraylistTimeout(MessageSink): PROCESSING_ORDER = MessageSink.ProcessingOrder.GRAYLIST_TIMEOUT _TIMEOUTS = ( timedelta(minutes=5), timedelta(minutes=30), timedelta(hours=3), timedelta(hours=12), timedelta(days=3), timedelta(days=15), timedelta( days=timedelta(seconds=IPSetGray.GRAYLIST_DEFAULT_TIMEOUT).days ), # 24 days ) async def create_sink(self, loop): self._loop = loop @expect(MessageType.SensorAlert) async def append_timeout(self, message): try: deep = await run_in_executor( self._loop, lambda: IPList.get( ip=message["attackers_ip"], listname=IPList.GRAY ).deep, ) except DoesNotExist: deep = None message["properties"] = self.next_timeout(deep) return message def next_timeout(self, deep=None): """ Calculate next timeout :param deep: previous deep - block level :return: """ if deep is None: deep = 0 else: deep = min(deep + 1, len(self._TIMEOUTS) - 1) ttl = int(self._TIMEOUTS[deep].total_seconds()) return { # TTL for debug "ttl": ttl, # All modules should be use expiration time "expiration": int(time.time() + ttl), # Blocking level "deep": deep, }