D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
plugins
/
resident
/
Filename :
persistent_storage.py
back
Copy
"""Update agent's db on receiving message & trigger other changes if necessary. """ import logging from defence360agent.contracts.messages import Reject from defence360agent.contracts.plugins import ( MessageSink, MessageSource, expect, ) from defence360agent.model.simplification import run_in_executor from defence360agent.contracts.messages import MessageType from im360.internals import geo from im360.model.firewall import IPList, is_expired from im360.model.incident import Incident from im360.utils.validate import IP logger = logging.getLogger(__name__) class PersistentStorage(MessageSink, MessageSource): IGNORED_INCIDENT_TAGS_TO_SAVE = ("noshow",) async def create_sink(self, loop): self._loop = loop async def create_source(self, loop, sink): self._loop = loop self._sink = sink @staticmethod def _filter_out(incidents): """ yield incidents that should be stored into local db """ for inc in incidents: # filter out incidents with 'noshow' tag if any( tag in PersistentStorage.IGNORED_INCIDENT_TAGS_TO_SAVE for tag in inc.get("tag", []) ): continue # filter out incidents from whitelisted IP addresses if inc.get("ip_whitelisted", False): continue yield inc @expect(MessageType.SensorIncidentList) @expect(MessageType.LocalIncidentList) @expect(MessageType.UnreportableLocalIncidentList) async def record_incident(self, message): logger.debug( 'record_incident: "%s"', message.get("description", "<no decription>"), ) incident_list = [] is_local = isinstance( message, ( MessageType.UnreportableLocalIncidentList, MessageType.LocalIncidentList, ), ) with geo.reader() as geo_reader: for incident in self._filter_out(message.list): if is_local: ip = None country_code = None else: ip = IP.ip_net_to_string(incident.get("attackers_ip")) country_code = geo_reader.get_id(ip) incident_list.append( { "plugin": incident["plugin_id"], "rule": incident["rule"], "timestamp": incident["timestamp"], "retries": incident["retries"], "severity": incident["severity"], "name": incident["name"], "description": incident["message"], "abuser": ip, "country": country_code, "domain": incident.get("domain"), } ) await run_in_executor( self._loop, lambda: Incident.save_incident_list(incident_list) ) @expect(MessageType.CaptchaDosAlert) async def record_captcha_dos_alert(self, message): """Update iplists db tables on CaptchaDosAlert. & Send BlockUnblockList, to update ipsets/webshield. """ if is_expired(message["expiration"]): raise Reject("Expired CaptchaDos alert") lists_or_error = await run_in_executor( self._loop, lambda: IPList.blacklist_graylisted_on_captcha_dos_alert( ip=message["attackers_ip"], expiration=message["expiration"], comment=message["message"], ), ) if isinstance(lists_or_error, Exception): raise lists_or_error await self._sink.process_message( MessageType.BlockUnblockList(lists_or_error) )