@Cabeda
Last active March 19, 2025 19:29
Script to generate word frequency statistics and a word cloud from documents (PDF, DOCX, PPTX, TXT, CSV)
#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "pypdf",
# "python-docx",
# "python-pptx",
# "nltk",
# "tqdm",
# "matplotlib",
# "wordcloud",
# ]
# ///
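# Example invocations (the file name below is a placeholder; substitute whatever
# you saved this gist as):
#   uv run wordcloud_analysis.py ./my_docs -n 30 -g 1 2 -o results.json -w cloud.png
#   uv run wordcloud_analysis.py report.pdf --exclude figure table --quiet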
import os
import re
import argparse
import json
import csv
from collections import Counter
from pathlib import Path
import string
import time
from typing import Dict, List, Set, Tuple, Optional, Union
# Required third-party libraries
import pypdf
from docx import Document
from pptx import Presentation
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.util import ngrams
from tqdm import tqdm
# Optional visualization imports
try:
    import matplotlib.pyplot as plt
    from wordcloud import WordCloud
    VISUALIZATION_AVAILABLE = True
except ImportError:
    VISUALIZATION_AVAILABLE = False
# Download NLTK resources if not already downloaded
try:
    nltk.data.find('tokenizers/punkt')
    nltk.data.find('tokenizers/punkt_tab')
    nltk.data.find('corpora/stopwords')
except LookupError:
    nltk.download('punkt')
    nltk.download('punkt_tab')
    nltk.download('stopwords')
def extract_text_from_pdf(file_path: str) -> str:
"""Extract text content from a PDF file"""
text = ""
try:
with open(file_path, 'rb') as file:
reader = pypdf.PdfReader(file)
for page in reader.pages:
text += page.extract_text() or ""
except Exception as e:
print(f"Error processing PDF {file_path}: {e}")
return text
def extract_text_from_docx(file_path: str) -> str:
"""Extract text content from a Word document"""
text = ""
try:
doc = Document(file_path)
for para in doc.paragraphs:
text += para.text + "\n"
except Exception as e:
print(f"Error processing DOCX {file_path}: {e}")
return text
def extract_text_from_pptx(file_path: str) -> str:
"""Extract text content from a PowerPoint presentation"""
text = ""
try:
presentation = Presentation(file_path)
for slide in presentation.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
text += shape.text + "\n"
except Exception as e:
print(f"Error processing PPTX {file_path}: {e}")
return text
def extract_text_from_txt(file_path: str) -> str:
"""Extract text content from a plain text file"""
text = ""
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
text = file.read()
except Exception as e:
print(f"Error processing TXT {file_path}: {e}")
return text
def extract_text_from_csv(file_path: str) -> str:
"""Extract text content from a CSV file"""
text = ""
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
csv_reader = csv.reader(file)
for row in csv_reader:
text += " ".join(row) + "\n"
except Exception as e:
print(f"Error processing CSV {file_path}: {e}")
return text
def process_text(text: str, stop_words: Optional[Set[str]] = None, min_length: int = 3,
                 include_words: Optional[List[str]] = None) -> List[str]:
    """
    Process text by converting to lowercase, removing punctuation, and
    filtering out short words and stopwords. If include_words is provided,
    only those words will be included.
    """
    # Convert to lowercase and remove punctuation
    text = text.lower()
    text = re.sub(f'[{re.escape(string.punctuation)}]', ' ', text)
    # Tokenize and filter
    words = word_tokenize(text)
    # Filter out short words and stopwords
    if stop_words is None:
        stop_words = set(stopwords.words('english'))
    filtered_words = [word for word in words
                      if len(word) >= min_length
                      and word.isalpha()
                      and word not in stop_words]
    # Apply inclusion filter if provided
    if include_words:
        include_words_set = {word.lower() for word in include_words}
        filtered_words = [word for word in filtered_words if word in include_words_set]
    return filtered_words
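# Illustrative example (not executed): with the default English stopword list,
# process_text("The cats sat on the mat!") would return something like
# ['cats', 'sat', 'mat'], since punctuation is replaced by spaces, the text is
# lowercased, and stopwords and tokens shorter than min_length are dropped.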
def extract_ngrams(words: List[str], n: int = 2) -> List[str]:
"""Extract n-grams from list of words"""
n_grams = ngrams(words, n)
return [' '.join(gram) for gram in n_grams]
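# Illustrative example (not executed):
# extract_ngrams(['word', 'cloud', 'script'], 2) yields ['word cloud', 'cloud script'].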
def analyze_documents(folder_path: Union[str, Path], top_n: int = 20, min_word_length: int = 3,
                      include_extensions: Optional[List[str]] = None, exclude_words: Optional[List[str]] = None,
                      include_words: Optional[List[str]] = None, ngram_range: Optional[List[int]] = None,
                      cache_file: Optional[str] = None, progress_bar: bool = True) -> Dict[str, List[Tuple[str, int]]]:
    """
    Analyze documents in a folder and return the most frequent words and n-grams
    """
    if include_extensions is None:
        include_extensions = ['.pdf', '.docx', '.pptx', '.txt', '.csv']
    if ngram_range is None:
        ngram_range = [1]  # Default to unigrams only
    # Prepare stopwords
    stop_words = set(stopwords.words('english'))
    if exclude_words:
        stop_words.update(exclude_words)
    # Initialize counters for words and different n-grams
    counters = {f"{n}-gram": Counter() for n in ngram_range}
    # Load cache if available
    cache = {}
    if cache_file and os.path.exists(cache_file):
        try:
            with open(cache_file, 'r') as f:
                cache = json.load(f)
        except (json.JSONDecodeError, OSError):
            print(f"Could not load cache file {cache_file}")
    # Prepare file list
    file_list = []
    if os.path.isfile(folder_path):
        # If folder_path is actually a file
        extension = os.path.splitext(folder_path)[1].lower()
        if extension in include_extensions:
            file_list.append((os.path.abspath(folder_path), extension))
    else:
        # Normal directory walk
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                extension = os.path.splitext(file)[1].lower()
                if extension in include_extensions:
                    file_list.append((file_path, extension))
    # Process files with progress bar if enabled
    files_to_process = tqdm(file_list) if progress_bar else file_list
    for file_path, extension in files_to_process:
        if progress_bar:
            files_to_process.set_description(f"Processing {os.path.basename(file_path)}")
        # Check if file is in cache and hasn't been modified since
        file_mtime = os.path.getmtime(file_path)
        cache_key = f"{file_path}:{file_mtime}"
        if cache_key in cache:
            words = cache[cache_key]
        else:
            # Extract text based on file type
            text = ""
            if extension == '.pdf':
                text = extract_text_from_pdf(file_path)
            elif extension == '.docx':
                text = extract_text_from_docx(file_path)
            elif extension == '.pptx':
                text = extract_text_from_pptx(file_path)
            elif extension == '.txt':
                text = extract_text_from_txt(file_path)
            elif extension == '.csv':
                text = extract_text_from_csv(file_path)
            # Process text
            words = process_text(text, stop_words, min_word_length, include_words)
            # Update cache
            if cache_file:
                cache[cache_key] = words
        # Update word counter (only if unigrams were requested)
        if "1-gram" in counters:
            counters["1-gram"].update(words)
        # Extract and count n-grams
        for n in ngram_range:
            if n > 1:  # Skip unigrams as they're handled above
                n_grams = extract_ngrams(words, n)
                counters[f"{n}-gram"].update(n_grams)
    # Save cache
    if cache_file:
        with open(cache_file, 'w') as f:
            json.dump(cache, f)
    # Get top N for each counter
    results = {key: counter.most_common(top_n) for key, counter in counters.items()}
    return results
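# Shape of the value returned by analyze_documents, sketched here for
# ngram_range=[1, 2] (the words and counts below are made up for illustration):
# {
#     "1-gram": [("data", 42), ("cloud", 31), ...],
#     "2-gram": [("word cloud", 12), ("open data", 9), ...],
# }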
def export_results(results: Dict[str, List[Tuple[str, int]]], output_file: str):
"""Export results to CSV or JSON"""
ext = os.path.splitext(output_file)[1].lower()
if ext == '.csv':
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# Write headers
headers = ["Type", "Word/Phrase", "Count"]
writer.writerow(headers)
# Write data
for gram_type, word_counts in results.items():
for word, count in word_counts:
writer.writerow([gram_type, word, count])
print(f"Results exported to {output_file}")
elif ext == '.json':
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2)
print(f"Results exported to {output_file}")
else:
print(f"Unsupported export format: {ext}")
def generate_wordcloud(word_counts: List[Tuple[str, int]], output_file: Optional[str] = None):
    """Generate and optionally save a word cloud visualization"""
    if not VISUALIZATION_AVAILABLE:
        print("Visualization libraries (matplotlib/wordcloud) not available.")
        print("Install with: pip install matplotlib wordcloud")
        return
    # Convert to dictionary for WordCloud
    word_dict = {word: count for word, count in word_counts}
    # Create word cloud
    wordcloud = WordCloud(width=800, height=400, background_color='white',
                          max_words=100, relative_scaling=1.0).generate_from_frequencies(word_dict)
    plt.figure(figsize=(10, 6))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis("off")
    if output_file:
        plt.savefig(output_file, dpi=300, bbox_inches='tight')
        print(f"Word cloud saved to {output_file}")
    else:
        plt.show()
def main():
    parser = argparse.ArgumentParser(
        description="Analyze word frequencies across document files (PDF, DOCX, PPTX, TXT, CSV)"
    )
    parser.add_argument("path", help="Path to file or folder containing documents")
    parser.add_argument("-n", "--top-n", type=int, default=20,
                        help="Number of top words to display (default: 20)")
    parser.add_argument("-l", "--min-length", type=int, default=3,
                        help="Minimum word length to consider (default: 3)")
    parser.add_argument("-e", "--extensions", nargs="+",
                        default=['.pdf', '.docx', '.pptx', '.txt', '.csv'],
                        help="File extensions to process (default: .pdf .docx .pptx .txt .csv)")
    parser.add_argument("-x", "--exclude", nargs="+", default=[],
                        help="Additional words to exclude")
    parser.add_argument("-i", "--include", nargs="+", default=[],
                        help="Only include specified words in analysis")
    parser.add_argument("-g", "--ngram", type=int, nargs="+", default=[1],
                        help="N-gram sizes to analyze (default: 1)")
    parser.add_argument("-o", "--output", type=str,
                        help="Output file for results (.csv or .json)")
    parser.add_argument("-c", "--cache", type=str,
                        help="Cache file to speed up repeated analysis")
    parser.add_argument("-w", "--wordcloud", type=str,
                        help="Generate word cloud visualization and save to file")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable progress bar")
    args = parser.parse_args()
    # Ensure extensions have dots
    extensions = [ext if ext.startswith('.') else f'.{ext}' for ext in args.extensions]
    start_time = time.time()
    # Analyze documents
    results = analyze_documents(
        args.path,
        args.top_n,
        args.min_length,
        extensions,
        args.exclude,
        args.include,
        args.ngram,
        args.cache,
        not args.quiet
    )
    # Print execution time
    elapsed = time.time() - start_time
    print(f"\nAnalysis completed in {elapsed:.2f} seconds")
    # Print results
    for gram_type, word_counts in results.items():
        if not word_counts:
            continue
        print(f"\nTop {args.top_n} {gram_type}s by frequency:")
        print("-" * 40)
        max_word_len = max((len(word) for word, _ in word_counts), default=10)
        for word, count in word_counts:
            print(f"{word:{max_word_len}} : {count}")
    # Export if requested
    if args.output:
        export_results(results, args.output)
    # Generate word cloud if requested
    if args.wordcloud and "1-gram" in results:
        generate_wordcloud(results["1-gram"], args.wordcloud)
if __name__ == "__main__":
    main()
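
The analysis functions can also be imported from your own Python code instead of going through the CLI. A minimal sketch, assuming the gist is saved as wordcloud_analysis.py (placeholder name) next to your code, with the listed dependencies installed:

    from wordcloud_analysis import analyze_documents, generate_wordcloud

    results = analyze_documents("./my_docs", top_n=30, ngram_range=[1, 2])
    generate_wordcloud(results["1-gram"], "cloud.png")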