D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
plugins
/
sensor
/
Filename :
webshield.py
back
Copy
import logging import pwd from im360.contracts.message_pb2 import SendInfo from defence360agent.contracts.plugins import MessageSource from defence360agent.contracts.messages import MessageType from im360.plugins.sensor.generic_protobuf import Protobuf from im360.plugins.sensor.unix_socket import create_unix_socket logger = logging.getLogger(__name__) class _Protocol(Protobuf): def process_protobuf(self, parcel): if parcel.method != SendInfo.WEBSHIELD: raise ValueError("Only WEBSHIELD method is supported") captcha_event = MessageType.CaptchaEvent.from_parcel(parcel) if captcha_event is not None: if captcha_event.event == captcha_event.PASSED: unblock = MessageType.ClientUnblock(captcha_event) unblock["method"] = MessageType.ClientUnblock.DEFAULT_METHOD self._loop.create_task(self._sink.process_message(unblock)) self._loop.create_task(self._sink.process_message(captcha_event)) class WebshieldSensor(MessageSource): SOCKET_PATH = "/var/run/defence360agent/protobuf.sock" USER = "imunify360-webshield" async def create_source(self, loop, sink): self._loop = loop self._sink = sink user_info = pwd.getpwnam(self.USER) sock = create_unix_socket( self.SOCKET_PATH, 0o755, 0o770, user_info.pw_uid ) self._server = await self._loop.create_unix_server( lambda: _Protocol(self.SOCKET_PATH, self._loop, self._sink), sock=sock, )