Skip to content

Instantly share code, notes, and snippets.

@xthesaintx
Last active July 2, 2025 23:56
Show Gist options
  • Save xthesaintx/f4dd81c96d18bcf52c44b36b086b3c9d to your computer and use it in GitHub Desktop.
Save xthesaintx/f4dd81c96d18bcf52c44b36b086b3c9d to your computer and use it in GitHub Desktop.
Merge timestamped .dote json files in chronological order according to "startTime" "endTime" "speakerDesignation" "text" with gui
#!/usr/bin/env python3
import sys
import json
import glob
import re
import os
from datetime import datetime
from collections import defaultdict
from os.path import expanduser, join, basename, isdir, dirname
import tkinter as tk
from tkinter import filedialog, ttk
from ttkbootstrap.dialogs import Messagebox
import ttkbootstrap as bstt
from ttkbootstrap.scrolled import ScrolledFrame
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.font_manager
import numpy as np
import matplotlib.patches as patches
from matplotlib.ticker import FuncFormatter
# Third-party packages that are not part of a default install. If any of them
# is missing, tell the user how to install them and exit instead of failing
# later with a bare traceback.
try:
    import pandas as pd
    from wordcloud import WordCloud
    import tkinterdnd2 as dnd  # For drag-and-drop
except ImportError as exc:
    # Include the underlying error so the user can see WHICH import failed.
    Messagebox.show_error(
        "Missing required libraries. Please install them by running:\n\n"
        "pip install pandas wordcloud tkinterdnd2"
        f"\n\nDetails: {exc}",
        "Dependency Error"
    )
    sys.exit(1)
### Core Processing Functions ###
def parse_time(time_str):
    """Convert a transcript timestamp string into a ``datetime``.

    The string is first read as ``HH:MM:SS,fraction``; if that does not
    match, it is read as plain ``HH:MM:SS``. A ``ValueError`` from the second
    attempt propagates to the caller.
    """
    fractional_format = "%H:%M:%S,%f"
    whole_second_format = "%H:%M:%S"
    try:
        parsed = datetime.strptime(time_str, fractional_format)
    except ValueError:
        parsed = datetime.strptime(time_str, whole_second_format)
    return parsed
def time_str_to_seconds(time_str):
    """Return the number of seconds represented by a timestamp string.

    Accepts ``HH:MM:SS,fraction`` or plain ``HH:MM:SS``; the fractional part
    is included in the returned float.
    """
    stamp = None
    for layout in ("%H:%M:%S,%f", "%H:%M:%S"):
        try:
            stamp = datetime.strptime(time_str, layout)
        except ValueError:
            if layout == "%H:%M:%S":
                # Neither layout matched: surface the last parsing error.
                raise
            continue
        break
    whole_seconds = stamp.hour * 3600 + stamp.minute * 60 + stamp.second
    return whole_seconds + stamp.microsecond / 1_000_000
def word_count(text):
    """Count the runs of word characters (``\\w+``) found in ``text``."""
    return sum(1 for _match in re.finditer(r'\w+', text))
### DOTE Processor Window ###
class DoteProcessorWindow(tk.Toplevel):
    """Modal dialog that converts ``.dote`` files into ``.json`` files.

    Each listed file gets a single, user-editable speaker name which is
    written into the ``speakerDesignation`` field of every entry of the
    file's ``lines`` list. The chosen directories and the "save in place"
    flag are copied back to ``main_app`` when the window is destroyed.
    """

    def __init__(self, master, main_app):
        """Build the dialog, list the files of the remembered input directory
        and grab input focus.

        Args:
            master: Parent window the dialog is centred on.
            main_app: Object exposing ``dote_input_dir_var``,
                ``dote_output_dir_var`` and ``dote_save_in_place_var``.
        """
        super().__init__(master)
        self.main_app = main_app
        self.title("Process DOTE Files")
        self.geometry("800x600")
        self.input_dir = tk.StringVar(value=self.main_app.dote_input_dir_var.get())
        self.output_dir = tk.StringVar(value=self.main_app.dote_output_dir_var.get())
        self.save_in_place_var = tk.BooleanVar(value=self.main_app.dote_save_in_place_var.get())
        # Maps a Treeview item id to the full path of the file it represents.
        self.tree_data = {}
        self._setup_widgets()
        self._populate_from_directory(self.input_dir.get())
        self._enable_drag_and_drop()
        self._center_on_parent()
        self.grab_set()

    def _enable_drag_and_drop(self):
        """Register the window as a file drop target when drag-and-drop is usable."""
        try:
            self.drop_target_register(dnd.DND_FILES)
            self.dnd_bind('<<Drop>>', self._on_drop)
        except (AttributeError, tk.TclError):
            # Drag-and-drop is optional: the window stays fully usable through
            # the "Browse..." button when the root has no drag-and-drop support.
            pass

    @staticmethod
    def _output_filename(original_path):
        """Return the ``.json`` file name that corresponds to ``original_path``.

        Only the final extension is swapped, whatever its letter case, so the
        result can never equal the source file name and other ``.dote``
        occurrences inside the name are left untouched.
        """
        stem, _extension = os.path.splitext(basename(original_path))
        return stem + ".json"

    def _center_on_parent(self):
        """Place the dialog so that its centre matches the parent's centre."""
        self.update_idletasks()
        parent = self.master
        parent_x, parent_y = parent.winfo_x(), parent.winfo_y()
        parent_width, parent_height = parent.winfo_width(), parent.winfo_height()
        self_width, self_height = self.winfo_width(), self.winfo_height()
        x = parent_x + (parent_width // 2) - (self_width // 2)
        y = parent_y + (parent_height // 2) - (self_height // 2)
        self.geometry(f"+{x}+{y}")

    def _setup_widgets(self):
        """Create the input selector, file tree, output selector and action buttons."""
        main_frame = bstt.Frame(self, padding=10)
        main_frame.pack(fill=tk.BOTH, expand=True)

        in_frame = bstt.LabelFrame(main_frame, text="Input Directory (.dote files)", padding=10)
        in_frame.pack(fill=tk.X, pady=(0, 10))
        bstt.Entry(in_frame, textvariable=self.input_dir, state="readonly").pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
        bstt.Button(in_frame, text="Browse...", command=self._browse_input_dir).pack(side=tk.LEFT)

        tree_frame = bstt.LabelFrame(main_frame, text="Files to Process (Double-click speaker to edit)", padding=10)
        tree_frame.pack(fill=tk.BOTH, expand=True, pady=5)
        self.tree = bstt.Treeview(tree_frame, columns=("Speaker",), style='primary.Treeview')
        self.tree.heading("#0", text="DOTE File")
        self.tree.heading("Speaker", text="Speaker")
        self.tree.column("#0", width=400, stretch=True)
        self.tree.column("Speaker", width=200, stretch=True)
        self.tree.pack(fill=tk.BOTH, expand=True)
        self.tree.bind("<Double-1>", self._edit_cell)
        bstt.Button(tree_frame, text="Clear List", command=self._clear_list, bootstyle="warning-outline").pack(side=tk.BOTTOM, anchor=tk.E, pady=(5, 0))

        out_toggle_frame = bstt.Frame(main_frame, padding=(0, 5, 0, 0))
        out_toggle_frame.pack(fill=tk.X)
        bstt.Checkbutton(out_toggle_frame, variable=self.save_in_place_var, text="Output in same directory as DOTE file",
                         bootstyle="success-round-toggle", command=self._toggle_output_dir_state).pack(anchor=tk.W)

        self.out_frame = bstt.LabelFrame(main_frame, text="Output Directory (for new .json files)", padding=10)
        self.out_frame.pack(fill=tk.X, pady=(5, 0))
        self.out_dir_entry = bstt.Entry(self.out_frame, textvariable=self.output_dir, state="readonly")
        self.out_dir_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
        self.out_dir_browse_btn = bstt.Button(self.out_frame, text="Browse...", command=self._browse_output_dir)
        self.out_dir_browse_btn.pack(side=tk.LEFT)

        action_frame = bstt.Frame(main_frame)
        action_frame.pack(fill=tk.X, pady=(15, 0), anchor=tk.S)
        bstt.Button(action_frame, text="Process", command=self._process_files, bootstyle="success").pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
        bstt.Button(action_frame, text="Cancel", command=self.destroy, bootstyle="danger-outline").pack(side=tk.RIGHT, fill=tk.X, expand=True, padx=(5, 0))
        self._toggle_output_dir_state()

    def destroy(self):
        """Copy the dialog's settings back to the main application, then close."""
        self.main_app.dote_input_dir_var.set(self.input_dir.get())
        self.main_app.dote_output_dir_var.set(self.output_dir.get())
        self.main_app.dote_save_in_place_var.set(self.save_in_place_var.get())
        super().destroy()

    def _clear_list(self):
        """Remove every row from the tree and forget the associated paths."""
        for item in self.tree.get_children():
            self.tree.delete(item)
        self.tree_data.clear()

    def _toggle_output_dir_state(self):
        """Disable the output-directory widgets while "save in place" is on."""
        state = "disabled" if self.save_in_place_var.get() else "normal"
        for child in self.out_frame.winfo_children():
            child.config(state=state)
        if state == 'normal':
            # The path is only ever changed through the Browse button.
            self.out_dir_entry.config(state="readonly")

    def _browse_input_dir(self):
        """Ask for an input directory and list its ``.dote`` files."""
        dir_name = filedialog.askdirectory(initialdir=self.input_dir.get(), title='Select Directory with DOTE files')
        if dir_name:
            self.input_dir.set(dir_name)
            self._populate_from_directory(dir_name)

    def _browse_output_dir(self):
        """Ask for the directory that receives the generated ``.json`` files."""
        dir_name = filedialog.askdirectory(initialdir=self.output_dir.get(), title='Select Output Directory')
        if dir_name:
            self.output_dir.set(dir_name)

    def _populate_from_directory(self, directory):
        """Replace the tree content with the ``*.dote`` files of ``directory``."""
        self._clear_list()
        if not isdir(directory):
            return
        try:
            dote_files = sorted(glob.glob(join(directory, "*.dote")))
            for file_path in dote_files:
                self._add_dote_file_to_tree(file_path)
        except Exception as e:
            Messagebox.show_error(f"Could not read directory:\n{e}", parent=self)

    def _add_dote_file_to_tree(self, file_path):
        """Append ``file_path`` to the tree unless it is already listed."""
        if file_path in self.tree_data.values():
            return
        filename = basename(file_path)
        speaker = self._get_first_speaker(file_path)
        item_id = self.tree.insert("", tk.END, text=filename, values=(speaker,))
        self.tree_data[item_id] = file_path

    def _get_first_speaker(self, file_path):
        """Return the ``speakerDesignation`` of the file's first line.

        Returns ``"Unknown"`` when the first line has no speaker and
        ``"Error/Empty"`` when the file cannot be read, is not valid JSON,
        has no lines, or does not have the expected object structure.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return data.get("lines", [])[0].get("speakerDesignation", "Unknown")
        except (OSError, ValueError, IndexError, KeyError, AttributeError, TypeError):
            # ValueError covers both invalid JSON and undecodable bytes;
            # AttributeError/TypeError/KeyError cover JSON of the wrong shape.
            return "Error/Empty"

    def _on_drop(self, event):
        """Add every dropped ``.dote`` file (any letter case) to the tree."""
        filepaths = self.tk.splitlist(event.data)
        for file_path in sorted([f for f in filepaths if f.lower().endswith('.dote')]):
            self._add_dote_file_to_tree(file_path)

    def _edit_cell(self, event):
        """Overlay an entry on the double-clicked speaker cell for in-place editing."""
        if self.tree.identify_region(event.x, event.y) != "cell":
            return
        column, item_id = self.tree.identify_column(event.x), self.tree.identify_row(event.y)
        if column != "#1" or not item_id:
            return
        bbox = self.tree.bbox(item_id, column)
        if not bbox:
            # The cell is not currently visible, so there is nowhere to place the editor.
            return
        x, y, width, height = bbox
        value = self.tree.set(item_id, column)
        entry = bstt.Entry(self.tree)
        entry.place(x=x, y=y + (height - entry.winfo_reqheight()) // 2, width=width)
        entry.insert(0, value)
        entry.select_range(0, tk.END)
        entry.focus_set()
        entry.bind("<Return>", lambda e: self._on_edit_commit(e.widget, item_id, column))
        entry.bind("<FocusOut>", lambda e: self._on_edit_commit(e.widget, item_id, column))

    def _on_edit_commit(self, entry, item_id, column):
        """Store the edited value in the tree and remove the editor entry."""
        if not entry.winfo_exists():
            # Both <Return> and <FocusOut> lead here; the second call is a no-op.
            return
        if self.tree.exists(item_id):
            self.tree.set(item_id, column, entry.get())
        entry.destroy()

    def _process_files(self):
        """Write a ``.json`` copy of every listed file with the chosen speaker.

        Failures are reported per file and counted; the dialog closes only
        when every file was processed successfully.
        """
        items = self.tree.get_children()
        if not items:
            Messagebox.show_warning("There are no files to process.", parent=self)
            return
        save_in_place = self.save_in_place_var.get()
        final_output_dir = self.output_dir.get()
        if not save_in_place and (not final_output_dir or not isdir(final_output_dir)):
            Messagebox.show_warning("Please select a valid output directory.", parent=self)
            return

        processed_count, error_count = 0, 0
        for item_id in items:
            # Resolve the display name up front so the error dialog never
            # depends on a variable assigned inside the failing ``try``.
            original_path = self.tree_data.get(item_id)
            display_name = basename(original_path) if original_path else str(item_id)
            try:
                if not original_path:
                    raise KeyError(f"no file is associated with list entry {item_id}")
                new_speaker = self.tree.set(item_id, "Speaker")
                with open(original_path, 'r', encoding='utf-8') as f_in:
                    data = json.load(f_in)
                for line in data.get("lines", []):
                    line["speakerDesignation"] = new_speaker
                target_dir = dirname(original_path) if save_in_place else final_output_dir
                output_path = join(target_dir, self._output_filename(original_path))
                with open(output_path, 'w', encoding='utf-8') as f_out:
                    json.dump(data, f_out, ensure_ascii=False, indent=2)
                processed_count += 1
            except Exception as e:
                Messagebox.show_error(f"Failed to process {display_name}:\n{e}", parent=self)
                error_count += 1

        if error_count == 0:
            Messagebox.ok(f"Successfully processed {processed_count} files.", parent=self)
            self.destroy()
        else:
            Messagebox.show_warning(f"Completed with {error_count} errors.\n{processed_count} files were processed successfully.", parent=self)
### Main Application Class ###
class TranscriptCombinerApp:
    """Main window: merges transcript JSON files and produces reports and graphs.

    The selected files' ``lines`` entries are merged, sorted by ``startTime``
    and written out as text/JSON together with a word-count report, optional
    matplotlib graphs and an interactive word cloud. User choices are kept in
    a JSON config file in the home directory.
    """

    def __init__(self, root):
        """Create the state variables, load the saved settings and build the UI.

        Args:
            root: The Tk root window hosting the application.
        """
        self.root = root
        self.root.title("Transcript Analyzer")
        self.config_file = join(expanduser("~"), ".transcript_combiner_config.json")
        self.input_dir = tk.StringVar()
        self.last_dir_for_revert = expanduser("~")
        self.in_drag_drop_mode = False
        # Maps a file path to the BooleanVar of its checkbox.
        self.file_vars = {}
        self.speaker_texts = None
        self.output_dir = None
        self.select_toggle_btn = None
        self.main_frame = None
        self.input_entry = None
        self.wc_speaker_vars = {}
        self.wc_fig = None
        self.wc_ax = None
        self.wc_canvas = None
        self.dote_input_dir_var = tk.StringVar()
        self.dote_output_dir_var = tk.StringVar()
        self.dote_save_in_place_var = tk.BooleanVar(value=True)
        self.main_output_dir_var = tk.StringVar()
        self.main_save_in_place_var = tk.BooleanVar(value=True)
        self.combine_segments_var = tk.BooleanVar(value=False)
        self.output_text_var = tk.BooleanVar(value=True)
        self.show_timestamps_var = tk.BooleanVar(value=True)
        self.output_json_var = tk.BooleanVar(value=False)
        self.generate_graphs_var = tk.BooleanVar(value=True)
        self.save_graphs_var = tk.BooleanVar(value=True)
        self.show_graphs_var = tk.BooleanVar(value=True)
        self.bar_chart_var = tk.BooleanVar(value=True)
        self.timeline_var = tk.BooleanVar(value=True)
        self.load_settings()
        self.create_widgets()
        self.populate_file_list()
        self._enable_drag_and_drop()
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
        self.center_window()

    def _enable_drag_and_drop(self):
        """Register the root as a file drop target when drag-and-drop is usable."""
        try:
            self.root.drop_target_register(dnd.DND_FILES)
            self.root.dnd_bind('<<Drop>>', self.on_drop)
        except (AttributeError, tk.TclError):
            # Drag-and-drop is optional: a root without that support still
            # offers the "Browse..." workflow.
            pass

    @staticmethod
    def _read_transcript_lines(path):
        """Return the ``lines`` list stored in the JSON file at ``path``.

        A missing ``lines`` key yields an empty list.

        Raises:
            OSError: The file cannot be opened or read.
            ValueError: The content is not valid UTF-8 JSON, the top-level
                value is not an object, or ``lines`` is not a list.
        """
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise ValueError("the top-level JSON value must be an object")
        lines = data.get("lines", [])
        if not isinstance(lines, list):
            raise ValueError('the "lines" entry must be a list')
        return lines

    def on_drop(self, event):
        """Add dropped ``.json`` files to the file list, entering drag-and-drop mode."""
        try:
            filepaths = self.root.tk.splitlist(event.data)
            dropped_json = [f for f in filepaths if f.lower().endswith('.json')]
            if not dropped_json:
                # Nothing usable was dropped: leave the current list untouched.
                return
            if not self.in_drag_drop_mode or not self.file_vars:
                # Entering the mode, or the list only holds a placeholder label.
                self._clear_file_list_widgets()
                self.input_dir.set("--- Drag & Drop Mode (Browse to reset) ---")
                self.in_drag_drop_mode = True
            json_files_to_add = sorted({f for f in dropped_json if f not in self.file_vars})
            if not json_files_to_add:
                return
            for f_path in json_files_to_add:
                var = tk.BooleanVar(value=True)
                self.file_vars[f_path] = var
                cb = bstt.Checkbutton(self.scrolled_file_frame, text=basename(f_path), variable=var)
                cb.pack(anchor=tk.W, padx=10)
            if self.file_vars:
                self.main_output_dir_var.set(dirname(next(iter(self.file_vars))))
                self.select_toggle_btn.config(text="Deselect All", state=tk.NORMAL)
        except Exception as e:
            Messagebox.show_error(f"An error occurred while processing dropped files:\n{e}", "Drop Error")

    def center_window(self):
        """Centre the root window on the screen, keeping its current size."""
        self.root.update_idletasks()
        w, h = self.root.winfo_width(), self.root.winfo_height()
        sw, sh = self.root.winfo_screenwidth(), self.root.winfo_screenheight()
        self.root.geometry(f'{w}x{h}+{sw//2 - w//2}+{sh//2 - h//2}')

    def launch_dote_processor(self):
        """Open the DOTE processing dialog and wait until it is closed."""
        self._toggle_widget_state(self.main_frame, tk.DISABLED)
        try:
            dote_window = DoteProcessorWindow(self.root, self)
            self.root.wait_window(dote_window)
        finally:
            self._restore_main_widget_states()
        self.save_settings()

    def _restore_main_widget_states(self):
        """Re-enable the main window and re-apply every option-dependent state.

        A blanket ``state=NORMAL`` would make the read-only path entries
        editable and enable controls that must stay disabled, so the derived
        states are recomputed afterwards.
        """
        self._toggle_widget_state(self.main_frame, tk.NORMAL)
        if self.input_entry is not None:
            self.input_entry.config(state="readonly")
        self._toggle_main_output_dir_state()
        self.update_timestamp_toggle_state()
        self.update_graph_options_state()
        self.select_toggle_btn.config(state=tk.NORMAL if self.file_vars else tk.DISABLED)
        self.word_cloud_button.config(state=tk.NORMAL if self.speaker_texts else tk.DISABLED)

    def create_widgets(self):
        """Build the main window: input, output, file list, options and actions."""
        self.main_frame = bstt.Frame(self.root, padding="10")
        self.main_frame.pack(fill=tk.BOTH, expand=True)

        top_util_frame = bstt.Frame(self.main_frame)
        top_util_frame.pack(fill=tk.X, expand=False, pady=(0, 5))
        bstt.Button(top_util_frame, text="Process DOTE Files...", command=self.launch_dote_processor, bootstyle="info-outline").pack(side=tk.LEFT)

        dir_frame = bstt.LabelFrame(self.main_frame, text="Step 1: Select Input", padding=10)
        dir_frame.pack(fill=tk.X, pady=5)
        self.input_entry = bstt.Entry(dir_frame, textvariable=self.input_dir, state="readonly")
        self.input_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 10))
        bstt.Button(dir_frame, text="Browse...", command=self.choose_directory, bootstyle="info").pack(side=tk.LEFT)

        out_toggle_frame = bstt.Frame(self.main_frame, padding=(0, 5, 0, 0))
        out_toggle_frame.pack(fill=tk.X)
        bstt.Checkbutton(out_toggle_frame, variable=self.main_save_in_place_var, text="Save analysis in same directory as input",
                         bootstyle="success-round-toggle", command=self._toggle_main_output_dir_state).pack(anchor=tk.W)

        self.out_frame = bstt.LabelFrame(self.main_frame, text="Output Directory", padding=10)
        self.out_frame.pack(fill=tk.X, pady=5)
        self.out_entry = bstt.Entry(self.out_frame, textvariable=self.main_output_dir_var, state="readonly")
        self.out_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 10))
        self.out_browse_btn = bstt.Button(self.out_frame, text="Browse...", command=self._browse_main_output_dir)
        self.out_browse_btn.pack(side=tk.LEFT)
        self._toggle_main_output_dir_state()

        file_frame = bstt.LabelFrame(self.main_frame, text="Step 2: Select Files to Combine", padding=10)
        file_frame.pack(fill=tk.BOTH, expand=True, pady=5)
        self.scrolled_file_frame = ScrolledFrame(file_frame, autohide=True)
        self.scrolled_file_frame.pack(fill=tk.BOTH, expand=True, pady=5)
        selection_btn_frame = bstt.Frame(file_frame)
        selection_btn_frame.pack(fill=tk.X, pady=(5, 0))
        self.select_toggle_btn = bstt.Button(selection_btn_frame, text="Select All", command=self.toggle_selection, bootstyle="secondary", state=tk.DISABLED)
        self.select_toggle_btn.pack(side=tk.LEFT, padx=5)
        bstt.Button(selection_btn_frame, text="Clear List", command=self.handle_clear_list_button, bootstyle="secondary").pack(side=tk.LEFT, padx=5)

        options_frame = bstt.LabelFrame(self.main_frame, text="Step 3: Choose Output Options", padding=10)
        options_frame.pack(fill=tk.X, expand=False, pady=5)
        self._create_options_widgets(options_frame)

        action_frame = bstt.Frame(self.main_frame)
        action_frame.pack(fill=tk.X, pady=10)
        bstt.Button(action_frame, text="Process & Analyze", command=self.process_files, bootstyle="primary-outline").pack(side=tk.LEFT, fill=tk.X, expand=True, ipady=10, padx=(0, 5))
        self.word_cloud_button = bstt.Button(action_frame, text="Show Interactive Word Cloud", command=self.launch_word_cloud_window, state=tk.DISABLED)
        self.word_cloud_button.pack(side=tk.LEFT, fill=tk.X, expand=True, ipady=10, padx=(5, 0))

    def _create_options_widgets(self, parent):
        """Create the output-format and graph option toggles inside ``parent``."""
        bstt.Checkbutton(parent, text="Combine consecutive speaker segments", variable=self.combine_segments_var, bootstyle="success-round-toggle").pack(anchor=tk.W, pady=3)
        text_output_toggle = bstt.Checkbutton(parent, text="Output as .txt file", variable=self.output_text_var, bootstyle="success-round-toggle", command=self.update_timestamp_toggle_state)
        text_output_toggle.pack(anchor=tk.W, pady=3)
        timestamp_frame = bstt.Frame(parent, padding=(20, 0, 0, 0))
        timestamp_frame.pack(fill=tk.X)
        self.timestamp_toggle = bstt.Checkbutton(timestamp_frame, text="Show timestamps in .txt file", variable=self.show_timestamps_var, bootstyle="info-round-toggle")
        self.timestamp_toggle.pack(anchor=tk.W, pady=2)
        self.update_timestamp_toggle_state()
        bstt.Checkbutton(parent, text="Output as .json file", variable=self.output_json_var, bootstyle="success-round-toggle").pack(anchor=tk.W, pady=3)
        bstt.Checkbutton(parent, text="Generate Graphs", variable=self.generate_graphs_var, bootstyle="success-round-toggle", command=self.update_graph_options_state).pack(anchor=tk.W, pady=(10, 3))
        self.graph_options_frame = bstt.Frame(parent, padding=(20, 0, 0, 0))
        self.graph_options_frame.pack(fill=tk.X, expand=True)
        bstt.Checkbutton(self.graph_options_frame, text="Save graphs as .png", variable=self.save_graphs_var, bootstyle="info-round-toggle", command=self._check_sub_graph_options).pack(anchor=tk.W, pady=2)
        bstt.Checkbutton(self.graph_options_frame, text="Show graphs in new windows", variable=self.show_graphs_var, bootstyle="info-round-toggle", command=self._check_sub_graph_options).pack(anchor=tk.W, pady=2)
        graph_types_frame = bstt.LabelFrame(self.graph_options_frame, text="Visualizations", padding=10)
        graph_types_frame.pack(fill=tk.X, expand=True, pady=5)
        bstt.Checkbutton(graph_types_frame, text="Bar Chart (Contribution)", variable=self.bar_chart_var).pack(anchor=tk.W)
        bstt.Checkbutton(graph_types_frame, text="Conversation Timeline", variable=self.timeline_var).pack(anchor=tk.W)
        self.update_graph_options_state()

    def _toggle_widget_state(self, parent, state):
        """Apply ``state`` to every descendant of ``parent`` that accepts it.

        Widgets that reject the ``state`` option are treated as containers
        and their children are visited instead.
        """
        for child in parent.winfo_children():
            try:
                child.config(state=state)
            except tk.TclError:
                self._toggle_widget_state(child, state)

    def _browse_main_output_dir(self):
        """Ask for the directory that receives the analysis output."""
        dir_name = filedialog.askdirectory(initialdir=self.main_output_dir_var.get())
        if dir_name:
            self.main_output_dir_var.set(dir_name)

    def _toggle_main_output_dir_state(self):
        """Disable the output-directory widgets while "save in place" is on."""
        state = "disabled" if self.main_save_in_place_var.get() else "normal"
        self.out_browse_btn.config(state=state)
        self.out_entry.config(state="readonly" if state == "normal" else "disabled")

    def toggle_selection(self):
        """Select or deselect every file, flipping the button label accordingly."""
        if self.select_toggle_btn.cget('text') == "Deselect All":
            self.deselect_all()
            self.select_toggle_btn.config(text="Select All")
        else:
            self.select_all()
            self.select_toggle_btn.config(text="Deselect All")

    def update_graph_options_state(self):
        """Enable the graph sub-options only while graph generation is on."""
        if self.generate_graphs_var.get():
            # Generating graphs that are neither saved nor shown is pointless.
            if not self.save_graphs_var.get() and not self.show_graphs_var.get():
                self.save_graphs_var.set(True)
        state = tk.NORMAL if self.generate_graphs_var.get() else tk.DISABLED
        self._toggle_widget_state(self.graph_options_frame, state)

    def _check_sub_graph_options(self):
        """Turn graph generation off when both "save" and "show" are unchecked."""
        if not self.save_graphs_var.get() and not self.show_graphs_var.get():
            self.generate_graphs_var.set(False)
            self.update_graph_options_state()

    def update_timestamp_toggle_state(self):
        """Enable the timestamp toggle only while text output is selected."""
        self.timestamp_toggle.config(state=tk.NORMAL if self.output_text_var.get() else tk.DISABLED)

    def choose_directory(self):
        """Ask for an input directory, leave drag-and-drop mode and list its files."""
        dir_name = filedialog.askdirectory(initialdir=self.last_dir_for_revert, title='Please select a directory')
        if dir_name:
            self.in_drag_drop_mode = False
            self.input_dir.set(dir_name)
            self.last_dir_for_revert = dir_name
            self.main_output_dir_var.set(dir_name)
            self.save_settings()
            self.populate_file_list()

    def handle_clear_list_button(self):
        """Empty the file list and show a placeholder message."""
        self._clear_file_list_widgets()
        bstt.Label(self.scrolled_file_frame, text="File list cleared.", bootstyle="secondary").pack(pady=10)

    def _clear_file_list_widgets(self):
        """Destroy every widget of the file list and reset the selection button."""
        for widget in self.scrolled_file_frame.winfo_children():
            widget.destroy()
        self.file_vars.clear()
        if self.select_toggle_btn:
            self.select_toggle_btn.config(text="Select All", state=tk.DISABLED)

    def populate_file_list(self):
        """List the ``*.json`` files of the input directory as checked boxes."""
        if self.in_drag_drop_mode:
            return
        self._clear_file_list_widgets()
        directory = self.input_dir.get()
        if not isdir(directory):
            bstt.Label(self.scrolled_file_frame, text="No directory selected.", bootstyle="secondary").pack(pady=10)
            return
        json_files = sorted(glob.glob(join(directory, "*.json")))
        if not json_files:
            bstt.Label(self.scrolled_file_frame, text="No .json files found in this directory.", bootstyle="secondary").pack(pady=10)
            return
        for f_path in json_files:
            var = tk.BooleanVar(value=True)
            self.file_vars[f_path] = var
            cb = bstt.Checkbutton(self.scrolled_file_frame, text=basename(f_path), variable=var)
            cb.pack(anchor=tk.W, padx=10)
        self.select_toggle_btn.config(text="Deselect All", state=tk.NORMAL)

    def select_all(self):
        """Check every file in the list."""
        for var in self.file_vars.values():
            var.set(True)

    def deselect_all(self):
        """Uncheck every file in the list."""
        for var in self.file_vars.values():
            var.set(False)

    def on_closing(self):
        """Persist the settings and close the application."""
        self.save_settings()
        self.root.destroy()

    def load_settings(self):
        """Load the saved settings, falling back to defaults for anything missing."""
        default_dir = expanduser("~")
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except (OSError, ValueError):
            config = {}
        if not isinstance(config, dict):
            # A config file holding valid JSON of the wrong shape is ignored.
            config = {}
        last_dir = config.get("last_directory", default_dir)
        self.input_dir.set(last_dir if isdir(last_dir) else default_dir)
        self.last_dir_for_revert = self.input_dir.get()
        self.main_output_dir_var.set(config.get("main_output_directory", default_dir))
        self.main_save_in_place_var.set(config.get("main_save_in_place", True))
        self.dote_input_dir_var.set(config.get("dote_input_directory", default_dir))
        self.dote_output_dir_var.set(config.get("dote_output_directory", default_dir))
        self.dote_save_in_place_var.set(config.get("dote_save_in_place", True))
        self.combine_segments_var.set(config.get("combine_segments", False))
        self.output_text_var.set(config.get("output_text", True))
        self.show_timestamps_var.set(config.get("show_timestamps", True))
        self.output_json_var.set(config.get("output_json", False))
        self.generate_graphs_var.set(config.get("generate_graphs", True))
        self.save_graphs_var.set(config.get("save_graphs", True))
        self.show_graphs_var.set(config.get("show_graphs", True))
        self.bar_chart_var.set(config.get("bar_chart", True))
        self.timeline_var.set(config.get("timeline", True))

    def save_settings(self):
        """Write the current settings to the config file (best effort)."""
        settings = {
            "last_directory": self.last_dir_for_revert,
            "main_output_directory": self.main_output_dir_var.get(),
            "main_save_in_place": self.main_save_in_place_var.get(),
            "dote_input_directory": self.dote_input_dir_var.get(),
            "dote_output_directory": self.dote_output_dir_var.get(),
            "dote_save_in_place": self.dote_save_in_place_var.get(),
            "combine_segments": self.combine_segments_var.get(),
            "output_text": self.output_text_var.get(),
            "show_timestamps": self.show_timestamps_var.get(),
            "output_json": self.output_json_var.get(),
            "generate_graphs": self.generate_graphs_var.get(),
            "save_graphs": self.save_graphs_var.get(),
            "show_graphs": self.show_graphs_var.get(),
            "bar_chart": self.bar_chart_var.get(),
            "timeline": self.timeline_var.get(),
        }
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(settings, f, indent=2)
        except OSError:
            # Losing the preferences must never prevent the app from working.
            pass

    def process_files(self):
        """Merge the selected files and generate every requested output.

        The files are read before the ``analysis_<timestamp>`` directory is
        created, so a read failure does not leave an empty directory behind.
        """
        self.word_cloud_button.config(state=tk.DISABLED)
        selected_files = [f for f, var in self.file_vars.items() if var.get()]
        if not selected_files:
            Messagebox.show_warning("Please select at least one JSON file to process.", "No Files Selected")
            return

        merged_lines = []
        for filename in selected_files:
            try:
                merged_lines.extend(self._read_transcript_lines(filename))
            except (OSError, ValueError) as e:
                Messagebox.show_error(f"Could not read/parse {basename(filename)}:\n{e}", "File Error")
                return
        if not merged_lines:
            Messagebox.ok("The selected files contain no transcript lines to process.", "Empty Files")
            return

        try:
            if self.main_save_in_place_var.get():
                if self.in_drag_drop_mode:
                    base_dir = dirname(selected_files[0])
                else:
                    base_dir = self.input_dir.get()
            else:
                base_dir = self.main_output_dir_var.get()
            if not isdir(base_dir):
                raise OSError(f"The selected output directory does not exist:\n{base_dir}")
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.output_dir = join(base_dir, f"analysis_{timestamp}")
            os.makedirs(self.output_dir, exist_ok=True)
        except OSError as e:
            Messagebox.show_error(f"Could not create output directory:\n{e}", "Directory Error")
            return

        try:
            self.analyze_and_generate_outputs(merged_lines, self.output_dir)
        except (KeyError, ValueError, TypeError, OSError) as e:
            # Malformed transcript lines (missing keys, bad timestamps) or a
            # failed write would otherwise vanish inside the Tk callback.
            Messagebox.show_error(f"An error occurred while analyzing the transcripts:\n{e}", "Processing Error")

    def analyze_and_generate_outputs(self, merged_lines, output_dir):
        """Sort the lines and write the transcript, report and graphs.

        Args:
            merged_lines: List of line dicts with ``startTime``, ``endTime``,
                ``speakerDesignation`` and ``text`` keys; sorted in place.
            output_dir: Existing directory that receives the output files.
        """
        merged_lines.sort(key=lambda x: parse_time(x["startTime"]))
        lines_to_process, output_filename_base = merged_lines, "merged_transcript"
        if self.combine_segments_var.get():
            lines_to_process = self._combine_consecutive_segments(merged_lines)
            output_filename_base = "merged_combined_transcript"

        if self.output_text_var.get():
            filepath = join(output_dir, f"{output_filename_base}.txt")
            show_timestamps = self.show_timestamps_var.get()
            with open(filepath, "w", encoding='utf-8') as f:
                for line in lines_to_process:
                    if show_timestamps:
                        f.write(f"[{line['startTime']}] {line['speakerDesignation']}: {line['text']}\n")
                    else:
                        f.write(f"{line['speakerDesignation']}: {line['text']}\n")

        if self.output_json_var.get():
            filepath = join(output_dir, f"{output_filename_base}.json")
            with open(filepath, "w", encoding='utf-8') as f:
                json.dump({"lines": lines_to_process}, f, ensure_ascii=False, indent=2)

        speaker_word_counts = defaultdict(int)
        for line in lines_to_process:
            speaker_word_counts[line["speakerDesignation"]] += word_count(line["text"])
        total_words = sum(speaker_word_counts.values())
        with open(join(output_dir, "report.txt"), "w", encoding='utf-8') as f:
            f.write("Word Count Summary:\n===================\n")
            for speaker, count in sorted(speaker_word_counts.items()):
                percent = (count / total_words) * 100 if total_words > 0 else 0
                f.write(f"\n{speaker}: {count} words ({percent:.2f}%)")
            f.write(f"\n\nTotal Words: {total_words}")
        Messagebox.ok(f"Processing complete!\nFiles saved in: {output_dir}", "Success", bootstyle='success')

        if total_words > 0:
            texts_by_speaker = defaultdict(list)
            for line in merged_lines:
                texts_by_speaker[line['speakerDesignation']].append(line['text'])
            self.speaker_texts = {s: " ".join(t) for s, t in texts_by_speaker.items()}
            self.word_cloud_button.config(state=tk.NORMAL)
            if self.generate_graphs_var.get():
                figures = []
                if self.bar_chart_var.get():
                    fig = self.create_bar_graph_figure(speaker_word_counts, total_words)
                    self._save_graph(fig, "barchart_contribution")
                    figures.append((fig, "Speaker Contribution (Bar Chart)"))
                if self.timeline_var.get():
                    fig = self.create_timeline_figure(merged_lines)
                    self._save_graph(fig, "timeline_conversation_flow")
                    figures.append((fig, "Conversation Timeline"))
                show_graphs = self.show_graphs_var.get()
                for fig, title in figures:
                    if show_graphs:
                        self.show_report_graph(fig, title)
                    else:
                        # pyplot keeps every figure alive until it is closed.
                        plt.close(fig)

    def _combine_consecutive_segments(self, lines):
        """Merge adjacent lines spoken by the same speaker.

        Texts are joined with a space and the merged entry takes the
        ``endTime`` of the last line of the run. The input dicts are not
        modified.

        Returns:
            A new list of line dicts.
        """
        if not lines:
            return []
        combined_lines = []
        prev = lines[0].copy()
        for line in lines[1:]:
            if line["speakerDesignation"] == prev["speakerDesignation"]:
                prev["text"] += " " + line["text"]
                prev["endTime"] = line["endTime"]
            else:
                combined_lines.append(prev)
                prev = line.copy()
        combined_lines.append(prev)
        return combined_lines

    def _save_graph(self, fig, name):
        """Save ``fig`` as ``<name>.png`` in the output directory when saving is enabled."""
        if self.save_graphs_var.get():
            try:
                save_path = join(self.output_dir, f"{name}.png")
                fig.savefig(save_path, dpi=300, bbox_inches='tight', facecolor=fig.get_facecolor())
            except Exception as e:
                Messagebox.show_error(f"Could not save {name}:\n{e}", "Graph Save Error")

    def launch_word_cloud_window(self):
        """Open the modal word-cloud window and wait until it is closed."""
        if not self.speaker_texts:
            Messagebox.show_warning("Please process files first to generate data.", "No Data Available")
            return
        self._toggle_widget_state(self.main_frame, tk.DISABLED)
        try:
            wc_window = tk.Toplevel(self.root)
            wc_window.title("Interactive Word Cloud")
            wc_window.geometry("1000x800")
            wc_window.grab_set()
            self.wc_speaker_vars.clear()

            top_frame = bstt.Frame(wc_window, padding=10)
            top_frame.pack(fill=tk.BOTH, expand=True)
            left_frame = bstt.Frame(top_frame, padding=(0, 0, 10, 0))
            left_frame.pack(side=tk.LEFT, fill=tk.Y)
            bstt.Label(left_frame, text="Select Speakers:").pack(anchor=tk.W, pady=(0, 5))
            speaker_frame = ScrolledFrame(left_frame, autohide=True)
            speaker_frame.pack(fill=tk.BOTH, expand=True)
            speaker_list = sorted(self.speaker_texts.keys())
            for speaker in speaker_list:
                var = tk.BooleanVar(value=True)
                self.wc_speaker_vars[speaker] = var
                bstt.Checkbutton(speaker_frame, text=speaker, variable=var,
                                 command=self.update_word_cloud).pack(anchor=tk.W, padx=10, pady=2)
            btn_frame = bstt.Frame(left_frame)
            btn_frame.pack(fill=tk.X, pady=5)
            bstt.Button(btn_frame, text="All", command=self.wc_select_all, bootstyle="outline-secondary").pack(side=tk.LEFT, expand=True)
            bstt.Button(btn_frame, text="None", command=self.wc_deselect_none, bootstyle="outline-secondary").pack(side=tk.LEFT, expand=True)
            bstt.Button(left_frame, text="Export PNG", command=lambda: self.export_word_cloud(wc_window)).pack(fill=tk.X, pady=(10, 0))

            canvas_frame = bstt.Frame(top_frame)
            canvas_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
            self.wc_fig, self.wc_ax = plt.subplots(figsize=(10, 8), constrained_layout=True)
            self.wc_canvas = FigureCanvasTkAgg(self.wc_fig, master=canvas_frame)
            self.wc_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
            self.update_word_cloud()
            self.root.wait_window(wc_window)
        finally:
            if self.wc_fig is not None:
                # Release the figure; pyplot would otherwise keep it forever.
                plt.close(self.wc_fig)
                self.wc_fig = None
            self._restore_main_widget_states()

    def wc_select_all(self):
        """Check every speaker and redraw the word cloud."""
        for var in self.wc_speaker_vars.values():
            var.set(True)
        self.update_word_cloud()

    def wc_deselect_none(self):
        """Uncheck every speaker and redraw the word cloud."""
        for var in self.wc_speaker_vars.values():
            var.set(False)
        self.update_word_cloud()

    def update_word_cloud(self, event=None):
        """Redraw the word cloud from the text of the currently checked speakers."""
        selected_speakers = [s for s, v in self.wc_speaker_vars.items() if v.get()]
        if not selected_speakers:
            text, title = "", "No Speakers Selected"
        else:
            text = " ".join([self.speaker_texts.get(sp, "") for sp in selected_speakers])
            if len(selected_speakers) == 1:
                title = selected_speakers[0]
            elif len(selected_speakers) == len(self.wc_speaker_vars):
                title = "All Speakers"
            else:
                title = "Custom Selection"
        self.wc_ax.clear()
        if not text.strip():
            self.wc_ax.text(0.5, 0.5, "Select speakers to generate word cloud.", ha='center', va='center')
        else:
            wordcloud = WordCloud(width=1200, height=800, background_color='white', colormap='viridis', max_words=150, random_state=42).generate(text)
            self.wc_ax.imshow(wordcloud, interpolation='bilinear')
            self.wc_ax.set_title(f'Most Common Words: {title}', fontsize=20, weight='bold', pad=20)
        self.wc_ax.axis('off')
        self.wc_canvas.draw()

    def export_word_cloud(self, parent_window):
        """Save the current word cloud as a PNG in the last analysis directory."""
        if not self.output_dir:
            Messagebox.show_error("Could not find analysis directory.", "Directory Not Found", parent=parent_window)
            return
        selected_speakers = [s for s, v in self.wc_speaker_vars.items() if v.get()]
        if not selected_speakers:
            Messagebox.show_warning("Please select speakers to export.", "Nothing to Export", parent=parent_window)
            return
        if len(selected_speakers) == 1:
            name = selected_speakers[0]
        elif len(selected_speakers) == len(self.wc_speaker_vars):
            name = "All_Speakers"
        else:
            name = "Custom_Selection"
        # Keep only word characters so the speaker name is safe in a file name.
        safe_name = re.sub(r'\W+', '', name)
        filepath = join(self.output_dir, f"wordcloud_{safe_name}.png")
        try:
            self.wc_fig.savefig(filepath, dpi=300, facecolor='white')
            Messagebox.ok(f"Word cloud saved to:\n\n{filepath}", "Export Successful", parent=parent_window)
        except Exception as e:
            Messagebox.show_error(f"Failed to save image:\n{e}", "Export Error", parent=parent_window)

    def create_bar_graph_figure(self, speaker_data, total_words):
        """Build a horizontal bar chart of the word count per speaker.

        Args:
            speaker_data: Mapping of speaker name to word count.
            total_words: Sum of all word counts, used for the percentages.

        Returns:
            The matplotlib figure; the caller is responsible for closing it.
        """
        plt.style.use('default')
        plt.rcParams['font.family'] = 'sans-serif'
        plt.rcParams['font.sans-serif'] = ['Helvetica Neue', 'Arial']
        bg, p_text, s_text = '#FFFFFF', '#1f1f1f', '#5f6368'
        sorted_data = sorted(speaker_data.items(), key=lambda item: item[1])
        labels = [item[0] for item in sorted_data]
        sizes = [item[1] for item in sorted_data]
        cmap = plt.get_cmap('GnBu', len(labels) + 4)
        colors = cmap(np.linspace(0.35, 0.85, len(labels)))
        fig, ax = plt.subplots(figsize=(12, max(6, len(labels) * 0.65)), constrained_layout=True)
        fig.patch.set_facecolor(bg)
        ax.set_facecolor(bg)
        fig.suptitle('Speaker Contribution', fontsize=20, weight='bold', color=p_text)
        ax.set_title(f'Analysis of {total_words:,} words across {len(labels)} speakers', loc='left', fontsize=12, color=s_text)
        bar_height = 0.6
        for i, (value, color) in enumerate(zip(sizes, colors)):
            ax.add_patch(patches.FancyBboxPatch((0, i - bar_height / 2), value, bar_height, boxstyle="round,pad=0,rounding_size=0.1", fc=color, ec='none', zorder=3))
        padding = max(sizes) * 0.01 if sizes else 0
        for i, (value, label_text) in enumerate(zip(sizes, labels)):
            if value == 0:
                continue
            percent = (value / total_words) * 100
            text = f'{value:,} ({percent:.1f}%)'
            # Long bars carry the label inside (white text); short bars get
            # the label to their right in the regular text colour.
            is_dark_bg = value > (total_words * 0.25)
            label_color = 'white' if is_dark_bg else p_text
            ha = 'right' if is_dark_bg else 'left'
            ax.text(value - padding if is_dark_bg else value + padding, i, text, va='center', ha=ha, color=label_color, fontsize=10, weight='bold')
        ax.grid(True, axis='x', color='#E8EAED', linestyle='-', zorder=0)
        ax.spines[['top', 'right', 'left', 'bottom']].set_visible(False)
        ax.set_yticks(range(len(labels)))
        ax.set_yticklabels(labels, fontsize=11, color=p_text)
        ax.tick_params(axis='x', colors=s_text, length=0)
        ax.tick_params(axis='y', length=0, pad=10)
        ax.set_ylim(-0.8, len(labels) - 0.2)
        if sizes:
            ax.set_xlim(0, max(sizes) * 1.05)
        return fig

    def create_timeline_figure(self, lines):
        """Build a timeline with one row of speaking intervals per speaker.

        Segments shorter than 0.1 seconds are not drawn.

        Returns:
            The matplotlib figure; the caller is responsible for closing it.
        """
        plt.style.use('default')
        plt.rcParams['font.family'] = 'sans-serif'
        plt.rcParams['font.sans-serif'] = ['Helvetica Neue', 'Arial']
        bg, p_text, s_text, grid = '#FFFFFF', '#1f1f1f', '#5f6368', '#E8EAED'
        speakers = sorted(list(set(line['speakerDesignation'] for line in lines)), reverse=True)
        plot_data = defaultdict(list)
        for line in lines:
            start_sec, end_sec = time_str_to_seconds(line['startTime']), time_str_to_seconds(line['endTime'])
            if (duration := end_sec - start_sec) < 0.1:
                continue
            plot_data[line['speakerDesignation']].append((start_sec, duration))
        fig, ax = plt.subplots(figsize=(14, max(4, len(speakers) * 0.6)), constrained_layout=True)
        fig.patch.set_facecolor(bg)
        ax.set_facecolor(bg)
        fig.suptitle('Conversation Timeline', fontsize=20, weight='bold', color=p_text)
        ax.set_title('Visualizing the flow of dialogue over time', loc='left', fontsize=12, color=s_text)
        cmap = plt.get_cmap('tab20', len(speakers))
        colors = cmap(np.linspace(0, 1, len(speakers)))
        for i, speaker in enumerate(speakers):
            if speaker in plot_data:
                ax.broken_barh(plot_data[speaker], (i - 0.45, 0.9), facecolors=colors[i], zorder=3)
        ax.xaxis.set_major_formatter(FuncFormatter(lambda x, pos: f'{int(x)//60}:{int(x)%60:02d}'))
        ax.set_xlabel('Time (minutes:seconds)', color=s_text, labelpad=10)
        ax.set_yticks(range(len(speakers)))
        ax.set_yticklabels(speakers, fontsize=11, color=p_text)
        ax.grid(True, axis='x', color=grid, linestyle='-', zorder=0)
        ax.spines[['top', 'right', 'left', 'bottom']].set_visible(False)
        ax.tick_params(axis='x', colors=s_text, length=0)
        ax.tick_params(axis='y', length=0)
        return fig

    def show_report_graph(self, fig, title="Report Graph"):
        """Display ``fig`` in its own window and close the figure with the window."""
        graph_window = tk.Toplevel(self.root)
        graph_window.title(title)
        graph_window.geometry("1200x800")
        graph_window.configure(bg='#FFFFFF')
        canvas = FigureCanvasTkAgg(fig, master=graph_window)
        canvas.draw()
        canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=1, padx=10, pady=10)

        def close_graph_window():
            # Free the pyplot-managed figure together with its window.
            plt.close(fig)
            graph_window.destroy()

        graph_window.protocol("WM_DELETE_WINDOW", close_graph_window)
if __name__ == "__main__":
    # Prefer a drag-and-drop capable root; fall back to a plain themed window.
    try:
        root = dnd.TkinterDnD.Tk()
        style = bstt.Style(theme="litera")
    except (tk.TclError, RuntimeError):
        # NOTE(review): tkinterdnd2 appears to report a tkdnd load failure as
        # RuntimeError rather than TclError, so both are handled - confirm
        # against the installed tkinterdnd2 version.
        # A root left behind by the failed attempt would stay on screen as an
        # empty extra window, so remove it before building the fallback.
        stale_root = getattr(tk, "_default_root", None)
        if stale_root is not None:
            try:
                stale_root.destroy()
            except tk.TclError:
                pass
        # Create the themed window first so the warning has a parent and does
        # not create an additional default root of its own.
        root = bstt.Window(themename="litera")
        Messagebox.show_warning("Drag and drop could not be initialized. The app will run without it.", "DND Init Failed", parent=root)
    app = TranscriptCombinerApp(root)
    # Bring the window to the front once, without keeping it always-on-top.
    root.lift()
    root.attributes('-topmost', True)
    root.after_idle(root.attributes, '-topmost', False)
    root.focus_force()
    root.mainloop()
@xthesaintx
Copy link
Author

reads .dote json files in a directory and combines them in chronological order.

.dote json files need to be formatted as below

{ "lines" : [ { "endTime" : "00:00:11,680", "speakerDesignation" : "DM", "startTime" : "00:00:10,160", "text" : "All right, let's see how this goes." }....
.
Screenshot 2025-06-30 at 1 53 26 PM

outputs a json and plain text file in the directory

@xthesaintx
Copy link
Author

Screenshot 2025-07-03 at 9 22 08 AM
The latest revision adds substantially more functionality.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment