# Released under the MIT License. See LICENSE for details.
#
"""Logging functionality."""
from __future__ import annotations

import sys
import time
import logging
import datetime
import threading
from enum import Enum
from dataclasses import dataclass
from typing import TYPE_CHECKING, Annotated

from efro.util import utc_now
from efro.terminal import TerminalColor
from efro.dataclassio import ioprepped, IOAttrs, dataclass_to_json

if TYPE_CHECKING:
    from pathlib import Path
    from typing import Any, Callable


class LogLevel(Enum):
    """Severity level for a log entry.

    These enums have numeric values so they can be compared in severity.
    Note that these values are not currently interchangeable with the
    logging.ERROR, logging.DEBUG, etc. values.
    """
    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4
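
    # Since this is a plain Enum (not an IntEnum), members don't support
    # ordering operators directly; severity comparisons need to go through
    # the numeric .value (e.g. entry.level.value >= LogLevel.WARNING.value).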


@ioprepped
@dataclass
class LogEntry:
    """Single logged message."""
    name: Annotated[str,
                    IOAttrs('n', soft_default='root', store_default=False)]
    message: Annotated[str, IOAttrs('m')]
    level: Annotated[LogLevel, IOAttrs('l')]
    time: Annotated[datetime.datetime, IOAttrs('t')]
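
    # Entries get written to the log file one per line via
    # dataclass_to_json(), with the short IOAttrs keys above
    # ('n', 'm', 'l', 't') keeping each serialized line compact.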


class LogHandler(logging.Handler):
    """Fancy-pants handler for logging output.

    Writes logs to disk in structured json format and echoes them
    to stdout/stderr with pretty colors.
    """

    def __init__(self,
                 path: str | Path | None,
                 echofile: Any,
                 suppress_non_root_debug: bool = False):
        super().__init__()
        # pylint: disable=consider-using-with
        self._file = (None
                      if path is None else open(path, 'w', encoding='utf-8'))
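        # Note: this file handle stays open for the handler's lifetime;
        # nothing in this module closes it explicitly.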
        self._echofile = echofile
        self._callbacks: list[Callable[[LogEntry], None]] = []
        self._suppress_non_root_debug = suppress_non_root_debug

    def emit(self, record: logging.LogRecord) -> None:

        # Special case - filter out this common extra-chatty category.
        # TODO - should use a standard logging.Filter for this.
        if (self._suppress_non_root_debug and record.name != 'root'
                and record.levelname == 'DEBUG'):
            return

        # Bake down all log formatting into a simple string.
        msg = self.format(record)

        # Translate Python log levels to our own.
        level = {
            'DEBUG': LogLevel.DEBUG,
            'INFO': LogLevel.INFO,
            'WARNING': LogLevel.WARNING,
            'ERROR': LogLevel.ERROR,
            'CRITICAL': LogLevel.CRITICAL
        }[record.levelname]

        entry = LogEntry(message=msg,
                         name=record.name,
                         level=level,
                         time=datetime.datetime.fromtimestamp(
                             record.created, datetime.timezone.utc))

        for call in self._callbacks:
            call(entry)

        # Also route log entries to the echo file (generally stdout/stderr)
        # with pretty colors.
        if self._echofile is not None:
            cbegin: str
            cend: str
            cbegin, cend = {
                LogLevel.DEBUG:
                    (TerminalColor.CYAN.value, TerminalColor.RESET.value),
                LogLevel.INFO: ('', ''),
                LogLevel.WARNING:
                    (TerminalColor.YELLOW.value, TerminalColor.RESET.value),
                LogLevel.ERROR:
                    (TerminalColor.RED.value, TerminalColor.RESET.value),
                LogLevel.CRITICAL:
                    (TerminalColor.STRONG_MAGENTA.value +
                     TerminalColor.BOLD.value + TerminalColor.BG_BLACK.value,
                     TerminalColor.RESET.value),
            }[level]

            self._echofile.write(f'{cbegin}{msg}{cend}\n')

        # Note to self: it sounds like logging wraps calls to us
        # in a lock so we shouldn't have to worry about garbled
        # json output due to multiple threads writing at once,
        # but may be good to find out for sure?
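        # (For what it's worth: logging.Handler.handle() does take the
        # handler lock around emit(), so records arriving through the
        # logging module should already be serialized here; emit_custom()
        # below is called directly and bypasses that lock.)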
        if self._file is not None:
            entry_s = dataclass_to_json(entry)
            assert '\n' not in entry_s  # Make sure it's a single line.
            print(entry_s, file=self._file, flush=True)

    def emit_custom(self, name: str, message: str, level: LogLevel) -> None:
        """Custom emit call for our stdout/stderr redirection."""
        entry = LogEntry(name=name,
                         message=message,
                         level=level,
                         time=utc_now())

        for call in self._callbacks:
            call(entry)

        if self._file is not None:
            entry_s = dataclass_to_json(entry)
            assert '\n' not in entry_s  # Make sure it's a single line.
            print(entry_s, file=self._file, flush=True)

    def add_callback(self, call: Callable[[LogEntry], None]) -> None:
        """Add a callback to be run for each added entry."""
        self._callbacks.append(call)
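
    # Callbacks receive every LogEntry emitted through emit() or
    # emit_custom(), synchronously on the emitting thread; e.g. a caller
    # could do `handler.add_callback(lambda e: print(e.level))` to watch
    # entries as they arrive (illustrative only; 'handler' here is just
    # some LogHandler instance).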


class LogRedirect:
    """A file-like object for redirecting stdout/stderr to our log."""

    def __init__(self, name: str, orig_out: Any, log_handler: LogHandler,
                 log_level: LogLevel):
        self._name = name
        self._orig_out = orig_out
        self._log_handler = log_handler
        self._log_level = log_level
        self._chunk = ''
        self._chunk_start_time = 0.0
        self._lock = threading.Lock()

    def write(self, s: str) -> None:
        """Write something to output."""

        assert isinstance(s, str)

        # First, ship it off to the original destination.
        self._orig_out.write(s)

        # Now add this to our chunk and ship completed chunks
        # off to the logger.
        # Let's consider a chunk completed when we're passed
        # a single '\n' by itself (a print() call will do this
        # at the end by default).
        # We may get some false positives/negatives this way
        # but it should result in *most* big multi-line print
        # statements being wrapped into a single log entry.
        # Also, flush() with only_old=True can be called periodically
        # to dump any pending chunks that don't happen to fit
        # this pattern.
        with self._lock:
            if s == '\n':
                self._log_handler.emit_custom(name=self._name,
                                              message=self._chunk,
                                              level=self._log_level)
                self._chunk = ''
            else:
                if self._chunk == '':
                    self._chunk_start_time = time.time()
                self._chunk += s

    def flush(self, only_old: bool = False) -> None:
        """Flushhhhh!"""
        self._orig_out.flush()
        if only_old and time.time() - self._chunk_start_time < 0.5:
            return
        with self._lock:
            if self._chunk != '':
                chunk = self._chunk
                if chunk.endswith('\n'):
                    chunk = chunk[:-1]
                self._log_handler.emit_custom(name=self._name,
                                              message=chunk,
                                              level=self._log_level)
                self._chunk = ''


def setup_logging(log_path: str | Path | None,
                  level: LogLevel,
                  suppress_non_root_debug: bool = False) -> LogHandler:
    """Set up our logging environment.

    Returns the custom handler, which can be used to fetch information
    about logs that have passed through it (worst log-levels seen, etc.).
    """

    lmap = {
        LogLevel.DEBUG: logging.DEBUG,
        LogLevel.INFO: logging.INFO,
        LogLevel.WARNING: logging.WARNING,
        LogLevel.ERROR: logging.ERROR,
        LogLevel.CRITICAL: logging.CRITICAL,
    }

    # Wire logger output to go to a structured log file.
    # Also echo it to stderr IF we're running in a terminal.
    loghandler = LogHandler(
        log_path,
        echofile=sys.stderr if sys.stderr.isatty() else None,
        suppress_non_root_debug=suppress_non_root_debug)

    logging.basicConfig(level=lmap[level],
                        format='%(message)s',
                        handlers=[loghandler])

    # DISABLING THIS BIT FOR NOW - want to keep things as pure as possible.
    if bool(False):
        # Now wire Python stdout/stderr output to generate log entries
        # in addition to its regular routing. Make sure to do this *after*
        # we tell the log-handler to write to stderr; otherwise we get an
        # infinite loop.
        # NOTE: remember that this won't capture subcommands or other
        # non-python stdout/stderr output.
        sys.stdout = LogRedirect(  # type: ignore
            'stdout', sys.stdout, loghandler, LogLevel.INFO)
        sys.stderr = LogRedirect(  # type: ignore
            'stderr', sys.stderr, loghandler, LogLevel.INFO)

    return loghandler
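

# A minimal usage sketch (illustrative, not part of the module's API):
# running this file directly wires up logging with no on-disk log file and
# pushes a few test messages through the handler; the message strings and
# the no-op callback below are arbitrary placeholders.
if __name__ == '__main__':
    _handler = setup_logging(log_path=None, level=LogLevel.DEBUG)
    _handler.add_callback(lambda entry: None)  # No-op example callback.
    logging.info('hello from the structured logger')
    logging.warning('warnings get echoed in yellow when on a tty')
    _handler.emit_custom('stdout', 'a custom entry', LogLevel.INFO)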