Файловый менеджер - Редактировать - /opt/cpmigrate/transfer.py
Ðазад
""" Handles everything Transfer related. """ import json import logging import os import random import socket import string import subprocess import sys import time import atexit from datetime import datetime from dataclasses import dataclass from enum import Enum from queue import PriorityQueue from subprocess import DEVNULL import requests import click from cpapis import whmapi1 from colorlogger import ColorFormatter from environments.base import APIFailure, FatalMigrationFailure from journal import Journal, save_state from user import User, Status, Domain @dataclass class SSHKey: """Holds generated SSH key information.""" name: str path: str key: str validated: bool = False removed: bool = False class TransferStage(Enum): """Stages of the Transfer""" NOT_STARTED = 0 ENVIRONMENT = 1 USERS_PKG = 2 USERS_RSYNC = 3 FINAL_STEPS = 4 COMPLETED = 5 class Transfer: """Primary class for handling all things Transfer related.""" def __init__(self, args, envs): self.migration_id = datetime.now().strftime("migration-%d%m%Y%H%M%S") self.my_ipaddr = self.get_ipaddr() self.environments = envs self.args = args self.origin_server = args.get('host') self.origin_port = str(args.get('port')) self.skip_envs = args.get('skipenv') self.bwlimit = args.get('bwlimit') self.debug = args.get('debug', False) self.log_path = os.path.join(args.get('logpath'), self.migration_id) self.ssh_key = None self.access_hash = None self.ea_version = None self.cpanel_version = None self.server_type = None self.checked_env = False self.approved_env = False self.users = [] self.notes = [] self.stage = TransferStage.NOT_STARTED self.journal = Journal(self) atexit.register(self.clean_up) def check_server_type(self): """Check if vps or ded""" if os.path.exists('/proc/vz/veinfo'): logging.info("Server has been detected as a VPS.") self.server_type = "vps" else: logging.info("Server has been detected as non-VPS.") self.server_type = "ded" def print_progress(self, msg='.', done=False): """Prints progress without newlines.""" if done: print(msg, flush=True) else: print(msg, end='', flush=True) def setup_logging(self): """Sets up logging.""" print(f"Log directory path: {self.log_path}") if not os.path.exists(self.log_path): os.makedirs(self.log_path) logging_level = logging.DEBUG if self.debug else logging.INFO main_logger = logging.getLogger() main_logger.propagate = False main_logger.setLevel(logging_level) if main_logger.hasHandlers(): main_logger.handlers.clear() console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(ColorFormatter()) console_handler.setLevel(logging_level) main_logger.addHandler(console_handler) log_format = logging.Formatter( "%(asctime)s: [%(levelname)s] %(message)s" ) log_handler = logging.FileHandler( os.path.join(self.log_path, 'master.log') ) log_handler.setFormatter(log_format) log_handler.setLevel(logging_level) main_logger.addHandler(log_handler) def setup(self): """Sets up the SSH key and confirms it works.""" self.check_local_cpanel_license() self.generate_sshkey() self.test_sshkey() if not self.ssh_key: raise FatalMigrationFailure("No SSH key setup.") if not self.access_hash: raise FatalMigrationFailure("No access_hash found/loaded.") if not self.ssh_key.validated: raise FatalMigrationFailure( "SSH key test failed. Exiting. Did you whitelist this " f"server's IP address on {self.origin_server}?" ) self.check_origin_cpanel_license() self.check_cpanel_version() self.check_ea4() self.check_server_type() def set_stage(self, stage): """Sets every user to the same stage, if we are overriding it""" status = Status.NOT_STARTED if stage == 'pkg_xfer': status = Status.FINISHED_PKG self.stage = TransferStage.USERS_PKG elif stage == 'pkg_restore': status = Status.FINISHED_PKG_XFER self.stage = TransferStage.USERS_PKG elif stage == 'rsync_home': status = Status.FINISHED_PKG_RESTORE self.stage = TransferStage.USERS_RSYNC elif stage == 'acct_verify': status = Status.FINISHED_RSYNC self.stage = TransferStage.FINAL_STEPS if status != Status.NOT_STARTED: logging.info("Setting status of all users to %s.", status) for user in self.users: user.status = status def check_cpanel_version(self): """Checks the origin server's cPanel version""" logging.debug("Checking origin cPanel version.") ret_code, out = self.origin_command( "/bin/cat /usr/local/cpanel/version", command_out=subprocess.PIPE, sleep=1, quiet=True, ) if ret_code == 0: version = out[0].split('.')[1] self.cpanel_version = int(version) if self.cpanel_version > 70: logging.info("Origin server is using cPanel v%s.", version) else: logging.warning("Origin server is using cPanel v%s.", version) elif ret_code == 127: raise FatalMigrationFailure( "Failed to obtain cPanel version. No /bin/cat binary." ) else: raise FatalMigrationFailure("Failed to obtain cPanel version.") def check_ea4(self): """Checks if the server is using EasyApache 4""" logging.debug("Checking origin EasyApache version.") ret_code, _ = self.origin_command( "/usr/bin/test -d /etc/cpanel/ea4", sleep=1, quiet=True ) if ret_code == 0: logging.info("Origin server is using EasyApache 4.") self.ea_version = 4 else: logging.warning("Origin server is using EasyApache 3.") self.ea_version = 3 def get_user(self, name): """Returns User() object for the name given.""" for user in self.users: if user.name == name: return user return None def get_ipaddr(self): """Returns the hostname IP address. Should be the public IP on our servers. """ return socket.gethostbyname(socket.gethostname()) @save_state def validate_transfers(self): """Performs validation checks for every user.""" for user in self.users: user.test_sites() @save_state def rsync_accounts(self): """Performs rsync phase for every user.""" for user in self.users: user.rsync_homedir() @save_state def pkg_accounts(self): """Performs package account phase for every user""" for user in self.users: user.start_pkgacct() user.check_pkgacct() user.transfer_pkgacct() user.restore_pkgacct() user.check_restorepkg() logging.info("Packaging phase has completed.") @save_state def load_users(self, all_users=True, user_list=None): """Loads users provided and confirms they exist.""" if not user_list: user_list = [] data = self.origin_whmapi_call('list_users') remote_users = data.get('users', ['root']) remote_users.remove('root') if not all_users: for user in user_list: if user in remote_users: self.users.append(User(user, self)) else: raise FatalMigrationFailure( f"The user {user} was not found on the origin server." ) else: if remote_users: self.users = [User(user, self) for user in remote_users] if len(self.users) == 0: raise FatalMigrationFailure("No users to transfer, exiting.") @save_state def load_domains(self): """Loads all domains for the users we are migrating.""" if self.cpanel_version > 70: resp = self.origin_whmapi_call('get_domain_info') domains = resp.get('domains') if domains: for domain in domains: domain_user = domain.get('user') domain_name = domain.get('domain') user = self.get_user(domain_user) if user and '*' not in domain_name: user.domains.append(Domain(domain_name)) else: for user in self.users: user.legacy_load_domains() @save_state def load_ipaddrs(self): """Loads the IP addresses for all of the users.""" accts = self.origin_whmapi_call('listaccts') for acct in accts.get('acct'): name = acct.get('user') ip_addr = acct.get('ip') user = self.get_user(name) if user: user.ipaddr = ip_addr def check_local_cpanel_license(self): """Checks the local cPanel license to ensure it is valid. The script will wait 60 seconds before trying again. """ logging.info("Checking local cPanel license.") valid = False try: while not valid: ret_code, _ = self.local_command( ['/usr/local/cpanel/cpkeyclt'], sleep=1, quiet=True ) if ret_code == 0: logging.info("Confirmed local cPanel license is valid.") valid = True else: logging.error( "Local cPanel license is invalid. Please check to " "ensure a license has been added for " "%s. Waiting 60 seconds to check again.", self.my_ipaddr, ) time.sleep(60) except KeyboardInterrupt as exc: raise FatalMigrationFailure( "Local cPanel license is invalid. Cannot Continue." ) from exc def check_origin_cpanel_license(self): """Checks the origin cPanel license to ensure it is valid. Script will exit if this fails. """ logging.info("Checking origin cPanel license.") ret_code, _ = self.origin_command( "/usr/local/cpanel/cpkeyclt", sleep=1, quiet=True ) if ret_code == 0: logging.info("Confirmed origin cPanel license is valid.") else: raise FatalMigrationFailure( "Origin cPanel license is invalid. Cannot Continue." ) def whmapi_success(self, data, command, args, suppress_errors=False): """Checks whmapi1 result for success, returns data""" metadata = data.get('metadata') if metadata: result = metadata.get('result') if result != 1: reason = metadata.get('reason') raise APIFailure( f"WHMAPI1 call failed: {reason}\nCommand: {command}\n" f"Args: {args}\nResponse: {metadata}", suppress_errors=suppress_errors, ) return data.get('data') cpanelresult = data.get('cpanelresult') if cpanelresult: result = bool(cpanelresult.get('event').get('result')) if result: return cpanelresult.get('data') raise APIFailure( f"WHMAPI1 call failed, command: {command}\n" f"Args: {args}\nResponse: {cpanelresult}", suppress_errors=suppress_errors, ) result = data.get('result') if result: status = result.get('status') if status != 1: reason = result.get('errors') raise APIFailure( f"WHMAPI+UAPI call failed: {reason}\n" f"Command: {command}\nArgs:{args}\nFull:{data}" ) return result.get('data') return None def origin_cpapi2_call(self, user, module, func, args=None): """Makes a cpapi2 call on the origin server using whmapi.""" if not args: args = {} args['cpanel_jsonapi_user'] = user args['cpanel_jsonapi_apiversion'] = '2' args['cpanel_jsonapi_module'] = module args['cpanel_jsonapi_func'] = func return self.origin_whmapi_call('cpanel', args) def origin_uapi_call(self, user, module, func, args=None): """Makes a cpapi2 call on the origin server using whmapi.""" if not args: args = {} args['cpanel_jsonapi_user'] = user args['cpanel_jsonapi_apiversion'] = '3' args['cpanel_jsonapi_module'] = module args['cpanel_jsonapi_func'] = func return self.origin_whmapi_call('cpanel', args) def origin_whmapi_call(self, command, args=None, suppress_errors=False): """Performs a WHMAPI1 call on the origin server and returns its data if successful. Requires accesshash which is obtained automatically. """ if not args: args = {} data = None try: args['api.version'] = '1' data = requests.post( f"https://{self.origin_server}:2087/json-api/{command}", data=args, headers={'Authorization': f'WHM root:{self.access_hash}'}, timeout=120, verify=False, ) logging.debug( 'Command: %s Args: %s Output: %s', command, args, data.content ) return self.whmapi_success( data.json(), command, args, suppress_errors ) except (ValueError, TypeError) as err: if 'Access denied' in data.text: raise FatalMigrationFailure( "Failed to make remote WHMAPI call, invalid accesshash." ) from err if data: raise APIFailure( f"Failed to make remote WHMAPI call: {err} | Output: " f"{data.content}", suppress_errors, ) from err raise APIFailure( f"Failed to make remote WHMAPI call: {err}", suppress_errors, ) from err def whmapi_call(self, command, args=None, suppress_errors=False): """Performs a WHMAPI1 call and returns its data if successful""" if not args: args = {} data = whmapi1(command, args) return self.whmapi_success(data, command, args, suppress_errors) def generate_string(self, length): """Generates a random alnum string with the given length""" letnum = string.ascii_letters + string.digits return ''.join(random.choice(letnum) for i in range(length)) def generate_sshkey(self): """Generates an SSH key to use for the migration. Provides a oneliner to install the SSH key and also whitelist the IP address. """ key_name = 'automigration-' + self.generate_string(12) self.whmapi_call( 'generatesshkeypair', { 'name': key_name, 'passphrase': '', 'abort_on_existing_key': '0', 'comment': "cpmigrate", }, ) key_path = f'/root/.ssh/{key_name}' key_code = '' with open(key_path, encoding="utf-8") as out: key_code = out.read() self.ssh_key = SSHKey(key_name, key_path, key_code) logging.info("Created SSH key %s.", key_name) click.echo( click.style( "Please paste the following on the origin server to " "install the key and whitelist this server's IP:\n", fg="magenta", bold=True, ), ) with open(f'{key_path}.pub', encoding="utf-8") as out: pubkey = ' '.join(out.read().split()) ipaddr = self.my_ipaddr print( "mkdir -p /root/.ssh/; " f"( read key; echo $key >> /root/.ssh/authorized_keys; " f"echo $key > /root/.ssh/{key_name}.pub; ) <<< '{pubkey}'; " f"csf -a {ipaddr}; apf -a {ipaddr};" ) input("\nHit enter when finished to continue migration.") @save_state def do_rsync( self, origin, destination, name, user=None, ): """Performs rsync with the provided origin and destination. Returns whether or not it was successful. """ rsync_log = os.path.join(self.log_path, f'{name}.log') command = [ '/usr/bin/rsync', '--bwlimit', str(self.bwlimit), '--compress', '--archive', '--hard-links', '--log-file', rsync_log, '-e', ( f'ssh -o StrictHostKeyChecking=no -o PasswordAuthentication=no' f' -i {self.ssh_key.path} -p {self.origin_port}' ), ] if self.args.get('delete'): command.append('--delete') if self.args.get('resync_homedirs') or self.args.get('resync_mail'): command.append('--ignore-existing') if isinstance(origin, list): command.extend(origin) else: command.append(origin) command.append(destination) logging.debug("RSYNC COMMAND: %s", command) logging.info("RSYNC %s has started. Log file: %s", name, rsync_log) try: ret_code = None with subprocess.Popen( command, stdout=DEVNULL, stderr=DEVNULL, ) as rsync: self.print_progress("Waiting.") while ret_code is None: ret_code = rsync.poll() self.print_progress() time.sleep(5) self.print_progress(done=True) if ret_code in [255, 30, 35]: raise FatalMigrationFailure( f"RSYNC {name} failed due to SSH connection issue." ) if ret_code == 20: logging.error( "RSYNC %s received interrupt (SIGINT) signal.", name ) elif ret_code in [23, 24]: logging.warning( "RSYNC %s reported partial transfer. Check rsync log.", name, ) if user: user.notes.append( f"RSYNC {name} reported partial transfer. " "Check log.", ) else: self.notes.append( f"RSYNC {name} reported partial transfer. " "Check log.", ) return True elif ret_code == 0: return True else: logging.warning( "RSYNC %s returned error code %s. Check log.", name, ret_code, ) except KeyboardInterrupt as keyboard_err: raise FatalMigrationFailure( f"RSYNC ({name}) received interrupt signal." ) from keyboard_err return False def local_command( self, command, sleep=5, command_out=DEVNULL, quiet=False, command_in=DEVNULL, ): """Runs a local command, returns code and Popen object""" try: ret_code = None out = None with subprocess.Popen( command, stdin=command_in, stdout=command_out, stderr=command_out, ) as cmd_run: if not quiet: self.print_progress("Waiting.") while ret_code is None: ret_code = cmd_run.poll() if not quiet: self.print_progress() time.sleep(sleep) if not quiet: self.print_progress(done=True) if command_out == subprocess.PIPE: out = str(cmd_run.stdout.read(), "utf-8").splitlines() return ret_code, out except KeyboardInterrupt: return -1, None def origin_command(self, cmd, sleep=5, command_out=DEVNULL, quiet=False): """Runs a remote command via SSH, returns the return code and output""" ssh_command = [ '/usr/bin/ssh', '-o', 'StrictHostKeyChecking=no', '-o', 'PasswordAuthentication=no', '-o', 'ConnectTimeout=30', '-i', self.ssh_key.path, '-p', self.origin_port, self.origin_server, cmd, ] try: ret_code = None out = None with subprocess.Popen( ssh_command, stdout=command_out, stderr=command_out ) as ssh: if not quiet: self.print_progress('Waiting.') while ret_code is None: ret_code = ssh.poll() if not quiet: self.print_progress() time.sleep(sleep) if not quiet: self.print_progress(done=True) if ret_code == 255: raise FatalMigrationFailure( f"origin_command {cmd} failed due to SSH failure or " "remote command failure." ) if command_out == subprocess.PIPE: out = str(ssh.stdout.read(), 'utf-8').splitlines() return ret_code, out except KeyboardInterrupt: return -1, None @save_state def mysql_repair(self): """Runs mysqlcheck with auto-repair on the remote server""" repair_log = os.path.join(self.log_path, 'mysql-repair.log') logging.info( "Starting MySQL database check+repair. Log: %s", repair_log ) with open(repair_log, 'w', encoding="utf-8") as out: ret_code, _ = self.origin_command( "/usr/bin/mysqlcheck --all-databases --auto-repair", command_out=out, ) if ret_code == 0: logging.info("Completed MySQL check+repair process.") elif ret_code == -1: raise FatalMigrationFailure( "MySQL repair ssh connection received interrupt signal. " "Cannot check status on remote server. Exiting." ) else: logging.error("Failed MySQL check+repair process. Continuing.") def test_sshkey(self): """Tests the SSH connection to ensure it works and obtains the remote accesshash """ logging.info("Testing SSH connection to %s.", self.origin_server) ret_code, out = self.origin_command( "/usr/local/cpanel/bin/whmapi1 accesshash --output=json", command_out=subprocess.PIPE, sleep=2, ) if ret_code == 0: self.load_accesshash(out) self.ssh_key.validated = True def load_accesshash(self, stdout): """Loads the accesshash from the provided output from test_sshkey""" try: if len(stdout) > 0: parsed = json.loads(stdout[0]) data = parsed.get('data') if data: self.access_hash = data.get('accesshash') return True except json.decoder.JSONDecodeError as json_error: raise FatalMigrationFailure( "Received JSONDecodeError from parsing accesshash." ) from json_error raise FatalMigrationFailure( "Failed to obtain accesshash from remote server." ) def remove_sshkey(self, key, remote=False): """Removes the SSH key from the provided destination""" if remote: self.origin_whmapi_call( 'deletesshkey', {'file': key, 'leave_authorized': '0'} ) logging.info("Removed key %s on %s.", key, self.origin_server) else: self.whmapi_call( 'deletesshkey', {'file': key, 'leave_authorized': '0'} ) logging.info("Removed key locally: %s", key) def check_origin_package(self, package): """Returns True if the given package is installed on the origin.""" ret_code, out = self.origin_command( f"/bin/rpm -qa {package}", command_out=subprocess.PIPE, sleep=2, quiet=True, ) if ret_code == 0: for line in out: if package in line: return True return False def check_target_package(self, package): """Returns True if the given package is installed on the target.""" ret_code, out = self.local_command( ["/bin/rpm", "-qa", package], command_out=subprocess.PIPE, sleep=2, quiet=True, ) if ret_code == 0: for line in out: if package in line: return True return False def restart_service(self, service): """Returns 0 if given service restarts, else returns error code.""" ret_code, _ = self.local_command( ['/usr/bin/systemctl', 'restart', service], sleep=1 ) return ret_code def service_status(self, service): """Returns 0 if given service is running, else returns error code.""" ret_code, _ = self.local_command( ['/usr/bin/systemctl', 'status', service], command_out=subprocess.PIPE, sleep=1, quiet=True, ) return ret_code def should_run_environment(self, env): """Checks if the Environment should be ran or not.""" if self.check_should_resync(): return bool(env.resync_relevant) if 'all' in self.skip_envs or env.name in self.skip_envs: return False return True @save_state def match_environment(self, check=False): """Checks origin for any config that need to be moved over.""" queue = PriorityQueue() for env in self.environments: if self.should_run_environment(env): queue.put((env.priority * -1, queue.qsize(), env)) else: env.skipped = True self.stage = TransferStage.ENVIRONMENT while not queue.empty(): next_env = queue.get()[2] if not next_env.completed: if check: next_env.check(self) else: next_env.run(self) else: logging.info( "Skipping Environment %s, already done.", next_env.name ) @save_state def clean_up(self): """Performs final clean, removes extraneous SSH keys and performs final report """ failed = False try: if self.ssh_key: self.remove_sshkey(self.ssh_key.name) self.remove_sshkey(f'{self.ssh_key.name}.pub') self.remove_sshkey(f'{self.ssh_key.name}.pub', remote=True) self.ssh_key.removed = True else: logging.info("No SSH key was generated, nothing to remove.") except Exception as err: logging.error("Failed to remove keys: %s", err) failed = True finally: if failed: logging.warning( "Due to failure, please double check both servers to " "ensure there are no left-over authorized keys." ) logging.info("Migration clean up completed.") if not self.check_should_resync(): logging.info("User status report:") for user in self.users: user.report() if len(self.notes) > 0: notes = '\n\n'.join(self.notes) logging.warning( "Important end of transfer notes:\n\n%s\n", notes ) click.echo(click.style("End of migration.", fg='green', bold=True)) @save_state def post_transfer(self): """Final post-migration steps, any Environment related tasks and also domain validation. """ for env in self.environments: if 'all' not in self.skip_envs: if not env.skipped: env.xfer = self env.post_transfer() self.validate_transfers() self.check_ipaddrs() @save_state def check_ipaddrs(self): """Checks all IP addresses on origin to find out what needs to be moved/assigned. """ resp = self.origin_whmapi_call('listips') ipaddrs = resp.get('ip') if ipaddrs: for ipinfo in ipaddrs: uip = ipinfo.get('ip') main = bool(ipinfo.get('mainaddr')) used = bool(ipinfo.get('used')) if not main: if used: users = [] for user in self.users: if user.ipaddr == uip: users.append(f"\n- {user.name}") if len(users) > 0: note = f"Origin has dedicated IP {uip} assigned to:" note += ''.join(users) else: note = ( f"Origin has dedicated IP {uip} assigned to a " "user not being migrated or is marked as used " "without being assigned to a user." ) self.notes.append(note) else: self.notes.append( f"Origin has dedicated IP {uip} which is not " "assigned to any accounts." ) def get_environment(self, env_name): """Returns the Environment class based on name provided.""" for env in self.environments: if env.name == env_name: return env raise KeyError(f"{env_name!r} not found in environments") @save_state def run_environment(self): """Responsible for checking and running the Environments.""" if self.approved_env: self.match_environment() else: if not self.checked_env: logging.info("Running environment checks on origin server.") self.match_environment(check=True) if 'all' not in self.skip_envs and not self.check_should_resync(): click.echo( click.style( "NOTICE: Please read the actions each environment check" " will perform and confirm that you wish to continue.", fg='magenta', bold=True, ) ) for env in self.environments: if len(env.actions) > 0: click.echo(click.style(env.name, fg='cyan', bold=True)) click.echo(env.__doc__) click.echo("Relevant information and action(s):") for action in env.actions: if action.startswith('!!'): click.echo( click.style(action, fg='red', bold=True) ) else: click.echo(click.style(action, fg='green')) click.echo() env.completed = False self.checked_env = True self.journal.save() if 'STY' in os.environ: click.echo( click.style( "Screen detected. Please ensure to read over all " "of the Environment actions above. You can use " "Ctrl+A followed by the '[' key to enter " "'copy mode' which will allow you to scroll up " "using the arrow keys. Press escape to exit " "copy mode.", fg='magenta', bold=True, ) ) if not self.args.get('noenvconfirm'): if click.confirm("Are you sure you want to continue?"): self.approved_env = True else: raise FatalMigrationFailure( "Did not accept environment changes." ) self.match_environment() def run_migration(self): """Primary function for running the migration.""" logging.info("Running migration, user list: %s", self.users) if self.stage.value <= TransferStage.ENVIRONMENT.value: self.run_environment() if not self.args.get('skiprepair'): self.mysql_repair() if self.check_should_resync(): self.run_resyncs() logging.info("Re-sync process is complete.") self.stage = TransferStage.COMPLETED return if self.stage.value <= TransferStage.USERS_PKG.value: self.pkg_accounts() if self.stage.value <= TransferStage.USERS_RSYNC.value: self.rsync_accounts() if self.stage.value <= TransferStage.FINAL_STEPS.value: self.post_transfer() logging.info("Migration is complete.") self.stage = TransferStage.COMPLETED def check_should_resync(self): """Check if a resync arg was set""" if self.args.get('resync_databases'): return True if self.args.get('resync_mail'): return True if self.args.get('resync_homedirs'): return True return False def run_resyncs(self): """Run re-syncs""" click.echo( click.style( "Running requested re-syncs. Everything else will be skipped.", fg='green', bold=True, ) ) if self.args.get('resync_databases'): mysql_env = self.get_environment('mysql') mysql_env.run_resync() if self.args.get('resync_homedirs'): self.rsync_accounts() if self.args.get('resync_mail'): for user in self.users: user.rsync_maildir() def capture_state(self): """Captures necessary information for journal.json""" return { 'users': [user.capture_state() for user in self.users], 'environments': [env.capture_state() for env in self.environments], 'log_path': self.log_path, 'migration_id': self.migration_id, 'notes': self.notes, 'stage': self.stage.name, 'args': self.args, 'ea_version': self.ea_version, 'cpanel_version': self.cpanel_version, 'approved_env': self.approved_env, 'checked_env': self.checked_env, } def load_state(self, loadstate): """Loads necessary information from journal.json and resumes the migration. """ self.log_path = loadstate.get('log_path') self.migration_id = loadstate.get('migration_id') self.notes = loadstate.get('notes') self.stage = TransferStage[loadstate.get('stage')] self.ea_version = loadstate.get('ea_version') self.cpanel_version = int(loadstate.get('cpanel_version')) self.journal = Journal(self) self.approved_env = bool(loadstate.get('approved_env')) self.checked_env = bool(loadstate.get('checked_env')) for usr in loadstate.get('users'): user = User(usr.get('name'), self) user.load_state(usr) self.users.append(user) for ein in loadstate.get('environments'): for env in self.environments: if env.name == ein.get('name'): env.load_state(ein) env.xfer = self self.setup_logging() if self.stage == TransferStage.COMPLETED: raise FatalMigrationFailure( "Cannot resume a transfer marked as completed." ) self.setup() logging.info("Resuming migration from provided journal.") self.run_migration()
| ver. 1.1 | |
.
| PHP 8.3.30 | Ð“ÐµÐ½ÐµÑ€Ð°Ñ†Ð¸Ñ Ñтраницы: 0 |
proxy
|
phpinfo
|
ÐаÑтройка