Created
June 2, 2025 02:08
-
-
Save yeiichi/bcf210d459525ef8c044b1d03f247ab2 to your computer and use it in GitHub Desktop.
Make a CSV file square (uniform row length) by removing irregular rows.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import csv | |
| import logging | |
| from collections import Counter | |
| from pathlib import Path | |
| logging.basicConfig(level=logging.INFO) | |
class CSVSquarer:
    """Make a CSV file rectangular by dropping rows of irregular length.

    The most common row length found in the input file is taken as the
    target width. Rows of any other length are discarded, and the remaining
    rows are written to ``<stem>_squared.csv`` in the input file's directory.

    Attributes:
        _in_filepath (Path): Path to the input CSV file.
        _out_filepath (Path): Path to the output CSV file where the processed
            rows will be saved.
    """

    def __init__(self, in_filepath=None):
        """Set up the input and output paths.

        Args:
            in_filepath: Optional path to the input CSV file. When omitted
                (the default), the user is prompted for one interactively.
        """
        if in_filepath is None:
            in_filepath = self.get_input_file_path()
        self._in_filepath = Path(in_filepath)
        self._out_filepath = (
            self._in_filepath.parent / f"{self._in_filepath.stem}_squared.csv"
        )

    @staticmethod
    def get_input_file_path() -> Path:
        """Prompt repeatedly until the user enters an existing ``.csv`` file.

        Returns:
            The validated path entered by the user.
        """
        while True:
            try:
                filepath = Path(input("Enter the CSV file path: ").strip())
                if not filepath.is_file():
                    raise FileNotFoundError("File does not exist.")
                if filepath.suffix.lower() != '.csv':
                    raise ValueError("The file must have a '.csv' extension.")
                return filepath
            except (FileNotFoundError, ValueError) as e:
                # Report the problem and ask again instead of aborting.
                print(f"Error: {e}")

    def read_csv(self, encoding="utf-8"):
        """Read the whole input CSV file into a list of rows.

        Args:
            encoding: Text encoding used to decode the file.

        Returns:
            A non-empty list of rows, each row a list of strings.

        Raises:
            ValueError: If the file contains no rows.
            Exception: Any error raised while opening or parsing the file is
                reported on stdout and then re-raised unchanged.
        """
        try:
            # newline='' lets the csv module handle embedded newlines itself.
            with self._in_filepath.open(newline='', encoding=encoding) as csvfile:
                data = list(csv.reader(csvfile))
            if not data:
                raise ValueError("The CSV file is empty.")
            return data
        except Exception as e:
            print(f"Error reading CSV file: {e}")
            raise

    @staticmethod
    def get_most_frequent_row_length(rows):
        """Return the most common row length among ``rows``.

        When several lengths are equally common, the one encountered first
        wins.

        Raises:
            IndexError: If ``rows`` is empty.
        """
        return Counter(len(row) for row in rows).most_common(1)[0][0]

    def extract_target_rows(self, encoding="utf-8"):
        """Return the rows whose length equals the most frequent row length.

        The file is read once through ``read_csv`` and filtered in memory. A
        warning with the number of discarded rows is printed when any row is
        dropped.

        Args:
            encoding: Text encoding used to decode the input file.

        Returns:
            The rows to keep, in their original order.

        Raises:
            ValueError: If the input file contains no rows.
        """
        rows = self.read_csv(encoding=encoding)
        target_length = self.get_most_frequent_row_length(rows)
        rows_to_save = [row for row in rows if len(row) == target_length]

        irregular_row_count = len(rows) - len(rows_to_save)
        if irregular_row_count > 0:
            print(f"Warning: {irregular_row_count} rows with irregular lengths were ignored.")
        return rows_to_save

    def save_array_to_csv(self, encoding="utf-8"):
        """Write the regular rows of the input file to the output CSV file.

        Args:
            encoding: Text encoding used for both reading the input and
                writing the output, so the two files always agree.
        """
        rows_to_save = self.extract_target_rows(encoding=encoding)
        with self._out_filepath.open(mode='w', newline='', encoding=encoding) as csvfile:
            csv.writer(csvfile).writerows(rows_to_save)
        self.display_message(self._out_filepath)

    @staticmethod
    def display_message(filepath):
        """Log, at INFO level, where the output file was saved."""
        logging.info("Saved: %s", filepath)
def main():
    """Prompt for a CSV file and write its squared copy next to it."""
    CSVSquarer().save_array_to_csv()


# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment