#!/usr/bin/env python3.7 # Copyright (c) 2011-2019 Eric Froemling # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # ----------------------------------------------------------------------------- """A tool for interacting with ballistica's cloud services. This facilitates workflows such as creating asset-packages, etc. """ from __future__ import annotations import sys import os from enum import Enum from pathlib import Path from typing import TYPE_CHECKING from dataclasses import dataclass import json import subprocess import tempfile import requests if TYPE_CHECKING: from typing import Optional, Dict, Any, Tuple, List, BinaryIO # Version is sent to the master-server with all commands. Can be incremented # if we need to change behavior server-side to go along with client changes. VERSION = 1 TOOL_NAME = 'cloudtool' # Set CLOUDTOOL_LOCAL env var to 1 to test with a locally-run master-server. MASTER_SERVER_ADDRESS = ('http://localhost:23524' if os.environ.get('CLOUDTOOL_LOCAL') == '1' else 'https://bamaster.appspot.com') USER_AGENT_STRING = 'cloudtool' CACHE_DIR = Path('.cache/cloudtool') CACHE_DATA_PATH = Path(CACHE_DIR, 'state') CLRHDR = '\033[95m' # Header. CLRGRN = '\033[92m' # Green. CLRBLU = '\033[94m' # Glue. CLRRED = '\033[91m' # Red. CLREND = '\033[0m' # End. CMD_LOGIN = 'login' CMD_LOGOUT = 'logout' CMD_PUTASSETPACK = 'putassetpack' CMD_HELP = 'help' ASSET_PACKAGE_NAME_VALID_CHARS = 'abcdefghijklmnopqrstuvwxyz0123456789_' ASSET_PACKAGE_NAME_MAX_LENGTH = 32 ASSET_PATH_VALID_CHARS = 'abcdefghijklmnopqrstuvwxyz0123456789_' ASSET_PATH_MAX_LENGTH = 128 @dataclass class StateData: """Persistent state data stored to disk.""" login_token: Optional[str] = None @dataclass class Response: """Response data from the master server for a command.""" message: Optional[str] error: Optional[str] data: Any class CleanError(Exception): """Exception resulting in a clean error string print and exit.""" class AssetType(Enum): """Types for asset files.""" TEXTURE = 'texture' SOUND = 'sound' class Asset: """Data for a single asset.""" def __init__(self, package: AssetPackage, assettype: AssetType, path: str) -> None: self.assettype = assettype self.path = path exts = {AssetType.TEXTURE: '.png', AssetType.SOUND: '.wav'} self.filepath = os.path.join(package.path, path + exts[assettype]) # Note to self: keep this synced with server-side validation func... def validate_asset_package_name(name: str) -> None: """Throw an exception on an invalid asset-package name.""" if len(name) > ASSET_PACKAGE_NAME_MAX_LENGTH: raise CleanError(f'Asset package name is too long: "{name}"') if not name: raise CleanError(f'Asset package name cannot be empty.') if name[0] == '_' or name[-1] == '_': raise CleanError( f'Asset package name cannot start or end with underscore.') if '__' in name: raise CleanError( f'Asset package name cannot contain sequential underscores.') for char in name: if char not in ASSET_PACKAGE_NAME_VALID_CHARS: raise CleanError( f'Found invalid char "{char}" in asset package name "{name}".') # Note to self: keep this synced with server-side validation func... def validate_asset_path(path: str) -> None: """Throw an exception on an invalid asset path.""" if len(path) > ASSET_PATH_MAX_LENGTH: raise CleanError(f'Asset path is too long: "{path}"') names = path.split('/') for name in names: if not name: raise CleanError(f'Found empty component in asset path "{path}".') for char in name: if char not in ASSET_PATH_VALID_CHARS: raise CleanError( f'Found invalid char "{char}" in asset path "{path}".') class AssetPackage: """Data for local or remote asset packages.""" def __init__(self) -> None: self.assets: Dict[str, Asset] = {} self.path = Path('') self.name = 'untitled' @classmethod def load_from_disk(cls, path: Path) -> AssetPackage: """Load an asset package from files on disk.""" import yaml indexfilename = 'assetpackage.yaml' package = AssetPackage() if not path.is_dir(): raise CleanError(f'Directory not found: "{path}"') package.path = path with open(Path(path, indexfilename)) as infile: index = yaml.safe_load(infile) if not isinstance(index, dict): raise CleanError(f'Root dict not found in {indexfilename}') name = index.get('name') if not isinstance(name, str): raise CleanError(f'No "name" str found in {indexfilename}') validate_asset_package_name(name) package.name = name assets = index.get('assets') if not isinstance(assets, dict): raise CleanError(f'No "assets" dict found in {indexfilename}') for assetpath, assetdata in assets.items(): validate_asset_path(assetpath) if not isinstance(assetdata, dict): raise CleanError( f'Invalid asset data for {assetpath} in {indexfilename}') assettypestr = assetdata.get('type') if not isinstance(assettypestr, str): raise CleanError( f'Invalid asset type for {assetpath} in {indexfilename}') assettype = AssetType(assettypestr) print('Looking at asset:', assetpath, assetdata) package.assets[assetpath] = Asset(package, assettype, assetpath) return package def get_manifest(self) -> Dict: """Build a manifest of hashes and other info for the package.""" import hashlib from concurrent.futures import ThreadPoolExecutor from multiprocessing import cpu_count manifest: Dict = {'name': self.name, 'files': {}} def _get_asset_info(iasset: Asset) -> Tuple[Asset, Dict]: sha = hashlib.sha256() with open(iasset.filepath, 'rb') as infile: sha.update(infile.read()) if not os.path.isfile(iasset.filepath): raise Exception(f'Asset file not found: "{iasset.filepath}"') info_out: Dict = {'hash': sha.hexdigest()} return iasset, info_out # Use all procs to hash files for extra speedy goodness. with ThreadPoolExecutor(max_workers=cpu_count()) as executor: for result in executor.map(_get_asset_info, self.assets.values()): asset, info = result manifest['files'][asset.path] = info return manifest class App: """Context for a run of the tool.""" def __init__(self) -> None: self._state = StateData() def run(self) -> None: """Run the tool.""" # Make reasonably sure we're being run from project root. if not os.path.exists(f'tools/{TOOL_NAME}'): raise CleanError( 'This tool must be run from ballistica project root.') # Also run project prereqs checks so we can hopefully inform the user # of missing python modules/etc. instead of just failing cryptically. try: subprocess.run(['make', '--quiet', 'prereqs'], check=True) except subprocess.CalledProcessError: raise CleanError('"make prereqs" check failed. ' 'Install missing requirements and try again.') self._load_cache() if len(sys.argv) < 2: raise CleanError( f'Invalid args. Run "cloudtool help" for usage info.') cmd = sys.argv[1] if cmd == CMD_LOGIN: self.do_login() elif cmd == CMD_LOGOUT: self.do_logout() elif cmd == CMD_PUTASSETPACK: self.do_putassetpack() else: # For all other commands, simply pass them to the server verbatim. self.do_misc_command() self._save_cache() def _load_cache(self) -> None: if not os.path.exists(CACHE_DATA_PATH): return try: with open(CACHE_DATA_PATH, 'r') as infile: self._state = StateData(**json.loads(infile.read())) except Exception: print(CLRRED + f'Error loading {TOOL_NAME} data; resetting to defaults.' + CLREND) def _save_cache(self) -> None: if not CACHE_DIR.exists(): CACHE_DIR.mkdir(parents=True, exist_ok=True) with open(CACHE_DATA_PATH, 'w') as outfile: outfile.write(json.dumps(self._state.__dict__)) def _servercmd(self, cmd: str, data: Dict, files: Dict[str, BinaryIO] = None) -> Response: """Issue a command to the server and get a response.""" response_raw_2 = requests.post( (MASTER_SERVER_ADDRESS + '/cloudtoolcmd'), data={ 'c': cmd, 'v': VERSION, 't': json.dumps(self._state.login_token), 'd': json.dumps(data) }, files=files) response_raw_2.raise_for_status() # Except if anything went wrong. assert isinstance(response_raw_2.content, bytes) output = json.loads(response_raw_2.content.decode()) assert isinstance(output, dict) assert isinstance(output['m'], (str, type(None))) assert isinstance(output['e'], (str, type(None))) assert 'd' in output response = Response(message=output['m'], data=output['d'], error=output['e']) # Handle errors and print messages; # (functionality common to all command types). if response.error is not None: raise CleanError(response.error) if response.message is not None: print(response.message) return response def do_login(self) -> None: """Run the login command.""" if len(sys.argv) != 3: raise CleanError('Expected a login code.') login_code = sys.argv[2] response = self._servercmd('login', {'c': login_code}) # If the command returned cleanly, we should have a token we can use # to log in. token = response.data['logintoken'] assert isinstance(token, str) aname = response.data['accountname'] assert isinstance(aname, str) print(f'{CLRGRN}Now logged in as {aname}.{CLREND}') self._state.login_token = token def do_logout(self) -> None: """Run the logout command.""" self._state.login_token = None print(f'{CLRGRN}Cloudtool is now logged out.{CLREND}') def do_putassetpack(self) -> None: """Run a putassetpack command.""" if len(sys.argv) != 3: raise CleanError('Expected a path to an assetpackage directory.') path = Path(sys.argv[2]) package = AssetPackage.load_from_disk(path) # Send the server a manifest of everything we've got locally. manifest = package.get_manifest() print('SENDING PACKAGE MANIFEST:', manifest) response = self._servercmd('putassetpackmanifest', {'m': manifest}) # The server should give us an upload id and a set of files it wants. # Upload each of those. upload_files: List[str] = response.data['upload_files'] assert isinstance(upload_files, list) assert all(isinstance(f, str) for f in upload_files) self._putassetpack_upload(package, upload_files) print('Asset upload successful!') def _putassetpack_upload(self, package: AssetPackage, files: List[str]) -> None: # Upload the files one at a time. # (we can potentially do this in parallel in the future). for fnum, fname in enumerate(files): print( f'{CLRBLU}Uploading file {fnum+1} of {len(files)}: ' f'"{fname}"...{CLREND}', flush=True) with tempfile.TemporaryDirectory() as tempdir: asset = package.assets[fname] srcpath = Path(asset.filepath) gzpath = Path(tempdir, 'file.gz') subprocess.run(f'gzip --stdout "{srcpath}" > "{gzpath}"', shell=True, check=True) with open(gzpath, 'rb') as infile: putfiles: Dict = {'file': infile} _response = self._servercmd('putassetpackupload', {'path': asset.path}, files=putfiles) def do_misc_command(self) -> None: """Run a miscellaneous command.""" # We don't do anything special with the response here; the normal # error-handling/message-printing is all that happens. self._servercmd('misc', {'a': sys.argv[1:]}) if __name__ == '__main__': try: App().run() except CleanError as exc: if str(exc): print(f'{CLRRED}{exc}{CLREND}') sys.exit(-1)