Last active
July 2, 2025 23:56
-
-
Save xthesaintx/f4dd81c96d18bcf52c44b36b086b3c9d to your computer and use it in GitHub Desktop.
Merge timestamped .dote JSON files in chronological order by "startTime" (each line carries "startTime", "endTime", "speakerDesignation" and "text"), with a GUI.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import json | |
import glob | |
import re | |
import os | |
from datetime import datetime | |
from collections import defaultdict | |
from os.path import expanduser, join, basename, isdir, dirname | |
import tkinter as tk | |
from tkinter import filedialog, ttk | |
from ttkbootstrap.dialogs import Messagebox | |
import ttkbootstrap as bstt | |
from ttkbootstrap.scrolled import ScrolledFrame | |
import matplotlib.pyplot as plt | |
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg | |
import matplotlib.font_manager | |
import numpy as np | |
import matplotlib.patches as patches | |
from matplotlib.ticker import FuncFormatter | |
try: | |
import pandas as pd | |
from wordcloud import WordCloud | |
import tkinterdnd2 as dnd # For drag-and-drop | |
except ImportError: | |
Messagebox.show_error( | |
"Missing required libraries. Please install them by running:\n\n" | |
"pip install pandas wordcloud tkinterdnd2", | |
"Dependency Error" | |
) | |
sys.exit(1) | |
### Core Processing Functions ### | |
def parse_time(time_str):
    """Parse an ``HH:MM:SS`` timestamp, with or without a ``,fraction`` part.

    Args:
        time_str: Timestamp such as ``"00:01:02,500"`` or ``"00:01:02"``.

    Returns:
        A ``datetime`` carrying the parsed time of day.

    Raises:
        ValueError: If the text matches neither accepted layout.
    """
    fractional_layout = "%H:%M:%S,%f"
    whole_second_layout = "%H:%M:%S"
    try:
        parsed = datetime.strptime(time_str, fractional_layout)
    except ValueError:
        # No fractional part present: retry with the plain layout.
        parsed = datetime.strptime(time_str, whole_second_layout)
    return parsed
def time_str_to_seconds(time_str):
    """Convert an ``HH:MM:SS[,fraction]`` timestamp to seconds.

    Args:
        time_str: Timestamp such as ``"01:02:03,500"`` or ``"01:02:03"``.

    Returns:
        Seconds as a float (whole seconds plus the fractional part).

    Raises:
        ValueError: If the text matches neither accepted layout.
    """
    try:
        moment = datetime.strptime(time_str, "%H:%M:%S,%f")
    except ValueError:
        moment = datetime.strptime(time_str, "%H:%M:%S")
    whole_seconds = moment.hour * 3600 + moment.minute * 60 + moment.second
    return whole_seconds + moment.microsecond / 1_000_000
def word_count(text):
    """Return how many runs of word characters (``\\w+``) occur in *text*."""
    return sum(1 for _ in re.finditer(r'\w+', text))
### DOTE Processor Window ### | |
class DoteProcessorWindow(tk.Toplevel):
    """Dialog that stamps one speaker name onto every line of each listed
    .dote file and writes the result as a .json file.

    Files are listed from a chosen directory or added by drag-and-drop; the
    speaker shown for a file can be edited by double-clicking its cell. The
    directory choices and the "save in place" flag are read from, and written
    back to, the Tk variables held by ``main_app``.
    """

    def __init__(self, master, main_app):
        """Build the dialog, list the remembered input directory and grab input.

        Args:
            master: Parent window; the dialog is centred over it.
            main_app: Object exposing ``dote_input_dir_var``,
                ``dote_output_dir_var`` and ``dote_save_in_place_var``.
        """
        super().__init__(master)
        self.main_app = main_app
        self.title("Process DOTE Files")
        self.geometry("800x600")
        self.input_dir = tk.StringVar(value=self.main_app.dote_input_dir_var.get())
        self.output_dir = tk.StringVar(value=self.main_app.dote_output_dir_var.get())
        self.save_in_place_var = tk.BooleanVar(value=self.main_app.dote_save_in_place_var.get())
        self.tree_data = {}  # tree item id -> path of the .dote file it represents
        self._setup_widgets()
        self._populate_from_directory(self.input_dir.get())
        self.drop_target_register(dnd.DND_FILES)
        self.dnd_bind('<<Drop>>', self._on_drop)
        self._center_on_parent()
        self.grab_set()

    def _center_on_parent(self):
        """Move the dialog so its centre coincides with the parent's centre."""
        self.update_idletasks()
        parent = self.master
        parent_x, parent_y = parent.winfo_x(), parent.winfo_y()
        parent_width, parent_height = parent.winfo_width(), parent.winfo_height()
        self_width, self_height = self.winfo_width(), self.winfo_height()
        x = parent_x + (parent_width // 2) - (self_width // 2)
        y = parent_y + (parent_height // 2) - (self_height // 2)
        self.geometry(f"+{x}+{y}")

    def _setup_widgets(self):
        """Create the input row, the file tree, the output controls and the
        Process/Cancel buttons."""
        main_frame = bstt.Frame(self, padding=10)
        main_frame.pack(fill=tk.BOTH, expand=True)

        in_frame = bstt.LabelFrame(main_frame, text="Input Directory (.dote files)", padding=10)
        in_frame.pack(fill=tk.X, pady=(0, 10))
        bstt.Entry(in_frame, textvariable=self.input_dir, state="readonly").pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
        bstt.Button(in_frame, text="Browse...", command=self._browse_input_dir).pack(side=tk.LEFT)

        tree_frame = bstt.LabelFrame(main_frame, text="Files to Process (Double-click speaker to edit)", padding=10)
        tree_frame.pack(fill=tk.BOTH, expand=True, pady=5)
        self.tree = bstt.Treeview(tree_frame, columns=("Speaker",), style='primary.Treeview')
        self.tree.heading("#0", text="DOTE File")
        self.tree.heading("Speaker", text="Speaker")
        self.tree.column("#0", width=400, stretch=True)
        self.tree.column("Speaker", width=200, stretch=True)
        self.tree.pack(fill=tk.BOTH, expand=True)
        self.tree.bind("<Double-1>", self._edit_cell)
        bstt.Button(tree_frame, text="Clear List", command=self._clear_list, bootstyle="warning-outline").pack(side=tk.BOTTOM, anchor=tk.E, pady=(5, 0))

        out_toggle_frame = bstt.Frame(main_frame, padding=(0, 5, 0, 0))
        out_toggle_frame.pack(fill=tk.X)
        bstt.Checkbutton(out_toggle_frame, variable=self.save_in_place_var, text="Output in same directory as DOTE file",
                         bootstyle="success-round-toggle", command=self._toggle_output_dir_state).pack(anchor=tk.W)

        self.out_frame = bstt.LabelFrame(main_frame, text="Output Directory (for new .json files)", padding=10)
        self.out_frame.pack(fill=tk.X, pady=(5, 0))
        self.out_dir_entry = bstt.Entry(self.out_frame, textvariable=self.output_dir, state="readonly")
        self.out_dir_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
        self.out_dir_browse_btn = bstt.Button(self.out_frame, text="Browse...", command=self._browse_output_dir)
        self.out_dir_browse_btn.pack(side=tk.LEFT)

        action_frame = bstt.Frame(main_frame)
        action_frame.pack(fill=tk.X, pady=(15, 0), anchor=tk.S)
        bstt.Button(action_frame, text="Process", command=self._process_files, bootstyle="success").pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
        bstt.Button(action_frame, text="Cancel", command=self.destroy, bootstyle="danger-outline").pack(side=tk.RIGHT, fill=tk.X, expand=True, padx=(5, 0))
        self._toggle_output_dir_state()

    def destroy(self):
        """Copy the dialog's directory/flag values back to ``main_app``, then
        destroy the window."""
        self.main_app.dote_input_dir_var.set(self.input_dir.get())
        self.main_app.dote_output_dir_var.set(self.output_dir.get())
        self.main_app.dote_save_in_place_var.set(self.save_in_place_var.get())
        super().destroy()

    def _clear_list(self):
        """Remove every row from the tree and forget the associated paths."""
        for item in self.tree.get_children():
            self.tree.delete(item)
        self.tree_data.clear()

    def _toggle_output_dir_state(self):
        """Disable the output-directory controls while "save in place" is on;
        otherwise enable them, keeping the entry read-only."""
        state = "disabled" if self.save_in_place_var.get() else "normal"
        for child in self.out_frame.winfo_children():
            child.config(state=state)
        if state == 'normal':
            self.out_dir_entry.config(state="readonly")

    def _browse_input_dir(self):
        """Ask for an input directory and relist its .dote files."""
        dir_name = filedialog.askdirectory(initialdir=self.input_dir.get(), title='Select Directory with DOTE files')
        if dir_name:
            self.input_dir.set(dir_name)
            self._populate_from_directory(dir_name)

    def _browse_output_dir(self):
        """Ask for an output directory and remember it."""
        dir_name = filedialog.askdirectory(initialdir=self.output_dir.get(), title='Select Output Directory')
        if dir_name:
            self.output_dir.set(dir_name)

    def _populate_from_directory(self, directory):
        """Replace the list with the ``*.dote`` files found in *directory*.

        The list is cleared even when *directory* is not an existing folder.
        """
        self._clear_list()
        if not isdir(directory):
            return
        try:
            # Escape the folder part so names containing [, ], * or ? are
            # matched literally instead of being read as glob wildcards.
            pattern = join(glob.escape(directory), "*.dote")
            for file_path in sorted(glob.glob(pattern)):
                self._add_dote_file_to_tree(file_path)
        except Exception as e:
            Messagebox.show_error(f"Could not read directory:\n{e}", parent=self)

    def _add_dote_file_to_tree(self, file_path):
        """Append *file_path* to the tree unless that exact path is already listed."""
        if file_path in self.tree_data.values():
            return
        filename = basename(file_path)
        speaker = self._get_first_speaker(file_path)
        item_id = self.tree.insert("", tk.END, text=filename, values=(speaker,))
        self.tree_data[item_id] = file_path

    def _get_first_speaker(self, file_path):
        """Return the ``speakerDesignation`` of the first entry in ``lines``.

        Returns ``"Unknown"`` when the first line has no speaker, and
        ``"Error/Empty"`` when the file cannot be read, is not valid UTF-8
        JSON, has no lines, or does not have the expected object layout.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return data.get("lines", [])[0].get("speakerDesignation", "Unknown")
        except (OSError, ValueError, IndexError, KeyError, AttributeError, TypeError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError; the
            # remaining types cover JSON that is not {"lines": [{...}, ...]}.
            return "Error/Empty"

    def _on_drop(self, event):
        """Add every dropped path ending in ``.dote`` (any letter case)."""
        filepaths = self.tk.splitlist(event.data)
        for file_path in sorted(f for f in filepaths if f.lower().endswith('.dote')):
            self._add_dote_file_to_tree(file_path)

    def _edit_cell(self, event):
        """Overlay an entry on the double-clicked Speaker cell for editing."""
        if self.tree.identify_region(event.x, event.y) != "cell":
            return
        column = self.tree.identify_column(event.x)
        item_id = self.tree.identify_row(event.y)
        if column != "#1" or not item_id:
            return
        bbox = self.tree.bbox(item_id, column)
        if not bbox:
            # bbox is empty when the cell is not currently visible.
            return
        x, y, width, height = bbox
        value = self.tree.set(item_id, column)
        entry = bstt.Entry(self.tree)
        entry.place(x=x, y=y + (height - entry.winfo_reqheight()) // 2, width=width)
        entry.insert(0, value)
        entry.select_range(0, tk.END)
        entry.focus_set()
        entry.bind("<Return>", lambda e: self._on_edit_commit(e.widget, item_id, column))
        entry.bind("<FocusOut>", lambda e: self._on_edit_commit(e.widget, item_id, column))

    def _on_edit_commit(self, entry, item_id, column):
        """Store the entry's text in the tree cell and remove the entry.

        Both <Return> and <FocusOut> are bound to this method, so it may be
        invoked again after the entry is gone; that second call is ignored.
        """
        if not entry.winfo_exists():
            return
        if self.tree.exists(item_id):
            self.tree.set(item_id, column, entry.get())
        entry.destroy()

    def _process_files(self):
        """Write a .json copy of every listed file with its speaker replaced.

        Each output goes next to its source when "save in place" is on,
        otherwise into the chosen output directory. A failure on one file is
        reported and counted; the remaining files are still processed. The
        dialog closes only when every file succeeded.
        """
        items = self.tree.get_children()
        if not items:
            Messagebox.show_warning("There are no files to process.", parent=self)
            return
        save_in_place = self.save_in_place_var.get()
        final_output_dir = self.output_dir.get()
        if not save_in_place and (not final_output_dir or not isdir(final_output_dir)):
            Messagebox.show_warning("Please select a valid output directory.", parent=self)
            return
        processed_count, error_count = 0, 0
        for item_id in items:
            # Taken from the tree up front so the error message never depends
            # on a variable assigned inside the try block.
            display_name = self.tree.item(item_id, "text")
            try:
                original_path = self.tree_data[item_id]
                new_speaker = self.tree.set(item_id, "Speaker")
                with open(original_path, 'r', encoding='utf-8') as f_in:
                    data = json.load(f_in)
                for line in data.get("lines", []):
                    line["speakerDesignation"] = new_speaker
                # Swap only the final extension, whatever its letter case, so
                # "X.DOTE" becomes "X.json" instead of overwriting its source.
                new_filename = os.path.splitext(basename(original_path))[0] + '.json'
                target_dir = dirname(original_path) if save_in_place else final_output_dir
                output_path = join(target_dir, new_filename)
                with open(output_path, 'w', encoding='utf-8') as f_out:
                    json.dump(data, f_out, ensure_ascii=False, indent=2)
                processed_count += 1
            except Exception as e:
                Messagebox.show_error(f"Failed to process {display_name}:\n{e}", parent=self)
                error_count += 1
        if error_count == 0:
            Messagebox.ok(f"Successfully processed {processed_count} files.", parent=self)
            self.destroy()
        else:
            Messagebox.show_warning(f"Completed with {error_count} errors.\n{processed_count} files were processed successfully.", parent=self)
### Main Application Class ### | |
class TranscriptCombinerApp: | |
def __init__(self, root):
    """Create the application state, restore saved settings, build the UI
    and register drag-and-drop on *root*.

    Args:
        root: The top-level window hosting the application.
    """
    self.root = root
    self.root.title("Transcript Analyzer")
    self.config_file = join(expanduser("~"), ".transcript_combiner_config.json")

    # Input selection state.
    self.input_dir = tk.StringVar()
    self.last_dir_for_revert = expanduser("~")
    self.in_drag_drop_mode = False
    self.file_vars = {}

    # Set later by processing / widget construction.
    self.speaker_texts = None
    self.output_dir = None
    self.select_toggle_btn = None
    self.main_frame = None
    self.wc_speaker_vars = {}

    # Values shared with the DOTE processor dialog.
    self.dote_input_dir_var = tk.StringVar()
    self.dote_output_dir_var = tk.StringVar()
    self.dote_save_in_place_var = tk.BooleanVar(value=True)

    # Output location for the analysis.
    self.main_output_dir_var = tk.StringVar()
    self.main_save_in_place_var = tk.BooleanVar(value=True)

    # Output option toggles.
    self.combine_segments_var = tk.BooleanVar(value=False)
    self.output_text_var = tk.BooleanVar(value=True)
    self.show_timestamps_var = tk.BooleanVar(value=True)
    self.output_json_var = tk.BooleanVar(value=False)
    self.generate_graphs_var = tk.BooleanVar(value=True)
    self.save_graphs_var = tk.BooleanVar(value=True)
    self.show_graphs_var = tk.BooleanVar(value=True)
    self.bar_chart_var = tk.BooleanVar(value=True)
    self.timeline_var = tk.BooleanVar(value=True)

    self.load_settings()
    self.create_widgets()
    self.populate_file_list()

    self.root.drop_target_register(dnd.DND_FILES)
    self.root.dnd_bind('<<Drop>>', self.on_drop)
    self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
    self.center_window()
def on_drop(self, event):
    """Add dropped ``.json`` files to the file list.

    The first useful drop switches the list to drag-and-drop mode, which
    discards whatever was listed before. A drop that contains no ``.json``
    path is ignored entirely, so it can no longer wipe the current list.
    Paths already listed are skipped. Any error is reported in a dialog.

    Args:
        event: Drop event whose ``data`` holds the dropped paths as a Tcl list.
    """
    try:
        dropped_paths = self.root.tk.splitlist(event.data)
        json_paths = [p for p in dropped_paths if p.lower().endswith('.json')]
        if not json_paths:
            return

        if not self.in_drag_drop_mode:
            self._clear_file_list_widgets()
            self.input_dir.set("--- Drag & Drop Mode (Browse to reset) ---")
            self.in_drag_drop_mode = True

        # A set removes paths repeated within the same drop.
        json_files_to_add = sorted({p for p in json_paths if p not in self.file_vars})
        if not json_files_to_add:
            return

        if not self.file_vars:
            # Nothing is listed, so any remaining child is a placeholder
            # label (e.g. "File list cleared."); remove it first.
            for widget in self.scrolled_file_frame.winfo_children():
                widget.destroy()

        for f_path in json_files_to_add:
            var = tk.BooleanVar(value=True)
            self.file_vars[f_path] = var
            cb = bstt.Checkbutton(self.scrolled_file_frame, text=basename(f_path), variable=var)
            cb.pack(anchor=tk.W, padx=10)

        # The suggested output directory follows the first listed file.
        self.main_output_dir_var.set(dirname(next(iter(self.file_vars))))
        self.select_toggle_btn.config(text="Deselect All", state=tk.NORMAL)
    except Exception as e:
        Messagebox.show_error(f"An error occurred while processing dropped files:\n{e}", "Drop Error")
def center_window(self):
    """Centre the root window on the screen, keeping its current size."""
    window = self.root
    window.update_idletasks()
    width, height = window.winfo_width(), window.winfo_height()
    screen_width = window.winfo_screenwidth()
    screen_height = window.winfo_screenheight()
    left = screen_width // 2 - width // 2
    top = screen_height // 2 - height // 2
    window.geometry(f'{width}x{height}+{left}+{top}')
def launch_dote_processor(self):
    """Open the DOTE processor dialog and block until it is closed.

    The main window's controls are disabled while the dialog is open. Each
    control's own state is recorded first and put back afterwards, so
    read-only entries stay read-only and controls that were disabled stay
    disabled, instead of everything being forced to "normal". Settings are
    saved once the dialog has closed.
    """
    saved_states = []

    def remember_states(widget):
        # Walk the whole widget tree; widgets without a "state" option
        # raise TclError and are simply not recorded.
        for child in widget.winfo_children():
            try:
                saved_states.append((child, str(child.cget("state"))))
            except tk.TclError:
                pass
            remember_states(child)

    remember_states(self.main_frame)
    self._toggle_widget_state(self.main_frame, tk.DISABLED)
    dote_window = DoteProcessorWindow(self.root, self)
    self.root.wait_window(dote_window)
    for widget, previous_state in saved_states:
        try:
            widget.config(state=previous_state)
        except tk.TclError:
            # The widget was destroyed while the dialog was open.
            pass
    self.save_settings()
def create_widgets(self):
    """Build the main window: DOTE utility button, input picker, output
    directory controls, file list, output options and the action buttons."""
    self.main_frame = bstt.Frame(self.root, padding="10")
    self.main_frame.pack(fill=tk.BOTH, expand=True)
    container = self.main_frame

    # Utility row: opens the DOTE pre-processing dialog.
    utility_row = bstt.Frame(container)
    utility_row.pack(fill=tk.X, expand=False, pady=(0, 5))
    dote_button = bstt.Button(utility_row, text="Process DOTE Files...", command=self.launch_dote_processor, bootstyle="info-outline")
    dote_button.pack(side=tk.LEFT)

    # Step 1: input directory.
    input_box = bstt.LabelFrame(container, text="Step 1: Select Input", padding=10)
    input_box.pack(fill=tk.X, pady=5)
    input_entry = bstt.Entry(input_box, textvariable=self.input_dir, state="readonly")
    input_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 10))
    input_browse = bstt.Button(input_box, text="Browse...", command=self.choose_directory, bootstyle="info")
    input_browse.pack(side=tk.LEFT)

    # Output location: toggle plus explicit directory chooser.
    toggle_row = bstt.Frame(container, padding=(0, 5, 0, 0))
    toggle_row.pack(fill=tk.X)
    in_place_toggle = bstt.Checkbutton(
        toggle_row,
        variable=self.main_save_in_place_var,
        text="Save analysis in same directory as input",
        bootstyle="success-round-toggle",
        command=self._toggle_main_output_dir_state,
    )
    in_place_toggle.pack(anchor=tk.W)

    self.out_frame = bstt.LabelFrame(container, text="Output Directory", padding=10)
    self.out_frame.pack(fill=tk.X, pady=5)
    self.out_entry = bstt.Entry(self.out_frame, textvariable=self.main_output_dir_var, state="readonly")
    self.out_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 10))
    self.out_browse_btn = bstt.Button(self.out_frame, text="Browse...", command=self._browse_main_output_dir)
    self.out_browse_btn.pack(side=tk.LEFT)
    self._toggle_main_output_dir_state()

    # Step 2: scrollable list of files with selection helpers.
    files_box = bstt.LabelFrame(container, text="Step 2: Select Files to Combine", padding=10)
    files_box.pack(fill=tk.BOTH, expand=True, pady=5)
    self.scrolled_file_frame = ScrolledFrame(files_box, autohide=True)
    self.scrolled_file_frame.pack(fill=tk.BOTH, expand=True, pady=5)
    selection_row = bstt.Frame(files_box)
    selection_row.pack(fill=tk.X, pady=(5, 0))
    self.select_toggle_btn = bstt.Button(selection_row, text="Select All", command=self.toggle_selection, bootstyle="secondary", state=tk.DISABLED)
    self.select_toggle_btn.pack(side=tk.LEFT, padx=5)
    clear_button = bstt.Button(selection_row, text="Clear List", command=self.handle_clear_list_button, bootstyle="secondary")
    clear_button.pack(side=tk.LEFT, padx=5)

    # Step 3: output options.
    options_box = bstt.LabelFrame(container, text="Step 3: Choose Output Options", padding=10)
    options_box.pack(fill=tk.X, expand=False, pady=5)
    self._create_options_widgets(options_box)

    # Action buttons.
    action_row = bstt.Frame(container)
    action_row.pack(fill=tk.X, pady=10)
    process_button = bstt.Button(action_row, text="Process & Analyze", command=self.process_files, bootstyle="primary-outline")
    process_button.pack(side=tk.LEFT, fill=tk.X, expand=True, ipady=10, padx=(0, 5))
    self.word_cloud_button = bstt.Button(action_row, text="Show Interactive Word Cloud", command=self.launch_word_cloud_window, state=tk.DISABLED)
    self.word_cloud_button.pack(side=tk.LEFT, fill=tk.X, expand=True, ipady=10, padx=(5, 0))
def _create_options_widgets(self, parent):
    """Populate *parent* with the output-option toggles.

    Args:
        parent: Container that receives the option widgets.
    """
    combine_toggle = bstt.Checkbutton(parent, text="Combine consecutive speaker segments", variable=self.combine_segments_var, bootstyle="success-round-toggle")
    combine_toggle.pack(anchor=tk.W, pady=3)

    # Text output, with a dependent timestamp option indented below it.
    text_toggle = bstt.Checkbutton(
        parent,
        text="Output as .txt file",
        variable=self.output_text_var,
        bootstyle="success-round-toggle",
        command=self.update_timestamp_toggle_state,
    )
    text_toggle.pack(anchor=tk.W, pady=3)
    timestamp_row = bstt.Frame(parent, padding=(20, 0, 0, 0))
    timestamp_row.pack(fill=tk.X)
    self.timestamp_toggle = bstt.Checkbutton(timestamp_row, text="Show timestamps in .txt file", variable=self.show_timestamps_var, bootstyle="info-round-toggle")
    self.timestamp_toggle.pack(anchor=tk.W, pady=2)
    self.update_timestamp_toggle_state()

    json_toggle = bstt.Checkbutton(parent, text="Output as .json file", variable=self.output_json_var, bootstyle="success-round-toggle")
    json_toggle.pack(anchor=tk.W, pady=3)

    # Graph generation, with its dependent options indented below it.
    graphs_toggle = bstt.Checkbutton(
        parent,
        text="Generate Graphs",
        variable=self.generate_graphs_var,
        bootstyle="success-round-toggle",
        command=self.update_graph_options_state,
    )
    graphs_toggle.pack(anchor=tk.W, pady=(10, 3))
    self.graph_options_frame = bstt.Frame(parent, padding=(20, 0, 0, 0))
    self.graph_options_frame.pack(fill=tk.X, expand=True)
    save_toggle = bstt.Checkbutton(self.graph_options_frame, text="Save graphs as .png", variable=self.save_graphs_var, bootstyle="info-round-toggle", command=self._check_sub_graph_options)
    save_toggle.pack(anchor=tk.W, pady=2)
    show_toggle = bstt.Checkbutton(self.graph_options_frame, text="Show graphs in new windows", variable=self.show_graphs_var, bootstyle="info-round-toggle", command=self._check_sub_graph_options)
    show_toggle.pack(anchor=tk.W, pady=2)

    kinds_box = bstt.LabelFrame(self.graph_options_frame, text="Visualizations", padding=10)
    kinds_box.pack(fill=tk.X, expand=True, pady=5)
    bar_toggle = bstt.Checkbutton(kinds_box, text="Bar Chart (Contribution)", variable=self.bar_chart_var)
    bar_toggle.pack(anchor=tk.W)
    timeline_toggle = bstt.Checkbutton(kinds_box, text="Conversation Timeline", variable=self.timeline_var)
    timeline_toggle.pack(anchor=tk.W)
    self.update_graph_options_state()
def _toggle_widget_state(self, parent, state):
    """Apply *state* to the widgets below *parent*.

    A child that rejects the ``state`` option (raises ``TclError``) is
    treated as a container and its own children are visited instead.

    Args:
        parent: Widget whose children are updated.
        state: Value passed as the ``state`` option.
    """
    for widget in parent.winfo_children():
        try:
            widget.config(state=state)
        except tk.TclError:
            # No "state" option here: descend one level.
            self._toggle_widget_state(widget, state)
def _browse_main_output_dir(self):
    """Ask for an output directory; keep the current one if cancelled."""
    chosen = filedialog.askdirectory(initialdir=self.main_output_dir_var.get())
    if not chosen:
        return
    self.main_output_dir_var.set(chosen)
def _toggle_main_output_dir_state(self):
    """Disable the output-directory controls while "save in place" is on;
    otherwise enable the button and make the entry read-only."""
    if self.main_save_in_place_var.get():
        button_state, entry_state = "disabled", "disabled"
    else:
        button_state, entry_state = "normal", "readonly"
    self.out_browse_btn.config(state=button_state)
    self.out_entry.config(state=entry_state)
def toggle_selection(self):
    """Select or deselect every listed file, depending on the toggle
    button's current caption, and flip the caption."""
    button = self.select_toggle_btn
    if button.cget('text') == "Deselect All":
        self.deselect_all()
        next_caption = "Select All"
    else:
        self.select_all()
        next_caption = "Deselect All"
    button.config(text=next_caption)
def update_graph_options_state(self):
    """Enable or disable the graph sub-options to follow "Generate Graphs".

    When graphs are enabled but neither saving nor showing is selected,
    saving is switched on.
    """
    graphs_enabled = self.generate_graphs_var.get()
    if graphs_enabled and not (self.save_graphs_var.get() or self.show_graphs_var.get()):
        self.save_graphs_var.set(True)
    target_state = tk.NORMAL if graphs_enabled else tk.DISABLED
    self._toggle_widget_state(self.graph_options_frame, target_state)
def _check_sub_graph_options(self):
    """Turn "Generate Graphs" off when neither saving nor showing is selected,
    then refresh the sub-option states."""
    if self.save_graphs_var.get() or self.show_graphs_var.get():
        return
    self.generate_graphs_var.set(False)
    self.update_graph_options_state()
def update_timestamp_toggle_state(self):
    """Enable the timestamp toggle only while .txt output is selected."""
    if self.output_text_var.get():
        new_state = tk.NORMAL
    else:
        new_state = tk.DISABLED
    self.timestamp_toggle.config(state=new_state)
def choose_directory(self):
    """Ask for an input directory and list its files.

    Choosing a directory leaves drag-and-drop mode, also sets it as the
    output directory, and saves the settings. Cancelling changes nothing.
    """
    chosen = filedialog.askdirectory(initialdir=self.last_dir_for_revert, title='Please select a directory')
    if not chosen:
        return
    self.in_drag_drop_mode = False
    self.input_dir.set(chosen)
    self.last_dir_for_revert = chosen
    self.main_output_dir_var.set(chosen)
    self.save_settings()
    self.populate_file_list()
def handle_clear_list_button(self):
    """Empty the file list and show a "File list cleared." notice in its place."""
    self._clear_file_list_widgets()
    notice = bstt.Label(self.scrolled_file_frame, text="File list cleared.", bootstyle="secondary")
    notice.pack(pady=10)
def _clear_file_list_widgets(self):
    """Destroy everything inside the file list, forget the tracked files and
    reset the selection toggle (when it exists) to a disabled "Select All"."""
    for child in self.scrolled_file_frame.winfo_children():
        child.destroy()
    self.file_vars.clear()
    toggle_button = self.select_toggle_btn
    if toggle_button:
        toggle_button.config(text="Select All", state=tk.DISABLED)
def populate_file_list(self):
    """Rebuild the file list from the ``*.json`` files of the input directory.

    Does nothing in drag-and-drop mode. Shows a notice instead of a list
    when the directory does not exist or holds no ``.json`` files. Every
    listed file starts selected.
    """
    if self.in_drag_drop_mode:
        return
    self._clear_file_list_widgets()
    directory = self.input_dir.get()
    if not isdir(directory):
        bstt.Label(self.scrolled_file_frame, text="No directory selected.", bootstyle="secondary").pack(pady=10)
        return
    # Escape the folder part so names containing [, ], * or ? are matched
    # literally instead of being read as glob wildcards.
    json_files = sorted(glob.glob(join(glob.escape(directory), "*.json")))
    if not json_files:
        bstt.Label(self.scrolled_file_frame, text="No .json files found in this directory.", bootstyle="secondary").pack(pady=10)
        return
    for f_path in json_files:
        var = tk.BooleanVar(value=True)
        self.file_vars[f_path] = var
        cb = bstt.Checkbutton(self.scrolled_file_frame, text=basename(f_path), variable=var)
        cb.pack(anchor=tk.W, padx=10)
    self.select_toggle_btn.config(text="Deselect All", state=tk.NORMAL)
def select_all(self):
    """Mark every listed file as selected."""
    for path in self.file_vars:
        self.file_vars[path].set(True)
def deselect_all(self):
    """Mark every listed file as not selected."""
    for path in self.file_vars:
        self.file_vars[path].set(False)
def on_closing(self):
    """Window-close handler: save the settings, then destroy the root window."""
    self.save_settings()
    window = self.root
    window.destroy()
def load_settings(self):
    """Restore settings from the JSON config file, using defaults for
    anything missing.

    A config file that is absent, unreadable, not valid UTF-8 JSON, or whose
    top level is not an object is treated as empty, so a damaged file can
    never stop the application from starting. Directory settings default to
    the user's home directory; the input directory also falls back to it
    when the saved value is not an existing directory.
    """
    default_dir = expanduser("~")
    try:
        with open(self.config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        config = {}
    if not isinstance(config, dict):
        config = {}

    last_dir = config.get("last_directory", default_dir)
    # Guard the type first: isdir() is only meaningful for path strings here.
    usable = isinstance(last_dir, str) and isdir(last_dir)
    self.input_dir.set(last_dir if usable else default_dir)
    self.last_dir_for_revert = self.input_dir.get()

    directory_settings = (
        (self.main_output_dir_var, "main_output_directory"),
        (self.dote_input_dir_var, "dote_input_directory"),
        (self.dote_output_dir_var, "dote_output_directory"),
    )
    for variable, key in directory_settings:
        variable.set(config.get(key, default_dir))

    flag_settings = (
        (self.main_save_in_place_var, "main_save_in_place", True),
        (self.dote_save_in_place_var, "dote_save_in_place", True),
        (self.combine_segments_var, "combine_segments", False),
        (self.output_text_var, "output_text", True),
        (self.show_timestamps_var, "show_timestamps", True),
        (self.output_json_var, "output_json", False),
        (self.generate_graphs_var, "generate_graphs", True),
        (self.save_graphs_var, "save_graphs", True),
        (self.show_graphs_var, "show_graphs", True),
        (self.bar_chart_var, "bar_chart", True),
        (self.timeline_var, "timeline", True),
    )
    for variable, key, fallback in flag_settings:
        variable.set(config.get(key, fallback))
def save_settings(self):
    """Write the current settings to the JSON config file.

    Saving is best-effort: an ``IOError`` while writing is ignored.
    """
    tracked_variables = (
        ("main_output_directory", self.main_output_dir_var),
        ("main_save_in_place", self.main_save_in_place_var),
        ("dote_input_directory", self.dote_input_dir_var),
        ("dote_output_directory", self.dote_output_dir_var),
        ("dote_save_in_place", self.dote_save_in_place_var),
        ("combine_segments", self.combine_segments_var),
        ("output_text", self.output_text_var),
        ("show_timestamps", self.show_timestamps_var),
        ("output_json", self.output_json_var),
        ("generate_graphs", self.generate_graphs_var),
        ("save_graphs", self.save_graphs_var),
        ("show_graphs", self.show_graphs_var),
        ("bar_chart", self.bar_chart_var),
        ("timeline", self.timeline_var),
    )
    # The last directory is a plain attribute; everything else is read
    # from its variable via get().
    settings = {"last_directory": self.last_dir_for_revert}
    for key, variable in tracked_variables:
        settings[key] = variable.get()
    try:
        with open(self.config_file, 'w') as handle:
            json.dump(settings, handle, indent=2)
    except IOError:
        pass
def process_files(self):
    """Merge the ``lines`` of every selected file and run the analysis.

    The inputs are read and validated first; only then is a timestamped
    ``analysis_<YYYYmmdd_HHMMSS>`` directory created, so a failed run no
    longer leaves an empty output folder behind. The directory is created
    under the input location when "save in place" is on (the first selected
    file's folder in drag-and-drop mode), otherwise under the chosen output
    directory. Every problem is reported in a dialog and ends the run.
    """
    self.word_cloud_button.config(state=tk.DISABLED)
    selected_files = [path for path, var in self.file_vars.items() if var.get()]
    if not selected_files:
        Messagebox.show_warning("Please select at least one JSON file to process.", "No Files Selected")
        return

    merged_lines = []
    for filename in selected_files:
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                merged_lines.extend(json.load(f).get("lines", []))
        except (OSError, ValueError, AttributeError, TypeError) as e:
            # ValueError covers JSONDecodeError and UnicodeDecodeError;
            # AttributeError/TypeError cover JSON whose top level is not an
            # object or whose "lines" value is not a list.
            Messagebox.show_error(f"Could not read/parse {basename(filename)}:\n{e}", "File Error")
            return
    if not merged_lines:
        Messagebox.ok("The selected files contain no transcript lines to process.", "Empty Files")
        return

    try:
        if not self.main_save_in_place_var.get():
            base_dir = self.main_output_dir_var.get()
        elif self.in_drag_drop_mode:
            base_dir = dirname(selected_files[0])
        else:
            base_dir = self.input_dir.get()
        if not isdir(base_dir):
            raise OSError(f"The selected output directory does not exist:\n{base_dir}")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.output_dir = join(base_dir, f"analysis_{timestamp}")
        os.makedirs(self.output_dir, exist_ok=True)
    except OSError as e:
        Messagebox.show_error(f"Could not create output directory:\n{e}", "Directory Error")
        return

    self.analyze_and_generate_outputs(merged_lines, self.output_dir)
def analyze_and_generate_outputs(self, merged_lines, output_dir):
    """Sort the merged lines by start time and write the selected outputs.

    Always writes ``report.txt`` (word count and share per speaker).
    Optionally writes the transcript as ``.txt`` and/or ``.json``. When at
    least one word was counted, the per-speaker text for the word cloud is
    stored, the word-cloud button is enabled, and the selected graphs are
    created, saved and/or shown.

    Args:
        merged_lines: List of line dicts with ``startTime``,
            ``speakerDesignation`` and ``text`` (plus ``endTime`` when
            segments are combined). Sorted in place.
        output_dir: Existing directory that receives the output files.

    Raises:
        KeyError: If a line lacks one of the keys read here.
        ValueError: If a ``startTime`` cannot be parsed.
    """
    merged_lines.sort(key=lambda entry: parse_time(entry["startTime"]))
    lines_to_process, output_filename_base = merged_lines, "merged_transcript"
    if self.combine_segments_var.get():
        lines_to_process = self._combine_consecutive_segments(merged_lines)
        output_filename_base = "merged_combined_transcript"

    if self.output_text_var.get():
        # Read the flag once rather than once per written line.
        show_timestamps = self.show_timestamps_var.get()
        filepath = join(output_dir, f"{output_filename_base}.txt")
        with open(filepath, "w", encoding='utf-8') as f:
            for line in lines_to_process:
                prefix = f"[{line['startTime']}] " if show_timestamps else ""
                f.write(f"{prefix}{line['speakerDesignation']}: {line['text']}\n")

    if self.output_json_var.get():
        filepath = join(output_dir, f"{output_filename_base}.json")
        with open(filepath, "w", encoding='utf-8') as f:
            json.dump({"lines": lines_to_process}, f, ensure_ascii=False, indent=2)

    speaker_word_counts = defaultdict(int)
    for line in lines_to_process:
        speaker_word_counts[line["speakerDesignation"]] += word_count(line["text"])
    total_words = sum(speaker_word_counts.values())

    with open(join(output_dir, "report.txt"), "w", encoding='utf-8') as f:
        f.write("Word Count Summary:\n===================\n")
        for speaker, count in sorted(speaker_word_counts.items()):
            percent = (count / total_words) * 100 if total_words > 0 else 0
            f.write(f"\n{speaker}: {count} words ({percent:.2f}%)")
        f.write(f"\n\nTotal Words: {total_words}")

    Messagebox.ok(f"Processing complete!\nFiles saved in: {output_dir}", "Success", bootstyle='success')

    if total_words > 0:
        texts_by_speaker = defaultdict(list)
        for line in merged_lines:
            texts_by_speaker[line['speakerDesignation']].append(line['text'])
        self.speaker_texts = {s: " ".join(t) for s, t in texts_by_speaker.items()}
        self.word_cloud_button.config(state=tk.NORMAL)

        if self.generate_graphs_var.get():
            figures_to_show = []
            if self.bar_chart_var.get():
                fig = self.create_bar_graph_figure(speaker_word_counts, total_words)
                self._save_graph(fig, "barchart_contribution")
                figures_to_show.append((fig, "Speaker Contribution (Bar Chart)"))
            if self.timeline_var.get():
                fig = self.create_timeline_figure(merged_lines)
                self._save_graph(fig, "timeline_conversation_flow")
                figures_to_show.append((fig, "Conversation Timeline"))
            if self.show_graphs_var.get():
                for fig, title in figures_to_show:
                    self.show_report_graph(fig, title)
            else:
                # Figures that are only saved are never displayed; close
                # them so pyplot does not keep them alive after every run.
                for fig, _title in figures_to_show:
                    plt.close(fig)
def _combine_consecutive_segments(self, lines):
    """Merge runs of adjacent lines spoken by the same speaker.

    Within a run the texts are joined with a single space and ``endTime``
    is taken from the run's last line. The input dicts are not modified;
    the result is built from shallow copies.

    Args:
        lines: Sequence of line dicts, already in the desired order.

    Returns:
        A new list of merged line dicts (empty for empty input).
    """
    if not lines:
        return []
    merged = []
    for segment in lines:
        if merged and segment["speakerDesignation"] == merged[-1]["speakerDesignation"]:
            current = merged[-1]
            current["text"] += " " + segment["text"]
            current["endTime"] = segment["endTime"]
        else:
            # Speaker changed (or first line): start a new run from a copy.
            merged.append(segment.copy())
    return merged
def _save_graph(self, fig, name):
    """Save *fig* as ``<name>.png`` in the analysis directory when graph
    saving is enabled; report any failure in a dialog.

    Args:
        fig: Figure object providing ``savefig`` and ``get_facecolor``.
        name: File name without extension.
    """
    if not self.save_graphs_var.get():
        return
    try:
        target = join(self.output_dir, f"{name}.png")
        fig.savefig(target, dpi=300, bbox_inches='tight', facecolor=fig.get_facecolor())
    except Exception as e:
        Messagebox.show_error(f"Could not save {name}:\n{e}", "Graph Save Error")
def launch_word_cloud_window(self):
    """Open the interactive word-cloud window and block until it is closed.

    The window offers one checkbox per speaker, All/None shortcuts and a PNG
    export. The main window's controls are disabled meanwhile; their
    individual states are recorded first and put back afterwards, so
    read-only entries and deliberately disabled controls keep their state
    instead of being forced to "normal". The figure created for the window
    is closed again once the window is gone.
    """
    if not self.speaker_texts:
        Messagebox.show_warning("Please process files first to generate data.", "No Data Available")
        return

    saved_states = []

    def remember_states(widget):
        # Walk the whole widget tree; widgets without a "state" option
        # raise TclError and are simply not recorded.
        for child in widget.winfo_children():
            try:
                saved_states.append((child, str(child.cget("state"))))
            except tk.TclError:
                pass
            remember_states(child)

    remember_states(self.main_frame)
    self._toggle_widget_state(self.main_frame, tk.DISABLED)

    wc_window = tk.Toplevel(self.root)
    wc_window.title("Interactive Word Cloud")
    wc_window.geometry("1000x800")
    wc_window.grab_set()
    self.wc_speaker_vars.clear()

    top_frame = bstt.Frame(wc_window, padding=10)
    top_frame.pack(fill=tk.BOTH, expand=True)

    # Left column: speaker checkboxes, All/None shortcuts and export.
    left_frame = bstt.Frame(top_frame, padding=(0, 0, 10, 0))
    left_frame.pack(side=tk.LEFT, fill=tk.Y)
    bstt.Label(left_frame, text="Select Speakers:").pack(anchor=tk.W, pady=(0, 5))
    speaker_frame = ScrolledFrame(left_frame, autohide=True)
    speaker_frame.pack(fill=tk.BOTH, expand=True)
    for speaker in sorted(self.speaker_texts.keys()):
        var = tk.BooleanVar(value=True)
        self.wc_speaker_vars[speaker] = var
        bstt.Checkbutton(speaker_frame, text=speaker, variable=var,
                         command=self.update_word_cloud).pack(anchor=tk.W, padx=10, pady=2)
    btn_frame = bstt.Frame(left_frame)
    btn_frame.pack(fill=tk.X, pady=5)
    bstt.Button(btn_frame, text="All", command=self.wc_select_all, bootstyle="outline-secondary").pack(side=tk.LEFT, expand=True)
    bstt.Button(btn_frame, text="None", command=self.wc_deselect_none, bootstyle="outline-secondary").pack(side=tk.LEFT, expand=True)
    bstt.Button(left_frame, text="Export PNG", command=lambda: self.export_word_cloud(wc_window)).pack(fill=tk.X, pady=(10, 0))

    # Right side: the figure that displays the cloud.
    canvas_frame = bstt.Frame(top_frame)
    canvas_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.wc_fig, self.wc_ax = plt.subplots(figsize=(10, 8), constrained_layout=True)
    self.wc_canvas = FigureCanvasTkAgg(self.wc_fig, master=canvas_frame)
    self.wc_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
    self.update_word_cloud()

    self.root.wait_window(wc_window)

    # A new figure is created on every launch; release this one.
    plt.close(self.wc_fig)
    for widget, previous_state in saved_states:
        try:
            widget.config(state=previous_state)
        except tk.TclError:
            # The widget was destroyed while the window was open.
            pass
def wc_select_all(self):
    """Tick every speaker in the word-cloud window and redraw the cloud."""
    for speaker in self.wc_speaker_vars:
        self.wc_speaker_vars[speaker].set(True)
    self.update_word_cloud()
def wc_deselect_none(self):
    """Untick every speaker in the word-cloud window and redraw the cloud."""
    for speaker in self.wc_speaker_vars:
        self.wc_speaker_vars[speaker].set(False)
    self.update_word_cloud()
def update_word_cloud(self, event=None):
    """Redraw the word cloud for the currently ticked speakers.

    A placeholder message is drawn instead of a cloud when no speaker is
    ticked, when the ticked speakers have no text, or when the word-cloud
    library finds no usable word in the text (it raises ``ValueError`` in
    that case, which previously aborted the redraw).

    Args:
        event: Unused; accepted so the method can serve as an event handler.
    """
    selected_speakers = [s for s, v in self.wc_speaker_vars.items() if v.get()]
    if not selected_speakers:
        title = "No Speakers Selected"
    elif len(selected_speakers) == 1:
        title = selected_speakers[0]
    elif len(selected_speakers) == len(self.wc_speaker_vars):
        title = "All Speakers"
    else:
        title = "Custom Selection"
    text = " ".join(self.speaker_texts.get(sp, "") for sp in selected_speakers)

    self.wc_ax.clear()
    placeholder = None
    if not text.strip():
        placeholder = "Select speakers to generate word cloud."
    else:
        try:
            wordcloud = WordCloud(width=1200, height=800, background_color='white', colormap='viridis', max_words=150, random_state=42).generate(text)
        except ValueError:
            # Raised when the text yields no word to plot (stop words only).
            placeholder = "No words available for the selected speakers."
        else:
            self.wc_ax.imshow(wordcloud, interpolation='bilinear')
    if placeholder is not None:
        self.wc_ax.text(0.5, 0.5, placeholder, ha='center', va='center')
    self.wc_ax.set_title(f'Most Common Words: {title}', fontsize=20, weight='bold', pad=20)
    self.wc_ax.axis('off')
    self.wc_canvas.draw()
def export_word_cloud(self, parent_window):
    """Save the current word-cloud figure as a PNG inside ``self.output_dir``.

    The file is named ``wordcloud_<name>.png`` where ``<name>`` is the single
    selected speaker, ``All_Speakers`` or ``Custom_Selection``, with every
    run of non-word characters removed. Success and failure are both
    reported through message boxes parented to ``parent_window``.

    Args:
        parent_window: Window passed as ``parent`` to every message box.
    """
    # Guard: there must be a directory to write into.
    if not self.output_dir:
        Messagebox.show_error("Could not find analysis directory.", "Directory Not Found", parent=parent_window)
        return

    # Guard: at least one speaker must be selected.
    chosen = [speaker for speaker, flag in self.wc_speaker_vars.items() if flag.get()]
    if not chosen:
        Messagebox.show_warning("Please select speakers to export.", "Nothing to Export", parent=parent_window)
        return

    if len(chosen) == 1:
        label = chosen[0]
    else:
        everyone = len(chosen) == len(self.wc_speaker_vars)
        label = "All_Speakers" if everyone else "Custom_Selection"

    cleaned = re.sub(r'\W+', '', label)
    filepath = join(self.output_dir, "wordcloud_" + cleaned + ".png")

    try:
        self.wc_fig.savefig(filepath, dpi=300, facecolor='white')
        Messagebox.ok(f"Word cloud saved to:\n\n{filepath}", "Export Successful", parent=parent_window)
    except Exception as exc:
        # Best effort: any failure is reported to the user, never raised.
        Messagebox.show_error(f"Failed to save image:\n{exc}", "Export Error", parent=parent_window)
def create_bar_graph_figure(self, speaker_data, total_words):
    """Build a horizontal bar chart of per-speaker counts.

    Entries are sorted ascending by value, so bar index 0 holds the smallest
    one. Each non-zero bar is annotated with its value and its share of
    ``total_words``.

    Args:
        speaker_data: Mapping of bar label to numeric value (presumably
            speaker name -> word count; confirm against the caller).
        total_words: Denominator for the percentage labels; also shown in
            the subtitle.

    Returns:
        The matplotlib figure containing the chart.
    """
    # Global pyplot styling (affects every later figure as well).
    plt.style.use('default')
    plt.rcParams['font.family'] = 'sans-serif'
    plt.rcParams['font.sans-serif'] = ['Helvetica Neue', 'Arial']
    background, primary_text, secondary_text = '#FFFFFF', '#1f1f1f', '#5f6368'

    ranked = sorted(speaker_data.items(), key=lambda pair: pair[1])
    labels = [name for name, _ in ranked]
    sizes = [count for _, count in ranked]
    bar_count = len(labels)

    cmap = plt.get_cmap('GnBu', bar_count + 4)
    colors = cmap(np.linspace(0.35, 0.85, bar_count))

    fig, ax = plt.subplots(figsize=(12, max(6, bar_count * 0.65)), constrained_layout=True)
    fig.patch.set_facecolor(background)
    ax.set_facecolor(background)
    fig.suptitle('Speaker Contribution', fontsize=20, weight='bold', color=primary_text)
    ax.set_title(f'Analysis of {total_words:,} words across {bar_count} speakers', loc='left', fontsize=12, color=secondary_text)

    # Rounded bars are drawn as patches, one per entry, centred on its row.
    bar_height = 0.6
    for row, (value, color) in enumerate(zip(sizes, colors)):
        bar = patches.FancyBboxPatch(
            (0, row - bar_height / 2), value, bar_height,
            boxstyle="round,pad=0,rounding_size=0.1", fc=color, ec='none', zorder=3,
        )
        ax.add_patch(bar)

    # Gap between a bar's end and its label: 1% of the longest bar.
    padding = max(sizes) * 0.01 if sizes else 0
    for row, value in enumerate(sizes):
        if value == 0:
            continue
        percent = (value / total_words) * 100
        text = f'{value:,} ({percent:.1f}%)'
        # Bars above a quarter of the total carry the label inside, in white;
        # smaller bars get it just past the bar end.
        if value > (total_words * 0.25):
            anchor, label_color, x_pos = 'right', 'white', value - padding
        else:
            anchor, label_color, x_pos = 'left', primary_text, value + padding
        ax.text(x_pos, row, text, va='center', ha=anchor, color=label_color, fontsize=10, weight='bold')

    ax.grid(True, axis='x', color='#E8EAED', linestyle='-', zorder=0)
    ax.spines[['top', 'right', 'left', 'bottom']].set_visible(False)
    ax.set_yticks(range(bar_count))
    ax.set_yticklabels(labels, fontsize=11, color=primary_text)
    ax.tick_params(axis='x', colors=secondary_text, length=0)
    ax.tick_params(axis='y', length=0, pad=10)
    ax.set_ylim(-0.8, bar_count - 0.2)
    if sizes:
        ax.set_xlim(0, max(sizes) * 1.05)
    return fig
def create_timeline_figure(self, lines):
    """Build a timeline chart with one row of time bars per speaker.

    Args:
        lines: Iterable of dicts; each is read for the keys
            ``'speakerDesignation'``, ``'startTime'`` and ``'endTime'``.
            The two time strings are converted with ``time_str_to_seconds``.

    Returns:
        The matplotlib figure containing the timeline.
    """
    # Global pyplot styling (affects every later figure as well).
    plt.style.use('default')
    plt.rcParams['font.family'] = 'sans-serif'
    plt.rcParams['font.sans-serif'] = ['Helvetica Neue', 'Arial']
    background, primary_text, secondary_text, grid_color = '#FFFFFF', '#1f1f1f', '#5f6368', '#E8EAED'

    # Reverse-sorted so that row 0 holds the last name in sort order.
    speakers = sorted({entry['speakerDesignation'] for entry in lines}, reverse=True)

    plot_data = defaultdict(list)
    for entry in lines:
        start_sec = time_str_to_seconds(entry['startTime'])
        end_sec = time_str_to_seconds(entry['endTime'])
        duration = end_sec - start_sec
        # Segments shorter than a tenth of a second are left out.
        if duration >= 0.1:
            plot_data[entry['speakerDesignation']].append((start_sec, duration))

    fig, ax = plt.subplots(figsize=(14, max(4, len(speakers) * 0.6)), constrained_layout=True)
    fig.patch.set_facecolor(background)
    ax.set_facecolor(background)
    fig.suptitle('Conversation Timeline', fontsize=20, weight='bold', color=primary_text)
    ax.set_title('Visualizing the flow of dialogue over time', loc='left', fontsize=12, color=secondary_text)

    cmap = plt.get_cmap('tab20', len(speakers))
    colors = cmap(np.linspace(0, 1, len(speakers)))
    for row, speaker in enumerate(speakers):
        # A speaker whose segments were all filtered out has no entry.
        if speaker in plot_data:
            ax.broken_barh(plot_data[speaker], (row - 0.45, 0.9), facecolors=colors[row], zorder=3)

    def _as_minutes_seconds(x, pos):
        """Format a tick value in seconds as ``M:SS``."""
        minutes, seconds = divmod(int(x), 60)
        return f'{minutes}:{seconds:02d}'

    ax.xaxis.set_major_formatter(FuncFormatter(_as_minutes_seconds))
    ax.set_xlabel('Time (minutes:seconds)', color=secondary_text, labelpad=10)
    ax.set_yticks(range(len(speakers)))
    ax.set_yticklabels(speakers, fontsize=11, color=primary_text)
    ax.grid(True, axis='x', color=grid_color, linestyle='-', zorder=0)
    ax.spines[['top', 'right', 'left', 'bottom']].set_visible(False)
    ax.tick_params(axis='x', colors=secondary_text, length=0)
    ax.tick_params(axis='y', length=0)
    return fig
def show_report_graph(self, fig, title="Report Graph"):
    """Open a new top-level window and embed ``fig`` in it.

    Args:
        fig: Matplotlib figure to display.
        title: Caption of the new window.
    """
    # NOTE(review): nothing here closes ``fig``; if it was created through
    # pyplot it may stay registered after the window is closed - confirm.
    window = tk.Toplevel(self.root)
    window.configure(bg='#FFFFFF')
    window.geometry("1200x800")
    window.title(title)

    figure_canvas = FigureCanvasTkAgg(fig, master=window)
    figure_canvas.draw()
    canvas_widget = figure_canvas.get_tk_widget()
    canvas_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=1, padx=10, pady=10)
if __name__ == "__main__":
    # Prefer a drag-and-drop capable root window; fall back to a plain
    # ttkbootstrap window when DnD support cannot be set up.
    try:
        root = dnd.TkinterDnD.Tk()
        style = bstt.Style(theme="litera")
    except (tk.TclError, RuntimeError):
        # tkinterdnd2 reports a failed tkdnd load as RuntimeError, so catching
        # TclError alone never reaches this fallback.
        # The failed attempt may have left a half-initialised default root
        # behind; drop it so it does not linger as an empty extra window.
        # NOTE(review): relies on tkinter's ``_default_root`` module attribute.
        leftover_root = getattr(tk, "_default_root", None)
        if leftover_root is not None:
            try:
                leftover_root.destroy()
            except tk.TclError:
                # Already torn down; nothing left to clean up.
                pass
        # Create the fallback window first so the warning dialog has a real
        # parent instead of making tkinter create an implicit root.
        root = bstt.Window(themename="litera")
        Messagebox.show_warning("Drag and drop could not be initialized. The app will run without it.", "DND Init Failed", parent=root)
    app = TranscriptCombinerApp(root)
    # Bring the window to the front once, without keeping it always-on-top.
    root.lift()
    root.attributes('-topmost', True)
    root.after_idle(root.attributes, '-topmost', False)
    root.focus_force()
    root.mainloop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Reads .dote JSON files in a directory and combines them in chronological order.
Each .dote JSON file needs to be formatted as shown below:
{ "lines" : [ { "endTime" : "00:00:11,680", "speakerDesignation" : "DM", "startTime" : "00:00:10,160", "text" : "All right, let's see how this goes." }....
.
Outputs a JSON file and a plain-text file in that directory.