Файловый менеджер - Редактировать - /opt/cpmigrate/user.py
Ðазад
""" Responsible for all things user. """ import difflib import json import logging import os from typing import TYPE_CHECKING import re import time from dataclasses import dataclass from enum import Enum import requests from cpapis import uapi from journal import save_state if TYPE_CHECKING: from transfer import Transfer @dataclass class Domain: """Holds domain name and if it passed validation.""" name: str passed: bool = False class Status(Enum): """Status of the User's transfer""" FAILED_PKG = -1 FAILED_PKG_XFER = -2 FAILED_RSYNC = -3 FAILED_PKG_RESTORE = -4 VERIFICATION_FAILED = -5 NOT_STARTED = 0 STARTED_PKG = 1 FINISHED_PKG = 2 STARTED_PKG_XFER = 3 FINISHED_PKG_XFER = 4 STARTED_PKG_RESTORE = 5 FINISHED_PKG_RESTORE = 6 STARTED_RSYNC = 7 FINISHED_RSYNC = 8 COMPLETED = 9 class User: """Primary class for handling all user related tasks.""" def __init__(self, name: str, transfer: 'Transfer'): self.name = name self.xfer = transfer self.pkglogpath: str = '/var/cpanel/pkgacct_sessions' self.pkgacct_id: str | None = None self.pkgacct_pid = None self.restore_id: str | None = None self.log: str | None = None self.error_log: str | None = None self.ipaddr = None self.running = False self.status: Status = Status.NOT_STARTED self.notes: list[str] = [] self.domains: list[Domain] = [] self.journal = transfer.journal def report(self): """Reports on user completion status and any relevant notes""" message = f'{self.status} ' if len(self.notes) > 0: message += "- Notes:\n{}".format('\n\n'.join(self.notes)) if self.status != Status.COMPLETED or len(self.notes) > 0: self.warning(message) else: self.info(message) def error(self, msg, note=False): """Logs and prints an error for this user""" logging.error("(%s) %s", self.name, msg) if note: self.notes.append(msg) def info(self, msg): """Logs and prints info for this user""" logging.info("[User %s] %s", self.name, msg) def warning(self, msg, note=False): """Logs and prints a warning for this user""" logging.warning("[User %s] %s", self.name, msg) if note: self.notes.append(msg) def is_status(self, status, action=None): """Returns whether this user is on the status provided""" if self.status != status: if action: self.warning(f"Skipping {action} as I am not on {status}.") return False return True def legacy_load_domains(self): """Legacy handler for loading domains to support older cPanel versions. """ self.load_maindomain() self.load_addondomains() self.load_subdomains() self.load_parkdomains() def load_maindomain(self): """Loads the primary domain for this user.""" data = self.xfer.origin_cpapi2_call( user=self.name, module='DomainLookup', func='getmaindomain' ) if data: main_domain = data[0].get('main_domain') if main_domain: self.domains.append(Domain(main_domain)) else: self.error("Could not retrieve main domain?") def load_addondomains(self): """Loads the addon domains for this user.""" data = self.xfer.origin_cpapi2_call( user=self.name, module='AddonDomain', func='listaddondomains' ) if data: for addon in data: domain = addon.get('domain') if '*' not in domain: self.domains.append(Domain(domain)) def load_parkdomains(self): """Loads the parked domains for this user.""" data = self.xfer.origin_cpapi2_call( user=self.name, module='Park', func='listparkeddomains' ) if data: for park in data: domain = park.get('domain') if '*' not in domain: self.domains.append(Domain(domain)) def load_subdomains(self): """Loads the subdomains for this user.""" data = self.xfer.origin_cpapi2_call( user=self.name, module='SubDomain', func='listsubdomains' ) if data: for subdomain in data: domain = subdomain.get('domain') if '*' not in domain: self.domains.append(Domain(domain)) @save_state def start_pkgacct(self): """Starts the PKGACCT on the origin server.""" if not self.is_status(Status.NOT_STARTED, 'pkg_acct'): return resp = self.xfer.origin_whmapi_call( 'start_background_pkgacct', {'user': self.name, 'skiphomedir': '1'} ) self.info("Started PKGACCT.") self.pkgacct_id = resp.get('session_id') if self.pkgacct_id: self.pkglogpath = os.path.join(self.pkglogpath, self.pkgacct_id) log = resp.get('complete_master_log') error_log = resp.get('complete_master_error_log') self.log = os.path.join(self.pkglogpath, log) self.error_log = os.path.join(self.pkglogpath, error_log) self.status = Status.STARTED_PKG self.info(f"PKGACCT log: {self.log}") self.wait_pkglog_exists() else: self.status = Status.FAILED_PKG_XFER self.error("Failed to obtain sessionid for PKGACCT.") def wait_pkglog_exists(self): """Waits for the pkgacct log to exist, which means it finished.""" self.xfer.print_progress("Waiting.") exists = None while not exists: self.xfer.print_progress() ret_code, _ = self.xfer.origin_command( f"/usr/bin/test -f {self.log}", quiet=True ) if ret_code == 0: exists = True time.sleep(0.2) print() @save_state def check_pkgacct(self): """Confirms that pkgacct did complete on the origin server.""" if not self.is_status(Status.STARTED_PKG): return self.info("Confirming that PKGACCT did complete.") ret_code, _ = self.xfer.origin_command( f"/bin/grep 'pkgacct completed' {self.log}", sleep=2, quiet=True ) if ret_code == 0: self.info("PKGACCT completed successfully.") self.status = Status.FINISHED_PKG else: self.error("PKGACCT appears to have failed.") self.status = Status.FAILED_PKG @save_state def transfer_pkgacct(self): """Begins transferring the PKGACCT to the target server.""" if not self.is_status(Status.FINISHED_PKG, 'transfer_pkgacct'): return self.info("Starting transfer of PKGACCT.") self.status = Status.STARTED_PKG_XFER rsync_success = self.xfer.do_rsync( origin=( f'root@{self.xfer.origin_server}:' f'/home/cpmove-{self.name}.tar.gz' ), destination='/home/', name=f'cpmove-{self.name}-xfer', user=self, ) if rsync_success: self.status = Status.FINISHED_PKG_XFER self.info("Completed PKGACCT transfer successfully.") else: self.status = Status.FAILED_PKG_XFER self.error("PKGACCT transfer has reported as failed.") @save_state def restore_pkgacct(self): """Ensures package exists and then starts the restorepkg.""" if not self.is_status(Status.FINISHED_PKG_XFER, 'restore_pkg'): return self.info("Starting PKGACCT restore.") pkg_path = f'/home/cpmove-{self.name}.tar.gz' if not os.path.exists(pkg_path): self.error(f"Failed to find PKGACCT {pkg_path}.") self.status = Status.FAILED_PKG_RESTORE return resp = self.xfer.whmapi_call( 'start_local_cpmove_restore', {'cpmovepath': pkg_path, 'username': self.name}, ) self.restore_id = resp.get('transfer_session_id') if self.restore_id: self.status = Status.STARTED_PKG_RESTORE else: self.error("Failed to start PKGACCT restore.") @save_state def check_restorepkg(self): """Checks and waits for restorepkg completion.""" if not self.is_status(Status.STARTED_PKG_RESTORE): return self.xfer.print_progress("Waiting.") while True: state = self.get_restorepkg_state() if state in ['RESTORE_PENDING', 'RESTORE_INPROGRESS', 'RUNNING']: self.xfer.print_progress() elif state in ['FAILED', 'ABORTED']: print() self.error("Failed to restore PKGACCT.") self.status = Status.FAILED_PKG_RESTORE break elif state == 'COMPLETED': print() self.verify_restorepkg() break else: print() self.status = Status.FINISHED_PKG_RESTORE self.error(f"Unknown restore pkgacct state received: {state}") break time.sleep(5) # No recursions def get_restorepkg_state(self): """Returns the state for the restorepkg.""" resp = self.xfer.whmapi_call( 'get_transfer_session_state', {'transfer_session_id': self.restore_id}, ) return resp.get('state_name') @save_state def verify_restorepkg(self): """Check transfer session logs for failures. Session returns as COMPLETE even though it fails. """ resp = self.xfer.whmapi_call( 'fetch_transfer_session_log', {'transfer_session_id': self.restore_id, 'logfile': 'master.log'}, ) logs = resp.get('log') if logs: for log in logs.split('\n'): try: json_log = json.loads(log) contents = json_log.get('contents') if contents: msg = contents.get('msg') if isinstance(msg, dict): failure = msg.get('failure') if failure: self.error( f"Restoration of PKGACCT has failed. " f"Reason: {failure}", note=True, ) self.status = Status.FAILED_PKG_RESTORE return except json.decoder.JSONDecodeError: pass # Don't care, cpanel sucks self.status = Status.FINISHED_PKG_RESTORE self.info("Restoration of PKGACCT has completed successfully.") self.cleanup_cpmove() self.cleanup_cpmove_origin() @save_state def cleanup_cpmove(self): """Cleans up the cpmove file. To be called after restoration.""" pkg_path = f'/home/cpmove-{self.name}.tar.gz' if os.path.exists(pkg_path): os.remove(pkg_path) self.info(f"Cleaned up cpmove file: {pkg_path}") @save_state def cleanup_cpmove_origin(self): """ Cleans up the cpmove file on the origin server after restoration. """ pkg_path = f'/home/cpmove-{self.name}.tar.gz' ret_code, _ = self.xfer.origin_command( f'/bin/rm -f -- {pkg_path}', sleep=1, quiet=True ) if ret_code == 0: self.info(f"Cleaned up cpmove file on origin: {pkg_path}") else: self.error(f"Failed to clean up cpmove file on origin: {pkg_path}") @save_state def rsync_maildir(self): """Handles the mail directory RSYNC from origin to target server.""" self.info("Starting mail directory rsync.") rsync_success = self.xfer.do_rsync( origin=f'root@{self.xfer.origin_server}:/home/{self.name}/mail/', destination=f'/home/{self.name}/mail/', name=f'rsync-mail-{self.name}', user=self, ) if rsync_success: self.info("Completed mail directory rsync successfully.") else: self.error("Failed to rsync mail directory.", note=True) def should_rsync_homedir(self): """Checks if we should rsync the home directory.""" return ( self.xfer.check_should_resync() or self.is_status(Status.FINISHED_PKG_RESTORE) or self.is_status(Status.STARTED_RSYNC) ) @save_state def rsync_homedir(self): """Handles the homedir RSYNC from origin to target server.""" if not self.should_rsync_homedir(): self.warning( "Skipping rsync_homedir as I am not on " f"{Status.FINISHED_PKG_RESTORE} or {Status.STARTED_RSYNC}." ) return self.info("Starting homedir rsync.") self.status = Status.STARTED_RSYNC rsync_success = self.xfer.do_rsync( origin=f'root@{self.xfer.origin_server}:/home/{self.name}/', destination=f'/home/{self.name}/', name=f'rsync-{self.name}', user=self, ) if rsync_success: self.status = Status.FINISHED_RSYNC self.info("Completed homedir rsync successfully.") else: self.status = Status.FAILED_RSYNC @save_state def test_sites(self): """Tests all domains on the account to verify transfer""" if not self.is_status(Status.FINISHED_RSYNC, 'test_sites'): return self.info("Testing all domains match the origin and target.") if len(self.domains) == 0: self.status = Status.VERIFICATION_FAILED return for domain in self.domains: lstatus, ltext = self.test_website( domain=domain.name, ipaddr=self.xfer.my_ipaddr ) rstatus, rtext = self.test_website(domain.name, self.ipaddr) if lstatus != rstatus: self.error( f"Status code for {domain.name} returned {lstatus} " f"instead of {rstatus}. Please check manually.", note=True, ) self.status = Status.VERIFICATION_FAILED else: if rtext == ltext: self.info(f"{domain.name} passed verification.") domain.passed = True continue diff = difflib.unified_diff( rtext.splitlines(), ltext.splitlines(), fromfile="Origin server", tofile="Target server (this server)", ) if self.check_directory_index(rtext, ltext): self.log_site_diff(domain.name, diff, index=True) else: self.log_site_diff(domain.name, diff) if self.status is not Status.VERIFICATION_FAILED: self.status = Status.COMPLETED def check_directory_index(self, site_output1, site_output2): """Checks if both site outputs are directory index.""" if 'Index of /' in site_output1: if 'Index of /' in site_output2: return True return False def log_site_diff(self, domain, diff, index=False): """Logs the site output differences, if any.""" log = os.path.join(self.xfer.log_path, f'{domain}-diff.log') if index: self.warning( f"Directory index detected on both servers for {domain}:" f"\n{log}" ) else: self.warning( f"Difference in site output found for {domain}:\n{log}", note=True, ) with open(log, 'w', encoding="utf-8") as out: out.write(f"Difference in site output found for {domain}: \n") for line in diff: out.write(f"\n{line}") def test_website(self, domain, ipaddr): """Tests a website using the provided IP address. Returns status code and content of site. Redirects will be followed if within same domain. Args: domain (str): domain to be tested ipaddr (str): IP address of host to connect to Returns: [int, str]: status code, content of site """ test_domain = domain resp = self.make_request(test_domain, ipaddr) redirect_count = 0 try: while 'Location' in resp.headers and redirect_count < 4: test_domain = resp.headers['Location'] redirect_count += 1 domain_check = re.match( fr"https?:\/\/(?:www\.)?{domain}", test_domain ) if domain_check: test_domain = resp.url resp = self.make_request(test_domain, ipaddr) else: self.warning( f"The domain, {domain}, has redirected outside of " f"context to {test_domain} on {ipaddr}. Cannot " "properly validate this domain." ) return [200, ''] return [resp.status_code, resp.text] except AttributeError: self.error( f"Validation test for {domain} on {ipaddr} failed.", note=True ) return [0, ''] def make_request(self, domain, ipaddr): """Makes a request to the IP address for the provided domain. Args: domain (str): domain to check ipaddr (str): IP address of host Returns: requests.Response: what is returned from the request """ try: resp = requests.get( f"http://{ipaddr}/", verify=False, headers={'Host': domain, 'Cache-Control': 'no-cache'}, allow_redirects=False, timeout=20, ) return resp except requests.exceptions.RequestException: self.warning(f"Failed to connect to {domain} on {ipaddr}.") return None def uapi_call(self, command, args=None): """Makes a UAPI call for this user.""" if not args: args = {} data = uapi(command, self.name, args) result = data.get('result') if result: success = bool(result.get('status')) if success: return True, result.get('data') return False, ', '.join(result.get('errors')) self.error( f"UAPI call failed. Command: {command}\n" f"Args: {args}\nResponse: {result}" ) return False, None def capture_state(self): """Captures the state of the of the User class for journal.""" return { 'name': self.name, 'ipaddr': self.ipaddr, 'pkgacct_id': self.pkgacct_id, 'pkgacct_pid': self.pkgacct_pid, 'restore_id': self.restore_id, 'status': self.status.name, 'notes': self.notes, 'domains': [domain.__dict__ for domain in self.domains], } def load_state(self, loadstate): """Loads everything needed from the journal for the User class.""" self.ipaddr = loadstate.get('ipaddr') self.pkgacct_id = loadstate.get('pkgacct_id') self.pkgacct_pid = loadstate.get('pkgacct_pid') self.restore_id = loadstate.get('restore_id') self.status = Status[loadstate.get('status')] self.notes = loadstate.get('notes') for domain in loadstate.get('domains'): self.domains.append(Domain(**domain)) def __repr__(self): return f"<User {self.name} {self.status.name}>"
| ver. 1.1 | |
.
| PHP 8.3.30 | Ð“ÐµÐ½ÐµÑ€Ð°Ñ†Ð¸Ñ Ñтраницы: 0 |
proxy
|
phpinfo
|
ÐаÑтройка