Created
March 11, 2025 06:31
-
-
Save tkgshn/7356f561d18c335b6ad07e480009f64c to your computer and use it in GitHub Desktop.
openfisca_dependency_graph.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
OpenFisca-Japanの制度の依存関係を可視化するスクリプト | |
コードベースから依存関係を自動抽出して可視化します | |
""" | |
import matplotlib.pyplot as plt | |
import networkx as nx | |
import matplotlib | |
import sys | |
import matplotlib.font_manager as fm | |
import os | |
import re | |
import glob | |
import argparse | |
from matplotlib.patches import Patch | |
from collections import defaultdict | |
import numpy as np | |
import math | |
# コマンドライン引数の解析 | |
parser = argparse.ArgumentParser(description='OpenFisca-Japanの制度の依存関係を可視化します') | |
parser.add_argument('--systems', nargs='+', help='可視化する制度名のリスト(例: 児童手当 児童扶養手当)') | |
parser.add_argument('--list-systems', action='store_true', help='利用可能な制度の一覧を表示') | |
parser.add_argument('--all', action='store_true', help='すべての制度を可視化') | |
parser.add_argument('--max-systems', type=int, default=5, help='一度に表示する最大制度数(デフォルト: 5)') | |
parser.add_argument('--category', help='特定のカテゴリに属する制度のみ表示(例: 福祉, 育児)') | |
parser.add_argument('--output', help='出力ファイル名(デフォルト: dependency_graph.png)') | |
parser.add_argument('--node-spacing', type=float, default=0.1, help='ノード間隔の倍率(大きくすると間隔が広がる、デフォルト: 1.0)') | |
parser.add_argument('--node-size', type=float, default=1.0, help='ノードサイズの倍率(デフォルト: 1.0)') | |
parser.add_argument('--analyze-common', action='store_true', help='共通パラメータの分析を出力する') | |
parser.add_argument('--ring-spacing', type=float, default=1.0, help='同心円の距離間隔 (倍率)') | |
# 日本語フォント設定 | |
font_found = False | |
if sys.platform.startswith('darwin'): # macOS | |
# macOSの場合、複数のフォントパスを試す | |
possible_font_paths = [ | |
'/System/Library/Fonts/ヒラギノ角ゴシック W3.ttc', | |
'/System/Library/Fonts/AppleGothic.ttf', | |
'/System/Library/Fonts/ヒラギノ丸ゴ ProN W4.ttc', | |
'/Library/Fonts/Osaka.ttf' | |
] | |
for path in possible_font_paths: | |
if os.path.exists(path): | |
print(f"日本語フォントを使用: {path}") | |
# matplotlibのrc設定にフォントファミリーを追加 | |
plt.rcParams['font.family'] = 'sans-serif' | |
plt.rcParams['font.sans-serif'] = ['Hiragino Sans GB', 'AppleGothic', 'Hiragino Maru Gothic ProN', 'Osaka'] | |
font_prop = fm.FontProperties(fname=path) | |
font_found = True | |
break | |
# フォールバック:システムからフォントを検索 | |
if not font_found: | |
print("警告: 指定した日本語フォントが見つかりません。利用可能なフォントを検索します...") | |
# 利用可能なフォントをすべて表示 | |
font_list = fm.findSystemFonts() | |
japanese_fonts = [f for f in font_list if any(name in f.lower() for name in ['hira', 'gothic', 'mincho', 'osaka', 'maru'])] | |
if japanese_fonts: | |
print(f"以下の日本語フォントが見つかりました: {japanese_fonts[:3]}") | |
font_prop = fm.FontProperties(fname=japanese_fonts[0]) | |
font_found = True | |
else: | |
print("警告: 日本語フォントが見つかりませんでした。デフォルトフォントを使用します。") | |
font_prop = fm.FontProperties(family='sans-serif') | |
# グラフ作成のデフォルト設定 | |
plt.rcParams['font.size'] = 9 | |
matplotlib.rcParams['axes.unicode_minus'] = False | |
# パラメータのカテゴリ(整理のため) | |
PARAMETER_CATEGORIES = { | |
"基本属性": ["年齢", "誕生年月日", "収入", "学年"], | |
"資格条件": ["ひとり親", "扶養", "養育者認定", "障害者手帳"], | |
"所得関連": ["所得制限", "一部支給所得条件", "全部支給所得条件", "控除後世帯高所得"], | |
"給付金額": ["通常", "第2子以降追加額", "3歳未満", "3歳以上"] | |
} | |
# 制度ノードの色(すべての制度は同じ色) | |
SYSTEM_NODE_COLOR = "#FF6B6B" # 赤 | |
# パラメータノードの色(共通と個別で明確に区別) | |
COMMON_PARAM_COLOR = "#28B463" # 鮮やかな緑 | |
SPECIAL_PARAM_COLOR = "#3498DB" # 鮮やかな青 | |
INDIVIDUAL_PARAM_COLOR = "#9B59B6" # 紫 | |
def detect_available_systems(): | |
"""利用可能な制度を自動検出する""" | |
systems = set() | |
# 1. Variableクラスを定義しているファイルをすべて検索 | |
variable_files = glob.glob("openfisca_japan/variables/**/*.py", recursive=True) | |
for file_path in variable_files: | |
try: | |
with open(file_path, 'r', encoding='utf-8') as f: | |
content = f.read() | |
# クラス定義を検索 | |
class_patterns = re.findall(r'class\s+([^\s\(]+)\(Variable\)', content) | |
# クラス名から制度名を推測 | |
for class_name in class_patterns: | |
# 明らかにパラメータや共通変数を除外 | |
if "手当" in class_name or "支援" in class_name or "給付" in class_name or "助成" in class_name: | |
# 制度名のみを抽出(数字や記号を除外) | |
base_name = re.sub(r'_.*$', '', class_name) # _以降を削除 | |
base_name = re.sub(r'最大$|最小$|の.*$', '', base_name) # 最大/最小/の以降を削除 | |
if len(base_name) >= 2: # 短すぎる名前を除外 | |
systems.add(base_name) | |
except Exception as e: | |
print(f"ファイル {file_path} の解析中にエラーが発生しました: {e}") | |
# 2. パラメータフォルダからも制度を検出 | |
param_dirs = glob.glob("openfisca_japan/parameters/**/", recursive=True) | |
for dir_path in param_dirs: | |
# ディレクトリ名から制度名を推測 | |
dir_name = os.path.basename(os.path.dirname(dir_path)) | |
if "手当" in dir_name or "支援" in dir_name or "給付" in dir_name or "助成" in dir_name: | |
systems.add(dir_name) | |
return sorted(list(systems)) | |
def categorize_parameter(param_name): | |
"""パラメータ名からカテゴリを判定する""" | |
for category, keywords in PARAMETER_CATEGORIES.items(): | |
for keyword in keywords: | |
if keyword in param_name: | |
return category | |
return "その他" | |
def extract_parameters(file_content): | |
"""Pythonファイルから使用されているパラメータを抽出する""" | |
parameters = [] | |
# parameters(対象期間).福祉.育児.児童手当 のようなパターンを検索 | |
param_patterns = re.findall(r'parameters\(対象期間\)\.([^\s\)\+\-\*\/\(\,]+)', file_content) | |
for pattern in param_patterns: | |
# ドットで分割して階層構造を取得 | |
parts = pattern.split('.') | |
# 最下層のパラメータ名を取得(例: 児童手当) | |
if len(parts) > 0: | |
param_name = parts[-1] | |
parameters.append(param_name) | |
return list(set(parameters)) # 重複を除去 | |
def extract_variable_dependencies(file_content): | |
"""Pythonファイルから使用されている変数の依存関係を抽出する""" | |
variables = [] | |
# 対象世帯("変数名", 対象期間) または 対象世帯.members("変数名", 対象期間) のようなパターンを検索 | |
var_patterns = re.findall(r'対象世帯(?:\.members)?\("([^"]+)"', file_content) | |
for var_name in var_patterns: | |
variables.append(var_name) | |
return list(set(variables)) # 重複を除去 | |
def extract_class_name(file_content): | |
"""Pythonファイルからクラス名(変数名)を抽出する""" | |
class_names = [] | |
# class クラス名(Variable): のようなパターンを検索 | |
class_patterns = re.findall(r'class\s+([^\s\(]+)\(Variable\)', file_content) | |
class_names.extend(class_patterns) | |
return class_names | |
def analyze_openfisca_files(systems): | |
"""OpenFiscaのコードファイルを解析して依存関係を抽出する""" | |
G = nx.DiGraph() | |
# ノード属性を追加 | |
for i, system in enumerate(systems): | |
G.add_node(system, node_type="seido", category="制度", color=SYSTEM_NODE_COLOR) | |
# 指定された制度のファイルを探す | |
variable_files = [] | |
for system in systems: | |
files = glob.glob(f"openfisca_japan/variables/**/{system}.py", recursive=True) | |
files.extend(glob.glob(f"openfisca_japan/variables/**/GTT/{system}.py", recursive=True)) | |
# 制度名の部分一致も検索 | |
for root, dirs, files_list in os.walk("openfisca_japan/variables"): | |
for file in files_list: | |
if file.endswith(".py") and system in file: | |
file_path = os.path.join(root, file) | |
if file_path not in files: | |
files.append(file_path) | |
variable_files.extend(files) | |
# システムごとのパラメータと変数を記録 | |
system_params = defaultdict(set) | |
system_vars = defaultdict(set) | |
# 各ファイルを解析 | |
for file_path in variable_files: | |
try: | |
with open(file_path, 'r', encoding='utf-8') as f: | |
content = f.read() | |
# このファイルで定義されている変数(クラス)名を取得 | |
class_names = extract_class_name(content) | |
# このファイルで使用されているパラメータを取得 | |
file_params = extract_parameters(content) | |
# このファイルで依存している変数を取得 | |
file_vars = extract_variable_dependencies(content) | |
# ファイル名から制度名を推測 | |
system_name = None | |
for system in systems: | |
if system in file_path: | |
system_name = system | |
break | |
if system_name: | |
# システムごとのパラメータと変数を記録 | |
system_params[system_name].update(file_params) | |
system_vars[system_name].update(file_vars) | |
except Exception as e: | |
print(f"ファイル {file_path} の解析中にエラーが発生しました: {e}") | |
# 各制度の重要なパラメータと変数を抽出(カテゴリで整理) | |
for system_name in systems: | |
# システムのカラーを取得 | |
system_color = SYSTEM_NODE_COLOR | |
# 重要なパラメータに絞る | |
important_params = filter_important_parameters(system_params[system_name], system_name) | |
for param in important_params: | |
param_node = f"{param}" | |
category = categorize_parameter(param) | |
# ノードが存在しない場合のみ追加 | |
if param_node not in G: | |
G.add_node(param_node, node_type="parameter", category=category) | |
G.add_edge(system_name, param_node) | |
# 重要な変数に絞る | |
important_vars = filter_important_variables(system_vars[system_name], system_name) | |
for var in important_vars: | |
if var in ["年齢", "誕生年月日", "収入"]: | |
continue | |
# 制度に特有の変数処理 | |
var_node = None | |
if f"{system_name}の" in var: | |
var_node = var.replace(f"{system_name}の", "") | |
elif var not in ["扶養"]: | |
var_node = var | |
if var_node and var_node not in G: | |
category = categorize_parameter(var_node) | |
G.add_node(var_node, node_type="parameter", category=category) | |
if var_node: | |
G.add_edge(system_name, var_node) | |
# 共通パラメータを特定 | |
common_params = [] | |
for node in G.nodes(): | |
if G.nodes[node].get("node_type") == "parameter": | |
systems_using_param = [] | |
for system in systems: | |
if G.has_edge(system, node): | |
systems_using_param.append(system) | |
if len(systems_using_param) > 1: | |
common_params.append(node) | |
# 共通パラメータをハイライト | |
for param in common_params: | |
G.nodes[param]["is_common"] = True | |
return G | |
def filter_important_parameters(parameters, system_name): | |
"""重要なパラメータのみをフィルタリング""" | |
important_keywords = [ | |
"通常", "所得制限", "第2子以降", "追加額", "1人目", "2人目", "3人目", | |
"全部支給", "一部支給", "所得条件", "養育者", "減額", "金額" | |
] | |
# 特定の制度の重要キーワード | |
if "児童手当" in system_name: | |
important_keywords.extend(["3歳未満", "3歳以上", "高校生以下"]) | |
elif "児童扶養手当" in system_name: | |
important_keywords.extend(["ひとり親", "扶養", "制限外の減額"]) | |
elif "障害" in system_name: | |
important_keywords.extend(["障害", "等級", "手帳"]) | |
# 重要なパラメータのみを抽出 | |
important_params = set() | |
for param in parameters: | |
for keyword in important_keywords: | |
if keyword in param: | |
important_params.add(param) | |
break | |
return important_params | |
def filter_important_variables(variables, system_name): | |
"""重要な変数のみをフィルタリング""" | |
important_keywords = [ | |
"所得条件", "対象", "ひとり親", "扶養", "年齢", "学年", "誕生", "金額" | |
] | |
# 特定の制度の重要キーワード | |
if "児童手当" in system_name: | |
important_keywords.extend(["出生順位", "養育者"]) | |
elif "児童扶養手当" in system_name: | |
important_keywords.extend(["全部支給", "一部支給"]) | |
elif "障害" in system_name: | |
important_keywords.extend(["障害", "等級", "手帳"]) | |
# 重要な変数のみを抽出 | |
important_vars = set() | |
for var in variables: | |
if system_name in var: # 制度名を含む変数は重要 | |
important_vars.add(var) | |
continue | |
for keyword in important_keywords: | |
if keyword in var: | |
important_vars.add(var) | |
break | |
return important_vars | |
def create_dependency_graph(systems): | |
"""依存関係のグラフを作成""" | |
# 新しいグラフを作成 | |
G = nx.DiGraph() | |
# システムノードを追加 | |
for system in systems: | |
G.add_node(system, type='system') | |
# サンプルパラメータデータ(実際のOpenFiscaデータがない場合に使用) | |
sample_params = { | |
"児童手当": [ | |
"児童の年齢", "養育者の所得", "児童手当支給額", "第2子以降追加額", "所得制限", | |
"特例給付", "一律給付", "年少扶養控除", "3歳未満の子の数", "児童手当の対象児童" | |
], | |
"児童扶養手当": [ | |
"ひとり親世帯", "児童の年齢", "養育者の所得", "児童扶養手当支給額", "所得制限", | |
"支給額係数", "一部支給所得条件", "全部支給所得条件", "所得計算等" | |
], | |
"特別児童扶養手当": [ | |
"障害児童", "児童の年齢", "養育者の所得", "特別児童扶養手当支給額", "障害の程度", | |
"療育手帳", "所得制限", "加算額" | |
], | |
"障害児福祉手当": [ | |
"障害程度", "所得制限", "支給額", "障害認定" | |
], | |
"児童育成手当(東京都)": [ | |
"東京都在住", "児童の年齢", "養育者の所得", "育成手当支給額" | |
], | |
"障害児童育成手当(東京都)": [ | |
"東京都在住", "障害児童", "児童の年齢", "養育者の所得", "障害程度", "支給額" | |
] | |
} | |
# 共通パラメータのリスト | |
common_params = ["児童の年齢", "養育者の所得"] | |
# パラメータノードを追加 | |
for system, params in sample_params.items(): | |
if system in systems: | |
for param in params: | |
# パラメータがまだグラフに存在しない場合は追加 | |
if param not in G: | |
G.add_node(param, type='parameter') | |
# 共通パラメータかどうかを設定 | |
if param in common_params: | |
G.nodes[param]['is_common'] = True | |
# エッジを追加 | |
G.add_edge(system, param) | |
return G | |
def draw_graph(G, systems, output_file='dependency_graph.png', node_spacing=1.0, node_size=1.0, ring_spacing=1.0): | |
"""グラフを描画し、ファイルに保存します""" | |
# パラメータのバランス調整 | |
if len(systems) > 4: | |
adj_factor = min(1.5, 0.3 * len(systems)) | |
node_size_factor = 1.0 / (1.0 + 0.15 * len(systems)) | |
print(f"制度数が多いため、自動的にノード間隔を調整しました: 間隔={adj_factor:.2f}倍, サイズ={node_size_factor:.2f}倍") | |
node_spacing *= adj_factor | |
node_size *= node_size_factor | |
# プロットの設定 | |
figsize = (20, 16) # 大きめのキャンバスサイズ | |
plt.figure(figsize=figsize) | |
ax = plt.subplot(111) | |
# 共通パラメータの分析 | |
common_params = analyze_common_parameters(G, systems) | |
# レイアウト調整のためのパラメータ | |
positions = {} | |
# システムノードの中心位置を設定 (円周上に等間隔で配置) | |
system_nodes = [node for node in G.nodes() if G.nodes[node].get('type') == 'system'] | |
center_radius = 0.3 * node_spacing # システムノードの中心からの距離 | |
for i, system in enumerate(system_nodes): | |
angle = 2 * math.pi * i / len(system_nodes) | |
positions[system] = ( | |
center_radius * math.cos(angle), | |
center_radius * math.sin(angle) | |
) | |
# 共通パラメータのノードを中心に配置 | |
common_param_nodes = list(common_params.keys()) | |
# 共通パラメータが複数ある場合、中心付近に円状に配置 | |
if common_param_nodes: | |
# 共通パラメータの数に応じて半径を調整 | |
common_param_count = len(common_param_nodes) | |
if common_param_count <= 3: | |
common_radius = 0.05 * node_spacing * ring_spacing | |
elif common_param_count <= 5: | |
common_radius = 0.10 * node_spacing * ring_spacing | |
else: | |
# 6個以上の場合は二重円に配置 | |
common_radius_inner = 0.08 * node_spacing * ring_spacing | |
common_radius_outer = 0.16 * node_spacing * ring_spacing | |
# 共通度(共有システム数)の高いパラメータを内側に配置 | |
param_counts = {param: len(systems_list) for param, systems_list in common_params.items()} | |
sorted_params = sorted(common_param_nodes, key=lambda p: param_counts[p], reverse=True) | |
# 内側の円に配置するパラメータ数(約半数) | |
inner_count = common_param_count // 2 | |
# 内側の円に配置 | |
for i, param in enumerate(sorted_params[:inner_count]): | |
angle = 2 * math.pi * i / inner_count | |
positions[param] = ( | |
common_radius_inner * math.cos(angle), | |
common_radius_inner * math.sin(angle) | |
) | |
# 外側の円に配置 | |
for i, param in enumerate(sorted_params[inner_count:]): | |
angle = 2 * math.pi * i / (common_param_count - inner_count) | |
positions[param] = ( | |
common_radius_outer * math.cos(angle), | |
common_radius_outer * math.sin(angle) | |
) | |
# すべてのパラメータは配置済みなのでここで終了 | |
common_radius = None # 後続のコードで使用されないようにNoneに設定 | |
# 共通パラメータが5個以下の場合はこちらで配置 | |
if common_radius: | |
for i, param in enumerate(common_param_nodes): | |
angle = 2 * math.pi * i / common_param_count | |
positions[param] = ( | |
common_radius * math.cos(angle), | |
common_radius * math.sin(angle) | |
) | |
# 各システムのパラメータを分類し、カテゴリごとにグループ化 | |
param_categories = { | |
'基本属性': ['年齢', '児童の年齢', '人数'], | |
'資格条件': ['要件', '条件', '該当', '障害', '児童'], | |
'所得関連': ['所得', '所得制限', '養育者の所得', '収入', '課税'], | |
'給付額': ['金額', '手当額', '支給額', '支給'] | |
} | |
# 各システムのパラメータノードを同心円状に配置 | |
for system in system_nodes: | |
system_x, system_y = positions[system] | |
# このシステムのパラメータを取得(共通パラメータを除く) | |
system_params = [] | |
for param in G.nodes(): | |
if (G.nodes[param].get('type') == 'parameter' and | |
G.has_edge(system, param) and | |
param not in common_param_nodes): | |
system_params.append(param) | |
# パラメータを各カテゴリに分類 | |
categorized_params = {category: [] for category in param_categories} | |
categorized_params['その他'] = [] | |
for param in system_params: | |
categorized = False | |
for category, keywords in param_categories.items(): | |
if any(keyword in param for keyword in keywords): | |
categorized_params[category].append(param) | |
categorized = True | |
break | |
if not categorized: | |
categorized_params['その他'].append(param) | |
# カテゴリごとに同心円状に配置 | |
radius_base = 0.15 * node_spacing * ring_spacing | |
radius_step = 0.08 * node_spacing * ring_spacing | |
radius_multipliers = { | |
'基本属性': 1.0, | |
'資格条件': 2.0, | |
'所得関連': 3.0, | |
'給付額': 4.0, | |
'その他': 5.0 | |
} | |
for category, params in categorized_params.items(): | |
if not params: | |
continue | |
radius = radius_base * radius_multipliers[category] | |
for i, param in enumerate(params): | |
angle_offset = (2 * math.pi * i / max(1, len(params))) + ( | |
system_nodes.index(system) * (math.pi / max(1, len(system_nodes))) | |
) | |
# システムノードからの相対位置 | |
param_x = radius * math.cos(angle_offset) | |
param_y = radius * math.sin(angle_offset) | |
# システムノードを中心とした位置に配置 | |
positions[param] = (system_x + param_x, system_y + param_y) | |
# 空のpositionsをチェック(エラー回避) | |
if not positions: | |
print("警告: ノードが見つかりませんでした。") | |
return | |
# ノード配置の微調整(オーバーラップを避けるために軽いバネモデルを適用) | |
pos = adjust_positions(G, positions, node_spacing) | |
# グラフの描画 | |
draw_nodes_and_edges(G, pos, ax, node_size, common_param_nodes) | |
# タイトルと凡例 | |
graph_title = f"{', '.join(systems)}の依存関係" | |
plt.title(graph_title, fontproperties=font_prop, fontsize=18) | |
# 凡例の作成 | |
legend_elements = [ | |
plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='red', markersize=15, label='制度'), | |
plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='green', markersize=15, label='共通パラメータ'), | |
plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='purple', markersize=15, label='固有パラメータ') | |
] | |
plt.legend(handles=legend_elements, loc='upper left', prop=font_prop) | |
# 軸の設定 | |
plt.axis('off') | |
plt.tight_layout() | |
# 図を保存 | |
plt.savefig(output_file, dpi=100, bbox_inches='tight') | |
print(f"依存関係グラフを {output_file} に保存しました。") | |
def adjust_positions(G, initial_pos, node_spacing): | |
"""ノードの位置をオーバーラップを避けるように微調整します""" | |
# 位置が空の場合はそのまま返す | |
if not initial_pos: | |
return initial_pos | |
# システムノードを抽出 | |
system_nodes = [n for n in G.nodes() if G.nodes[n].get('type') == 'system'] | |
# システムノードが存在しない場合は標準のレイアウトを使用 | |
if not system_nodes: | |
return nx.spring_layout(G, k=node_spacing * 0.1) | |
# spring_layoutの初期位置として使用 | |
try: | |
pos = nx.spring_layout( | |
G, | |
pos=initial_pos, | |
fixed=system_nodes, # システムノードは固定 | |
k=node_spacing * 0.1, # バネ定数 | |
iterations=50 # イテレーション数 | |
) | |
return pos | |
except ValueError: | |
# エラーが発生した場合は初期位置をそのまま返す | |
print("警告: レイアウト調整中にエラーが発生しました。初期配置を使用します。") | |
return initial_pos | |
def draw_nodes_and_edges(G, pos, ax, node_size, common_param_nodes=None): | |
"""ノードとエッジを描画します""" | |
# システムノード、共通パラメータノード、固有パラメータノードに分類 | |
system_nodes = [node for node in G.nodes() if G.nodes[node].get('type') == 'system'] | |
common_param_nodes = common_param_nodes or [node for node in G.nodes() if G.nodes[node].get('type') == 'parameter' and G.nodes[node].get('is_common', False)] | |
param_nodes = [node for node in G.nodes() if G.nodes[node].get('type') == 'parameter' and node not in common_param_nodes] | |
# ノードサイズを設定 | |
node_sizes = {} | |
for node in system_nodes: | |
node_sizes[node] = 3000 * node_size | |
for node in common_param_nodes: | |
node_sizes[node] = 2000 * node_size | |
for node in param_nodes: | |
node_sizes[node] = 1500 * node_size | |
# ノード色を設定 | |
node_colors = {} | |
for node in system_nodes: | |
node_colors[node] = 'red' # システムノードは赤色 | |
for node in common_param_nodes: | |
node_colors[node] = 'green' # 共通パラメータは緑色 | |
for node in param_nodes: | |
node_colors[node] = 'purple' # 固有パラメータは紫色 | |
# システムノードの描画 | |
nx.draw_networkx_nodes( | |
G, pos, | |
nodelist=system_nodes, | |
node_size=[node_sizes[node] for node in system_nodes], | |
node_color='red', | |
alpha=0.9, | |
ax=ax | |
) | |
# 共通パラメータノードの描画 | |
if common_param_nodes: | |
nx.draw_networkx_nodes( | |
G, pos, | |
nodelist=common_param_nodes, | |
node_size=[node_sizes[node] for node in common_param_nodes], | |
node_color='green', | |
alpha=0.8, | |
ax=ax | |
) | |
# 固有パラメータノードの描画 | |
if param_nodes: | |
nx.draw_networkx_nodes( | |
G, pos, | |
nodelist=param_nodes, | |
node_size=[node_sizes[node] for node in param_nodes], | |
node_color='purple', | |
alpha=0.7, | |
ax=ax | |
) | |
# エッジスタイルを設定(システムごとに異なるスタイル) | |
edge_styles = ['solid', 'dashed', 'dotted', 'dashdot'] | |
# システムからのエッジを描画 | |
for i, system in enumerate(system_nodes): | |
style = edge_styles[i % len(edge_styles)] | |
# システムから共通パラメータへのエッジ | |
common_edges = [(system, param) for param in common_param_nodes if G.has_edge(system, param)] | |
if common_edges: | |
nx.draw_networkx_edges( | |
G, pos, | |
edgelist=common_edges, | |
width=1.2, | |
alpha=0.8, | |
edge_color='green', | |
style=style, | |
connectionstyle=f'arc3,rad=0.1', | |
ax=ax | |
) | |
# システムから固有パラメータへのエッジ | |
param_edges = [(system, param) for param in param_nodes if G.has_edge(system, param)] | |
if param_edges: | |
nx.draw_networkx_edges( | |
G, pos, | |
edgelist=param_edges, | |
width=1.0, | |
alpha=0.6, | |
edge_color='purple', | |
style=style, | |
connectionstyle=f'arc3,rad=0.1', | |
ax=ax | |
) | |
# 共通パラメータ同士を結ぶエッジを追加(オプション) | |
if common_param_nodes and len(common_param_nodes) > 1: | |
common_param_edges = [] | |
for i, param1 in enumerate(common_param_nodes): | |
for param2 in common_param_nodes[i+1:]: | |
common_param_edges.append((param1, param2)) | |
if common_param_edges: | |
# 実線ではなく点線で描画 | |
nx.draw_networkx_edges( | |
G, pos, | |
edgelist=common_param_edges, | |
width=0.8, | |
alpha=0.4, | |
edge_color='blue', | |
style='dotted', | |
connectionstyle=f'arc3,rad=0.05', | |
ax=ax | |
) | |
# ノードラベルの描画 | |
for node_type, nodes in [ | |
('system', system_nodes), | |
('common_param', common_param_nodes), | |
('param', param_nodes) | |
]: | |
if not nodes: | |
continue | |
# ノードタイプに応じてフォントサイズとカラーを設定 | |
if node_type == 'system': | |
font_size = 12 * node_size | |
font_color = 'white' | |
font_weight = 'bold' | |
elif node_type == 'common_param': | |
font_size = 10 * node_size | |
font_color = 'black' | |
font_weight = 'bold' | |
else: | |
font_size = 8 * node_size | |
font_color = 'black' | |
font_weight = 'normal' | |
# ラベルを描画 | |
labels = {node: node for node in nodes} | |
for node, (x, y) in pos.items(): | |
if node in labels: | |
ax.text( | |
x, y, labels[node], | |
fontproperties=font_prop, | |
fontsize=font_size, | |
color=font_color, | |
weight=font_weight, | |
ha='center', | |
va='center', | |
bbox=dict( | |
facecolor='white' if node_type != 'system' else 'none', | |
alpha=0.7 if node_type != 'system' else 0, | |
edgecolor='none', | |
boxstyle='round,pad=0.1' | |
) | |
) | |
def main(): | |
"""メイン関数""" | |
args = parser.parse_args() | |
# 利用可能な制度を検出 | |
available_systems = detect_available_systems() | |
# 利用可能な制度の一覧を表示する場合 | |
if args.list_systems: | |
print("利用可能な制度:") | |
for system in available_systems: | |
print(f"- {system}") | |
return | |
# 可視化する制度を決定 | |
target_systems = [] | |
if args.systems: | |
# コマンドライン引数で指定された制度 | |
target_systems = args.systems | |
elif args.all: | |
# すべての制度(最大数に制限) | |
target_systems = available_systems[:args.max_systems] | |
elif args.category: | |
# 特定のカテゴリに属する制度 | |
target_systems = [s for s in available_systems if args.category in s][:args.max_systems] | |
else: | |
# デフォルトは児童手当と児童扶養手当 | |
target_systems = ["児童手当", "児童扶養手当"] | |
# 制度が見つからない場合 | |
if not target_systems: | |
print("エラー: 指定された制度が見つかりません。--list-systems で利用可能な制度を確認してください。") | |
return | |
# 出力ファイル名 | |
output_file = args.output if args.output else "dependency_graph.png" | |
# 依存関係グラフを作成 | |
G = create_dependency_graph(target_systems) | |
# 共通パラメータを分析 | |
if args.analyze_common or True: # 常に分析を実行 | |
analyze_common_parameters(G, target_systems) | |
# ノード間隔とサイズのスケーリング係数を取得 | |
node_spacing = args.node_spacing | |
node_size = args.node_size | |
ring_spacing = args.ring_spacing | |
# 制度が多い場合は自動的にノード間隔を調整 | |
if len(target_systems) > 5: | |
# 制度数に応じてスケーリング | |
node_spacing = max(node_spacing, len(target_systems) / 4) | |
node_size = min(node_size, 5 / len(target_systems)) | |
print(f"制度数が多いため、自動的にノード間隔を調整しました: 間隔={node_spacing:.2f}倍, サイズ={node_size:.2f}倍") | |
# グラフを描画 | |
draw_graph(G, target_systems, output_file, node_spacing, node_size, ring_spacing) | |
# 共通パラメータを分析する関数を追加 | |
def analyze_common_parameters(G, systems): | |
"""システム間の共通パラメータを分析して出力します""" | |
# システムノードと各システムのパラメータノードを取得 | |
system_nodes = [node for node in G.nodes() if G.nodes[node].get('type') == 'system'] | |
system_parameters = {} | |
for system in system_nodes: | |
# 各システムに関連するパラメータを取得 | |
params = [] | |
for param, data in G.nodes(data=True): | |
if data.get('type') == 'parameter' and G.has_edge(system, param): | |
params.append(param) | |
system_parameters[system] = set(params) | |
# 2つ以上のシステムで共有されているパラメータを特定 | |
all_params = set() | |
for params in system_parameters.values(): | |
all_params.update(params) | |
shared_params = {} | |
for param in all_params: | |
# このパラメータを持つシステムを特定 | |
sharing_systems = [] | |
for system, params in system_parameters.items(): | |
if param in params: | |
sharing_systems.append(system) | |
if len(sharing_systems) >= 2: # 2つ以上のシステムに共有されている場合 | |
shared_params[param] = sharing_systems | |
# 結果をソートして出力 | |
print("\n===== 共通パラメータの分析 =====") | |
print(f"共通パラメータの総数: {len(shared_params)}") | |
if shared_params: | |
print("\n各共通パラメータと、それを共有するシステム:") | |
for i, (param, systems) in enumerate(sorted(shared_params.items()), 1): | |
systems_str = ", ".join([f'"{s}"' for s in systems]) | |
print(f"{i}. {param} - 共有システム数: {len(systems)} - {systems_str}") | |
else: | |
print("共通パラメータはありません。") | |
print("==============================\n") | |
return shared_params | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment