D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib64
/
python3.11
/
site-packages
/
defence360agent
/
subsys
/
panels
/
plesk
/
Filename :
api.py
back
Copy
"""Gather information from Plesk via DB queries."""
import logging
from collections import defaultdict
from typing import Dict, List, Sequence

from defence360agent.subsys.panels.base import PanelException
from defence360agent.utils import CheckRunError, check_run, retry_on

logger = logging.getLogger(__name__)


async def raise_panel_exception(*args, **kwargs):
    """Raise :class:`PanelException` built from the given arguments.

    Passed to ``retry_on`` as the ``on_error`` callback of ``_run_query``.
    NOTE(review): presumably invoked once the retries are exhausted --
    confirm against the ``retry_on`` implementation.
    """
    raise PanelException(*args, **kwargs)


@retry_on(CheckRunError, max_tries=3, on_error=raise_panel_exception)
async def _run_query(query):
    """Run *query* via ``plesk db -N -e`` and return the decoded output.

    The command's raw output is decoded with the default codec and
    returned unparsed; callers split it into rows/tokens themselves.
    """
    return (await check_run(["plesk", "db", "-N", "-e", query])).decode()


async def get_user_to_domain() -> Dict[str, List[str]]:
    """Return mapping: user -> user's domains.

    Rows whose domain column is the literal token ``NULL`` (a system user
    without a matching domain, because of the right join) contribute no
    entry. The returned object is a ``defaultdict(list)``.
    """
    result = (
        await _run_query(
            "select login, name from domains "
            " left join hosting on dom_id = domains.id "
            " right join sys_users on hosting.sys_user_id = sys_users.id"
        )
    ).split()
    result_mapping = defaultdict(list)
    # The output is a flat token stream: even positions hold logins,
    # odd positions hold the domain name belonging to that login.
    for user, domain in zip(result[0::2], result[1::2]):
        if domain != "NULL":
            result_mapping[user].append(domain)
    return result_mapping


async def get_domain_to_user() -> Dict[str, List[str]]:
    """Return mapping: domain -> single-element list with its user.

    NOTE(review): because of the left joins, a domain without hosting or
    system user would be mapped to ``["NULL"]`` -- confirm callers expect
    that.
    """
    result = (
        await _run_query(
            "select name, login "
            "from domains "
            " left join hosting on dom_id = domains.id "
            " left join sys_users on hosting.sys_user_id = sys_users.id"
        )
    ).split()
    # Flat token stream: even positions are domains, odd positions users.
    return {domain: [user] for domain, user in zip(result[0::2], result[1::2])}


async def get_user_details() -> Dict[str, Dict[str, str]]:
    """Return dict with user to email and locale pairs.

    Not used, because MyImunify implemented for cPanel only yet.

    Each value has the keys ``"email"`` and ``"locale"``; dashes in the
    locale are replaced with underscores (``en-US`` -> ``en_US``).
    Rows that do not consist of exactly three ``;``-separated fields are
    skipped with a warning.
    """
    user_details = {}
    results = await _run_query(
        "SELECT CONCAT(login, ';',email, ';',locale) FROM clients;"
    )
    for record in results.split("\n"):
        if not record:
            continue
        fields = record.split(";")
        if len(fields) != 3:
            # CONCAT() yields NULL as soon as one argument is NULL, and
            # the client prints that as a bare "NULL" token without any
            # separators; unpacking such a row would raise ValueError.
            logger.warning("Skipping malformed Plesk client record")
            continue
        user, email, locale = fields
        user_details[user] = {
            "email": email,
            "locale": locale.replace("-", "_"),
        }
    return user_details


async def get_domains() -> List[str]:
    """Return: list of domains."""
    return (await _run_query("select name from domains")).split()


async def get_users() -> List[str]:
    """Return: users that created by Plesk.

    Selects system-user logins joined to hosting records whose domain has
    ``webspace_id=0``.
    """
    return (
        await _run_query(
            "SELECT sys_users.login FROM sys_users JOIN hosting ON"
            " hosting.sys_user_id=sys_users.id JOIN domains ON"
            " hosting.dom_id=domains.id AND domains.webspace_id=0"
        )
    ).split()


async def get_users_for_patchman() -> List[List[str]]:
    """Return one list of whitespace-separated tokens per client/domain row.

    Example of the returned structure::

        [
            ['admin', 'john.smith@tardis.gal', 'NULL', 'en-US', 'admin',
             'NULL', 'NULL', 'NULL', '0'],
            ['user0', 'NULL', 'admin', 'en-US', 'client', '1', 'user0.com',
             '/var/www/vhosts/user0.com/httpdocs', '1']
        ]

    There is only 1 return type. NULL is converted to 'NULL'.
    Each possible empty string should be covered with
    ``IF(clients.email='', NULL, clients.email)`` or it will break the
    data structure, because rows are split on whitespace and an empty
    field would shift every following column.

    NOTE(review): the example rows show nine fields while the query below
    selects eight columns -- confirm which one is current.
    """
    raw_data = await _run_query(
        """
        SELECT
            clients.login,
            IF(clients.email='', NULL, clients.email),
            parent.login,
            IF(clients.locale='', NULL, clients.locale),
            clients.type,
            domains.name,
            hosting.www_root,
            clients.status=16 suspended
        FROM clients
        LEFT JOIN clients parent ON parent.id=clients.parent_id
        LEFT JOIN domains ON domains.cl_id=clients.id
        LEFT JOIN hosting ON domains.id=hosting.dom_id;
        """
    )
    tuples = [string.split() for string in raw_data.split("\n") if string]
    return tuples


async def count_customers_with_subscriptions() -> int:  # pragma: no cover
    """Return: count active customers with at least one (any) domain."""
    return int(
        await _run_query(
            "select count(distinct cl_id) from clients c join domains d on "
            "d.cl_id = c.id where c.status = 0"
        )
    )


async def get_admin_emails() -> List[str]:
    """Return the non-empty output lines of the admin e-mail query."""
    emails = await _run_query("SELECT email FROM clients WHERE type='admin';")
    return [email for email in emails.split("\n") if email]


async def list_docroots() -> Sequence[str]:
    """Return the distinct ``hosting.www_root`` values, split on whitespace."""
    query = "SELECT DISTINCT hosting.www_root FROM hosting;"
    return (await _run_query(query)).split()


async def list_docroots_domains_users() -> List[List[str]]:
    """Return ``[www_root, domain, login]`` token lists, one per output row.

    Rows are split on whitespace, so the three-element shape holds only
    when none of the selected values contains whitespace.
    """
    sql = (
        "select hosting.www_root, domains.name, sys_users.login"
        " from hosting"
        " inner join domains on hosting.dom_id = domains.id"
        " inner join sys_users on hosting.sys_user_id=sys_users.id"
    )
    data = await _run_query(sql)
    retval = [string.split() for string in data.split("\n") if string]
    return retval