Skip to content

Instantly share code, notes, and snippets.

@osandov
Created February 26, 2024 22:46
Show Gist options
  • Save osandov/844de01f8322527f66046274e292ae0d to your computer and use it in GitHub Desktop.
Save osandov/844de01f8322527f66046274e292ae0d to your computer and use it in GitHub Desktop.
/* Proof of concept of random access in xz-compressed RPMs to speed up
debuginfod.
Copyright (c) 2024 Meta Platforms, Inc. and affiliates.
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option)
any later version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>. */
#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <archive.h>
#include <archive_entry.h>
#include <fcntl.h>
#include <limits.h>
#include <lzma.h>
using namespace std;
// Infrastructure copied from debuginfod.
// Base type for every error this program reports: a numeric code plus a
// human-readable message.  The code defaults to 503 when none is supplied.
struct reportable_exception
{
  int code;
  string message;

  // Fully specified error.
  reportable_exception(int c, const string& m): code(c), message(m) {}

  // Message only; uses the default code.
  reportable_exception(const string& m): reportable_exception(503, m) {}

  // Neither code nor message; uses the default code and an empty message.
  reportable_exception(): reportable_exception(503, string()) {}
};
struct libc_exception: public reportable_exception
{
libc_exception(int rc, const string& msg):
reportable_exception(string("libc error: ") + msg + ": " + string(strerror(rc) ?: "?")) {}
};
struct archive_exception: public reportable_exception
{
archive_exception(const string& msg):
reportable_exception(string("libarchive error: ") + msg) {}
archive_exception(struct archive* a, const string& msg):
reportable_exception(string("libarchive error: ") + msg + ": " + string(archive_error_string(a) ?: "?")) {}
};
// Scope guard: stores a payload and a cleanup function, and calls the function
// on the payload when the guard is destroyed.  Whatever the cleanup function
// returns (type Ignore) is discarded.
template <class Payload, class Ignore>
struct defer_dtor
{
  using dtor_fn = Ignore (*) (Payload);

  defer_dtor(Payload _p, dtor_fn _fn): p(_p), fn(_fn) {}
  ~defer_dtor() { (void) fn(p); }

  // Copying would run the cleanup twice, so it is disallowed.
  defer_dtor(const defer_dtor&) = delete;
  defer_dtor& operator=(const defer_dtor&) = delete;

private:
  Payload p;
  dtor_fn fn;
};
struct lzma_exception: public reportable_exception
{
lzma_exception(int rc, const string& msg):
reportable_exception(string ("lzma error: ") + msg + ": error " + to_string(rc)) {}
};
// Scan through an archive and populate FILE_MAP with a mapping from pathname
// to offset of the file contents from the beginning of the xz stream and the
// file size. (In debuginfod, this would instead be indexed on the build ID
// and stored in the database.)
static void
index_archive (const char* path,
unordered_map<string, pair<uint64_t, uint64_t> >& file_map)
{
struct archive* a = archive_read_new ();
if (a == NULL)
throw archive_exception ("cannot create archive reader");
defer_dtor<struct archive*,int> archive_read_freer (a, archive_read_free);
archive_read_support_filter_all (a);
archive_read_support_format_all (a);
int r = archive_read_open_filename (a, path, 10240);
if (r != ARCHIVE_OK)
throw archive_exception (a, "cannot open archive file");
// Only RPMs for now. xz-compressed deb files could probably also be
// supported.
if (archive_filter_count (a) != 3
|| archive_filter_code (a, 0) != ARCHIVE_FILTER_XZ
|| archive_filter_code (a, 1) != ARCHIVE_FILTER_RPM)
throw reportable_exception ("not an xz-compressed rpm");
struct archive_entry* e;
while (archive_read_next_header (a, &e) == ARCHIVE_OK)
{
if (! S_ISREG (archive_entry_mode (e)))
continue;
const char* pathname = archive_entry_pathname (e);
if (pathname[0] == '.' && pathname[1] == '/')
pathname++;
uint64_t offset = archive_filter_bytes (a, 0);
uint64_t size = archive_entry_size (e);
file_map.emplace (make_pair (pathname, make_pair (offset, size)));
}
}
// Decode the index of the xz stream that ends at the end of file FD.
//
// The xz format [1] ends with an index of the independently compressed blocks
// in the stream, followed by a fixed-size stream footer that records how far
// back the index starts.
//
// Returns the decoded index, with the stream flags from the footer attached;
// the caller frees it with lzma_index_end().  The file position of FD is
// changed.  Throws libc_exception, lzma_exception, or reportable_exception
// ("truncated file") on failure.
//
// 1: https://xz.tukaani.org/format/xz-file-format.txt
static lzma_index*
read_xz_index (int fd)
{
  // The footer occupies the last LZMA_STREAM_HEADER_SIZE bytes of the file.
  off_t footer_pos = -LZMA_STREAM_HEADER_SIZE;
  if (lseek (fd, footer_pos, SEEK_END) == -1)
    throw libc_exception (errno, "lseek");

  uint8_t footer[LZMA_STREAM_HEADER_SIZE];
  for (size_t have = 0; have < sizeof (footer);)
    {
      ssize_t n = read (fd, footer + have, sizeof (footer) - have);
      if (n < 0)
        {
          if (errno == EINTR)
            continue;
          throw libc_exception (errno, "read");
        }
      if (n == 0)
        throw reportable_exception ("truncated file");
      have += n;
    }

  lzma_stream_flags stream_flags;
  lzma_ret ret = lzma_stream_footer_decode (&stream_flags, footer);
  if (ret != LZMA_OK)
    throw lzma_exception (ret, "lzma_stream_footer_decode");

  // The index sits immediately before the footer and is backward_size bytes
  // long.
  if (lseek (fd, footer_pos - stream_flags.backward_size, SEEK_END) == -1)
    throw libc_exception (errno, "lseek");

  lzma_stream strm = LZMA_STREAM_INIT;
  lzma_index* index = NULL;
  // Arbitrary limit copied from librpm.
  static const uint64_t MEM_LIMIT = 100 << 20;
  ret = lzma_index_decoder (&strm, &index, MEM_LIMIT);
  if (ret != LZMA_OK)
    throw lzma_exception (ret, "lzma_index_decoder");
  defer_dtor<lzma_stream*,void> strm_ender (&strm, lzma_end);

  // Feed the decoder from the file until it reports the end of the index.
  uint8_t in_buf[4096];
  for (;;)
    {
      if (strm.avail_in == 0)
        {
          ssize_t n = read (fd, in_buf, sizeof (in_buf));
          if (n < 0)
            {
              if (errno == EINTR)
                continue;
              throw libc_exception (errno, "read");
            }
          if (n == 0)
            throw reportable_exception ("truncated file");
          strm.avail_in = n;
          strm.next_in = in_buf;
        }

      ret = lzma_code (&strm, LZMA_RUN);
      if (ret == LZMA_STREAM_END)
        break;
      if (ret != LZMA_OK)
        throw lzma_exception (ret, "lzma_code");
    }

  // Attach the footer's stream flags to the index so that users of the index
  // can read them back from it.
  ret = lzma_index_stream_flags (index, &stream_flags);
  if (ret != LZMA_OK)
    {
      lzma_index_end (index, NULL);
      throw lzma_exception (ret, "lzma_index_stream_flags");
    }
  return index;
}
// Single-argument wrapper around lzma_index_end() so it can be used as a
// defer_dtor cleanup function.  No custom allocator is passed.
static void
my_lzma_index_end (lzma_index* index)
{
  lzma_index_end (index, nullptr);
}
// Free the options of each of the first LZMA_FILTERS_MAX entries in
// BLOCK->filters and reset the pointers to NULL, leaving the filter array
// itself allocated for reuse.
static void
free_lzma_block_filter_options (lzma_block* block)
{
  lzma_filter* const end = block->filters + LZMA_FILTERS_MAX;
  for (lzma_filter* filter = block->filters; filter != end; ++filter)
    {
      free (filter->options);
      filter->options = NULL;
    }
}
// Free BLOCK->filters together with the options of its entries.  Does nothing
// if the filter array was never allocated.
static void
free_lzma_block_filters (lzma_block* block)
{
  if (block->filters == NULL)
    return;

  free_lzma_block_filter_options (block);
  free (block->filters);
}
// Extract a range of bytes from the xz file at PATH to DEST.  OFFSET is the
// number of bytes from the beginning of the decompressed data to extract.
// SIZE is the number of decompressed bytes to extract.
//
// Only the block containing OFFSET and, if the range continues past it, the
// blocks that follow are decompressed.  The xz stream is located by seeking
// backwards from the end of the file.
//
// Throws libc_exception on I/O errors, lzma_exception on decoding errors, and
// reportable_exception if OFFSET is not in the stream, the file is truncated,
// or the blocks run out before SIZE bytes have been produced.
static void
extract_partial_xz (const char* path, uint64_t offset, uint64_t size,
                    FILE* dest)
{
  int fd = open (path, O_RDONLY);
  if (fd < 0)
    throw libc_exception (errno, path);
  defer_dtor<int,int> fd_closer (fd, close);

  lzma_index* index = read_xz_index (fd);
  defer_dtor<lzma_index*,void> index_ender (index, my_lzma_index_end);

  // Find the block containing the start offset.
  lzma_index_iter iter;
  lzma_index_iter_init (&iter, index);
  if (lzma_index_iter_locate (&iter, offset))
    throw reportable_exception ("offset not found");
  // From here on OFFSET is relative to the start of the current block.
  offset -= iter.block.uncompressed_file_offset;

  // Seek to the block.  Seeking from the end using the compressed size from
  // the footer means we don't need to know where the xz stream starts in the
  // archive.
  if (lseek (fd,
             iter.block.compressed_stream_offset - iter.stream.compressed_size,
             SEEK_END) == -1)
    throw libc_exception (errno, "lseek");

  lzma_block block{};
  block.filters = (lzma_filter*) calloc (LZMA_FILTERS_MAX + 1,
                                         sizeof (lzma_filter));
  if (block.filters == NULL)
    throw libc_exception (errno, "calloc");
  defer_dtor<lzma_block*,void> filters_freer (&block, free_lzma_block_filters);

  // Decode blocks until we've extracted the whole range.
  uint8_t in_buf[4096];
  uint8_t out_buf[4096];
  // Number of bytes of the current block (header first) already in in_buf.
  size_t header_read = 0;
  while (true)
    {
      // The first byte of the block encodes the header size.  Read the first
      // byte and whatever extra fits in the buffer.
      while (header_read == 0)
        {
          ssize_t bytes_read = read (fd, in_buf, sizeof (in_buf));
          if (bytes_read < 0)
            {
              if (errno == EINTR)
                continue;
              throw libc_exception (errno, "read");
            }
          if (bytes_read == 0)
            throw reportable_exception ("truncated file");
          header_read += bytes_read;
        }
      block.header_size = lzma_block_header_size_decode (in_buf[0]);

      // If we didn't buffer the whole header earlier, get the rest.
      static_assert (lzma_block_header_size_decode (UINT8_MAX) < sizeof (in_buf),
                     "in_buf is too small");
      while (header_read < block.header_size)
        {
          ssize_t bytes_read = read (fd, in_buf + header_read,
                                     sizeof (in_buf) - header_read);
          if (bytes_read < 0)
            {
              if (errno == EINTR)
                continue;
              throw libc_exception (errno, "read");
            }
          if (bytes_read == 0)
            throw reportable_exception ("truncated file");
          header_read += bytes_read;
        }

      // Decode the block header.
      block.check = iter.stream.flags->check;
      lzma_ret ret = lzma_block_header_decode (&block, NULL, in_buf);
      if (ret != LZMA_OK)
        throw lzma_exception (ret, "lzma_block_header_decode");
      ret = lzma_block_compressed_size (&block, iter.block.unpadded_size);
      if (ret != LZMA_OK)
        throw lzma_exception (ret, "lzma_block_compressed_size");

      // Start decoding the block data.
      lzma_stream strm = LZMA_STREAM_INIT;
      ret = lzma_block_decoder (&strm, &block);
      if (ret != LZMA_OK)
        throw lzma_exception (ret, "lzma_block_decoder");
      defer_dtor<lzma_stream*,void> strm_ender (&strm, lzma_end);

      // We might still have some input buffered from when we read the header.
      strm.avail_in = header_read - block.header_size;
      strm.next_in = in_buf + block.header_size;
      strm.avail_out = sizeof (out_buf);
      strm.next_out = out_buf;
      while (true)
        {
          if (strm.avail_in == 0)
            {
              ssize_t bytes_read = read (fd, in_buf, sizeof (in_buf));
              if (bytes_read < 0)
                {
                  if (errno == EINTR)
                    continue;
                  throw libc_exception (errno, "read");
                }
              if (bytes_read == 0)
                throw reportable_exception ("truncated file");
              strm.avail_in = bytes_read;
              strm.next_in = in_buf;
            }

          ret = lzma_code (&strm, LZMA_RUN);
          if (ret != LZMA_OK && ret != LZMA_STREAM_END)
            throw lzma_exception (ret, "lzma_code");

          // Throw away anything we decode until we reach the offset, then
          // start writing to the destination.  out_buf is reset after every
          // call, so it only ever holds the output of the latest call.
          if (strm.total_out > offset)
            {
              size_t bytes_to_write = strm.next_out - out_buf;
              uint8_t* buf_to_write = out_buf;
              // Ignore anything in the buffer before the offset.
              if (bytes_to_write > strm.total_out - offset)
                {
                  buf_to_write += bytes_to_write - (strm.total_out - offset);
                  bytes_to_write = strm.total_out - offset;
                }
              // Ignore anything after the size.
              if (strm.total_out - offset >= size)
                bytes_to_write -= strm.total_out - offset - size;
              if (fwrite (buf_to_write, 1, bytes_to_write, dest)
                  < bytes_to_write)
                throw libc_exception (errno, "fwrite");
              // If we reached the size, we're done.
              if (strm.total_out - offset >= size)
                return;
            }
          strm.avail_out = sizeof (out_buf);
          strm.next_out = out_buf;
          if (ret == LZMA_STREAM_END)
            break;
        }

      // This block didn't have enough data.  Go to the next one.
      if (lzma_index_iter_next (&iter, LZMA_INDEX_ITER_BLOCK))
        throw reportable_exception ("no more blocks");
      if (strm.total_out > offset)
        size -= strm.total_out - offset;
      offset = 0;

      // If we had any buffered input left, move it to the beginning of the
      // buffer to decode the next block header.  header_read must be reset
      // even when nothing is left over: otherwise it keeps the previous
      // block's value, the read at the top of the loop is skipped, and the
      // next header is decoded from stale bytes in in_buf.
      header_read = strm.avail_in;
      if (header_read > 0)
        memmove (in_buf, strm.next_in, header_read);

      free_lzma_block_filter_options (&block);
    }
}
// Usage: PROGRAM ARCHIVE
//
// Indexes ARCHIVE once, then reads pathnames from standard input, one per
// line.  Each pathname found in the index is extracted to the file "output"
// in the current directory (overwritten for every request) using random
// access into the xz stream.  Progress, timings and errors go to standard
// error.
//
// Returns EXIT_FAILURE on a usage error, if indexing fails, or if "output"
// cannot be opened; a failure to extract a single file is reported and the
// loop continues with the next line.
int main (int argc, char* argv[])
{
  if (argc != 2)
    {
      cerr << "usage: " << argv[0] << " ARCHIVE" << endl;
      return EXIT_FAILURE;
    }
  const char* archive_path = argv[1];

  unordered_map<string, pair<uint64_t, uint64_t> > file_map;
  try
    {
      cerr << "Indexing archive..." << endl;
      auto begin_indexing = chrono::steady_clock::now ();
      index_archive (archive_path, file_map);
      auto end_indexing = chrono::steady_clock::now ();
      cerr << "Done indexing archive ("
           << chrono::duration<double>(end_indexing - begin_indexing).count ()
           << " sec)" << endl;
    }
  catch (const reportable_exception& e)
    {
      cerr << e.message << endl;
      return EXIT_FAILURE;
    }

  for (string line; getline (cin, line);)
    {
      auto it = file_map.find (line);
      if (it == file_map.end ())
        {
          cerr << "file not found" << endl;
          continue;
        }

      FILE* output = fopen ("output", "wb");
      if (output == NULL)
        {
          cerr << "output: " << strerror (errno) << endl;
          return EXIT_FAILURE;
        }
      defer_dtor<FILE*,int> output_closer (output, fclose);

      auto begin_extracting = chrono::steady_clock::now ();
      try
        {
          extract_partial_xz (archive_path, it->second.first,
                              it->second.second, output);
        }
      catch (const reportable_exception& e)
        {
          cerr << e.message << endl;
          continue;
        }
      auto end_extracting = chrono::steady_clock::now ();

      // Data may still be sitting in the stdio buffer.  Only claim success
      // once it has actually been handed to the OS.
      if (fflush (output) != 0)
        {
          cerr << "output: " << strerror (errno) << endl;
          continue;
        }
      cerr << "extracted " << line << " to ./output ("
           << chrono::duration<double>(end_extracting - begin_extracting).count ()
           << " sec)" << endl;
    }
  return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment