Skip to content

Instantly share code, notes, and snippets.

@yeiichi
Created June 2, 2025 02:08
Show Gist options
  • Select an option

  • Save yeiichi/bcf210d459525ef8c044b1d03f247ab2 to your computer and use it in GitHub Desktop.

Select an option

Save yeiichi/bcf210d459525ef8c044b1d03f247ab2 to your computer and use it in GitHub Desktop.
Make a CSV file square (rectangular) by removing rows of irregular length
#!/usr/bin/env python3
import csv
import logging
from collections import Counter
from pathlib import Path
# Set the root logger to INFO so that logging.info() messages
# (such as the "Saved: ..." notice) are actually emitted.
logging.basicConfig(level=logging.INFO)
class CSVSquarer:
    """
    Make a CSV file rectangular by dropping rows of irregular length.

    The input file is scanned to find the most common row length (number
    of fields per row). Only rows of exactly that length are kept and
    written to a sibling file named ``<input stem>_squared.csv``.

    Attributes:
        _in_filepath (Path): Path to the input CSV file.
        _out_filepath (Path): Path to the output CSV file where the
            retained rows are saved.
    """

    def __init__(self, in_filepath=None):
        """
        Set up the input and output paths.

        Args:
            in_filepath: Optional path (str or Path) to the input CSV
                file. When omitted, the user is prompted for a path on
                standard input. An explicitly given path is used as-is,
                without validation.
        """
        if in_filepath is None:
            in_filepath = self.get_input_file_path()
        self._in_filepath = Path(in_filepath)
        self._out_filepath = self._in_filepath.parent / f"{self._in_filepath.stem}_squared.csv"

    @staticmethod
    def get_input_file_path() -> Path:
        """
        Prompt the user until a valid CSV file path is entered.

        Returns:
            Path: Path of an existing file whose suffix is ``.csv``
            (compared case-insensitively).
        """
        while True:
            try:
                filepath = Path(input("Enter the CSV file path: ").strip())
                if not filepath.is_file():
                    raise FileNotFoundError("File does not exist.")
                if filepath.suffix.lower() != '.csv':
                    raise ValueError("The file must have a '.csv' extension.")
                return filepath
            except (FileNotFoundError, ValueError) as e:
                # Report the problem and ask again instead of aborting.
                print(f"Error: {e}")

    def read_csv(self, encoding="utf-8"):
        """
        Read the whole input CSV file into memory.

        Args:
            encoding: Text encoding used to decode the file.

        Returns:
            list[list[str]]: All rows of the file.

        Raises:
            ValueError: If the file contains no rows.
            Exception: Any error raised while opening or parsing the file
                is printed and then re-raised unchanged.
        """
        try:
            with self._in_filepath.open(newline='', encoding=encoding) as csvfile:
                data = list(csv.reader(csvfile))
            if not data:
                raise ValueError("The CSV file is empty.")
            return data
        except Exception as e:
            print(f"Error reading CSV file: {e}")
            raise

    @staticmethod
    def get_most_frequent_row_length(rows):
        """
        Find the most common row length in the given rows.

        Args:
            rows: Iterable of sized rows (e.g. lists of fields).

        Returns:
            int: The length shared by the largest number of rows. On a
            tie, the length that was encountered first wins.

        Raises:
            ValueError: If ``rows`` is empty.
        """
        counter = Counter(len(row) for row in rows)
        if not counter:
            # most_common(1) would be empty; fail with a clear message
            # instead of an IndexError.
            raise ValueError("Cannot determine the row length: no rows were given.")
        return counter.most_common(1)[0][0]

    def extract_target_rows(self, encoding=None):
        """
        Return the rows of the input file that have the most common length.

        The file is read twice: a first pass counts row lengths without
        keeping the rows, and a second pass collects the rows whose length
        matches the most common one.

        Args:
            encoding: Text encoding used to read the file. ``None`` (the
                default) uses the platform default encoding.

        Returns:
            list[list[str]]: The retained rows, in file order.

        Raises:
            ValueError: If the file contains no rows.
        """
        with self._in_filepath.open(newline='', encoding=encoding) as csvfile:
            # Pass 1: tally row lengths only.
            lengths = Counter(len(row) for row in csv.reader(csvfile))
            if not lengths:
                raise ValueError("The CSV file is empty.")
            target_length = lengths.most_common(1)[0][0]

            # Pass 2: rewind and parse again with a fresh reader.
            csvfile.seek(0)
            rows_to_save = [
                row for row in csv.reader(csvfile) if len(row) == target_length
            ]

        irregular_row_count = sum(lengths.values()) - lengths[target_length]
        if irregular_row_count > 0:
            print(f"Warning: {irregular_row_count} rows with irregular lengths were ignored.")
        return rows_to_save

    def save_array_to_csv(self, encoding=None):
        """
        Extract the regular rows and write them to the output CSV file.

        The rows are extracted before the output file is opened, so a
        failure while reading does not create or truncate the output file.

        Args:
            encoding: Text encoding used for both reading the input and
                writing the output. ``None`` (the default) uses the
                platform default encoding.

        Raises:
            ValueError: If the input file contains no rows.
        """
        rows_to_save = self.extract_target_rows(encoding=encoding)
        with self._out_filepath.open(mode='w', newline='', encoding=encoding) as csvfile:
            csv.writer(csvfile).writerows(rows_to_save)
        self.display_message(self._out_filepath)

    @staticmethod
    def display_message(filepath):
        """Log, at INFO level, the path of the file that was saved."""
        logging.info('Saved: %s', filepath)
def main():
    """Build a CSVSquarer (which prompts for the input file) and save the squared CSV."""
    CSVSquarer().save_array_to_csv()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment