cloud console and debugging improvements

This commit is contained in:
Eric 2022-09-28 09:46:12 -07:00
parent e45d6675c9
commit 2c694ebd7e
No known key found for this signature in database
GPG Key ID: 89C93F0F8D6D5A98
9 changed files with 221 additions and 69 deletions

View File

@ -3995,26 +3995,26 @@
"assets/src/ba_data/python/ba/_generated/__init__.py": "https://files.ballistica.net/cache/ba1/ee/e8/cad05aa531c7faf7ff7b96db7f6e",
"assets/src/ba_data/python/ba/_generated/enums.py": "https://files.ballistica.net/cache/ba1/b2/e5/0ee0561e16257a32830645239f34",
"ballisticacore-windows/Generic/BallisticaCore.ico": "https://files.ballistica.net/cache/ba1/89/c0/e32c7d2a35dc9aef57cc73b0911a",
"build/prefab/full/linux_arm64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/b7/31/a293aea8039ee830d3577179aa81",
"build/prefab/full/linux_arm64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/a2/11/7a48cde8029be043579abf94179e",
"build/prefab/full/linux_arm64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/6b/77/198dd7c53b085c116c77a7877af5",
"build/prefab/full/linux_arm64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/14/9f/ca41b53c5fa84b063b5d089f061f",
"build/prefab/full/linux_x86_64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/75/a2/f30da762e9451cea11bbf89b4c6e",
"build/prefab/full/linux_x86_64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/b8/de/fd123320eaac865c1484e7678397",
"build/prefab/full/linux_x86_64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/81/90/b8f77fe7c99f6caf7305734a0962",
"build/prefab/full/linux_x86_64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/8d/8c/7a6b8f0c7a62b6fd7c67e89d9b3c",
"build/prefab/full/mac_arm64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/9a/17/a6fdb6d82f342786e32ba4c64ded",
"build/prefab/full/mac_arm64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/c1/14/f93f5ebba5ff82ca596931dbc30a",
"build/prefab/full/mac_arm64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/30/9c/9a8758cd98e1c2d566ef61d0c511",
"build/prefab/full/mac_arm64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/5b/2f/3e28b0c669980be5f960515c125e",
"build/prefab/full/mac_x86_64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/16/5c/de01bf8f26d3bea3cf30240dc609",
"build/prefab/full/mac_x86_64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/c4/85/8d848c0eee73737090a2e68dab13",
"build/prefab/full/mac_x86_64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/c6/9f/0b3cfdd1b580768b2cb0a3509de2",
"build/prefab/full/mac_x86_64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/38/c5/688b96898a6cdd8d8c5a410d3dd9",
"build/prefab/full/windows_x86_gui/debug/BallisticaCore.exe": "https://files.ballistica.net/cache/ba1/44/a9/b16f6100e676469ee2c1fa48cde0",
"build/prefab/full/windows_x86_gui/release/BallisticaCore.exe": "https://files.ballistica.net/cache/ba1/c8/d8/65aba46607929b1f697d9d082330",
"build/prefab/full/windows_x86_server/debug/dist/BallisticaCoreHeadless.exe": "https://files.ballistica.net/cache/ba1/41/4c/0e5722286d59009754ac6d0b6867",
"build/prefab/full/windows_x86_server/release/dist/BallisticaCoreHeadless.exe": "https://files.ballistica.net/cache/ba1/cb/60/54dd87249f9e40b8075943bcab0f",
"build/prefab/full/linux_arm64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/8a/a3/09c2e2c3c8486701a9ee5327b263",
"build/prefab/full/linux_arm64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/9e/11/1da357b0f99d277a25e28cef667e",
"build/prefab/full/linux_arm64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/e7/be/d742c224a0d596e0dce2a5484562",
"build/prefab/full/linux_arm64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/3b/7d/790c154602635edb1a0b135b5b21",
"build/prefab/full/linux_x86_64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/e9/18/520b9da134c837a6888c30e97f19",
"build/prefab/full/linux_x86_64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/de/e5/ce18bd44acaea6fa0efce75997fa",
"build/prefab/full/linux_x86_64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/b1/12/5758a7ea749dae196ff895cae51e",
"build/prefab/full/linux_x86_64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/b0/2c/7ead0f7195b764ec07a1a9370db3",
"build/prefab/full/mac_arm64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/79/32/79dea81648ee523d80b56723adcb",
"build/prefab/full/mac_arm64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/e1/4c/abe20dc46ac5ab6dbc063d0ba665",
"build/prefab/full/mac_arm64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/cc/e7/989e658ad42701bb0b59917a4aa0",
"build/prefab/full/mac_arm64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/f4/5f/cf504d8fc3c164dbe4483ff97b54",
"build/prefab/full/mac_x86_64_gui/debug/ballisticacore": "https://files.ballistica.net/cache/ba1/81/79/a916d9439ff7bf8320dfe0834c1c",
"build/prefab/full/mac_x86_64_gui/release/ballisticacore": "https://files.ballistica.net/cache/ba1/c2/b5/280018b8d8d9c69969bbe32c9b38",
"build/prefab/full/mac_x86_64_server/debug/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/de/f0/2ae295b8adb55a8fca413c2983c1",
"build/prefab/full/mac_x86_64_server/release/dist/ballisticacore_headless": "https://files.ballistica.net/cache/ba1/59/ae/b2d08ff9b3a090e9eb0c36941505",
"build/prefab/full/windows_x86_gui/debug/BallisticaCore.exe": "https://files.ballistica.net/cache/ba1/e6/c2/70029cb62124d7e7cc56b4093063",
"build/prefab/full/windows_x86_gui/release/BallisticaCore.exe": "https://files.ballistica.net/cache/ba1/85/51/baa09a10bd02ecd687743ecb11dc",
"build/prefab/full/windows_x86_server/debug/dist/BallisticaCoreHeadless.exe": "https://files.ballistica.net/cache/ba1/5e/c8/df6b785ea053fdc385284db9ce02",
"build/prefab/full/windows_x86_server/release/dist/BallisticaCoreHeadless.exe": "https://files.ballistica.net/cache/ba1/96/5a/9e78e672ff1d12e0e03339a33ee4",
"build/prefab/lib/linux_arm64_gui/debug/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/e2/88/53757bc9fd92d49bd35dc6d3be0e",
"build/prefab/lib/linux_arm64_gui/release/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/06/69/078f7eef49a1127a1492db4703f6",
"build/prefab/lib/linux_arm64_server/debug/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/82/f1/2b13fe77164f72d2bf57453bb8e5",
@ -4031,14 +4031,14 @@
"build/prefab/lib/mac_x86_64_gui/release/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/a6/91/f9cb15d0876750e28abe3b0d221c",
"build/prefab/lib/mac_x86_64_server/debug/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/b7/11/1ecfe322ae997772b71538664cad",
"build/prefab/lib/mac_x86_64_server/release/libballisticacore_internal.a": "https://files.ballistica.net/cache/ba1/8d/d6/1e83dba73d581cfb2b2f6eb31f22",
"build/prefab/lib/windows/Debug_Win32/BallisticaCoreGenericInternal.lib": "https://files.ballistica.net/cache/ba1/5c/e1/a38642b99936733a5a560f9300d3",
"build/prefab/lib/windows/Debug_Win32/BallisticaCoreGenericInternal.pdb": "https://files.ballistica.net/cache/ba1/24/31/78c54905becd2f236a620dda01e1",
"build/prefab/lib/windows/Debug_Win32/BallisticaCoreHeadlessInternal.lib": "https://files.ballistica.net/cache/ba1/7e/f5/48e9c6cd11de662188fef4484e6d",
"build/prefab/lib/windows/Debug_Win32/BallisticaCoreHeadlessInternal.pdb": "https://files.ballistica.net/cache/ba1/e7/40/fce7451ad5e3802aeb058dc15712",
"build/prefab/lib/windows/Release_Win32/BallisticaCoreGenericInternal.lib": "https://files.ballistica.net/cache/ba1/a5/07/a9b7fdd826d2235e8ad2e398e13c",
"build/prefab/lib/windows/Release_Win32/BallisticaCoreGenericInternal.pdb": "https://files.ballistica.net/cache/ba1/05/7e/c97aa4f1876c7a5029fb9478e1a5",
"build/prefab/lib/windows/Release_Win32/BallisticaCoreHeadlessInternal.lib": "https://files.ballistica.net/cache/ba1/e9/3e/120aaa006c5494394d862854fa4e",
"build/prefab/lib/windows/Release_Win32/BallisticaCoreHeadlessInternal.pdb": "https://files.ballistica.net/cache/ba1/ac/c9/3d3fe16a6fe892945977350861c7",
"build/prefab/lib/windows/Debug_Win32/BallisticaCoreGenericInternal.lib": "https://files.ballistica.net/cache/ba1/50/6a/ca5e49b3cad047b541648dc9914d",
"build/prefab/lib/windows/Debug_Win32/BallisticaCoreGenericInternal.pdb": "https://files.ballistica.net/cache/ba1/6f/f7/b38282cfbd3cb0cda89d5a458176",
"build/prefab/lib/windows/Debug_Win32/BallisticaCoreHeadlessInternal.lib": "https://files.ballistica.net/cache/ba1/97/e5/1d2e76fcadbe022d4c522c7d2135",
"build/prefab/lib/windows/Debug_Win32/BallisticaCoreHeadlessInternal.pdb": "https://files.ballistica.net/cache/ba1/54/d6/2d642477837c34c946b70e27014d",
"build/prefab/lib/windows/Release_Win32/BallisticaCoreGenericInternal.lib": "https://files.ballistica.net/cache/ba1/79/2b/f17b81bdf03719098a558305f0a3",
"build/prefab/lib/windows/Release_Win32/BallisticaCoreGenericInternal.pdb": "https://files.ballistica.net/cache/ba1/9a/5a/3ca5679187f8dd5f89d8bb68ed84",
"build/prefab/lib/windows/Release_Win32/BallisticaCoreHeadlessInternal.lib": "https://files.ballistica.net/cache/ba1/4b/88/d016d4059ea7b334e95d1e6ef258",
"build/prefab/lib/windows/Release_Win32/BallisticaCoreHeadlessInternal.pdb": "https://files.ballistica.net/cache/ba1/1b/35/b11d851fc912b7bcd57766981fa2",
"src/ballistica/generated/python_embedded/binding.inc": "https://files.ballistica.net/cache/ba1/c0/32/b7907e3859a5c5013a3d97b6b523",
"src/ballistica/generated/python_embedded/bootstrap.inc": "https://files.ballistica.net/cache/ba1/2d/4f/f4fe67827f36cd59cd5193333a02",
"src/ballistica/generated/python_embedded/bootstrap_monolithic.inc": "https://files.ballistica.net/cache/ba1/ef/c1/aa5f1aa10af89f5c0b1e616355fd"

View File

@ -1736,6 +1736,7 @@
<w>offsanchor</w>
<w>offsx</w>
<w>offsy</w>
<w>ofile</w>
<w>ofval</w>
<w>oggenc</w>
<w>oghash</w>
@ -1951,6 +1952,7 @@
<w>priceraw</w>
<w>printcolors</w>
<w>printf</w>
<w>printfiles</w>
<w>printnodes</w>
<w>printobjects</w>
<w>printpaths</w>

View File

@ -1,9 +1,9 @@
### 1.7.10 (build 20884, api 7, 2022-09-27)
### 1.7.10 (build 20885, api 7, 2022-09-28)
- Added eval support for cloud-console. This means you can type something like '1+1' in the console and see '2' printed. This is how Python behaves in the stdin console or in-game console or the standard Python interpreter.
- Exceptions in the cloud-console now print to stderr instead of logging.exception(). This means they aren't a pretty red color anymore, but this will keep cloud-console behaving well with things like servers where logging.exception() might trigger alarms or otherwise. This is also consistent with standard interactive Python behavior.
- Cloud console now shows the device name at the top instead of simply 'Console' while connected.
- Moved the function that actually runs cloud console code to `ba._cloud.cloud_console_exec()`.
- Upgraded bundled Python (for Android and Apple builds) to 3.10.7.
- Added efro.debug which contains useful functionality for debugging object reference issues and memory leaks on live app instances (via cloud shell or whatever).
### 1.7.9 (build 20880, api 7, 2022-09-24)
- Cleaned up the efro.message system to isolate response types that are used purely internally (via a new SysResponse type).

View File

@ -45,7 +45,7 @@ def bootstrap() -> None:
# Give a soft warning if we're being used with a different binary
# version than we expect.
expected_build = 20884
expected_build = 20885
running_build: int = env['build_number']
if running_build != expected_build:
print(

View File

@ -913,6 +913,7 @@
<w>oenval</w>
<w>offsx</w>
<w>offsy</w>
<w>ofile</w>
<w>oiffsss</w>
<w>okbtn</w>
<w>oldbook</w>
@ -1015,6 +1016,7 @@
<w>prerun</w>
<w>prettypath</w>
<w>printf</w>
<w>printfiles</w>
<w>printnodes</w>
<w>printobjects</w>
<w>printrefs</w>

View File

@ -35,7 +35,8 @@
"filelock",
"Cocoa",
"pdoc",
"certifi"
"certifi",
"psutil"
],
"python_paths": [
"assets/src/ba_data/python",

View File

@ -32,7 +32,7 @@
namespace ballistica {
// These are set automatically via script; don't modify them here.
const int kAppBuildNumber = 20884;
const int kAppBuildNumber = 20885;
const char* kAppVersion = "1.7.10";
// Our standalone globals.

View File

@ -9,7 +9,7 @@ import types
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any
from typing import Any, TextIO
ABS_MAX_LEVEL = 10
@ -22,12 +22,12 @@ ABS_MAX_LEVEL = 10
# returning these temporary references wherever possible.
# A good test is running printrefs() repeatedly on some object that is
# known to be static. If the list of references changes or the id or any
# of the references, we're probably letting a temporary object sneak into
# the results and should fix it.
# known to be static. If the list of references or the ids or any
# the listed references changes with each run, it's a good sign that
# we're showing some temporary objects that we should be ignoring.
def getobjs(cls: type, contains: str | None = None) -> list[Any]:
def getobjs(cls: type | str, contains: str | None = None) -> list[Any]:
"""Return all garbage-collected objects matching criteria.
'type' can be an actual type or a string in which case objects
@ -73,10 +73,96 @@ def getobj(objid: int) -> Any:
def getrefs(obj: Any) -> list[Any]:
"""Given an object, return things referencing it."""
v = vars() # Ignore ref in locals.
v = vars() # Ignore ref coming from locals.
return [o for o in gc.get_referrers(obj) if o is not v]
def printfiles(file: TextIO | None = None) -> None:
"""Print info about open files in the current app."""
import io
file = sys.stderr if file is None else file
try:
import psutil
except ImportError:
print(
"Error: printfiles requires the 'psutil' module to be installed.",
file=file)
return
proc = psutil.Process()
# Let's grab all Python file handles so we can associate raw files
# with their Python objects when possible.
fileio_ids = {obj.fileno(): obj for obj in getobjs(io.FileIO)}
textio_ids = {obj.fileno(): obj for obj in getobjs(io.TextIOWrapper)}
# FIXME: we could do a more limited version of this when psutil is
# not present that simply includes Python's files.
print('Files open by this app (not limited to Python\'s):', file=file)
for i, ofile in enumerate(proc.open_files()):
# Mypy doesn't know about mode apparently.
mode = ofile.mode # type: ignore
textio = textio_ids.get(ofile.fd)
textio_s = id(textio) if textio is not None else '<not found>'
fileio = fileio_ids.get(ofile.fd)
fileio_s = id(fileio) if fileio is not None else '<not found>'
print(f'#{i+1}: path={ofile.path!r},'
f' fd={ofile.fd}, mode={mode!r}, TextIOWrapper={textio_s},'
f' FileIO={fileio_s}')
def printrefs(obj: Any,
max_level: int = 2,
exclude_objs: list[Any] | None = None,
expand_ids: list[int] | None = None,
file: TextIO | None = None) -> None:
"""Print human readable list of objects referring to an object.
'max_level' specifies how many levels of recursion are printed.
'exclude_objs' can be a list of exact objects to skip if found in the
referrers list. This can be useful to avoid printing the local context
where the object was passed in from (locals(), etc).
'expand_ids' can be a list of object ids; if that particular object is
found, it will always be expanded even if max_level has been reached.
"""
_printrefs(obj,
level=0,
max_level=max_level,
exclude_objs=[] if exclude_objs is None else exclude_objs,
expand_ids=[] if expand_ids is None else expand_ids,
file=sys.stderr if file is None else file)
def printtypes(limit: int = 50, file: TextIO | None = None) -> None:
"""Print a human readable list of which types have the most instances."""
assert limit > 0
objtypes: dict[str, int] = {}
gc.collect() # Recommended before get_objects().
allobjs = gc.get_objects()
allobjc = len(allobjs)
for obj in allobjs:
modname = type(obj).__module__
tpname = type(obj).__qualname__
if modname != 'builtins':
tpname = f'{modname}.{tpname}'
objtypes[tpname] = objtypes.get(tpname, 0) + 1
# Presumably allobjs contains stack-frame/dict type stuff
# from this function call which in turn contain refs to allobjs.
# Let's try to prevent these huge lists from accumulating until
# the cyclical collector (hopefully) gets to them.
allobjs.clear()
del allobjs
print(f'Types most allocated ({allobjc} total objects):', file=file)
for i, tpitem in enumerate(
sorted(objtypes.items(), key=lambda x: x[1],
reverse=True)[:limit]):
tpname, tpval = tpitem
percent = tpval / allobjc * 100.0
print(f'{i+1}: {tpname}: {tpval} ({percent:.2f}%)', file=file)
def _desctype(obj: Any) -> str:
cls = type(obj)
if cls is types.ModuleType:
@ -118,9 +204,9 @@ def _desc(obj: Any) -> str:
def _printrefs(obj: Any, level: int, max_level: int, exclude_objs: list,
expand_ids: list[int]) -> None:
expand_ids: list[int], file: TextIO) -> None:
ind = ' ' * level
print(ind + _desc(obj), file=sys.stderr)
print(ind + _desc(obj), file=file)
v = vars()
if level < max_level or (id(obj) in expand_ids and level < ABS_MAX_LEVEL):
refs = getrefs(obj)
@ -147,24 +233,5 @@ def _printrefs(obj: Any, level: int, max_level: int, exclude_objs: list,
level=level + 1,
max_level=max_level,
exclude_objs=exclude_objs + [refs],
expand_ids=expand_ids)
def printrefs(obj: Any,
max_level: int = 2,
exclude_objs: list[Any] | None = None,
expand_ids: list[int] | None = None) -> None:
"""Print human readable list of objects referring to an object.
'max_level' specifies how many levels of recursion are printed.
'exclude_objs' can be a list of exact objects to skip if found in the
referrers list. This can be useful to avoid printing the local context
where the object was passed in from (locals(), etc).
'expand_ids' can be a list of object ids; if that particular object is
found, it will always be expanded even if max_level has been reached.
"""
_printrefs(obj,
level=0,
max_level=max_level,
exclude_objs=[] if exclude_objs is None else exclude_objs,
expand_ids=[] if expand_ids is None else expand_ids)
expand_ids=expand_ids,
file=file)

View File

@ -76,6 +76,44 @@ def ssl_stream_writer_underlying_transport_info(
return '(not found)'
def ssl_stream_writer_force_close_check(writer: asyncio.StreamWriter) -> None:
"""Ensure a writer is closed; hacky workaround for odd hang."""
from efro.call import tpartial
from threading import Thread
# Hopefully can remove this in Python 3.11?...
# see issue with is_closing() below for more details.
transport = getattr(writer, '_transport', None)
if transport is not None:
sslproto = getattr(transport, '_ssl_protocol', None)
if sslproto is not None:
raw_transport = getattr(sslproto, '_transport', None)
if raw_transport is not None:
Thread(
target=tpartial(
_do_writer_force_close_check,
weakref.ref(raw_transport),
),
daemon=True,
).start()
def _do_writer_force_close_check(transport_weak: weakref.ref) -> None:
try:
# Attempt to bail as soon as the obj dies.
# If it hasn't done so by our timeout, force-kill it.
starttime = time.monotonic()
while time.monotonic() - starttime < 10.0:
time.sleep(0.1)
if transport_weak() is None:
return
transport = transport_weak()
if transport is not None:
logging.info('Forcing abort on stuck transport %s.', transport)
transport.abort()
except Exception:
logging.warning('Error in writer-force-close-check', exc_info=True)
class _InFlightMessage:
"""Represents a message that is out on the wire."""
@ -155,6 +193,7 @@ class RPCEndpoint:
self._keepalive_timeout = keepalive_timeout
self._did_close_writer = False
self._did_wait_closed_writer = False
self._did_out_packets_buildup_warning = False
# Need to hold weak-refs to these otherwise it creates dep-loops
# which keeps us alive.
@ -186,11 +225,26 @@ class RPCEndpoint:
' but writer not wait-closed (transport=%s).', id(self),
ssl_stream_writer_underlying_transport_info(self._writer))
# Currently seeing rare issue where sockets don't go down;
# let's add a timer to force the issue until we can figure it out.
ssl_stream_writer_force_close_check(self._writer)
async def run(self) -> None:
"""Run the endpoint until the connection is lost or closed.
Handles closing the provided reader/writer on close.
"""
try:
await self._do_run()
except asyncio.CancelledError:
# We aren't really designed to be cancelled so let's warn
# if it happens.
logging.warning('RPCEndpoint.run got CancelledError;'
' want to try and avoid this.')
raise
async def _do_run(self) -> None:
self._check_env()
if self._run_called:
@ -216,9 +270,13 @@ class RPCEndpoint:
# We want to know if any errors happened aside from CancelledError
# (which are BaseExceptions, not Exception).
if isinstance(result, Exception):
if self._debug_print:
logging.error('Got unexpected error from %s core task: %s',
self._label, result)
logging.warning('Got unexpected error from %s core task: %s',
self._label, result)
if not all(task.done() for task in core_tasks):
logging.warning(
'RPCEndpoint %d: not all core tasks marked done after gather.',
id(self))
# Shut ourself down.
try:
@ -258,6 +316,9 @@ class RPCEndpoint:
message_id = self._next_message_id
self._next_message_id = (self._next_message_id + 1) % 65536
# FIXME - should handle backpressure (waiting here if there are
# enough packets already enqueued).
if len(message) > 65535:
# Payload consists of type (1b), message_id (2b),
# len (4b), and data.
@ -377,6 +438,11 @@ class RPCEndpoint:
logging.warning('Got unexpected error cleaning up %s task: %s',
self._label, result)
if not all(task.done() for task in live_tasks):
logging.warning(
'RPCEndpoint %d: not all live tasks marked done after gather.',
id(self))
if self._debug_print:
self._debug_print_call(
f'{self._label}: tasks finished; waiting for writer close...')
@ -393,9 +459,7 @@ class RPCEndpoint:
# indefinitely. See https://github.com/python/cpython/issues/83939
# It sounds like this should be fixed in 3.11 but for now just
# forcing the issue with a timeout here.
assert not self._did_wait_closed_writer
self._did_wait_closed_writer = True
await asyncio.wait_for(self._writer.wait_closed(), timeout=10.0)
await asyncio.wait_for(self._writer.wait_closed(), timeout=30.0)
except asyncio.TimeoutError:
logging.info(
'Timeout on _writer.wait_closed() for %s rpc (transport=%s).',
@ -417,6 +481,8 @@ class RPCEndpoint:
logging.warning('RPCEndpoint.wait_closed()'
' got asyncio.CancelledError; not expected.')
raise
assert not self._did_wait_closed_writer
self._did_wait_closed_writer = True
def _tm(self) -> str:
"""Simple readable time value for debugging."""
@ -541,7 +607,21 @@ class RPCEndpoint:
self._have_out_packets.clear()
self._writer.write(data)
# await self._writer.drain()
# This should keep our writer from buffering huge amounts
# of outgoing data. We must remember though that we also
# need to prevent _out_packets from growing too large and
# that part's on us.
await self._writer.drain()
# For now we're not applying backpressure, but let's make
# noise if this gets out of hand.
if len(self._out_packets) > 200:
if not self._did_out_packets_buildup_warning:
logging.warning(
'_out_packets building up too'
' much on RPCEndpoint %s.', id(self))
self._did_out_packets_buildup_warning = True
async def _run_keepalive_task(self) -> None:
"""Send periodic keepalive packets."""