RocksDB using the rocksdict Python package, cleaning/pruning old data via TTL or FIFO compaction
import os
import random
import string
from os import cpu_count
from pprint import pprint
from time import sleep

from rocksdict import (AccessType, DBCompactionStyle, DBCompressionType,
                       FifoCompactOptions, Options, Rdict, ReadOptions)


def get_db(path):
    """Get Rdict with the specified options."""
    MB, options = 1024 * 1024, Options()

    # Create the database if it doesn't exist yet
    options.create_if_missing(True)

    # Set number of background jobs (flushing/compaction), default: 2
    options.set_max_background_jobs(cpu_count() or 2)
    options.increase_parallelism(cpu_count() or 2)

    # Roll logs every 10 seconds to exclude log size from the size calculation
    options.set_log_file_time_to_roll(10)
    # Only keep this many log files
    options.set_keep_log_file_num(2)
    # Set maximum log file size
    options.set_max_log_file_size(int(0.1 * MB))

    # FIFO compaction only stores level-0 files; the oldest ones are deleted
    options.set_compaction_style(DBCompactionStyle.fifo())
    compaction_options = FifoCompactOptions()
    # Total table size before the oldest file gets deleted
    compaction_options.max_table_files_size = int(1 * MB)
    options.set_fifo_compaction_options(compaction_options)

    # Trigger compaction when this many level-0 files exist
    options.set_level_zero_file_num_compaction_trigger(4)
    # Slow down writes when this many level-0 files exist
    options.set_level_zero_slowdown_writes_trigger(6)
    # Stop writes when this many level-0 files exist
    options.set_level_zero_stop_writes_trigger(8)

    # While one buffer is flushing, the other can still be written to
    options.set_max_write_buffer_number(2)
    # Size at which a buffer is written to a sorted file on disk
    options.set_write_buffer_size(int(0.001 * MB))

    # Allowed size for level 1
    options.set_max_bytes_for_level_base(1024 * MB)
    # Size until compaction is triggered for level 1
    options.set_target_file_size_base(256 * MB)
    # Multiplier for the size of each subsequent level
    options.set_max_bytes_for_level_multiplier(4.0)

    # Very fast compression; lz4hc is more thorough
    options.set_compression_type(DBCompressionType.lz4())

    # Keep records for at least 3 seconds (expired entries are only
    # removed during compaction, so they may linger longer)
    access_type = AccessType.with_ttl(3)

    # Interval for deleting obsolete files, in microseconds
    # (default: 6 hours; here 10,000 µs = 10 ms)
    options.set_delete_obsolete_files_period_micros(10 * 1000)

    db = Rdict(
        path,
        options,
        access_type=access_type
    )

    read_options = ReadOptions()
    # Schedule a background clean job when PurgeObsoleteFile is called
    read_options.set_background_purge_on_iterator_cleanup(True)
    db.set_read_options(read_options)
    return db


def randomword(length):
    """Generate a random word, used as values in the database."""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for _ in range(length))


def get_size(start_path='.', unit='bytes'):
    """Get directory size (to see the impact of configuration on disk usage)."""
    exponents_map = {'bytes': 0, 'kb': 1, 'mb': 2, 'gb': 3}
    if unit not in exponents_map:
        raise ValueError("Must select from ['bytes', 'kb', 'mb', 'gb']")

    total_size_bytes = 0
    for dirpath, _, filenames in os.walk(start_path):
        for f in filenames:
            try:
                fp = os.path.join(dirpath, f)
                # Skip symbolic links
                if not os.path.islink(fp):
                    total_size_bytes += os.path.getsize(fp)
            except FileNotFoundError:
                print(f'File was deleted: {f}')
    return f'{round(total_size_bytes / 1024 ** exponents_map[unit], 3)}{unit}'


def main():
    """Run the main program.

    It inserts random words as values under keys between `it` and
    `range_length + it`. Each iteration, the key range shifts up by 1,
    so over time the first keys are never written again and become
    eligible for pruning.
    """
    path = 'db'
    db = get_db(path)
    print(f'Starting, database size: {get_size(path, "mb")}.')
    range_length, it, total = 500, 0, 0
    try:
        while True:
            print(f'Inserting: {it}..{range_length + it - 1}.')
            for i in range(it, range_length + it):
                print(i, end='\r')
                db[i] = {'hello': randomword(100)}
                total += 1
            print(f'Database size: {get_size(path, "mb")}.')
            pprint(db.live_files())
            sleep(5)
            print()
            it += 1
    except KeyboardInterrupt:
        # Destroy the database so that option changes are applied on the next run
        db.close()
        Rdict.destroy(path)
        print(f'Stopped program after inserting {total} records.')


if __name__ == "__main__":
    main()
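To see the TTL behaviour in isolation, here is a minimal sketch (the path ttl_demo_db and the key 'greeting' are illustrative choices for this example, not part of the script above). It writes one record with a 3-second TTL, waits past expiry, and reads it back. RocksDB's TTL mode only drops expired entries during compaction, so a read shortly after expiry may still return the stale value.

from time import sleep

from rocksdict import AccessType, Options, Rdict

path = 'ttl_demo_db'  # illustrative path, not used by the script above
opts = Options()
opts.create_if_missing(True)

# Open with a 3-second TTL, mirroring AccessType.with_ttl(3) above
db = Rdict(path, opts, access_type=AccessType.with_ttl(3))
db['greeting'] = 'hello'
print(db['greeting'])  # 'hello' -- still inside the TTL window

sleep(4)
# TTL expiry is enforced at compaction time, not read time, so this
# may still print 'hello' until a compaction has run over the entry
print(db.get('greeting'))

db.close()
Rdict.destroy(path)

Running the main script and watching db.live_files() alongside get_size shows the same effect at scale: the FIFO compaction caps the total SST size at roughly max_table_files_size (1 MB here) by dropping the oldest level-0 files.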