Script to get word cloud of documents
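Example usage (the file name word_frequency.py and the documents folder are placeholders; the flags are the ones defined in the script's argument parser). With uv installed, the inline dependency block at the top of the script is resolved automatically:

    uv run word_frequency.py ./documents -n 30 -g 1 2 -o results.csv -w wordcloud.png

This prints the 30 most frequent unigrams and bigrams found across the PDF, DOCX, PPTX, TXT, and CSV files under ./documents, exports them to results.csv, and saves a word cloud of the unigrams to wordcloud.png.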
#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "pypdf",
#     "python-docx",
#     "python-pptx",
#     "nltk",
#     "tqdm",
#     "matplotlib",
#     "wordcloud",
# ]
# ///
import os
import re
import argparse
import json
import csv
from collections import Counter
from pathlib import Path
import string
import time
from typing import Dict, List, Set, Tuple, Optional, Union

# Required third-party libraries
import pypdf
from docx import Document
from pptx import Presentation
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.util import ngrams
from tqdm import tqdm

# Optional visualization imports
try:
    import matplotlib.pyplot as plt
    from wordcloud import WordCloud
    VISUALIZATION_AVAILABLE = True
except ImportError:
    VISUALIZATION_AVAILABLE = False

# Download NLTK resources if not already present
try:
    nltk.data.find('tokenizers/punkt')
    nltk.data.find('tokenizers/punkt_tab')
    nltk.data.find('corpora/stopwords')
except LookupError:
    nltk.download('punkt')
    nltk.download('punkt_tab')
    nltk.download('stopwords')

def extract_text_from_pdf(file_path: str) -> str:
    """Extract text content from a PDF file"""
    text = ""
    try:
        with open(file_path, 'rb') as file:
            reader = pypdf.PdfReader(file)
            for page in reader.pages:
                text += page.extract_text() or ""
    except Exception as e:
        print(f"Error processing PDF {file_path}: {e}")
    return text


def extract_text_from_docx(file_path: str) -> str:
    """Extract text content from a Word document"""
    text = ""
    try:
        doc = Document(file_path)
        for para in doc.paragraphs:
            text += para.text + "\n"
    except Exception as e:
        print(f"Error processing DOCX {file_path}: {e}")
    return text


def extract_text_from_pptx(file_path: str) -> str:
    """Extract text content from a PowerPoint presentation"""
    text = ""
    try:
        presentation = Presentation(file_path)
        for slide in presentation.slides:
            for shape in slide.shapes:
                if hasattr(shape, "text"):
                    text += shape.text + "\n"
    except Exception as e:
        print(f"Error processing PPTX {file_path}: {e}")
    return text


def extract_text_from_txt(file_path: str) -> str:
    """Extract text content from a plain text file"""
    text = ""
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
            text = file.read()
    except Exception as e:
        print(f"Error processing TXT {file_path}: {e}")
    return text


def extract_text_from_csv(file_path: str) -> str:
    """Extract text content from a CSV file"""
    text = ""
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
            csv_reader = csv.reader(file)
            for row in csv_reader:
                text += " ".join(row) + "\n"
    except Exception as e:
        print(f"Error processing CSV {file_path}: {e}")
    return text

def process_text(text: str, stop_words: Optional[Set[str]] = None, min_length: int = 3,
                 include_words: Optional[List[str]] = None) -> List[str]:
    """
    Process text by converting to lowercase, removing punctuation, and
    filtering out short words and stopwords. If include_words is provided,
    only those words will be included.
    """
    # Convert to lowercase and remove punctuation
    text = text.lower()
    text = re.sub(f'[{re.escape(string.punctuation)}]', ' ', text)

    # Tokenize
    words = word_tokenize(text)

    # Filter out short words and stopwords
    if stop_words is None:
        stop_words = set(stopwords.words('english'))
    filtered_words = [word for word in words
                      if len(word) >= min_length
                      and word.isalpha()
                      and word not in stop_words]

    # Apply inclusion filter if provided
    if include_words:
        include_words_set = {word.lower() for word in include_words}
        filtered_words = [word for word in filtered_words if word in include_words_set]

    return filtered_words


def extract_ngrams(words: List[str], n: int = 2) -> List[str]:
    """Extract n-grams from a list of words"""
    n_grams = ngrams(words, n)
    return [' '.join(gram) for gram in n_grams]

def analyze_documents(folder_path: Union[str, Path], top_n: int = 20, min_word_length: int = 3,
                      include_extensions: Optional[List[str]] = None, exclude_words: Optional[List[str]] = None,
                      include_words: Optional[List[str]] = None, ngram_range: Optional[List[int]] = None,
                      cache_file: Optional[str] = None, progress_bar: bool = True) -> Dict[str, List[Tuple[str, int]]]:
    """
    Analyze documents in a folder and return the most frequent words and n-grams
    """
    if include_extensions is None:
        include_extensions = ['.pdf', '.docx', '.pptx', '.txt', '.csv']
    if ngram_range is None:
        ngram_range = [1]  # Default to unigrams only

    # Prepare stopwords
    stop_words = set(stopwords.words('english'))
    if exclude_words:
        stop_words.update(exclude_words)

    # Initialize counters for words and different n-grams
    counters = {f"{n}-gram": Counter() for n in ngram_range}

    # Load cache if available
    cache = {}
    if cache_file and os.path.exists(cache_file):
        try:
            with open(cache_file, 'r') as f:
                cache = json.load(f)
        except (OSError, json.JSONDecodeError):
            print(f"Could not load cache file {cache_file}")
    # Prepare file list
    file_list = []
    if os.path.isfile(folder_path):
        # If folder_path is actually a file
        extension = os.path.splitext(folder_path)[1].lower()
        if extension in include_extensions:
            file_list.append((os.path.abspath(folder_path), extension))
    else:
        # Normal directory walk
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                extension = os.path.splitext(file)[1].lower()
                if extension in include_extensions:
                    file_list.append((file_path, extension))

    # Process files with progress bar if enabled
    files_to_process = tqdm(file_list) if progress_bar else file_list
    for file_path, extension in files_to_process:
        if progress_bar:
            files_to_process.set_description(f"Processing {os.path.basename(file_path)}")

        # Check if file is in cache and hasn't been modified since
        file_mtime = os.path.getmtime(file_path)
        cache_key = f"{file_path}:{file_mtime}"
        if cache_key in cache:
            words = cache[cache_key]
        else:
            # Extract text based on file type
            text = ""
            if extension == '.pdf':
                text = extract_text_from_pdf(file_path)
            elif extension == '.docx':
                text = extract_text_from_docx(file_path)
            elif extension == '.pptx':
                text = extract_text_from_pptx(file_path)
            elif extension == '.txt':
                text = extract_text_from_txt(file_path)
            elif extension == '.csv':
                text = extract_text_from_csv(file_path)

            # Process text
            words = process_text(text, stop_words, min_word_length, include_words)

            # Update cache
            if cache_file:
                cache[cache_key] = words

        # Update the unigram counter (only if unigrams were requested)
        if "1-gram" in counters:
            counters["1-gram"].update(words)

        # Extract and count n-grams
        for n in ngram_range:
            if n > 1:  # Skip unigrams as they're handled above
                n_grams = extract_ngrams(words, n)
                counters[f"{n}-gram"].update(n_grams)

    # Save cache
    if cache_file:
        with open(cache_file, 'w') as f:
            json.dump(cache, f)

    # Get top N for each counter
    results = {key: counter.most_common(top_n) for key, counter in counters.items()}
    return results

def export_results(results: Dict[str, List[Tuple[str, int]]], output_file: str):
    """Export results to CSV or JSON"""
    ext = os.path.splitext(output_file)[1].lower()
    if ext == '.csv':
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            # Write headers
            headers = ["Type", "Word/Phrase", "Count"]
            writer.writerow(headers)
            # Write data
            for gram_type, word_counts in results.items():
                for word, count in word_counts:
                    writer.writerow([gram_type, word, count])
        print(f"Results exported to {output_file}")
    elif ext == '.json':
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        print(f"Results exported to {output_file}")
    else:
        print(f"Unsupported export format: {ext}")

def generate_wordcloud(word_counts: List[Tuple[str, int]], output_file: Optional[str] = None):
    """Generate and optionally save a word cloud visualization"""
    if not VISUALIZATION_AVAILABLE:
        print("Visualization libraries (matplotlib/wordcloud) not available.")
        print("Install with: pip install matplotlib wordcloud")
        return

    # Convert to dictionary for WordCloud
    word_dict = {word: count for word, count in word_counts}

    # Create word cloud
    wordcloud = WordCloud(width=800, height=400, background_color='white',
                          max_words=100, relative_scaling=1.0).generate_from_frequencies(word_dict)

    plt.figure(figsize=(10, 6))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis("off")

    if output_file:
        plt.savefig(output_file, dpi=300, bbox_inches='tight')
        print(f"Word cloud saved to {output_file}")
    else:
        plt.show()

def main():
    parser = argparse.ArgumentParser(
        description="Analyze word frequencies across document files (PDF, DOCX, PPTX, TXT, CSV)"
    )
    parser.add_argument("path", help="Path to file or folder containing documents")
    parser.add_argument("-n", "--top-n", type=int, default=20,
                        help="Number of top words to display (default: 20)")
    parser.add_argument("-l", "--min-length", type=int, default=3,
                        help="Minimum word length to consider (default: 3)")
    parser.add_argument("-e", "--extensions", nargs="+",
                        default=['.pdf', '.docx', '.pptx', '.txt', '.csv'],
                        help="File extensions to process (default: .pdf .docx .pptx .txt .csv)")
    parser.add_argument("-x", "--exclude", nargs="+", default=[],
                        help="Additional words to exclude")
    parser.add_argument("-i", "--include", nargs="+", default=[],
                        help="Only include specified words in analysis")
    parser.add_argument("-g", "--ngram", type=int, nargs="+", default=[1],
                        help="N-gram sizes to analyze (default: 1)")
    parser.add_argument("-o", "--output", type=str,
                        help="Output file for results (.csv or .json)")
    parser.add_argument("-c", "--cache", type=str,
                        help="Cache file to speed up repeated analysis")
    parser.add_argument("-w", "--wordcloud", type=str,
                        help="Generate word cloud visualization and save to file")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable progress bar")
    args = parser.parse_args()

    # Ensure extensions have dots
    extensions = [ext if ext.startswith('.') else f'.{ext}' for ext in args.extensions]

    start_time = time.time()

    # Analyze documents
    results = analyze_documents(
        args.path,
        args.top_n,
        args.min_length,
        extensions,
        args.exclude,
        args.include,
        args.ngram,
        args.cache,
        not args.quiet
    )

    # Print execution time
    elapsed = time.time() - start_time
    print(f"\nAnalysis completed in {elapsed:.2f} seconds")

    # Print results
    for gram_type, word_counts in results.items():
        if not word_counts:
            continue
        print(f"\nTop {args.top_n} {gram_type}s by frequency:")
        print("-" * 40)
        max_word_len = max((len(word) for word, _ in word_counts), default=10)
        for word, count in word_counts:
            print(f"{word:{max_word_len}} : {count}")

    # Export if requested
    if args.output:
        export_results(results, args.output)

    # Generate word cloud if requested
    if args.wordcloud and "1-gram" in results:
        generate_wordcloud(results["1-gram"], args.wordcloud)


if __name__ == "__main__":
    main()