D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
simple_rpc
/
Filename :
validate.py
back
Copy
"""Validation helpers for the im360 simple RPC layer.

Provides value types for ``port:proto`` pairs and time periods, a schema
validator with im360-specific coercers/checks, and a middleware factory
that validates RPC requests and (in the background) RPC responses.
"""
import asyncio
import json
import logging
import os
import time
from collections import namedtuple
from datetime import date, datetime, timedelta
from functools import wraps
from ipaddress import IPv4Network, IPv6Network, ip_network
from pathlib import Path

import jsonschema

from defence360agent.rpc_tools.validate import (
    SchemaValidator as SchemaValidatorBase,
)
from defence360agent.rpc_tools.validate import validate
from im360.contracts.config import Webshield
from im360.internals.core import IPSetPort, libipset
from im360.utils.validate import IP

logger = logging.getLogger(__name__)

TODAY = "today"
YESTERDAY = "yesterday"

PortProtoBase = namedtuple("PortProtoBase", ["port", "proto"])
PeriodBase = namedtuple("PeriodBase", ["since", "to"])

# datetime.fromtimestamp() / time.gmtime() may raise any of these for a
# value that is out of range for the platform, not only ValueError.
_TIMESTAMP_ERRORS = (ValueError, OverflowError, OSError)


class PortProto(PortProtoBase):
    """A ``(port, proto)`` pair checked against ``IPSetPort`` limits."""

    def __new__(cls, port, proto):
        """Create the pair.

        :raises ValueError: if *proto* is not in ``IPSetPort.PROTOS`` or
            *port* is not strictly between ``IPSetPort.MIN_PORT`` and
            ``IPSetPort.MAX_PORT``.
        """
        if proto not in IPSetPort.PROTOS:
            raise ValueError("Protocol {} is not supported".format(proto))
        # Both bounds are exclusive.
        if not (IPSetPort.MIN_PORT < port < IPSetPort.MAX_PORT):
            raise ValueError("Port {} is incorrect".format(port))
        return super().__new__(cls, port, proto)

    @classmethod
    def fromstring(cls, pp_string):
        """Parse a ``"<port>:<proto>"`` string.

        :raises ValueError: if the string does not split into exactly two
            parts, the port is not an integer, or the pair is rejected by
            the constructor.
        """
        try:
            port, proto = pp_string.split(":")
            port = int(port)
            return cls(port, proto)
        except ValueError as e:
            raise ValueError(
                "Incorrect port_proto ({}): {}".format(str(e), pp_string)
            ) from e


class Period(PeriodBase):
    """A ``(since, to)`` pair of POSIX timestamps."""

    def __new__(cls, since, to):
        """Create the period.

        :raises ValueError: if either value cannot be converted by
            ``datetime.fromtimestamp()``.
        """
        try:
            datetime.fromtimestamp(since), datetime.fromtimestamp(to)
        except _TIMESTAMP_ERRORS as e:
            # Out-of-range values surface as OverflowError/OSError; report
            # them through the same ValueError callers already expect.
            raise ValueError(
                "Incorrect value for period: {}".format(str(e))
            ) from e
        return super().__new__(cls, since, to)

    @classmethod
    def fromstring(cls, period_string):
        """Build a period from ``"today"``, ``"yesterday"`` or ``"<N><unit>"``.

        ``<unit>`` is one of ``w``, ``d``, ``h``, ``m``, ``s`` (weeks, days,
        hours, minutes, seconds); the resulting period ends now and starts
        ``N`` units earlier.

        :raises ValueError: if the string cannot be parsed or describes a
            span that cannot be represented.
        """
        now = datetime.now()
        seconds_since_midnight = (
            now - now.replace(hour=0, minute=0, second=0, microsecond=0)
        ).total_seconds()
        if period_string == TODAY:
            since, to = time.time() - seconds_since_midnight, time.time()
        elif period_string == YESTERDAY:
            # Last local midnight, and exactly one day before it.
            to = time.time() - seconds_since_midnight
            since = to - timedelta(days=1).total_seconds()
        else:
            units = {
                "w": "weeks",
                "d": "days",
                "h": "hours",
                "m": "minutes",
                "s": "seconds",
            }
            try:
                val, sfx = int(period_string[:-1]), period_string[-1:]
            except ValueError as e:
                raise ValueError(
                    "Invalid string from period: {} ({})".format(
                        period_string, str(e)
                    )
                ) from e
            unit = units.get(sfx)
            if unit is None:
                # argparse will handle this exception
                raise ValueError("Invalid suffix: {}".format(sfx))
            try:
                since = (datetime.now() - timedelta(**{unit: val})).timestamp()
            except (OverflowError, OSError) as e:
                # Huge counts overflow timedelta/datetime arithmetic; keep
                # the ValueError contract instead of leaking OverflowError.
                raise ValueError(
                    "Invalid string from period: {} ({})".format(
                        period_string, str(e)
                    )
                ) from e
            to = time.time()
        return cls(since, to)


class SchemaValidator(SchemaValidatorBase):
    """Schema validator extended with im360-specific coercers and checks.

    NOTE(review): the ``_normalize_coerce_*`` / ``_validate_type_*`` /
    ``_validator_*`` hook names are presumably discovered by the base
    class by name - confirm against ``SchemaValidatorBase``.
    """

    MAX_IPSET_TIMEOUT = libipset.IPSET_TIMEOUT_MAX  # ipset's maximum ttl

    def _normalize_coerce_port_proto(self, value):
        """Coerce a ``"port:proto"`` string to :class:`PortProto`."""
        if isinstance(value, PortProto):
            return value
        elif isinstance(value, str):
            return PortProto.fromstring(value)
        raise ValueError("String or PortProto must be provided")

    def _normalize_coerce_ip(self, value):
        """Coerce a string to an IP network (host bits must be zero).

        Values that are neither a network object nor a string yield None.
        """
        if isinstance(value, (IPv4Network, IPv6Network)):
            return value
        elif isinstance(value, str):
            return ip_network(value)

    def _normalize_coerce_ip_discard_host_bits(self, value):
        """Coerce a string to an IP network, masking out any host bits.

        Values that are neither a network object nor a string yield None.
        """
        if isinstance(value, (IPv4Network, IPv6Network)):
            return value
        elif isinstance(value, str):
            return ip_network(value, strict=False)

    def _normalize_coerce_period(self, value):
        """Coerce a period string to :class:`Period`."""
        if isinstance(value, Period):
            return value
        elif isinstance(value, str):
            return Period.fromstring(value)
        raise ValueError("String or Period must be provided")

    def _normalize_coerce_tolower(self, value):
        """Return *value* lower-cased."""
        # please, don't try to casefold() instead of lower()
        # see https://tools.ietf.org/html/rfc4343
        return value.lower()

    def _validate_type_port_proto(self, value):
        """Return True if *value* is a :class:`PortProto`."""
        return isinstance(value, PortProto)

    def _validate_type_period(self, value):
        """Return True if *value* is a :class:`Period`."""
        return isinstance(value, Period)

    def _validate_type_ip(self, value):
        """Return True if *value* is an IPv4 or IPv6 network object."""
        return isinstance(value, (IPv4Network, IPv6Network))

    def _validator_enforce64min_subnet_mask_for_ipv6(self, field, value):
        """Report IPv6 networks whose prefix is longer than /64."""
        if IP.is_valid_ipv6_network(value):
            # 64 - min subnet mask for ipv6 addr
            if IPv6Network(value).prefixlen > 64:
                self._error(
                    field, "Supported only ipv6 /64 networks: {}".format(value)
                )

    def _validator_max_days(self, field, value):
        """Report a day count that reaches back past 1970-01-01."""
        max_past = date.today() - date(1970, 1, 1)
        # Compare plain numbers: building timedelta(days=value) would raise
        # OverflowError for values outside the timedelta range.
        if value > max_past.days:
            self._error(
                field,
                "Number of days ({}) exceeds maximum value of {}. "
                "Please, specify lesser amount of days".format(
                    value, max_past.days
                ),
            )

    def _validator_timestamp(self, field, value):
        """Report a value ``datetime.fromtimestamp()`` cannot convert."""
        try:
            datetime.fromtimestamp(value)
        except _TIMESTAMP_ERRORS as e:
            self._error(
                field, "Incorrect timestamp: {} ({})".format(value, str(e))
            )

    def _validator_expiration(self, field, value):
        """Report an expiration that is in the past or too far ahead.

        Falsy values are accepted without any check. The upper limit is
        ``MAX_IPSET_TIMEOUT`` seconds from now.
        """
        if not value:
            return
        expiration_time = value
        now = time.time()
        if expiration_time <= now:
            try:
                expired_at = time.strftime(
                    "%x %X %Z", time.gmtime(expiration_time)
                )
            except _TIMESTAMP_ERRORS:
                # Unrepresentable timestamp: still report it, unformatted.
                expired_at = str(expiration_time)
            self._error(
                field,
                "Expiration contains expired timestamp {}!".format(
                    expired_at
                ),
            )
        max_expiration_time = now + self.MAX_IPSET_TIMEOUT
        if expiration_time > max_expiration_time:
            self._error(
                field,
                (
                    "Expiration time {} is too far into the future."
                    " It is more than {} seconds from now"
                ).format(expiration_time, self.MAX_IPSET_TIMEOUT),
            )

    def _validator_webshield_is_enabled(self, field, value):
        """Report an error when ``Webshield.ENABLE`` is falsy."""
        if not Webshield.ENABLE:
            self._error(
                field,
                "This command is not supported when webshield is disabled",
            )


def validate_middleware(validator):
    """Return a decorator that validates RPC requests and responses.

    The decorated coroutine receives ``request["params"]`` replaced by the
    output of ``validate()``. After it returns, its result is checked
    against the JSON schema named by the command's ``return_type`` in a
    background task; a mismatch is only logged, never raised to the caller.

    :param validator: validator whose ``schema`` maps command tuples to
        their definitions.
    """
    base = Path(os.path.dirname(__file__)) / "../.."
    # The trailing "another" component is a placeholder that with_name()
    # swaps for the actual schema file name.
    core_schemas = base / "defence360agent/simple_rpc/schema_responses/another"
    imav_schemas = base / "imav/simple_rpc/schema_responses/another"
    im360_schemas = base / "im360/simple_rpc/schema_responses/another"

    # Strong references to in-flight response validations. The event loop
    # keeps only weak references to tasks, so an otherwise unreferenced
    # task may be garbage-collected before it finishes.
    pending_validations = set()

    def get_file_from_schema_responses_dirs(filename):
        """Return the schema path: core first, then imav, else im360."""
        core_schema_path = core_schemas.with_name(filename)
        if core_schema_path.exists():
            return core_schema_path
        imav_schema_path = imav_schemas.with_name(filename)
        if imav_schema_path.exists():
            return imav_schema_path
        return im360_schemas.with_name(filename)

    def get_response_schema(return_type):
        """Load and return the JSON schema for *return_type*."""
        # asserts that return_type does not contain '/'
        # (Path.with_name() rejects names containing a path separator)
        schema_path = get_file_from_schema_responses_dirs(
            return_type + ".json"
        )
        with schema_path.open("r") as f:
            schema = json.load(f)
        return schema

    async def validate_response(hashable, result):
        """Validate *result* against the command's response schema."""
        return_type = validator.schema.get(hashable).get("return_type", None)
        if return_type is None:
            return
        schema = get_response_schema(return_type)
        # validation should be performed after
        # result gets such format (ui accepts it)
        # in some cases result never gets such format
        # like test_addmany_invalid_request
        target = {"result": "success", "messages": [], "data": result}
        try:
            jsonschema.validate(target, schema)
        except jsonschema.ValidationError as error:
            logger.critical(
                'Validating %r using schema %r failed with error "%s".',
                target,
                schema,
                error,
                exc_info=error,
            )

    def wrapped(f):
        @wraps(f)
        async def wrapper(request, *args, **kwargs):
            hashable = tuple(request["command"])
            request["params"] = validate(
                validator, hashable, request["params"]
            )
            result = await f(request, *args, **kwargs)
            # no cpu overhead during rpc request
            # since validation is asynchronous
            # (only next request may be delayed a little)
            # run_until_complete waits until this task will be finished (why?)
            # so test will be failed
            task = asyncio.ensure_future(validate_response(hashable, result))
            pending_validations.add(task)
            task.add_done_callback(pending_validations.discard)
            return result

        return wrapper

    return wrapped