"""Command-line tool for retrieving packages from HDFS or a local cache."""

from __future__ import print_function

import hashlib
import os
import shutil
import sys
from uuid import uuid4

from twitter.common import app, log
from twitter.common.concurrent import deadline, Timeout
from twitter.common.quantity import Amount, Data, Time
from twitter.common.dirutil import du, safe_delete, safe_mtime, touch
from twitter.common.fs import HDFSHelper
from twitter.common.log.options import LogOptions

from apache.aurora.client.base import die


LogOptions.disable_disk_logging()
LogOptions.set_stderr_log_level('google:ERROR')

# TODO(zmanji): Place these values in clusters.py.
# The cache directory is created by puppet and has the required permissions.
CACHE_DIRECTORY = "/var/tmp/aurora/package_cache/"
CACHE_SIZE = Amount(5, Data.GB)

HADOOP_CONF_DIR = '/etc/hadoop/conf'

app.set_usage("""%s <md5> <URI> <filename>

Fetches a package that has the <md5> and resides at the given <URI>. Copies
the package to <filename> in the local directory.""" % sys.argv[0])
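
# Example invocation (hypothetical values, for illustration only):
#   ./package_fetcher.pex 2b00042f7481c7b056c4b410d28f33cf hdfs://nn/packages/app.zip app.zip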


def main(args):
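  """Fetch the package into the current directory, preferring the local cache over HDFS."""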
  if len(args) != 3:
    app.error("Invalid number of arguments.")

  md5, uri, filename = args

  log.info("Arguments: md5: %s, uri: %s, filename: %s" % (md5, uri, filename))

  if not os.path.isdir(CACHE_DIRECTORY):
    die("Cache directory does not exist")

  if (os.stat(CACHE_DIRECTORY).st_mode & 0o777) != 0o777:
    die("Cache directory has wrong permissions")

  cache_path = os.path.join(CACHE_DIRECTORY, md5)
  target_file = os.path.join(os.getcwd(), filename)

  if _cache_fetch(cache_path, md5, target_file):
    return

  if not _hadoop_fetch(uri, md5, target_file):
    die("Could not fetch from HDFS")
  else:
    _cache_put(cache_path, target_file)

  if not _verify_fetch(md5, target_file):
    die("Fetched file does not match given md5.")


def _verify_fetch(md5, filename):
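  """Return True if the MD5 hex digest of filename matches the expected md5."""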
  return md5 == _streamed_md5(filename)


def _cache_fetch(cache_file, md5, filename):
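  """Copy cache_file to filename if its MD5 matches md5; otherwise evict the stale entry."""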
  # Ensure the cached file has the desired MD5.
  try:
    if _verify_fetch(md5, cache_file):
      # This is not a hard link because the cache directory and sandbox might
      # be on different partitions.
      shutil.copyfile(cache_file, filename)
      # Touch the file to update its mtime, which drives LRU cache eviction.
      touch(cache_file)
      return True
  except (IOError, OSError) as e:
    log.warning("Failed to fetch from cache: %s" % e)
    return False
  else:
    # No exception was raised but the MD5 did not match: evict the stale entry.
    safe_delete(cache_file)
    return False


def _cache_put(cache, sandbox):
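  """Copy the fetched file from the sandbox into the cache at the given cache path."""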
  # This is not a hard link because the cache directory and sandbox might be on different
  # partitions. Copy to a uniquely named temporary file first so the rename into place is
  # atomic and readers never see a partially written cache entry.
  cache_tmp = cache + uuid4().hex
  try:
    shutil.copy(sandbox, cache_tmp)
    os.rename(cache_tmp, cache)
  except (IOError, OSError) as e:
    log.warning("Failed to put to cache: %s" % e)


def _hadoop_du(uri):
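  """Return the size of the file at uri as an Amount of bytes, or None if the query fails."""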
  h = HDFSHelper(HADOOP_CONF_DIR)
  try:
    f = lambda: h.ls(uri)[0][1]
    size = deadline(f, timeout=Amount(5, Time.MINUTES), propagate=True)
  except HDFSHelper.InternalError as e:
    log.error("Failed to query hadoop: %s" % e)
    return None
  except Timeout as e:
    log.error("Failed to query hadoop within 5 minutes: %s" % e)
    return None
  else:
    return Amount(size, Data.BYTES)


def _hadoop_fetch(uri, md5, target_path):
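  """Download uri from HDFS into target_path, evicting cache entries to make room first."""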
  target_file_size = _hadoop_du(uri)

  if target_file_size is None:
    return False

  # Evict before downloading so the subsequent _cache_put cannot push the cache
  # past its size limit.
  _cache_eviction(CACHE_DIRECTORY, CACHE_SIZE, target_file_size)

  h = HDFSHelper(HADOOP_CONF_DIR, heap_limit=Amount(256, Data.MB))
  try:
    f = lambda: h.copy_to_local(uri, target_path)
    deadline(f, timeout=Amount(5, Time.MINUTES), propagate=True)
    return True
  except HDFSHelper.InternalError as e:
    log.error("Failed to fetch package from hadoop: %s" % e)
    return False
  except Timeout as e:
    log.error("Failed to fetch package from hadoop within 5 minutes: %s" % e)
    return False


def _cache_eviction(cache_dir, cache_size, target_file_size):
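  """Delete least-recently-used cache entries until target_file_size can fit in cache_size."""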
  while Amount(du(cache_dir), Data.BYTES) + target_file_size > cache_size:
    files = [os.path.join(cache_dir, fn) for fn in os.listdir(cache_dir)]
    # Entries are touched on every cache hit, so the oldest mtime is the least recently used.
    f = sorted(files, key=safe_mtime)[0]
    safe_delete(f)


def _streamed_md5(fname):
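  """Compute the MD5 hex digest of fname, reading in 16 KB blocks to bound memory use."""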
  md5 = hashlib.md5()
  with open(fname, 'rb') as fp:
    while True:
      block = fp.read(16 * 1024)
      if not block:
        break
      md5.update(block)
  return md5.hexdigest()


app.main()