Файловый менеджер - Редактировать - /opt/cpmigrate/environments/firewall.py
Ðазад
"""Firewall Environment module""" import os import shutil import subprocess from enum import Enum from environments.base import Environment class FirewallType(Enum): """It's the type of firewall.""" NONE = 0 APF = 1 CSF = 2 class Firewall(Environment): """ Checks the origin server for APF or CSF. If CSF is detected on the origin server, APF will be removed and CSF will be installed automatically on this server. """ def __init__(self): Environment.__init__(self) self.firewall_type = FirewallType.NONE self.local_firewall_type = FirewallType.NONE self.apf_package = 'apf-ded' self.csf_package = 'csf-ded' self.apf_allow = '/etc/apf/allow_hosts.rules' self.csf_allow = '/etc/csf/csf.allow' self.check_ips = ['144.208.77.66'] def check(self, _): self.check_origin_firewall() self.check_local_firewall() self.check_should_install() if self.firewall_type == FirewallType.CSF: self.actions.append( "+ Copy csf.allow from origin server to target server." ) elif self.firewall_type == FirewallType.APF: self.actions.append( "+ Copy allow_hosts.rules from origin server to target server." ) def run(self, _): if self.xfer.server_type == "ded": self.check_ips.append('198.46.90.10') if self.firewall_type == FirewallType.CSF: if self.local_firewall_type == FirewallType.APF: self.uninstall_apf() self.install_csf() self.copy_csf_allow() elif self.firewall_type == FirewallType.APF: self.copy_apf_allow() else: self.warning( "Origin does not have CSF or APF installed.", note=True ) def check_origin_firewall(self): """Obtains which firewall is installed on the origin server.""" ret_code, out = self.xfer.origin_command( f"/bin/rpm -qa {self.apf_package} {self.csf_package}", command_out=subprocess.PIPE, sleep=2, quiet=True, ) if ret_code == 0: for line in out: if self.apf_package in line: self.firewall_type = FirewallType.APF self.info("Origin has APF installed.") return if self.csf_package in line: self.firewall_type = FirewallType.CSF self.info("Origin has CSF installed.") return else: self.error("Failed to get RPM information for origin.") def check_local_firewall(self): """Finds which firewall is installed on the target server.""" ret_code, out = self.xfer.local_command( [ '/bin/rpm', '-qa', self.apf_package, self.csf_package, ], command_out=subprocess.PIPE, sleep=2, quiet=True, ) if ret_code == 0: for line in out: if self.apf_package in line: self.local_firewall_type = FirewallType.APF self.info("This server has APF installed.") return if self.csf_package in line: self.local_firewall_type = FirewallType.CSF self.info("This server has CSF installed.") return else: self.error("Failed to get RPM information for origin.") def check_should_install(self): """Checks to see if we should install CSF and if we can.""" if self.firewall_type == FirewallType.CSF: if self.local_firewall_type == FirewallType.CSF: self.info("Skipping CSF install, appears already installed.") else: self.actions.append("- Remove APF.") self.actions.append("+ Install CSF.") def uninstall_apf(self): """Handles APF entire uninstallation process.""" self.stop_apf() self.del_apf_chkconfig() self.cleanup_apf() self.remove_apf_packages() def install_csf(self): """Handles CSF entire installation process.""" success = self.install_csf_package() if success: self.restart_csf() self.ccsrc_csf() self.restart_csf() def stop_apf(self): """Stops APF from running on target server.""" self.info("Stopping APF.") ret_code, _ = self.xfer.local_command( ['/usr/sbin/service', 'apf', 'stop'] ) if ret_code == 0: self.info("APF stopped successfully.") else: self.error("Failed to stop APF.") def del_apf_chkconfig(self): """Removes APF from chkconfig on target server.""" ret_code, _ = self.xfer.local_command( ['/sbin/chkconfig', '--del', 'apf'] ) if ret_code == 0: self.info("Removed APF from chkconfig.") else: self.error("Failed to remove APF from chkconfig.") def cleanup_apf(self): """Removes extraneous APF files.""" if os.path.exists('/etc/apf'): os.rename('/etc/apf', '/etc/apf.pre_csf') if os.path.exists('/etc/init.d/apf'): os.remove('/etc/init.d/apf') if os.path.exists('/usr/local/sbin/apf'): os.remove('/usr/local/sbin/apf') if os.path.exists('/etc/cron.d/apf-fastload'): os.remove('/etc/cron.d/apf-fastload') if os.path.exists('/usr/local/cpanel/whostmgr/cgi/addon_add2apf.cgi'): os.remove('/usr/local/cpanel/whostmgr/cgi/addon_add2apf.cgi') if os.path.exists('/usr/local/cpanel/whostmgr/cgi/apfadd/'): shutil.rmtree('/usr/local/cpanel/whostmgr/cgi/apfadd/') self.info("Removed extraneous APF files.") def remove_apf_packages(self): """Removes extraneous APF packages including APF itself.""" self.info("Removing APF package and whm-addip.") ret_code, _ = self.xfer.local_command( ['/usr/bin/yum', 'remove', '-y', 'apf-ded', 'whm-addip'] ) if ret_code == 0: self.info("Successfully removed APF package and whm-addip.") else: self.error("Failed to remove APF package and whm-addip.") def install_csf_package(self): """Installs the csf-ded package.""" self.info("Installing CSF package.") ret_code, _ = self.xfer.local_command( ['/usr/bin/yum', 'install', '-y', 'csf-ded'] ) if ret_code == 0: self.info("Installed CSF successfully.") return True self.error("Failed to install CSF.") return False def ccsrc_csf(self): """Switches CC_SRC to 2 instead of 1 for csf.conf.""" self.info("Switching CC_SRC to 2 instead of 1.") ret_code, _ = self.xfer.local_command( [ '/usr/bin/sed', '-i', r's/CC_SRC = "1"/CC_SRC = "2"/g', '/etc/csf/csf.conf', ], sleep=1, quiet=True, ) if ret_code == 0: self.info("Successfully changed CC_SRC to 2.") else: self.warning("Failed to set CC_SRC to 2.") def restart_csf(self): """Restarts CSF.""" self.info("Restarting CSF.") ret_code, _ = self.xfer.local_command(['/usr/sbin/csf', '-r'], sleep=1) if ret_code == 0: self.info("Started CSF successfully.") return True self.error("Failed to start CSF.") return False def restart_apf(self): """Restarts APF.""" self.info("Restarting APF.") ret_code, _ = self.xfer.local_command( ['/usr/local/sbin/apf', '-r'], sleep=1 ) if ret_code == 0: self.info("Started APF successfully.") return True self.error("Failed to start APF.") return False def copy_csf_allow(self): """Copies the CSF rules from the origin to this server. If it detects an issue, it will revert it. """ if os.path.exists(self.csf_allow): shutil.copy(self.csf_allow, f"{self.csf_allow}.bak") rsync_success = self.xfer.do_rsync( origin=f"{self.xfer.origin_server}:{self.csf_allow}", destination=self.csf_allow, name="mv_csf_allow", ) if rsync_success: restart_success = self.restart_csf() if restart_success: js_whitelisted = self.verify_iptables() if js_whitelisted: self.info( "Successfully copied over CSF rules without errors." ) else: self.warning( "Failed to copy over CSF rules, JS not in iptables." ) self.revert_csf_allow() else: self.warning( "Failed to copy over CSF rules, CSF did not restart." ) self.revert_csf_allow() else: self.warning( "Failed to copied over CSF rules from origin.", note=True ) else: self.error( "CSF allow file does not exist, despite CSF installed?", note=True, ) def revert_csf_allow(self): """Reverts the CSF rules back to what they were previously.""" self.warning("Reverting CSF rules back to fresh rules.") revert_path = f"{self.csf_allow}.bak" broken_path = f"{self.csf_allow}.broken" if os.path.exists(revert_path): shutil.copy(self.csf_allow, broken_path) os.remove(self.csf_allow) shutil.move(revert_path, self.csf_allow) restart_success = self.restart_csf() if restart_success: self.warning( "Successfully restarted CSF but failed to copy over " f"rules. Broken rules can be found here: {broken_path}", note=True, ) else: self.error("Failed to restart CSF.") else: self.error("Backup of fresh CSF rules are missing.") def copy_apf_allow(self): """Copies the APF rules from the origin to this server. If it detects an issue, it will revert it. """ if os.path.exists(self.apf_allow): shutil.copy(self.apf_allow, f"{self.apf_allow}.bak") rsync_success = self.xfer.do_rsync( origin=f"{self.xfer.origin_server}:{self.apf_allow}", destination=self.apf_allow, name="mv_apf_allow", ) if rsync_success: restart_success = self.restart_apf() if restart_success: required_ips_check = self.verify_iptables() if required_ips_check: self.info( "Successfully copied over APF rules without errors." ) else: self.warning( "Failed to copy over APF rules, missing rules." ) self.revert_apf_allow() else: self.warning( "Failed to copy over APF rules, APF did not restart." ) self.revert_apf_allow() else: self.warning( "Failed to copied over APF rules from origin.", note=True ) else: self.error( "APF allow file does not exist, despite APF installed?", note=True, ) def revert_apf_allow(self): """Reverts the APF rules back to what they were previously.""" self.warning("Reverting APF rules back to fresh rules.") revert_path = f"{self.apf_allow}.bak" broken_path = f"{self.apf_allow}.broken" if os.path.exists(revert_path): shutil.copy(self.apf_allow, broken_path) os.remove(self.apf_allow) shutil.move(revert_path, self.apf_allow) restart_success = self.restart_apf() if restart_success: self.warning( "Successfully restarted APF but failed to copy over " f"rules. Broken rules can be found here: {broken_path}", note=True, ) else: self.error("Failed to restart APF.") else: self.error("Backup of fresh APF rules are missing.") def verify_iptables(self): """Returns if the JS IP Address is in iptables.""" ret_code, out = self.xfer.local_command( ['/sbin/iptables-save'], command_out=subprocess.PIPE, sleep=1, quiet=True, ) require_ips = self.check_ips.copy() if ret_code == 0: for line in out: for ip in self.check_ips: if ip in line and ip in require_ips: require_ips.remove(ip) else: self.error("Failed to run iptables-save.") return False if require_ips: self.error( f"IPTables missing required IPs after migration: {require_ips}" ) return False return True def capture_state(self): state = { 'firewall_type': self.firewall_type.name, 'local_firewall_type': self.local_firewall_type.name, } return super().capture_state(state) def load_state(self, loadstate): self.firewall_type = FirewallType[loadstate.get('firewall_type')] self.local_firewall_type = FirewallType[ loadstate.get('local_firewall_type') ] super().load_state(loadstate)
| ver. 1.1 | |
.
| PHP 8.3.30 | Ð“ÐµÐ½ÐµÑ€Ð°Ñ†Ð¸Ñ Ñтраницы: 0 |
proxy
|
phpinfo
|
ÐаÑтройка