import logging import os import time logger = logging.getLogger(__name__) class LoggerNameFilter(logging.Filter): def filter(self, record): # return record.name.startswith("loom_core") or record.name in "ERROR" or ( # record.name.startswith("uvicorn.error") # and record.getMessage().startswith("Uvicorn running on") # ) return True def get_log_file(log_path: str, sub_dir: str): """ sub_dir should contain a timestamp. """ log_dir = os.path.join(log_path, sub_dir) # Here should be creating a new directory each time, so `exist_ok=False` os.makedirs(log_dir, exist_ok=False) return os.path.join(log_dir, "loom_core.log") def get_config_dict( log_level: str, log_file_path: str, log_backup_count: int, log_max_bytes: int ) -> dict: # for windows, the path should be a raw string. log_file_path = ( log_file_path.encode("unicode-escape").decode() if os.name == "nt" else log_file_path ) log_level = log_level.upper() config_dict = { "version": 1, "disable_existing_loggers": False, "formatters": { "formatter": { "format": ( "%(asctime)s %(name)-12s %(process)d %(levelname)-8s %(message)s" ) }, }, "filters": { "logger_name_filter": { "()": __name__ + ".LoggerNameFilter", }, }, "handlers": { "stream_handler": { "class": "logging.StreamHandler", "formatter": "formatter", "level": log_level, # "stream": "ext://sys.stdout", # "filters": ["logger_name_filter"], }, "file_handler": { "class": "logging.handlers.RotatingFileHandler", "formatter": "formatter", "level": log_level, "filename": log_file_path, "mode": "a", "maxBytes": log_max_bytes, "backupCount": log_backup_count, "encoding": "utf8", }, }, "loggers": { "loom_core": { "handlers": ["stream_handler", "file_handler"], "level": log_level, "propagate": False, } }, "root": { "level": log_level, "handlers": ["stream_handler", "file_handler"], }, } return config_dict def get_timestamp_ms(): t = time.time() return int(round(t * 1000))