mirror of https://github.com/RYDE-WORK/ballistica.git
synced 2026-01-19 21:37:57 +08:00
efrocache WIP
This commit is contained in:
parent 0dbe834987
commit 89e0d22755
.efrocachemap (generated, 8256 changed lines)
File diff suppressed because it is too large
@@ -186,6 +186,7 @@ ctx.filter_file_names = {
     'cloudtool',
     'bacloud',
     'config_template.yaml',
+    '.efrocachemap',
 }

 # ELSE files matching these exact base names will NOT be filtered.
@@ -206,7 +207,6 @@ ctx.no_filter_file_names = {
     '.pylintrc',
     'CPPLINT.cfg',
     '.mypy.ini',
-    '.efrocachemap',
     '._ba_sources_hash',
     '._baplus_sources_hash',
     '._bascenev1_sources_hash',
@@ -13,11 +13,19 @@ from __future__ import annotations

 import os
 import json
+import zlib
 import subprocess
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Annotated
+from dataclasses import dataclass
 from multiprocessing import cpu_count
 from concurrent.futures import ThreadPoolExecutor

+from efro.dataclassio import (
+    ioprepped,
+    IOAttrs,
+    dataclass_to_json,
+    dataclass_from_json,
+)
 from efro.terminal import Clr

 if TYPE_CHECKING:
@@ -32,18 +40,31 @@ CACHE_MAP_NAME = '.efrocachemap'

 UPLOAD_STATE_CACHE_FILE = '.cache/efrocache_upload_state'

+# Cache file consists of these header bytes, single metadata length byte,
+# metadata utf8 bytes, compressed data bytes.
+CACHE_HEADER = b'efca'

-def get_file_hash(path: str) -> str:
-    """Return the hash used for caching.

-    This incorporates the file contents as well as its path.
-    """
+@ioprepped
+@dataclass
+class CacheMetadata:
+    """Metadata stored with a cache file."""
+
+    executable: Annotated[bool, IOAttrs('e')]
+
+
+g_cache_prefix_noexec: bytes | None = None
+g_cache_prefix_exec: bytes | None = None
+
+
+def get_existing_file_hash(path: str) -> str:
+    """Return the hash used for caching."""
     import hashlib

+    prefix = _cache_prefix_for_file(path)
     md5 = hashlib.md5()
     with open(path, 'rb') as infile:
-        md5.update(infile.read())
-    md5.update(path.encode())
+        md5.update(prefix + infile.read())
     return md5.hexdigest()

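The hunk above replaces the old path-plus-contents hash with a small custom container: the 4-byte CACHE_HEADER, a single metadata-length byte, a JSON-encoded CacheMetadata blob, then the zlib-compressed file contents. The cache hash covers the metadata prefix plus the uncompressed contents, which is why an already-expanded file can be re-hashed cheaply by get_existing_file_hash(). A minimal standalone sketch of that layout and hash relationship (helper names here are illustrative, not part of the commit):

import hashlib
import zlib

from efro.dataclassio import dataclass_to_json

# CACHE_HEADER and CacheMetadata as introduced in the hunk above.


def build_prefix(executable: bool) -> bytes:
    meta = dataclass_to_json(CacheMetadata(executable=executable)).encode()
    assert len(meta) < 256  # Must fit in the single length byte.
    return CACHE_HEADER + len(meta).to_bytes(1, 'big') + meta


def cache_hash(executable: bool, raw_contents: bytes) -> str:
    # Same value whether computed from a cache blob or from a local file:
    # md5 over the uncompressed prefix + contents.
    return hashlib.md5(build_prefix(executable) + raw_contents).hexdigest()


def pack(executable: bool, raw_contents: bytes) -> bytes:
    # On-disk / cache-server form: prefix followed by compressed contents.
    return build_prefix(executable) + zlib.compress(raw_contents)

Because the executable flag lives inside the hashed prefix, toggling a file's executable bit changes its cache hash even when the contents are identical.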
@@ -64,6 +85,8 @@ def _project_centric_path(path: str) -> str:

 def get_target(path: str) -> None:
     """Fetch a target path from the cache, downloading if need be."""
+    # pylint: disable=too-many-locals
+    # pylint: disable=too-many-statements
     from efro.error import CleanError

     path = _project_centric_path(path)
@@ -78,18 +101,18 @@ def get_target(path: str) -> None:
     local_cache_path_dl = local_cache_path + '.download'
     hashval = ''.join(subpath.split('/'))

-    # First off: if there's already a file in place, check its hash.
-    # If it matches the cache, we can just update its timestamp and
-    # call it a day.
+    # First off: if there's already a file in place, check its hash. If
+    # it matches the cache, we can just update its timestamp and call it
+    # a day.
     if os.path.isfile(path):
-        existing_hash = get_file_hash(path)
+        existing_hash = get_existing_file_hash(path)
         if existing_hash == hashval:
             os.utime(path, None)
             print(f'Refreshing from cache: {path}')
             return

-    # Ok there's not a valid file in place already.
-    # Clear out whatever is there to start with.
+    # Ok there's not a valid file in place already. Clear out whatever
+    # is there to start with.
     if os.path.exists(path):
         os.unlink(path)

@@ -104,14 +127,19 @@ def get_target(path: str) -> None:
         check=False,
     )

-    # We prune old cache files on the server, so its possible for one to
-    # be trying to build something the server can no longer provide.
-    # try to explain the situation.
+    # We prune old cache files on the server, so its possible for
+    # one to be trying to build something the server can no longer
+    # provide. try to explain the situation.
     if result.returncode == 22:
         raise CleanError(
-            'Server gave an error.'
-            ' Old build files may no longer be available;'
-            ' make sure you are using a recent commit.'
+            'Server gave an error. Old build files may no longer'
+            ' be available on the server; make sure you are using'
+            ' a recent commit.\n'
+            'Note that build files will remain available'
+            ' indefinitely once downloaded, even if deleted by the'
+            f' server. So as long as your {CACHE_DIR_NAME} directory'
+            ' stays intact you should be able to repeat any builds you'
+            ' have run before.'
         )
     if result.returncode != 0:
         raise CleanError('Download failed; is your internet working?')
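The result.returncode == 22 branch matches curl's exit status when it is run with --fail and the server answers with an HTTP error (for example a 404 for a pruned cache entry); the download command itself sits above this hunk, so the sketch below assumes a curl-based fetch with placeholder arguments:

import subprocess

from efro.error import CleanError


def fetch_cache_entry(url: str, dst: str) -> None:
    # Hypothetical condensed fetch; the real code downloads to a temporary
    # '.download' path first and inspects returncode the same way.
    result = subprocess.run(
        ['curl', '--fail', '--silent', '--output', dst, url],
        check=False,
    )
    if result.returncode == 22:
        raise CleanError(
            'Server gave an error; this cache entry may have been pruned.'
        )
    if result.returncode != 0:
        raise CleanError('Download failed; is your internet working?')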
@@ -122,32 +150,52 @@ def get_target(path: str) -> None:
         check=True,
     )

-    # Ok we should have a valid .tar.gz file in our cache dir at this point.
-    # Just expand it and it get placed wherever it belongs.
+    # Ok we should have a valid file in our cache dir at this point.
+    # Just expand it to the target path.

-    # Strangely, decompressing lots of these simultaneously leads to occasional
-    # "File does not exist" errors when running on Windows Subsystem for Linux.
-    # There should be no overlap in files getting written, but perhaps
-    # something about how tar rebuilds the directory structure causes clashes.
-    # It seems that just explicitly creating necessary directories first
-    # prevents the problem.
-    os.makedirs(os.path.dirname(path), exist_ok=True)
+    # UPDATE: Should not be a problem anymore; waiting to see...
+    # Strangely, decompressing lots of these simultaneously leads to
+    # occasional "File does not exist" errors when running on Windows
+    # Subsystem for Linux. There should be no overlap in files getting
+    # written, but perhaps something about how tar rebuilds the
+    # directory structure causes clashes. It seems that just explicitly
+    # creating necessary directories first prevents the problem.
+    # os.makedirs(os.path.dirname(path), exist_ok=True)

     print(f'Extracting: {path}')

     try:
-        subprocess.run(['tar', '-zxf', local_cache_path], check=True)
+        with open(local_cache_path, 'rb') as infile:
+            data = infile.read()
+        header = data[:4]
+        if header != CACHE_HEADER:
+            raise RuntimeError('Invalid cache header.')
+        metalen = data[4]
+        metabytes = data[5 : 5 + metalen]
+        datac = data[5 + metalen :]
+        metajson = metabytes.decode()
+        metadata = dataclass_from_json(CacheMetadata, metajson)
+        data = zlib.decompress(datac)
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+        with open(path, 'wb') as outfile:
+            outfile.write(data)
+        if metadata.executable:
+            subprocess.run(['chmod', '+x', path], check=True)
     except Exception:
-        # If something goes wrong, try to make sure we don't leave a half
-        # decompressed file lying around or whatnot.
-        print(f"Error expanding cache archive for '{local_cache_path}'.")
-        if os.path.exists(local_cache_path):
-            os.remove(local_cache_path)
+        # If something goes wrong, try to make sure we don't leave a
+        # half decompressed file lying around or whatnot.
+        print(f"Error expanding cache archive for '{path}'.")
+        if os.path.exists(path):
+            os.remove(path)
         raise

     # The file will wind up with the timestamp it was compressed with,
     # so let's update its timestamp or else it will still be considered
     # dirty.
-    subprocess.run(f'touch {path}', shell=True, check=True)
+    # UPDATE - shouldn't be a problem anymore since we're writing things
+    # ourselves.
+    # subprocess.run(f'touch {path}', shell=True, check=True)

     if not os.path.exists(path):
         raise RuntimeError(f'File {path} did not wind up as expected.')

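The unpacking side of the new container used by get_target() can be read in isolation: verify the 4-byte header, read the metadata-length byte, decode the CacheMetadata JSON, then zlib-decompress the remainder. A minimal sketch mirroring the try-block above (the function name is illustrative; CacheMetadata and CACHE_HEADER are the module-level definitions from this diff):

import zlib

from efro.dataclassio import dataclass_from_json


def unpack_cache_blob(blob: bytes) -> tuple[CacheMetadata, bytes]:
    # Layout: 4-byte header, 1-byte metadata length, metadata JSON, zlib data.
    if blob[:4] != CACHE_HEADER:
        raise RuntimeError('Invalid cache header.')
    metalen = blob[4]
    metajson = blob[5 : 5 + metalen].decode()
    metadata = dataclass_from_json(CacheMetadata, metajson)
    return metadata, zlib.decompress(blob[5 + metalen :])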
@@ -155,12 +203,12 @@
 def filter_makefile(makefile_dir: str, contents: str) -> str:
     """Filter makefile contents to use efrocache lookups."""

-    if makefile_dir:
-        # Assuming two levels deep at the moment; can revisit if needed.
-        assert len(makefile_dir.split('/')) == 2
-        to_proj_root = '../..'
-    else:
-        to_proj_root = ''
+    # '' should give us ''; 'foo/bar' should give us '../..', etc.
+    to_proj_root = (
+        ''
+        if not makefile_dir
+        else '/'.join(['..'] * len(makefile_dir.split('/')))
+    )

     cachemap = os.path.join(to_proj_root, CACHE_MAP_NAME)
     lines = contents.splitlines()
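The new to_proj_root expression generalizes the old hard-coded two-level assumption: it emits one '..' per path component of makefile_dir. A small self-contained illustration (example paths only):

def to_proj_root(makefile_dir: str) -> str:
    # '' -> '', 'src/assets' -> '../..', 'a/b/c' -> '../../..'
    return (
        ''
        if not makefile_dir
        else '/'.join(['..'] * len(makefile_dir.split('/')))
    )


assert to_proj_root('') == ''
assert to_proj_root('src/assets') == '../..'
assert to_proj_root('a/b/c') == '../../..'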
@@ -232,9 +280,9 @@ def update_cache(makefile_dirs: list[str]) -> None:
            fnames2.append(fullpath)

     # Ok, we've got 2 lists of filenames that we need to cache in the cloud.
-    # First, however, let's look up modtimes for everything and if everything
+    # First, however, let's do a big hash of everything and if everything
     # is exactly the same as last time we can skip this step.
-    hashes = _gen_hashes(fnames1 + fnames2)
+    hashes = _gen_complete_state_hashes(fnames1 + fnames2)
     if os.path.isfile(UPLOAD_STATE_CACHE_FILE):
         with open(UPLOAD_STATE_CACHE_FILE, encoding='utf-8') as infile:
             hashes_existing = infile.read()
@@ -251,7 +299,8 @@ def update_cache(makefile_dirs: list[str]) -> None:

     print(f'{Clr.SBLU}Efrocache update successful!{Clr.RST}')

-    # Write the cache state so we can skip the next run if nothing changes.
+    # Write the cache state so we can skip the next run if nothing
+    # changes.
     os.makedirs(os.path.dirname(UPLOAD_STATE_CACHE_FILE), exist_ok=True)
     with open(UPLOAD_STATE_CACHE_FILE, 'w', encoding='utf-8') as outfile:
         outfile.write(hashes)
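The update_cache() hunks above implement a simple skip-if-unchanged gate: hash every candidate file, serialize the result to JSON, compare it with the state written by the previous run, and only rewrite that state after a successful upload. A compact sketch of the gate (the state-file path is taken from the diff; the helper name and callback are illustrative):

import os
import json
from typing import Callable

UPLOAD_STATE_CACHE_FILE = '.cache/efrocache_upload_state'


def upload_if_changed(hashes: str, do_upload: Callable[[], None]) -> None:
    # 'hashes' is the JSON state produced by _gen_complete_state_hashes().
    if os.path.isfile(UPLOAD_STATE_CACHE_FILE):
        with open(UPLOAD_STATE_CACHE_FILE, encoding='utf-8') as infile:
            if infile.read() == hashes:
                return  # Nothing changed since the last successful update.
    do_upload()
    # Only record the new state once the upload has succeeded.
    os.makedirs(os.path.dirname(UPLOAD_STATE_CACHE_FILE), exist_ok=True)
    with open(UPLOAD_STATE_CACHE_FILE, 'w', encoding='utf-8') as outfile:
        outfile.write(hashes)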
@@ -315,10 +364,10 @@ def _upload_cache(
     )


-def _gen_hashes(fnames: list[str]) -> str:
+def _gen_complete_state_hashes(fnames: list[str]) -> str:
     import hashlib

-    def _get_file_hash(fname: str) -> tuple[str, str]:
+    def _get_simple_file_hash(fname: str) -> tuple[str, str]:
         md5 = hashlib.md5()
         with open(fname, mode='rb') as infile:
             md5.update(infile.read())
@@ -326,7 +375,7 @@ def _gen_hashes(fnames: list[str]) -> str:

     # Now use all procs to hash the files efficiently.
     with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
-        hashes = dict(executor.map(_get_file_hash, fnames))
+        hashes = dict(executor.map(_get_simple_file_hash, fnames))

     return json.dumps(hashes, separators=(',', ':'))

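_gen_complete_state_hashes() fans the per-file hashing out across a thread pool; executor.map() yields one (filename, hash) tuple per input, which dict() then collects before the whole mapping is serialized. The same pattern as a self-contained function:

import json
import hashlib
from multiprocessing import cpu_count
from concurrent.futures import ThreadPoolExecutor


def gen_state_hashes(fnames: list[str]) -> str:
    def _hash_one(fname: str) -> tuple[str, str]:
        # Plain content hash; no prefix needed for the upload-state check.
        md5 = hashlib.md5()
        with open(fname, mode='rb') as infile:
            md5.update(infile.read())
        return fname, md5.hexdigest()

    with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        hashes = dict(executor.map(_hash_one, fnames))
    return json.dumps(hashes, separators=(',', ':'))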
@@ -355,10 +404,11 @@ def _write_cache_files(
         mapping[result[0]] = BASE_URL + result[1]
         fhashes2.add(result[1])

-    # We want the server to have a startercache.tar.xz file which contains
-    # the entire first set. It is much more efficient to build that file
-    # on the server than it is to build it here and upload the whole thing.
-    # ...so let's simply write a script to generate it and upload that.
+    # We want the server to have a startercache.tar.xz file which
+    # contains the entire first set. It is much more efficient to build
+    # that file on the server than it is to build it here and upload the
+    # whole thing. ...so let's simply write a script to generate it and
+    # upload that.

     # Also let's have the script touch both sets of files so we can use
     # mod-times to prune older files. (otherwise files that never change
@@ -398,15 +448,71 @@
     outfile.write(json.dumps(mapping, indent=2, sort_keys=True))


+def _cache_prefix_for_file(fname: str) -> bytes:
+    # pylint: disable=global-statement
+    global g_cache_prefix_exec
+    global g_cache_prefix_noexec
+
+    # We'll be calling this a lot when checking existing files, so we
+    # want it to be efficient. Let's cache the two options there are at
+    # the moment.
+    executable = os.access(fname, os.X_OK)
+    if executable:
+        if g_cache_prefix_exec is None:
+            metadata = dataclass_to_json(
+                CacheMetadata(executable=True)
+            ).encode()
+            assert len(metadata) < 256
+            g_cache_prefix_exec = (
+                CACHE_HEADER + len(metadata).to_bytes() + metadata
+            )
+        return g_cache_prefix_exec
+
+    # Ok; non-executable it is.
+    metadata = dataclass_to_json(CacheMetadata(executable=False)).encode()
+    assert len(metadata) < 256
+    g_cache_prefix_noexec = CACHE_HEADER + len(metadata).to_bytes() + metadata
+    return g_cache_prefix_noexec
+
+
 def _write_cache_file(staging_dir: str, fname: str) -> tuple[str, str]:
     import hashlib

     print(f'Caching {fname}')
+
+    prefix = _cache_prefix_for_file(fname)
+
+    with open(fname, 'rb') as infile:
+        fdataraw = infile.read()
+
+    # Calc a hash of the prefix plus the raw file contents. We want to
+    # hash the *uncompressed* file since we'll need to calc this for
+    # lots of existing files when seeing if they need to be updated.
+
+    # Just going with ol' md5 here; we're the only ones creating these
+    # so security isn't a concern.
+    md5 = hashlib.md5()
+    md5.update(prefix + fdataraw)
+    finalhash = md5.hexdigest()
+    hashpath = os.path.join(finalhash[:2], finalhash[2:4], finalhash[4:])
+    path = os.path.join(staging_dir, hashpath)
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+
+    with open(path, 'wb') as outfile:
+        outfile.write(prefix + zlib.compress(fdataraw))
+
+    return (fname, hashpath)
+
+
+def _write_cache_file_old(staging_dir: str, fname: str) -> tuple[str, str]:
+    import hashlib
+
+    print(f'Caching {fname}')
     if ' ' in fname:
         raise RuntimeError('Spaces in paths not supported.')

-    # Just going with ol' md5 here; we're the only ones creating these so
-    # security isn't a concern.
+    # Just going with ol' md5 here; we're the only ones creating these
+    # so security isn't a concern.
     md5 = hashlib.md5()
     with open(fname, 'rb') as infile:
         md5.update(infile.read())
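Taken together, _cache_prefix_for_file() and the new _write_cache_file() are meant to make a stored cache entry and its expanded target file hash identically: both are the md5 of the metadata prefix plus the uncompressed bytes. A hedged round-trip sketch of that property (illustrative name; it relies on get_existing_file_hash() from this module and assumes the target file has already been expanded with the correct executable bit):

import zlib
import hashlib


def roundtrip_check(fname: str, blob: bytes) -> bool:
    # 'blob' is a cache entry as written by _write_cache_file() for 'fname'.
    metalen = blob[4]
    prefix = blob[: 5 + metalen]
    datac = blob[5 + metalen :]
    # Hash of the stored entry: prefix plus the uncompressed payload...
    stored_hash = hashlib.md5(prefix + zlib.decompress(datac)).hexdigest()
    # ...should equal the hash computed from the expanded file on disk.
    return stored_hash == get_existing_file_hash(fname)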
@@ -417,9 +523,9 @@ def _write_cache_file(staging_dir: str, fname: str) -> tuple[str, str]:
     os.makedirs(os.path.dirname(path), exist_ok=True)

     # Fancy pipe stuff which will give us deterministic tar.gz files
-    # with no embedded timestamps.
-    # Note: The 'COPYFILE_DISABLE' prevents mac tar from adding
-    # file attributes/resource-forks to the archive as as ._filename.
+    # with no embedded timestamps. Note: The 'COPYFILE_DISABLE' prevents
+    # mac tar from adding file attributes/resource-forks to the archive
+    # as as ._filename.
     subprocess.run(
         f'COPYFILE_DISABLE=1 tar cf - {fname} | gzip -n > {path}',
         shell=True,
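The legacy path kept here as _write_cache_file_old() depends on the archive bytes being reproducible from run to run: gzip -n drops the name/timestamp gzip would otherwise embed in its header, and COPYFILE_DISABLE=1 stops macOS tar from adding '._' AppleDouble entries. The same trick in isolation (paths are placeholders):

import subprocess


def make_deterministic_targz(fname: str, outpath: str) -> None:
    # gzip -n: no name/timestamp in the gzip header, so output is stable.
    # COPYFILE_DISABLE=1: no '._*' resource-fork entries from mac tar.
    subprocess.run(
        f'COPYFILE_DISABLE=1 tar cf - {fname} | gzip -n > {outpath}',
        shell=True,
        check=True,
    )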
@@ -429,18 +535,18 @@


 def _check_warm_start_entry(entry: tuple[str, str]) -> None:
-    import hashlib
+    # import hashlib

     fname, filehash = entry
-    md5 = hashlib.md5()
-    with open(fname, 'rb') as infile:
-        md5.update(infile.read())
-    md5.update(fname.encode())
-    finalhash = md5.hexdigest()
+    # md5 = hashlib.md5()
+    # with open(fname, 'rb') as infile:
+    #     md5.update(infile.read())
+    # md5.update(fname.encode())
+    # finalhash = md5.hexdigest()

     # If the file still matches the hash value we have for it,
     # go ahead and update its timestamp.
-    if finalhash == filehash:
+    if get_existing_file_hash(fname) == filehash:
         os.utime(fname, None)


@@ -453,12 +559,12 @@ def _check_warm_start_entries(entries: list[tuple[str, str]]) -> None:
 def warm_start_cache() -> None:
     """Run a pre-pass on the efrocache to improve efficiency."""

-    # We maintain a starter-cache on the staging server, which
-    # is simply the latest set of cache entries compressed into a single
+    # We maintain a starter-cache on the staging server, which is simply
+    # the latest set of cache entries compressed into a single
     # compressed archive. If we have no local cache yet we can download
     # and expand this to give us a nice head start and greatly reduce
-    # the initial set of individual files we have to fetch.
-    # (downloading a single compressed archive is much more efficient than
+    # the initial set of individual files we have to fetch. (downloading
+    # a single compressed archive is much more efficient than
     # downloading thousands)
     if not os.path.exists(CACHE_DIR_NAME):
         print('Downloading asset starter-cache...', flush=True)
@@ -479,12 +585,11 @@ def warm_start_cache() -> None:

     # In the public build, let's scan through all files managed by
     # efrocache and update any with timestamps older than the latest
-    # cache-map that we already have the data for.
-    # Otherwise those files will update individually the next time
-    # they are 'built'. Even though that only takes a fraction of a
-    # second per file, it adds up when done for thousands of assets
-    # each time the cache map changes. It is much more efficient to do
-    # it in one go here.
+    # cache-map that we already have the data for. Otherwise those files
+    # will update individually the next time they are 'built'. Even
+    # though that only takes a fraction of a second per file, it adds up
+    # when done for thousands of assets each time the cache map changes.
+    # It is much more efficient to do it in one go here.
     cachemap: dict[str, str]
     with open(CACHE_MAP_NAME, encoding='utf-8') as infile:
         cachemap = json.loads(infile.read())
@@ -505,12 +610,12 @@ def warm_start_cache() -> None:
         if not os.path.exists(cachefile):
             continue

-        # Ok, add it to the list of files we can potentially update timestamps
-        # on once we check its hash.
+        # Ok, add it to the list of files we can potentially update
+        # timestamps on once we check its hash.
         filehash = ''.join(url.split('/')[-3:])
         entries.append((fname, filehash))

     if entries:
-        # Now fire off a multithreaded executor to check hashes and update
-        # timestamps.
+        # Now fire off a multithreaded executor to check hashes and
+        # update timestamps.
         _check_warm_start_entries(entries)
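The warm-start pass boils down to: for each file named in the cache map whose local copy is older than the map itself, check whether its content hash still matches the map entry and, if so, bump its mtime so it is not needlessly rebuilt. A condensed, single-threaded sketch (illustrative name; the real code also skips entries whose cache file has not been downloaded yet and spreads the hash checks across threads):

import os


def warm_start(cachemap: dict[str, str], map_mtime: float) -> None:
    for fname, url in cachemap.items():
        if not os.path.isfile(fname) or os.path.getmtime(fname) >= map_mtime:
            continue
        # The hash is encoded in the cache URL as its last three path parts.
        filehash = ''.join(url.split('/')[-3:])
        if get_existing_file_hash(fname) == filehash:
            os.utime(fname, None)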