mirror of
https://github.com/RYDE-WORK/ballistica.git
synced 2026-01-19 21:37:57 +08:00
153 lines
5.4 KiB
Python
153 lines
5.4 KiB
Python
# Released under the MIT License. See LICENSE for details.
|
|
#
|
|
"""Operate on large sets of files efficiently."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from collections import deque
|
|
from typing import TYPE_CHECKING
|
|
from threading import Condition, Thread
|
|
import os
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import Iterable, Callable
|
|
|
|
|
|
class _FileBatchesRun:
    """State shared between a file_batches() consumer and its bg thread.

    The bg thread (running bg_thread()) gathers paths into batches and
    appends them to 'batches'; the consumer pops them off. All access to
    'batches', 'done' and 'errored' is coordinated through 'condition'.

    'done' serves two purposes: the bg thread sets it once gathering has
    finished (successfully or not), and the consumer may set it early to
    ask the bg thread to stop gathering.
    """

    def __init__(
        self,
        paths: list[str],
        batch_size: int,
        file_filter: Callable[[str], bool] | None,
        include_mac_packages: bool = False,
    ) -> None:
        self.condition = Condition()
        self.paths = paths

        # Completed batches waiting to be picked up by the consumer.
        self.batches = deque[list[str]]()
        self.batch_size = batch_size
        self.done = False
        self.errored = False
        self.file_filter = file_filter

        # Max number of completed batches we queue before the bg thread
        # blocks waiting for the consumer to catch up.
        self.batch_buffer_size = 5

        # Batch currently being filled; only touched by the bg thread.
        self._pending_batch: list[str] = []
        self._include_mac_packages = include_mac_packages

        if self._include_mac_packages:
            # pylint: disable=useless-suppression
            # pylint: disable=no-name-in-module, import-error
            # noinspection PyUnresolvedReferences
            from Cocoa import NSWorkspace  # pyright: ignore

            self._shared_nsworkspace = NSWorkspace.sharedWorkspace()
            # pylint: enable=useless-suppression
        else:
            self._shared_nsworkspace = None

    def _submit_pending_batch(self) -> None:
        """Move the pending batch onto the shared queue.

        Blocks while the queue is full. If 'done' has been set (which,
        while we are still gathering, can only mean the consumer asked us
        to abort), the pending batch is dropped instead of queued.
        """
        assert self._pending_batch

        # Wait until there's room on the list (or we've been marked done),
        # stuff our new results in, and inform any listeners that it has
        # changed.
        with self.condition:
            self.condition.wait_for(
                lambda: len(self.batches) < self.batch_buffer_size or self.done
            )
            if self.done:
                # Nobody is listening anymore; don't grow the queue.
                self._pending_batch = []
                return
            self.batches.append(self._pending_batch)
            self._pending_batch = []
            self.condition.notify_all()

    def _possibly_add_to_pending_batch(self, path: str) -> None:
        """Add a path to the pending batch if it passes the file filter.

        Submits the pending batch once it reaches batch_size entries.
        """
        if self.file_filter is not None:
            # Only guard the user-supplied callable here; errors from our
            # own batching logic should not be reported as filter errors.
            try:
                wanted = self.file_filter(path)
            except Exception:
                # FIXME: we should translate this into failing overall...
                logging.exception('Error in file_filter')
                return
            if not wanted:
                return

        self._pending_batch.append(path)
        if len(self._pending_batch) >= self.batch_size:
            self._submit_pending_batch()

    def _iter_candidate_paths(self) -> Iterable[str]:
        """Yield every path to be offered to the file filter, in order.

        Paths that are files are yielded as-is; paths that are dirs are
        walked recursively. Paths that are neither are skipped.
        """
        # pylint: disable=too-many-nested-blocks
        workspace = self._shared_nsworkspace

        for path in self.paths:
            if os.path.isfile(path):
                yield path
            elif os.path.isdir(path):
                # From os.walk docs: we can prune dirs in-place when
                # running in top-down mode. We can use this to skip
                # diving into mac packages.
                for root, dirs, fnames in os.walk(path, topdown=True):
                    # If we find dirs that are actually mac packages, pull
                    # them out of the dir list we'll dive into and pass
                    # them directly to our batch for processing.
                    if self._include_mac_packages:
                        assert workspace is not None
                        for dirname in list(dirs):
                            fullpath = os.path.join(root, dirname)
                            if workspace.isFilePackageAtPath_(fullpath):
                                dirs.remove(dirname)
                                yield fullpath

                    for fname in fnames:
                        yield os.path.join(root, fname)

    def bg_thread(self) -> None:
        """Add batches in the bg thread.

        Always sets 'done' and wakes any waiters before returning, even
        if gathering fails; in the failure case 'errored' is set as well
        so the consumer can report it instead of waiting forever.
        """
        try:
            # Build batches and push them when they're big enough.
            for path in self._iter_candidate_paths():
                # 'done' being set mid-gather means the consumer went
                # away; stop walking instead of gathering unused paths.
                if self.done:
                    break
                self._possibly_add_to_pending_batch(path)

            if self._pending_batch and not self.done:
                self._submit_pending_batch()
        except Exception:
            logging.exception('Error gathering file batches')
            with self.condition:
                self.errored = True
        finally:
            # Tell the world we're done.
            with self.condition:
                self.done = True
                self.condition.notify_all()
|
|
|
|
|
|
def file_batches(
    paths: list[str],
    batch_size: int = 1,
    file_filter: Callable[[str], bool] | None = None,
    include_mac_packages: bool = False,
) -> Iterable[list[str]]:
    """Efficiently yield batches of files to operate on.

    Accepts a list of paths which can be files or directories to be recursed.
    The batch lists are buffered in a background thread so time-consuming
    synchronous operations on the returned batches will not slow the gather.

    If file_filter is provided, only paths for which it returns True are
    included. If include_mac_packages is True, directories that are mac
    packages are passed along as single entries instead of being recursed
    into.

    Raises RuntimeError if the background gather reports an error.
    """

    run = _FileBatchesRun(
        paths=paths,
        batch_size=batch_size,
        file_filter=file_filter,
        include_mac_packages=include_mac_packages,
    )

    # Spin up a bg thread to feed us batches.
    thread = Thread(target=run.bg_thread)
    thread.start()

    try:
        # Now spin waiting for new batches to come in or completion/errors.
        while True:
            with run.condition:
                run.condition.wait_for(
                    lambda: run.done or run.errored or run.batches
                )
                if run.errored:
                    raise RuntimeError('BG batch run errored.')

                # Grab everything that is ready, noting (under the same
                # lock) whether the gather had finished at that point so
                # no late batch can be missed.
                ready = list(run.batches)
                run.batches.clear()
                finished = run.done

                # We just freed up buffer room; wake the bg thread.
                run.condition.notify_all()

            # Yield with the lock released so the bg thread can keep
            # queueing batches while our caller works on these.
            for batch in ready:
                yield batch

            if finished:
                break
    finally:
        # However we exit (exhausted, closed early, or errored), let the
        # bg thread know there is no one left to gather for.
        with run.condition:
            run.done = True
            run.condition.notify_all()
|