Packaged up MetadataSubsystem into ba.app.meta

This commit is contained in:
Eric Froemling 2020-10-16 17:45:24 -07:00
parent 7535e0807b
commit 7c7f89385e
10 changed files with 205 additions and 139 deletions

View File

@ -4,6 +4,7 @@
- Achievement functionality has been consolidated into an AchievementSubsystem object at ba.app.ach
- Plugin functionality has been consolidated into a PluginSubsystem obj at ba.app.plugins
- Ditto with AccountSubsystem and ba.app.accounts
- Ditto with MetadataSubsystem and ba.app.meta
### 1.5.26 (20217)
- Simplified licensing header on python scripts.

View File

@ -65,6 +65,7 @@ from ba._keyboard import Keyboard
from ba._level import Level
from ba._lobby import Lobby, Chooser
from ba._math import normalized_color, is_point_in_box, vec3validate
from ba._meta import MetadataSubsystem
from ba._messages import (UNHANDLED, OutOfBoundsMessage, DeathType, DieMessage,
PlayerDiedMessage, StandMessage, PickUpMessage,
DropMessage, PickedUpMessage, DroppedMessage,

View File

@ -11,7 +11,7 @@ import _ba
if TYPE_CHECKING:
import ba
from ba import _language, _meta
from ba import _language
from bastd.actor import spazappearance
from typing import Optional, Dict, Set, Any, Type, Tuple, Callable, List
@ -28,11 +28,6 @@ class App:
"""
# pylint: disable=too-many-public-methods
# Note: many values here are simple method attrs and thus don't show
# up in docs. If there's any that'd be useful to expose publicly, they
# should be converted to properties so its possible to validate values
# and provide docs.
@property
def build_number(self) -> int:
"""Integer build number.
@ -177,6 +172,7 @@ class App:
from ba._achievement import AchievementSubsystem
from ba._plugin import PluginSubsystem
from ba._account import AccountSubsystem
from ba._meta import MetadataSubsystem
# Config.
self.config_file_healthy = False
@ -201,7 +197,6 @@ class App:
assert isinstance(self.headless_mode, bool)
# Misc.
self.metascan: Optional[_meta.ScanResults] = None
self.tips: List[str] = []
self.stress_test_reset_timer: Optional[ba.Timer] = None
self.last_ad_completion_time: Optional[float] = None
@ -235,6 +230,7 @@ class App:
self.last_ad_purpose = 'invalid'
self.attempted_first_ad = False
self.meta = MetadataSubsystem()
self.accounts = AccountSubsystem()
self.plugins = PluginSubsystem()
self.music = MusicSubsystem()
@ -288,7 +284,6 @@ class App:
from ba import _appconfig
from ba import _achievement
from ba import _map
from ba import _meta
from ba import _campaign
from bastd import appdelegate
from bastd import maps as stdmaps
@ -382,8 +377,7 @@ class App:
if not self.headless_mode:
_ba.timer(3.0, check_special_offer, timetype=TimeType.REAL)
# Start scanning for things exposed via ba_meta.
_meta.start_scan()
self.meta.on_app_launch()
self.accounts.on_app_launch()
self.plugins.on_app_launch()

View File

@ -5,6 +5,7 @@
from __future__ import annotations
import os
import time
import pathlib
import threading
from typing import TYPE_CHECKING
@ -33,78 +34,146 @@ class ScanResults:
warnings: str = ''
def start_scan() -> None:
"""Begin scanning script directories for scripts containing metadata.
class MetadataSubsystem:
"""Subsystem for working with script metadata in the app.
Should be called only once at launch."""
app = _ba.app
if app.metascan is not None:
print('WARNING: meta scan run more than once.')
pythondirs = [app.python_directory_app, app.python_directory_user]
thread = ScanThread(pythondirs)
thread.start()
Category: App Classes
Access the single shared instance of this class at 'ba.app.meta'.
"""
def handle_scan_results(results: ScanResults) -> None:
"""Called in the game thread with results of a completed scan."""
def __init__(self) -> None:
self.metascan: Optional[ScanResults] = None
from ba._language import Lstr
from ba._plugin import PotentialPlugin
def on_app_launch(self) -> None:
"""Should be called when the app is done bootstrapping."""
# Warnings generally only get printed locally for users' benefit
# (things like out-of-date scripts being ignored, etc.)
# Errors are more serious and will get included in the regular log
# warnings = results.get('warnings', '')
# errors = results.get('errors', '')
if results.warnings != '' or results.errors != '':
import textwrap
_ba.screenmessage(Lstr(resource='scanScriptsErrorText'),
color=(1, 0, 0))
_ba.playsound(_ba.getsound('error'))
if results.warnings != '':
_ba.log(textwrap.indent(results.warnings, 'Warning (meta-scan): '),
to_server=False)
if results.errors != '':
_ba.log(textwrap.indent(results.errors, 'Error (meta-scan): '))
# Start scanning for things exposed via ba_meta.
self.start_scan()
# Handle plugins.
plugs = _ba.app.plugins
config_changed = False
found_new = False
plugstates: Dict[str, Dict] = _ba.app.config.setdefault('Plugins', {})
assert isinstance(plugstates, dict)
def start_scan(self) -> None:
"""Begin scanning script directories for scripts containing metadata.
# Create a potential-plugin for each class we found in the scan.
for class_path in results.plugins:
plugs.potential_plugins.append(
PotentialPlugin(display_name=Lstr(value=class_path),
class_path=class_path,
available=True))
if class_path not in plugstates:
plugstates[class_path] = {'enabled': False}
config_changed = True
found_new = True
Should be called only once at launch."""
app = _ba.app
if self.metascan is not None:
print('WARNING: meta scan run more than once.')
pythondirs = [app.python_directory_app, app.python_directory_user]
thread = ScanThread(pythondirs)
thread.start()
# Also add a special one for any plugins set to load but *not* found
# in the scan (this way they will show up in the UI so we can disable them)
for class_path, plugstate in plugstates.items():
enabled = plugstate.get('enabled', False)
assert isinstance(enabled, bool)
if enabled and class_path not in results.plugins:
def handle_scan_results(self, results: ScanResults) -> None:
"""Called in the game thread with results of a completed scan."""
from ba._language import Lstr
from ba._plugin import PotentialPlugin
# Warnings generally only get printed locally for users' benefit
# (things like out-of-date scripts being ignored, etc.)
# Errors are more serious and will get included in the regular log
# warnings = results.get('warnings', '')
# errors = results.get('errors', '')
if results.warnings != '' or results.errors != '':
import textwrap
_ba.screenmessage(Lstr(resource='scanScriptsErrorText'),
color=(1, 0, 0))
_ba.playsound(_ba.getsound('error'))
if results.warnings != '':
_ba.log(textwrap.indent(results.warnings,
'Warning (meta-scan): '),
to_server=False)
if results.errors != '':
_ba.log(textwrap.indent(results.errors, 'Error (meta-scan): '))
# Handle plugins.
plugs = _ba.app.plugins
config_changed = False
found_new = False
plugstates: Dict[str, Dict] = _ba.app.config.setdefault('Plugins', {})
assert isinstance(plugstates, dict)
# Create a potential-plugin for each class we found in the scan.
for class_path in results.plugins:
plugs.potential_plugins.append(
PotentialPlugin(display_name=Lstr(value=class_path),
class_path=class_path,
available=False))
available=True))
if class_path not in plugstates:
plugstates[class_path] = {'enabled': False}
config_changed = True
found_new = True
plugs.potential_plugins.sort(key=lambda p: p.class_path)
# Also add a special one for any plugins set to load but *not* found
# in the scan (this way they will show up in the UI so we can disable
# them)
for class_path, plugstate in plugstates.items():
enabled = plugstate.get('enabled', False)
assert isinstance(enabled, bool)
if enabled and class_path not in results.plugins:
plugs.potential_plugins.append(
PotentialPlugin(display_name=Lstr(value=class_path),
class_path=class_path,
available=False))
if found_new:
_ba.screenmessage(Lstr(resource='pluginsDetectedText'),
color=(0, 1, 0))
_ba.playsound(_ba.getsound('ding'))
plugs.potential_plugins.sort(key=lambda p: p.class_path)
if config_changed:
_ba.app.config.commit()
if found_new:
_ba.screenmessage(Lstr(resource='pluginsDetectedText'),
color=(0, 1, 0))
_ba.playsound(_ba.getsound('ding'))
if config_changed:
_ba.app.config.commit()
def get_scan_results(self) -> ScanResults:
"""Return meta scan results; block if the scan is not yet complete."""
if self.metascan is None:
print('WARNING: ba.meta.get_scan_results()'
' called before scan completed.'
' This can cause hitches.')
# Now wait a bit for the scan to complete.
# Eventually error though if it doesn't.
starttime = time.time()
while self.metascan is None:
time.sleep(0.05)
if time.time() - starttime > 10.0:
raise TimeoutError(
'timeout waiting for meta scan to complete.')
return self.metascan
def get_game_types(self) -> List[Type[ba.GameActivity]]:
"""Return available game types."""
from ba._general import getclass
from ba._gameactivity import GameActivity
gameclassnames = self.get_scan_results().games
gameclasses = []
for gameclassname in gameclassnames:
try:
cls = getclass(gameclassname, GameActivity)
gameclasses.append(cls)
except Exception:
from ba import _error
_error.print_exception('error importing ' + str(gameclassname))
unowned = self.get_unowned_game_types()
return [cls for cls in gameclasses if cls not in unowned]
def get_unowned_game_types(self) -> Set[Type[ba.GameActivity]]:
"""Return present game types not owned by the current account."""
try:
from ba import _store
unowned_games: Set[Type[ba.GameActivity]] = set()
if not _ba.app.headless_mode:
for section in _store.get_store_layout()['minigames']:
for mname in section['items']:
if not _ba.get_purchased(mname):
m_info = _store.get_store_item(mname)
unowned_games.add(m_info['gametype'])
return unowned_games
except Exception:
from ba import _error
_error.print_exception('error calcing un-owned games')
return set()
class ScanThread(threading.Thread):
@ -125,13 +194,13 @@ class ScanThread(threading.Thread):
# Push a call to the game thread to print warnings/errors
# or otherwise deal with scan results.
_ba.pushcall(Call(handle_scan_results, results),
_ba.pushcall(Call(_ba.app.meta.handle_scan_results, results),
from_other_thread=True)
# We also, however, immediately make results available.
# This is because the game thread may be blocked waiting
# for them so we can't push a call or we'd get deadlock.
_ba.app.metascan = results
_ba.app.meta.metascan = results
class DirectoryScan:
@ -338,58 +407,3 @@ class DirectoryScan:
': no valid "# ba_meta api require <NUM>" line found;'
' ignoring module.\n')
return None
def get_scan_results() -> ScanResults:
"""Return meta scan results; blocking if the scan is not yet complete."""
import time
app = _ba.app
if app.metascan is None:
print(
'WARNING: ba.meta.get_scan_results() called before scan completed.'
' This can cause hitches.')
# Now wait a bit for the scan to complete.
# Eventually error though if it doesn't.
starttime = time.time()
while app.metascan is None:
time.sleep(0.05)
if time.time() - starttime > 10.0:
raise TimeoutError(
'timeout waiting for meta scan to complete.')
return app.metascan
def get_game_types() -> List[Type[ba.GameActivity]]:
"""Return available game types."""
from ba._general import getclass
from ba._gameactivity import GameActivity
gameclassnames = get_scan_results().games
gameclasses = []
for gameclassname in gameclassnames:
try:
cls = getclass(gameclassname, GameActivity)
gameclasses.append(cls)
except Exception:
from ba import _error
_error.print_exception('error importing ' + str(gameclassname))
unowned = get_unowned_game_types()
return [cls for cls in gameclasses if cls not in unowned]
def get_unowned_game_types() -> Set[Type[ba.GameActivity]]:
"""Return present game types not owned by the current account."""
try:
from ba import _store
unowned_games: Set[Type[ba.GameActivity]] = set()
if not _ba.app.headless_mode:
for section in _store.get_store_layout()['minigames']:
for mname in section['items']:
if not _ba.get_purchased(mname):
m_info = _store.get_store_item(mname)
unowned_games.add(m_info['gametype'])
return unowned_games
except Exception:
from ba import _error
_error.print_exception('error calcing un-owned games')
return set()

View File

@ -28,7 +28,7 @@ def filter_playlist(playlist: PlaylistType,
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
from ba import _meta
import _ba
from ba import _map
from ba import _general
from ba import _gameactivity
@ -36,7 +36,7 @@ def filter_playlist(playlist: PlaylistType,
unowned_maps: Sequence[str]
if remove_unowned or mark_unowned:
unowned_maps = _map.get_unowned_maps()
unowned_game_types = _meta.get_unowned_game_types()
unowned_game_types = _ba.app.meta.get_unowned_game_types()
else:
unowned_maps = []
unowned_game_types = set()

View File

@ -23,7 +23,6 @@ from ba._benchmark import (run_gpu_benchmark, run_cpu_benchmark,
run_media_reload_benchmark, run_stress_test)
from ba._campaign import getcampaign
from ba._messages import PlayerProfilesChangedMessage
from ba._meta import get_game_types
from ba._multiteamsession import DEFAULT_TEAM_COLORS, DEFAULT_TEAM_NAMES
from ba._music import do_play_music
from ba._netutils import (master_server_get, master_server_post,

View File

@ -213,8 +213,8 @@ class OnScreenKeyboardWindow(ba.Window):
# Show change instructions only if we have more than one
# keyboard option.
if (ba.app.metascan is not None
and len(ba.app.metascan.keyboards) > 1):
if (ba.app.meta.metascan is not None
and len(ba.app.meta.metascan.keyboards) > 1):
ba.textwidget(
parent=self._root_widget,
h_align='center',
@ -238,8 +238,8 @@ class OnScreenKeyboardWindow(ba.Window):
self._refresh()
def _get_keyboard(self) -> ba.Keyboard:
assert ba.app.metascan is not None
classname = ba.app.metascan.keyboards[self._keyboard_index]
assert ba.app.meta.metascan is not None
classname = ba.app.meta.metascan.keyboards[self._keyboard_index]
kbclass = ba.getclass(classname, ba.Keyboard)
return kbclass()
@ -305,11 +305,11 @@ class OnScreenKeyboardWindow(ba.Window):
self._refresh()
def _next_keyboard(self) -> None:
assert ba.app.metascan is not None
assert ba.app.meta.metascan is not None
self._keyboard_index = (self._keyboard_index + 1) % len(
ba.app.metascan.keyboards)
ba.app.meta.metascan.keyboards)
self._load_keyboard()
if len(ba.app.metascan.keyboards) < 2:
if len(ba.app.meta.metascan.keyboards) < 2:
ba.playsound(ba.getsound('error'))
ba.screenmessage(ba.Lstr(resource='keyboardNoOthersAvailableText'),
color=(1, 0, 0))

View File

@ -120,7 +120,6 @@ class PlaylistAddGameWindow(ba.Window):
self._refresh()
def _refresh(self, select_get_more_games_button: bool = False) -> None:
from ba.internal import get_game_types
if self._column is not None:
self._column.delete()
@ -130,8 +129,8 @@ class PlaylistAddGameWindow(ba.Window):
margin=0)
gametypes = [
gt for gt in get_game_types() if gt.supports_session_type(
self._editcontroller.get_session_type())
gt for gt in ba.app.meta.get_game_types() if
gt.supports_session_type(self._editcontroller.get_session_type())
]
# Sort in the current language.

View File

@ -93,7 +93,7 @@ class PluginSettingsWindow(ba.Window):
self._subcontainer = ba.columnwidget(parent=self._scrollwidget,
selection_loops_to_parent=True)
if ba.app.metascan is None:
if ba.app.meta.metascan is None:
ba.screenmessage('Still scanning plugins; please try again.',
color=(1, 0, 0))
ba.playsound(ba.getsound('error'))

View File

@ -155,6 +155,7 @@
<li><a href="#class_ba_Campaign">ba.Campaign</a></li>
<li><a href="#class_ba_Keyboard">ba.Keyboard</a></li>
<li><a href="#class_ba_LanguageSubsystem">ba.LanguageSubsystem</a></li>
<li><a href="#class_ba_MetadataSubsystem">ba.MetadataSubsystem</a></li>
<li><a href="#class_ba_MusicPlayer">ba.MusicPlayer</a></li>
<li><a href="#class_ba_MusicSubsystem">ba.MusicSubsystem</a></li>
<li><a href="#class_ba_Plugin">ba.Plugin</a></li>
@ -3807,6 +3808,63 @@ m.add_actions(conditions=('they_have_material',
actions=(('impact_sound', <a href="#function_ba_getsound">ba.getsound</a>('metalHit'), 2, 5),
('skid_sound', <a href="#function_ba_getsound">ba.getsound</a>('metalSkid'), 2, 5)))</pre>
</dd>
</dl>
<hr>
<h2><strong><a name="class_ba_MetadataSubsystem">ba.MetadataSubsystem</a></strong></h3>
<p><em>&lt;top level class&gt;</em>
</p>
<p>Subsystem for working with script metadata in the app.</p>
<p>Category: <a href="#class_category_App_Classes">App Classes</a></p>
<p> Access the single shared instance of this class at 'ba.app.meta'.
</p>
<h3>Methods:</h3>
<h5><a href="#method_ba_MetadataSubsystem____init__">&lt;constructor&gt;</a>, <a href="#method_ba_MetadataSubsystem__get_game_types">get_game_types()</a>, <a href="#method_ba_MetadataSubsystem__get_scan_results">get_scan_results()</a>, <a href="#method_ba_MetadataSubsystem__get_unowned_game_types">get_unowned_game_types()</a>, <a href="#method_ba_MetadataSubsystem__handle_scan_results">handle_scan_results()</a>, <a href="#method_ba_MetadataSubsystem__on_app_launch">on_app_launch()</a>, <a href="#method_ba_MetadataSubsystem__start_scan">start_scan()</a></h5>
<dl>
<dt><h4><a name="method_ba_MetadataSubsystem____init__">&lt;constructor&gt;</a></dt></h4><dd>
<p><span>ba.MetadataSubsystem()</span></p>
</dd>
<dt><h4><a name="method_ba_MetadataSubsystem__get_game_types">get_game_types()</a></dt></h4><dd>
<p><span>get_game_types(self) -&gt; List[Type[<a href="#class_ba_GameActivity">ba.GameActivity</a>]]</span></p>
<p>Return available game types.</p>
</dd>
<dt><h4><a name="method_ba_MetadataSubsystem__get_scan_results">get_scan_results()</a></dt></h4><dd>
<p><span>get_scan_results(self) -&gt; ScanResults</span></p>
<p>Return meta scan results; block if the scan is not yet complete.</p>
</dd>
<dt><h4><a name="method_ba_MetadataSubsystem__get_unowned_game_types">get_unowned_game_types()</a></dt></h4><dd>
<p><span>get_unowned_game_types(self) -&gt; Set[Type[<a href="#class_ba_GameActivity">ba.GameActivity</a>]]</span></p>
<p>Return present game types not owned by the current account.</p>
</dd>
<dt><h4><a name="method_ba_MetadataSubsystem__handle_scan_results">handle_scan_results()</a></dt></h4><dd>
<p><span>handle_scan_results(self, results: ScanResults) -&gt; None</span></p>
<p>Called in the game thread with results of a completed scan.</p>
</dd>
<dt><h4><a name="method_ba_MetadataSubsystem__on_app_launch">on_app_launch()</a></dt></h4><dd>
<p><span>on_app_launch(self) -&gt; None</span></p>
<p>Should be called when the app is done bootstrapping.</p>
</dd>
<dt><h4><a name="method_ba_MetadataSubsystem__start_scan">start_scan()</a></dt></h4><dd>
<p><span>start_scan(self) -&gt; None</span></p>
<p>Begin scanning script directories for scripts containing metadata.</p>
<p>Should be called only once at launch.</p>
</dd>
</dl>
<hr>