ballistica/tools/efrotools/filecommand.py
2023-07-29 13:45:49 -07:00

153 lines
5.4 KiB
Python

# Released under the MIT License. See LICENSE for details.
#
"""Operate on large sets of files efficiently."""
from __future__ import annotations
import logging
from collections import deque
from typing import TYPE_CHECKING
from threading import Condition, Thread
import os
if TYPE_CHECKING:
from typing import Iterable, Callable
class _FileBatchesRun:
def __init__(
self,
paths: list[str],
batch_size: int,
file_filter: Callable[[str], bool] | None,
include_mac_packages: bool = False,
) -> None:
self.condition = Condition()
self.paths = paths
self.batches = deque[list[str]]()
self.batch_size = batch_size
self.done = False
self.errored = False
self.file_filter = file_filter
self.batch_buffer_size = 5
self._pending_batch: list[str] = []
self._include_mac_packages = include_mac_packages
if self._include_mac_packages:
# pylint: disable=useless-suppression
# pylint: disable=no-name-in-module, import-error
# noinspection PyUnresolvedReferences
from Cocoa import NSWorkspace # pyright: ignore
self._shared_nsworkspace = NSWorkspace.sharedWorkspace()
# pylint: enable=useless-suppression
else:
self._shared_nsworkspace = None
def _submit_pending_batch(self) -> None:
assert self._pending_batch
# Wait until there's room on the list (or we've been marked done),
# stuff our new results in, and inform any listeners that it has
# changed.
with self.condition:
self.condition.wait_for(
lambda: len(self.batches) < self.batch_buffer_size or self.done
)
self.batches.append(self._pending_batch)
self._pending_batch = []
self.condition.notify()
def _possibly_add_to_pending_batch(self, path: str) -> None:
try:
if self.file_filter is None or self.file_filter(path):
self._pending_batch.append(path)
if len(self._pending_batch) >= self.batch_size:
self._submit_pending_batch()
except Exception:
# FIXME: we should translate this into failing overall...
logging.exception('Error in file_filter')
def bg_thread(self) -> None:
"""Add batches in the bg thread."""
# pylint: disable=too-many-nested-blocks
# Build batches and push them when they're big enough.
for path in self.paths:
if os.path.isfile(path):
self._possibly_add_to_pending_batch(path)
elif os.path.isdir(path):
# From os.walk docs: we can prune dirs in-place when
# running in top-down mode. We can use this to skip
# diving into mac packages.
for root, dirs, fnames in os.walk(path, topdown=True):
# If we find dirs that are actually mac packages, pull
# them out of the dir list we'll dive into and pass
# them directly to our batch for processing.
if self._include_mac_packages:
assert self._shared_nsworkspace is not None
for dirname in list(dirs):
fullpath = os.path.join(root, dirname)
if self._shared_nsworkspace.isFilePackageAtPath_(
fullpath
):
dirs.remove(dirname)
self._possibly_add_to_pending_batch(fullpath)
for fname in fnames:
fullpath = os.path.join(root, fname)
self._possibly_add_to_pending_batch(fullpath)
if self._pending_batch:
self._submit_pending_batch()
# Tell the world we're done.
with self.condition:
self.done = True
self.condition.notify()
def file_batches(
    paths: list[str],
    batch_size: int = 1,
    file_filter: Callable[[str], bool] | None = None,
    include_mac_packages: bool = False,
) -> Iterable[list[str]]:
    """Efficiently yield batches of files to operate on.

    Accepts a list of paths which can be files or directories to be recursed.
    The batch lists are buffered in a background thread so time-consuming
    synchronous operations on the returned batches will not slow the gather.

    Args:
        paths: Files and/or directories to gather from.
        batch_size: Number of paths per yielded batch (the last may be
            smaller).
        file_filter: Optional predicate; only paths it accepts are yielded.
        include_mac_packages: If True, mac package dirs are yielded as
            single paths instead of being recursed into.

    Raises:
        RuntimeError: if the background gather reports an error.
    """
    run = _FileBatchesRun(
        paths=paths,
        batch_size=batch_size,
        file_filter=file_filter,
        include_mac_packages=include_mac_packages,
    )

    # Spin up a bg thread to feed us batches.
    thread = Thread(target=run.bg_thread)
    thread.start()

    try:
        # Now spin waiting for new batches to come in or completion/errors.
        while True:
            with run.condition:
                run.condition.wait_for(
                    lambda: run.done or run.errored or run.batches
                )
                if run.errored:
                    raise RuntimeError('BG batch run errored.')
                if not run.batches:
                    # Predicate held with no batches, so we're done and
                    # fully drained.
                    break
                batch = run.batches.popleft()
                # There's room in the buffer again; wake the bg thread.
                run.condition.notify()
            # Yield OUTSIDE the lock so the bg thread can keep filling the
            # buffer while the caller works on this batch.
            yield batch
        thread.join()
    finally:
        # On early exit (generator closed, or error) this lets the bg
        # thread know to abort; after normal completion it is a no-op.
        with run.condition:
            run.done = True
            run.condition.notify()