D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
plugins
/
resident
/
Filename :
aggregate.py
back
Copy
"""Buffer sensor incidents and periodically emit them as aggregated lists."""
import asyncio
from collections import defaultdict
from logging import getLogger

from defence360agent.contracts.plugins import (
    MessageSink,
    MessageSource,
    expect,
)
from defence360agent.utils import USER_IDENTITY_FIELD, recurring_check
from im360.contracts import config
from defence360agent.contracts.messages import MessageType
from im360.plugins.resident.persistent_storage import PersistentStorage
from im360.subsys.modsec_audit_log import ADVANCED, ENGINE_MODE, STATUS_CODE

logger = getLogger(__name__)

# Buffer names; each one maps to an entry in AGGREGATE_SETTINGS below.
DEFAULT_INCIDENT = "INCIDENT"
LOCAL_INCIDENT = "LOCAL_INCIDENT"
UNREPORTABLE_LOCAL_INCIDENT = "UNREPORTABLE_LOCAL_INCIDENT"


class IncidentAggregateSettings:
    """Describe how incidents of one kind are grouped and which fields survive.

    RESULT_MESSAGE is the message class instantiated with the aggregated list.
    """

    RESULT_MESSAGE = MessageType.SensorIncidentList
    # Incidents with equal values for all of these fields are merged.
    _AGGREGATE_BY = (
        "attackers_ip",
        "name",
        "plugin_id",
        "rule",
        "severity",
        "domain",
    )
    # Extra fields copied into the aggregated record when present.
    _FIELDS_TO_SAVE = (
        "message",
        "timestamp",
        # ModSecurity optional fields
        "inbound_anomality_score",
        "outbound_anomality_score",
        "vendor",
        "modsec_version",
        "tag",
        "access_denied",
        USER_IDENTITY_FIELD,
        ADVANCED,
        STATUS_CODE,
        ENGINE_MODE,
        "transaction_id",
    )

    @classmethod
    def extract_key(cls, msg):
        """Return the grouping key of *msg*: a tuple of its _AGGREGATE_BY
        values, with None for missing fields."""
        return tuple(msg.get(k, None) for k in cls._AGGREGATE_BY)

    @classmethod
    def extract_fields(cls, msg):
        """Return the fields of *msg* to keep in the aggregated record.

        Every _AGGREGATE_BY field is always present (None when missing);
        _FIELDS_TO_SAVE fields are included only if *msg* contains them.
        """
        fields = {field: msg.get(field) for field in cls._AGGREGATE_BY}
        fields.update(
            {
                field: value
                for field, value in msg.items()
                if field in cls._FIELDS_TO_SAVE
            }
        )
        return fields


class LocalIncidentAggregateSettings(IncidentAggregateSettings):
    """Same grouping rules, emitted as MessageType.LocalIncidentList."""

    RESULT_MESSAGE = MessageType.LocalIncidentList


class UnreportableLocalIncidentAggregateSettings(IncidentAggregateSettings):
    """Same grouping rules, emitted as
    MessageType.UnreportableLocalIncidentList."""

    RESULT_MESSAGE = MessageType.UnreportableLocalIncidentList


AGGREGATE_SETTINGS = {
    DEFAULT_INCIDENT: IncidentAggregateSettings,
    LOCAL_INCIDENT: LocalIncidentAggregateSettings,
    UNREPORTABLE_LOCAL_INCIDENT: UnreportableLocalIncidentAggregateSettings,
}


class Aggregate(MessageSink, MessageSource):
    """Collect SensorIncident messages and flush them as aggregated lists."""

    # We intentionally derive from MessageSink for @expect magic to work

    # Send the aggregated incident list no more often than the
    # specified timeout (in seconds)
    AGGREGATE_TIMEOUT = 60

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._timer = None

    async def create_sink(self, loop):
        """Remember the event loop and start with an empty message buffer."""
        self._loop = loop
        # Maps buffer name (incident method) -> list of buffered messages.
        self._msg_buf = {}

    async def create_source(self, loop, sink):
        """Remember the downstream sink and start the periodic flush task."""
        self._sink = sink
        self._timer = self._loop.create_task(self._send_by_timer())

    async def shutdown(self):
        """Cancel the periodic flush task, if any, and wait for it to end."""
        if self._timer is not None:
            # Reset the attribute first so a repeated shutdown is a no-op.
            self._timer, t = None, self._timer
            t.cancel()
            # NOTE(review): awaiting a cancelled task re-raises
            # CancelledError unless the task body suppresses it; presumably
            # recurring_check takes care of that -- confirm.
            await t

    @expect(MessageType.SensorIncident)
    async def accumulate(self, message):
        """Append *message* to the buffer that matches its incident kind.

        A SensorIncident without "attackers_ip" is treated as a local
        incident; any other message is buffered under its own "method".
        """
        if (
            isinstance(message, MessageType.SensorIncident)
            and message.get("attackers_ip") is None
        ):
            # Check if it's incident from sensor
            # and attackers_ip not provided or None
            # LOCAL_INCIDENT method for that case
            # if the local incident is not severe enough, it is not reported
            severe = (
                message.get("severity", 0)
                >= config.LocalIncidentReporting.MIN_SEVERITY
            )
            # A tag listed in IGNORED_INCIDENT_TAGS_TO_SAVE (the original
            # comment mentions "noshow") makes the incident reportable
            # regardless of severity. Evaluated once; skipped when the
            # incident is already severe.
            reportable = severe or any(
                tag in PersistentStorage.IGNORED_INCIDENT_TAGS_TO_SAVE
                for tag in message.get("tag") or []
            )
            method = (
                LOCAL_INCIDENT if reportable else UNREPORTABLE_LOCAL_INCIDENT
            )
            self._msg_buf.setdefault(method, []).append(message)
        else:
            self._msg_buf.setdefault(message["method"], []).append(message)

    @recurring_check(AGGREGATE_TIMEOUT)
    async def _send_by_timer(self):
        """Aggregate every non-empty buffer and pass the results to the sink.

        Each buffer is emptied once its aggregated message has been built.
        """
        tasks = []
        for msg_type, msgs in self._msg_buf.items():
            if not msgs:
                continue
            settings = AGGREGATE_SETTINGS.get(msg_type)
            if settings is None:
                # An unknown method would otherwise raise KeyError on every
                # run, keep its buffer growing and prevent the remaining
                # buffers from being sent.
                logger.error(
                    "Dropping %d incident(s) with unknown method %r",
                    len(msgs),
                    msg_type,
                )
                msgs.clear()
                continue
            message = settings.RESULT_MESSAGE(self._aggregate(msg_type, msgs))
            msgs.clear()
            tasks.append(self._sink.process_message(message))
        if tasks:
            await asyncio.gather(*tasks)

    def _aggregate(self, msg_type, msgs):
        """Merge *msgs* that share a grouping key into single records.

        Returns a list of dicts. Each dict carries the extracted fields of
        the last message seen for its key plus "retries": the sum of the
        messages' "retries" values, counting a message without one as 1.
        """
        aggregate_dict = defaultdict(dict)
        settings = AGGREGATE_SETTINGS[msg_type]
        for msg in msgs:
            aggregated = aggregate_dict[settings.extract_key(msg)]
            aggregated.setdefault("retries", 0)
            aggregated["retries"] += msg.get("retries", 1)
            aggregated.update(settings.extract_fields(msg))
        return list(aggregate_dict.values())