D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
plugins
/
sensor
/
Filename :
generic_protobuf.py
back
Copy
import asyncio
import logging

from google.protobuf.message import DecodeError

from im360.contracts.message_pb2 import SendInfo
from defence360agent.utils.buffer import SizeBuffer

logger = logging.getLogger(__name__)


class Protobuf(asyncio.Protocol):
    """asyncio protocol that decodes incoming bytes into ``SendInfo`` messages.

    Received chunks are appended to a ``SizeBuffer``; every item the buffer
    yields is parsed as a ``SendInfo`` and handed to ``process_protobuf``.
    """

    def __init__(self, name, loop, sink):
        """Store the connection parameters and create a fresh ``SizeBuffer``.

        ``name`` appears in the parse-failure log message. ``loop`` and
        ``sink`` are only stored; the code visible here never reads them.
        """
        self._name = name
        self._loop = loop
        self._sink = sink
        self._buf = SizeBuffer()

    def data_received(self, data):
        """Buffer ``data`` and dispatch each message the buffer yields.

        An item that fails to parse is logged and skipped; the remaining
        items are still processed.
        """
        self._buf.append(data)
        # NOTE(review): SizeBuffer presumably splits the stream into complete
        # frames -- confirm against its implementation.
        for frame in self._buf:
            message = SendInfo()
            try:
                message.ParseFromString(frame)
            except DecodeError:
                logger.debug("Invalid protobuf message: %s", frame)
                logger.exception(
                    "Failed to parse protobuf message from %s", self._name
                )
                continue
            # Kept outside the ``try`` so that errors raised by the handler
            # are not mistaken for decode failures.
            logger.debug("Received protobuf message: %s", message)
            self.process_protobuf(message)

    def process_protobuf(self, parcel):
        """Handle one decoded ``SendInfo`` message; does nothing here.

        Presumably intended to be overridden by subclasses -- confirm.
        """