Skip to content

Instantly share code, notes, and snippets.

@masuidrive
Created October 13, 2024 15:16
Show Gist options
  • Save masuidrive/22772b9a95acc2dde52e2fd551e32909 to your computer and use it in GitHub Desktop.
Save masuidrive/22772b9a95acc2dde52e2fd551e32909 to your computer and use it in GitHub Desktop.
LibriSpeechデータセットの転写ファイル(.trans.txt)を処理し、構造化されたJSONLファイルに変換します。また、必要に応じて関連する本文をProject Gutenbergからダウンロードします。
'''
# Transcription to JSONL Converter
## 概要
LibriSpeechデータセットの転写ファイル(.trans.txt)を処理し、構造化されたJSONLファイルに変換します。また、必要に応じて関連する本文をProject Gutenbergからダウンロードします。
## 主な機能
1. 転写ファイル(.trans.txt)の読み込みと処理
2. 章情報(CHAPTERS.txt)と話者情報(SPEAKERS.txt)の統合
3. 本文ファイルの存在確認と必要に応じたダウンロード
4. 転写テキストと本文のマッチング
5. 構造化されたJSONLファイルの生成
## 入力データ
- .trans.txtファイル:形式 "reader_id-chapter_id-sentence_id 文章"
- CHAPTERS.txt:章情報を含むパイプ区切りファイル
- SPEAKERS.txt:話者情報を含むパイプ区切りファイル
- 本文ファイル:book_id.txt形式(存在しない場合はダウンロード)
## 出力データ
構造化されたJSONLファイル。各行は以下の情報を含むJSONオブジェクト:
- reader:ID、名前、性別
- chapter:ID、タイトル
- book:ID、タイトル
- sentence:ID、テキスト、前の文
- audio_path:関連する音声ファイルへのパス
- subset:データセットのサブセット情報
## 主要な処理フロー
1. コマンドライン引数の解析
2. 必要なディレクトリの作成
3. CHAPTERS.txtとSPEAKERS.txtの読み込み
4. .trans.txtファイルの再帰的な検索と処理
5. 各文に対する本文とのマッチング
6. 構造化されたデータのJSONL形式での書き出し
## 使用方法
```
python librispeech_to_json.py --data-dir <directory> [--books-dir <directory>] [--chapters-file <path>] [--speakers-file <path>] [--output <file.jsonl>]
```
## 注意点
- 大量のファイルを処理する場合、メモリ使用量に注意が必要
- Project Gutenbergからの本のダウンロードは、ネットワーク接続に依存
- 文章マッチングアルゴリズムは単純なものを使用しており、改善の余地あり
## 拡張性
- より高度な文章マッチングアルゴリズムの実装
- 並列処理による処理速度の向上
- エラーリカバリー機能の強化
- ログ機能の追加
このスクリプトは、音声認識データセットの前処理や構造化に有用であり、必要に応じてカスタマイズや拡張が可能です。
'''
import argparse
import os
import json
from tqdm import tqdm
import re
import requests
import sys
def preprocess_text(text):
return re.sub(r'[^\w\s]', '', text.upper())
def find_matching_sentence(sentence, book_content):
preprocessed_sentence = preprocess_text(sentence)
preprocessed_book = preprocess_text(book_content)
words = preprocessed_sentence.split()
best_match = ""
best_score = 0
for i in range(len(words)):
for j in range(i + 1, len(words) + 1):
substring = ' '.join(words[i:j])
if substring in preprocessed_book:
score = len(substring)
if score > best_score:
best_score = score
start_index = preprocessed_book.index(substring)
end_index = start_index + len(substring)
best_match = book_content[start_index:end_index]
return best_match
def load_chapters(chapters_file):
chapters = {}
with open(chapters_file, 'r', encoding='utf-8') as f:
for line in f:
if line.startswith(';'):
continue
parts = line.strip().split('|')
if len(parts) == 8:
chapter_id, reader_id, _, subset, _, book_id, chapter_title, project_title = parts
chapters[int(chapter_id.strip())] = {
'reader_id': int(reader_id.strip()),
'book_id': int(book_id.strip()),
'chapter_title': chapter_title.strip(),
'book_title': project_title.strip(),
'subset': subset.strip()
}
return chapters
def load_speakers(speakers_file):
speakers = {}
with open(speakers_file, 'r', encoding='utf-8') as f:
for line in f:
if line.startswith(';'):
continue
parts = line.strip().split('|')
if len(parts) == 5:
reader_id, sex, subset, duration, name = parts
speakers[int(reader_id.strip())] = {
'name': name.strip(),
'sex': sex.strip()
}
return speakers
def download_book(book_id, books_dir):
url = f"https://www.gutenberg.org/cache/epub/{book_id}/pg{book_id}.txt"
response = requests.get(url)
if response.status_code == 200:
filepath = os.path.join(books_dir, f"{book_id}.txt")
with open(filepath, 'w', encoding='utf-8') as f:
f.write(response.text)
print(f"Downloaded: {book_id}.txt")
return True
else:
print(f"Failed to download book ID {book_id}", file=sys.stderr)
return False
def process_trans_files(data_dir, books_dir, chapters_file, speakers_file, output_file):
chapters = load_chapters(chapters_file)
speakers = load_speakers(speakers_file)
trans_files = []
for root, _, files in os.walk(data_dir):
for file in files:
if file.endswith('.trans.txt'):
trans_files.append(os.path.join(root, file))
with open(output_file, 'w', encoding='utf-8') as jsonl_file:
for trans_file in tqdm(trans_files, desc="Processing files"):
book_sentences = {}
with open(trans_file, 'r', encoding='utf-8') as f:
for line in f:
parts = line.strip().split(' ', 1)
if len(parts) == 2:
id_parts = parts[0].split('-')
if len(id_parts) == 3:
reader_id, chapter_id, sentence_id = map(int, id_parts)
sentence = parts[1]
if chapter_id not in chapters:
print(f"Warning: Chapter ID {chapter_id} not found in CHAPTERS.txt")
continue
chapter_info = chapters[chapter_id]
book_id = chapter_info['book_id']
if book_id not in book_sentences:
book_file = os.path.join(books_dir, f"{book_id}.txt")
if not os.path.exists(book_file):
print(f"Book file for ID {book_id} not found. Attempting to download...")
if not download_book(book_id, books_dir):
continue
with open(book_file, 'r', encoding='utf-8') as book_f:
book_content = book_f.read()
book_sentences[book_id] = []
matched_sentence = find_matching_sentence(sentence, book_content)
book_sentences[book_id].append((reader_id, sentence_id, matched_sentence))
previous_sentence = ""
if len(book_sentences[book_id]) > 1:
previous_sentence = book_sentences[book_id][-2][2]
audio_path = os.path.join(os.path.dirname(trans_file), f"{reader_id}-{chapter_id}-{sentence_id}.flac")
entry = {
'reader': {
'id': reader_id,
'name': speakers.get(reader_id, {}).get('name', ''),
'sex': speakers.get(reader_id, {}).get('sex', '')
},
'chapter': {
'id': chapter_id,
'title': chapter_info['chapter_title']
},
'book': {
'id': book_id,
'title': chapter_info['book_title']
},
'sentence': {
'id': sentence_id,
'text': matched_sentence,
'previous': previous_sentence
},
'audio_path': audio_path,
'subset': chapter_info['subset']
}
jsonl_file.write(json.dumps(entry) + '\n')
def main():
parser = argparse.ArgumentParser(description="Convert transcription files to JSONL")
parser.add_argument("--data-dir", required=True, help="Directory containing transcription files")
parser.add_argument("--books-dir", help="Directory containing book text files")
parser.add_argument("--chapters-file", help="Path to CHAPTERS.txt file")
parser.add_argument("--speakers-file", help="Path to SPEAKERS.txt file")
parser.add_argument("--output", default="output.jsonl", help="Output JSONL file name")
args = parser.parse_args()
# Set default books_dir if not provided
if args.books_dir is None:
args.books_dir = os.path.join(args.data_dir, "books")
# Create books directory if it doesn't exist
os.makedirs(args.books_dir, exist_ok=True)
# デフォルトのファイルパスを設定
if args.chapters_file is None:
args.chapters_file = os.path.join(args.data_dir, "CHAPTERS.txt")
if args.speakers_file is None:
args.speakers_file = os.path.join(args.data_dir, "SPEAKERS.txt")
if not os.path.exists(args.chapters_file):
print(f"Error: CHAPTERS.txt file not found at {args.chapters_file}")
return
if not os.path.exists(args.speakers_file):
print(f"Error: SPEAKERS.txt file not found at {args.speakers_file}")
return
process_trans_files(args.data_dir, args.books_dir, args.chapters_file, args.speakers_file, args.output)
print(f"JSONL file has been created: {args.output}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment