ballistica/tools/cloudtool
2019-12-13 16:55:31 -08:00

396 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3.7
# Copyright (c) 2011-2019 Eric Froemling
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------
"""A tool for interacting with ballistica's cloud services.
This facilitates workflows such as creating asset-packages, etc.
"""
from __future__ import annotations
import sys
import os
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING
from dataclasses import dataclass
import json
import subprocess
import tempfile
import requests
if TYPE_CHECKING:
from typing import Optional, Dict, Any, Tuple, List, BinaryIO
# Version is sent to the master-server with all commands. Can be incremented
# if we need to change behavior server-side to go along with client changes.
VERSION = 1
TOOL_NAME = 'cloudtool'
# Set CLOUDTOOL_LOCAL env var to 1 to test with a locally-run master-server.
MASTER_SERVER_ADDRESS = ('http://localhost:23524'
if os.environ.get('CLOUDTOOL_LOCAL') == '1' else
'https://bamaster.appspot.com')
USER_AGENT_STRING = 'cloudtool'
CACHE_DIR = Path('.cache/cloudtool')
CACHE_DATA_PATH = Path(CACHE_DIR, 'state')
CLRHDR = '\033[95m' # Header.
CLRGRN = '\033[92m' # Green.
CLRBLU = '\033[94m' # Glue.
CLRRED = '\033[91m' # Red.
CLREND = '\033[0m' # End.
CMD_LOGIN = 'login'
CMD_LOGOUT = 'logout'
CMD_PUTASSETPACK = 'putassetpack'
CMD_HELP = 'help'
ASSET_PACKAGE_NAME_VALID_CHARS = 'abcdefghijklmnopqrstuvwxyz0123456789_'
ASSET_PACKAGE_NAME_MAX_LENGTH = 32
ASSET_PATH_VALID_CHARS = 'abcdefghijklmnopqrstuvwxyz0123456789_'
ASSET_PATH_MAX_LENGTH = 128
@dataclass
class StateData:
"""Persistent state data stored to disk."""
login_token: Optional[str] = None
@dataclass
class Response:
"""Response data from the master server for a command."""
message: Optional[str]
error: Optional[str]
data: Any
class CleanError(Exception):
"""Exception resulting in a clean error string print and exit."""
class AssetType(Enum):
"""Types for asset files."""
TEXTURE = 'texture'
SOUND = 'sound'
class Asset:
"""Data for a single asset."""
def __init__(self, package: AssetPackage, assettype: AssetType,
path: str) -> None:
self.assettype = assettype
self.path = path
exts = {AssetType.TEXTURE: '.png', AssetType.SOUND: '.wav'}
self.filepath = os.path.join(package.path, path + exts[assettype])
# Note to self: keep this synced with server-side validation func...
def validate_asset_package_name(name: str) -> None:
"""Throw an exception on an invalid asset-package name."""
if len(name) > ASSET_PACKAGE_NAME_MAX_LENGTH:
raise CleanError(f'Asset package name is too long: "{name}"')
if not name:
raise CleanError(f'Asset package name cannot be empty.')
if name[0] == '_' or name[-1] == '_':
raise CleanError(
f'Asset package name cannot start or end with underscore.')
if '__' in name:
raise CleanError(
f'Asset package name cannot contain sequential underscores.')
for char in name:
if char not in ASSET_PACKAGE_NAME_VALID_CHARS:
raise CleanError(
f'Found invalid char "{char}" in asset package name "{name}".')
# Note to self: keep this synced with server-side validation func...
def validate_asset_path(path: str) -> None:
"""Throw an exception on an invalid asset path."""
if len(path) > ASSET_PATH_MAX_LENGTH:
raise CleanError(f'Asset path is too long: "{path}"')
names = path.split('/')
for name in names:
if not name:
raise CleanError(f'Found empty component in asset path "{path}".')
for char in name:
if char not in ASSET_PATH_VALID_CHARS:
raise CleanError(
f'Found invalid char "{char}" in asset path "{path}".')
class AssetPackage:
"""Data for local or remote asset packages."""
def __init__(self) -> None:
self.assets: Dict[str, Asset] = {}
self.path = Path('')
self.name = 'untitled'
@classmethod
def load_from_disk(cls, path: Path) -> AssetPackage:
"""Load an asset package from files on disk."""
import yaml
indexfilename = 'assetpackage.yaml'
package = AssetPackage()
if not path.is_dir():
raise CleanError(f'Directory not found: "{path}"')
package.path = path
with open(Path(path, indexfilename)) as infile:
index = yaml.safe_load(infile)
if not isinstance(index, dict):
raise CleanError(f'Root dict not found in {indexfilename}')
name = index.get('name')
if not isinstance(name, str):
raise CleanError(f'No "name" str found in {indexfilename}')
validate_asset_package_name(name)
package.name = name
assets = index.get('assets')
if not isinstance(assets, dict):
raise CleanError(f'No "assets" dict found in {indexfilename}')
for assetpath, assetdata in assets.items():
validate_asset_path(assetpath)
if not isinstance(assetdata, dict):
raise CleanError(
f'Invalid asset data for {assetpath} in {indexfilename}')
assettypestr = assetdata.get('type')
if not isinstance(assettypestr, str):
raise CleanError(
f'Invalid asset type for {assetpath} in {indexfilename}')
assettype = AssetType(assettypestr)
print('Looking at asset:', assetpath, assetdata)
package.assets[assetpath] = Asset(package, assettype, assetpath)
return package
def get_manifest(self) -> Dict:
"""Build a manifest of hashes and other info for the package."""
import hashlib
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
manifest: Dict = {'name': self.name, 'files': {}}
def _get_asset_info(iasset: Asset) -> Tuple[Asset, Dict]:
sha = hashlib.sha256()
with open(iasset.filepath, 'rb') as infile:
sha.update(infile.read())
if not os.path.isfile(iasset.filepath):
raise Exception(f'Asset file not found: "{iasset.filepath}"')
info_out: Dict = {'hash': sha.hexdigest()}
return iasset, info_out
# Use all procs to hash files for extra speedy goodness.
with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
for result in executor.map(_get_asset_info, self.assets.values()):
asset, info = result
manifest['files'][asset.path] = info
return manifest
class App:
"""Context for a run of the tool."""
def __init__(self) -> None:
self._state = StateData()
def run(self) -> None:
"""Run the tool."""
# Make reasonably sure we're being run from project root.
if not os.path.exists(f'tools/{TOOL_NAME}'):
raise CleanError(
'This tool must be run from ballistica project root.')
# Also run project prereqs checks so we can hopefully inform the user
# of missing python modules/etc. instead of just failing cryptically.
try:
subprocess.run(['make', '--quiet', 'prereqs'], check=True)
except subprocess.CalledProcessError:
raise CleanError('"make prereqs" check failed. '
'Install missing requirements and try again.')
self._load_cache()
if len(sys.argv) < 2:
raise CleanError(
f'Invalid args. Run "cloudtool help" for usage info.')
cmd = sys.argv[1]
if cmd == CMD_LOGIN:
self.do_login()
elif cmd == CMD_LOGOUT:
self.do_logout()
elif cmd == CMD_PUTASSETPACK:
self.do_putassetpack()
else:
# For all other commands, simply pass them to the server verbatim.
self.do_misc_command()
self._save_cache()
def _load_cache(self) -> None:
if not os.path.exists(CACHE_DATA_PATH):
return
try:
with open(CACHE_DATA_PATH, 'r') as infile:
self._state = StateData(**json.loads(infile.read()))
except Exception:
print(CLRRED +
f'Error loading {TOOL_NAME} data; resetting to defaults.' +
CLREND)
def _save_cache(self) -> None:
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True, exist_ok=True)
with open(CACHE_DATA_PATH, 'w') as outfile:
outfile.write(json.dumps(self._state.__dict__))
def _servercmd(self,
cmd: str,
data: Dict,
files: Dict[str, BinaryIO] = None) -> Response:
"""Issue a command to the server and get a response."""
response_raw_2 = requests.post(
(MASTER_SERVER_ADDRESS + '/cloudtoolcmd'),
data={
'c': cmd,
'v': VERSION,
't': json.dumps(self._state.login_token),
'd': json.dumps(data)
},
files=files)
response_raw_2.raise_for_status() # Except if anything went wrong.
assert isinstance(response_raw_2.content, bytes)
output = json.loads(response_raw_2.content.decode())
assert isinstance(output, dict)
assert isinstance(output['m'], (str, type(None)))
assert isinstance(output['e'], (str, type(None)))
assert 'd' in output
response = Response(message=output['m'],
data=output['d'],
error=output['e'])
# Handle errors and print messages;
# (functionality common to all command types).
if response.error is not None:
raise CleanError(response.error)
if response.message is not None:
print(response.message)
return response
def do_login(self) -> None:
"""Run the login command."""
if len(sys.argv) != 3:
raise CleanError('Expected a login code.')
login_code = sys.argv[2]
response = self._servercmd('login', {'c': login_code})
# If the command returned cleanly, we should have a token we can use
# to log in.
token = response.data['logintoken']
assert isinstance(token, str)
aname = response.data['accountname']
assert isinstance(aname, str)
print(f'{CLRGRN}Now logged in as {aname}.{CLREND}')
self._state.login_token = token
def do_logout(self) -> None:
"""Run the logout command."""
self._state.login_token = None
print(f'{CLRGRN}Cloudtool is now logged out.{CLREND}')
def do_putassetpack(self) -> None:
"""Run a putassetpack command."""
if len(sys.argv) != 3:
raise CleanError('Expected a path to an assetpackage directory.')
path = Path(sys.argv[2])
package = AssetPackage.load_from_disk(path)
# Send the server a manifest of everything we've got locally.
manifest = package.get_manifest()
print('SENDING PACKAGE MANIFEST:', manifest)
response = self._servercmd('putassetpackmanifest', {'m': manifest})
# The server should give us an upload id and a set of files it wants.
# Upload each of those.
upload_files: List[str] = response.data['upload_files']
assert isinstance(upload_files, list)
assert all(isinstance(f, str) for f in upload_files)
self._putassetpack_upload(package, upload_files)
print('Asset upload successful!')
def _putassetpack_upload(self, package: AssetPackage,
files: List[str]) -> None:
# Upload the files one at a time.
# (we can potentially do this in parallel in the future).
for fnum, fname in enumerate(files):
print(
f'{CLRBLU}Uploading file {fnum+1} of {len(files)}: '
f'"{fname}"...{CLREND}',
flush=True)
with tempfile.TemporaryDirectory() as tempdir:
asset = package.assets[fname]
srcpath = Path(asset.filepath)
gzpath = Path(tempdir, 'file.gz')
subprocess.run(f'gzip --stdout "{srcpath}" > "{gzpath}"',
shell=True,
check=True)
with open(gzpath, 'rb') as infile:
putfiles: Dict = {'file': infile}
_response = self._servercmd('putassetpackupload',
{'path': asset.path},
files=putfiles)
def do_misc_command(self) -> None:
"""Run a miscellaneous command."""
# We don't do anything special with the response here; the normal
# error-handling/message-printing is all that happens.
self._servercmd('misc', {'a': sys.argv[1:]})
if __name__ == '__main__':
try:
App().run()
except CleanError as exc:
if str(exc):
print(f'{CLRRED}{exc}{CLREND}')
sys.exit(-1)