more bacloud/asset-package work

This commit is contained in:
Eric 2024-07-01 17:22:33 -07:00
parent 29fbb5d7eb
commit 7e24e6ee45
No known key found for this signature in database
GPG Key ID: 89C93F0F8D6D5A98
15 changed files with 252 additions and 93 deletions

56
.efrocachemap generated
View File

@ -4038,26 +4038,26 @@
"build/assets/windows/Win32/ucrtbased.dll": "2def5335207d41b21b9823f6805997f1",
"build/assets/windows/Win32/vc_redist.x86.exe": "b08a55e2e77623fe657bea24f223a3ae",
"build/assets/windows/Win32/vcruntime140d.dll": "865b2af4d1e26a1a8073c89acb06e599",
"build/prefab/full/linux_arm64_gui/debug/ballisticakit": "2463d9fec0254150bd385e9ef80d1098",
"build/prefab/full/linux_arm64_gui/release/ballisticakit": "3dc65856b5a30df83ee088428a7ed862",
"build/prefab/full/linux_arm64_server/debug/dist/ballisticakit_headless": "f108ac7b56522e05aa061a43ec0f7ad7",
"build/prefab/full/linux_arm64_server/release/dist/ballisticakit_headless": "0b4ef8f5b00f2917467d6bc3926003ae",
"build/prefab/full/linux_x86_64_gui/debug/ballisticakit": "11495371a0ff7a476b88b715e0d4899c",
"build/prefab/full/linux_x86_64_gui/release/ballisticakit": "e1ffd608a74e1135a74535ef0f24de84",
"build/prefab/full/linux_x86_64_server/debug/dist/ballisticakit_headless": "0df1d668840d11debccf8cd018acf04f",
"build/prefab/full/linux_x86_64_server/release/dist/ballisticakit_headless": "a8c964a36ad22fab92cb003f66e0e463",
"build/prefab/full/mac_arm64_gui/debug/ballisticakit": "5f6a362cdc3dc07a9e320310db5c4e62",
"build/prefab/full/mac_arm64_gui/release/ballisticakit": "a85895d48516991dd9a3cd5f3dbd11b0",
"build/prefab/full/mac_arm64_server/debug/dist/ballisticakit_headless": "b85ae2de125f02145cade4205891a10d",
"build/prefab/full/mac_arm64_server/release/dist/ballisticakit_headless": "ea55aef603d7998f134bc25567d2e175",
"build/prefab/full/mac_x86_64_gui/debug/ballisticakit": "7de5d121b6bd1306aeb98ffe27c6e6a3",
"build/prefab/full/mac_x86_64_gui/release/ballisticakit": "06b5aaf53195021bd21d05ec372fe403",
"build/prefab/full/mac_x86_64_server/debug/dist/ballisticakit_headless": "dad46bc32a9994277900ddde9ccfe253",
"build/prefab/full/mac_x86_64_server/release/dist/ballisticakit_headless": "94dfd167a76d72054ec3f41f2a0b82fa",
"build/prefab/full/windows_x86_gui/debug/BallisticaKit.exe": "94524925f988de8e0b2921822d596442",
"build/prefab/full/windows_x86_gui/release/BallisticaKit.exe": "33f03404c19e971272e277e33e77c093",
"build/prefab/full/windows_x86_server/debug/dist/BallisticaKitHeadless.exe": "fd3efc611054414d92f4188042590482",
"build/prefab/full/windows_x86_server/release/dist/BallisticaKitHeadless.exe": "b6858701550a3bb5ed520cd6d3a8a72a",
"build/prefab/full/linux_arm64_gui/debug/ballisticakit": "a39e78f94f2257360a0b5891ccb1e6f3",
"build/prefab/full/linux_arm64_gui/release/ballisticakit": "3fdd6d2e093dbe55c66f863790fe0712",
"build/prefab/full/linux_arm64_server/debug/dist/ballisticakit_headless": "ba679c627647597ce1ae0ee3c79bc628",
"build/prefab/full/linux_arm64_server/release/dist/ballisticakit_headless": "85e1e9d4f28402201a8fba3fdc444a94",
"build/prefab/full/linux_x86_64_gui/debug/ballisticakit": "f96218be024fb0f54d875b3bb0d6695d",
"build/prefab/full/linux_x86_64_gui/release/ballisticakit": "0e192113022123f6535d451d04bae7f6",
"build/prefab/full/linux_x86_64_server/debug/dist/ballisticakit_headless": "bfbcc1a92c5e52aa44db0484d8b1e7e5",
"build/prefab/full/linux_x86_64_server/release/dist/ballisticakit_headless": "f52ff47413dc64627347a1ada68b2e1b",
"build/prefab/full/mac_arm64_gui/debug/ballisticakit": "32b17d80c59ea676f3a548b2ec8799a5",
"build/prefab/full/mac_arm64_gui/release/ballisticakit": "92f821766087c12f7ecf54a5f1505a1a",
"build/prefab/full/mac_arm64_server/debug/dist/ballisticakit_headless": "f0c77f8eb0c5978ed6d1ff11ffc22a3f",
"build/prefab/full/mac_arm64_server/release/dist/ballisticakit_headless": "538e04846ec2e3b4c33f1a211ac4e11a",
"build/prefab/full/mac_x86_64_gui/debug/ballisticakit": "373b7d863584936cb507dbb7da674949",
"build/prefab/full/mac_x86_64_gui/release/ballisticakit": "8ba2796b27a6596fb67c27360b7e7239",
"build/prefab/full/mac_x86_64_server/debug/dist/ballisticakit_headless": "432074c79906f984b1df221398139cf9",
"build/prefab/full/mac_x86_64_server/release/dist/ballisticakit_headless": "c246914f53747c2c94d3b3b32ab3fab7",
"build/prefab/full/windows_x86_gui/debug/BallisticaKit.exe": "6e2490e0a43a69c7983a39817117d77c",
"build/prefab/full/windows_x86_gui/release/BallisticaKit.exe": "2d8bef376bf4ab160089f477609d5421",
"build/prefab/full/windows_x86_server/debug/dist/BallisticaKitHeadless.exe": "7d78f5d122e58c4e5a3264ad6aae1289",
"build/prefab/full/windows_x86_server/release/dist/BallisticaKitHeadless.exe": "ee97d4b95248d68681eda85b658b7531",
"build/prefab/lib/linux_arm64_gui/debug/libballisticaplus.a": "f231b10895bdcb542de87b887ca181fd",
"build/prefab/lib/linux_arm64_gui/release/libballisticaplus.a": "ae936a119668ede7b36f38c8672f4bf8",
"build/prefab/lib/linux_arm64_server/debug/libballisticaplus.a": "f231b10895bdcb542de87b887ca181fd",
@ -4074,14 +4074,14 @@
"build/prefab/lib/mac_x86_64_gui/release/libballisticaplus.a": "efffc4f330e77530accd9a9f82840a6c",
"build/prefab/lib/mac_x86_64_server/debug/libballisticaplus.a": "c20363fe2af3d54e666b1c8ee67f6b76",
"build/prefab/lib/mac_x86_64_server/release/libballisticaplus.a": "efffc4f330e77530accd9a9f82840a6c",
"build/prefab/lib/windows/Debug_Win32/BallisticaKitGenericPlus.lib": "953c62fb42149cc8c51b5d8a4ad36a46",
"build/prefab/lib/windows/Debug_Win32/BallisticaKitGenericPlus.pdb": "c7185413efb31b6093a2916df7928754",
"build/prefab/lib/windows/Debug_Win32/BallisticaKitHeadlessPlus.lib": "98ab1eb9fb1aba1606962b3c51efb49c",
"build/prefab/lib/windows/Debug_Win32/BallisticaKitHeadlessPlus.pdb": "51bbf56a16bc9f301e4fb1744f237ec4",
"build/prefab/lib/windows/Release_Win32/BallisticaKitGenericPlus.lib": "fa3b1874be36e55d4de7951f8aa8603e",
"build/prefab/lib/windows/Release_Win32/BallisticaKitGenericPlus.pdb": "50a0207af5b0c5c9775b0533a2a5e5f4",
"build/prefab/lib/windows/Release_Win32/BallisticaKitHeadlessPlus.lib": "2f209ae0d1994a4d2ed8f403957acd1f",
"build/prefab/lib/windows/Release_Win32/BallisticaKitHeadlessPlus.pdb": "7adb6321e32d575c9666b5eda99d43a0",
"build/prefab/lib/windows/Debug_Win32/BallisticaKitGenericPlus.lib": "f012fac2242e90a5df21ee4b50004248",
"build/prefab/lib/windows/Debug_Win32/BallisticaKitGenericPlus.pdb": "c2590ba0ed0426a8a8aae627d00d5447",
"build/prefab/lib/windows/Debug_Win32/BallisticaKitHeadlessPlus.lib": "846e75a2a07ec8c17f2e6eb622c456ee",
"build/prefab/lib/windows/Debug_Win32/BallisticaKitHeadlessPlus.pdb": "d70c4ed329cc01aecb41eb2fa8f95130",
"build/prefab/lib/windows/Release_Win32/BallisticaKitGenericPlus.lib": "de6384ade5417d115d5b6a3ff89d3bb8",
"build/prefab/lib/windows/Release_Win32/BallisticaKitGenericPlus.pdb": "b564e1ebdb709343ce66bf63cd1b0568",
"build/prefab/lib/windows/Release_Win32/BallisticaKitHeadlessPlus.lib": "ab388c54bdcc895bc99414aec7d40273",
"build/prefab/lib/windows/Release_Win32/BallisticaKitHeadlessPlus.pdb": "f1bab1ce15ef218c191d0e85444f265e",
"src/assets/ba_data/python/babase/_mgen/__init__.py": "f885fed7f2ed98ff2ba271f9dbe3391c",
"src/assets/ba_data/python/babase/_mgen/enums.py": "5548f407d97e380069f6c596c4e36cd7",
"src/ballistica/base/mgen/pyembed/binding_base.inc": "efa61468cf098f77cc6a234461d8b86d",

View File

@ -1,4 +1,4 @@
### 1.7.36 (build 21897, api 8, 2024-06-26)
### 1.7.36 (build 21899, api 8, 2024-07-01)
- bacloud workspace commands are now a bit smarter; you can now do things like
`bacloud workspace put .` or even just `bacloud workspace put` and it will
work. Previously such cases required explicitly passing the workspace name

View File

@ -48,8 +48,8 @@ endif
# Env targets that should be safe to run anytime; even if project-files
# are out of date.
ENV_REQS_SAFE = .cache/checkenv $(PCOMMANDBATCHBIN) .dir-locals.el .mypy.ini \
.pyrightconfig.json .pylintrc .clang-format .rgignore \
ENV_REQS_SAFE = .cache/checkenv $(PCOMMANDBATCHBIN) .dir-locals.el .rgignore \
.mypy.ini .pyrightconfig.json .pylintrc .clang-format \
ballisticakit-cmake/.clang-format .editorconfig tools/cloudshell \
tools/bacloud tools/pcommand
@ -1289,9 +1289,6 @@ tools/bacloud: tools/efrotools/genwrapper.py .venv/.efro_venv_complete
@PYTHONPATH=tools python3 -m \
efrotools.genwrapper bacloud batools.bacloud tools/bacloud
.rgignore: config/toolconfigsrc/rgignore $(TOOL_CFG_SRC)
@$(TOOL_CFG_INST) $< $@
.clang-format: config/toolconfigsrc/clang-format $(TOOL_CFG_SRC)
@$(TOOL_CFG_INST) $< $@
@ -1307,6 +1304,9 @@ tools/bacloud: tools/efrotools/genwrapper.py .venv/.efro_venv_complete
.dir-locals.el: config/toolconfigsrc/dir-locals.el $(TOOL_CFG_SRC)
@$(TOOL_CFG_INST) $< $@
.rgignore: config/toolconfigsrc/rgignore $(TOOL_CFG_SRC)
@$(TOOL_CFG_INST) $< $@
.mypy.ini: config/toolconfigsrc/mypy.ini $(TOOL_CFG_SRC)
@$(TOOL_CFG_INST) $< $@

View File

@ -6,7 +6,7 @@ mypy==1.10.1
pbxproj==4.2.0
pdoc==14.5.1
pur==7.3.2
pylint==3.2.4
pylint==3.2.5
pylsp-mypy==0.6.8
pytest==8.2.2
python-daemon==3.0.1

View File

@ -1,6 +1,9 @@
[MASTER]
jobs=1
# PIL seems to throw pylint for a loop (possibly related to our plugins?)
ignored-modules=PIL
load-plugins=efrotools.pylintplugins
persistent=no

View File

@ -7507,12 +7507,12 @@ clean:
$(PROJ_DIR)/.cache/asset_package_resolved: $(PROJ_DIR)/config/projectconfig.json
@$(PCOMMAND) asset_package_resolve $@
$(PROJ_DIR)/.cache/assetmanifests/cmake-debug.manifest: \
$(PROJ_DIR)/.cache/assetmanifests/gui_desktop_v1: \
$(PROJ_DIR)/.cache/asset_package_resolved
@$(PCOMMAND) asset_package_fetch $(PROJ_DIR)/.cache/asset_package_resolved \
cmake-debug $@
@$(PCOMMAND) asset_package_assemble \
$(PROJ_DIR)/.cache/asset_package_resolved gui_desktop_v1
foo: $(PROJ_DIR)/.cache/assetmanifests/cmake-debug.manifest
foo: $(PROJ_DIR)/.cache/assetmanifests/gui_desktop_v1
# These targets don't correspond to actual files; make sure make knows that.
.PHONY: cmake win mac ios android audio audio-clean fonts fonts-clean data \

View File

@ -52,7 +52,7 @@ if TYPE_CHECKING:
# Build number and version of the ballistica binary we expect to be
# using.
TARGET_BALLISTICA_BUILD = 21897
TARGET_BALLISTICA_BUILD = 21899
TARGET_BALLISTICA_VERSION = '1.7.36'

View File

@ -39,7 +39,7 @@ auto main(int argc, char** argv) -> int {
namespace ballistica {
// These are set automatically via script; don't modify them here.
const int kEngineBuildNumber = 21897;
const int kEngineBuildNumber = 21899;
const char* kEngineVersion = "1.7.36";
const int kEngineApiVersion = 8;

View File

@ -14,7 +14,26 @@ if TYPE_CHECKING:
# Version is sent to the master-server with all commands. Can be incremented
# if we need to change behavior server-side to go along with client changes.
BACLOUD_VERSION = 12
BACLOUD_VERSION = 13
def asset_file_cache_path(filehash: str) -> str:
"""Given a sha256 hex file hash, return a storage path."""
# We expect a 64 byte hex str with only lowercase letters and
# numbers. Note to self: I considered base64 hashes to save space
# but then remembered that lots of filesystems out there ignore case
# so that would not end well.
assert len(filehash) == 64
assert filehash.islower()
assert filehash.isalnum()
# Split into a few levels of directories to keep directory listings
# and operations reasonable. This will give 256 top level dirs, each
# with 256 subdirs. So if we have 65,536 files in our cache then
# dirs will average 1 file each. That seems like a reasonable spread
# I think.
return f'{filehash[:2]}/{filehash[2:4]}/{filehash[4:]}'
@ioprepped
@ -32,7 +51,6 @@ class RequestData:
@ioprepped
@dataclass
class ResponseData:
# noinspection PyUnresolvedReferences
"""Response sent from the bacloud server to the client.
Attributes:
@ -53,6 +71,8 @@ class ResponseData:
and uploaded to an 'uploads_inline' bytes dict in end_command args.
This should be limited to relatively small files.
deletes: If present, file paths that should be deleted on the client.
downloads: If present, describes files the client should individually
request from the server if not already present on the client.
downloads_inline: If present, pathnames mapped to gzipped data to
be written to the client. This should only be used for relatively
small files as they are all included inline as part of the response.
@ -69,6 +89,39 @@ class ResponseData:
end_command: If present, this command is run with these args at the end
of response processing.
"""
@ioprepped
@dataclass
class Downloads:
"""Info about downloads included in a response."""
@ioprepped
@dataclass
class Entry:
"""Individual download."""
path: Annotated[str, IOAttrs('p')]
# Args include with this particular request (combined with
# baseargs).
args: Annotated[dict[str, str], IOAttrs('a')]
# TODO: could add a hash here if we want the client to
# verify hashes.
# If present, will be prepended to all entry paths via os.path.join.
basepath: Annotated[str | None, IOAttrs('p')]
# Server command that should be called for each download. The
# server command is expected to respond with a downloads_inline
# containing a single 'default' entry. In the future this may
# be expanded to a more streaming-friendly process.
cmd: Annotated[str, IOAttrs('c')]
# Args that should be included with all download requests.
baseargs: Annotated[dict[str, str], IOAttrs('a')]
# Everything that should be downloaded.
entries: Annotated[list[Entry], IOAttrs('e')]
message: Annotated[str | None, IOAttrs('m', store_default=False)] = None
message_end: Annotated[str, IOAttrs('m_end', store_default=False)] = '\n'
error: Annotated[str | None, IOAttrs('e', store_default=False)] = None
@ -87,6 +140,9 @@ class ResponseData:
deletes: Annotated[
list[str] | None, IOAttrs('dlt', store_default=False)
] = None
downloads: Annotated[
Downloads | None, IOAttrs('dl', store_default=False)
] = None
downloads_inline: Annotated[
dict[str, bytes] | None, IOAttrs('dinl', store_default=False)
] = None

View File

@ -197,6 +197,38 @@ class App:
return response
def _download_file(
self, filename: str, call: str, args: dict
) -> int | None:
# Fast out - for repeat batch downloads, most of the time these
# will already exist and we can ignore them.
if os.path.isfile(filename):
return None
dirname = os.path.dirname(filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
response = self._servercmd(call, args)
# We currently expect a single 'default' entry in
# downloads_inline for this.
assert response.downloads_inline is not None
assert len(response.downloads_inline) == 1
data_zipped = response.downloads_inline.get('default')
assert isinstance(data_zipped, bytes)
data = zlib.decompress(data_zipped)
# Write to tmp files first and then move into place. This
# way crashes are less likely to lead to corrupt data.
fnametmp = f'{filename}.tmp'
with open(fnametmp, 'wb') as outfile:
outfile.write(data)
os.rename(fnametmp, filename)
return len(data)
def _upload_file(self, filename: str, call: str, args: dict) -> None:
import tempfile
@ -256,12 +288,12 @@ class App:
downloads_inline: dict[str, bytes],
) -> None:
"""Handle inline file data to be saved to the client."""
# import base64
for fname, fdata in downloads_inline.items():
# If there's a directory where we want our file to go, clear it
# out first. File deletes should have run before this so
# everything under it should be empty and thus killable via rmdir.
# If there's a directory where we want our file to go, clear
# it out first. File deletes should have run before this so
# everything under it should be empty and thus killable via
# rmdir.
if os.path.isdir(fname):
for basename, dirnames, _fn in os.walk(fname, topdown=False):
for dirname in dirnames:
@ -271,7 +303,6 @@ class App:
dirname = os.path.dirname(fname)
if dirname:
os.makedirs(dirname, exist_ok=True)
# data_zipped = base64.b64decode(fdata)
data_zipped = fdata
data = zlib.decompress(data_zipped)
@ -282,6 +313,38 @@ class App:
outfile.write(data)
os.rename(fnametmp, fname)
def _handle_downloads(self, downloads: ResponseData.Downloads) -> None:
from efro.util import data_size_str
from concurrent.futures import ThreadPoolExecutor
starttime = time.monotonic()
def _do_entry(entry: ResponseData.Downloads.Entry) -> int | None:
allargs = downloads.baseargs | entry.args
fullpath = (
entry.path
if downloads.basepath is None
else os.path.join(downloads.basepath, entry.path)
)
return self._download_file(fullpath, downloads.cmd, allargs)
# Run several downloads simultaneously to hopefully maximize
# throughput.
with ThreadPoolExecutor(max_workers=4) as executor:
# Convert the generator to a list to trigger any
# exceptions that occurred.
results = list(executor.map(_do_entry, downloads.entries))
num_dls = sum(1 for x in results if x is not None)
total_bytes = sum(x for x in results if x is not None)
duration = time.monotonic() - starttime
if num_dls:
print(
f'{Clr.BLU}Downloaded {num_dls} files'
f' ({data_size_str(total_bytes)}'
f' total) in {duration:.2f}s.{Clr.RST}'
)
def _handle_dir_prune_empty(self, prunedir: str) -> None:
"""Handle pruning empty directories."""
# Walk the tree bottom-up so we can properly kill recursive empty dirs.
@ -347,17 +410,28 @@ class App:
self._state.login_token = None
if response.dir_manifest is not None:
self._handle_dir_manifest_response(response.dir_manifest)
if response.uploads_inline is not None:
self._handle_uploads_inline(response.uploads_inline)
if response.uploads is not None:
self._handle_uploads(response.uploads)
if response.uploads_inline is not None:
self._handle_uploads_inline(response.uploads_inline)
# Note: we handle file deletes *before* downloads. This
# way our file-download code only has to worry about creating or
# removing directories and not files, and corner cases such as
# a file getting replaced with a directory should just work.
#
# UPDATE: that actually only applies to commands where the
# client uploads a manifest first and then the server
# responds with specific deletes and inline downloads. The
# newer 'downloads' command is used differently; in that
# case the server is just pushing a big list of hashes to
# the client and the client is asking for the stuff it
# doesn't have. So in that case the client needs to fully
# handle things like replacing dirs with files.
if response.deletes:
self._handle_deletes(response.deletes)
if response.downloads:
self._handle_downloads(response.downloads)
if response.downloads_inline:
self._handle_downloads_inline(response.downloads_inline)
if response.dir_prune_empty:

View File

@ -143,7 +143,7 @@ from batools.pcommands2 import (
wsl_build_check_win_drive,
get_modern_make,
asset_package_resolve,
asset_package_fetch,
asset_package_assemble,
)
# pylint: enable=unused-import

View File

@ -639,19 +639,20 @@ def asset_package_resolve() -> None:
outfile.write(apversion)
def asset_package_fetch() -> None:
"""Build/fetch an asset-package-manifest."""
def asset_package_assemble() -> None:
"""Assemble asset package data and its manifest."""
import os
import subprocess
from efro.error import CleanError
from efrotools.project import getprojectconfig
pcommand.disallow_in_batch()
args = pcommand.get_args()
if len(args) != 3:
raise CleanError('Expected 3 args.')
if len(args) != 2:
raise CleanError('Expected 2 args.')
resolve_path, _buildtype, outpath = args
resolve_path, flavor = args
# If resolve path exists, it is the exact asset-package-version we
# should use.
@ -669,8 +670,13 @@ def asset_package_fetch() -> None:
f'Expected a string asset-package-version; got {type(apversion)}.'
)
print('WOULD GO FORWARD WITH', apversion)
os.makedirs(os.path.dirname(outpath), exist_ok=True)
with open(outpath, 'w', encoding='utf-8') as outfile:
outfile.write('foo')
subprocess.run(
[
f'{pcommand.PROJROOT}/tools/bacloud',
'assetpackage',
'_assemble',
apversion,
flavor,
],
check=True,
)

View File

@ -150,7 +150,6 @@ class LogHandler(logging.Handler):
self._cache = deque[tuple[int, LogEntry]]()
self._cache_index_offset = 0
self._cache_lock = Lock()
# self._report_blocking_io_on_echo_error = False
self._printed_callback_error = False
self._thread_bootstrapped = False
self._thread = Thread(target=self._log_thread_main, daemon=True)
@ -515,6 +514,25 @@ class LogHandler(logging.Handler):
traceback.print_exc(file=self._echofile)
def shutdown(self) -> None:
"""Block until all pending logs/prints are done."""
done = False
self.file_flush('stdout')
self.file_flush('stderr')
def _set_done() -> None:
nonlocal done
done = True
self._event_loop.call_soon_threadsafe(_set_done)
starttime = time.monotonic()
while not done:
if time.monotonic() - starttime > 5.0:
print('LogHandler shutdown hung!!!', file=sys.stderr)
break
time.sleep(0.01)
def file_flush(self, name: str) -> None:
"""Send raw stdout/stderr flush to the logger to be collated."""

View File

@ -213,7 +213,14 @@ class MessageProtocol:
return (
ErrorSysResponse(
error_message=(
traceback.format_exc()
# Note: need to format exception ourself here; it
# might not be current so we can't use
# traceback.format_exc().
''.join(
traceback.format_exception(
type(exc), exc, exc.__traceback__
)
)
if self.remote_errors_include_stack_traces
else 'An internal error has occurred.'
),

View File

@ -90,20 +90,6 @@ class MessageReceiver:
f' got {sig.args}'
)
# Make sure we are only given async methods if we are an async handler
# and sync ones otherwise.
# UPDATE - can't do this anymore since we now sometimes use
# regular functions which return awaitables instead of having
# the entire function be async.
# is_async = inspect.iscoroutinefunction(call)
# if self.is_async != is_async:
# msg = (
# 'Expected a sync method; found an async one.'
# if is_async
# else 'Expected an async method; found a sync one.'
# )
# raise ValueError(msg)
# Check annotation types to determine what message types we handle.
# Return-type annotation can be a Union, but we probably don't
# have it available at runtime. Explicitly pull it in.
@ -162,7 +148,7 @@ class MessageReceiver:
if msgtype in self._handlers:
raise TypeError(
f'Message type {msgtype} already has a registered' f' handler.'
f'Message type {msgtype} already has a registered handler.'
)
# Make sure the responses exactly matches what the message expects.
@ -285,7 +271,6 @@ class MessageReceiver:
"""
assert not self.is_async, "can't call sync handler on async receiver"
msg_decoded: Message | None = None
msgtype: type[Message] | None = None
try:
msg_decoded = self._decode_incoming_message(bound_obj, msg)
msgtype = type(msg_decoded)
@ -305,7 +290,8 @@ class MessageReceiver:
bound_obj, msg_decoded, exc
)
if dolog:
if msgtype is not None:
if msg_decoded is not None:
msgtype = type(msg_decoded)
logging.exception(
'Error handling %s.%s message.',
msgtype.__module__,
@ -313,7 +299,9 @@ class MessageReceiver:
)
else:
logging.exception(
'Error handling raw efro.message. msg=%s', msg
'Error handling raw efro.message'
' (likely a message format incompatibility): %s.',
msg,
)
return rstr
@ -330,9 +318,8 @@ class MessageReceiver:
# able to guarantee that messages handlers would be called in the
# order the messages were received.
assert self.is_async, "can't call async handler on sync receiver"
assert self.is_async, "Can't call async handler on sync receiver."
msg_decoded: Message | None = None
msgtype: type[Message] | None = None
try:
msg_decoded = self._decode_incoming_message(bound_obj, msg)
msgtype = type(msg_decoded)
@ -347,43 +334,51 @@ class MessageReceiver:
):
raise
return self._handle_raw_message_async_error(
bound_obj, msg_decoded, msgtype, exc
bound_obj, msg, msg_decoded, exc
)
# Return an awaitable to handle the rest asynchronously.
return self._handle_raw_message_async(
bound_obj, msg_decoded, msgtype, handler_awaitable
bound_obj, msg, msg_decoded, handler_awaitable
)
async def _handle_raw_message_async_error(
self,
bound_obj: Any,
msg_raw: str,
msg_decoded: Message | None,
msgtype: type[Message] | None,
exc: Exception,
) -> str:
rstr, dolog = self.encode_error_response(bound_obj, msg_decoded, exc)
if dolog:
if msgtype is not None:
if msg_decoded is not None:
msgtype = type(msg_decoded)
logging.exception(
'Error handling %s.%s message.',
msgtype.__module__,
msgtype.__qualname__,
# We need to explicitly provide the exception here,
# otherwise it shows up at None. I assume related to
# the fact that we're an async function.
exc_info=exc,
)
else:
logging.exception(
'Error handling raw async efro.message.'
' msgtype=%s msg_decoded=%s.',
msgtype,
msg_decoded,
'Error handling raw async efro.message'
' (likely a message format incompatibility): %s.',
msg_raw,
# We need to explicitly provide the exception here,
# otherwise it shows up at None. I assume related to
# the fact that we're an async function.
exc_info=exc,
)
return rstr
async def _handle_raw_message_async(
self,
bound_obj: Any,
msg_raw: str,
msg_decoded: Message,
msgtype: type[Message] | None,
handler_awaitable: Awaitable[Response | None],
) -> str:
"""Should be called when the receiver gets a message.
@ -397,7 +392,7 @@ class MessageReceiver:
except Exception as exc:
return await self._handle_raw_message_async_error(
bound_obj, msg_decoded, msgtype, exc
bound_obj, msg_raw, msg_decoded, exc
)