Server : LiteSpeed System : Linux server321.web-hosting.com 4.18.0-513.18.1.lve.el8.x86_64 #1 SMP Thu Feb 22 12:55:50 UTC 2024 x86_64 User : apotdzgr ( 7060) PHP Version : 8.0.30 Disable Function : NONE Directory : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/ |
Upload File : |
#!/opt/imunify360/venv/bin/python3
"""Apply database migrations, then hand control over to the real service.

This module imports peewee_migrate and applies migrations; for Imunify-AV
it is the entry point of the service.
"""
import contextlib
import os
import sys
import signal
import threading
import time
from collections.abc import Iterable
from logging import getLogger

from peewee_migrate import migrator
from playhouse.sqlite_ext import SqliteExtDatabase

import defence360agent.internals.logger
from defence360agent.application import app
from defence360agent.application.settings import configure
from defence360agent.contracts.config import Core
from defence360agent.contracts.config import Model
from defence360agent.router import Router
from defence360agent.subsys import systemd_notifier
from defence360agent.model.instance import db as db_instance
from defence360agent.model import tls_check
from defence360agent.utils import (
    write_pid_file,
    IM360_RESIDENT_PID_PATH,
    cleanup_pid_file,
)
from defence360agent.utils.check_db import (
    recreate_schema_models,
)

logger = getLogger(__name__)

# Binary that replaces this process when started as "im360.run_resident".
GO_SERVICE_NAME = "/usr/bin/imunify-resident"


@contextlib.contextmanager
def exc_handler(log_msg: str, reraise: bool):
    """
    Log an error if the wrapped block raises an ``Exception``.

    Depending on `reraise`:
    - True: re-raise the exception and do NOT include exception info in
      the log record (the caller further up is expected to report it);
    - False: suppress the exception and include exception info in the
      log record.

    :param log_msg: message passed to ``logger.error``
    :param reraise: whether the caught exception is propagated
    """
    try:
        yield
    except Exception:
        logger.error(log_msg, exc_info=not reraise)
        if reraise:
            raise


def apply_migrations(db: SqliteExtDatabase, migrations_dirs: Iterable[str]):
    """Apply migrations: restructure db, config files, etc.

    :param db: database the migration router operates on
    :param migrations_dirs: directories handed to ``Router`` as the
        locations of migration files
    """
    router = Router(
        db,
        migrations_dirs=migrations_dirs,
        logger=logger,
    )
    # HACK: Migrator uses a global, unconfigurable LOGGER;
    # override it so that our logging settings are used
    migrator.LOGGER = logger
    router.run()


def prepare_databases(
    migrations_dirs: Iterable[str],
    attached_dbs: tuple[tuple[str, str], ...] = tuple(),
):
    """
    Apply migrations and recreate attached databases.

    The workflow:
    1. Apply migrations (errors are logged and suppressed).
    2. Regardless of whether the migrations were applied - recreate the
       attached databases.
    3. If the recreation of the attached databases was successful - apply
       migrations again
        - this is done to verify that migrations will successfully apply
          in future for the recreated databases
        - the recreation + the migrations in this step are within the same
          transaction, so databases will only be recreated if the
          migrations can be applied after the recreation.

    :param migrations_dirs: directories with migration files
    :param attached_dbs: ``(db_path, schema_name)`` pairs to ATTACH to the
        main database before migrating
    :raises Exception: whatever steps 2/3 (or ATTACH) raise; step 1 errors
        are only logged
    """
    # prepare database to operate in WAL journal_mode and run migrations
    tls_check.reset()
    db_instance.init(Model.PATH)
    attached_schemas = []
    try:
        # ATTACH opens the connection, so it lives inside the try block:
        # a failing ATTACH must not leave the connection open.
        for db_path, schema_name in attached_dbs:
            db_instance.execute_sql("ATTACH ? AS ?", (db_path, schema_name))
            attached_schemas.append(schema_name)

        logger.info("Applying database migrations...")
        systemd_notifier.notify(systemd_notifier.AgentState.MIGRATING)
        # NOTE(review): exc_handler is the inner context manager here, so
        # with reraise=False the exception is suppressed before atomic()
        # can see it - presumably intentional (best effort); confirm.
        with db_instance.atomic("EXCLUSIVE"), exc_handler(
            "Error applying migrations", reraise=False
        ):
            apply_migrations(db_instance, migrations_dirs)

        logger.info("Recreating attached databases...")
        with db_instance.atomic("EXCLUSIVE"), exc_handler(
            "Error recreating attached databases", reraise=True
        ):
            # Migration history is stored in main db, so to automatically
            # recreate attached dbs it is required to recreate schema for
            # them from models
            recreate_schema_models(db_instance, attached_schemas)

            # verify migrations can be applied after the attached dbs
            # recreation
            with exc_handler(
                "Error applying migrations after recreating attached"
                " databases",
                reraise=True,
            ):
                apply_migrations(db_instance, migrations_dirs)
    finally:
        # close connection immediately since later this process
        # will be replaced by execv
        db_instance.close()


# required in case package manager or user sends signals while migrations
# are still running
def signal_handler(sig, _):
    """Delay termination so that running migrations get a chance to finish.

    Logs the received signal, sleeps for
    ``Core.SIGNAL_HANDLER_MIGRATION_TIMEOUT_SECS`` seconds and then exits
    the process with status 0.

    :param sig: number of the received signal
    :param _: current stack frame (unused)
    """
    logger.warning("Received signal %s in signal_handler", sig)
    logger.warning(
        "waiting %d seconds so that migrations can finish",
        Core.SIGNAL_HANDLER_MIGRATION_TIMEOUT_SECS,
    )
    # NOTE(review): the sleep is unconditional - it does not check whether
    # the migrations have already finished.
    time.sleep(Core.SIGNAL_HANDLER_MIGRATION_TIMEOUT_SECS)
    logger.info("Exiting")
    sys.exit(0)


def run(*, start_pkg="defence360agent", configure=configure):
    """Entry point for Imunify-AV service.

    Apply migrations, and then replace process with {start_pkg}.run module
    (or with the ``GO_SERVICE_NAME`` binary when `start_pkg` is
    "im360.run_resident").

    :param start_pkg: module to execute via ``python -m`` after migrations
    :param configure: callable that performs application configuration
    """
    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        signal.signal(sig, signal_handler)

    is_resident = start_pkg == "im360.run_resident"
    try:
        if is_resident:
            write_pid_file(IM360_RESIDENT_PID_PATH)
        os.umask(Core.FILE_UMASK)
        configure()
        defence360agent.internals.logger.reconfigure()

        # Migrations run in a worker thread; presumably so that the main
        # thread stays free to run signal_handler meanwhile - confirm.
        migration_thread = threading.Thread(
            target=prepare_databases,
            args=(app.MIGRATIONS_DIRS, app.MIGRATIONS_ATTACHED_DBS),
        )
        migration_thread.start()
        migration_thread.join()
        systemd_notifier.notify(systemd_notifier.AgentState.READY)

        logger.info("Starting main process...")
        systemd_notifier.notify(systemd_notifier.AgentState.STARTING)
        if is_resident:
            Core.GO_FLAG_FILE.touch(exist_ok=True)
            logger.info("Run imunify-resident service")
            os.execv(
                GO_SERVICE_NAME,
                [
                    GO_SERVICE_NAME,
                ]
                + sys.argv[1:],
            )
        else:
            os.execv(
                sys.executable,
                [sys.executable, "-m", "{}".format(start_pkg)] + sys.argv[1:],
            )
    except Exception:
        # Previously failures here were swallowed without any trace; log
        # them. The exception is still not re-raised, so the process exit
        # status is unchanged.
        logger.exception("Failed to start %s", start_pkg)
        if is_resident:
            cleanup_pid_file(IM360_RESIDENT_PID_PATH)


if __name__ == "__main__":
    run()