D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
im360
/
subsys
/
panels
/
directadmin
/
Filename :
mod_security.py
back
Copy
"""ModSecurity integration for the DirectAdmin panel."""
import logging
import os
import pwd
import shutil
import zipfile
from contextlib import suppress
from functools import lru_cache
from pathlib import Path
from typing import Container, Dict, Optional
from urllib.parse import urlparse

from defence360agent.contracts.config import ConfigFile
from defence360agent.subsys.panels.base import PanelException
from defence360agent.subsys.panels.directadmin.panel import get_user_domains
from defence360agent.utils import (
    atomic_rewrite,
    check_run,
    ensure_line_in_file,
    remove_line_from_file,
)
from im360.subsys.panels.base import (
    APACHE,
    LITESPEED,
    MODSEC_NAME_TEMPLATE,
    NGINX,
    FilesVendor,
    FilesVendorList,
    ModSecSettingInterface,
    ModSecurityInterface,
)
from defence360agent.subsys.web_server import (
    apache_running,
    check_with_timeout,
    litespeed_running,
)

from .custombuild import CustomBuildOptions, build, custombuild2_only

logger = logging.getLogger(__name__)

MODSEC_CONF_DIR = "/etc/modsecurity.d"
# Marker file stored next to the rules; it holds the id of the installed
# Imunify360 ruleset (see DirectAdminFilesVendor.apply).
RULESET_FILENAME = "I360_RULESET"
CB_MODSEC_CUSTOM_DIR = "/usr/local/directadmin/custombuild/custom/modsecurity"
CB_MODSEC_CUSTOM_CONF_DIR = os.path.join(CB_MODSEC_CUSTOM_DIR, "conf")

_BROKEN_CUSTOM_CONFIG_MSG = "User has a broken custom httpd config"
_DIRADMIN_USER = "diradmin"
_DIRECTADMIN_TASK_QUEUE = "/usr/local/directadmin/data/task.queue"
# Passed to check_with_timeout() after the ruleset rebuild.
_WEBSERVER_RESTART_CHECK_TIMEOUT = 60

# Maps a web server name to the callable used to check that it is running.
WEB_SERVER_CHECKS = {
    APACHE: apache_running,
    LITESPEED: litespeed_running,
}
SUPPORTED_WEB_SERVERS = WEB_SERVER_CHECKS.keys()  # type: Container[str]


class DirectAdminModSecException(PanelException):
    """ModSecurity-related failure on a DirectAdmin server."""

    pass


async def run_cmd(cmd):
    """Run *cmd* and return its output as a stripped string.

    The command line is split on whitespace (no shell quoting rules are
    applied) and executed through ``check_run`` with
    ``raise_exc=DirectAdminModSecException``.
    """
    logger.debug("Running CMD: %s", cmd)
    data = await check_run(cmd.split(), raise_exc=DirectAdminModSecException)
    return data.decode().strip()


class ModSecSettings(ModSecSettingInterface):
    """Adds/removes the Imunify360 include line in the httpd includes file."""

    INCLUDE_CONFIG = "/etc/httpd/conf/extra/httpd-includes.conf"
    I360_INCLUDE = 'Include "/etc/httpd/conf/extra/modsec2.imunify.conf"'
    # Key in the MOD_SEC config section that stores the ruleset option
    # value seen before installation (see _install_settings).
    config_key = "prev_settings"

    @classmethod
    async def revert(cls, **kwargs):
        """Remove the Imunify360 include line from INCLUDE_CONFIG."""
        remove_line_from_file(cls.INCLUDE_CONFIG, cls.I360_INCLUDE)
        # TODO: Revert SecRuleEngine value

    @classmethod
    async def apply(cls):
        """Make sure the Imunify360 include line is in INCLUDE_CONFIG."""
        ensure_line_in_file(cls.INCLUDE_CONFIG, cls.I360_INCLUDE)
        # TODO: Set SecRuleEngine value


@lru_cache(1)
def get_diradmin_pwnam():
    """Return the (cached) passwd entry of the ``diradmin`` user."""
    return pwd.getpwnam(_DIRADMIN_USER)


def rewrite_httpd_config(user: str):
    """Append an httpd ``rewrite`` action for *user* to the task queue."""
    action = "action=rewrite&value=httpd&user={}\n".format(user)
    with open(_DIRECTADMIN_TASK_QUEUE, "a") as queue:
        queue.write(action)


def _index_or_none(lines, marker):
    """Return the index of the first line equal to *marker*, else None."""
    try:
        return lines.index(marker)
    except ValueError:
        return None


async def get_custombuild_webserver():
    """Return the custombuild ``webserver`` option value.

    The ``nginx_apache`` combination is reported as NGINX.
    """
    web_server = CustomBuildOptions("webserver").get()
    # NGINX is handling real requests in this combination proxying it to
    # Apache as needed.
    if web_server == "nginx_apache":
        web_server = NGINX
    return web_server


async def get_outer_web_server():
    """Return the name of the web server that handles incoming requests.

    For the purposes of ModSecurity we are interested in the one web server
    running on the machine that handles actual requests from users.
    This is related to the usage of Nginx as a reverse proxy to Apache.

    Returns None when the detected web server has no entry in
    WEB_SERVER_CHECKS.
    """
    web_server = await get_custombuild_webserver()
    if web_server in SUPPORTED_WEB_SERVERS:
        return web_server
    return None


class DirectAdminModSecurity(ModSecurityInterface):
    """ModSecurity management for DirectAdmin (custombuild based)."""

    GLOBAL_DISABLED_RULES_CONFIG = (
        "/etc/httpd/conf/extra/i360_modsec_disable.conf"
    )
    # Name of the symlink created inside MODSEC_CONF_DIR that points to
    # GLOBAL_DISABLED_RULES_CONFIG.
    GLOBAL_DISABLED_RULES_LINK = "zz_i360_modsec_disable.conf"
    USER_INCLUDE_PATH_TMPL = (
        "/usr/local/directadmin/data/users/{user}/domains/{domain}.cust_httpd"
    )
    # Lines delimiting the section of a user's cust_httpd file that is
    # managed by this module.
    USER_RULES_START_MARK = "# IMUNIFY360 CONFIG START"
    USER_RULES_END_MARK = "# IMUNIFY360 CONFIG END"
    CUSTOM_RULES_BACKUP = os.path.join(CB_MODSEC_CUSTOM_DIR, "conf.i360backup")

    @classmethod
    def _get_conf_dir(cls) -> str:
        """Return the ModSecurity configuration directory."""
        return MODSEC_CONF_DIR

    @classmethod
    def detect_cwaf(cls):
        """Do nothing; this implementation performs no detection."""
        pass

    CWAF_INSTALLATION_DIR = "/usr/local/cwaf"

    @classmethod
    async def sync_disabled_rules_for_domains(
        cls, domain_rules_map: Dict[str, list]
    ):
        """Write per-domain disabled rules and queue httpd config rewrites.

        :param domain_rules_map: maps a domain name to the list of rules
            to disable for it
        :raises DirectAdminModSecException: if a domain has no known owner
            or the owner's custom httpd config has inconsistent markers
        """
        if not domain_rules_map:
            return
        # The domain -> user mapping does not depend on the loop variable,
        # so it is looked up once instead of once per domain.
        domain_owners = get_user_domains()
        for domain, rule_list in domain_rules_map.items():
            user = domain_owners.get(domain)
            if user is None:
                raise DirectAdminModSecException(
                    "Cannot find owner of domain " + domain
                )
            cls._write_user_custom_httpd_conf(user, domain, rule_list)
            rewrite_httpd_config(user)

    @classmethod
    def _write_user_custom_httpd_conf(cls, user, domain, rule_list):
        """Insert or replace the managed section of a cust_httpd file.

        The section is the generated disabled-rules config wrapped in
        USER_RULES_START_MARK / USER_RULES_END_MARK.  When the file has no
        markers the section is appended; when it has a START marker
        followed by an END marker the lines between them (inclusive) are
        replaced.

        :raises DirectAdminModSecException: if only one of the markers is
            present or END precedes START
        """
        filename = cls.USER_INCLUDE_PATH_TMPL.format(user=user, domain=domain)
        our_config = [cls.USER_RULES_START_MARK]
        our_config.extend(
            cls.generate_disabled_rules_config(rule_list).split("\n")
        )
        our_config.append(cls.USER_RULES_END_MARK)

        pw_entry = get_diradmin_pwnam()
        uid, gid = pw_entry.pw_uid, pw_entry.pw_gid
        if not os.path.isfile(filename):
            # Create an empty file owned by diradmin so it can be read below.
            with open(filename, "w") as cust_httpd:
                os.chown(cust_httpd.fileno(), uid, gid)

        with open(filename, "r") as cust_httpd:
            lines = cust_httpd.read().split("\n")

        # Look the markers up independently: a missing START must not hide
        # an orphaned END, otherwise a second section would be appended
        # after it and the file would be rejected on every later sync.
        start_idx = _index_or_none(lines, cls.USER_RULES_START_MARK)
        end_idx = _index_or_none(lines, cls.USER_RULES_END_MARK)

        if start_idx is None and end_idx is None:
            content = lines + our_config
        elif (
            start_idx is not None
            and end_idx is not None
            and start_idx < end_idx
        ):
            content = lines[:]
            content[start_idx : end_idx + 1] = our_config
        else:
            raise DirectAdminModSecException(_BROKEN_CUSTOM_CONFIG_MSG)

        if content[-1] != "":
            content.append("")  # ensure newline at EOF
        atomic_rewrite(
            filename, "\n".join(content), backup=False, uid=uid, gid=gid
        )

    @classmethod
    def write_global_disabled_rules(cls, rule_list):
        """Write the global disabled-rules config and refresh its symlink.

        :param list rule_list: rule ids to sync
        :return:
        """
        atomic_rewrite(
            cls.GLOBAL_DISABLED_RULES_CONFIG,
            cls.generate_disabled_rules_config(rule_list),
            backup=False,
        )
        cls._ensure_global_disabled_rules_link_present()

    @classmethod
    def _ensure_global_disabled_rules_link_present(cls):
        """(Re)create the symlink to the global disabled-rules config."""
        linkname = os.path.join(
            MODSEC_CONF_DIR, cls.GLOBAL_DISABLED_RULES_LINK
        )
        # Drop any previous entry first: os.symlink() fails if it exists.
        with suppress(FileNotFoundError):
            os.remove(linkname)
        os.symlink(cls.GLOBAL_DISABLED_RULES_CONFIG, linkname)

    @classmethod
    async def sync_global_disabled_rules(cls, rule_list):
        """
        just alias to write_global_disabled_rules()
        """
        cls.write_global_disabled_rules(rule_list)

    @classmethod
    def get_audit_log_path(cls):
        """Return the path of the ModSecurity audit log file."""
        return "/var/log/httpd/modsec_audit.log"

    @classmethod
    def get_audit_logdir_path(cls):
        """Return the path of the ModSecurity audit log directory."""
        return "/var/log/modsec_audit"

    @classmethod
    async def installed_modsec(cls):
        """Return True if the custombuild ``modsecurity`` option is "yes"."""
        return CustomBuildOptions("modsecurity").get() == "yes"

    @classmethod
    async def _get_web_server(cls) -> Optional[str]:
        """
        Return the name of the web server for which ModSecurity rules
        will be applied.
        """
        return await get_outer_web_server()

    async def _rollback(self):
        """Undo an installation: revert vendors, rebuild, revert settings."""
        await DirectAdminFilesVendorList.revert()
        CustomBuildOptions("modsecurity_ruleset").set("no")
        await build("modsecurity_ruleset")
        await self.revert_settings()

    @custombuild2_only
    async def _install_settings(self):
        """Install the Imunify360 ruleset and verify the web server.

        Backs up pre-existing custom rules (once), applies the include
        line and the vendor files, rebuilds the ruleset and records the
        previous ``modsecurity_ruleset`` option value.  If the web server
        is not reported as running within the timeout, everything is
        rolled back.
        """
        web_server = await self._get_web_server()
        if web_server is None:
            logger.warning(
                "ModSecurity rules installation is not supported"
                " for the running web server configuration."
            )
            return

        prev_settings = CustomBuildOptions("modsecurity_ruleset").get()

        # backup custom mod_security settings, if any
        vendorlist = await DirectAdminModSecurity.modsec_vendor_list()
        if "unknown_custom" in vendorlist and not os.path.exists(
            self.CUSTOM_RULES_BACKUP
        ):
            shutil.copytree(
                CB_MODSEC_CUSTOM_CONF_DIR, self.CUSTOM_RULES_BACKUP
            )

        await ModSecSettings.apply()
        await DirectAdminFilesVendorList.apply()
        await build("modsecurity_ruleset")

        config = ConfigFile()
        config.set("MOD_SEC", ModSecSettings.config_key, prev_settings)
        self._ensure_global_disabled_rules_link_present()

        # checking if we did not break web server configuration
        is_server_running = WEB_SERVER_CHECKS[web_server]
        if not await check_with_timeout(
            is_server_running, _WEBSERVER_RESTART_CHECK_TIMEOUT
        ):
            await self._rollback()
            logger.warning("Web server failed to start, settings reverted")

    async def modsec_get_directive(self, directive_name, default=None):
        """Not implemented for DirectAdmin."""
        raise NotImplementedError

    async def reset_modsec_directives(self):
        """Not implemented for DirectAdmin."""
        raise NotImplementedError

    async def reset_modsec_rulesets(self):
        """Not implemented for DirectAdmin."""
        raise NotImplementedError

    @custombuild2_only
    async def revert_settings(self):
        """Remove the Imunify360 ruleset and restore the previous state.

        Restores the saved ``modsecurity_ruleset`` option when it was
        "comodo" or "owasp", moves the custom-rules backup (if any) back
        into place, rebuilds the ruleset and clears the saved option.
        Does nothing when ModSecurity is not installed.
        """
        if not await self.installed_modsec():
            logger.warning(
                "Skipping vendor removal, because ModSecurity isn't installed"
            )
            return

        config = ConfigFile()
        await ModSecSettings.revert()
        await DirectAdminFilesVendorList.revert()

        prev_settings = config.get("MOD_SEC", ModSecSettings.config_key)
        if prev_settings in ("comodo", "owasp"):
            CustomBuildOptions("modsecurity_ruleset").set(prev_settings)

        if os.path.exists(self.CUSTOM_RULES_BACKUP):
            # The conf dir may already be gone; that must not prevent the
            # backup from being moved back into place.
            with suppress(FileNotFoundError):
                shutil.rmtree(CB_MODSEC_CUSTOM_CONF_DIR)
            os.rename(self.CUSTOM_RULES_BACKUP, CB_MODSEC_CUSTOM_CONF_DIR)

        await build("modsecurity_ruleset")
        config.set("MOD_SEC", ModSecSettings.config_key, None)

    @classmethod
    async def enabled_modsec_vendor_list(cls):
        """Return a list of enabled ModSecurity vendors."""
        # seems that on DirectAdmin all rulesets are always enabled
        return await cls.modsec_vendor_list()

    @classmethod
    async def modsec_vendor_list(cls):
        """Return a list of installed ModSecurity vendors."""
        vendorlist = []
        ruleset_file = os.path.join(MODSEC_CONF_DIR, RULESET_FILENAME)
        if os.path.exists(ruleset_file):
            # imunify360 ruleset installed
            with open(ruleset_file) as f:
                vendorlist.append(f.read())
        elif os.path.isdir(CB_MODSEC_CUSTOM_CONF_DIR) and os.listdir(
            CB_MODSEC_CUSTOM_CONF_DIR
        ):
            vendorlist.append("unknown_custom")

        ruleset = CustomBuildOptions("modsecurity_ruleset").get()
        if ruleset and ruleset != "no":
            vendorlist.append(ruleset)
        return vendorlist

    @classmethod
    async def build_vendor_file_path(cls, vendor: str, filename: str) -> Path:
        """Return the path of *filename* inside MODSEC_CONF_DIR.

        *vendor* is not used: the path does not depend on it.
        """
        return Path(MODSEC_CONF_DIR) / filename

    @classmethod
    async def _apply_modsec_files_update(cls):
        """Install/update vendor files and rebuild the ruleset if needed."""
        installed = await DirectAdminFilesVendorList.install_or_update()
        # don't build modsecurity_ruleset if vendor is not installed
        # to avoid DEF-5848
        if installed:
            await build("modsecurity_ruleset")
            cls._ensure_global_disabled_rules_link_present()


class DirectAdminFilesVendor(FilesVendor):
    """A ruleset archive installed into the custombuild custom conf dir."""

    modsec_interface = DirectAdminModSecurity

    def _add_vendor(self, url, name, *args, **kwargs):
        """Do nothing; installation is performed by apply()."""
        pass

    async def _remove_vendor(self, vendor, *args, **kwargs):
        """Replace the custom conf dir with an empty directory."""
        with suppress(FileNotFoundError):
            shutil.rmtree(CB_MODSEC_CUSTOM_CONF_DIR)
        os.makedirs(CB_MODSEC_CUSTOM_CONF_DIR, exist_ok=True)

    async def apply(self):
        """Unpack the ruleset archive into the custom conf dir and rebuild.

        Archive members are written flat (by base name) into a freshly
        emptied CB_MODSEC_CUSTOM_CONF_DIR, ``rules.conf.main`` is dropped
        if present, the vendor id is recorded in RULESET_FILENAME, and the
        ``modsecurity_ruleset`` option is set to "no" before the rebuild.
        """
        shutil.rmtree(CB_MODSEC_CUSTOM_CONF_DIR, ignore_errors=True)
        os.makedirs(CB_MODSEC_CUSTOM_CONF_DIR, exist_ok=True)

        with zipfile.ZipFile(self._item["local_path"]) as zf:
            for member in zf.namelist():
                # Only the base name is used, so directory entries (empty
                # base name) are skipped and every file lands directly in
                # the conf dir.
                filename = os.path.basename(member)
                if not filename:
                    continue
                target = os.path.join(CB_MODSEC_CUSTOM_CONF_DIR, filename)
                with zf.open(member) as src, open(target, "wb") as dst:
                    shutil.copyfileobj(src, dst)

        with suppress(FileNotFoundError):
            os.unlink(
                os.path.join(CB_MODSEC_CUSTOM_CONF_DIR, "rules.conf.main")
            )

        with open(
            os.path.join(CB_MODSEC_CUSTOM_CONF_DIR, RULESET_FILENAME), "w"
        ) as f:
            f.write(self._vendor_id())

        CustomBuildOptions("modsecurity_ruleset").set("no")
        await build("modsecurity_ruleset")

    def _vendor_id(self):
        """Return the base name of the item's URL path without extension."""
        basename = os.path.basename(urlparse(self._item["url"]).path)
        basename_no_zip, _ = os.path.splitext(basename)
        return basename_no_zip


class DirectAdminFilesVendorList(FilesVendorList):
    """Vendor list for DirectAdmin; selects items named for "plesk"."""

    files_vendor = DirectAdminFilesVendor
    modsec_interface = DirectAdminModSecurity

    @classmethod
    def vendor_fit_panel(cls, item):
        """Return True if the item's name ends with "plesk"."""
        return item["name"].endswith("plesk")

    @classmethod
    async def _get_compatible_name(cls, installed_vendors):
        """Return the vendor name matching the current web server.

        :raises CompatiblityCheckFailed: if the web server is unsupported
        """
        web_server = await get_outer_web_server()
        if web_server is None:
            raise cls.CompatiblityCheckFailed(
                "Imunify360 mod_security vendor does not support"
                " a running web server"
            )
        return MODSEC_NAME_TEMPLATE.format(
            ruleset_suffix=cls.get_ruleset_suffix(),
            webserver=web_server,
            panel="plesk",
        )