@zmanji
Created June 23, 2014 22:15
Downloading and Caching Task Binaries in Apache Aurora

Each task in Apache Aurora starts in an empty sandbox, so a common first process for any task is to fetch the binaries the task needs. At Twitter, binaries are fetched from an HDFS cluster, similar to how Mesos executors are fetched from HDFS. If a task's binaries change rarely, re-fetching them on every launch puts unnecessary stress on the HDFS cluster and ties task creation to the availability of HDFS. (See MESOS-336 for discussion on fetching Mesos executors.)

The attached file package_cache.py mitigates this problem by caching binaries fetched from HDFS on the slave file system, so that subsequent task launches on the same slave do not depend on the availability of HDFS. It has been extracted from code used at Twitter and serves as an example of how this issue can be addressed.

It is designed to be invoked from the Aurora task sandbox, where the first process specifies the md5 of the binary, the HDFS URI of the binary, and the desired filename. The program either fetches the file from the cache (verifying its md5) or fetches the binary from HDFS and places it in the cache. It then copies the binary into the current working directory under the desired filename.
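For illustration, a hypothetical first process in an Aurora job configuration (Aurora configs are Python-based) might invoke the tool roughly as follows; the checksum placeholder, HDFS path, and package name are invented for the example, not taken from the gist:

fetch_package = Process(
  name = 'fetch_package',
  cmdline = 'package_cache.py <md5-of-my_service.pex> '
            'hdfs://namenode/packages/my_service.pex my_service.pex')

Later processes in the same task can then assume my_service.pex is present in the sandbox.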

It expects a world-readable/writable directory, maintained outside of the sandboxes, to hold the cache of binaries. It automatically prunes the cache when it exceeds a specified size, and it uses the given md5 to ensure a cached binary has not been tampered with by a rogue process.
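The tool does not create the cache directory itself; as the comment in the code notes, at Twitter it is provisioned by Puppet. A minimal sketch of the equivalent one-time setup on a slave (assuming root privileges; not part of the gist) would be:

import os

CACHE_DIRECTORY = "/var/tmp/aurora/package_cache/"

# Create the shared cache directory with the world read/write permissions
# that package_cache.py checks for before using it (mode 0777).
if not os.path.isdir(CACHE_DIRECTORY):
  os.makedirs(CACHE_DIRECTORY)
os.chmod(CACHE_DIRECTORY, 0o777)

The full package_cache.py follows.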

"""Command-line tool for retrieving packages from HDFS or a local cache"""
from __future__ import print_function
import hashlib
import os
import shutil
import sys
from uuid import uuid4
from twitter.common import app, log
from twitter.common.concurrent import deadline, Timeout
from twitter.common.quantity import Amount, Data, Time
from twitter.common.dirutil import du, safe_delete, safe_mtime, touch
from twitter.common.fs import HDFSHelper
from twitter.common.log.options import LogOptions
from apache.aurora.client.base import die
LogOptions.disable_disk_logging()
LogOptions.set_stderr_log_level('google:ERROR')
# TODO(zmanji): Places these values in clusters.py
CACHE_DIRECTORY = "/var/tmp/aurora/package_cache/"
# The Cache Directory is created by puppet and has the required permissions.
CACHE_SIZE = Amount(5, Data.GB)
HADOOP_CONF_DIR = '/etc/hadoop/conf'
app.set_usage("""%s <md5> <URI> <filename>
Fetches a package that has the <md5> and resides at the given <URI>. Copies
the package to <filename> in the local directory.""" % sys.argv[0])
def main(args):
  if len(args) != 3:
    app.error("Invalid number of arguments.")

  md5, uri, filename = args
  log.info("Arguments: md5: %s, uri: %s, filename: %s" % (md5, uri, filename))

  if not os.path.isdir(CACHE_DIRECTORY):
    die("Cache directory does not exist")

  if oct(os.stat(CACHE_DIRECTORY).st_mode & 0777) != '0777':
    die("Cache directory has wrong permissions")

  cache_path = os.path.join(CACHE_DIRECTORY, md5)
  target_file = os.path.join(os.getcwd(), filename)

  if _cache_fetch(cache_path, md5, target_file):
    return

  if not _hadoop_fetch(uri, md5, target_file):
    die("Could not fetch from HDFS")
  else:
    _cache_put(cache_path, target_file)

  if not _verify_fetch(md5, target_file):
    die("Fetched file does not match given md5.")

def _verify_fetch(md5, filename):
  return md5 == _streamed_md5(filename)


def _cache_fetch(cache_file, md5, filename):
  # Ensure file has desired MD5.
  try:
    if _verify_fetch(md5, cache_file):
      # This is not a hard link because the cache directory and sandbox might
      # be on different partitions.
      shutil.copyfile(cache_file, filename)
      # Touch the file to increment mtime, needed for cache eviction.
      touch(cache_file)
      return True
  except (IOError, OSError) as e:
    log.warning("Failed to fetch from cache: %s" % e)
    return False
  else:
    safe_delete(cache_file)
    return False

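# Copy into the cache through a uniquely named temporary file and rename it
# into place, so a concurrent reader never observes a partially written entry.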
def _cache_put(cache, sandbox):
  # This is not a hard link because the cache directory and sandbox might be
  # on different partitions.
  cache_tmp = cache + uuid4().hex
  try:
    shutil.copy(sandbox, cache_tmp)
    os.rename(cache_tmp, cache)
  except (IOError, OSError) as e:
    log.warn("Failed to put to cache: %s" % e)

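# Query HDFS for the size of the remote file so the cache can be pruned to
# make room before the download starts.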
def _hadoop_du(uri):
  h = HDFSHelper(HADOOP_CONF_DIR)
  try:
    f = lambda: h.ls(uri)[0][1]
    size = deadline(f, timeout=Amount(5, Time.MINUTES), propagate=True)
  except HDFSHelper.InternalError as e:
    log.error("Failed to query hadoop: %s" % e)
    return None
  except Timeout as e:
    log.error("Failed to query hadoop within 5 minutes: %s" % e)
    return None
  else:
    return Amount(size, Data.BYTES)

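# Fetch the package from HDFS with a bounded JVM heap and a five minute
# deadline, evicting cache entries first so the incoming file will fit.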
def _hadoop_fetch(uri, md5, target_path):
  target_file_size = _hadoop_du(uri)
  if target_file_size is None:
    return False

  _cache_eviction(CACHE_DIRECTORY, CACHE_SIZE, target_file_size)

  h = HDFSHelper(HADOOP_CONF_DIR, heap_limit=Amount(256, Data.MB))
  try:
    f = lambda: h.copy_to_local(uri, target_path)
    deadline(f, timeout=Amount(5, Time.MINUTES), propagate=True)
    return True
  except HDFSHelper.InternalError as e:
    log.error("Failed to fetch package from hadoop: %s" % e)
    return False
  except Timeout as e:
    log.error("Failed to fetch package from hadoop within 5 minutes: %s" % e)
    return False

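# Evict the least recently used entries (oldest mtime first; cache hits refresh
# the mtime via touch) until the new file fits under the cache size limit.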
def _cache_eviction(cache_dir, cache_size, target_file_size):
  while Amount(du(cache_dir), Data.BYTES) + target_file_size > cache_size:
    files = [os.path.join(cache_dir, fn) for fn in os.listdir(cache_dir)]
    f = sorted(files, key=safe_mtime)[0]
    safe_delete(f)

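# Compute the md5 in 16 KB chunks so large binaries are never held in memory
# all at once.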
def _streamed_md5(fname):
  md5 = hashlib.md5()
  with open(fname, 'rb') as fp:
    while True:
      block = fp.read(16 * 1024)
      if not block:
        break
      md5.update(block)
  return md5.hexdigest()


app.main()