D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
plugins
/
resident
/
Filename :
config_set.py
back
Copy
"""Handle CONFIG_SET server message.""" import json import logging from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import ( MessageSink, MessageSource, expect, ) from defence360agent.utils import await_for, retry_on from im360.plugins.sensor.generic import send_to_agent_socket logger = logging.getLogger(__name__) class ConfigSet(MessageSink, MessageSource): async def create_sink(self, loop): pass async def create_source(self, loop, sink): self._sink = sink @retry_on( ConnectionRefusedError, on_error=await_for(seconds=5), max_tries=5, log=logger, ) async def send_config_update(self, data: dict, user: str = None): send_to_agent_socket( command=["config", "update"], params={"data": json.dumps(data), "user": user}, wait_for_response=False, ) @expect(MessageType.ConfigSet) async def config_set(self, message): new_data = message["data"] user = message.get("user") # don't modify config file directly, since we expect that # all config management should be in non-resident part await self.send_config_update(new_data, user) logger.info("Updated config with %s", new_data)