'''
# Transcription to JSONL Converter

## Overview
Processes LibriSpeech transcription files (.trans.txt) and converts them into a structured JSONL file. When needed, it also downloads the corresponding book texts from Project Gutenberg.

## Main Features
1. Reads and processes transcription files (.trans.txt)
2. Merges in chapter metadata (CHAPTERS.txt) and speaker metadata (SPEAKERS.txt)
3. Checks that each book text exists, downloading it if necessary
4. Matches each transcription against the book text
5. Writes a structured JSONL file

## Input Data
- .trans.txt files: lines of the form "reader_id-chapter_id-sentence_id SENTENCE" (see the example after this list)
- CHAPTERS.txt: pipe-delimited chapter metadata
- SPEAKERS.txt: pipe-delimited speaker metadata
- Book text files named book_id.txt (downloaded if missing)
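
For illustration, each .trans.txt line pairs an utterance ID with its normalized transcript. The line below shows only the expected shape; the IDs and text are not real data:

```
103-1240-0000 CHAPTER ONE MISSUS RACHEL LYNDE IS SURPRISED
```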

## Output Data
A structured JSONL file in which each line is a JSON object (see the example below) containing:
- reader: ID, name, sex
- chapter: ID, title
- book: ID, title
- sentence: ID, text, previous sentence
- audio_path: path to the corresponding audio file
- subset: the dataset subset
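
A single output line might look like this (all values are illustrative; the script writes each object on one line):

```
{"reader": {"id": 103, "name": "Jane Doe", "sex": "F"}, "chapter": {"id": 1240, "title": "Chapter One"}, "book": {"id": 45, "title": "Example Book"}, "sentence": {"id": 0, "text": "Mrs. Rachel Lynde was surprised.", "previous": ""}, "audio_path": "data/103/1240/103-1240-0000.flac", "subset": "train-clean-100"}
```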

## Processing Flow
1. Parse command-line arguments
2. Create the required directories
3. Load CHAPTERS.txt and SPEAKERS.txt
4. Recursively find and process .trans.txt files
5. Match each sentence against the book text
6. Write the structured data in JSONL format

## Usage
```
python librispeech_to_json.py --data-dir <directory> [--books-dir <directory>] [--chapters-file <path>] [--speakers-file <path>] [--output <file.jsonl>]
```
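
For example, to process a single subset (paths are illustrative and assume the standard LibriSpeech layout, where CHAPTERS.txt and SPEAKERS.txt sit one level above the subset directory):

```
python librispeech_to_json.py --data-dir LibriSpeech/train-clean-100 \
    --chapters-file LibriSpeech/CHAPTERS.txt \
    --speakers-file LibriSpeech/SPEAKERS.txt \
    --output train-clean-100.jsonl
```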

## Caveats
- Memory usage needs watching when processing a large number of files
- Downloading books from Project Gutenberg depends on a network connection
- The sentence-matching algorithm is deliberately simple and leaves room for improvement

## Possible Extensions
- A more sophisticated sentence-matching algorithm
- Parallel processing for higher throughput
- Stronger error-recovery handling
- Logging support

This script is useful for preprocessing and structuring speech-recognition datasets, and can be customized or extended as needed.
'''
import argparse
import json
import os
import re
import sys

import requests
from tqdm import tqdm

def preprocess_text(text):
    # Uppercase the text and strip punctuation; whitespace is preserved.
    return re.sub(r'[^\w\s]', '', text.upper())
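
# Example (illustrative): preprocess_text("Hello, world!") == "HELLO WORLD"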

def find_matching_sentence(sentence, book_content):
    # Return the longest span of the transcript's words that also occurs in
    # the book text, comparing both in preprocessed (normalized) form.
    preprocessed_sentence = preprocess_text(sentence)
    preprocessed_book = preprocess_text(book_content)
    # preprocess_text only deletes characters (assuming upper() preserves
    # string length, which holds for ASCII text), so each index in the
    # preprocessed book maps back to an index in the original text.
    index_map = [i for i, ch in enumerate(book_content) if re.match(r'[\w\s]', ch)]
    words = preprocessed_sentence.split()
    best_match = ""
    best_score = 0
    for i in range(len(words)):
        for j in range(i + 1, len(words) + 1):
            substring = ' '.join(words[i:j])
            if substring in preprocessed_book:
                score = len(substring)
                if score > best_score:
                    best_score = score
                    start_index = preprocessed_book.index(substring)
                    end_index = start_index + len(substring)
                    # Slice book_content via index_map: indices into the
                    # preprocessed text do not line up with the original
                    # once punctuation has been removed.
                    best_match = book_content[index_map[start_index]:index_map[end_index - 1] + 1]
    return best_match
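
# CHAPTERS.txt is pipe-delimited; the unpacking below expects 8 fields per row,
# of which chapter ID, reader ID, subset, book ID, and the two titles are used.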
def load_chapters(chapters_file):
    chapters = {}
    with open(chapters_file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.startswith(';'):
                continue
            parts = line.strip().split('|')
            if len(parts) == 8:
                chapter_id, reader_id, _, subset, _, book_id, chapter_title, project_title = parts
                chapters[int(chapter_id.strip())] = {
                    'reader_id': int(reader_id.strip()),
                    'book_id': int(book_id.strip()),
                    'chapter_title': chapter_title.strip(),
                    'book_title': project_title.strip(),
                    'subset': subset.strip()
                }
    return chapters
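
# SPEAKERS.txt is pipe-delimited; the unpacking below expects 5 fields per row:
# reader ID, sex, subset, duration, name (only name and sex are kept).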
def load_speakers(speakers_file):
    speakers = {}
    with open(speakers_file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.startswith(';'):
                continue
            # Split at most 4 times so a name containing '|' stays intact.
            parts = line.strip().split('|', 4)
            if len(parts) == 5:
                reader_id, sex, subset, duration, name = parts
                speakers[int(reader_id.strip())] = {
                    'name': name.strip(),
                    'sex': sex.strip()
                }
    return speakers
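
# The script assumes LibriSpeech book IDs double as Project Gutenberg eText
# numbers, since the download URL below is built directly from book_id.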
def download_book(book_id, books_dir):
    url = f"https://www.gutenberg.org/cache/epub/{book_id}/pg{book_id}.txt"
    try:
        # A timeout keeps a stalled connection from hanging the whole run.
        response = requests.get(url, timeout=30)
    except requests.RequestException as e:
        print(f"Failed to download book ID {book_id}: {e}", file=sys.stderr)
        return False
    if response.status_code == 200:
        filepath = os.path.join(books_dir, f"{book_id}.txt")
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(response.text)
        print(f"Downloaded: {book_id}.txt")
        return True
    else:
        print(f"Failed to download book ID {book_id}", file=sys.stderr)
        return False

def process_trans_files(data_dir, books_dir, chapters_file, speakers_file, output_file):
    chapters = load_chapters(chapters_file)
    speakers = load_speakers(speakers_file)

    # Recursively collect every transcription file under data_dir.
    trans_files = []
    for root, _, files in os.walk(data_dir):
        for file in files:
            if file.endswith('.trans.txt'):
                trans_files.append(os.path.join(root, file))

    with open(output_file, 'w', encoding='utf-8') as jsonl_file:
        for trans_file in tqdm(trans_files, desc="Processing files"):
            book_sentences = {}
            # Cache each book's text by ID rather than relying on a
            # loop-carried variable that is only set on first encounter.
            book_contents = {}
            with open(trans_file, 'r', encoding='utf-8') as f:
                for line in f:
                    parts = line.strip().split(' ', 1)
                    if len(parts) != 2:
                        continue
                    utt_id = parts[0]
                    id_parts = utt_id.split('-')
                    if len(id_parts) != 3:
                        continue
                    reader_id, chapter_id, sentence_id = map(int, id_parts)
                    sentence = parts[1]
                    if chapter_id not in chapters:
                        print(f"Warning: Chapter ID {chapter_id} not found in CHAPTERS.txt")
                        continue
                    chapter_info = chapters[chapter_id]
                    book_id = chapter_info['book_id']
                    if book_id not in book_contents:
                        book_file = os.path.join(books_dir, f"{book_id}.txt")
                        if not os.path.exists(book_file):
                            print(f"Book file for ID {book_id} not found. Attempting to download...")
                            if not download_book(book_id, books_dir):
                                continue
                        with open(book_file, 'r', encoding='utf-8') as book_f:
                            book_contents[book_id] = book_f.read()
                        book_sentences[book_id] = []
                    matched_sentence = find_matching_sentence(sentence, book_contents[book_id])
                    book_sentences[book_id].append((reader_id, sentence_id, matched_sentence))
                    previous_sentence = ""
                    if len(book_sentences[book_id]) > 1:
                        previous_sentence = book_sentences[book_id][-2][2]
                    # Build the audio path from the original utterance ID string:
                    # LibriSpeech sentence IDs are zero-padded (e.g. "-0000"),
                    # which int() would otherwise strip from the filename.
                    audio_path = os.path.join(os.path.dirname(trans_file), f"{utt_id}.flac")
                    entry = {
                        'reader': {
                            'id': reader_id,
                            'name': speakers.get(reader_id, {}).get('name', ''),
                            'sex': speakers.get(reader_id, {}).get('sex', '')
                        },
                        'chapter': {
                            'id': chapter_id,
                            'title': chapter_info['chapter_title']
                        },
                        'book': {
                            'id': book_id,
                            'title': chapter_info['book_title']
                        },
                        'sentence': {
                            'id': sentence_id,
                            'text': matched_sentence,
                            'previous': previous_sentence
                        },
                        'audio_path': audio_path,
                        'subset': chapter_info['subset']
                    }
                    jsonl_file.write(json.dumps(entry) + '\n')

def main():
    parser = argparse.ArgumentParser(description="Convert transcription files to JSONL")
    parser.add_argument("--data-dir", required=True, help="Directory containing transcription files")
    parser.add_argument("--books-dir", help="Directory containing book text files")
    parser.add_argument("--chapters-file", help="Path to CHAPTERS.txt file")
    parser.add_argument("--speakers-file", help="Path to SPEAKERS.txt file")
    parser.add_argument("--output", default="output.jsonl", help="Output JSONL file name")
    args = parser.parse_args()

    # Set default books_dir if not provided, and make sure it exists.
    if args.books_dir is None:
        args.books_dir = os.path.join(args.data_dir, "books")
    os.makedirs(args.books_dir, exist_ok=True)

    # Default metadata paths: look for CHAPTERS.txt / SPEAKERS.txt in data_dir.
    if args.chapters_file is None:
        args.chapters_file = os.path.join(args.data_dir, "CHAPTERS.txt")
    if args.speakers_file is None:
        args.speakers_file = os.path.join(args.data_dir, "SPEAKERS.txt")

    if not os.path.exists(args.chapters_file):
        print(f"Error: CHAPTERS.txt file not found at {args.chapters_file}", file=sys.stderr)
        return
    if not os.path.exists(args.speakers_file):
        print(f"Error: SPEAKERS.txt file not found at {args.speakers_file}", file=sys.stderr)
        return

    process_trans_files(args.data_dir, args.books_dir, args.chapters_file, args.speakers_file, args.output)
    print(f"JSONL file has been created: {args.output}")


if __name__ == "__main__":
    main()