Skip to content

Instantly share code, notes, and snippets.

@abcdabcd987
Last active November 28, 2022 22:57
Show Gist options
  • Save abcdabcd987/d02758106ebc10cc21480b8dbc966281 to your computer and use it in GitHub Desktop.
Save abcdabcd987/d02758106ebc10cc21480b8dbc966281 to your computer and use it in GitHub Desktop.
A filesystem from which reading a file incurs 50ms delay for each byte.
#if 0
set -e
binname=$(mktemp --suffix -delay-fs.bin)
g++ -o "$binname" -Wall -g -O2 "$0" -lfuse3 -lpthread
"$binname" $@
rm "$binname"
exit
#endif
// delay-fs:
// A filesystem from which reading a file incurs 50ms delay for each byte.
//
// Author: Lequn Chen <[email protected]>
//
// This program can be distributed under the terms of the GNU GPLv2.
// Licensing inherits from:
// https://github.com/libfuse/libfuse/blob/fuse-3.12.0/example/hello_ll.c
//
// Example usage:
//
// $ mkdir -p /tmp/delay-fs
//
// $ ./delay-fs.cc /tmp/delay-fs
//
// $ ls -la /tmp/delay-fs/
// total 0
// drwxr-xr-x 2 root root 0 Dec 31 1969 .
// drwxrwxrwt 18 root root 660 Oct 25 17:22 ..
// -r--r--r-- 1 root root 20 Dec 31 1969 counter.txt
// -r--r--r-- 1 root root 27 Dec 31 1969 hello.txt
//
// $ time cat /tmp/delay-fs/hello.txt
// Hello
// World!
// From delay-fs
//
// real 0m1.421s
// user 0m0.002s
// sys 0m0.000s
//
// $ time cat /tmp/delay-fs/counter.txt
// 00000000000000000000
// real 0m1.001s
// user 0m0.000s
// sys 0m0.001s
//
// $ cat /tmp/delay-fs/counter.txt
// 00000000000000000001
//
// $ fusermount3 -u /tmp/delay-fs
//
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_set>
#include <sys/stat.h>
#define FUSE_USE_VERSION 35
#include <fuse3/fuse_lowlevel.h>
const char *const kHelloFname = "hello.txt";
const char *const kHelloText = "Hello\nWorld!\nFrom delay-fs\n";
const char *const kCounterFname = "counter.txt";
const int kCounterWidth = 20;
const int kInoRoot = 1;
const int kInoHello = 2;
const int kInoCounter = 3;
class DelayFs {
public:
void AddRequest(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
fuse_file_info *fi);
uint64_t FetchAddCounter();
void Start();
void Stop();
private:
struct ReadRequest {
fuse_req_t req;
fuse_ino_t ino;
size_t size;
off_t off;
fuse_file_info *fi;
std::chrono::system_clock::time_point timeout;
};
std::mutex mutex_;
std::unordered_set<std::shared_ptr<ReadRequest>> reqs_;
std::atomic<bool> stop_{false};
std::atomic<uint64_t> counter_{0};
std::thread timer_thread_;
void TimerWorker();
};
DelayFs *gDelayFs = nullptr;
int hello_stat(fuse_ino_t ino, struct stat *stbuf) {
stbuf->st_ino = ino;
switch (ino) {
case kInoRoot:
stbuf->st_mode = S_IFDIR | 0755;
stbuf->st_nlink = 2;
break;
case kInoHello:
stbuf->st_mode = S_IFREG | 0444;
stbuf->st_nlink = 1;
stbuf->st_size = strlen(kHelloText);
break;
case kInoCounter:
stbuf->st_mode = S_IFREG | 0444;
stbuf->st_nlink = 1;
stbuf->st_size = kCounterWidth;
break;
default:
return -1;
}
return 0;
}
void hello_ll_getattr(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi) {
struct stat stbuf;
memset(&stbuf, 0, sizeof(stbuf));
if (hello_stat(ino, &stbuf) == -1)
fuse_reply_err(req, ENOENT);
else
fuse_reply_attr(req, &stbuf, 0);
}
void hello_ll_lookup(fuse_req_t req, fuse_ino_t parent, const char *name) {
fuse_entry_param e;
memset(&e, 0, sizeof(e));
if (strcmp(name, kHelloFname) == 0) {
e.ino = kInoHello;
} else if (strcmp(name, kCounterFname) == 0) {
e.ino = kInoCounter;
}
if (e.ino) {
hello_stat(e.ino, &e.attr);
fuse_reply_entry(req, &e);
} else {
fuse_reply_err(req, ENOENT);
}
}
struct dirbuf {
char *p;
size_t size;
};
void dirbuf_add(fuse_req_t req, struct dirbuf *b, const char *name,
fuse_ino_t ino) {
struct stat stbuf;
size_t oldsize = b->size;
b->size += fuse_add_direntry(req, NULL, 0, name, NULL, 0);
b->p = (char *)realloc(b->p, b->size);
memset(&stbuf, 0, sizeof(stbuf));
stbuf.st_ino = ino;
fuse_add_direntry(req, b->p + oldsize, b->size - oldsize, name, &stbuf,
b->size);
}
int reply_buf_limited(fuse_req_t req, const char *buf, size_t bufsize,
off_t off, size_t maxsize) {
if (static_cast<size_t>(off) < bufsize)
return fuse_reply_buf(req, buf + off, std::min(bufsize - off, maxsize));
else
return fuse_reply_buf(req, NULL, 0);
}
void hello_ll_readdir(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
fuse_file_info *fi) {
(void)fi;
if (ino != 1)
fuse_reply_err(req, ENOTDIR);
else {
struct dirbuf b;
memset(&b, 0, sizeof(b));
dirbuf_add(req, &b, ".", kInoRoot);
dirbuf_add(req, &b, "..", kInoRoot);
dirbuf_add(req, &b, kHelloFname, kInoHello);
dirbuf_add(req, &b, kCounterFname, kInoCounter);
reply_buf_limited(req, b.p, b.size, off, size);
free(b.p);
}
}
void hello_ll_open(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi) {
if (ino == kInoRoot) {
fuse_reply_err(req, EISDIR);
return;
}
if ((fi->flags & O_ACCMODE) != O_RDONLY) {
fuse_reply_err(req, EACCES);
return;
}
if (ino == kInoHello) {
fi->direct_io = 1;
fuse_reply_open(req, fi);
return;
}
if (ino == kInoCounter) {
fi->fh = gDelayFs->FetchAddCounter() + 1;
fuse_reply_open(req, fi);
return;
}
fuse_reply_err(req, ENOENT);
}
void hello_ll_read(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
fuse_file_info *fi) {
if (ino != kInoHello && ino != kInoCounter) {
fuse_reply_err(req, ENOENT);
return;
}
gDelayFs->AddRequest(req, ino, size, off, fi);
}
void DelayFs::AddRequest(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
fuse_file_info *fi) {
auto timeout = std::chrono::system_clock::now();
switch (ino) {
case kInoHello:
timeout += std::chrono::milliseconds(50);
break;
case kInoCounter:
timeout += std::chrono::milliseconds(50 * kCounterWidth);
break;
}
std::shared_ptr<ReadRequest> r(
new ReadRequest{req, ino, size, off, fi, timeout});
std::lock_guard<std::mutex> lock(mutex_);
reqs_.insert(std::move(r));
}
uint64_t DelayFs::FetchAddCounter() { return counter_.fetch_add(1); }
void DelayFs::Start() {
timer_thread_ = std::thread([this] { TimerWorker(); });
}
void DelayFs::Stop() {
stop_ = true;
timer_thread_.join();
}
void DelayFs::TimerWorker() {
char buf[256];
std::unordered_set<std::shared_ptr<ReadRequest>> reqs;
while (!stop_) {
auto now = std::chrono::system_clock::now();
{
std::lock_guard lock(mutex_);
reqs = reqs_;
}
for (auto it = reqs.begin(); it != reqs.end();) {
auto &r = **it;
if (now < r.timeout) {
it = reqs.erase(it);
continue;
}
switch (r.ino) {
case kInoHello:
reply_buf_limited(r.req, kHelloText, strlen(kHelloText), r.off, 1);
break;
case kInoCounter:
snprintf(buf, sizeof(buf), "%0*lu", kCounterWidth, r.fi->fh);
reply_buf_limited(r.req, buf, kCounterWidth, r.off, r.size);
break;
default:
fuse_reply_err(r.req, ENOENT);
}
++it;
}
if (!reqs.empty()) {
std::lock_guard lock(mutex_);
for (const auto &r : reqs) {
reqs_.erase(r);
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
int main(int argc, char *argv[]) {
fuse_args args = FUSE_ARGS_INIT(argc, argv);
fuse_session *se;
fuse_cmdline_opts opts;
fuse_loop_config config;
int ret = -1;
if (fuse_parse_cmdline(&args, &opts) != 0)
return 1;
if (opts.show_help) {
printf("usage: %s [options] <mountpoint>\n\n", argv[0]);
fuse_cmdline_help();
fuse_lowlevel_help();
ret = 0;
goto err_out1;
} else if (opts.show_version) {
printf("FUSE library version %s\n", fuse_pkgversion());
fuse_lowlevel_version();
ret = 0;
goto err_out1;
}
if (opts.mountpoint == NULL) {
printf("usage: %s [options] <mountpoint>\n", argv[0]);
printf(" %s --help\n", argv[0]);
ret = 1;
goto err_out1;
}
fuse_lowlevel_ops fuse;
memset(&fuse, 0, sizeof(fuse));
fuse.lookup = hello_ll_lookup;
fuse.getattr = hello_ll_getattr;
fuse.readdir = hello_ll_readdir;
fuse.open = hello_ll_open;
fuse.read = hello_ll_read;
se = fuse_session_new(&args, &fuse, sizeof(fuse), NULL);
if (se == NULL)
goto err_out1;
if (fuse_set_signal_handlers(se) != 0)
goto err_out2;
if (fuse_session_mount(se, opts.mountpoint) != 0)
goto err_out3;
fuse_daemonize(opts.foreground);
gDelayFs = new DelayFs;
gDelayFs->Start();
if (opts.singlethread)
ret = fuse_session_loop(se);
else {
config.clone_fd = opts.clone_fd;
config.max_idle_threads = opts.max_idle_threads;
ret = fuse_session_loop_mt(se, &config);
}
gDelayFs->Stop();
fuse_session_unmount(se);
err_out3:
fuse_remove_signal_handlers(se);
err_out2:
fuse_session_destroy(se);
err_out1:
free(opts.mountpoint);
fuse_opt_free_args(&args);
return ret;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment