D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
contracts
/
Filename :
messages.py
back
Copy
"""im360 message classes built on the defence360agent message bases."""

from defence360agent import utils
from defence360agent.contracts.messages import (
    Accumulatable,
    Message,
    MessageList,
    Received,
    Reportable,
    ShortenReprListMixin,
)
from im360.contracts.message_pb2 import WebShieldData
from im360.utils.validate import IP

HIGHEST_PRIORITY = 0


class StrategyChange(Message):
    """Generated when a change of the IDS is detected."""

    DEFAULT_METHOD = "STRATEGY_CHANGE"
    PRIORITY = HIGHEST_PRIORITY


class SensorIncident(Message):
    """A single incident, e.g. one failed user authentication."""

    DEFAULT_METHOD = "INCIDENT"


class SensorIncidentList(MessageList, Reportable):
    """Aggregated list of incidents."""

    DEFAULT_METHOD = "INCIDENT_LIST"


class UnreportableLocalIncidentList(MessageList):
    """Aggregated list of local incidents that are not reported to server."""

    DEFAULT_METHOD = "LOCALINCIDENT_LIST"


class LocalIncidentList(MessageList, Reportable):
    """Aggregated list of local incidents, i.e. where no ip is provided."""

    # Same method name as SensorIncidentList.
    DEFAULT_METHOD = "INCIDENT_LIST"


class SensorAlert(Message, Reportable):
    """Alert incident, e.g. user auth failures reached a threshold."""

    DEFAULT_METHOD = "ALERT"
    PRIORITY = 1

    @classmethod
    def from_incident(cls, message):
        """Build an ALERT out of an INCIDENT message.

        The incident is copied, the copy's ``"method"`` entry is replaced
        with this class's ``DEFAULT_METHOD`` and the result is passed to
        the constructor as keyword arguments. *message* itself is not
        modified by this method.
        """
        alert_fields = message.copy()
        alert_fields["method"] = cls.DEFAULT_METHOD
        return cls(**alert_fields)


class ClientUnblock(Message, Reportable):
    """``UNBLOCK`` message; uses ``HIGHEST_PRIORITY``."""

    DEFAULT_METHOD = "UNBLOCK"
    PRIORITY = HIGHEST_PRIORITY


class CaptchaEventList(ShortenReprListMixin, Message, Reportable):
    """``CAPTCHA_LIST`` message; referenced as ``CaptchaEvent.LIST_CLASS``."""

    DEFAULT_METHOD = "CAPTCHA_LIST"


class CaptchaEvent(Accumulatable):
    """A single ``CAPTCHA`` event."""

    DEFAULT_METHOD = "CAPTCHA"

    # NOTE(review): presumably these mirror the names of the
    # WebShieldData.Captcha enum values -- confirm against message_pb2.
    FAILED = "FAILED"
    PASSED = "PASSED"
    REQUESTED = "REQUESTED"

    LIST_CLASS = CaptchaEventList

    @classmethod
    def from_parcel(cls, parcel):
        """Create an event from *parcel*, or return ``None``.

        ``None`` is returned when ``parcel.websh.captcha`` equals
        ``WebShieldData.NA``. Otherwise the event is filled from the
        parcel's timestamp and ip and from the webshield part's captcha
        enum name and user id; ``plugin_id`` is always ``"captcha"``.
        """
        webshield = parcel.websh
        if webshield.captcha == WebShieldData.NA:
            return None
        return cls(
            timestamp=parcel.timestamp,
            attackers_ip=parcel.ip,
            event=WebShieldData.Captcha.Name(webshield.captcha),
            user_id=webshield.user_id,
            plugin_id="captcha",
        )


class CaptchaDosAlert(Message):
    """``CAPTCHA_DOS_ALERT`` message with priority 1."""

    DEFAULT_METHOD = "CAPTCHA_DOS_ALERT"
    PRIORITY = 1


class SynclistResponse(Message, Received):
    """``SYNCLIST`` message whose [un]blocklist keys become networks."""

    DEFAULT_METHOD = "SYNCLIST"
    PRIORITY = 2

    @utils.sync.timefun(
        action="SynclistResponse{str->IP.adopt_to_ipvX_network}"
    )
    def __init__(self, *args, **kwargs):
        """Initialize the message and convert its ip keys once.

        After the base initialization, the keys of the ``"blocklist"``
        and ``"unblocklist"`` mappings are converted from str to
        Union[IPv4Network, IPv6Network] so the conversion happens only
        once per SynclistResponse message processing.

        :raise ValueError: if str keys to IPv4Network, IPv6Network
            conversion fails
        """
        super().__init__(*args, **kwargs)
        for list_name in ("blocklist", "unblocklist"):
            converted = {}
            for raw_ip, properties in self[list_name].items():
                converted[IP.adopt_to_ipvX_network(raw_ip)] = properties
            # Replace the mapping only after every key was converted.
            self[list_name] = converted

    @staticmethod
    def filter_blocklist(ips: dict, *, action_type: str):
        """Return the ips of an [un]blocklist matching *action_type*.

        *ips* maps each ip to its properties; a falsy *ips* produces no
        results. An ip matches when it has no properties, when its
        properties carry no ``"action_type"``, or when that value equals
        *action_type*. The result is a lazy generator of ips.
        """

        def _matches(properties):
            if not properties:  # no properties
                return True
            if "action_type" not in properties:  # no action_type
                return True
            # with given action_type
            return properties["action_type"] == action_type

        candidates = ips.items() if ips else []
        return (ip for ip, properties in candidates if _matches(properties))


class SynclistRequest(Message, Reportable):
    """Reportable ``SYNCLIST`` message."""

    DEFAULT_METHOD = "SYNCLIST"


class BlockUnblockList(Message):
    """Used internally to block/unblock ips from lists.

    Expected layout::

        {
            "blocklist": {(IPNetwork, "listname"): {"expiration": int}},
            "unblocklist": [(IPNetwork, "listname")],
        }

    If an ip is present in both lists: first unblock, then block it
    (upsert semantics if applicable).
    """

    DEFAULT_METHOD = "BLOCK_UNBLOCK"
    PRIORITY = HIGHEST_PRIORITY


class ProactiveQueueList(MessageList, Reportable):
    """Reportable list message with method ``PROACTIVE_QUEUE_LIST``."""

    DEFAULT_METHOD = "PROACTIVE_QUEUE_LIST"


class RuleDisabled(Message, Reportable):
    """Rule disabled by customer."""

    DEFAULT_METHOD = "RULE_DISABLED"


class RuleEnabled(Message, Reportable):
    """Rule enabled back."""

    DEFAULT_METHOD = "RULE_ENABLED"


class ConfigSet(Message, Received):
    """Updates to the agent's config."""

    RECEIVED_ACTIONS = ["CONFIG_SET"]


class UpdateCustomLists(Message):
    """Sent to class RealProtector to update custom ip white/black lists."""

    DEFAULT_METHOD = "UPDATE_CUSTOM_LISTS"


class IPListsUpdate(Message):
    """Sent to the iplists plugin to update the IPs purposed lists."""

    DEFAULT_METHOD = "IP_LISTS_UPDATE"


class GroupIPSync(Message, Reportable):
    """Reportable ``GROUP_SYNC`` message."""

    DEFAULT_METHOD = "GROUP_SYNC"


class GroupIPSyncPush(Message, Received):
    """Received ``GROUP_SYNC`` message."""

    DEFAULT_METHOD = "GROUP_SYNC"


class EnduserPasswordReset(Message, Received):
    """Received ``ENDUSER_PASSWORD_RESET`` message."""

    DEFAULT_METHOD = "ENDUSER_PASSWORD_RESET"


class WhitelistCacheUpdate(Message):
    """Receiving this message indicates that the resident part should
    clear the whitelist cache."""

    DEFAULT_METHOD = "WHITELIST_CACHE_UPDATE"


class IpsetUpdate(Message):
    """Receiving this message indicates that the resident part should
    check ipsets consistency."""

    DEFAULT_METHOD = "IPSET_UPDATE"


class BlockedPortUpdate(Message):
    """Receiving this message indicates that the resident part should
    update blockedport."""

    DEFAULT_METHOD = "BLOCKED_PORT_UPDATE"


class BlockedPortIPUpdate(Message):
    """Receiving this message indicates that the resident part should
    update blockedportip."""

    DEFAULT_METHOD = "BLOCKED_PORT_IP_UPDATE"