@Menziess
Last active November 24, 2023 11:22
RocksDB using the rocksdict Python package, cleaning/pruning old data via TTL or FIFO compaction
import os
import random
import string
from os import cpu_count
from pprint import pprint
from time import sleep

from rocksdict import (AccessType, DBCompactionStyle, DBCompressionType,
                       FifoCompactOptions, Options, Rdict, ReadOptions)


def get_db(path):
    """Get an Rdict with the specified options."""
    MB, options = 1024 * 1024, Options()

    # Create the database if it doesn't exist yet
    options.create_if_missing(True)

    # Set the number of background jobs (flushing/compaction), default: 2
    options.set_max_background_jobs(cpu_count() or 2)
    options.increase_parallelism(cpu_count() or 2)

    # Roll logs every 10 seconds so their size doesn't skew the
    # directory-size measurement
    options.set_log_file_time_to_roll(10)

    # Only keep x number of log files
    options.set_keep_log_file_num(2)

    # Set the maximum log file size
    options.set_max_log_file_size(int(0.1 * MB))

    # FIFO compaction only stores lvl0 files; the oldest ones are deleted
    options.set_compaction_style(DBCompactionStyle.fifo())
    compaction_options = FifoCompactOptions()

    # Set the total table size before the oldest file gets deleted
    compaction_options.max_table_files_size = int(1 * MB)
    options.set_fifo_compaction_options(compaction_options)

    # Trigger compaction when x number of lvl0 files exist
    options.set_level_zero_file_num_compaction_trigger(4)

    # Slow down writes when x number of lvl0 files exist
    options.set_level_zero_slowdown_writes_trigger(6)

    # Stop writes when x number of lvl0 files exist
    options.set_level_zero_stop_writes_trigger(8)

    # While one buffer is flushing, the other can still be written to
    options.set_max_write_buffer_number(2)

    # Size at which a buffer is flushed to a sorted file on disk
    options.set_write_buffer_size(int(0.001 * MB))

    # Allowed size for lvl1
    options.set_max_bytes_for_level_base(1024 * MB)

    # Size until compaction is triggered for lvl1
    options.set_target_file_size_base(256 * MB)

    # Multiplier for the allowed size of each subsequent level
    options.set_max_bytes_for_level_multiplier(4.0)

    # Very fast compression; lz4hc is more thorough
    options.set_compression_type(DBCompressionType.lz4())

    # Keep records a minimum of 3 seconds (expired records are only
    # removed when compaction runs, not immediately)
    access_type = AccessType.with_ttl(3)

    # Interval for deleting obsolete files (default: 6 hours), here 10 ms
    options.set_delete_obsolete_files_period_micros(10 * 1000)

    db = Rdict(
        path,
        options,
        access_type=access_type
    )

    read_options = ReadOptions()

    # Schedule a background clean job when PurgeObsoleteFile is called
    read_options.set_background_purge_on_iterator_cleanup(True)
    db.set_read_options(read_options)
    return db


def randomword(length):
    """Generate a random word, used as values in the database."""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for _ in range(length))


def get_size(start_path='.', unit='bytes'):
    """Get directory size (to see the impact of configuration on disk size)."""
    exponents_map = {'bytes': 0, 'kb': 1, 'mb': 2, 'gb': 3}
    if unit not in exponents_map:
        raise ValueError("Must select from ['bytes', 'kb', 'mb', 'gb']")
    total_size_bytes = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
        for f in filenames:
            try:
                fp = os.path.join(dirpath, f)
                # Skip symbolic links
                if not os.path.islink(fp):
                    total_size_bytes += os.path.getsize(fp)
            except FileNotFoundError:
                print(f'File was deleted: {f}')
    return f'{round(total_size_bytes / 1024 ** exponents_map[unit], 3)}{unit}'


def main():
    """Run the main program.

    It inserts the keys `it` through `range_length + it - 1` with random
    string values. Each iteration the key range shifts up by one, so over
    time the lowest keys are never written again and can be pruned.
    """
    path = 'db'
    db = get_db(path)

    print(f'Starting, database size: {get_size(path, "mb")}.')
    range_length, it, total = 500, 0, 0
    try:
        while True:
            print(f'Inserting: {it}..{range_length + it - 1}.')
            for i in range(it, range_length + it):
                print(i, end='\r')
                db[i] = {'hello': randomword(100)}
                total += 1
            print(f'Database size: {get_size(path, "mb")}.')
            pprint(db.live_files())
            sleep(5)
            print()
            it += 1
    except KeyboardInterrupt:
        # Destroy the database so that changed options take effect next run
        db.close()
        Rdict.destroy(path)
        print('Stopped program.')


if __name__ == "__main__":
    main()
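
The `AccessType.with_ttl(3)` setting deserves a note: a TTL database only filters expired records out during compaction, so a key can outlive its TTL and still be readable. A minimal standalone sketch of that behaviour ('ttl-demo' is a hypothetical path; the rocksdict calls are the same ones used above):

from time import sleep

from rocksdict import AccessType, Rdict

# Open a throwaway TTL database ('ttl-demo' is a hypothetical path)
db = Rdict('ttl-demo', access_type=AccessType.with_ttl(3))
db['key'] = 'value'
sleep(4)              # wait past the 3-second TTL
print(db.get('key'))  # may still print 'value': expiry is enforced
                      # during compaction, not at read time
db.close()
Rdict.destroy('ttl-demo')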
@Menziess (Author)
Output:

Inserting: 4..503.
Database size: 0.191mb.
[{'end_key': 412,
  'level': 0,
  'name': '/000020.sst',
  'num_deletions': 0,
  'num_entries': 403,
  'size': 46919,
  'start_key': 10},
 {'end_key': 502,
  'level': 0,
  'name': '/000018.sst',
  'num_deletions': 0,
  'num_entries': 398,
  'size': 46402,
  'start_key': 4},
 {'end_key': 110,
  'level': 1,
  'name': '/000016.sst',
  'num_deletions': 0,
  'num_entries': 108,
  'size': 13056,
  'start_key': 3}]
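
As a sanity check against the 1 MB FIFO cap (max_table_files_size), the 'size' fields reported by live_files() can be summed; a small sketch using the db handle from the script above:

files = db.live_files()
total_bytes = sum(f['size'] for f in files)
print(f"{len(files)} live SST files, {total_bytes / 1024 ** 2:.3f} MB")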
