ballistica/tools/bacloud
2020-03-09 20:38:37 -07:00

454 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3.7
# Copyright (c) 2011-2020 Eric Froemling
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------
"""A tool for interacting with ballistica's cloud services.
This facilitates workflows such as creating asset-packages, etc.
"""
from __future__ import annotations
import sys
import os
from pathlib import Path
from typing import TYPE_CHECKING
from dataclasses import dataclass
import json
import subprocess
import tempfile
import requests
if TYPE_CHECKING:
from typing import Optional, Dict, Tuple, List, BinaryIO
# Protocol version reported to the master-server with every command.
# Bump it when a client change needs matching behavior server-side.
VERSION = 1

TOOL_NAME = 'bacloud'

# The BACLOUD_SERVER env var picks which master-server we talk to:
# LOCAL for a locally-run one, TEST for the 'test' app-engine service,
# and anything else (or unset) for the regular one.
if os.environ.get('BACLOUD_SERVER') == 'LOCAL':
    MASTER_SERVER_ADDRESS = 'http://localhost:23524'
elif os.environ.get('BACLOUD_SERVER') == 'TEST':
    MASTER_SERVER_ADDRESS = 'https://1-dot-test-dot-bamaster.appspot.com'
else:
    MASTER_SERVER_ADDRESS = 'https://bamaster.appspot.com'

# Terminal escape sequences used to colorize our output.
CLRHDR = '\033[95m'  # Header.
CLRGRN = '\033[92m'  # Green.
CLRBLU = '\033[94m'  # Blue.
CLRRED = '\033[91m'  # Red.
CLREND = '\033[0m'  # End (resets color).
@dataclass
class StateData:
    """State that is written to disk and persists between runs."""

    # Token passed to the server with each command; None if there is none.
    login_token: Optional[str] = None
# noinspection PyUnresolvedReferences
@dataclass
class Response:
    """A reply from the bacloud server telling the client what to do next.

    Attributes:
      message: When set, text the client should print before doing any
        other processing of the response (error handling included).
      message_end: The 'end' arg for the print() call showing message.
      error: When set, the client should abort with this error message.
      login: When set, a token to be stored client-side and passed along
        with subsequent commands.
      logout: When True, any token stored client-side should be discarded.
      dir_manifest: When set, a dir the client should generate a manifest
        of. The manifest gets added to end_command args as 'manifest'.
      uploads: When set, describes files the client should upload: the
        requested files (first value) are each sent individually to a
        server command (second value) with the provided args (third
        value).
      uploads_inline: When set, a list of pathnames whose contents should
        be compressed, base64 encoded, and added to an 'uploads_inline'
        dict in end_command args. Only meant for relatively small files.
      downloads_inline: When set, pathnames mapped to base64 encoded
        compressed data to be written out on the client. Only meant for
        relatively small files, since everything travels inline as part
        of the response.
      deletes: When set, paths of files to delete on the client.
      dir_prune_empty: When set, a dir under which all empty dirs should
        be removed.
      open_url: When set, a url to show to the user.
      input_prompt: When set, a line of input is read and added to
        end_command args as 'input'. The first value is the prompt shown
        before reading; the second is whether to read it as a password
        (no echoing to the terminal).
      end_message: When set, text to print once all other processing of
        the response is finished.
      end_message_end: The 'end' arg for the print() call showing
        end_message.
      end_command: When set, a command (and its args) to run once
        processing of the response is finished.
    """

    message: Optional[str] = None
    message_end: str = '\n'
    error: Optional[str] = None
    login: Optional[str] = None
    logout: bool = False
    dir_manifest: Optional[str] = None
    uploads: Optional[Tuple[List[str], str, Dict]] = None
    uploads_inline: Optional[List[str]] = None
    downloads_inline: Optional[Dict[str, str]] = None
    deletes: Optional[List[str]] = None
    dir_prune_empty: Optional[str] = None
    open_url: Optional[str] = None
    input_prompt: Optional[Tuple[str, bool]] = None
    end_message: Optional[str] = None
    end_message_end: str = '\n'
    end_command: Optional[Tuple[str, Dict]] = None
class CleanError(Exception):
    """Error that should be shown as a clean message, followed by exit.

    The exception's string form is the message to print.
    """
def get_tz_offset_seconds() -> float:
    """Return the offset between utc and local time in seconds.

    The value is local time minus utc time, both taken at the current
    moment.
    """
    import time
    import datetime
    tval = time.time()

    # Build both naive datetimes from the one timestamp so their difference
    # is purely the timezone offset.
    local_dt = datetime.datetime.fromtimestamp(tval)

    # datetime.utcfromtimestamp() is deprecated as of Python 3.12; this is
    # its documented replacement and gives the same naive utc value on
    # older Python versions too.
    utc_dt = datetime.datetime.fromtimestamp(
        tval, tz=datetime.timezone.utc).replace(tzinfo=None)

    return (local_dt - utc_dt).total_seconds()
@dataclass
class DirManifestFile:
    """Info about one file listed in a DirManifest."""

    # Hex digest of a hash of the file's contents.
    filehash: str

    # Size of the file's contents in bytes.
    filesize: int
class DirManifest:
    """Represents a directory of files with some common purpose.

    Attributes:
      path: The directory the manifest was built from.
      files: Maps paths relative to that directory to hash/size info.
    """

    def __init__(self) -> None:
        self.path = Path('')
        self.files: Dict[str, DirManifestFile] = {}

    @staticmethod
    def _hash_file(fullpath: str) -> Tuple[str, int]:
        """Return (sha256 hex digest, size in bytes) for a file on disk."""
        import hashlib
        sha = hashlib.sha256()
        filesize = 0
        with open(fullpath, 'rb') as infile:
            # Feed the hash in fixed-size chunks so a large file never has
            # to sit in memory in its entirety (several files get hashed
            # at once, which would multiply that cost).
            for chunk in iter(lambda: infile.read(1024 * 1024), b''):
                sha.update(chunk)
                filesize += len(chunk)
        return sha.hexdigest(), filesize

    @classmethod
    def load_from_disk(cls, path: Path) -> DirManifest:
        """Create a package populated from a directory on disk.

        Args:
          path: The directory to scan. If it is not a directory, an empty
            manifest is returned.

        Returns:
          A manifest with an entry for every file found under path.

        Raises:
          Exception: If a listed file turns out not to be a regular file
            by the time it gets hashed.
        """
        from concurrent.futures import ThreadPoolExecutor
        from multiprocessing import cpu_count

        package = cls()
        package.path = path
        packagepathstr = str(path)
        paths: List[str] = []

        # Simply return empty manifests if the given path isn't a dir.
        # (the server may intend to create it and is just asking what's
        # there already)
        if path.is_dir():
            # Build the full list of package-relative paths.
            # (relpath instead of slicing off a fixed-length prefix, which
            # drops a character when the root path is '/')
            for basename, _dirnames, filenames in os.walk(path):
                for filename in filenames:
                    fullname = os.path.join(basename, filename)
                    paths.append(os.path.relpath(fullname, packagepathstr))

        def _get_file_info(filepath: str) -> Tuple[str, DirManifestFile]:
            fullfilepath = os.path.join(packagepathstr, filepath)
            if not os.path.isfile(fullfilepath):
                raise Exception(f'File not found: "{fullfilepath}"')
            filehash, filesize = cls._hash_file(fullfilepath)
            return (filepath,
                    DirManifestFile(filehash=filehash, filesize=filesize))

        # Hash the files concurrently, using one worker thread per cpu.
        # (converting map()'s results to a dict also surfaces any
        # exception raised in a worker)
        with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
            package.files = dict(executor.map(_get_file_info, paths))
        return package
class App:
    """Context for a run of the tool."""

    def __init__(self) -> None:
        self._state = StateData()
        self._project_root: Optional[Path] = None

        # Values gathered while handling a response; these get merged into
        # the args of that response's end_command (if it has one).
        self._end_command_args: Dict = {}

    def run(self) -> None:
        """Run the tool.

        Locates the project, runs its prereqs check, and then hands all
        command line args to the server as a user command.

        Raises:
          CleanError: If the project dir can't be located, the prereqs
            check fails, or the server reports an error.
        """

        # Make sure we can locate the project bacloud is being run from.
        # Anchor argv[0] to an absolute path first; a bare relative name
        # such as 'bacloud' has no second parent to look up.
        try:
            self._project_root = Path(os.path.abspath(
                sys.argv[0])).parents[1]
        except IndexError:
            raise CleanError('Unable to locate project directory.') from None
        if not all(
                Path(self._project_root, name).is_dir()
                for name in ('tools', 'config', 'tests')):
            raise CleanError('Unable to locate project directory.')

        # Also run project prereqs checks so we can hopefully inform the user
        # of missing python modules/etc. instead of just failing cryptically.
        # (a missing 'make' executable is reported the same way)
        try:
            subprocess.run(['make', '--quiet', 'prereqs'],
                           check=True,
                           cwd=self._project_root)
        except (subprocess.CalledProcessError, FileNotFoundError) as exc:
            raise CleanError(
                '"make prereqs" check failed. '
                'Install missing requirements and try again.') from exc

        self._load_state()

        # Simply pass all args to the server and let it do the thing.
        self.run_user_command(sys.argv[1:])

        self._save_state()

    @property
    def _state_dir(self) -> Path:
        """The full path to the state dir."""
        assert self._project_root is not None
        return Path(self._project_root, '.cache/bacloud')

    @property
    def _state_data_path(self) -> Path:
        """The full path to the state data file."""
        return Path(self._state_dir, 'state')

    def _load_state(self) -> None:
        """Load persistent state from disk if a state file exists.

        Loading is best-effort; on any failure a message is printed and
        default state is used.
        """
        if not os.path.exists(self._state_data_path):
            return
        try:
            with open(self._state_data_path, 'r') as infile:
                self._state = StateData(**json.loads(infile.read()))
        except Exception:
            print(f'{CLRRED}Error loading {TOOL_NAME} data;'
                  f' resetting to defaults.{CLREND}')
            # Make the reset explicit instead of relying on whatever
            # state happened to be in place before the load attempt.
            self._state = StateData()

    def _save_state(self) -> None:
        """Write persistent state to disk, creating the state dir if needed."""
        self._state_dir.mkdir(parents=True, exist_ok=True)
        with open(self._state_data_path, 'w') as outfile:
            outfile.write(json.dumps(self._state.__dict__))

    def _servercmd(self,
                   cmd: str,
                   data: Dict,
                   files: Optional[Dict[str, BinaryIO]] = None) -> Response:
        """Issue a command to the server and get a response.

        Args:
          cmd: Name of the server command to run.
          data: Args for the command; sent json-encoded.
          files: Optional open binary files to send along with the command.

        Returns:
          The server's response, with its message (if any) already printed.

        Raises:
          CleanError: If the response contains an error.
          requests.HTTPError: If the http request itself was unsuccessful.
        """
        response_raw = requests.post(
            MASTER_SERVER_ADDRESS + '/bacloudcmd',
            data={
                'c': cmd,
                'v': VERSION,
                't': json.dumps(self._state.login_token),
                'd': json.dumps(data),
                'z': get_tz_offset_seconds(),
                'y': int(sys.stdout.isatty()),
            },
            files=files)
        response_raw.raise_for_status()  # Except if anything went wrong.
        assert isinstance(response_raw.content, bytes)
        output = json.loads(response_raw.content.decode())

        # Create a default Response and fill in only attrs we're aware of.
        # (newer server may send us attrs we're unaware of)
        response = Response()
        for key, val in output.items():
            if hasattr(response, key):
                setattr(response, key, val)

        # Handle a few things inline.
        # (so this functionality is available even to recursive commands, etc.)
        if response.message is not None:
            print(response.message, end=response.message_end, flush=True)

        if response.error is not None:
            raise CleanError(response.error)

        return response

    def _upload_file(self, filename: str, call: str, args: Dict) -> None:
        """Gzip a single file and send it to a server command.

        Args:
          filename: Path of the file to upload.
          call: Name of the server command receiving the file.
          args: Args passed to that command.
        """
        print(f'{CLRBLU}Uploading {filename}{CLREND}', flush=True)
        with tempfile.TemporaryDirectory() as tempdir:
            srcpath = Path(filename)
            gzpath = Path(tempdir, 'file.gz')

            # Filenames come from the server, so never splice them into a
            # shell command line; pass them as a discrete argument and do
            # the output redirection ourself. The '--' keeps names with a
            # leading dash from being read as gzip options.
            with open(gzpath, 'wb') as gzfile:
                subprocess.run(['gzip', '--stdout', '--',
                                str(srcpath)],
                               stdout=gzfile,
                               check=True)
            with open(gzpath, 'rb') as infile:
                putfiles: Dict = {'file': infile}
                self._servercmd(
                    call,
                    args,
                    files=putfiles,
                )

    def _handle_dir_manifest_response(self, dirmanifest: str) -> None:
        """Build a manifest of a dir and stash it for the end command."""
        from dataclasses import asdict
        manifest = DirManifest.load_from_disk(Path(dirmanifest))

        # Store the manifest to be included with our next called command.
        self._end_command_args['manifest'] = {
            'files': {key: asdict(val)
                      for key, val in manifest.files.items()}
        }

    def _handle_uploads(self, uploads: Tuple[List[str], str, Dict]) -> None:
        """Upload each requested file individually to a server command.

        Args:
          uploads: The filenames to upload, the server command to send
            each one to, and the args for that command.
        """
        from concurrent.futures import ThreadPoolExecutor
        assert len(uploads) == 3
        filenames, uploadcmd, uploadargs = uploads
        assert isinstance(filenames, list)
        assert isinstance(uploadcmd, str)
        assert isinstance(uploadargs, dict)

        def _do_filename(filename: str) -> None:
            self._upload_file(filename, uploadcmd, uploadargs)

        # Here we can run uploads concurrently if that goes faster...
        # (should keep an eye on this to make sure its thread safe
        # and behaves itself)
        with ThreadPoolExecutor(max_workers=4) as executor:
            # Convert the generator to a list to trigger any
            # exceptions that occurred.
            list(executor.map(_do_filename, filenames))

    def _handle_downloads_inline(self, downloads_inline: Dict[str,
                                                              str]) -> None:
        """Handle inline file data to be saved to the client.

        Args:
          downloads_inline: Maps destination paths to base64 encoded
            zlib-compressed file contents.
        """
        import base64
        import zlib
        for fname, fdata in downloads_inline.items():
            # Create the destination's parent dir(s) as needed.
            dirname = os.path.dirname(fname)
            if dirname:
                os.makedirs(dirname, exist_ok=True)
            data_zipped = base64.b64decode(fdata)
            data = zlib.decompress(data_zipped)
            with open(fname, 'wb') as outfile:
                outfile.write(data)

    def _handle_deletes(self, deletes: List[str]) -> None:
        """Handle file deletes."""
        for fname in deletes:
            os.unlink(fname)

    def _handle_uploads_inline(self, uploads_inline: List[str]) -> None:
        """Handle uploading files inline.

        Each file's contents are zlib-compressed, base64 encoded, and
        stashed (keyed by path) for the end command as 'uploads_inline'.

        Raises:
          CleanError: If one of the files does not exist.
        """
        import base64
        import zlib
        files: Dict[str, str] = {}
        for filepath in uploads_inline:
            if not os.path.exists(filepath):
                raise CleanError(f'File not found: {filepath}')
            with open(filepath, 'rb') as infile:
                data = infile.read()
            data_zipped = zlib.compress(data)
            data_base64 = base64.b64encode(data_zipped).decode()
            files[filepath] = data_base64
        self._end_command_args['uploads_inline'] = files

    def _handle_dir_prune_empty(self, prunedir: str) -> None:
        """Handle pruning empty directories.

        Removes all empty dirs under prunedir; prunedir itself is kept.
        """
        # Walk the tree bottom-up so we can properly kill recursive empty dirs.
        for basename, dirnames, filenames in os.walk(prunedir, topdown=False):
            # It seems that child dirs we kill during the walk are still
            # listed when the parent dir is visited, so lets make sure
            # to only acknowledge still-existing ones.
            dirnames = [
                d for d in dirnames
                if os.path.exists(os.path.join(basename, d))
            ]
            if not dirnames and not filenames and basename != prunedir:
                os.rmdir(basename)

    def _handle_open_url(self, url: str) -> None:
        """Show a url to the user via their web browser."""
        import webbrowser
        webbrowser.open(url)

    def _handle_input_prompt(self, prompt: str, as_password: bool) -> None:
        """Read a line of input and stash it for the end command as 'input'.

        Args:
          prompt: Text shown before reading.
          as_password: Whether to read without echoing to the terminal.
        """
        if as_password:
            from getpass import getpass
            self._end_command_args['input'] = getpass(prompt=prompt)
        else:
            if prompt:
                print(prompt, end='', flush=True)
            self._end_command_args['input'] = input()

    def run_user_command(self, args: List[str]) -> None:
        """Run a single user command to completion.

        Args:
          args: The command line args to hand to the server.
        """
        # pylint: disable=too-many-branches

        nextcall: Optional[Tuple[str, Dict]] = ('user', {'a': args})

        # Now talk to the server in a loop until there's nothing left to do.
        while nextcall is not None:
            self._end_command_args = {}
            response = self._servercmd(*nextcall)
            nextcall = None
            if response.login is not None:
                self._state.login_token = response.login
            if response.logout:
                self._state.login_token = None
            if response.dir_manifest is not None:
                self._handle_dir_manifest_response(response.dir_manifest)
            if response.uploads_inline is not None:
                self._handle_uploads_inline(response.uploads_inline)
            if response.uploads is not None:
                self._handle_uploads(response.uploads)
            if response.downloads_inline:
                self._handle_downloads_inline(response.downloads_inline)
            if response.deletes:
                self._handle_deletes(response.deletes)
            if response.dir_prune_empty:
                self._handle_dir_prune_empty(response.dir_prune_empty)
            if response.open_url is not None:
                self._handle_open_url(response.open_url)
            if response.input_prompt is not None:
                self._handle_input_prompt(prompt=response.input_prompt[0],
                                          as_password=response.input_prompt[1])
            if response.end_message is not None:
                print(response.end_message,
                      end=response.end_message_end,
                      flush=True)
            if response.end_command is not None:
                nextcall = response.end_command

                # Pass along anything gathered while handling this response.
                nextcall[1].update(self._end_command_args)
if __name__ == '__main__':
    try:
        App().run()
    except (KeyboardInterrupt, CleanError) as exc:
        # Both cases end in a clean failing exit with no backtrace.
        # (this could be made optional for keyboard interrupts if a
        # backtrace ever proves useful there)
        # Only clean errors have a message to show, and only if non-empty.
        if isinstance(exc, CleanError) and str(exc):
            print(f'{CLRRED}{exc}{CLREND}')
        sys.exit(-1)