diff --git a/.efrocachemap b/.efrocachemap index 3cba208c..8be1524c 100644 --- a/.efrocachemap +++ b/.efrocachemap @@ -3995,26 +3995,26 @@ "assets/src/ba_data/python/ba/_generated/__init__.py": "https://files.ballistica.net/cache/ba1/ee/e8/cad05aa531c7faf7ff7b96db7f6e", "assets/src/ba_data/python/ba/_generated/enums.py": "https://files.ballistica.net/cache/ba1/b2/e5/0ee0561e16257a32830645239f34", "ballisticacore-windows/Generic/BallisticaCore.ico": "https://files.ballistica.net/cache/ba1/89/c0/e32c7d2a35dc9aef57cc73b0911a", - "build/prefab/full/linux_arm64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/b7/31/a293aea8039ee830d3577179aa81", - "build/prefab/full/linux_arm64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/a2/11/7a48cde8029be043579abf94179e", - "build/prefab/full/linux_arm64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/6b/77/198dd7c53b085c116c77a7877af5", - "build/prefab/full/linux_arm64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/14/9f/ca41b53c5fa84b063b5d089f061f", - "build/prefab/full/linux_x86_64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/75/a2/f30da762e9451cea11bbf89b4c6e", - "build/prefab/full/linux_x86_64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/b8/de/fd123320eaac865c1484e7678397", - "build/prefab/full/linux_x86_64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/81/90/b8f77fe7c99f6caf7305734a0962", - "build/prefab/full/linux_x86_64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/8d/8c/7a6b8f0c7a62b6fd7c67e89d9b3c", - "build/prefab/full/mac_arm64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/9a/17/a6fdb6d82f342786e32ba4c64ded", - "build/prefab/full/mac_arm64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/c1/14/f93f5ebba5ff82ca596931dbc30a", - "build/prefab/full/mac_arm64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/30/9c/9a8758cd98e1c2d566ef61d0c511", - "build/prefab/full/mac_arm64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/5b/2f/3e28b0c669980be5f960515c125e", - "build/prefab/full/mac_x86_64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/16/5c/de01bf8f26d3bea3cf30240dc609", - "build/prefab/full/mac_x86_64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/c4/85/8d848c0eee73737090a2e68dab13", - "build/prefab/full/mac_x86_64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/c6/9f/0b3cfdd1b580768b2cb0a3509de2", - "build/prefab/full/mac_x86_64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/38/c5/688b96898a6cdd8d8c5a410d3dd9", - "build/prefab/full/windows_x86_gui/debug/BallisticaCore.exe": "https://files.ballistica.net/cache/ba1/44/a9/b16f6100e676469ee2c1fa48cde0", - "build/prefab/full/windows_x86_gui/release/BallisticaCore.exe": "https://files.ballistica.net/cache/ba1/c8/d8/65aba46607929b1f697d9d082330", - "build/prefab/full/windows_x86_server/debug/dist/BallisticaCoreHeadless.exe": "https://files.ballistica.net/cache/ba1/41/4c/0e5722286d59009754ac6d0b6867", - "build/prefab/full/windows_x86_server/release/dist/BallisticaCoreHeadless.exe": "https://files.ballistica.net/cache/ba1/cb/60/54dd87249f9e40b8075943bcab0f", + "build/prefab/full/linux_arm64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/8a/a3/09c2e2c3c8486701a9ee5327b263", + "build/prefab/full/linux_arm64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/9e/11/1da357b0f99d277a25e28cef667e", + "build/prefab/full/linux_arm64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/e7/be/d742c224a0d596e0dce2a5484562", + "build/prefab/full/linux_arm64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/3b/7d/790c154602635edb1a0b135b5b21", + "build/prefab/full/linux_x86_64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/e9/18/520b9da134c837a6888c30e97f19", + "build/prefab/full/linux_x86_64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/de/e5/ce18bd44acaea6fa0efce75997fa", + "build/prefab/full/linux_x86_64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/b1/12/5758a7ea749dae196ff895cae51e", + "build/prefab/full/linux_x86_64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/b0/2c/7ead0f7195b764ec07a1a9370db3", + "build/prefab/full/mac_arm64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/79/32/79dea81648ee523d80b56723adcb", + "build/prefab/full/mac_arm64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/e1/4c/abe20dc46ac5ab6dbc063d0ba665", + "build/prefab/full/mac_arm64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/cc/e7/989e658ad42701bb0b59917a4aa0", + "build/prefab/full/mac_arm64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/f4/5f/cf504d8fc3c164dbe4483ff97b54", + "build/prefab/full/mac_x86_64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/81/79/a916d9439ff7bf8320dfe0834c1c", + "build/prefab/full/mac_x86_64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/c2/b5/280018b8d8d9c69969bbe32c9b38", + "build/prefab/full/mac_x86_64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/de/f0/2ae295b8adb55a8fca413c2983c1", + "build/prefab/full/mac_x86_64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/59/ae/b2d08ff9b3a090e9eb0c36941505", + "build/prefab/full/windows_x86_gui/debug/BallisticaCore.exe": "https://files.ballistica.net/cache/ba1/e6/c2/70029cb62124d7e7cc56b4093063", + "build/prefab/full/windows_x86_gui/release/BallisticaCore.exe": "https://files.ballistica.net/cache/ba1/85/51/baa09a10bd02ecd687743ecb11dc", + "build/prefab/full/windows_x86_server/debug/dist/BallisticaCoreHeadless.exe": "https://files.ballistica.net/cache/ba1/5e/c8/df6b785ea053fdc385284db9ce02", + "build/prefab/full/windows_x86_server/release/dist/BallisticaCoreHeadless.exe": "https://files.ballistica.net/cache/ba1/96/5a/9e78e672ff1d12e0e03339a33ee4", "build/prefab/lib/linux_arm64_gui/debug/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/e2/88/53757bc9fd92d49bd35dc6d3be0e", "build/prefab/lib/linux_arm64_gui/release/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/06/69/078f7eef49a1127a1492db4703f6", "build/prefab/lib/linux_arm64_server/debug/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/82/f1/2b13fe77164f72d2bf57453bb8e5", @@ -4031,14 +4031,14 @@ "build/prefab/lib/mac_x86_64_gui/release/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/a6/91/f9cb15d0876750e28abe3b0d221c", "build/prefab/lib/mac_x86_64_server/debug/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/b7/11/1ecfe322ae997772b71538664cad", "build/prefab/lib/mac_x86_64_server/release/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/8d/d6/1e83dba73d581cfb2b2f6eb31f22", - "build/prefab/lib/windows/Debug_Win32/BallisticaCoreGenericInternal.lib": "https://files.ballistica.net/cache/ba1/5c/e1/a38642b99936733a5a560f9300d3", - "build/prefab/lib/windows/Debug_Win32/BallisticaCoreGenericInternal.pdb": "https://files.ballistica.net/cache/ba1/24/31/78c54905becd2f236a620dda01e1", - "build/prefab/lib/windows/Debug_Win32/BallisticaCoreHeadlessInternal.lib": "https://files.ballistica.net/cache/ba1/7e/f5/48e9c6cd11de662188fef4484e6d", - "build/prefab/lib/windows/Debug_Win32/BallisticaCoreHeadlessInternal.pdb": "https://files.ballistica.net/cache/ba1/e7/40/fce7451ad5e3802aeb058dc15712", - "build/prefab/lib/windows/Release_Win32/BallisticaCoreGenericInternal.lib": "https://files.ballistica.net/cache/ba1/a5/07/a9b7fdd826d2235e8ad2e398e13c", - "build/prefab/lib/windows/Release_Win32/BallisticaCoreGenericInternal.pdb": "https://files.ballistica.net/cache/ba1/05/7e/c97aa4f1876c7a5029fb9478e1a5", - "build/prefab/lib/windows/Release_Win32/BallisticaCoreHeadlessInternal.lib": "https://files.ballistica.net/cache/ba1/e9/3e/120aaa006c5494394d862854fa4e", - "build/prefab/lib/windows/Release_Win32/BallisticaCoreHeadlessInternal.pdb": "https://files.ballistica.net/cache/ba1/ac/c9/3d3fe16a6fe892945977350861c7", + "build/prefab/lib/windows/Debug_Win32/BallisticaCoreGenericInternal.lib": "https://files.ballistica.net/cache/ba1/50/6a/ca5e49b3cad047b541648dc9914d", + "build/prefab/lib/windows/Debug_Win32/BallisticaCoreGenericInternal.pdb": "https://files.ballistica.net/cache/ba1/6f/f7/b38282cfbd3cb0cda89d5a458176", + "build/prefab/lib/windows/Debug_Win32/BallisticaCoreHeadlessInternal.lib": "https://files.ballistica.net/cache/ba1/97/e5/1d2e76fcadbe022d4c522c7d2135", + "build/prefab/lib/windows/Debug_Win32/BallisticaCoreHeadlessInternal.pdb": "https://files.ballistica.net/cache/ba1/54/d6/2d642477837c34c946b70e27014d", + "build/prefab/lib/windows/Release_Win32/BallisticaCoreGenericInternal.lib": "https://files.ballistica.net/cache/ba1/79/2b/f17b81bdf03719098a558305f0a3", + "build/prefab/lib/windows/Release_Win32/BallisticaCoreGenericInternal.pdb": "https://files.ballistica.net/cache/ba1/9a/5a/3ca5679187f8dd5f89d8bb68ed84", + "build/prefab/lib/windows/Release_Win32/BallisticaCoreHeadlessInternal.lib": "https://files.ballistica.net/cache/ba1/4b/88/d016d4059ea7b334e95d1e6ef258", + "build/prefab/lib/windows/Release_Win32/BallisticaCoreHeadlessInternal.pdb": "https://files.ballistica.net/cache/ba1/1b/35/b11d851fc912b7bcd57766981fa2", "src/ballistica/generated/python_embedded/binding.inc": "https://files.ballistica.net/cache/ba1/c0/32/b7907e3859a5c5013a3d97b6b523", "src/ballistica/generated/python_embedded/bootstrap.inc": "https://files.ballistica.net/cache/ba1/2d/4f/f4fe67827f36cd59cd5193333a02", "src/ballistica/generated/python_embedded/bootstrap_monolithic.inc": "https://files.ballistica.net/cache/ba1/ef/c1/aa5f1aa10af89f5c0b1e616355fd" diff --git a/.idea/dictionaries/ericf.xml b/.idea/dictionaries/ericf.xml index 4bbeebf2..d8ea14f9 100644 --- a/.idea/dictionaries/ericf.xml +++ b/.idea/dictionaries/ericf.xml @@ -1736,6 +1736,7 @@ offsanchor offsx offsy + ofile ofval oggenc oghash @@ -1951,6 +1952,7 @@ priceraw printcolors printf + printfiles printnodes printobjects printpaths diff --git a/CHANGELOG.md b/CHANGELOG.md index 20a4c476..22b32fea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,9 @@ -### 1.7.10 (build 20884, api 7, 2022-09-27) +### 1.7.10 (build 20885, api 7, 2022-09-28) - Added eval support for cloud-console. This means you can type something like '1+1' in the console and see '2' printed. This is how Python behaves in the stdin console or in-game console or the standard Python interpreter. - Exceptions in the cloud-console now print to stderr instead of logging.exception(). This means they aren't a pretty red color anymore, but this will keep cloud-console behaving well with things like servers where logging.exception() might trigger alarms or otherwise. This is also consistent with standard interactive Python behavior. - Cloud console now shows the device name at the top instead of simply 'Console' while connected. - Moved the function that actually runs cloud console code to `ba._cloud.cloud_console_exec()`. -- Upgraded bundled Python (for Android and Apple builds) to 3.10.7. +- Added efro.debug which contains useful functionality for debugging object reference issues and memory leaks on live app instances (via cloud shell or whatever). ### 1.7.9 (build 20880, api 7, 2022-09-24) - Cleaned up the efro.message system to isolate response types that are used purely internally (via a new SysResponse type). diff --git a/assets/src/ba_data/python/ba/_bootstrap.py b/assets/src/ba_data/python/ba/_bootstrap.py index 29dda6e0..13467d7c 100644 --- a/assets/src/ba_data/python/ba/_bootstrap.py +++ b/assets/src/ba_data/python/ba/_bootstrap.py @@ -45,7 +45,7 @@ def bootstrap() -> None: # Give a soft warning if we're being used with a different binary # version than we expect. - expected_build = 20884 + expected_build = 20885 running_build: int = env['build_number'] if running_build != expected_build: print( diff --git a/ballisticacore-cmake/.idea/dictionaries/ericf.xml b/ballisticacore-cmake/.idea/dictionaries/ericf.xml index 9519c9e8..4cabf0c5 100644 --- a/ballisticacore-cmake/.idea/dictionaries/ericf.xml +++ b/ballisticacore-cmake/.idea/dictionaries/ericf.xml @@ -913,6 +913,7 @@ oenval offsx offsy + ofile oiffsss okbtn oldbook @@ -1015,6 +1016,7 @@ prerun prettypath printf + printfiles printnodes printobjects printrefs diff --git a/config/config.json b/config/config.json index 5b6c3394..1fcd9866 100644 --- a/config/config.json +++ b/config/config.json @@ -35,7 +35,8 @@ "filelock", "Cocoa", "pdoc", - "certifi" + "certifi", + "psutil" ], "python_paths": [ "assets/src/ba_data/python", diff --git a/src/ballistica/ballistica.cc b/src/ballistica/ballistica.cc index e0d073dc..3bd3bc47 100644 --- a/src/ballistica/ballistica.cc +++ b/src/ballistica/ballistica.cc @@ -32,7 +32,7 @@ namespace ballistica { // These are set automatically via script; don't modify them here. -const int kAppBuildNumber = 20884; +const int kAppBuildNumber = 20885; const char* kAppVersion = "1.7.10"; // Our standalone globals. diff --git a/tools/efro/debug.py b/tools/efro/debug.py index b474afd7..9fce4aa7 100644 --- a/tools/efro/debug.py +++ b/tools/efro/debug.py @@ -9,7 +9,7 @@ import types from typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import Any + from typing import Any, TextIO ABS_MAX_LEVEL = 10 @@ -22,12 +22,12 @@ ABS_MAX_LEVEL = 10 # returning these temporary references wherever possible. # A good test is running printrefs() repeatedly on some object that is -# known to be static. If the list of references changes or the id or any -# of the references, we're probably letting a temporary object sneak into -# the results and should fix it. +# known to be static. If the list of references or the ids or any +# the listed references changes with each run, it's a good sign that +# we're showing some temporary objects that we should be ignoring. -def getobjs(cls: type, contains: str | None = None) -> list[Any]: +def getobjs(cls: type | str, contains: str | None = None) -> list[Any]: """Return all garbage-collected objects matching criteria. 'type' can be an actual type or a string in which case objects @@ -73,10 +73,96 @@ def getobj(objid: int) -> Any: def getrefs(obj: Any) -> list[Any]: """Given an object, return things referencing it.""" - v = vars() # Ignore ref in locals. + v = vars() # Ignore ref coming from locals. return [o for o in gc.get_referrers(obj) if o is not v] +def printfiles(file: TextIO | None = None) -> None: + """Print info about open files in the current app.""" + import io + file = sys.stderr if file is None else file + try: + import psutil + except ImportError: + print( + "Error: printfiles requires the 'psutil' module to be installed.", + file=file) + return + + proc = psutil.Process() + + # Let's grab all Python file handles so we can associate raw files + # with their Python objects when possible. + fileio_ids = {obj.fileno(): obj for obj in getobjs(io.FileIO)} + textio_ids = {obj.fileno(): obj for obj in getobjs(io.TextIOWrapper)} + + # FIXME: we could do a more limited version of this when psutil is + # not present that simply includes Python's files. + print('Files open by this app (not limited to Python\'s):', file=file) + for i, ofile in enumerate(proc.open_files()): + # Mypy doesn't know about mode apparently. + mode = ofile.mode # type: ignore + textio = textio_ids.get(ofile.fd) + textio_s = id(textio) if textio is not None else '' + fileio = fileio_ids.get(ofile.fd) + fileio_s = id(fileio) if fileio is not None else '' + print(f'#{i+1}: path={ofile.path!r},' + f' fd={ofile.fd}, mode={mode!r}, TextIOWrapper={textio_s},' + f' FileIO={fileio_s}') + + +def printrefs(obj: Any, + max_level: int = 2, + exclude_objs: list[Any] | None = None, + expand_ids: list[int] | None = None, + file: TextIO | None = None) -> None: + """Print human readable list of objects referring to an object. + + 'max_level' specifies how many levels of recursion are printed. + 'exclude_objs' can be a list of exact objects to skip if found in the + referrers list. This can be useful to avoid printing the local context + where the object was passed in from (locals(), etc). + 'expand_ids' can be a list of object ids; if that particular object is + found, it will always be expanded even if max_level has been reached. + """ + _printrefs(obj, + level=0, + max_level=max_level, + exclude_objs=[] if exclude_objs is None else exclude_objs, + expand_ids=[] if expand_ids is None else expand_ids, + file=sys.stderr if file is None else file) + + +def printtypes(limit: int = 50, file: TextIO | None = None) -> None: + """Print a human readable list of which types have the most instances.""" + assert limit > 0 + objtypes: dict[str, int] = {} + gc.collect() # Recommended before get_objects(). + allobjs = gc.get_objects() + allobjc = len(allobjs) + for obj in allobjs: + modname = type(obj).__module__ + tpname = type(obj).__qualname__ + if modname != 'builtins': + tpname = f'{modname}.{tpname}' + objtypes[tpname] = objtypes.get(tpname, 0) + 1 + + # Presumably allobjs contains stack-frame/dict type stuff + # from this function call which in turn contain refs to allobjs. + # Let's try to prevent these huge lists from accumulating until + # the cyclical collector (hopefully) gets to them. + allobjs.clear() + del allobjs + + print(f'Types most allocated ({allobjc} total objects):', file=file) + for i, tpitem in enumerate( + sorted(objtypes.items(), key=lambda x: x[1], + reverse=True)[:limit]): + tpname, tpval = tpitem + percent = tpval / allobjc * 100.0 + print(f'{i+1}: {tpname}: {tpval} ({percent:.2f}%)', file=file) + + def _desctype(obj: Any) -> str: cls = type(obj) if cls is types.ModuleType: @@ -118,9 +204,9 @@ def _desc(obj: Any) -> str: def _printrefs(obj: Any, level: int, max_level: int, exclude_objs: list, - expand_ids: list[int]) -> None: + expand_ids: list[int], file: TextIO) -> None: ind = ' ' * level - print(ind + _desc(obj), file=sys.stderr) + print(ind + _desc(obj), file=file) v = vars() if level < max_level or (id(obj) in expand_ids and level < ABS_MAX_LEVEL): refs = getrefs(obj) @@ -147,24 +233,5 @@ def _printrefs(obj: Any, level: int, max_level: int, exclude_objs: list, level=level + 1, max_level=max_level, exclude_objs=exclude_objs + [refs], - expand_ids=expand_ids) - - -def printrefs(obj: Any, - max_level: int = 2, - exclude_objs: list[Any] | None = None, - expand_ids: list[int] | None = None) -> None: - """Print human readable list of objects referring to an object. - - 'max_level' specifies how many levels of recursion are printed. - 'exclude_objs' can be a list of exact objects to skip if found in the - referrers list. This can be useful to avoid printing the local context - where the object was passed in from (locals(), etc). - 'expand_ids' can be a list of object ids; if that particular object is - found, it will always be expanded even if max_level has been reached. - """ - _printrefs(obj, - level=0, - max_level=max_level, - exclude_objs=[] if exclude_objs is None else exclude_objs, - expand_ids=[] if expand_ids is None else expand_ids) + expand_ids=expand_ids, + file=file) diff --git a/tools/efro/rpc.py b/tools/efro/rpc.py index 793f434c..b7a14d03 100644 --- a/tools/efro/rpc.py +++ b/tools/efro/rpc.py @@ -76,6 +76,44 @@ def ssl_stream_writer_underlying_transport_info( return '(not found)' +def ssl_stream_writer_force_close_check(writer: asyncio.StreamWriter) -> None: + """Ensure a writer is closed; hacky workaround for odd hang.""" + from efro.call import tpartial + from threading import Thread + # Hopefully can remove this in Python 3.11?... + # see issue with is_closing() below for more details. + transport = getattr(writer, '_transport', None) + if transport is not None: + sslproto = getattr(transport, '_ssl_protocol', None) + if sslproto is not None: + raw_transport = getattr(sslproto, '_transport', None) + if raw_transport is not None: + Thread( + target=tpartial( + _do_writer_force_close_check, + weakref.ref(raw_transport), + ), + daemon=True, + ).start() + + +def _do_writer_force_close_check(transport_weak: weakref.ref) -> None: + try: + # Attempt to bail as soon as the obj dies. + # If it hasn't done so by our timeout, force-kill it. + starttime = time.monotonic() + while time.monotonic() - starttime < 10.0: + time.sleep(0.1) + if transport_weak() is None: + return + transport = transport_weak() + if transport is not None: + logging.info('Forcing abort on stuck transport %s.', transport) + transport.abort() + except Exception: + logging.warning('Error in writer-force-close-check', exc_info=True) + + class _InFlightMessage: """Represents a message that is out on the wire.""" @@ -155,6 +193,7 @@ class RPCEndpoint: self._keepalive_timeout = keepalive_timeout self._did_close_writer = False self._did_wait_closed_writer = False + self._did_out_packets_buildup_warning = False # Need to hold weak-refs to these otherwise it creates dep-loops # which keeps us alive. @@ -186,11 +225,26 @@ class RPCEndpoint: ' but writer not wait-closed (transport=%s).', id(self), ssl_stream_writer_underlying_transport_info(self._writer)) + # Currently seeing rare issue where sockets don't go down; + # let's add a timer to force the issue until we can figure it out. + ssl_stream_writer_force_close_check(self._writer) + async def run(self) -> None: """Run the endpoint until the connection is lost or closed. Handles closing the provided reader/writer on close. """ + try: + await self._do_run() + except asyncio.CancelledError: + # We aren't really designed to be cancelled so let's warn + # if it happens. + logging.warning('RPCEndpoint.run got CancelledError;' + ' want to try and avoid this.') + raise + + async def _do_run(self) -> None: + self._check_env() if self._run_called: @@ -216,9 +270,13 @@ class RPCEndpoint: # We want to know if any errors happened aside from CancelledError # (which are BaseExceptions, not Exception). if isinstance(result, Exception): - if self._debug_print: - logging.error('Got unexpected error from %s core task: %s', - self._label, result) + logging.warning('Got unexpected error from %s core task: %s', + self._label, result) + + if not all(task.done() for task in core_tasks): + logging.warning( + 'RPCEndpoint %d: not all core tasks marked done after gather.', + id(self)) # Shut ourself down. try: @@ -258,6 +316,9 @@ class RPCEndpoint: message_id = self._next_message_id self._next_message_id = (self._next_message_id + 1) % 65536 + # FIXME - should handle backpressure (waiting here if there are + # enough packets already enqueued). + if len(message) > 65535: # Payload consists of type (1b), message_id (2b), # len (4b), and data. @@ -377,6 +438,11 @@ class RPCEndpoint: logging.warning('Got unexpected error cleaning up %s task: %s', self._label, result) + if not all(task.done() for task in live_tasks): + logging.warning( + 'RPCEndpoint %d: not all live tasks marked done after gather.', + id(self)) + if self._debug_print: self._debug_print_call( f'{self._label}: tasks finished; waiting for writer close...') @@ -393,9 +459,7 @@ class RPCEndpoint: # indefinitely. See https://github.com/python/cpython/issues/83939 # It sounds like this should be fixed in 3.11 but for now just # forcing the issue with a timeout here. - assert not self._did_wait_closed_writer - self._did_wait_closed_writer = True - await asyncio.wait_for(self._writer.wait_closed(), timeout=10.0) + await asyncio.wait_for(self._writer.wait_closed(), timeout=30.0) except asyncio.TimeoutError: logging.info( 'Timeout on _writer.wait_closed() for %s rpc (transport=%s).', @@ -417,6 +481,8 @@ class RPCEndpoint: logging.warning('RPCEndpoint.wait_closed()' ' got asyncio.CancelledError; not expected.') raise + assert not self._did_wait_closed_writer + self._did_wait_closed_writer = True def _tm(self) -> str: """Simple readable time value for debugging.""" @@ -541,7 +607,21 @@ class RPCEndpoint: self._have_out_packets.clear() self._writer.write(data) - # await self._writer.drain() + + # This should keep our writer from buffering huge amounts + # of outgoing data. We must remember though that we also + # need to prevent _out_packets from growing too large and + # that part's on us. + await self._writer.drain() + + # For now we're not applying backpressure, but let's make + # noise if this gets out of hand. + if len(self._out_packets) > 200: + if not self._did_out_packets_buildup_warning: + logging.warning( + '_out_packets building up too' + ' much on RPCEndpoint %s.', id(self)) + self._did_out_packets_buildup_warning = True async def _run_keepalive_task(self) -> None: """Send periodic keepalive packets."""