fiddler-data
#!/usr/bin/env python3
import argparse
import logging
import random
from pathlib import Path
from typing import Union

import numpy as np
import pandas as pd

logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
)

# Fixed width (in characters) for the first five report columns
# (Line, Row, Column, Expected, Rejected).
FIELD_WIDTH = 25


def format_field(value, width=FIELD_WIDTH):
    """
    Left-align 'value' in a fixed-width column. If 'value' is longer than
    'width', truncate it and append '...'.
    """
    s = str(value)
    if len(s) > width:
        s = s[:width - 3] + "..."
    return f"{s:<{width}}"
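
# Illustrative behavior, assuming the default width of 25:
#   format_field("abc")     -> "abc" left-aligned and padded to 25 characters
#   format_field("x" * 30)  -> the first 22 characters followed by "..."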


def get_bad_value_for_column(col_name: str):
    """
    Returns a bad value for the given column to trigger a validation error.
    """
    if col_name == "timestamp":
        return "unsupported_timestamp"  # Invalid timestamp format.
    elif col_name in ["event_id", "user_id", "model_id"]:
        return "not_int"  # Not an integer.
    elif col_name == "event_type":
        return 123  # Not one of the expected strings.
    elif col_name in ["input_data", "output_data", "error_message"]:
        return 12345  # Not a string.
    elif col_name in ["latency", "confidence"]:
        return "bad_float"  # Not a float.
    elif col_name.startswith("attr_"):
        return "not_a_float"  # Not a float.
    else:
        return "error"


def insert_blank_lines(csv_content: str, blank_lines: int) -> str:
    """
    Insert a total of 'blank_lines' completely empty lines into the CSV
    content. The blank lines are spaced evenly throughout the file.
    """
    if blank_lines <= 0:
        return csv_content
    lines = csv_content.splitlines()
    total_lines = len(lines)
    new_lines = []
    interval = total_lines / (blank_lines + 1)
    next_insertion = interval
    current_line = 0
    inserted = 0
    for line in lines:
        new_lines.append(line)
        current_line += 1
        # Cap at 'blank_lines': without the cap, the final threshold
        # (== total_lines) would add one extra blank line at end of file.
        while inserted < blank_lines and current_line >= next_insertion:
            new_lines.append("")
            inserted += 1
            next_insertion += interval
    return "\n".join(new_lines) + "\n"


def write_bad_data_file(num_rows: int, num_errors: int = 0,
                        out_dir: Union[str, Path] = ".",
                        num_columns: int = 20) -> pd.DataFrame:
    """
    Generate an ML event log DataFrame with deliberately injected errors and
    write it to Parquet, CSV, and a plain text file (DataFrame file). The
    DataFrame will include columns relevant to ML event logs. The first
    column is 'timestamp', followed by:
      - event_id, user_id, model_id, event_type, input_data, output_data,
        latency, confidence, error_message
      - if num_columns > 10, additional columns named attr_1, attr_2, ...
    Exactly num_errors errors are injected: some as blank lines (in the CSV)
    and some as cell-level errors.
    """
    if num_columns < 10:
        raise ValueError("At least 10 columns are required for ML event logs.")
    if num_columns > 100:
        num_columns = 100
    predef_cols = ["timestamp", "event_id", "user_id", "model_id", "event_type",
                   "input_data", "output_data", "latency", "confidence", "error_message"]
    extra_count = num_columns - len(predef_cols)
    extra_cols = [f"attr_{i+1}" for i in range(extra_count)]
    columns = predef_cols + extra_cols
    data = {}
    data["timestamp"] = [pd.Timestamp("1970-01-01") + pd.to_timedelta(i, unit="s")
                         for i in range(num_rows)]
    data["event_id"] = np.random.randint(1000, 10000, size=num_rows).tolist()
    data["user_id"] = np.random.randint(1, 101, size=num_rows).tolist()
    data["model_id"] = np.random.randint(1, 51, size=num_rows).tolist()
    valid_event_types = ["prediction", "error", "log"]
    data["event_type"] = np.random.choice(valid_event_types, size=num_rows,
                                          p=[0.8, 0.1, 0.1]).tolist()
    data["input_data"] = [f"input_{i}" for i in range(num_rows)]
    data["output_data"] = [f"output_{i}" for i in range(num_rows)]
    data["latency"] = np.random.rand(num_rows).tolist()
    data["confidence"] = np.random.rand(num_rows).tolist()
    # Only add an error message if the event_type is "error"; otherwise keep it empty.
    data["error_message"] = [("Error occurred" if et == "error" else "")
                             for et in data["event_type"]]
    for col in extra_cols:
        data[col] = np.random.rand(num_rows).tolist()
    df = pd.DataFrame(data, columns=columns)
    total_cells = num_rows * num_columns
    # Randomly split the error budget between blank lines and cell errors.
    num_blank_lines = 0
    num_cell_errors = 0
    for _ in range(num_errors):
        if random.random() < 0.5:
            num_blank_lines += 1
        else:
            num_cell_errors += 1
    if num_cell_errors > total_cells:
        excess = num_cell_errors - total_cells
        num_cell_errors = total_cells
        num_blank_lines += excess
    # Inject cell errors at unique, randomly chosen (row, column) positions.
    if num_cell_errors > 0:
        error_indices = np.random.choice(total_cells, size=num_cell_errors, replace=False)
        for idx in error_indices:
            row_idx = idx // num_columns
            col_idx = idx % num_columns
            col_name = columns[col_idx]
            if not pd.api.types.is_object_dtype(df[col_name]):
                df[col_name] = df[col_name].astype(object)
            df.at[row_idx, col_name] = get_bad_value_for_column(col_name)
    out_path = Path(out_dir).resolve()
    out_path.mkdir(parents=True, exist_ok=True)
    parquet_file = out_path / "bad_data.parquet"
    csv_file = out_path / "bad_data.csv"
    dataframe_file = out_path / "bad_data.txt"
    # Write Parquet.
    try:
        df.to_parquet(parquet_file, engine="pyarrow", index=False)
        logging.info("Parquet file '%s' created with %d rows and %d columns.",
                     parquet_file.name, num_rows, num_columns)
    except Exception as e:
        logging.warning("Initial Parquet write failed: %s", e)
        logging.info("Falling back to converting columns to string for file write.")
        df_string = df.astype(str)
        try:
            df_string.to_parquet(parquet_file, engine="pyarrow", index=False)
            logging.info("Parquet file '%s' created with %d rows and %d columns (all columns as strings).",
                         parquet_file.name, num_rows, num_columns)
        except Exception as e2:
            logging.error("Fallback Parquet write also failed: %s", e2)
    # Write CSV (with blank lines).
    try:
        csv_content = df.to_csv(index=False)
        if num_blank_lines > 0:
            csv_content = insert_blank_lines(csv_content, num_blank_lines)
            logging.info("Inserted %d blank lines into CSV content.", num_blank_lines)
        with open(csv_file, "w") as f:
            f.write(csv_content)
        effective_rows = num_rows + num_blank_lines
        logging.info("CSV file '%s' created with %d rows (including %d blank lines) and %d columns.",
                     csv_file.name, effective_rows, num_blank_lines, num_columns)
    except Exception as e:
        logging.warning("Initial CSV write failed: %s", e)
        logging.info("Falling back to converting columns to string for file write.")
        df_string = df.astype(str)
        try:
            csv_content = df_string.to_csv(index=False)
            if num_blank_lines > 0:
                csv_content = insert_blank_lines(csv_content, num_blank_lines)
                logging.info("Inserted %d blank lines into CSV content.", num_blank_lines)
            with open(csv_file, "w") as f:
                f.write(csv_content)
            effective_rows = num_rows + num_blank_lines
            logging.info("CSV file '%s' created with %d rows (including %d blank lines) and %d columns (all columns as strings).",
                         csv_file.name, effective_rows, num_blank_lines, num_columns)
        except Exception as e2:
            logging.error("Fallback CSV write also failed: %s", e2)
    # Write the DataFrame to its own file (plain text representation).
    try:
        with open(dataframe_file, "w") as f:
            f.write(df.to_string())
        logging.info("DataFrame file '%s' created with %d rows and %d columns.",
                     dataframe_file.name, num_rows, num_columns)
    except Exception as e:
        logging.error("Failed to write DataFrame file: %s", e)
    return df
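
# Example call (illustrative): generate 1,000 rows with 50 injected errors
# across 25 columns, writing bad_data.parquet, bad_data.csv, and bad_data.txt
# into ./out:
#   df = write_bad_data_file(num_rows=1000, num_errors=50, out_dir="out",
#                            num_columns=25)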


def validate_csv_file(out_dir: Union[str, Path] = ".") -> None:
    """
    Validate the CSV file row by row using ML event log expectations,
    recording an error for every failing cell. Expected data types:
      - timestamp: valid timestamp string.
      - event_id, user_id, model_id: integer.
      - event_type: string in {"prediction", "error", "log"}.
      - input_data, output_data: non-empty string.
      - error_message: must be non-empty if event_type is "error";
        otherwise, empty is allowed.
      - latency, confidence, and extra columns (attr_*): float.
    Entirely blank rows are also flagged.
    Errors are printed as an evenly spaced table for the first five columns,
    with the Solution column printed in full.
    """
    out_path = Path(out_dir).resolve()
    csv_file = out_path / "bad_data.csv"
    logging.info("Validating CSV file at %s", out_path)
    try:
        df_csv = pd.read_csv(csv_file, keep_default_na=False, skip_blank_lines=False)
        logging.info("CSV file '%s' loaded successfully with shape %s.",
                     csv_file.name, df_csv.shape)
    except Exception as e:
        logging.error("CSV file loading ERROR: %s", e)
        return
    valid_event_types = {"prediction", "error", "log"}
    error_rows = []
    for idx, row in df_csv.iterrows():
        file_line = idx + 2  # Account for the header line.
        if all((pd.isna(cell) or str(cell).strip() == "") for cell in row):
            error_rows.append((file_line, idx, "Entire Row", "non-null", "",
                               "Remove blank row or provide valid data."))
            continue
        for col in df_csv.columns:
            value = row[col]
            # Coerce to str before strip(): fully numeric columns come back
            # from read_csv as ints/floats, which have no strip() method.
            # error_message may be empty as long as event_type != "error".
            if col == "error_message":
                if row["event_type"] == "error" and not str(value).strip():
                    error_rows.append((file_line, idx, col, "non-null", value,
                                       "Provide a valid error message."))
                continue
            else:
                if pd.isna(value) or not str(value).strip():
                    error_rows.append((file_line, idx, col, "non-null", value,
                                       "Provide a valid value."))
                    continue
            if col == "timestamp":
                try:
                    parsed = pd.to_datetime(value, errors="coerce")
                    if pd.isnull(parsed):
                        raise ValueError
                except Exception:
                    error_rows.append((file_line, idx, col, "valid timestamp", value,
                                       "Use format 'YYYY-MM-DD HH:MM:SS' or a valid timestamp string."))
            elif col in ["event_id", "user_id", "model_id"]:
                try:
                    _ = int(float(value))
                except Exception:
                    error_rows.append((file_line, idx, col, "integer", value,
                                       "Provide an integer value (e.g. 42)."))
            elif col == "event_type":
                if value not in valid_event_types:
                    error_rows.append((file_line, idx, col, f"one of {valid_event_types}", value,
                                       f"Use one of {valid_event_types}."))
            elif col in ["input_data", "output_data"]:
                if not isinstance(value, str):
                    error_rows.append((file_line, idx, col, "string", value,
                                       "Provide a valid string value."))
            elif col in ["latency", "confidence"] or col.startswith("attr_"):
                try:
                    _ = float(value)
                except Exception:
                    error_rows.append((file_line, idx, col, "float", value,
                                       "Provide a numeric value (e.g. 3.14)."))
    if error_rows:
        # Header for the first five columns (fixed width) plus "Solution",
        # which is never truncated:
        header = (
            f"{format_field('Line')}| {format_field('Row')}| {format_field('Column')}| "
            f"{format_field('Expected')}| {format_field('Rejected')}| Solution"
        )
        print(header)
        # Separator length: 5 columns * FIELD_WIDTH, plus 5 separators
        # ("| ", 2 chars each), plus len("Solution").
        dash_count = (FIELD_WIDTH * 5) + (2 * 5) + len("Solution")
        print("-" * dash_count)
        for line, row, col, expected, rejected, solution in error_rows:
            # Format the first five columns with fixed width:
            row_str = (
                f"{format_field(line)}| {format_field(row)}| {format_field(col)}| "
                f"{format_field(expected)}| {format_field(rejected)}| {solution}"
            )
            print(row_str)
    else:
        print("No validation errors found.")


def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments.
    In generation mode, -e specifies the total number of errors to inject.
    In validation mode, the CSV file is validated without generating new data.
    """
    parser = argparse.ArgumentParser(
        description="Generate ML event log files with deliberately bad data injected, "
                    "or validate an existing CSV file."
    )
    parser.add_argument(
        "-r", "--rows",
        type=int,
        default=100,
        help="Number of rows to generate (default: 100). Ignored if --validate is used.",
    )
    parser.add_argument(
        "-e", "--errors",
        type=int,
        default=0,
        help="Total number of errors to inject (cell errors and blank-line errors combined, default: 0).",
    )
    parser.add_argument(
        "-c", "--columns",
        type=int,
        default=20,
        help="Total number of columns in the ML event log (default: 20, maximum: 100).",
    )
    parser.add_argument(
        "-v", "--validate",
        action="store_true",
        help="Validate the output CSV file without generating new data.",
    )
    parser.add_argument(
        "-o", "--out_dir",
        type=str,
        default=".",
        help="Output directory for the generated files (default: current directory).",
    )
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    if args.validate:
        validate_csv_file(args.out_dir)
    else:
        write_bad_data_file(args.rows, args.errors, args.out_dir,
                            num_columns=args.columns)


if __name__ == "__main__":
    main()
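
# Example invocations (filename assumed; adjust to however the gist was saved):
#   python3 fiddler-data.py -r 500 -e 25 -c 30 -o ./data   # generate files
#   python3 fiddler-data.py -v -o ./data                   # validate ./data/bad_data.csv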