#!/usr/bin/env python3.7 # Copyright (c) 2011-2019 Eric Froemling # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # ----------------------------------------------------------------------------- """A tool for interacting with ballistica's cloud services. This facilitates workflows such as creating asset-packages, etc. """ from __future__ import annotations import sys import os from pathlib import Path from typing import TYPE_CHECKING from dataclasses import dataclass, asdict import json import subprocess import tempfile import requests if TYPE_CHECKING: from typing import Optional, Dict, Tuple, List, BinaryIO # Version is sent to the master-server with all commands. Can be incremented # if we need to change behavior server-side to go along with client changes. VERSION = 1 TOOL_NAME = 'cloudtool' # Set CLOUDTOOL_LOCAL env var to 1 to test with a locally-run master-server. MASTER_SERVER_ADDRESS = ('http://localhost:23524' if os.environ.get('CLOUDTOOL_LOCAL') == '1' else 'https://bamaster.appspot.com') STATE_DIR = Path('.cache/cloudtool') STATE_DATA_PATH = Path(STATE_DIR, 'state') CLRHDR = '\033[95m' # Header. CLRGRN = '\033[92m' # Green. CLRBLU = '\033[94m' # Glue. CLRRED = '\033[91m' # Red. CLREND = '\033[0m' # End. @dataclass class StateData: """Persistent state data stored to disk.""" login_token: Optional[str] = None # noinspection PyUnresolvedReferences @dataclass class Response: """Response sent from the cloudtool server to the client. Attributes: message: If present, client should print this message. error: If present, client should abort with this error message. loadpackage: If present, client should load this package from a location on disk (arg1) and push its manifest to a server command (arg2) with provided args (arg3). The manifest should be added to the args as 'manifest'. arg4 is the index file name whose contents should be included with the manifest. upload: If present, client should upload the requested files (arg1) from the loaded package to a server command (arg2) with provided args (arg3). Arg4 and arg5 are a server call and args which should be called once all file uploads finish. login: If present, a token that should be stored client-side and passed with subsequent commands. logout: If True, any existing client-side token should be discarded. """ message: Optional[str] = None error: Optional[str] = None loadpackage: Optional[Tuple[str, str, Dict, str]] = None upload: Optional[Tuple[List[str], str, Dict, str, Dict]] = None login: Optional[str] = None logout: bool = False class CleanError(Exception): """Exception resulting in a clean error string print and exit.""" def get_tz_offset_seconds() -> float: """Return the offset between utc and local time in seconds.""" import time import datetime tval = time.time() utc_offset = (datetime.datetime.fromtimestamp(tval) - datetime.datetime.utcfromtimestamp(tval)).total_seconds() return utc_offset @dataclass class File: """Represents a single file within a Package.""" filehash: str filesize: int class Package: """Represents a directory of files with some common purpose.""" def __init__(self) -> None: self.path = Path('') self.files: Dict[str, File] = {} @classmethod def load_from_disk(cls, path: Path) -> Package: """Create a package populated from a directory on disk.""" package = Package() if not path.is_dir(): raise CleanError(f'Directory not found: "{path}"') package.path = path packagepathstr = str(path) paths: List[str] = [] # Build the full list of package-relative paths. for basename, _dirnames, filenames in os.walk(path): for filename in filenames: fullname = os.path.join(basename, filename) assert fullname.startswith(packagepathstr) paths.append(fullname[len(packagepathstr) + 1:]) import hashlib from concurrent.futures import ThreadPoolExecutor from multiprocessing import cpu_count def _get_file_info(filepath: str) -> Tuple[str, File]: sha = hashlib.sha256() fullfilepath = os.path.join(packagepathstr, filepath) if not os.path.isfile(fullfilepath): raise Exception(f'File not found: "{fullfilepath}"') with open(fullfilepath, 'rb') as infile: filebytes = infile.read() filesize = len(filebytes) sha.update(filebytes) return (filepath, File(filehash=sha.hexdigest(), filesize=filesize)) # Now use all procs to hash the files efficiently. with ThreadPoolExecutor(max_workers=cpu_count()) as executor: package.files = dict(executor.map(_get_file_info, paths)) return package class App: """Context for a run of the tool.""" def __init__(self) -> None: self._state = StateData() self._package: Optional[Package] = None def run(self) -> None: """Run the tool.""" # Make reasonably sure we're being run from project root. if not os.path.exists(f'tools/{TOOL_NAME}'): raise CleanError( 'This tool must be run from ballistica project root.') # Also run project prereqs checks so we can hopefully inform the user # of missing python modules/etc. instead of just failing cryptically. try: subprocess.run(['make', '--quiet', 'prereqs'], check=True) except subprocess.CalledProcessError: raise CleanError('"make prereqs" check failed. ' 'Install missing requirements and try again.') self._load_state() if len(sys.argv) < 2: print(f'{CLRRED}You must provide one or more arguments.{CLREND}') self.run_command(['help']) raise CleanError() # Simply pass all args to the server and let it do the thing. self.run_command(sys.argv[1:]) self._save_state() def _load_state(self) -> None: if not os.path.exists(STATE_DATA_PATH): return try: with open(STATE_DATA_PATH, 'r') as infile: self._state = StateData(**json.loads(infile.read())) except Exception: print(f'{CLRRED}Error loading {TOOL_NAME} data;' f' resetting to defaults.{CLREND}') def _save_state(self) -> None: if not STATE_DIR.exists(): STATE_DIR.mkdir(parents=True, exist_ok=True) with open(STATE_DATA_PATH, 'w') as outfile: outfile.write(json.dumps(self._state.__dict__)) def _servercmd(self, cmd: str, data: Dict, files: Dict[str, BinaryIO] = None) -> Response: """Issue a command to the server and get a response.""" response_raw_2 = requests.post( (MASTER_SERVER_ADDRESS + '/cloudtoolcmd'), data={ 'c': cmd, 'v': VERSION, 't': json.dumps(self._state.login_token), 'd': json.dumps(data), 'z': get_tz_offset_seconds(), }, files=files) response_raw_2.raise_for_status() # Except if anything went wrong. assert isinstance(response_raw_2.content, bytes) output = json.loads(response_raw_2.content.decode()) # Create a default Response and fill in only attrs we're aware of. # (server may send attrs unknown to older clients) response = Response() for key, val in output.items(): if hasattr(response, key): setattr(response, key, val) # Handle common responses (can move these out of here at some point) if response.error is not None: raise CleanError(response.error) if response.message is not None: print(response.message) return response def _upload_file(self, filename: str, call: str, args: Dict) -> None: print(f'{CLRBLU}Uploading {filename}{CLREND}', flush=True) assert self._package is not None with tempfile.TemporaryDirectory() as tempdir: srcpath = Path(self._package.path, filename) gzpath = Path(tempdir, 'file.gz') subprocess.run(f'gzip --stdout "{srcpath}" > "{gzpath}"', shell=True, check=True) with open(gzpath, 'rb') as infile: putfiles: Dict = {'file': infile} _response = self._servercmd( call, args, files=putfiles, ) def _handle_loadpackage_response( self, response: Response) -> Optional[Tuple[str, Dict]]: assert response.loadpackage is not None assert len(response.loadpackage) == 4 (packagepath, callname, callargs, indexfile) = response.loadpackage assert isinstance(packagepath, str) assert isinstance(callname, str) assert isinstance(callargs, dict) assert isinstance(indexfile, str) self._package = Package.load_from_disk(Path(packagepath)) # Make the remote call they gave us with the package # manifest added in. with Path(self._package.path, indexfile).open() as infile: index = infile.read() callargs['manifest'] = { 'index': index, 'files': { key: asdict(val) for key, val in self._package.files.items() } } return callname, callargs def _handle_upload_response( self, response: Response) -> Optional[Tuple[str, Dict]]: from concurrent.futures import ThreadPoolExecutor assert response.upload is not None assert self._package is not None assert len(response.upload) == 5 (filenames, uploadcmd, uploadargs, completecmd, completeargs) = response.upload assert isinstance(filenames, list) assert isinstance(uploadcmd, str) assert isinstance(uploadargs, dict) assert isinstance(completecmd, str) assert isinstance(completeargs, dict) def _do_filename(filename: str) -> None: self._upload_file(filename, uploadcmd, uploadargs) # Here we can run uploads concurrently if that goes faster... # (should keep an eye on this to make sure its thread safe # and behaves itself) with ThreadPoolExecutor(max_workers=4) as executor: # Convert the generator to a list to trigger any # exceptions that occurred. list(executor.map(_do_filename, filenames)) # Lastly, run the 'upload complete' command we were passed. return completecmd, completeargs def run_command(self, args: List[str]) -> None: """Run a command to completion.""" nextcall: Optional[Tuple[str, Dict]] = ('toplevel', {'a': args}) # Now talk to the server in a loop until they are done with us. while nextcall is not None: response = self._servercmd(*nextcall) nextcall = None if response.loadpackage is not None: nextcall = self._handle_loadpackage_response(response) if response.upload is not None: nextcall = self._handle_upload_response(response) if response.login is not None: self._state.login_token = response.login if response.logout: self._state.login_token = None if __name__ == '__main__': try: App().run() except CleanError as exc: if str(exc): print(f'{CLRRED}{exc}{CLREND}') sys.exit(-1)