# Released under the MIT License. See LICENSE for details.
#
"""Spinoff system for spawning child projects from a ballistica project."""
# pylint: disable=too-many-lines

from __future__ import annotations

import os
import sys
import fnmatch
import tempfile
import subprocess
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, assert_never

from efrotools.code import format_python_str, format_cpp_str
from efrotools import getconfig, replace_exact
from efro.error import CleanError
from efro.terminal import Clr
from efro.util import timedelta_str

from batools.featureset import FeatureSet
from batools.spinoff._state import (
    EntityType,
    DstEntitySet,
    SrcEntity,
    DstEntity,
)

if TYPE_CHECKING:
    from typing import Callable, Iterable

    from batools.project import ProjectUpdater


# Tags that default_filter_file() looks for; these can be used to strip
# out sections that should never be included in spinoff projects.
STRIP_TAG_PAIRS: list[tuple[str, str]] = [
    ('# __SPINOFF_STRIP_BEGIN__', '# __SPINOFF_STRIP_END__'),
    ('// __SPINOFF_STRIP_BEGIN__', '// __SPINOFF_STRIP_END__'),
]
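
# An illustrative (hypothetical) example of the tags in use; the tagged
# block below, plus one trailing blank line if present, gets removed
# from the dst copy by default_filter_file():
#
#     ship_this = True
#     # __SPINOFF_STRIP_BEGIN__
#     internal_only_thing = True
#     # __SPINOFF_STRIP_END__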


class SpinoffContext:
    """Guts of the spinoff system."""

    _active_context: SpinoffContext | None = None

    class BackportInProgressError(Exception):
        """Error we can raise to bow out of processing during a backport."""

    class Mode(Enum):
        """Mode the context can operate in."""

        STATUS = 'status'
        UPDATE = 'update'
        CHECK = 'check'
        CLEAN_LIST = 'cleanlist'
        CLEAN = 'clean'
        CLEAN_CHECK = 'cleancheck'
        OVERRIDE = 'override'
        DIFF = 'diff'
        BACKPORT = 'backport'

    def __init__(
        self,
        src_root: str,
        dst_root: str,
        mode: Mode,
        force: bool = False,
        verbose: bool = False,
        print_full_lists: bool = False,
        override_paths: list[str] | None = None,
        backport_file: str | None = None,
        auto_backport: bool = False,
    ) -> None:
        # pylint: disable=too-many-statements

        # By default, if dst files have their modtimes changed but
        # still line up with src files, we can recover. But one may
        # choose to error in that case to track down things mucking
        # with dst files when they shouldn't be.
        self.strict = False

        self._mode = mode
        self._force = force
        self._verbose = verbose
        self._print_full_lists = print_full_lists
        self._override_paths = override_paths
        self._backport_file = backport_file
        self._auto_backport = auto_backport

        self._project_updater: ProjectUpdater | None = None

        if not os.path.isdir(src_root):
            raise CleanError(f"Spinoff src dir not found: '{src_root}'.")
        if not os.path.isdir(dst_root):
            raise CleanError(f"Spinoff dst dir not found: '{dst_root}'.")

        # The requested set of FeatureSet names (or None to include all).
        self.src_feature_sets: set[str] | None = None

        # Just to be safe, make sure we're working with abs paths.
        self._src_root = os.path.abspath(src_root)
        self._dst_root = os.path.abspath(dst_root)

        self._data_file_path = os.path.join(self._dst_root, '.spinoffdata')

        self._built_parent_repo_tool_configs = False

        self._auto_backport_success_count = 0
        self._auto_backport_fail_count = 0

        self._src_name = 'BallisticaKit'

        self._public: bool = getconfig(Path(self._src_root))['public']
        assert isinstance(self._public, bool)

        self._src_all_feature_sets = {
            f.name: f for f in FeatureSet.get_all_for_project(self._src_root)
        }

        self._src_git_files: set[str] | None = None
        self._dst_git_files: set[str] | None = None
        self._dst_git_file_dirs: set[str] | None = None

        self.filter_file_call: Callable[[SpinoffContext, str, str], str] = type(
            self
        ).default_filter_file

        self.filter_path_call: Callable[[SpinoffContext, str], str] = type(
            self
        ).default_filter_path

        self._execution_error = False

        self.project_file_paths = set[str]()
        self.project_file_names = set[str]()
        self.project_file_suffixes = set[str]()

        # Set of files/symlinks in src.
        self._src_entities: dict[str, SrcEntity] = {}

        # Set of files/symlinks in dst.
        self._dst_entities: dict[str, DstEntity] = {}

        # Src entities for which errors have occurred
        # (dst modified independently, etc.).
        self._src_error_entities: dict[str, str] = {}

        # Dst entries with errors
        # (non-spinoff files in spinoff-owned dirs, etc.).
        self._dst_error_entities: dict[str, str] = {}

        # Entities in src we should filter/copy.
        self._src_copy_entities = set[str]()

        # Entities in src we should simply re-cache modtimes/sizes for.
        self._src_recache_entities = set[str]()

        # Dst entities still found in src.
        self._dst_entities_claimed = set[str]()

        # Entities in dst we should kill.
        self._dst_purge_entities = set[str]()

        # Normally spinoff errors if it finds any files in its managed
        # dirs that it did not put there. This is to prevent accidentally
        # working in these parts of a dst project; since these sections
        # are git-ignored, git itself won't raise any warnings in such
        # cases and it would be easy to accidentally lose work otherwise.
        # This list can be used to suppress spinoff's errors for specific
        # locations. This is generally used to allow build output or
        # other dynamically generated files to exist within
        # spinoff-managed directories. It is possible to use
        # src_write_paths for such purposes, but this has the side-effect
        # of greatly complicating the dst project's gitignore list;
        # selectively marking a few dirs as unchecked makes for a cleaner
        # setup. Just be careful to not set excessively broad regions as
        # unchecked; you don't want to mask actual useful error messages.
        self.src_unchecked_paths = set[str]()

        # TODO(ericf): describe this.
        self.project_file_paths = set[str]()

        # Anything under these dirs WILL be filtered.
        self.filter_dirs = set[str]()

        # ELSE anything under these dirs will NOT be filtered.
        self.no_filter_dirs = set[str]()

        # ELSE files matching these exact base names WILL be filtered
        # (so FOO matches a/b/FOO as well as just FOO).
        self.filter_file_names = set[str]()

        # ELSE files matching these exact base names will NOT be filtered.
        self.no_filter_file_names = set[str]()

        # ELSE files with these extensions WILL be filtered.
        self.filter_file_extensions = set[str]()

        # ELSE files with these extensions will NOT be filtered.
        self.no_filter_file_extensions = set[str]()

        self._spinoff_managed_dirs: set[str] | None = None

        # These paths in the src project will be skipped over during
        # updates and not synced into the dst project. The dst project
        # can use this to trim out parts of the src project that it
        # doesn't want or that it intends to 'override' with its own
        # versions.
        self.src_omit_paths = set[str]()

        # Any files/dirs with these base names will be ignored by
        # spinoff on both src and dst.
        self.ignore_names = set[str]()

        # Use this to 'carve out' directories or exact file paths which
        # will be git-managed on dst. By default, spinoff will consider
        # dirs containing the files it generates as 'spinoff-managed';
        # it will set them as git-ignored and will complain if any files
        # appear in them that it does not manage itself (to prevent
        # accidentally working in such places).
        self.src_write_paths = set[str]()

        # Paths which will NOT be gitignored/etc. (in dst format).
        self.dst_write_paths = set[str]()

        # Special set of paths managed by spinoff but ALSO stored in git
        # in the dst project. This is for the bare minimum stuff that
        # needs to always be present in dst for bootstrapping, indexing
        # by github, etc. Changes to these files in dst will be silently
        # and happily overwritten by spinoff, so tread carefully.
        self.git_mirrored_paths = set[str]()

        # File names that can be quietly ignored or cleared out when
        # found. This should encompass things like .DS_Store files
        # created by the Mac Finder when browsing directories. This
        # helps spinoff remove empty directories when doing a 'clean',
        # etc.
        self.cruft_file_names = set[str]()

        self.dst_name = 'Untitled'

        self._src_config_path = os.path.join(
            self._src_root, 'config', 'spinoffconfig.py'
        )
        if not os.path.exists(self._src_config_path):
            raise CleanError(
                f"Spinoff src config not found at '{self._src_config_path}'."
            )
        self._dst_config_path = os.path.join(
            self._dst_root, 'config', 'spinoffconfig.py'
        )
        if not os.path.exists(self._dst_config_path):
            raise CleanError(
                f"Spinoff dst config not found at '{self._dst_config_path}'."
            )

        # Sets various stuff from user config .py files.
        self._apply_project_configs()

        # Based on the feature-sets they requested, calc which
        # feature-sets from src we *exclude*.
        self._src_omit_feature_sets = self._calc_src_omit_feature_sets()

        # Generate a version of src_omit_paths that includes our
        # feature-set omissions. Basically, omitting a feature set
        # simply omits particular names at a few particular places.
        self._src_omit_paths_expanded = self.src_omit_paths.copy()
        self._add_feature_set_omit_paths(self._src_omit_paths_expanded)

        # Create a version of dst-write-paths that also includes
        # filtered src-write-paths as well as parents of everything
        # (so if a/b/c is added as a write path, stuff under a and a/b
        # will also be covered). We also add git_mirrored_paths since
        # that stuff is intended to be in git as well.
        self._dst_write_paths_expanded = self._filter_paths(
            self.src_write_paths
        )
        self._dst_write_paths_expanded.update(self.dst_write_paths)
        self._dst_write_paths_expanded.update(
            self._filter_paths(self.git_mirrored_paths)
        )
        for path in self._dst_write_paths_expanded.copy():
            for subpath in _get_dir_levels(path):
                self._dst_write_paths_expanded.add(subpath)

        # Create a version of src_unchecked_paths for dst.
        self._dst_unchecked_paths = self._filter_paths(
            self.src_unchecked_paths
        )
        self._sanity_test_setup()
        self._generate_env_hash()

    def _calc_src_omit_feature_sets(self) -> set[str]:
        # If they want everything, omit nothing.
        if self.src_feature_sets is None:
            return set()

        # Based on the requested set, calc the total sets we'll need.
        # Also always include 'core' since we'd be totally broken
        # without it.
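        # (Illustration: requesting just {'scene_v1'} would resolve to
        # scene_v1 plus whatever feature-sets it requires plus 'core';
        # every other known feature-set lands in the returned omit set.
        # 'scene_v1' here is only a hypothetical name.)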
        reqs = FeatureSet.resolve_requirements(
            list(self._src_all_feature_sets.values()),
            self.src_feature_sets | {'core'},
        )

        # Now simply return any sets *not* included in our resolved set.
        return {s for s in self._src_all_feature_sets.keys() if s not in reqs}

    def _add_feature_set_omit_paths(self, paths: set[str]) -> None:
        for fsname in sorted(self._src_omit_feature_sets):
            # Make sure this featureset exists on src.
            featureset = self._src_all_feature_sets.get(fsname)
            if featureset is None:
                raise CleanError(
                    f"src_omit_feature_sets entry '{fsname}' not found"
                    f' on src project.'
                )

            # Omit its config file.
            fsconfigpath = f'config/featuresets/featureset_{fsname}.py'
            paths.add(fsconfigpath)

            # Omit its Python package.
            fspackagename = featureset.name_python_package
            paths.add(f'src/assets/ba_data/python/{fspackagename}')

            # Omit its C++ dir.
            paths.add(f'src/ballistica/{fsname}')

            # Omit its meta dir.
            fsmetapackagename = featureset.name_python_package_meta
            paths.add(f'src/meta/{fsmetapackagename}')

            # Omit its tests package.
            fstestspackagename = featureset.name_python_package_tests
            paths.add(f'tests/{fstestspackagename}')

    @classmethod
    def get_active(cls) -> SpinoffContext:
        """Return the context currently running."""
        if cls._active_context is None:
            raise RuntimeError('No active context.')
        return cls._active_context

    def run(self) -> None:
        """Do the thing."""
        # pylint: disable=too-many-branches

        self._read_state()

        # First, ask git if there are any untracked files in src. We
        # sync based on git's managed file list, so untracked files
        # wouldn't get synced, which would be confusing. We'd rather
        # just error in this case.
        try:
            output = subprocess.check_output(
                ['git', 'status', '--porcelain=v2'],
                cwd=self._src_root,
            ).decode()
            if any(line.startswith('?') for line in output.splitlines()):
                raise CleanError(
                    'There appear to be files in the src project'
                    ' untracked by git. Everything must be added to'
                    ' git for spinoff to function.'
                )
        except subprocess.CalledProcessError as exc:
            raise CleanError(
                "'git status' command failed in src dir."
                ' Spinoff requires the src project to be git managed.'
            ) from exc

        # Get the list of src files managed by git.
        self._src_git_files = set[str](
            subprocess.run(
                ['git', 'ls-files'],
                check=True,
                cwd=self._src_root,
                capture_output=True,
            )
            .stdout.decode()
            .splitlines()
        )

        # Ignore anything under omitted paths/names.
        self._filter_src_git_file_list()

        # Now map whatever is left to paths in dst.
        self._dst_git_files = set(
            self._filter_path(s) for s in self._src_git_files
        )

        # Build a set of all dirs on dst containing a mapped file
        # (excluding root).
        fdirs = self._dst_git_file_dirs = set[str]()
        for dst_git_file in self._dst_git_files:
            dname = os.path.dirname(dst_git_file)
            if dname:
                # Expand to include directories above these as well.
                # We want this set to be 'everything that (even
                # recursively) contains a synced dst file'.
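                # (So a file at 'a/b/c/d.txt' contributes 'a', 'a/b',
                # and 'a/b/c'.)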
                for leveldir in _get_dir_levels(dname):
                    fdirs.add(leveldir)

        # Now take that list and filter out ones under our write paths
        # to get our final list of spinoff-managed-dirs.
        self._calc_spinoff_managed_dirs()

        # Check our spinoff-managed-dirs for any unrecognized files/etc.
        # Since we git-ignore all of them, this is an important safety
        # feature to avoid blowing away work.
        self._check_spinoff_managed_dirs()

        if self._mode in {
            self.Mode.CLEAN,
            self.Mode.CLEAN_LIST,
            self.Mode.CLEAN_CHECK,
        }:
            # For clean operations, simply stuff all dst entities
            # into our purge list.
            self._purge_all_dst_entities()
        else:
            # For normal operations, queue up our copy ops/etc.
            self._register_sync_operations()

        # Tracked dst files that didn't get claimed can be killed.
        for key in self._dst_entities:
            if key not in self._dst_entities_claimed:
                self._dst_purge_entities.add(key)

        # Special case: if we're doing an auto-backport, stop here.
        # Otherwise we'd wind up showing all the errors we probably
        # just fixed.
        if self._mode is self.Mode.BACKPORT and self._auto_backport:
            bpcolor = Clr.YLW if self._auto_backport_fail_count else Clr.GRN
            print(
                f'{bpcolor}Auto-backport complete; backported'
                f' {self._auto_backport_success_count}; '
                f'skipped {self._auto_backport_fail_count}.{Clr.RST}'
            )
            raise self.BackportInProgressError

        # If anything is off, print errors; otherwise actually do the
        # deed.
        if self._src_error_entities or self._dst_error_entities:
            self._print_error_entities()
        else:
            if (
                self._mode is self.Mode.STATUS
                or self._mode is self.Mode.CLEAN_LIST
            ):
                self._status()
            elif self._mode is self.Mode.DIFF:
                self._diff()
            elif (
                self._mode is self.Mode.UPDATE or self._mode is self.Mode.CLEAN
            ):
                self._update()
            elif self._mode is self.Mode.OVERRIDE:
                self._override()
            elif self._mode is self.Mode.BACKPORT:
                # If backport gets here, the file they passed isn't
                # erroring.
                raise CleanError(
                    'Nothing needs backporting.'
                    if self._backport_file is None
                    else 'Provided file does not need backporting.'
                )
            elif (
                self._mode is self.Mode.CHECK
                or self._mode is self.Mode.CLEAN_CHECK
            ):
                pass
            else:
                assert_never(self._mode)

        # Always write state at this point. Even if there have been
        # errors, we want to keep track of the latest states we have for
        # anything we wrote/etc.
        self._write_state()

        # Bail at this point if anything went wrong.
        if (
            self._src_error_entities
            or self._dst_error_entities
            or self._execution_error
        ):
            # Any of these have printed error info already so no need to
            # do so ourself.
            raise CleanError()

        # If we did anything that possibly deleted stuff, clean up any
        # empty dirs that got left behind (hmm; should we be more
        # selective here to avoid dirs we didn't manage?..)
        if self._mode is self.Mode.CLEAN or self._mode is self.Mode.UPDATE:
            self._clean_cruft()

        # Update .gitignore to ignore everything spinoff-managed.
        if self._mode is self.Mode.UPDATE or self._mode is self.Mode.OVERRIDE:
            self._write_gitignore()

    def _apply_project_configs(self) -> None:
        # pylint: disable=exec-used
        try:
            assert self._active_context is None
            type(self)._active_context = self

            # Apply both src and dst spinoff configs.
            for config_path in (self._src_config_path, self._dst_config_path):
                exec_context: dict = {}
                with open(config_path, encoding='utf-8') as infile:
                    config_contents = infile.read()

                # Use compile here so we can provide a nice file path
                # for error tracebacks.
                exec(
                    compile(config_contents, config_path, 'exec'),
                    exec_context,
                    exec_context,
                )

        finally:
            assert type(self)._active_context is self
            type(self)._active_context = None

    def _calc_spinoff_managed_dirs(self) -> None:
        assert self._dst_git_file_dirs is not None
        # Take our list of dirs containing stuff synced in from src and
        # strip out anything that has been explicitly called out as a
        # write-path. What's left will be the set of dirs we consider
        # spinoff-managed.
        all_spinoff_managed_dirs = set[str]()
        for gitfiledir in self._dst_git_file_dirs:
            # If we see this exact dir in our expanded write-paths set
            # (which includes parents), pop it out.
            if gitfiledir in self._dst_write_paths_expanded:
                continue
            all_spinoff_managed_dirs.add(gitfiledir)

        top_level_spinoff_managed_dirs = set[str]()

        # Now take this big soup of dirs and filter it down to
        # top-level ones.
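        # (E.g. if both 'src/foo' and 'src/foo/bar' made it in above,
        # only 'src/foo' survives here.)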
        for rdir in all_spinoff_managed_dirs:
            if any(rdir.startswith(f'{d}/') for d in all_spinoff_managed_dirs):
                continue
            top_level_spinoff_managed_dirs.add(rdir)

        self._spinoff_managed_dirs = top_level_spinoff_managed_dirs

    def _sanity_test_setup(self) -> None:
        # Sanity tests:

        # None of our names lists should ever end in a trailing slash
        # (currently breaks our logic).
        for entitylist in [
            self.filter_dirs,
            self.no_filter_dirs,
            self.filter_file_names,
            self.no_filter_file_names,
            self.filter_file_extensions,
            self.no_filter_file_extensions,
            self.git_mirrored_paths,
            self._src_omit_paths_expanded,
            self.ignore_names,
            self.src_unchecked_paths,
        ]:
            for ent in entitylist:
                if ent.endswith('/'):
                    raise RuntimeError(f"list item {ent} ends in '/'")

        # Make sure nothing in a directory list refers to something
        # that's a file.
        for entitylist in [
            self.filter_dirs,
            self.no_filter_dirs,
        ]:
            for ent in entitylist:
                if os.path.exists(ent):
                    if not os.path.isdir(ent):
                        raise RuntimeError(
                            f'list item {ent} in a dir-list is not a dir'
                        )

        # Likewise make sure nothing in a file list refers to a
        # directory (there are currently no file-lists needing this
        # check).
        for ent in []:
            if os.path.exists(ent):
                if os.path.isdir(ent):
                    raise RuntimeError(
                        f'list item {ent} in a file-list is a dir'
                    )

    def _generate_env_hash(self) -> None:
        # pylint: disable=cyclic-import
        from efrotools import get_files_hash

        # noinspection PyUnresolvedReferences
        import batools.spinoff
        import batools.project

        # Generate an 'env' hash we can tag tracked files with, so that
        # if spinoff scripts or config files change it will invalidate
        # all tracked files.
        hashfiles = set[str]()

        # Add all Python files under our 'spinoff' and 'project'
        # subpackages since those are most likely to affect results.
        for pkgdir in [
            os.path.dirname(batools.spinoff.__file__),
            os.path.dirname(batools.project.__file__),
        ]:
            for root, _subdirs, fnames in os.walk(pkgdir):
                for fname in fnames:
                    if fname.endswith('.py') and not fname.startswith(
                        'flycheck_'
                    ):
                        hashfiles.add(os.path.join(root, fname))

        # Also add src & dst config files since they can affect
        # anything.
        hashfiles.add(self._src_config_path)
        hashfiles.add(self._dst_config_path)

        self._envhash = get_files_hash(sorted(hashfiles))

    def _read_state(self) -> None:
        """Read persistent state from disk."""
        if os.path.exists(self._data_file_path):
            self._dst_entities = DstEntitySet.read_from_file(
                self._data_file_path
            ).entities

    def _write_state(self) -> None:
        """Write persistent state to disk."""
        DstEntitySet(entities=self._dst_entities).write_to_file(
            self._data_file_path
        )

    def _write_gitignore(self) -> None:
        """Filter/write out a gitignore file."""
        assert self._dst_git_files is not None
        assert self._spinoff_managed_dirs is not None

        # We've currently got a list of spinoff-managed-dirs, each of
        # which results in a gitignore entry. On top of that we add
        # entries for individual files that aren't covered by those dirs.
        gitignore_entries = self._spinoff_managed_dirs.copy()
        for gitpath in self._dst_git_files:
            if self._should_add_gitignore_path(gitpath):
                gitignore_entries.add(gitpath)

        # Pull in src .gitignore.
        with open(
            os.path.join(self._src_root, '.gitignore'), encoding='utf-8'
        ) as infile:
            gitignoreraw = infile.read()

        # Run standard filters on it.
        gitignoreraw = self._filter_file('.gitignore', gitignoreraw)
        gitignorelines = gitignoreraw.splitlines()

        # Now add our ignore entries at the bottom.
        start_line = (
            '# Ignore everything managed by spinoff.\n'
            '# To control this, modify src_write_paths in'
            " 'config/spinoffconfig.py'.\n"
            "# If you ever want to 'flatten' your project and remove it"
            ' from spinoff\n'
            '# control completely: simply delete this section, delete'
            " the 'tools/spinoff'\n"
            "# symlink, and delete 'config/spinoffconfig.py'. Then you can add"
            ' everything\n'
            '# in its current state to your git repo and forget that spinoff'
            ' ever existed.'
        )
        if gitignorelines and gitignorelines[-1] != '':
            gitignorelines.append('')
        gitignorelines.append(start_line)
        for entry in sorted(gitignore_entries):
            gitignorelines.append(f'/{entry}')

        # Add a blurb about this coming from spinoff.
        blurb = (
            '# THIS FILE IS AUTOGENERATED BY SPINOFF;'
            ' MAKE ANY EDITS IN SOURCE PROJECT'
        )
        gitignorelines = [blurb, ''] + gitignorelines
        with open(
            os.path.join(self._dst_root, '.gitignore'), 'w', encoding='utf-8'
        ) as outfile:
            outfile.write('\n'.join(gitignorelines) + '\n')

    def _filter_path(self, path: str) -> str:
        """Run filtering on a given path."""

        return self.filter_path_call(self, path)

    def default_filter_path(self, text: str) -> str:
        """Run default filtering on path text."""
        return self.default_filter_text(text)

    def replace_path_components(
        self, path: str, replace_src: str, replace_dst: str
    ) -> str:
        """Replace a path hierarchy with another.

        Does the right thing for parents. For instance, src 'a/b/c'
        and dst 'a2/b2/c2' will correctly filter 'a/foo' to 'a2/foo'
        and 'a/b/foo' to 'a2/b2/foo'.
        """
        pathsrc = replace_src.split('/')
        pathdst = replace_dst.split('/')
        assert len(pathsrc) == len(pathdst)
        splits = path.split('/')
        cmplen = min(len(splits), len(pathsrc))
        if splits[:cmplen] == pathsrc[:cmplen]:
            return '/'.join(pathdst[:cmplen] + splits[cmplen:])
        return path

    def default_filter_text(self, text: str) -> str:
        """Run default filtering on a piece of text."""

        # Replace uppercase, lowercase, and mixed versions of our name.
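        # (Illustration with a hypothetical dst_name of 'MyGame':
        # 'BALLISTICAKIT_BUILD' -> 'MYGAME_BUILD',
        # 'ballisticakit_server' -> 'mygame_server',
        # and 'BallisticaKit' -> 'MyGame'.)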
        return (
            text.replace(self._src_name.upper(), self.dst_name.upper())
            .replace(self._src_name.lower(), self.dst_name.lower())
            .replace(self._src_name, self.dst_name)
        )

    def default_filter_file(self, src_path: str, text: str) -> str:
        """Run default filtering on a file."""
        # pylint: disable=too-many-branches

        # Strip out any sections framed by our strip-begin/end tags.
        if any(t[0] in text for t in STRIP_TAG_PAIRS):
            lines = text.splitlines()

            for begin_tag, end_tag in STRIP_TAG_PAIRS:
                while begin_tag in lines:
                    index = lines.index(begin_tag)
                    endindex = index
                    while lines[endindex] != end_tag:
                        endindex += 1

                    # If the line after us is blank,
                    # include it too to keep spacing clean.
                    if (
                        len(lines) > (endindex + 1)
                        and not lines[endindex + 1].strip()
                    ):
                        endindex += 1

                    del lines[index : endindex + 1]

            text = '\n'.join(lines) + '\n'

        # Add warnings to some of the git-managed files that we write.
        if src_path == 'README.md':
            blurb = (
                '(this readme is autogenerated by spinoff; '
                'make any edits in source project)'
            )
            lines = self.default_filter_text(text).splitlines()
            return '\n'.join(lines[:1] + ['', blurb] + lines[1:])
        if 'Jenkinsfile' in src_path:
            blurb = (
                '// THIS FILE IS AUTOGENERATED BY SPINOFF;'
                ' MAKE ANY EDITS IN SOURCE PROJECT'
            )
            lines = self.default_filter_text(text).splitlines()
            return '\n'.join([blurb, ''] + lines)
        if src_path in ['.gitattributes']:
            blurb = (
                '# THIS FILE IS AUTOGENERATED BY SPINOFF;'
                ' MAKE ANY EDITS IN SOURCE PROJECT'
            )
            lines = self.default_filter_text(text).splitlines()
            return '\n'.join([blurb, ''] + lines)

        # Jetbrains dict files will get sorted differently after
        # filtering words; go ahead and do that as we filter to avoid
        # triggering difference errors next time the dst dict is saved.
        # FIXME: generalize this for any jetbrains dict path; not just
        # mine.
        if src_path.endswith('/ericf.xml'):
            from efrotools.code import sort_jetbrains_dict

            return sort_jetbrains_dict(self.default_filter_text(text))

        # baenv.py will run a standard app loop if exec'ed, but this
        # requires base. Error instead if base is missing.
        if src_path == 'src/assets/ba_data/python/baenv.py':
            assert 'base' in self._src_all_feature_sets
            if 'base' in self._src_omit_feature_sets:
                text = replace_exact(
                    text,
                    (
                        'def _main() -> None:\n'
                        '    # Run a default configure BEFORE importing'
                        ' babase.\n'
                        '    # (may affect where babase comes from).\n'
                        '    configure()\n'
                        '\n'
                        '    import babase\n'
                        '\n'
                        '    babase.app.run()\n'
                        '\n'
                    ),
                    (
                        'def _main() -> None:\n'
                        '    # DISABLED; REQUIRES BASE FEATURE SET.\n'
                        '    # Run a default configure BEFORE importing'
                        ' babase.\n'
                        '    # (may affect where babase comes from).\n'
                        '    # configure()\n'
                        '\n'
                        '    # import babase\n'
                        '\n'
                        '    # babase.app.run()\n'
                        '\n'
                        "    raise RuntimeError('App-exec requires"
                        " base feature set.')\n"
                        '\n'
                    ),
                    label=src_path,
                )

        # In our public repo, if the plus featureset is not included, we
        # don't want to fetch or link against the precompiled plus
        # library.
        assert 'plus' in self._src_all_feature_sets
        if self._public and 'plus' in self._src_omit_feature_sets:
            if src_path == 'ballisticakit-cmake/CMakeLists.txt':
                # Strip the precompiled plus library out of the cmake
                # file.
                text = replace_exact(
                    text,
                    '${CMAKE_CURRENT_BINARY_DIR}/prefablib/libballisticaplus.a'
                    ' ode ',
                    'ode ',
                    label=src_path,
                    count=2,
                )
            if src_path.startswith(
                'ballisticakit-windows/'
            ) and src_path.endswith('.vcxproj'):
                # Strip the precompiled plus library out of visual
                # studio projects.
                text = replace_exact(
                    text,
                    '  <ItemGroup>\r\n'
                    '    <Library Include="..\\..\\build\\prefab\\lib\\windows'
                    '\\$(Configuration)_$(Platform)\\'
                    '$(MSBuildProjectName)Plus.lib" />\r\n'
                    '  </ItemGroup>\r\n',
                    '',
                    label=src_path,
                )
            if src_path == 'Makefile':
                # Remove downloads of the prebuilt plus lib for win
                # builds.
                text = replace_exact(
                    text,
                    ' build/prefab/lib/windows/Debug_Win32/'
                    'BallisticaKitGenericPlus.lib \\\n'
                    ' build/prefab/lib/windows/Debug_Win32/'
                    'BallisticaKitGenericPlus.pdb\n',
                    '',
                    count=2,
                    label=src_path,
                )
                text = replace_exact(
                    text,
                    ' build/prefab/lib/windows/Release_Win32/'
                    'BallisticaKitGenericPlus.lib \\\n'
                    ' build/prefab/lib/windows/Release_Win32/'
                    'BallisticaKitGenericPlus.pdb\n',
                    '',
                    count=2,
                    label=src_path,
                )
                # Remove the prebuilt lib download for cmake &
                # cmake-modular targets.
                text = replace_exact(
                    text,
                    '\t@tools/pcommand update_cmake_prefab_lib standard'
                    ' $(CM_BT_LC) build/cmake/$(CM_BT_LC)\n',
                    '',
                    label=src_path,
                )
                text = replace_exact(
                    text,
                    '\t@tools/pcommand update_cmake_prefab_lib server'
                    ' $(CM_BT_LC) build/cmake/server-$(CM_BT_LC)/dist\n',
                    '',
                    label=src_path,
                )
                text = replace_exact(
                    text,
                    '\t@tools/pcommand update_cmake_prefab_lib standard'
                    ' $(CM_BT_LC) build/cmake/modular-$(CM_BT_LC)\n',
                    '',
                    label=src_path,
                )

        return self.default_filter_text(text)

    def _encoding_for_file(self, path: str) -> str:
        """Returns the text encoding a file requires."""

        # Just make sure this path is valid; at some point we may want
        # to crack the file.
        if not os.path.isfile(path):
            raise RuntimeError('invalid path passed to _encoding_for_file')

        # These files seem to get cranky if we try to convert them to
        # utf-8.
        # TODO: I think I read that MSVC 2017+ might be more lenient
        # here; should check that out because this is annoying.
        if path.endswith('BallisticaKit.rc') or path.endswith('Resource.rc'):
            return 'utf-16le'
        return 'utf-8'

    def _filter_file(self, src_path: str, text: str) -> str:
        """Run filtering on a given file."""

        # Run our registered filter call.
        out = self.filter_file_call(self, src_path, text)

        # Run formatting on some files if they change. Otherwise,
        # running a preflight in the dst project could change things,
        # leading to 'spinoff-managed-file-changed' errors.

        # Note that we use our parent repo for these commands to pick up
        # their tool configs, since those might not exist yet in our
        # child repo. (This also means we need to make sure tool configs
        # have been generated in the parent repo.)

        # WARNING: hard-coding a few 'script' files that don't end in
        # .py too. The proper way might be to ask the parent repo for
        # its full list of script files, but that would add more
        # expense.
        if (
            src_path.endswith('.py') or src_path in {'tools/cloudshell'}
        ) and out != text:
            self._ensure_parent_repo_tool_configs_exist()
            out = format_python_str(out)

        # Ditto for .cc files.
        if src_path.endswith('.cc') and out != text:
            self._ensure_parent_repo_tool_configs_exist()
            out = format_cpp_str(
                projroot=Path(self._src_root),
                text=out,
                filename=os.path.basename(src_path),
            )

        return out

    def _ensure_parent_repo_tool_configs_exist(self) -> None:
        if not self._built_parent_repo_tool_configs:
            # Interestingly, it seems we need to use a shell 'cd' here
            # instead of just passing the cwd arg.
            subprocess.run(
                f'cd {self._src_root} && make prereqs',
                shell=True,
                check=True,
                capture_output=True,
            )
            self._built_parent_repo_tool_configs = True

    def _should_filter_src_file(self, path: str) -> bool:
        """Return whether a given file should be filtered."""
        basename = os.path.basename(path)
        ext = os.path.splitext(basename)[1]
        if any(path.startswith(f'{p}/') for p in self.filter_dirs):
            return True
        if any(path.startswith(f'{p}/') for p in self.no_filter_dirs):
            return False
        if basename in self.filter_file_names:
            return True
        if basename in self.no_filter_file_names:
            return False
        if ext in self.filter_file_extensions:
            return True
        if ext in self.no_filter_file_extensions:
            return False
        raise RuntimeError(f"No filter rule for path '{path}'.")

    def _should_add_gitignore_path(self, path: str) -> bool:
        """Return whether a file path should be added to gitignore."""
        assert self._spinoff_managed_dirs is not None

        # Special case: specific dirs/files we *always* want in git
        # should never get added to gitignore.
        if _any_path_contains(self.git_mirrored_paths, path):
            return False

        # If there's a spinoff-managed dir above us, we're already
        # covered.
        if any(path.startswith(f'{d}/') for d in self._spinoff_managed_dirs):
            return False

        # Go ahead and ignore.
        return True

    def _print_error_entities(self) -> None:
        """Print info about entity errors encountered."""
        print(
            '\nSpinoff Error(s) Found:\n'
            "  Tip: to resolve 'spinoff-managed file modified' errors,\n"
            "  use the 'backport' subcommand.\n",
            file=sys.stderr,
        )
        for key, val in sorted(self._src_error_entities.items()):
            dst = self._src_entities[key].dst
            print(
                f'  {Clr.RED}Error: {dst}{Clr.RST} ({val})',
                file=sys.stderr,
            )
        for key, val in sorted(self._dst_error_entities.items()):
            print(
                f'  {Clr.RED}Error: {key}{Clr.RST} ({val})',
                file=sys.stderr,
            )
        print('')

    def _validate_final_lists(self) -> None:
        """Make sure we never delete the few files we're letting git store."""

        for ent in self._dst_purge_entities.copy():
            if _any_path_contains(self.git_mirrored_paths, ent):
                print(
                    'WARNING: git-mirrored entity'
                    f" '{ent}' unexpectedly found on purge list. Ignoring.",
                    file=sys.stderr,
                )
                self._dst_purge_entities.remove(ent)

    def _purge_all_dst_entities(self) -> None:
        """Go through everything in dst and add it to our purge list.

        (or error if unsafe to do so)
        """
        for key, val in list(self._dst_entities.items()):
            # We never want to purge git-managed stuff.
            if _any_path_contains(self.git_mirrored_paths, key):
                continue

            dst_path = key
            dst_path_full = os.path.join(self._dst_root, dst_path)

            # If dst doesn't exist we just ignore it.
            if not os.path.exists(dst_path_full):
                continue

            # For symlinks we just error if dst is no longer a symlink;
            # otherwise kill it.
            if val.entity_type is EntityType.SYMLINK:
                if not os.path.islink(dst_path_full):
                    self._dst_error_entities[dst_path] = 'expected a symlink'
                    continue
                self._dst_purge_entities.add(dst_path)
                continue

            # For regular files we try to make sure nothing changed
            # since we put it there.
            src_path = val.src_path
            assert src_path is not None
            src_path_full = os.path.join(self._src_root, src_path)
            dst_size = val.dst_size
            dst_mtime = val.dst_mtime

            if (os.path.getsize(dst_path_full) == dst_size) and (
                os.path.getmtime(dst_path_full) == dst_mtime
            ):
                self._dst_purge_entities.add(key)
            else:
                self._attempt_purge_modified_dst(
                    src_path, src_path_full, dst_path, dst_path_full, key
                )

    def _attempt_purge_modified_dst(
        self,
        src_path: str,
        src_path_full: str,
        dst_path: str,
        dst_path_full: str,
        key: str,
    ) -> None:
        # Ick; dst changed. Now the only way we allow the delete is if
        # we can re-filter its src and come up with the same dst again
        # (meaning it probably just had its timestamp changed and
        # nothing else).
        if self._should_filter_src_file(src_path):
            encoding = self._encoding_for_file(src_path_full)
            with open(src_path_full, 'rb') as infile:
                try:
                    src_data = self._filter_file(
                        src_path, infile.read().decode(encoding)
                    )
                except Exception:
                    print(f"Error decoding/filtering file: '{src_path}'.")
                    raise
            with open(dst_path_full, 'rb') as infile:
                try:
                    dst_data = infile.read().decode(encoding)
                except Exception:
                    print(f"Error decoding file: '{dst_path}'.")
                    raise
            still_same = src_data == dst_data
        else:
            with open(src_path_full, 'rb') as infile_b:
                src_data_b = infile_b.read()
            with open(dst_path_full, 'rb') as infile_b:
                dst_data_b = infile_b.read()
            still_same = src_data_b == dst_data_b
        if still_same:
            self._dst_purge_entities.add(key)
        else:
            self._dst_error_entities[dst_path] = 'spinoff-managed file modified'

    def _remove_empty_folders(
        self, path: str, remove_root: bool = True
    ) -> None:
        """Remove empty folders."""
        if not os.path.isdir(path):
            return

        # Ignore symlinks.
        if os.path.islink(path):
            return

        # Remove empty subdirs.
        fnames = os.listdir(path)
        if fnames:
            for fname in fnames:
                # Special case; never recurse into .git dirs; blowing
                # away empty dirs there can be harmful. Note: do we want
                # to use ignore_names here? Seems like we'd still want
                # to delete other entries there like __pycache__ though.
                if fname == '.git':
                    continue

                fullpath = os.path.join(path, fname)
                if os.path.isdir(fullpath):
                    self._remove_empty_folders(fullpath)

        # If the folder is *now* empty, delete it.
        fnames = os.listdir(path)
        if not fnames and remove_root:
            os.rmdir(path)

    def _handle_recache_entities(self) -> None:
        """Re-cache some special-case entries.

        For these entries we simply re-cache modtimes/sizes
        but don't touch any actual files.
        """
        for src_path in self._src_recache_entities:
            src_entity = self._src_entities[src_path]
            dst_path = src_entity.dst
            src_path_full = os.path.join(self._src_root, src_path)
            dst_path_full = os.path.join(self._dst_root, dst_path)
            self._dst_entities[dst_path] = DstEntity(
                entity_type=src_entity.entity_type,
                env_hash=self._envhash,
                src_path=src_path,
                src_mtime=os.path.getmtime(src_path_full),
                src_size=os.path.getsize(src_path_full),
                dst_mtime=os.path.getmtime(dst_path_full),
                dst_size=os.path.getsize(dst_path_full),
            )

    def _status(self) -> None:
        self._validate_final_lists()
        self._handle_recache_entities()
        max_print = 10

        # FIXME: We should show .gitignore here in cases when it would
        # change (we handle that specially).
        if self._src_copy_entities:
            print(
                f'\n{len(self._src_copy_entities)}'
                f' file(s) would be updated:\n',
                file=sys.stderr,
            )
            src_copy_entities_truncated = sorted(self._src_copy_entities)
            if (
                not self._print_full_lists
                and len(src_copy_entities_truncated) > max_print
            ):
                src_copy_entities_truncated = src_copy_entities_truncated[
                    :max_print
                ]
            for ename in src_copy_entities_truncated:
                dst_path_full = os.path.join(
                    self._dst_root, self._src_entities[ename].dst
                )
                exists = os.path.exists(dst_path_full)
                modstr = 'modified' if exists else 'new'
                dstent = self._src_entities[ename].dst
                print(
                    f'  {Clr.GRN}{modstr}: {dstent}{Clr.RST}',
                    file=sys.stderr,
                )
            if len(src_copy_entities_truncated) != len(
                self._src_copy_entities
            ):
                morecnt = len(self._src_copy_entities) - len(
                    src_copy_entities_truncated
                )
                print(
                    f'  {Clr.GRN}{Clr.BLD}(plus {morecnt} more;'
                    f' pass --full for complete list){Clr.RST}',
                    file=sys.stderr,
                )
        dst_purge_entities_valid: set[str] = set()
        if self._dst_purge_entities:
            self._list_dst_purge_entities(dst_purge_entities_valid, max_print)
        if not self._src_copy_entities and not dst_purge_entities_valid:
            print(f'{Clr.GRN}Spinoff is up-to-date.{Clr.RST}', file=sys.stderr)
        else:
            print('')

    def _list_dst_purge_entities(
        self, dst_purge_entities_valid: set[str], max_print: int
    ) -> None:
        for ent in self._dst_purge_entities:
            dst_path_full = os.path.join(self._dst_root, ent)

            # Only make note of the deletion if it exists.
            if (
                os.path.exists(dst_path_full)
                # and ent not in self._dst_entities_delete_quietly
            ):
                dst_purge_entities_valid.add(ent)
        if dst_purge_entities_valid:
            print(
                f'\n{len(dst_purge_entities_valid)} file(s)'
                ' would be removed:\n',
                file=sys.stderr,
            )
            dst_purge_entities_truncated = sorted(dst_purge_entities_valid)
            if (
                not self._print_full_lists
                and len(dst_purge_entities_truncated) > max_print
            ):
                dst_purge_entities_truncated = dst_purge_entities_truncated[
                    :max_print
                ]
            for ent in sorted(dst_purge_entities_truncated):
                print(f'  {Clr.GRN}{ent}{Clr.RST}', file=sys.stderr)
            if len(dst_purge_entities_truncated) != len(
                dst_purge_entities_valid
            ):
                num_more = len(dst_purge_entities_valid) - len(
                    dst_purge_entities_truncated
                )
                print(
                    f'  {Clr.GRN}{Clr.BLD}(plus {num_more} more;'
                    f' pass --full for complete list){Clr.RST}',
                    file=sys.stderr,
                )

    def _override(self) -> None:
        """Add one or more overrides."""
        try:
            override_paths, src_paths = self._check_override_paths()

            # To take an existing dst file out of spinoff management we
            # need to do 3 things:
            # - Add it to src_omit_paths to keep the src version from
            #   being synced in.
            # - Add it to src_write_paths to ensure git has control over
            #   its location in dst.
            # - Remove our dst entry for it to prevent spinoff from
            #   blowing it away when it sees the src entry no longer
            #   exists.

            if not os.path.exists(self._dst_config_path):
                raise RuntimeError(
                    f"Config file not found: '{self._dst_config_path}'."
                )
            with open(self._dst_config_path, encoding='utf-8') as infile:
                config = infile.read()

            config = _add_config_list_entry(config, 'src_omit_paths', src_paths)
            config = _add_config_list_entry(
                config, 'src_write_paths', src_paths
            )

            # Ok, now we simply remove it from tracking while leaving
            # the existing file in place.
            for override_path in override_paths:
                del self._dst_entities[override_path]

            with open(self._dst_config_path, 'w', encoding='utf-8') as outfile:
                outfile.write(config)

            for override_path in override_paths:
                print(
                    f"'{override_path}' overridden. It should now show"
                    ' up as untracked by git (you probably want to add it).'
                )

        except Exception as exc:
            self._execution_error = True
            print(f'{Clr.RED}Error{Clr.RST}: {exc}', file=sys.stderr)

    def _check_override_paths(self) -> tuple[set[str], set[str]]:
        assert self._override_paths is not None
        # Return the set of dst overridden paths and the src paths
        # they came from.
        src_paths = set[str]()
        override_paths = set[str]()
        for arg in self._override_paths:
            override_path_full = os.path.abspath(arg)
            if not override_path_full.startswith(self._dst_root):
                raise CleanError(
                    f'Override-path {override_path_full} does not reside'
                    f' under dst ({self._dst_root}).'
                )
            # TODO(ericf): generalize this now that we're no longer
            # hard-coded to use submodules/ballistica. Should disallow
            # any path under any submodule, I suppose.
            if override_path_full.startswith(
                os.path.join(self._dst_root, 'submodules')
            ):
                raise RuntimeError('Path can not reside under submodules.')
            override_path = override_path_full[len(self._dst_root) + 1 :]
            if not os.path.exists(override_path_full):
                raise RuntimeError(f"Path does not exist: '{override_path}'.")

            # For the time being we only support individual files here.
            if not os.path.isfile(override_path_full):
                raise RuntimeError(
                    f"Path does not appear to be a file: '{override_path}'."
                )

            # Make sure this is a file we're tracking.
            if override_path not in self._dst_entities:
                raise RuntimeError(
                    f'Path does not appear to be'
                    f" tracked by spinoff: '{override_path}'."
                )

            # Disallow git-mirrored-paths.
            # We would have to add special handling for this.
            if _any_path_contains(self.git_mirrored_paths, override_path):
                raise RuntimeError(
                    'Not allowed to override special git-managed path:'
                    f" '{override_path}'."
                )

            src_path = self._dst_entities[override_path].src_path
            assert src_path is not None
            src_paths.add(src_path)
            override_paths.add(override_path)
        return override_paths, src_paths

    def _diff(self) -> None:
        self._validate_final_lists()
        self._handle_recache_entities()

        if os.system('which colordiff > /dev/null 2>&1') == 0:
            display_diff_cmd = 'colordiff'
        else:
            print(
                'NOTE: for color-coded output, install "colordiff" via brew.',
                file=sys.stderr,
            )
            display_diff_cmd = 'diff'

        for src_path in sorted(self._src_copy_entities):
            src_entity = self._src_entities[src_path]
            dst_path = src_entity.dst
            src_path_full = os.path.join(self._src_root, src_path)
            dst_path_full = os.path.join(self._dst_root, dst_path)
            try:
                if src_entity.entity_type is EntityType.SYMLINK:
                    pass

                elif src_entity.entity_type is EntityType.FILE:
                    self._diff_file(
                        src_path,
                        src_path_full,
                        dst_path,
                        dst_path_full,
                        display_diff_cmd,
                    )
                else:
                    assert_never(src_entity.entity_type)

            except Exception as exc:
                self._execution_error = True
                print(
                    f"{Clr.RED}Error diffing file: '{src_path_full}'"
                    f'{Clr.RST}: {exc}',
                    file=sys.stderr,
                )

    def _diff_file(
        self,
        src_path: str,
        src_path_full: str,
        dst_path: str,
        dst_path_full: str,
        display_diff_cmd: str,
    ) -> None:
        if os.path.isfile(src_path_full) and os.path.isfile(dst_path_full):
            # We want to show how this update would change the dst
            # file, so we need to compare a filtered version of src
            # to the existing dst. For non-filtered src files we
            # can just do a direct compare.
            delete_file_name: str | None
            if self._should_filter_src_file(src_path):
                with tempfile.NamedTemporaryFile('wb', delete=False) as tmpf:
                    with open(src_path_full, 'rb') as infile:
                        encoding = self._encoding_for_file(src_path_full)
                        try:
                            contents_in = infile.read().decode(encoding)
                        except Exception:
                            print(f"Error decoding file: '{src_path}'.")
                            raise
                    contents_out = self._filter_file(src_path, contents_in)
                    tmpf.write(contents_out.encode(encoding))
                    delete_file_name = tmpf.name
                    tmpf.close()
                diff_src_path_full = delete_file_name
            else:
                diff_src_path_full = src_path_full
                delete_file_name = None
            result = os.system(
                f'diff "{diff_src_path_full}" "{dst_path_full}"'
                f' > /dev/null 2>&1'
            )
            if result != 0:
                print(f'\n{dst_path}:')
                os.system(
                    f'{display_diff_cmd} "{dst_path_full}"'
                    f' "{diff_src_path_full}"'
                )
                print('')

            if delete_file_name is not None:
                os.remove(delete_file_name)

    def _is_project_file(self, path: str) -> bool:
        if path.startswith('tools/') or path.startswith('src/external'):
            return False
        bname = os.path.basename(path)
        return (
            path in self.project_file_paths
            or bname in self.project_file_names
            or any(bname.endswith(s) for s in self.project_file_suffixes)
        )

    def _update(self) -> None:
        """Run a variation of the "update" command."""
        self._validate_final_lists()
        self._handle_recache_entities()

        # Let's print individual updates only if there are few of them.
        print_individual_updates = len(self._src_copy_entities) < 50

        project_src_paths: list[str] = []

        # Run all file updates except for project ones (Makefiles,
        # etc.), which we hold until the end.
        for src_path in sorted(self._src_copy_entities):
            if self._is_project_file(src_path):
                project_src_paths.append(src_path)
            else:
                self._handle_src_copy(src_path, print_individual_updates)

        # Now attempt to remove anything in our purge list.
        removed_f_count = self._remove_purge_entities()

        # Update project files after all other copies and deletes are
        # done. This is because these files take the state of the
        # project on disk into account, so we need all files they're
        # looking at to be final.
        if project_src_paths:
            from batools.project import ProjectUpdater

            assert self._project_updater is None
            self._project_updater = ProjectUpdater(
                self._dst_root,
                check=False,
                fix=False,
                empty=True,
                projname=self.default_filter_text(self._src_name),
            )

            # For project-updater to do its thing, we need to provide
            # filtered source versions of *all* project files which
            # might be changing. (Some project files may implicitly
            # generate others as part of their own generation so we
            # need all sources in place before any generation happens).
            for src_path in project_src_paths:
                self._handle_src_copy_project_updater_register(src_path)

            # Ok; everything is registered. Can now use the updater to
            # filter dst versions of these.
            self._project_updater.prepare_to_generate()
            for src_path in project_src_paths:
                self._handle_src_copy(
                    src_path, print_individual_updates, is_project_file=True
                )

        # Print some overall results.
        if self._src_copy_entities:
            print(
                f'{len(self._src_copy_entities)} file(s) updated.',
                file=sys.stderr,
            )

        if removed_f_count > 0:
            print(f'{removed_f_count} file(s) removed.', file=sys.stderr)

        # If we didn't update any files or delete anything, say so.
        if removed_f_count == 0 and not self._src_copy_entities:
            print('Spinoff is up-to-date.', file=sys.stderr)

    def _handle_src_copy_project_updater_register(self, src_path: str) -> None:
        src_entity = self._src_entities[src_path]
        dst_path = src_entity.dst
        src_path_full = os.path.join(self._src_root, src_path)
        # dst_path_full = os.path.join(self._dst_root, dst_path)

        # Currently assuming these are filtered.
        assert self._should_filter_src_file(src_path)
        assert src_entity.entity_type is EntityType.FILE
        encoding = self._encoding_for_file(src_path_full)
        with open(src_path_full, 'rb') as infile:
            try:
                contents_in = infile.read().decode(encoding)
            except Exception:
                print(f"Error decoding file: '{src_path}'.")
                raise
        contents_out = self._filter_file(src_path, contents_in)

        # Take the filtered spinoff contents from src and plug that
        # into the project updater as the 'current' version of the
        # file. The updater will then update it based on the current
        # state of the project.
        assert self._project_updater is not None
        self._project_updater.enqueue_update(dst_path, contents_out)

    def _handle_src_copy(
        self,
        src_path: str,
        print_individual_updates: bool,
        is_project_file: bool = False,
    ) -> None:
        # pylint: disable=too-many-locals
        src_entity = self._src_entities[src_path]
        dst_path = src_entity.dst
        src_path_full = os.path.join(self._src_root, src_path)
        dst_path_full = os.path.join(self._dst_root, dst_path)
        try:
            # Create its containing dir if need be.
            dirname = os.path.dirname(dst_path_full)
            if not os.path.exists(dirname):
                os.makedirs(dirname)

            mode = os.lstat(src_path_full).st_mode

            if src_entity.entity_type is EntityType.SYMLINK:
                assert not is_project_file  # Undefined.
                linkto = os.readlink(src_path_full)
                if os.path.islink(dst_path_full):
                    os.remove(dst_path_full)
                os.symlink(linkto, dst_path_full)
                dst_entity = DstEntity(
                    entity_type=src_entity.entity_type,
                    env_hash=None,
                    src_path=None,
                    src_mtime=None,
                    src_size=None,
                    dst_mtime=None,
                    dst_size=None,
                )

            elif src_entity.entity_type is EntityType.FILE:
                dst_entity = self._handle_src_copy_file(
                    src_path,
                    src_path_full,
                    dst_path,
                    dst_path_full,
                    src_entity,
                    is_project_file,
                )
                os.chmod(dst_path_full, mode)
            else:
                raise RuntimeError(
                    f"Invalid entity type: '{src_entity.entity_type}'."
                )

            # NOTE TO SELF: was using lchmod here but it doesn't exist
            # on linux (apparently symlinks can't have perms modified).
            # Now doing a chmod above only for the 'file' path.
            # os.lchmod(dst_path_full, mode)
            self._dst_entities[dst_path] = dst_entity
            if print_individual_updates:
                print(
                    f'  updated: {Clr.GRN}{dst_path}{Clr.RST}', file=sys.stderr
                )

        except Exception as exc:
            # Attempt to remove whatever we just put there so we avoid
            # 'non-managed-file-found' errors in subsequent runs.
            try:
                if os.path.exists(dst_path_full):
                    os.unlink(dst_path_full)
            except Exception as exc2:
                print(
                    f'{Clr.RED}Error removing failed dst file: {exc2}{Clr.RST}'
                )
            self._execution_error = True
            verbose_note = (
                '' if self._verbose else ' (use --verbose for full traceback)'
            )
            print(
                f'{Clr.RED}Error copying/filtering file:'
                f" '{src_path_full}'{Clr.RST}: {exc}{verbose_note}",
                file=sys.stderr,
            )
            if self._verbose:
                import traceback

                traceback.print_exc(file=sys.stderr)

def _handle_src_copy_file(
|
|
self,
|
|
src_path: str,
|
|
src_path_full: str,
|
|
dst_path: str,
|
|
dst_path_full: str,
|
|
src_entity: SrcEntity,
|
|
is_project_file: bool,
|
|
) -> DstEntity:
|
|
# del dst_path # Unused.
|
|
|
|
# If this is a project file, we already fed the filtered
|
|
# src into our ProjectUpdater instance, so all we do here is
|
|
# have the updater give us its output.
|
|
if is_project_file:
|
|
assert self._project_updater is not None
|
|
try:
|
|
pupdatedata = self._project_updater.generate_file(dst_path)
|
|
except Exception:
|
|
if bool(False):
|
|
print(f"ProjectUpdate error generating '{dst_path}'.")
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
raise
|
|
with open(dst_path_full, 'w', encoding='utf-8') as outfile:
|
|
outfile.write(pupdatedata)
|
|
else:
|
|
# Normal non-project file path.
|
|
if not self._should_filter_src_file(src_path):
|
|
with open(src_path_full, 'rb') as infile:
|
|
data = infile.read()
|
|
with open(dst_path_full, 'wb') as outfile:
|
|
outfile.write(data)
|
|
else:
|
|
with open(src_path_full, 'rb') as infile:
|
|
encoding = self._encoding_for_file(src_path_full)
|
|
try:
|
|
contents_in = infile.read().decode(encoding)
|
|
except Exception:
|
|
print(f"Error decoding file: '{src_path}'.")
|
|
raise
|
|
contents_out = self._filter_file(src_path, contents_in)
|
|
with open(dst_path_full, 'wb') as outfile:
|
|
outfile.write(contents_out.encode(encoding))
|
|
|
|
return DstEntity(
|
|
entity_type=src_entity.entity_type,
|
|
env_hash=self._envhash,
|
|
src_path=src_path,
|
|
src_mtime=os.path.getmtime(src_path_full),
|
|
src_size=os.path.getsize(src_path_full),
|
|
dst_mtime=os.path.getmtime(dst_path_full),
|
|
dst_size=os.path.getsize(dst_path_full),
|
|
)
|
|
|
|
def _remove_purge_entities(self) -> int:
|
|
removed_f_count = 0
|
|
if self._dst_purge_entities:
|
|
for ent in sorted(self._dst_purge_entities):
|
|
dst_path_full = os.path.join(self._dst_root, ent)
|
|
try:
|
|
if os.path.isfile(dst_path_full) or os.path.islink(
|
|
dst_path_full
|
|
):
|
|
os.remove(dst_path_full)
|
|
del self._dst_entities[ent]
|
|
removed_f_count += 1
|
|
elif not os.path.exists(dst_path_full):
|
|
# It's already gone; no biggie.
|
|
del self._dst_entities[ent]
|
|
else:
|
|
print(
|
|
f"Anomaly removing file: '{dst_path_full}'.",
|
|
file=sys.stderr,
|
|
)
|
|
except Exception:
|
|
self._execution_error = True
|
|
print(
|
|
f"Error removing file: '{dst_path_full}'.",
|
|
file=sys.stderr,
|
|
)
|
|
return removed_f_count
|
|
|
|

    def _clean_cruft(self) -> None:
        """Clear out some known cruft-y files.

        Makes us more likely to be able to clear directories
        (.DS_Store, etc.).
        """

        # Go through the dirs containing files we've mapped to dst,
        # cleaning out any 'cruft' files we find there.
        assert self._dst_git_file_dirs is not None
        for dstdir in self._dst_git_file_dirs:
            dstdirfull = os.path.join(self._dst_root, dstdir)
            if not os.path.isdir(dstdirfull):
                continue
            for fname in os.listdir(dstdirfull):
                if fname in self.cruft_file_names:
                    cruftpath = os.path.join(dstdirfull, fname)
                    try:
                        os.remove(cruftpath)
                    except Exception:
                        print(
                            f"Error removing cruft file: '{cruftpath}'.",
                            file=sys.stderr,
                        )
        self._remove_empty_folders(self._dst_root, False)

    def _check_spinoff_managed_dirs(self) -> None:
        assert self._spinoff_managed_dirs is not None
        # Spinoff-managed dirs are marked gitignore, which means we are
        # fully responsible for them. We thus want to be careful
        # to avoid silently blowing away work that may have happened
        # in one. So let's be rather strict about it and complain about
        # any files we come across that aren't directly managed by us
        # (or cruft).
        dstrootsl = f'{self._dst_root}/'
        for rdir in self._spinoff_managed_dirs:
            for root, dirnames, fnames in os.walk(
                os.path.join(self._dst_root, rdir),
                topdown=True,
            ):
                # Completely ignore ignore-names in both dirs and files,
                # and cruft-file names in files.
                for dirname in dirnames.copy():
                    if dirname in self.ignore_names:
                        dirnames.remove(dirname)
                for fname in fnames.copy():
                    if (
                        fname in self.ignore_names
                        or fname in self.cruft_file_names
                    ):
                        fnames.remove(fname)

                for fname in fnames:
                    dst_path_full = os.path.join(root, fname)
                    assert dst_path_full.startswith(dstrootsl)
                    dst_path = dst_path_full.removeprefix(dstrootsl)

                    # If it's not a mapped-in file from src and not
                    # covered by generated-paths or git-mirror-paths,
                    # complain.
                    if (
                        dst_path not in self._dst_entities
                        and not _any_path_contains(
                            self._dst_unchecked_paths, dst_path
                        )
                        and not _any_path_contains(
                            self.git_mirrored_paths, dst_path
                        )
                        and not self._force
                    ):
                        self._dst_error_entities[dst_path] = (
                            'non-spinoff file in spinoff-managed dir;'
                            ' --force to ignore'
                        )

    def _filter_src_git_file_list(self) -> None:
        # Create a filtered version of src git files based on our
        # omit entries.
        out = set[str]()
        assert self._src_git_files is not None
        for gitpath in self._src_git_files:
            # If an omit-path contains this one or any component is
            # found in ignore-names, pretend it doesn't exist.
            if _any_path_contains(self._src_omit_paths_expanded, gitpath):
                continue  # Omitting.
            if any(name in gitpath.split('/') for name in self.ignore_names):
                continue
            out.add(gitpath)
        self._src_git_files = out
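
    # Example of the omit logic above (hypothetical values): with
    # _src_omit_paths_expanded={'src/external'} and
    # ignore_names={'__pycache__'}:
    #
    #   'src/external/foo.cc'     -> dropped (under an omit path)
    #   'tools/__pycache__/x.pyc' -> dropped (ignore-name component)
    #   'src/ballistica/app.cc'   -> kept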

    def _register_sync_operations(self) -> None:
        assert self._src_git_files is not None
        for src_path in self._src_git_files:
            dst_path = self._filter_path(src_path)

            src_path_full = os.path.join(self._src_root, src_path)
            dst_path_full = os.path.join(self._dst_root, dst_path)

            if os.path.islink(src_path_full):
                self._do_copy_symlink(
                    src_path, src_path_full, dst_path, dst_path_full
                )
            else:
                assert os.path.isfile(src_path_full)
                self._do_file_copy_and_filter(
                    src_path, src_path_full, dst_path, dst_path_full
                )

    def _do_copy_symlink(
        self,
        src_path: str,
        src_path_full: str,
        dst_path: str,
        dst_path_full: str,
    ) -> None:
        self._src_entities[src_path] = SrcEntity(
            entity_type=EntityType.SYMLINK, dst=dst_path
        )
        if dst_path not in self._dst_entities:
            self._src_copy_entities.add(src_path)
        else:
            dst_type = self._dst_entities[dst_path].entity_type
            if dst_type is not EntityType.SYMLINK:
                self._src_error_entities[
                    src_path
                ] = f'expected symlink; found {dst_type}'
            else:
                # Ok; looks like there's a symlink already there.
                self._dst_entities_claimed.add(dst_path)

                # See if existing link is pointing to the right place &
                # schedule a copy if not.
                linkto = os.readlink(src_path_full)
                if (
                    not os.path.islink(dst_path_full)
                    or os.readlink(dst_path_full) != linkto
                ):
                    self._src_copy_entities.add(src_path)
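
    # Note: the symlink sync above compares link *targets*, not linked
    # contents; a dst link is rewritten only if it is missing or points
    # elsewhere. E.g. (hypothetical): if src 'assets/fonts' links to
    # '../shared/fonts' but dst links to '../old/fonts', a recopy gets
    # scheduled.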

    def _do_file_copy_and_filter(
        self,
        src_path: str,
        src_path_full: str,
        dst_path: str,
        dst_path_full: str,
    ) -> None:
        self._src_entities[src_path] = SrcEntity(
            entity_type=EntityType.FILE, dst=dst_path
        )
        if dst_path not in self._dst_entities:
            # If we're unaware of dst, copy; or error if something's
            # there already (except for our git-managed files, in which
            # case we *expect* something to be there).
            if (
                os.path.exists(dst_path_full)
                and not _any_path_contains(self.git_mirrored_paths, src_path)
                and not self._force
            ):
                self._src_error_entities[src_path] = (
                    'would overwrite non-spinoff file in dst;'
                    ' --force to override'
                )
            else:
                self._src_copy_entities.add(src_path)
        else:
            dst_type = self._dst_entities[dst_path].entity_type
            if dst_type is not EntityType.FILE:
                self._src_error_entities[
                    src_path
                ] = f'expected file; found {dst_type}'
            else:
                dst_exists = os.path.isfile(dst_path_full)

                # Ok; we know of a dst file and it seems to exist. If
                # both src and dst data still line up with our cache we
                # can assume there's nothing to be done.
                dst_entity = self._dst_entities[dst_path]
                # pylint: disable=too-many-boolean-expressions
                if (
                    dst_exists
                    and dst_entity.env_hash == self._envhash
                    and os.path.getsize(dst_path_full) == dst_entity.dst_size
                    and os.path.getmtime(dst_path_full)
                    == dst_entity.dst_mtime
                    and os.path.getsize(src_path_full) == dst_entity.src_size
                    and os.path.getmtime(src_path_full)
                    == dst_entity.src_mtime
                ):
                    pass
                else:
                    # *Something* differs from our cache; we have work
                    # to do.
                    self._do_differing_file_copy_and_filter(
                        src_path,
                        src_path_full,
                        dst_path,
                        dst_path_full,
                        dst_entity,
                        dst_exists,
                    )

                self._dst_entities_claimed.add(dst_path)

    def _do_differing_file_copy_and_filter(
        self,
        src_path: str,
        src_path_full: str,
        dst_path: str,
        dst_path_full: str,
        dst_entity: DstEntity,
        dst_exists: bool,
    ) -> None:
        # pylint: disable=too-many-branches
        # pylint: disable=too-many-statements
        # pylint: disable=too-many-locals

        # Ok, *something* differs from our cache. Need to take a closer look.

        # With no dst we have to do the copy of course.
        if not dst_exists:
            self._src_copy_entities.add(src_path)
            return

        do_backport = False

        src_datab: bytes | None = None
        dst_datab: bytes | None = None
        src_data: str | None = None
        dst_data: str | None = None

        # In strict mode we want it to always be an error if dst mod-time
        # varies from the version we wrote (we want to track down anyone
        # writing to our files who is not us).
        # Note that we need to ignore git-mirrored-paths because git might
        # be mucking with modtimes itself.
        if (
            self.strict
            and not self._force
            and os.path.getmtime(dst_path_full) != dst_entity.dst_mtime
            and not _any_path_contains(self.git_mirrored_paths, src_path)
        ):
            # Try to include when the dst file got modified in
            # case it's helpful.
            sincestr = (
                ''
                if dst_entity.dst_mtime is None
                else (
                    ' '
                    + timedelta_str(
                        os.path.getmtime(dst_path_full)
                        - dst_entity.dst_mtime,
                        maxparts=1,
                        decimals=2,
                    )
                )
            )
            self._src_error_entities[src_path] = (
                f'[STRICT] spinoff-managed file modified{sincestr}'
                f' after spinoff wrote it;'
                f' --force to overwrite from src'
            )
            return

        is_project_file = self._is_project_file(src_path)

        if is_project_file:
            # Project files apply arbitrary logic on top of our
            # copying/filtering (which we cannot check here) so we can
            # never assume results are unchanging.
            results_are_same = False
        else:
            # Let's filter the src file, and if it matches dst we can
            # just re-grab our cache info and call it a day.
            if self._should_filter_src_file(src_path):
                encoding = self._encoding_for_file(src_path_full)
                with open(src_path_full, 'rb') as infile:
                    try:
                        src_data = self._filter_file(
                            src_path, infile.read().decode(encoding)
                        )
                    except Exception:
                        print(f"Error decoding/filtering file: '{src_path}'.")
                        raise
                with open(dst_path_full, 'rb') as infile:
                    try:
                        dst_data = infile.read().decode(encoding)
                    except Exception:
                        print(f"Error decoding file: '{dst_path}'.")
                        raise
                results_are_same = src_data == dst_data

                # Bytes versions are only used (rarely) by the 'backport'
                # command, so leave them to be lazily computed if needed.
                src_datab = dst_datab = None
            else:
                # Ok, our src isn't filtered; can be a bit more streamlined.
                with open(src_path_full, 'rb') as infile:
                    src_datab = infile.read()
                with open(dst_path_full, 'rb') as infile:
                    dst_datab = infile.read()
                results_are_same = src_datab == dst_datab

                # No string versions needed in this case.
                src_data = dst_data = None

        if results_are_same:
            # Things match; just update the times we've got recorded
            # for these fellas.
            self._src_recache_entities.add(src_path)
        else:
            if (os.path.getsize(dst_path_full) == dst_entity.dst_size) and (
                os.path.getmtime(dst_path_full) == dst_entity.dst_mtime
            ):
                # If it looks like dst did not change, we can go
                # through with a standard update.
                self._src_copy_entities.add(src_path)
            elif _any_path_contains(self.git_mirrored_paths, src_path):
                # Ok, dst changed, but it is managed by git so this
                # happens (switching git branches or whatever else...);
                # in this case we just blindly replace it; no erroring.
                self._src_copy_entities.add(src_path)
            elif self._force:
                # If the user is forcing the issue, do the overwrite.
                self._src_copy_entities.add(src_path)
            elif (os.path.getsize(src_path_full) == dst_entity.src_size) and (
                os.path.getmtime(src_path_full) == dst_entity.src_mtime
            ):
                # Ok, dst changed but src did not. This is an error.

                # Try to include when the dst file got modified in
                # case it's helpful.
                sincestr = (
                    ''
                    if dst_entity.dst_mtime is None
                    else (
                        ' '
                        + timedelta_str(
                            os.path.getmtime(dst_path_full)
                            - dst_entity.dst_mtime,
                            maxparts=1,
                            decimals=2,
                        )
                    )
                )
                self._src_error_entities[src_path] = (
                    f'spinoff-managed file modified{sincestr}'
                    f' after spinoff wrote it;'
                    f' --force to overwrite from src'
                )

                # Allow the backport process here to correct this.
                if self._mode is self.Mode.BACKPORT and (
                    self._backport_file == dst_path
                    or self._backport_file is None
                ):
                    do_backport = True
            else:
                # Ok, *nothing* matches (file contents don't match
                # and both modtimes differ from cached ones).
                # User needs to sort this mess out.
                self._src_error_entities[src_path] = (
                    'src AND spinoff-managed file modified;'
                    ' --force to overwrite from src'
                )

                # Allow the backport process here to correct this.
                if self._mode is self.Mode.BACKPORT and (
                    self._backport_file == dst_path
                    or self._backport_file is None
                ):
                    do_backport = True

        if do_backport:
            # Lazily compute string versions if needed.
            if src_data is None:
                assert src_datab is not None
                src_data = src_datab.decode()
            if dst_data is None:
                assert dst_datab is not None
                dst_data = dst_datab.decode()
            self._backport(src_path, dst_path, src_data, dst_data)
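
    # Informal summary of the decision logic above for a file whose
    # cached state no longer matches:
    #
    #   filtered src == dst                -> just refresh cached stats
    #   dst unchanged (size+mtime match)   -> recopy from src
    #   path is git-mirrored, or --force   -> recopy from src
    #   src unchanged but dst edited       -> error (backport can fix)
    #   both src and dst changed           -> error (backport can fix)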

    def _backport(
        self, src_path: str, dst_path: str, src_data: str, dst_data: str
    ) -> None:
        is_filtered = self._should_filter_src_file(src_path)
        full_src_path = os.path.join(self._src_root, src_path)

        # If we're doing auto-backport, just do the thing (when we can)
        # and keep on going.
        if self._auto_backport:
            if is_filtered:
                print(
                    f"{Clr.YLW}Can't auto-backport filtered file:{Clr.RST}"
                    f' {Clr.BLD}{dst_path}{Clr.RST}'
                )
                self._auto_backport_fail_count += 1
            else:
                src_path_full = os.path.join(self._src_root, src_path)
                dst_path_full = os.path.join(self._dst_root, dst_path)
                assert os.path.isfile(src_path_full)
                assert os.path.isfile(dst_path_full)
                subprocess.run(
                    ['cp', dst_path_full, src_path_full], check=True
                )
                print(
                    f'{Clr.BLU}Auto-backporting{Clr.RST}'
                    f' {Clr.BLD}{dst_path}{Clr.RST}'
                )
                self._auto_backport_success_count += 1
            return

        # Ok, NOT auto-backporting; we'll show a diff and stop after the
        # first file. (Note: 'opendiff' launches FileMerge and thus
        # requires macOS with Xcode tools installed.)

        # If this isn't a filtered file, it makes things easier.
        if not is_filtered:
            print(
                f'Backporting {Clr.BLD}{dst_path}{Clr.RST}:\n'
                f'{Clr.GRN}This file is NOT filtered, so backporting'
                f' is simple.{Clr.RST}\n'
                f'{Clr.BLU}{Clr.BLD}LEFT:{Clr.RST}'
                f' src file\n'
                f'{Clr.BLU}{Clr.BLD}RIGHT:{Clr.RST} dst file\n'
                f'{Clr.BLU}{Clr.BLD}YOUR MISSION:{Clr.RST}'
                f' move changes from dst back to src.\n'
                f"{Clr.CYN}Or pass '--auto' to the backport subcommand"
                f' to do this for you.{Clr.RST}'
            )
            subprocess.run(
                [
                    'opendiff',
                    os.path.join(self._src_root, src_path),
                    os.path.join(self._dst_root, dst_path),
                ],
                check=True,
                capture_output=True,
            )

        else:
            # It IS filtered.

            print(
                f'Backporting {Clr.BLD}{dst_path}{Clr.RST}:\n'
                f'{Clr.YLW}This file is filtered, which complicates'
                f' backporting a bit.{Clr.RST}\n'
                f'{Clr.BLU}{Clr.BLD}LEFT:{Clr.RST}'
                f' {Clr.CYN}{Clr.BLD}FILTERED{Clr.RST}'
                ' src file\n'
                f'{Clr.BLU}{Clr.BLD}RIGHT:{Clr.RST} dst file\n'
                f'{Clr.BLU}{Clr.BLD}YOUR MISSION:{Clr.RST}'
                f' modify the {Clr.CYN}{Clr.BLD}ORIGINAL{Clr.RST}'
                f' src file such that filtered src matches dst:\n'
                f'{Clr.BLD}{full_src_path}{Clr.RST}'
            )
            with tempfile.TemporaryDirectory() as tempdir:
                srcname = os.path.basename(src_path)
                dstname = os.path.basename(dst_path)
                tsrcpath = os.path.join(tempdir, f'FILTERED-PARENT({srcname})')
                tdstpath = os.path.join(tempdir, f'SPINOFF({dstname})')
                with open(tsrcpath, 'w', encoding='utf-8') as outfile:
                    outfile.write(src_data)
                with open(tdstpath, 'w', encoding='utf-8') as outfile:
                    outfile.write(dst_data)
                subprocess.run(
                    ['opendiff', tsrcpath, tdstpath],
                    check=True,
                    capture_output=True,
                )

        # Bow out after this single file; otherwise we'd wind up showing
        # all errors (one of which we might have just fixed), which is
        # misleading.
        raise self.BackportInProgressError()

    def _filter_paths(self, paths: Iterable[str]) -> set[str]:
        return set(self._filter_path(p) for p in paths)


def _any_path_contains(paths: Iterable[str], path: str) -> bool:
    assert not path.endswith('/')

    for tpath in paths:
        # Use simple logic if there are no special chars used by fnmatch.
        if not any(char in tpath for char in ('*', '?', '[')):
            if tpath == path or path.startswith(f'{tpath}/'):
                return True
        else:
            # Bust out the fancy logic.
            # Split both paths into segments ('a/b/c' -> ['a','b','c'])
            # and compare each using fnmatch. If all segments
            # from tpath match corresponding ones in path then tpath
            # is a parent.
            pathsegs = path.split('/')
            tpathsegs = tpath.split('/')
            if len(tpathsegs) > len(pathsegs):
                continue  # tpath is deeper than path; can't contain it.
            all_matches = True
            for i in range(len(tpathsegs)):  # pylint: disable=C0200
                seg_matches = fnmatch.fnmatchcase(pathsegs[i], tpathsegs[i])
                if not seg_matches:
                    all_matches = False
                    break
            if all_matches:
                return True
    return False
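

# Example behavior of _any_path_contains (hypothetical paths): with
# paths={'assets/*', 'src/external'}:
#
#   _any_path_contains(paths, 'assets/audio/foo.ogg')  -> True (wildcard)
#   _any_path_contains(paths, 'src/external')          -> True (exact)
#   _any_path_contains(paths, 'src/core/app.cc')       -> False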


def _get_dir_levels(dirpath: str) -> list[str]:
    """For 'a/b/c' return ['a', 'a/b', 'a/b/c']."""
    splits = dirpath.split('/')
    return ['/'.join(splits[: (i + 1)]) for i in range(len(splits))]


def _add_config_list_entry(
    config: str, list_name: str, add_paths: set[str]
) -> str:
    # pylint: disable=eval-used
    splits = config.split(f'{list_name}: list[str] = [')
    if len(splits) != 2:
        raise RuntimeError('Parse error.')
    splits2 = splits[1].split(']')
    paths = eval(f'[{splits2[0]}]')
    assert isinstance(paths, list)
    for add_path in add_paths:
        if add_path in paths:
            raise RuntimeError(
                f'Path already present in {list_name} in spinoffconfig:'
                f" '{add_path}'."
            )
        paths.append(add_path)

    config = (
        splits[0]
        + f'{list_name}: list[str] = [\n'
        + ''.join([f'    {repr(p)},\n' for p in sorted(paths)])
        + ']'.join([''] + splits2[1:])
    )
    return config
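

# Rough illustration of _add_config_list_entry (hypothetical config
# text; the function does simple string surgery, so the target list
# must appear exactly once in the config):
#
#   cfg = "omit_paths: list[str] = [\n    'a',\n]"
#   _add_config_list_entry(cfg, 'omit_paths', {'b'})
#   -> "omit_paths: list[str] = [\n    'a',\n    'b',\n]"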