""" Generates SensorAlert from ossec incidents with high severity """ from defence360agent.contracts.plugins import ( MessageSink, MessageSource, expect, ) from defence360agent.contracts.messages import MessageType class OssecAlert(MessageSink, MessageSource): MIN_ALERT_LEVEL = 6 FIELDS = ("plugin_id", "attackers_ip", "rule", "user", "timestamp") async def create_sink(self, loop): self._loop = loop async def create_source(self, loop, sink): self._loop = loop self._sink = sink @expect(MessageType.SensorIncident, plugin_id="ossec") async def generate_alert(self, msg): if (msg["severity"] >= self.MIN_ALERT_LEVEL) and ( "attackers_ip" in msg ): alert = MessageType.SensorAlert( **{field: msg[field] for field in self.FIELDS if field in msg} ) await self._sink.process_message(alert)