fiddler-data

#!/usr/bin/env python3
import argparse
import logging
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
from pathlib import Path
from typing import Union
import random

logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
)

# Fixed width (in characters) for the first five columns (Line, Row, Column, Expected, Rejected).
FIELD_WIDTH = 25


def format_field(value, width=FIELD_WIDTH):
    """
    Left-align 'value' in a fixed-width column. If 'value' is longer than 'width',
    truncate it and append '...'.
    """
    s = str(value)
    if len(s) > width:
        s = s[:width - 3] + "..."
    return f"{s:<{width}}"
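
# Illustrative behaviour of format_field (values follow from the default FIELD_WIDTH of 25):
#   format_field("timestamp")  -> "timestamp" left-aligned and padded to 25 characters
#   format_field("x" * 40)     -> the first 22 characters of the input followed by "...",
#                                 25 characters in total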


def get_bad_value_for_column(col_name: str):
    """
    Returns a bad value for the given column to trigger a validation error.
    """
    if col_name == "timestamp":
        return "unsupported_timestamp"  # Invalid timestamp format.
    elif col_name in ["event_id", "user_id", "model_id"]:
        return "not_int"  # Not an integer.
    elif col_name == "event_type":
        return 123  # Not one of the expected strings.
    elif col_name in ["input_data", "output_data", "error_message"]:
        return 12345  # Not a string.
    elif col_name in ["latency", "confidence"]:
        return "bad_float"  # Not a float.
    elif col_name.startswith("attr_"):
        return "not_a_float"  # Not a float.
    else:
        return "error"


def insert_blank_lines(csv_content: str, blank_lines: int) -> str:
    """
    Insert a total of 'blank_lines' completely empty lines into the CSV content.
    The blank lines are inserted evenly throughout the file.
    """
    lines = csv_content.splitlines()
    total_lines = len(lines)
    if blank_lines <= 0:
        return csv_content
    new_lines = []
    interval = total_lines / (blank_lines + 1)
    next_insertion = interval
    current_line = 0
    inserted = 0
    for line in lines:
        new_lines.append(line)
        current_line += 1
        # Cap the insertions at 'blank_lines' so the final line hitting the last
        # multiple of 'interval' cannot add an extra trailing blank line.
        if inserted < blank_lines and current_line >= next_insertion:
            new_lines.append("")
            inserted += 1
            next_insertion += interval
    return "\n".join(new_lines) + "\n"
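
# Example of how the blank lines are spread (values follow from the arithmetic above):
# with 9 content lines and blank_lines=2, interval is 9 / 3 = 3.0, so an empty line is
# appended after the 3rd and the 6th original line:
#   insert_blank_lines("a\nb\nc\nd\ne\nf\ng\nh\ni\n", 2)
#   -> "a\nb\nc\n\nd\ne\nf\n\ng\nh\ni\n"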


def write_bad_data_file(num_rows: int, num_errors: int = 0, out_dir: Union[str, Path] = ".", num_columns: int = 20) -> pd.DataFrame:
    """
    Generate an ML event log DataFrame with deliberately injected errors and write it to Parquet, CSV,
    and a plain text file (DataFrame file). The DataFrame includes columns relevant to ML event logs.
    The first column is 'timestamp', followed by:
      - event_id, user_id, model_id, event_type, input_data, output_data, latency, confidence, error_message
      - If num_columns > 10, additional columns named attr_1, attr_2, ... are appended.
    Exactly num_errors errors are injected: some as blank lines (in the CSV) and some as cell-level errors.
    """
    if num_columns < 10:
        raise ValueError("At least 10 columns are required for ML event logs.")
    if num_columns > 100:
        num_columns = 100
    predef_cols = ["timestamp", "event_id", "user_id", "model_id", "event_type",
                   "input_data", "output_data", "latency", "confidence", "error_message"]
    extra_count = num_columns - len(predef_cols)
    extra_cols = [f"attr_{i+1}" for i in range(extra_count)]
    columns = predef_cols + extra_cols
    data = {}
    data["timestamp"] = [pd.Timestamp("1970-01-01") + pd.to_timedelta(i, unit="s") for i in range(num_rows)]
    data["event_id"] = np.random.randint(1000, 10000, size=num_rows).tolist()
    data["user_id"] = np.random.randint(1, 101, size=num_rows).tolist()
    data["model_id"] = np.random.randint(1, 51, size=num_rows).tolist()
    valid_event_types = ["prediction", "error", "log"]
    data["event_type"] = np.random.choice(valid_event_types, size=num_rows, p=[0.8, 0.1, 0.1]).tolist()
    data["input_data"] = [f"input_{i}" for i in range(num_rows)]
    data["output_data"] = [f"output_{i}" for i in range(num_rows)]
    data["latency"] = np.random.rand(num_rows).tolist()
    data["confidence"] = np.random.rand(num_rows).tolist()
    # Only add an error message if the event_type is "error"; otherwise keep it empty.
    data["error_message"] = [("Error occurred" if et == "error" else "") for et in data["event_type"]]
    for col in extra_cols:
        data[col] = np.random.rand(num_rows).tolist()
    df = pd.DataFrame(data, columns=columns)
    total_cells = num_rows * num_columns
    # Randomly split the requested errors between blank lines and cell-level errors.
    num_blank_lines = 0
    num_cell_errors = 0
    for _ in range(num_errors):
        if random.random() < 0.5:
            num_blank_lines += 1
        else:
            num_cell_errors += 1
    if num_cell_errors > total_cells:
        excess = num_cell_errors - total_cells
        num_cell_errors = total_cells
        num_blank_lines += excess
    # Inject cell errors.
    if num_cell_errors > 0:
        error_indices = np.random.choice(total_cells, size=num_cell_errors, replace=False)
        for idx in error_indices:
            row_idx = idx // num_columns
            col_idx = idx % num_columns
            col_name = columns[col_idx]
            if not pd.api.types.is_object_dtype(df[col_name]):
                df[col_name] = df[col_name].astype(object)
            df.at[row_idx, col_name] = get_bad_value_for_column(col_name)
    out_path = Path(out_dir).resolve()
    out_path.mkdir(parents=True, exist_ok=True)
    parquet_file = out_path / "bad_data.parquet"
    csv_file = out_path / "bad_data.csv"
    dataframe_file = out_path / "bad_data.txt"
    # Write Parquet.
    try:
        df.to_parquet(parquet_file, engine="pyarrow", index=False)
        logging.info("Parquet file '%s' created with %d rows and %d columns.", parquet_file.name, num_rows, num_columns)
    except Exception as e:
        logging.warning("Initial Parquet write failed: %s", e)
        logging.info("Falling back to converting columns to string for file write.")
        df_string = df.astype(str)
        try:
            df_string.to_parquet(parquet_file, engine="pyarrow", index=False)
            logging.info("Parquet file '%s' created with %d rows and %d columns (all columns as strings).",
                         parquet_file.name, num_rows, num_columns)
        except Exception as e2:
            logging.error("Fallback Parquet write also failed: %s", e2)
    # Write CSV (with blank lines).
    try:
        csv_content = df.to_csv(index=False)
        if num_blank_lines > 0:
            csv_content = insert_blank_lines(csv_content, num_blank_lines)
            logging.info("Inserted %d blank lines into CSV content.", num_blank_lines)
        with open(csv_file, "w") as f:
            f.write(csv_content)
        effective_rows = num_rows + num_blank_lines
        logging.info("CSV file '%s' created with %d rows (including %d blank lines) and %d columns.",
                     csv_file.name, effective_rows, num_blank_lines, num_columns)
    except Exception as e:
        logging.warning("Initial CSV write failed: %s", e)
        logging.info("Falling back to converting columns to string for file write.")
        df_string = df.astype(str)
        try:
            csv_content = df_string.to_csv(index=False)
            if num_blank_lines > 0:
                csv_content = insert_blank_lines(csv_content, num_blank_lines)
                logging.info("Inserted %d blank lines into CSV content.", num_blank_lines)
            with open(csv_file, "w") as f:
                f.write(csv_content)
            effective_rows = num_rows + num_blank_lines
            logging.info("CSV file '%s' created with %d rows (including %d blank lines) and %d columns (all columns as strings).",
                         csv_file.name, effective_rows, num_blank_lines, num_columns)
        except Exception as e2:
            logging.error("Fallback CSV write also failed: %s", e2)
    # Write the DataFrame to its own file (plain text representation).
    try:
        with open(dataframe_file, "w") as f:
            f.write(df.to_string())
        logging.info("DataFrame file '%s' created with %d rows and %d columns.", dataframe_file.name, num_rows, num_columns)
    except Exception as e:
        logging.error("Failed to write DataFrame file: %s", e)
    return df
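
# Example call (illustrative only; the argument values are arbitrary):
#   df = write_bad_data_file(num_rows=100, num_errors=10, out_dir="./out", num_columns=20)
# writes bad_data.parquet, bad_data.csv, and bad_data.txt into ./out and returns the
# generated DataFrame, with roughly half of the 10 errors injected as blank CSV lines
# and the rest as bad cell values.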


def validate_csv_file(out_dir: Union[str, Path] = ".") -> None:
    """
    Validate the CSV file row by row using ML event log expectations.
    For each cell, any error is recorded.
    Expected data types:
      - timestamp: valid timestamp string.
      - event_id, user_id, model_id: integer.
      - event_type: string in {"prediction", "error", "log"}.
      - input_data, output_data: non-empty string.
      - error_message: if event_type is "error", must be non-empty; otherwise, empty is allowed.
      - latency, confidence, and extra columns (attr_*): float.
    Entirely blank rows are also flagged.
    The errors are output in an evenly spaced table for the first five columns,
    with the Solution column printed in full.
    """
    out_path = Path(out_dir).resolve()
    csv_file = out_path / "bad_data.csv"
    logging.info("Validating CSV file at %s", out_path)
    try:
        df_csv = pd.read_csv(csv_file, keep_default_na=False, skip_blank_lines=False)
        logging.info("CSV file '%s' loaded successfully with shape %s.", csv_file.name, df_csv.shape)
    except Exception as e:
        logging.error("CSV file loading ERROR: %s", e)
        return
    valid_event_types = {"prediction", "error", "log"}
    error_rows = []
    for idx, row in df_csv.iterrows():
        file_line = idx + 2  # Account for the header line.
        if all((pd.isna(cell) or str(cell).strip() == "") for cell in row):
            error_rows.append((file_line, idx, "Entire Row", "non-null", "", "Remove blank row or provide valid data."))
            continue
        for col in df_csv.columns:
            value = row[col]
            # error_message may be empty unless event_type is "error".
            # str() guards against columns that pandas parsed as numeric.
            if col == "error_message":
                if row["event_type"] == "error" and not str(value).strip():
                    error_rows.append((file_line, idx, col, "non-null", value, "Provide a valid error message."))
                continue
            else:
                if pd.isna(value) or not str(value).strip():
                    error_rows.append((file_line, idx, col, "non-null", value, "Provide a valid value."))
                    continue
            if col == "timestamp":
                try:
                    parsed = pd.to_datetime(value, errors='coerce')
                    if pd.isnull(parsed):
                        raise ValueError
                except Exception:
                    error_rows.append((file_line, idx, col, "valid timestamp", value,
                                       "Use format 'YYYY-MM-DD HH:MM:SS' or a valid timestamp string."))
            elif col in ["event_id", "user_id", "model_id"]:
                try:
                    _ = int(float(value))
                except Exception:
                    error_rows.append((file_line, idx, col, "integer", value,
                                       "Provide an integer value (e.g. 42)."))
            elif col == "event_type":
                if value not in valid_event_types:
                    error_rows.append((file_line, idx, col, f"one of {valid_event_types}", value,
                                       f"Use one of {valid_event_types}."))
            elif col in ["input_data", "output_data"]:
                if not isinstance(value, str):
                    error_rows.append((file_line, idx, col, "string", value,
                                       "Provide a valid string value."))
            elif col in ["latency", "confidence"] or col.startswith("attr_"):
                try:
                    _ = float(value)
                except Exception:
                    error_rows.append((file_line, idx, col, "float", value,
                                       "Provide a numeric value (e.g. 3.14)."))
    if error_rows:
        # Header for the first five columns (fixed width) plus "Solution" with no truncation:
        header = (
            f"{format_field('Line')}| {format_field('Row')}| {format_field('Column')}| "
            f"{format_field('Expected')}| {format_field('Rejected')}| Solution"
        )
        print(header)
        # Separator line: 5 columns * FIELD_WIDTH + 5 separators (each 2 chars: "| ") + len("Solution").
        dash_count = (FIELD_WIDTH * 5) + (2 * 5) + len("Solution")
        print("-" * dash_count)
        for line, row, col, expected, rejected, solution in error_rows:
            # Format the first five columns with fixed width:
            row_str = (
                f"{format_field(line)}| {format_field(row)}| {format_field(col)}| "
                f"{format_field(expected)}| {format_field(rejected)}| {solution}"
            )
            print(row_str)
    else:
        print("No validation errors found.")
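
# Illustrative output row (actual values depend on the injected errors): a cell error
# in the 'latency' column of CSV line 17 would print as
#   17 | 15 | latency | float | bad_float | Provide a numeric value (e.g. 3.14).
# with the first five fields padded to FIELD_WIDTH characters each.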


def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments.
    In generation mode, -e specifies the total number of errors to inject.
    In validation mode, the CSV file is validated without generating new data.
    """
    parser = argparse.ArgumentParser(
        description="Generate ML event log files with deliberately bad data injected, or validate an existing CSV file."
    )
    parser.add_argument(
        "-r", "--rows",
        type=int,
        default=100,
        help="Number of rows to generate (default: 100). Ignored if --validate is used.",
    )
    parser.add_argument(
        "-e", "--errors",
        type=int,
        default=0,
        help="Total number of errors to inject (cell errors and blank-line errors combined, default: 0).",
    )
    parser.add_argument(
        "-c", "--columns",
        type=int,
        default=20,
        help="Total number of columns in the ML event log (default: 20, maximum: 100).",
    )
    parser.add_argument(
        "-v", "--validate",
        action="store_true",
        help="Validate the output CSV file without generating new data.",
    )
    parser.add_argument(
        "-o", "--out_dir",
        type=str,
        default=".",
        help="Output directory for the generated files (default: current directory).",
    )
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    if args.validate:
        validate_csv_file(args.out_dir)
    else:
        write_bad_data_file(args.rows, args.errors, args.out_dir, num_columns=args.columns)


if __name__ == "__main__":
    main()
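
# Example invocations (assuming the script is saved as fiddler_data.py; the file name
# itself is not fixed by the gist):
#   python3 fiddler_data.py -r 500 -e 25 -c 30 -o ./out   # generate files with 25 injected errors
#   python3 fiddler_data.py --validate -o ./out           # validate ./out/bad_data.csv and print the error table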