Server : LiteSpeed System : Linux server321.web-hosting.com 4.18.0-513.18.1.lve.el8.x86_64 #1 SMP Thu Feb 22 12:55:50 UTC 2024 x86_64 User : apotdzgr ( 7060) PHP Version : 8.0.30 Disable Function : NONE Directory : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/ |
Upload File : |
import logging
import itertools
import os
from contextlib import closing, suppress
from datetime import datetime
from shutil import copy
from sqlite3 import connect, DatabaseError

from playhouse.sqlite_ext import SqliteExtDatabase

from defence360agent.application import app
from defence360agent import simple_rpc
from defence360agent.contracts.config import Model
from defence360agent.model import simplification

logger = logging.getLogger(__name__)


class OperationError(Exception):
    """Raised when the database check/repair procedure cannot complete."""


WORKAROUND_MSG = "Blank database will be created on agent start "


def check_and_repair() -> None:
    """Check the database at ``Model.PATH`` and try to repair it if corrupted.

    When the integrity check fails, the steps are: back up the file, dump it
    to SQL, remove the original, load the dump into a fresh database, verify
    the result, run migrations and verify that all model tables exist.

    Raises:
        OperationError: if the agent is running, the database file is
            missing, or any repair step fails.
    """
    base = Model.PATH
    if simple_rpc.is_running():
        raise OperationError(
            "Cannot perform database check and backup while agent is running. "
            "Please, stop the imunify360 agent with `service imunify360 stop`"
        )
    if not os.path.isfile(base):
        raise OperationError(
            "DB %s is not exists. %s" % (base, WORKAROUND_MSG)
        )
    if not is_db_corrupted(base):
        return

    backup = make_backup(base)
    if not backup:
        raise OperationError(
            "Cannot proceed without backup copy of the database. "
            "Please contact imunify360 support team at "
            "https://cloudlinux.zendesk.com"
        )

    dump = dump_to_sql(base)
    logger.info("Removing original corrupted database at %s", base)
    # TODO: Notify user in UI that original DB was dropped
    # The original is removed even when the dump failed: the backup copy
    # made above is what remains for the user.
    os.remove(base)
    if not dump:
        raise OperationError(
            "Cannot dump database to sql. Old DB backuped at %s. %s"
            % (backup, WORKAROUND_MSG)
        )

    restored = load_from_sql(base, dump)
    if not restored:
        raise OperationError(
            "Loading dump to new database failed. Database will "
            "be recreated during migrations."
        )
    if is_db_corrupted(restored):
        os.remove(restored)
        raise OperationError(
            "Restored database is still corrupt. Removing "
            "restored database. %s" % WORKAROUND_MSG
        )
    logger.info("Database restored successfully. Removing dump %s", dump)
    os.remove(dump)

    # NOTE(review): this file was recovered from a whitespace-collapsed
    # copy; migrations are assumed to run only after a successful restore
    # (the log message says "restored database") -- confirm against VCS.
    try:
        logger.info("Performing migrations on restored database")
        simplification.migrate()
    except Exception as e:
        os.remove(base)
        raise OperationError(
            "Migrations on restored database failed: %s. %s"
            % (e, WORKAROUND_MSG)
        ) from e
    else:
        if not all_tables_are_present():
            os.remove(base)
            raise OperationError(
                "Restored database does not "
                "contain all necessary tables. "
                "%s" % WORKAROUND_MSG
            )


def mark_with_timestamp(filename, extension=None):
    """Return *filename* suffixed with the current local timestamp.

    The timestamp is ``datetime.now().isoformat("_")``; when *extension* is
    truthy, ``"." + extension`` is appended after the timestamp.

    >>> mark_with_timestamp('/var/imunify360/imunify360.db')
    '/var/imunify360/imunify360.db_2017-09-26_03:33:44.705967'
    >>> mark_with_timestamp('/var/imunify360/imunify360.db', extension='sql')
    '/var/imunify360/imunify360.db_2017-09-26_03:34:01.098544.sql'
    """
    instant = datetime.now()
    basename = "{}_{}".format(filename, instant.isoformat("_"))
    if extension:
        return basename + ".%s" % extension
    return basename


def is_db_corrupted(db_path) -> bool:
    """Run ``PRAGMA INTEGRITY_CHECK`` on *db_path*.

    Returns ``False`` only when the first result row contains ``"ok"``;
    any other result, or a ``DatabaseError`` while running the check,
    is reported as corruption (``True``).
    """
    logger.info("Database %s integrity check...", db_path)
    is_corrupted = True
    # sqlite3's own context manager only commits/rolls back; ``closing``
    # guarantees the connection (and its file handle) is released.
    with closing(connect(db_path)) as connection:
        try:
            cursor = connection.execute("PRAGMA INTEGRITY_CHECK;")
            result = next(cursor)
            if "ok" in result:
                logger.info("Database integrity check succeeded.")
                is_corrupted = False
        except DatabaseError as e:
            logger.warning("DatabaseError detected: %s", e)
    return is_corrupted


def dump_to_sql(db_path):
    """Dump the database at *db_path* to a timestamped ``.sql`` file.

    Returns the dump file path, or ``None`` when the dump failed with a
    ``DatabaseError`` or ``OSError`` (the partial dump file is removed).
    """
    dumpfile = mark_with_timestamp(db_path, extension="sql")
    logger.info("Dumping imunify360 database to %s", dumpfile)
    try:
        with open(dumpfile, "w", encoding="utf-8") as dump, closing(
            connect(db_path)
        ) as connection:
            # iterdump() yields statements without a trailing newline;
            # write one statement per line so the dump stays readable.
            dump.writelines(
                "%s\n" % statement for statement in connection.iterdump()
            )
    except (DatabaseError, OSError) as e:
        logger.error("Error during dump: %s. Operation aborted", e)
        with suppress(OSError):
            os.remove(dumpfile)
        dumpfile = None
    return dumpfile


def load_from_sql(db_path, dumpfile):
    """Create a new database at *db_path* from the SQL script *dumpfile*.

    Returns *db_path* on success. Returns ``None`` when *db_path* already
    exists, or when executing the script fails with ``MemoryError`` or
    ``DatabaseError`` (the partially created database is removed).
    """
    # This is unlikely to happen because we delete the original file
    # but to be defensive here won't hurt in case of reuse in other places.
    if os.path.exists(db_path):
        logger.warning(
            "Database already exists. Loading dump to existing "
            "database may cause errors. Operation aborted"
        )
        return None
    logger.info(
        "Reading dump %s into new database %s...", dumpfile, db_path
    )
    try:
        with open(dumpfile, "r", encoding="utf-8") as dump, closing(
            connect(db_path)
        ) as connection:
            # We cannot read line by line because SQL statements may span
            # several lines, so the whole script is executed at once.
            connection.executescript(dump.read())
            connection.commit()
    except (MemoryError, DatabaseError) as e:
        logger.error("Loading dump failed: %s", e)
        with suppress(OSError):
            os.remove(db_path)
        return None
    return db_path


def make_backup(db_path):
    """Copy *db_path* to a timestamped ``.backup`` file next to it.

    Returns the backup path, or ``None`` if copying failed (a partially
    written backup file is removed).
    """
    logger.info("Making backup of the %s...", db_path)
    backup_filename = mark_with_timestamp(db_path, "backup")
    try:
        copy(db_path, backup_filename)
        logger.info("Database copied successfully to: %s ", backup_filename)
    except Exception as e:
        logger.error("Making backup failed: %s", e)
        with suppress(OSError):
            os.remove(backup_filename)
        backup_filename = None
    return backup_filename


def _all_models():
    """Iterate over the models of every module in ``app.MODULES_WITH_MODELS``."""
    return itertools.chain.from_iterable(
        simplification.get_models(module)
        for module in app.MODULES_WITH_MODELS
    )


def all_tables_are_present() -> bool:
    """Return ``True`` when ``table_exists()`` holds for every known model."""
    logger.info(
        "Verifying that db schema is up-to-date and all tables are present..."
    )
    if all(model.table_exists() for model in _all_models()):
        logger.info("All tables are present")
        return True
    logger.error("Some tables are missing in db.")
    return False


def recreate_schema() -> None:
    """Attach the DBs from ``app.MIGRATIONS_ATTACHED_DBS`` and create their tables."""
    simplification.instance.db.init(Model.PATH)
    logger.info("Recreating schema for linked DBs...")
    attached_schemas = []
    for db_path, schema in app.MIGRATIONS_ATTACHED_DBS:
        logger.info("Attach db: %s", db_path)
        simplification.instance.db.execute_sql(
            "ATTACH ? AS ?", (db_path, schema)
        )
        attached_schemas.append(schema)
    recreate_schema_models(simplification.instance.db, attached_schemas)
    logger.info("Schema recreated successfully.")


def recreate_schema_models(
    db: SqliteExtDatabase, target_schemas: list[str]
) -> None:
    """Bind to *db* and create tables for models whose schema is in *target_schemas*."""
    models_to_create = [
        model
        for model in _all_models()
        if model._meta.schema in target_schemas
    ]
    logger.info("%r", models_to_create)
    # bind models to the db to avoid issues related to initialization order
    db.bind(models_to_create)
    db.create_tables(models_to_create)
    logger.info("Schema models recreated successfully.")