Instantly share code, notes, and snippets.
Last active
April 16, 2025 06:46
-
Star
0
(0)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
Save kepoorz/2b5e78af69ff8516909dc12753dd7562 to your computer and use it in GitHub Desktop.
Obsidianのノートに特定のルールに基づいてタグを追加・更新するスクリプト
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| r""" | |
| Obsidianのノートに特定のルールに基づいてタグを追加・更新するスクリプトです。 | |
| ## 主な機能 | |
| - **update_tags**: YAMLフロントマターのtagsフィールドを更新します | |
| - **process_files**: ディレクトリ内のすべてのMarkdownファイルを処理し、各ファイルのタグを生成・更新します | |
| ## コマンドラインサンプル | |
| python add_tags.py --directory "/path/to/obsidian/vault" --api_key "your_gemini_api_key_here" --rules_file "path/to/rules.md" | |
| python add_tags.py --directory "/path/to/obsidian/vault" --api_key "your_gemini_api_key_here" --rules_file "path/to/rules.md" --max_concurrent 3 --max_files 50 --file_pattern "project|work" --dry_run | |
| windowsの場合 | |
| python add_tags.py --directory "C:/Users/YourName/Documents/ObsidianVault" --api_key "your_gemini_api_key_here" --rules_file "path/to/rules.md" | |
| ## コマンドライン引数 | |
| - **--directory**: Obsidianノート(Markdownファイル)を含むディレクトリのパス | |
| - **--api_key**: 必須。Gemini APIにアクセスするためのAPIキー | |
| - **--rules_file**: 必須。タグ付けルールが記述されたMarkdownファイルのパス | |
| - **--max_concurrent**: オプション。同時処理するファイルの最大数(デフォルト:5) | |
| - **--max_files**: オプション。処理するファイルの最大数 | |
| - **--file_pattern**: オプション。ファイル名でフィルタリングするための正規表現パターン | |
| - **--dry_run**: オプション。ファイルを実際に変更せずに、何が変更されるかをプレビューします | |
| ## 使用方法 | |
| コマンドラインから必要な引数を指定してスクリプトを実行します。スクリプトは指定されたディレクトリ内のMarkdownファイルを処理し、 | |
| Gemini APIを使用してファイル内容を分析し、設定したルールに基づいてタグを生成します。 | |
| その後、YAMLフロントマターのtagsフィールドを更新します。 | |
| """ | |
| import os | |
| import asyncio | |
| import argparse | |
| import time | |
| import logging | |
| import re | |
| import yaml | |
| from tqdm.asyncio import tqdm_asyncio | |
| from pathlib import Path | |
| import sys # Add sys for error handling | |
| # ログ設定 | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler("tag_manager.log"), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| async def generate_tags_with_gemini(filepath, content, api_key, model, rules_content, total_tokens=None): | |
| """ | |
| ファイルの内容に基づいてGemini APIを使用してタグを生成する関数 | |
| """ | |
| try: | |
| # ファイル名を取得 | |
| filename = Path(filepath).stem | |
| # コンテンツサンプルを取得(長すぎる場合は先頭部分のみ) | |
| max_chars = 2000 | |
| sample_content = content[:max_chars] + ("..." if len(content) > max_chars else "") | |
| # Gemini APIへのプロンプト (ルールは外部ファイルから読み込む) | |
| prompt = f""" | |
| 以下のファイル「{filename}」の内容から適切なタグを抽出してください。 | |
| CONTENT: | |
| {sample_content} | |
| --- START RULES --- | |
| {rules_content} | |
| --- END RULES --- | |
| OUTPUT: | |
| 1行に「tag1, tag2, tag3」のようにカンマ区切りのリスト形式で出力 | |
| """ | |
| generation_config = { | |
| 'max_output_tokens': 512, | |
| 'temperature': 0.1, | |
| 'top_p': 0.3, | |
| 'top_k': 10 | |
| } | |
| response = await asyncio.to_thread( | |
| model.generate_content, | |
| prompt, | |
| generation_config=generation_config | |
| ) | |
| # デバッグ用: レスポンスの属性をログに出力 | |
| logger.debug(f"Response object attributes: {dir(response)}") | |
| if hasattr(response, 'usage_metadata'): | |
| logger.debug(f"Response usage_metadata attribute: {response.usage_metadata}") | |
| elif hasattr(response, 'token_usage'): | |
| logger.debug(f"Response token_usage attribute: {response.token_usage}") | |
| # トークン使用量の更新 | |
| if total_tokens is not None: | |
| if hasattr(response, 'usage_metadata'): | |
| usage = response.usage_metadata | |
| total_tokens["input"] += usage.prompt_token_count | |
| # Use candidates_token_count for output, sum if multiple candidates exist (though unlikely here) | |
| total_tokens["output"] += usage.candidates_token_count | |
| elif hasattr(response, 'token_usage'): # Fallback for older/different response structures | |
| usage = response.token_usage | |
| total_tokens["input"] += usage.get("input_tokens", 0) | |
| total_tokens["output"] += usage.get("output_tokens", 0) | |
| else: | |
| logger.warning(f"Token usage information not found in response for {filepath}") | |
| # レスポンスからタグリストを抽出 | |
| tag_text = response.text.strip() | |
| # タグから#記号を削除し、カンマで分割 | |
| tags = [t.strip().replace('#', '') for t in tag_text.split(',')] | |
| # 空のタグを除外 | |
| tags = [t for t in tags if t] | |
| # リストに変換して重複を削除 | |
| return list(set(tags)) | |
| except Exception as e: | |
| logger.error(f"Gemini APIでのタグ生成エラー ({filepath}): {e}") | |
| # エラーの場合は空のリストを返す | |
| return [] | |
| async def update_tags(filepath, semaphore, api_key, model, rules_content, dry_run=False, total_tokens=None): | |
| """YAMLフロントマターのタグを更新する""" | |
| async with semaphore: # 同時処理数を制限 | |
| try: | |
| # ファイルを読み込む | |
| with open(filepath, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| # YAMLフロントマターを探す (---で囲まれた部分) | |
| yaml_pattern = re.compile(r'^---\n(.*?)\n---\n', re.DOTALL) | |
| match = yaml_pattern.search(content) | |
| if match: | |
| # YAMLフロントマターが見つかった場合 | |
| yaml_text = match.group(1) | |
| try: | |
| # YAMLとして解析 | |
| yaml_data = yaml.safe_load(yaml_text) | |
| if yaml_data is None: | |
| yaml_data = {} # YAMLが空の場合は空の辞書を作成 | |
| except yaml.YAMLError: | |
| # YAML解析エラーの場合は新しいYAMLデータを作成 | |
| yaml_data = {} | |
| logger.warning(f"YAMLフロントマターの解析エラー: {filepath}") | |
| # Gemini APIを使って新しいタグを生成 | |
| new_tags = await generate_tags_with_gemini(filepath, content, api_key, model, rules_content, total_tokens) | |
| # 元のタグと比較 | |
| old_tags = yaml_data.get('tags', []) | |
| # タグの処理(NoneやリストのNoneを除外) | |
| if old_tags is None: | |
| old_tags = [] | |
| elif isinstance(old_tags, list): | |
| # リスト内のNoneや空文字を除外 | |
| old_tags = [tag for tag in old_tags if tag is not None and tag != ''] | |
| elif isinstance(old_tags, str): | |
| old_tags = [old_tags] # 文字列の場合はリストに変換 | |
| # 新しいタグのNULLチェック | |
| if new_tags is None: | |
| new_tags = [] | |
| # リスト内の空文字やNoneを除外 | |
| new_tags = [tag for tag in new_tags if tag is not None and tag != ''] | |
| # タグの更新が必要かチェック | |
| tags_changed = sorted(new_tags) != sorted(old_tags) | |
| if tags_changed: | |
| # タグを更新 | |
| yaml_data['tags'] = new_tags | |
| # 新しいYAMLテキストを作成 | |
| new_yaml_text = yaml.dump(yaml_data, allow_unicode=True, sort_keys=False) | |
| # コンテンツを更新 | |
| updated_content = content.replace(match.group(0), f"---\n{new_yaml_text}---\n") | |
| if not dry_run: | |
| # ファイルを上書き | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| f.write(updated_content) | |
| logger.info(f"タグを更新しました: {filepath} - {old_tags} -> {new_tags}") | |
| else: | |
| logger.info(f"[ドライラン] タグを更新します: {filepath} - {old_tags} -> {new_tags}") | |
| return True | |
| else: | |
| logger.info(f"タグの変更なし: {filepath}") | |
| return False | |
| else: | |
| # YAMLフロントマターが見つからない場合は新規作成 | |
| new_tags = await generate_tags_with_gemini(filepath, content, api_key, model, rules_content, total_tokens) | |
| # 新しいタグのNULLチェック | |
| if new_tags is None: | |
| new_tags = [] | |
| # リスト内の空文字やNoneを除外 | |
| new_tags = [tag for tag in new_tags if tag is not None and tag != ''] | |
| if new_tags and len(new_tags) > 0: | |
| yaml_data = {'tags': new_tags} | |
| new_yaml_text = yaml.dump(yaml_data, allow_unicode=True, sort_keys=False) | |
| # 新しいコンテンツを作成 | |
| updated_content = f"---\n{new_yaml_text}---\n\n{content}" | |
| if not dry_run: | |
| # ファイルを上書き | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| f.write(updated_content) | |
| logger.info(f"新しいYAMLフロントマターとタグを追加しました: {filepath} - {new_tags}") | |
| else: | |
| logger.info(f"[ドライラン] 新しいYAMLフロントマターとタグを追加します: {filepath} - {new_tags}") | |
| return True | |
| else: | |
| logger.info(f"タグが生成されませんでした: {filepath}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"エラー ({filepath}): {e}") | |
| # スタックトレースを出力してデバッグを容易にする | |
| import traceback | |
| logger.error(traceback.format_exc()) | |
| return False | |
| async def process_files(directory, api_key, rules_file, max_concurrent=5, max_files=None, file_pattern=None, dry_run=False): | |
| """ディレクトリ内のマークダウンファイルを処理する""" | |
| # ルールファイルを読み込む | |
| try: | |
| with open(rules_file, 'r', encoding='utf-8') as f: | |
| rules_content = f.read() | |
| logger.info(f"ルールファイルを読み込みました: {rules_file}") | |
| except FileNotFoundError: | |
| logger.error(f"ルールファイルが見つかりません: {rules_file}") | |
| sys.exit(1) # エラーで終了 | |
| except Exception as e: | |
| logger.error(f"ルールファイルの読み込みエラー: {e}") | |
| sys.exit(1) # エラーで終了 | |
| # 同時実行数を制限するセマフォを作成 | |
| semaphore = asyncio.Semaphore(max_concurrent) | |
| # Gemini APIのモデルを初期化 | |
| import google.generativeai as genai | |
| genai.configure(api_key=api_key) | |
| # 2025年4月現在の最新の安定バージョン | |
| model = genai.GenerativeModel('gemini-1.5-flash-002') | |
| # 処理対象ファイルを収集 | |
| markdown_files = [] | |
| for root, _, files in os.walk(directory): | |
| for file in files: | |
| if file.endswith(".md"): | |
| filepath = os.path.join(root, file) | |
| # ファイルパターンが指定された場合、ファイル名でフィルタリング | |
| if file_pattern and not re.search(file_pattern, file): | |
| continue | |
| markdown_files.append(filepath) | |
| # 最大ファイル数が指定された場合 | |
| if max_files and len(markdown_files) > max_files: | |
| markdown_files = markdown_files[:max_files] | |
| logger.info(f"処理対象ファイル数: {len(markdown_files)}") | |
| # タスクを作成 | |
| tasks = [] | |
| total_tokens = {"input": 0, "output": 0} # トークン使用量の追跡 | |
| for filepath in markdown_files: | |
| tasks.append(update_tags(filepath, semaphore, api_key, model, rules_content, dry_run, total_tokens)) | |
| # タスクを非同期で実行 | |
| results = await tqdm_asyncio.gather(*tasks, desc="ファイル処理中") | |
| # 結果をカウント | |
| updated_count = sum(1 for result in results if result) | |
| # APIコストを概算(gemini-1.5-flash-002 のレートに基づく - 2024年4月時点) | |
| # Input: $0.000125 / 1K tokens | |
| # Output: $0.000375 / 1K tokens | |
| # Note: Rates might change, adjust accordingly. | |
| input_cost = total_tokens["input"] / 1000 * 0.000125 | |
| output_cost = total_tokens["output"] / 1000 * 0.000375 | |
| total_cost = input_cost + output_cost | |
| if dry_run: | |
| logger.info(f"ドライラン完了。更新予定ファイル数: {updated_count}/{len(markdown_files)}") | |
| else: | |
| logger.info(f"処理完了。更新ファイル数: {updated_count}/{len(markdown_files)}") | |
| logger.info(f"合計入力トークン: {total_tokens['input']} トークン (推定コスト: ${input_cost:.6f})") | |
| logger.info(f"合計出力トークン: {total_tokens['output']} トークン (推定コスト: ${output_cost:.6f})") | |
| logger.info(f"推定合計コスト: ${total_cost:.6f}") | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser(description="Obsidianのノートのタグを自動管理するスクリプト") | |
| parser.add_argument("--directory", required=True, help="Obsidianのディレクトリパス") | |
| parser.add_argument("--api_key", required=True, help="Gemini API キー") | |
| parser.add_argument("--rules_file", required=True, help="タグ付けルールが記述されたMarkdownファイルのパス") # New argument | |
| parser.add_argument("--max_concurrent", type=int, default=5, help="同時処理数の上限") | |
| parser.add_argument("--max_files", type=int, help="処理する最大ファイル数") | |
| parser.add_argument("--file_pattern", help="処理対象のファイル名のパターン(正規表現)") | |
| parser.add_argument("--dry_run", action="store_true", help="ファイルを実際に変更せずにプレビューする") | |
| args = parser.parse_args() | |
| asyncio.run(process_files( | |
| args.directory, | |
| args.api_key, | |
| args.rules_file, # Pass rules_file path | |
| args.max_concurrent, | |
| args.max_files, | |
| args.file_pattern, | |
| args.dry_run | |
| )) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment