Parse LOG file tables of RocksDB
import numbers
import re
from datetime import datetime

import numpy as np
import pandas as pd
def convert_to_bytes(size_str):
    """
    Convert sizes like '10 B', '123 KB', '44 MB' to bytes.

    Parameters:
    size_str (str): A string representing the size with unit.

    Returns:
    int: The size in bytes.
    """
    units = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}
    if isinstance(size_str, str):
        size, unit = size_str.split()
        return np.int64(np.float64(size) * units[unit])
    else:
        return size_str
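# Illustrative calls (the inputs are assumed to follow the format of the "Size"
# column in the compaction stats table, e.g. '44 MB'):
#   convert_to_bytes("10 B")   -> 10
#   convert_to_bytes("44 MB")  -> 46137344   (44 * 1024**2)
#   convert_to_bytes(123)      -> 123        (non-strings pass through unchanged)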
def convert_to_number(size_str):
    """
    Convert values like '1', '32M', '2566K' to numbers. 'M' for millions, 'K' for thousands.

    Parameters:
    size_str (str): A string representing the value with an optional unit suffix.

    Returns:
    int: The numerical value.
    """
    units = {"K": np.uint64(1000), "M": np.uint64(1000000)}
    if isinstance(size_str, str) and size_str[-1] in units:
        return np.uint64(size_str[:-1]) * units[size_str[-1]]
    else:
        return np.uint64(size_str)
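# Illustrative conversions (inputs are assumed to look like the KeyIn/KeyDrop
# values of the compaction stats table):
#   convert_to_number("1")     -> 1
#   convert_to_number("32M")   -> 32000000
#   convert_to_number("2566K") -> 2566000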
# https://stackoverflow.com/a/67403354
def auto_opt_pd_dtypes(df: pd.DataFrame):
    """
    Automatically downcast numeric dtypes in a DataFrame to the smallest possible size.

    Parameters:
    df (pd.DataFrame): The DataFrame to be optimized.

    This function modifies the DataFrame in place.
    """
    for col in df.columns:
        # integers
        if issubclass(df[col].dtypes.type, numbers.Integral):
            if df[col].min() >= 0:
                df[col] = pd.to_numeric(df[col], downcast="unsigned")
            else:
                df[col] = pd.to_numeric(df[col], downcast="integer")
        # other real numbers
        elif issubclass(df[col].dtypes.type, numbers.Real):
            df[col] = pd.to_numeric(df[col], downcast="float")
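# Rough example of the effect (assuming default int64/float64 input columns):
#   df = pd.DataFrame({"a": [0, 1, 255], "b": [-1, 2, 3], "c": [0.5, 1.5, 2.5]})
#   auto_opt_pd_dtypes(df)
#   df.dtypes  # a -> uint8, b -> int8, c -> float32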
class RocksDBLogParser:
    def __init__(self, enable_stats=False, uptime_from=None, uptime_to=None):
        """
        Initialize the RocksDBLogParser object.

        Parameters:
        enable_stats (bool): If True, enables parsing of statistics.
        uptime_from (float): The minimum uptime from which to start parsing.
        uptime_to (float): The maximum uptime to which parsing is allowed.
        """
        self.enable_stats = enable_stats
        self.uptime_from = uptime_from
        self.uptime_to = uptime_to
        self.category_columns = ["Level", "Priority", "CF"]
        self.state = "expect_start_time"
        self.compaction = pd.DataFrame()
        self.stats = pd.DataFrame()
        self.start_date = None
        self.uptime = 0.0
        self.column_family = ""
        self.columns = []
        self.rows = []
        self.curr_stats = {}
        self.stat_rows = []
        self.num_of_tables = {}
        self.events = []
        self.start_time_regex = re.compile(r'(\d{4}/\d{2}/\d{2}-\d{2}:\d{2}:\d{2}\.\d+) \d+ RocksDB version')
        self.db_stats_regex = re.compile(r"^\*\* DB Stats \*\*")
        self.uptime_regex = re.compile(r"^Uptime\(secs\):\s+(\d+\.\d+)\s+total")
        self.heading_regex = re.compile(r"^\*\* Compaction Stats \[(.*?)\]")
        self.columns_regex = re.compile(r"\s{1,}")
        self.row_regex = re.compile(r"\s+(?![BKMGT]{2}\b)")
        self.stats_heading_regex = re.compile(r'(\d{4}/\d{2}/\d{2}-\d{2}:\d{2}:\d{2}\.\d+).+STATISTICS:')
        self.stat_regex = re.compile(
            r"^\s?(?P<label>rocksdb\.[^\s]+)(?P<type_values> [^\n]+)$"
        )
        self.stat_value_regex = re.compile(
            r"(?P<type>COUNT|SUM|P\d+) : (?P<value>[\d\.]+)"
        )
        self.event_regex = re.compile(
            r'EVENT_LOG_v1.+"time_micros": (?P<time>\d+),.*"event": "(?P<event>[^"]+)"'
        )
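    # For orientation, the regexes above target LOG lines roughly like the
    # following (illustrative examples, not copied from a real LOG):
    #   2024/01/04-08:17:00.123456 140234 RocksDB version: 8.6.7
    #   ** DB Stats **
    #   Uptime(secs): 123.4 total, 60.0 interval
    #   ** Compaction Stats [default] **
    #   rocksdb.db.get.micros P50 : 12.3 P95 : 45.6 COUNT : 789 SUM : 12345
    #   EVENT_LOG_v1 {"time_micros": 1704356220000000, "job": 1, "event": "flush_started", ...}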
    def parse_file(self, file_path):
        """
        Parse the contents of a file line by line.

        Parameters:
        file_path (str): The path to the file that needs to be parsed.

        Returns:
        dict: A dictionary with 'compaction', 'events' and 'stats' DataFrames.
        """
        with open(file_path, "r") as file:
            for line in file:
                if not self._parse_line(line.strip()):
                    break
        return self._finalize()
    def _parse_line(self, line):
        """
        Parse a single line from the input.

        Parameters:
        line (str): A line from the log or input file.

        This method updates the internal state based on the parsed line.

        Returns:
        bool: False if parsing should be stopped.
        """
        match = self.event_regex.search(line)
        if match:
            self.events.append([match.group("time"), match.group("event")])
        # print(f'table.state: {self.state}')
        match self.state:
            case "uptime_limit":
                return False
            case "expect_start_time":
                match = self.start_time_regex.search(line)
                if match:
                    self.start_date = datetime.strptime(match.group(1), "%Y/%m/%d-%H:%M:%S.%f")
                    self.state = "expect_first_db_stats"
            case "expect_first_db_stats" if self.db_stats_regex.search(line):
                self.state = "expect_uptime"
            case "expect_uptime":
                match = self.uptime_regex.search(line)
                if match:
                    self.uptime = float(match.group(1))
                    if self.uptime_from is None or self.uptime >= self.uptime_from:
                        if self.uptime_to is None or self.uptime <= self.uptime_to:
                            self.state = "expect_heading"
                        else:
                            self.state = "uptime_limit"
                    else:
                        return True  # Don't change the state, still waiting for uptime >= uptime_from
            case "expect_heading":
                match = self.heading_regex.search(line)
                if match:
                    self.column_family = match.group(1)
                    self.state = "expect_header"
            case "expect_heading_or_stats_header":
                match = self.stats_heading_regex.search(line)
                if match:
                    parsed_date = datetime.strptime(match.group(1), "%Y/%m/%d-%H:%M:%S.%f")
                    self.uptime = (parsed_date - self.start_date).total_seconds()
                    self.state = "expect_stats_row"
                else:
                    match = self.heading_regex.search(line)
                    if match:
                        self.column_family = match.group(1)
                        self.state = "expect_header"
            case "expect_header":
                self.columns = self.columns_regex.split(line)
                self.state = "skip_separator"
            case "skip_separator":
                self.state = "expect_row"
            case "expect_row":
                if line == "":
                    self.compaction = pd.concat(
                        [self.compaction, self._curr_compaction_to_pd()],
                        ignore_index=True,
                    )
                    self.rows = []
                    self.num_of_tables[self.column_family] = (
                        self.num_of_tables.get(self.column_family, 0) + 1
                    )
                    self.state = (
                        "expect_heading_or_stats_header"
                        if self.enable_stats
                        else "expect_heading"
                    )
                else:
                    row = self.row_regex.split(line)
                    self.rows.append(row)
            case "expect_stats_row":
                match = self.stat_regex.search(line)
                if match:
                    for m in self.stat_value_regex.finditer(match["type_values"]):
                        k = match.group("label") + "." + m.group("type")
                        self.curr_stats[k] = m.group("value")
                else:
                    self.curr_stats["CF"] = self.column_family
                    self.curr_stats["Uptime(s)"] = str(self.uptime)
                    self.stat_rows.append(self.curr_stats)
                    self.curr_stats = {}
                    self.state = "expect_heading"
        return True
    def _finalize(self):
        """
        Finalize the parsing process and prepare data for output.

        Returns:
        dict: A dictionary with 'compaction', 'events' and 'stats' DataFrames.
        """
        for c in self.category_columns:
            if c in self.compaction:
                self.compaction[c] = self.compaction[c].astype("category")
        self.compaction[["Files(cnt)", "Files(comp)"]] = self.compaction["Files"].apply(
            lambda x: pd.Series(RocksDBLogParser._split_files(x))
        )
        stats_df = pd.DataFrame(self.stat_rows)
        self.stat_rows = []
        for c in stats_df.columns:
            if c == "Files" or c in self.category_columns:
                continue
            stats_df[c] = pd.to_numeric(stats_df[c])
        auto_opt_pd_dtypes(stats_df)
        events_df = pd.DataFrame(self.events, columns=["Timestamp", "Name"])
        events_df["Name"] = events_df["Name"].astype("category")
        events_df["Timestamp"] = events_df["Timestamp"].astype(np.uint64)
        start_time = events_df["Timestamp"].iloc[0]
        events_df["Uptime(s)"] = (events_df["Timestamp"] - start_time) / 1000000
        auto_opt_pd_dtypes(events_df)
        return {
            "compaction": self.compaction.copy(),
            "events": events_df,
            "stats": stats_df.copy(),
        }
    def _curr_compaction_to_pd(self):
        """
        Convert the current compaction data to a pandas DataFrame.

        Returns:
        pd.DataFrame: A DataFrame representing the current compaction data.
        """
        df = pd.DataFrame(self.rows, columns=self.columns)
        df["CF"] = self.column_family
        df["Uptime(s)"] = float(self.uptime)
        if "Size" in df.columns:
            df["Size"] = df["Size"].apply(convert_to_bytes)
        for c in ["KeyIn", "KeyDrop"]:
            if c in df.columns:
                df[c] = df[c].apply(convert_to_number)
        gb = np.float64(1024**3)
        for c in [
            "Read(GB)",
            "Rn(GB)",
            "Rnp1(GB)",
            "Write(GB)",
            "Wnew(GB)",
            "Moved(GB)",
            "Rblob(GB)",
            "Wblob(GB)",
        ]:
            if c in df.columns:
                df[c] = df[c].apply(lambda x: np.int64(np.float64(x) * gb))
        for c in df.columns:
            if c == "Files" or c in self.category_columns:
                continue
            df[c] = pd.to_numeric(df[c])
        auto_opt_pd_dtypes(df)
        return df
    @staticmethod
    def _split_files(file_str):
        """
        Split a string formatted as 'count/comp' into two separate values.

        Parameters:
        file_str (str): A string representing file count and compaction.

        Returns:
        tuple: A tuple of two integers (count, comp).
        """
        cnt, comp = file_str.split("/")
        return np.uint32(cnt), np.uint32(comp)
r = RocksDBLogParser(enable_stats=True).parse_file('path/to/LOG/file')
r['compaction']  # other keys: 'stats', 'events'
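# A slightly longer sketch of how the result might be explored. The LOG path is a
# placeholder, and 'default' is only the usual column family name; the compaction
# column names ('Level', 'Size', ...) come from the table header in the LOG.
r = RocksDBLogParser(enable_stats=True).parse_file('path/to/LOG/file')
compaction = r['compaction']   # per-CF compaction stats tables, flattened into one DataFrame
events = r['events']           # EVENT_LOG_v1 events with a derived 'Uptime(s)' column
stats = r['stats']             # STATISTICS counters (populated only when enable_stats=True)
compaction[compaction['CF'] == 'default'].groupby('Level', observed=True)['Size'].max()
events.groupby('Name', observed=True).size()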