Skip to content

Instantly share code, notes, and snippets.

@kepoorz
Last active April 16, 2025 06:46
Show Gist options
  • Save kepoorz/2b5e78af69ff8516909dc12753dd7562 to your computer and use it in GitHub Desktop.
Save kepoorz/2b5e78af69ff8516909dc12753dd7562 to your computer and use it in GitHub Desktop.
Obsidianのノートに特定のルールに基づいてタグを追加・更新するスクリプト
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
r"""
Obsidianのノートに特定のルールに基づいてタグを追加・更新するスクリプトです。
## 主な機能
- **update_tags**: YAMLフロントマターのtagsフィールドを更新します
- **process_files**: ディレクトリ内のすべてのMarkdownファイルを処理し、各ファイルのタグを生成・更新します
## コマンドラインサンプル
python add_tags.py --directory "/path/to/obsidian/vault" --api_key "your_gemini_api_key_here" --rules_file "path/to/rules.md"
python add_tags.py --directory "/path/to/obsidian/vault" --api_key "your_gemini_api_key_here" --rules_file "path/to/rules.md" --max_concurrent 3 --max_files 50 --file_pattern "project|work" --dry_run
windowsの場合
python add_tags.py --directory "C:/Users/YourName/Documents/ObsidianVault" --api_key "your_gemini_api_key_here" --rules_file "path/to/rules.md"
## コマンドライン引数
- **--directory**: Obsidianノート(Markdownファイル)を含むディレクトリのパス
- **--api_key**: 必須。Gemini APIにアクセスするためのAPIキー
- **--rules_file**: 必須。タグ付けルールが記述されたMarkdownファイルのパス
- **--max_concurrent**: オプション。同時処理するファイルの最大数(デフォルト:5)
- **--max_files**: オプション。処理するファイルの最大数
- **--file_pattern**: オプション。ファイル名でフィルタリングするための正規表現パターン
- **--dry_run**: オプション。ファイルを実際に変更せずに、何が変更されるかをプレビューします
## 使用方法
コマンドラインから必要な引数を指定してスクリプトを実行します。スクリプトは指定されたディレクトリ内のMarkdownファイルを処理し、
Gemini APIを使用してファイル内容を分析し、設定したルールに基づいてタグを生成します。
その後、YAMLフロントマターのtagsフィールドを更新します。
"""
import os
import asyncio
import argparse
import time
import logging
import re
import yaml
from tqdm.asyncio import tqdm_asyncio
from pathlib import Path
import sys # Add sys for error handling
# ログ設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("tag_manager.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
async def generate_tags_with_gemini(filepath, content, api_key, model, rules_content, total_tokens=None):
"""
ファイルの内容に基づいてGemini APIを使用してタグを生成する関数
"""
try:
# ファイル名を取得
filename = Path(filepath).stem
# コンテンツサンプルを取得(長すぎる場合は先頭部分のみ)
max_chars = 2000
sample_content = content[:max_chars] + ("..." if len(content) > max_chars else "")
# Gemini APIへのプロンプト (ルールは外部ファイルから読み込む)
prompt = f"""
以下のファイル「{filename}」の内容から適切なタグを抽出してください。
CONTENT:
{sample_content}
--- START RULES ---
{rules_content}
--- END RULES ---
OUTPUT:
1行に「tag1, tag2, tag3」のようにカンマ区切りのリスト形式で出力
"""
generation_config = {
'max_output_tokens': 512,
'temperature': 0.1,
'top_p': 0.3,
'top_k': 10
}
response = await asyncio.to_thread(
model.generate_content,
prompt,
generation_config=generation_config
)
# デバッグ用: レスポンスの属性をログに出力
logger.debug(f"Response object attributes: {dir(response)}")
if hasattr(response, 'usage_metadata'):
logger.debug(f"Response usage_metadata attribute: {response.usage_metadata}")
elif hasattr(response, 'token_usage'):
logger.debug(f"Response token_usage attribute: {response.token_usage}")
# トークン使用量の更新
if total_tokens is not None:
if hasattr(response, 'usage_metadata'):
usage = response.usage_metadata
total_tokens["input"] += usage.prompt_token_count
# Use candidates_token_count for output, sum if multiple candidates exist (though unlikely here)
total_tokens["output"] += usage.candidates_token_count
elif hasattr(response, 'token_usage'): # Fallback for older/different response structures
usage = response.token_usage
total_tokens["input"] += usage.get("input_tokens", 0)
total_tokens["output"] += usage.get("output_tokens", 0)
else:
logger.warning(f"Token usage information not found in response for {filepath}")
# レスポンスからタグリストを抽出
tag_text = response.text.strip()
# タグから#記号を削除し、カンマで分割
tags = [t.strip().replace('#', '') for t in tag_text.split(',')]
# 空のタグを除外
tags = [t for t in tags if t]
# リストに変換して重複を削除
return list(set(tags))
except Exception as e:
logger.error(f"Gemini APIでのタグ生成エラー ({filepath}): {e}")
# エラーの場合は空のリストを返す
return []
async def update_tags(filepath, semaphore, api_key, model, rules_content, dry_run=False, total_tokens=None):
"""YAMLフロントマターのタグを更新する"""
async with semaphore: # 同時処理数を制限
try:
# ファイルを読み込む
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
# YAMLフロントマターを探す (---で囲まれた部分)
yaml_pattern = re.compile(r'^---\n(.*?)\n---\n', re.DOTALL)
match = yaml_pattern.search(content)
if match:
# YAMLフロントマターが見つかった場合
yaml_text = match.group(1)
try:
# YAMLとして解析
yaml_data = yaml.safe_load(yaml_text)
if yaml_data is None:
yaml_data = {} # YAMLが空の場合は空の辞書を作成
except yaml.YAMLError:
# YAML解析エラーの場合は新しいYAMLデータを作成
yaml_data = {}
logger.warning(f"YAMLフロントマターの解析エラー: {filepath}")
# Gemini APIを使って新しいタグを生成
new_tags = await generate_tags_with_gemini(filepath, content, api_key, model, rules_content, total_tokens)
# 元のタグと比較
old_tags = yaml_data.get('tags', [])
# タグの処理(NoneやリストのNoneを除外)
if old_tags is None:
old_tags = []
elif isinstance(old_tags, list):
# リスト内のNoneや空文字を除外
old_tags = [tag for tag in old_tags if tag is not None and tag != '']
elif isinstance(old_tags, str):
old_tags = [old_tags] # 文字列の場合はリストに変換
# 新しいタグのNULLチェック
if new_tags is None:
new_tags = []
# リスト内の空文字やNoneを除外
new_tags = [tag for tag in new_tags if tag is not None and tag != '']
# タグの更新が必要かチェック
tags_changed = sorted(new_tags) != sorted(old_tags)
if tags_changed:
# タグを更新
yaml_data['tags'] = new_tags
# 新しいYAMLテキストを作成
new_yaml_text = yaml.dump(yaml_data, allow_unicode=True, sort_keys=False)
# コンテンツを更新
updated_content = content.replace(match.group(0), f"---\n{new_yaml_text}---\n")
if not dry_run:
# ファイルを上書き
with open(filepath, 'w', encoding='utf-8') as f:
f.write(updated_content)
logger.info(f"タグを更新しました: {filepath} - {old_tags} -> {new_tags}")
else:
logger.info(f"[ドライラン] タグを更新します: {filepath} - {old_tags} -> {new_tags}")
return True
else:
logger.info(f"タグの変更なし: {filepath}")
return False
else:
# YAMLフロントマターが見つからない場合は新規作成
new_tags = await generate_tags_with_gemini(filepath, content, api_key, model, rules_content, total_tokens)
# 新しいタグのNULLチェック
if new_tags is None:
new_tags = []
# リスト内の空文字やNoneを除外
new_tags = [tag for tag in new_tags if tag is not None and tag != '']
if new_tags and len(new_tags) > 0:
yaml_data = {'tags': new_tags}
new_yaml_text = yaml.dump(yaml_data, allow_unicode=True, sort_keys=False)
# 新しいコンテンツを作成
updated_content = f"---\n{new_yaml_text}---\n\n{content}"
if not dry_run:
# ファイルを上書き
with open(filepath, 'w', encoding='utf-8') as f:
f.write(updated_content)
logger.info(f"新しいYAMLフロントマターとタグを追加しました: {filepath} - {new_tags}")
else:
logger.info(f"[ドライラン] 新しいYAMLフロントマターとタグを追加します: {filepath} - {new_tags}")
return True
else:
logger.info(f"タグが生成されませんでした: {filepath}")
return False
except Exception as e:
logger.error(f"エラー ({filepath}): {e}")
# スタックトレースを出力してデバッグを容易にする
import traceback
logger.error(traceback.format_exc())
return False
async def process_files(directory, api_key, rules_file, max_concurrent=5, max_files=None, file_pattern=None, dry_run=False):
"""ディレクトリ内のマークダウンファイルを処理する"""
# ルールファイルを読み込む
try:
with open(rules_file, 'r', encoding='utf-8') as f:
rules_content = f.read()
logger.info(f"ルールファイルを読み込みました: {rules_file}")
except FileNotFoundError:
logger.error(f"ルールファイルが見つかりません: {rules_file}")
sys.exit(1) # エラーで終了
except Exception as e:
logger.error(f"ルールファイルの読み込みエラー: {e}")
sys.exit(1) # エラーで終了
# 同時実行数を制限するセマフォを作成
semaphore = asyncio.Semaphore(max_concurrent)
# Gemini APIのモデルを初期化
import google.generativeai as genai
genai.configure(api_key=api_key)
# 2025年4月現在の最新の安定バージョン
model = genai.GenerativeModel('gemini-1.5-flash-002')
# 処理対象ファイルを収集
markdown_files = []
for root, _, files in os.walk(directory):
for file in files:
if file.endswith(".md"):
filepath = os.path.join(root, file)
# ファイルパターンが指定された場合、ファイル名でフィルタリング
if file_pattern and not re.search(file_pattern, file):
continue
markdown_files.append(filepath)
# 最大ファイル数が指定された場合
if max_files and len(markdown_files) > max_files:
markdown_files = markdown_files[:max_files]
logger.info(f"処理対象ファイル数: {len(markdown_files)}")
# タスクを作成
tasks = []
total_tokens = {"input": 0, "output": 0} # トークン使用量の追跡
for filepath in markdown_files:
tasks.append(update_tags(filepath, semaphore, api_key, model, rules_content, dry_run, total_tokens))
# タスクを非同期で実行
results = await tqdm_asyncio.gather(*tasks, desc="ファイル処理中")
# 結果をカウント
updated_count = sum(1 for result in results if result)
# APIコストを概算(gemini-1.5-flash-002 のレートに基づく - 2024年4月時点)
# Input: $0.000125 / 1K tokens
# Output: $0.000375 / 1K tokens
# Note: Rates might change, adjust accordingly.
input_cost = total_tokens["input"] / 1000 * 0.000125
output_cost = total_tokens["output"] / 1000 * 0.000375
total_cost = input_cost + output_cost
if dry_run:
logger.info(f"ドライラン完了。更新予定ファイル数: {updated_count}/{len(markdown_files)}")
else:
logger.info(f"処理完了。更新ファイル数: {updated_count}/{len(markdown_files)}")
logger.info(f"合計入力トークン: {total_tokens['input']} トークン (推定コスト: ${input_cost:.6f})")
logger.info(f"合計出力トークン: {total_tokens['output']} トークン (推定コスト: ${output_cost:.6f})")
logger.info(f"推定合計コスト: ${total_cost:.6f}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Obsidianのノートのタグを自動管理するスクリプト")
parser.add_argument("--directory", required=True, help="Obsidianのディレクトリパス")
parser.add_argument("--api_key", required=True, help="Gemini API キー")
parser.add_argument("--rules_file", required=True, help="タグ付けルールが記述されたMarkdownファイルのパス") # New argument
parser.add_argument("--max_concurrent", type=int, default=5, help="同時処理数の上限")
parser.add_argument("--max_files", type=int, help="処理する最大ファイル数")
parser.add_argument("--file_pattern", help="処理対象のファイル名のパターン(正規表現)")
parser.add_argument("--dry_run", action="store_true", help="ファイルを実際に変更せずにプレビューする")
args = parser.parse_args()
asyncio.run(process_files(
args.directory,
args.api_key,
args.rules_file, # Pass rules_file path
args.max_concurrent,
args.max_files,
args.file_pattern,
args.dry_run
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment