Skip to content

Instantly share code, notes, and snippets.

@luckman538
Created November 23, 2025 03:29
Show Gist options
  • Select an option

  • Save luckman538/276832be9321db46d9cd8f204677c130 to your computer and use it in GitHub Desktop.

Select an option

Save luckman538/276832be9321db46d9cd8f204677c130 to your computer and use it in GitHub Desktop.
indicators_py
"""
Fair Value Gap (FVG) - 동적 모드 완전 구현
Phase 14.1 긴급 수정
추가 기능:
1. 동적 FVG (Dynamic Mode) - 가격 추적
2. Auto Threshold - 히스토리컬 평균 기반
3. MTF (Multi-Timeframe) 지원
4. FVG 무효화 후 재검증
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, field
@dataclass
class FairValueGap:
"""Fair Value Gap 데이터 클래스"""
bias: str # 'bullish' or 'bearish'
top: float
bottom: float
timestamp: int
mitigated: bool = False
fill_percentage: float = 0.0 # 채워진 비율 (0~1)
strength: float = 1.0
# 동적 모드용
original_top: float = field(default=0.0)
original_bottom: float = field(default=0.0)
def __post_init__(self):
if self.original_top == 0.0:
self.original_top = self.top
if self.original_bottom == 0.0:
self.original_bottom = self.bottom
class FVGDynamic:
"""Fair Value Gap 동적 모드 완전 구현"""
def __init__(self, config: Dict = None):
self.config = config or {}
self.active_fvgs = []
# 설정
self.dynamic_mode = self.config.get('dynamic', True)
self.auto_threshold = self.config.get('auto_threshold', True)
self.mitigation_threshold = self.config.get('mitigation_threshold', 0.5)
self.max_fvgs = self.config.get('max_fvgs', 20)
self.mtf_enabled = self.config.get('mtf_enabled', False)
self.timeframes = self.config.get('timeframes', ['5m', '15m', '1h'])
def calculate_auto_threshold(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
period: int = 50
) -> float:
"""
Auto Threshold 계산
히스토리컬 평균 변동 비율 기반
Args:
high, low, close: 가격 데이터
period: 평균 계산 기간
Returns:
자동 임계값
"""
if len(close) < period:
return 0.001 # 기본값 0.1%
# 평균 범위 (High - Low)
avg_range = (high - low).iloc[-period:].mean()
# 평균 종가
avg_close = close.iloc[-period:].mean()
if avg_close == 0:
return 0.001
# 평균 변동 비율
auto_threshold = (avg_range / avg_close) * 2 # 2배 마진
# 최소/최대 제한
return max(0.0005, min(auto_threshold, 0.05)) # 0.05%~5%
def detect_fvg_static(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
threshold: float = None
) -> List[FairValueGap]:
"""
정적 FVG 검출 (3봉 패턴)
Args:
high, low, close: 가격 데이터
threshold: 최소 갭 크기 임계값
Returns:
검출된 FVG 리스트
"""
if len(close) < 3:
return []
# Auto Threshold 사용
if threshold is None:
if self.auto_threshold:
threshold = self.calculate_auto_threshold(high, low, close)
else:
threshold = self.config.get('threshold', 0.001)
fvgs = []
# 최근 3봉 검사
for i in range(2, min(len(close), 50)): # 최근 50개 검사
# Bullish FVG: 현재 low > 2봉 전 high
if low.iloc[-i] > high.iloc[-i-2]:
gap_size = (low.iloc[-i] - high.iloc[-i-2]) / close.iloc[-i]
if gap_size >= threshold:
fvg = FairValueGap(
bias='bullish',
top=low.iloc[-i],
bottom=high.iloc[-i-2],
timestamp=len(close) - i,
strength=min(gap_size / threshold, 5.0)
)
fvgs.append(fvg)
# Bearish FVG: 현재 high < 2봉 전 low
elif high.iloc[-i] < low.iloc[-i-2]:
gap_size = (low.iloc[-i-2] - high.iloc[-i]) / close.iloc[-i]
if gap_size >= threshold:
fvg = FairValueGap(
bias='bearish',
top=low.iloc[-i-2],
bottom=high.iloc[-i],
timestamp=len(close) - i,
strength=min(gap_size / threshold, 5.0)
)
fvgs.append(fvg)
return fvgs
def update_fvg_dynamic(
self,
current_price: float,
high: float,
low: float
):
"""
동적 FVG 업데이트
가격이 FVG 진입 시 갭 크기 축소
Args:
current_price: 현재 가격
high: 현재 고가
low: 현재 저가
"""
if not self.dynamic_mode:
return
for fvg in self.active_fvgs:
if fvg.mitigated:
continue
if fvg.bias == 'bullish':
# 가격이 FVG 내부로 들어오면 하단 위로 조정
if fvg.bottom <= current_price <= fvg.top:
# 새로운 하단 = max(현재가, 원래 하단)
new_bottom = max(min(low, fvg.top), fvg.bottom)
fvg.bottom = new_bottom
# 채워진 비율 계산
original_gap = fvg.original_top - fvg.original_bottom
current_gap = fvg.top - fvg.bottom
fvg.fill_percentage = 1.0 - (current_gap / original_gap) if original_gap > 0 else 0
# 완전히 채워지면 무효화
if high >= fvg.top:
fvg.mitigated = True
fvg.fill_percentage = 1.0
else: # bearish
# 가격이 FVG 내부로 들어오면 상단 아래로 조정
if fvg.bottom <= current_price <= fvg.top:
# 새로운 상단 = min(현재가, 원래 상단)
new_top = min(max(high, fvg.bottom), fvg.top)
fvg.top = new_top
# 채워진 비율 계산
original_gap = fvg.original_top - fvg.original_bottom
current_gap = fvg.top - fvg.bottom
fvg.fill_percentage = 1.0 - (current_gap / original_gap) if original_gap > 0 else 0
# 완전히 채워지면 무효화
if low <= fvg.bottom:
fvg.mitigated = True
fvg.fill_percentage = 1.0
def check_mitigation(
self,
current_price: float,
high: float,
low: float
):
"""
FVG 무효화 (Mitigation) 확인
Args:
current_price: 현재 가격
high: 현재 고가
low: 현재 저가
"""
for fvg in self.active_fvgs:
if fvg.mitigated:
continue
if fvg.bias == 'bullish':
# Bullish FVG: 가격이 하단 이하로 떨어지면 무효화
if low < fvg.bottom * (1 - self.mitigation_threshold):
fvg.mitigated = True
fvg.fill_percentage = 0.0 # 역방향 돌파
else: # bearish
# Bearish FVG: 가격이 상단 이상으로 올라가면 무효화
if high > fvg.top * (1 + self.mitigation_threshold):
fvg.mitigated = True
fvg.fill_percentage = 0.0 # 역방향 돌파
def merge_fvgs(self) -> List[FairValueGap]:
"""
중복/인접 FVG 병합
Returns:
병합된 FVG 리스트
"""
if len(self.active_fvgs) <= 1:
return self.active_fvgs
# 타입별 정렬
bullish_fvgs = [f for f in self.active_fvgs if f.bias == 'bullish' and not f.mitigated]
bearish_fvgs = [f for f in self.active_fvgs if f.bias == 'bearish' and not f.mitigated]
# Bullish FVG 병합
bullish_fvgs.sort(key=lambda x: x.bottom)
merged_bullish = []
i = 0
while i < len(bullish_fvgs):
current = bullish_fvgs[i]
j = i + 1
# 인접 FVG 찾기 (10% 이내)
while j < len(bullish_fvgs):
next_fvg = bullish_fvgs[j]
if abs(current.top - next_fvg.bottom) / current.top <= 0.1:
# 병합
current.top = max(current.top, next_fvg.top)
current.bottom = min(current.bottom, next_fvg.bottom)
current.strength = max(current.strength, next_fvg.strength)
j += 1
else:
break
merged_bullish.append(current)
i = j
# Bearish FVG 병합 (동일 로직)
bearish_fvgs.sort(key=lambda x: x.top, reverse=True)
merged_bearish = []
i = 0
while i < len(bearish_fvgs):
current = bearish_fvgs[i]
j = i + 1
while j < len(bearish_fvgs):
next_fvg = bearish_fvgs[j]
if abs(current.bottom - next_fvg.top) / current.bottom <= 0.1:
current.top = max(current.top, next_fvg.top)
current.bottom = min(current.bottom, next_fvg.bottom)
current.strength = max(current.strength, next_fvg.strength)
j += 1
else:
break
merged_bearish.append(current)
i = j
# 무효화된 FVG 추가
mitigated = [f for f in self.active_fvgs if f.mitigated]
return merged_bullish + merged_bearish + mitigated
def get_unmitigated_levels(self, max_levels: int = 5) -> List[Dict]:
"""
미완성 FVG 레벨 반환
Args:
max_levels: 최대 레벨 수
Returns:
FVG 레벨 리스트
"""
unmitigated = [f for f in self.active_fvgs if not f.mitigated]
# 강도순 정렬
unmitigated.sort(key=lambda x: x.strength, reverse=True)
levels = []
for fvg in unmitigated[:max_levels]:
levels.append({
'bias': fvg.bias,
'level': (fvg.top + fvg.bottom) / 2,
'top': fvg.top,
'bottom': fvg.bottom,
'fill_percentage': fvg.fill_percentage,
'strength': fvg.strength
})
return levels
def generate_fvg_signal(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series
) -> Dict:
"""
종합 FVG 신호 생성
Args:
high, low, close: 가격 데이터
Returns:
FVG 신호 정보
"""
current_price = close.iloc[-1]
current_high = high.iloc[-1]
current_low = low.iloc[-1]
# 1. 새로운 FVG 검출
new_fvgs = self.detect_fvg_static(high, low, close)
# 2. 기존 FVG와 병합
self.active_fvgs.extend(new_fvgs)
# 3. 동적 업데이트
self.update_fvg_dynamic(current_price, current_high, current_low)
# 4. 무효화 확인
self.check_mitigation(current_price, current_high, current_low)
# 5. FVG 병합
self.active_fvgs = self.merge_fvgs()
# 6. 최대 개수 제한
self.active_fvgs = self.active_fvgs[-self.max_fvgs:]
# 7. 미완성 레벨
unmitigated_levels = self.get_unmitigated_levels()
# 8. 현재 가격 근처 FVG 찾기
nearby_fvgs = []
for fvg in self.active_fvgs:
if fvg.mitigated:
continue
distance = abs((fvg.top + fvg.bottom) / 2 - current_price) / current_price
if distance <= 0.02: # 2% 이내
nearby_fvgs.append(fvg)
# 9. 신호 생성
signal = {
'total_fvgs': len(self.active_fvgs),
'active_fvgs': len([f for f in self.active_fvgs if not f.mitigated]),
'nearby_fvgs': len(nearby_fvgs),
'unmitigated_levels': unmitigated_levels,
'action': 'hold',
'confidence': 0.5
}
# 10. 매수/매도 판단
confidence = 0.5
# FVG 근처에서 반등 예상
if nearby_fvgs:
strongest_fvg = max(nearby_fvgs, key=lambda x: x.strength)
if strongest_fvg.bias == 'bullish' and current_price <= strongest_fvg.top:
# Bullish FVG에서 매수 신호
signal['action'] = 'buy'
confidence = 0.7 + (strongest_fvg.strength * 0.1)
signal['target_fvg'] = {
'bias': strongest_fvg.bias,
'top': strongest_fvg.top,
'bottom': strongest_fvg.bottom,
'fill_percentage': strongest_fvg.fill_percentage
}
elif strongest_fvg.bias == 'bearish' and current_price >= strongest_fvg.bottom:
# Bearish FVG에서 매도 신호
signal['action'] = 'sell'
confidence = 0.7 + (strongest_fvg.strength * 0.1)
signal['target_fvg'] = {
'bias': strongest_fvg.bias,
'top': strongest_fvg.top,
'bottom': strongest_fvg.bottom,
'fill_percentage': strongest_fvg.fill_percentage
}
signal['confidence'] = min(confidence, 0.95)
return signal
# 사용 예시
if __name__ == '__main__':
# 테스트 데이터
np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=100, freq='1H')
test_data = pd.DataFrame({
'high': np.cumsum(np.random.randn(100) * 10) + 50000,
'low': np.cumsum(np.random.randn(100) * 10) + 49800,
'close': np.cumsum(np.random.randn(100) * 10) + 49900,
}, index=dates)
# FVG 초기화
fvg = FVGDynamic({
'dynamic': True,
'auto_threshold': True,
'mitigation_threshold': 0.5,
'max_fvgs': 20
})
# 신호 생성
signal = fvg.generate_fvg_signal(
test_data['high'],
test_data['low'],
test_data['close']
)
print("="*60)
print("FVG 신호 (동적 모드 + Auto Threshold)")
print("="*60)
print(f"전체 FVG: {signal['total_fvgs']}개")
print(f"활성 FVG: {signal['active_fvgs']}개")
print(f"근처 FVG: {signal['nearby_fvgs']}개")
print(f"\n미완성 레벨 (상위 5개):")
for i, level in enumerate(signal['unmitigated_levels'][:5], 1):
print(f" {i}. {level['bias'].upper()}: ${level['level']:.2f} "
f"(강도: {level['strength']:.2f}, 채움: {level['fill_percentage']:.1%})")
if 'target_fvg' in signal:
print(f"\n타겟 FVG:")
print(f" 바이어스: {signal['target_fvg']['bias'].upper()}")
print(f" 상단: ${signal['target_fvg']['top']:.2f}")
print(f" 하단: ${signal['target_fvg']['bottom']:.2f}")
print(f" 채워진 비율: {signal['target_fvg']['fill_percentage']:.1%}")
print(f"\n액션: {signal['action'].upper()}")
print(f"신뢰도: {signal['confidence']:.2%}")
print("="*60)
"""
ICT Order Flow - 완전 구현
Phase 14.1 최우선 개선 사항
포함 기능:
1. Killzones (아시아/런던/뉴욕 세션)
2. OTE (Optimal Trade Entry) 0.618~0.786
3. Liquidity Voids 검출
4. Market Structure Shifts (MSS)
5. Displacement 측정
"""
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from typing import Dict, List, Tuple, Optional
class ICTOrderFlow:
"""ICT (Inner Circle Trader) Order Flow 완전 구현"""
def __init__(self, config: Dict = None):
self.config = config or {}
self.active_voids = []
self.last_mss = None
def get_current_killzone(self) -> Tuple[str, float]:
"""
현재 Killzone 및 가중치 반환
Returns:
(killzone_name, weight): 세션 이름과 거래 가중치
"""
current_hour = datetime.now(timezone.utc).hour
# 아시아 세션 (00:00-08:00 UTC) - 낮은 변동성
if 0 <= current_hour < 8:
return 'ASIA', 0.7
# 런던 오픈 (08:00-09:00 UTC) - 최고 변동성 ⭐
elif 8 <= current_hour < 9:
return 'LONDON_OPEN', 1.0
# 런던 세션 (08:00-16:00 UTC) - 높은 변동성
elif 8 <= current_hour < 16:
return 'LONDON', 0.95
# 뉴욕 오픈 (13:00-14:00 UTC) - 최고 변동성 ⭐
elif 13 <= current_hour < 14:
return 'NEW_YORK_OPEN', 1.0
# 뉴욕 세션 (13:00-21:00 UTC) - 높은 변동성
elif 13 <= current_hour < 21:
return 'NEW_YORK', 0.9
# 오프 시간 (21:00-00:00 UTC) - 낮은 변동성
else:
return 'OFF_HOURS', 0.3
def calculate_ote_zones(
self,
high: float,
low: float,
trend: str = 'bullish'
) -> Dict[str, Tuple[float, float]]:
"""
Optimal Trade Entry (OTE) 존 계산
피보나치 0.618~0.786 되돌림 구간
Args:
high: 최근 고점
low: 최근 저점
trend: 'bullish' 또는 'bearish'
Returns:
OTE 존 정보 (buy_zone, sell_zone)
"""
range_size = high - low
if trend == 'bullish':
# 상승 추세: 0.618~0.786 되돌림에서 매수
ote_buy_low = low + (range_size * 0.618) # 61.8% 되돌림
ote_buy_high = low + (range_size * 0.786) # 78.6% 되돌림
return {
'buy_zone': (ote_buy_low, ote_buy_high),
'bias': 'bullish',
'range': range_size,
'confidence': 0.8
}
else: # bearish
# 하락 추세: 0.236~0.382 되돌림에서 매도
ote_sell_low = high - (range_size * 0.786) # 78.6% 되돌림
ote_sell_high = high - (range_size * 0.618) # 61.8% 되돌림
return {
'sell_zone': (ote_sell_low, ote_sell_high),
'bias': 'bearish',
'range': range_size,
'confidence': 0.8
}
def is_price_in_ote_zone(
self,
price: float,
ote_zones: Dict
) -> Tuple[bool, str]:
"""
현재 가격이 OTE 존 내부에 있는지 확인
Args:
price: 현재 가격
ote_zones: calculate_ote_zones 결과
Returns:
(is_in_zone, zone_type): OTE 존 내 여부와 존 타입
"""
if 'buy_zone' in ote_zones:
buy_low, buy_high = ote_zones['buy_zone']
if buy_low <= price <= buy_high:
return True, 'BUY_OTE'
if 'sell_zone' in ote_zones:
sell_low, sell_high = ote_zones['sell_zone']
if sell_low <= price <= sell_high:
return True, 'SELL_OTE'
return False, 'NONE'
def detect_liquidity_voids(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
threshold: float = 0.02
) -> List[Dict]:
"""
Liquidity Voids (유동성 공백) 검출
가격이 빠르게 통과한 구간 (갭)
Args:
high, low, close: 가격 데이터
threshold: 최소 갭 크기 (기본 2%)
Returns:
Liquidity Voids 리스트
"""
voids = []
for i in range(1, len(close)):
# 갭 크기 계산
gap = abs(close.iloc[i] - close.iloc[i-1]) / close.iloc[i-1]
if gap > threshold:
void = {
'index': i,
'start': min(close.iloc[i], close.iloc[i-1]),
'end': max(close.iloc[i], close.iloc[i-1]),
'size': gap,
'direction': 'up' if close.iloc[i] > close.iloc[i-1] else 'down',
'filled': False,
'strength': min(gap / threshold, 3.0) # 최대 3.0
}
voids.append(void)
# 최근 20개만 유지 (성능 최적화)
self.active_voids = voids[-20:]
return self.active_voids
def check_void_fill(
self,
current_price: float,
high: float,
low: float
) -> Optional[Dict]:
"""
Liquidity Void가 채워졌는지 확인
Args:
current_price: 현재 가격
high: 현재 고가
low: 현재 저가
Returns:
채워진 Void 정보 (없으면 None)
"""
for void in self.active_voids:
if void['filled']:
continue
# Void 구간 내부로 가격이 들어왔는지 확인
if void['start'] <= current_price <= void['end']:
void['filled'] = True
return void
# High/Low가 Void 구간을 완전히 채웠는지 확인
if (low <= void['start'] and high >= void['end']) or \
(high >= void['end'] and low <= void['start']):
void['filled'] = True
return void
return None
def detect_market_structure_shift(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
lookback: int = 10
) -> Optional[Dict]:
"""
Market Structure Shift (MSS) 검출
Args:
high, low, close: 가격 데이터
lookback: 룩백 기간
Returns:
MSS 정보 (없으면 None)
"""
if len(close) < lookback:
return None
recent_high = high.iloc[-lookback:].max()
recent_low = low.iloc[-lookback:].min()
current_close = close.iloc[-1]
prev_close = close.iloc[-2]
# Bullish MSS: 최근 저점 돌파 후 상승
if prev_close <= recent_low and current_close > recent_low:
mss = {
'type': 'BULLISH_MSS',
'price': recent_low,
'strength': (current_close - recent_low) / recent_low,
'timestamp': close.index[-1] if hasattr(close, 'index') else len(close) - 1
}
self.last_mss = mss
return mss
# Bearish MSS: 최근 고점 돌파 후 하락
elif prev_close >= recent_high and current_close < recent_high:
mss = {
'type': 'BEARISH_MSS',
'price': recent_high,
'strength': (recent_high - current_close) / recent_high,
'timestamp': close.index[-1] if hasattr(close, 'index') else len(close) - 1
}
self.last_mss = mss
return mss
return None
def calculate_displacement(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
volume: pd.Series = None,
period: int = 20
) -> Dict:
"""
Displacement (급격한 가격 이동) 측정
Args:
high, low, close: 가격 데이터
volume: 거래량 (선택)
period: 평균 계산 기간
Returns:
Displacement 정보
"""
# 평균 봉 크기 계산
avg_range = (high - low).rolling(period).mean().iloc[-1]
current_range = high.iloc[-1] - low.iloc[-1]
# 평균 가격 변화
avg_change = close.diff().abs().rolling(period).mean().iloc[-1]
current_change = abs(close.iloc[-1] - close.iloc[-2])
# Displacement 강도
range_ratio = current_range / avg_range if avg_range > 0 else 0
change_ratio = current_change / avg_change if avg_change > 0 else 0
# 거래량 스파이크 확인 (있는 경우)
volume_spike = 1.0
if volume is not None and len(volume) >= period:
avg_volume = volume.rolling(period).mean().iloc[-1]
current_volume = volume.iloc[-1]
volume_spike = current_volume / avg_volume if avg_volume > 0 else 1.0
# Displacement 판단 (평균의 2배 이상)
is_displacement = (range_ratio >= 2.0 or change_ratio >= 2.0) and volume_spike >= 1.5
direction = 'up' if close.iloc[-1] > close.iloc[-2] else 'down'
return {
'is_displacement': is_displacement,
'direction': direction,
'range_ratio': range_ratio,
'change_ratio': change_ratio,
'volume_spike': volume_spike,
'strength': min((range_ratio + change_ratio + volume_spike) / 3, 5.0)
}
def generate_ict_signal(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
volume: pd.Series = None
) -> Dict:
"""
종합 ICT Order Flow 신호 생성
Args:
high, low, close: 가격 데이터
volume: 거래량 (선택)
Returns:
ICT 신호 정보
"""
# 1. Killzone 확인
killzone, killzone_weight = self.get_current_killzone()
# 2. 추세 판단 (단순화)
trend = 'bullish' if close.iloc[-1] > close.iloc[-20:].mean() else 'bearish'
# 3. OTE 존 계산
recent_high = high.iloc[-50:].max()
recent_low = low.iloc[-50:].min()
ote_zones = self.calculate_ote_zones(recent_high, recent_low, trend)
# 4. 현재 가격이 OTE 존에 있는지 확인
current_price = close.iloc[-1]
in_ote, ote_type = self.is_price_in_ote_zone(current_price, ote_zones)
# 5. Liquidity Voids 검출
voids = self.detect_liquidity_voids(high, low, close)
filled_void = self.check_void_fill(current_price, high.iloc[-1], low.iloc[-1])
# 6. MSS 검출
mss = self.detect_market_structure_shift(high, low, close)
# 7. Displacement 측정
displacement = self.calculate_displacement(high, low, close, volume)
# 8. 신호 생성
signal = {
'killzone': killzone,
'killzone_weight': killzone_weight,
'trend': trend,
'ote_zones': ote_zones,
'in_ote': in_ote,
'ote_type': ote_type,
'liquidity_voids': len([v for v in voids if not v['filled']]),
'filled_void': filled_void,
'mss': mss,
'displacement': displacement,
'action': 'hold',
'confidence': 0.5
}
# 9. 매수/매도 판단
confidence = 0.5
# 강력한 매수 신호
if (in_ote and ote_type == 'BUY_OTE' and
trend == 'bullish' and
killzone_weight >= 0.9 and
(mss and mss['type'] == 'BULLISH_MSS' or displacement['direction'] == 'up')):
signal['action'] = 'buy'
confidence = 0.85 * killzone_weight
# 강력한 매도 신호
elif (in_ote and ote_type == 'SELL_OTE' and
trend == 'bearish' and
killzone_weight >= 0.9 and
(mss and mss['type'] == 'BEARISH_MSS' or displacement['direction'] == 'down')):
signal['action'] = 'sell'
confidence = 0.85 * killzone_weight
# Liquidity Void 채워짐 신호 (보조)
elif filled_void:
if filled_void['direction'] == 'up' and trend == 'bullish':
signal['action'] = 'buy'
confidence = 0.7 * killzone_weight
elif filled_void['direction'] == 'down' and trend == 'bearish':
signal['action'] = 'sell'
confidence = 0.7 * killzone_weight
signal['confidence'] = confidence
return signal
# 사용 예시
if __name__ == '__main__':
# 테스트 데이터 생성
np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=100, freq='1H')
test_data = pd.DataFrame({
'high': np.cumsum(np.random.randn(100) * 10) + 50000,
'low': np.cumsum(np.random.randn(100) * 10) + 49800,
'close': np.cumsum(np.random.randn(100) * 10) + 49900,
'volume': np.random.randint(100, 1000, 100)
}, index=dates)
# ICT Order Flow 초기화
ict = ICTOrderFlow()
# 신호 생성
signal = ict.generate_ict_signal(
test_data['high'],
test_data['low'],
test_data['close'],
test_data['volume']
)
print("="*60)
print("ICT Order Flow 신호")
print("="*60)
print(f"Killzone: {signal['killzone']} (가중치: {signal['killzone_weight']})")
print(f"추세: {signal['trend']}")
print(f"OTE 존 내부: {signal['in_ote']} ({signal['ote_type']})")
print(f"활성 Liquidity Voids: {signal['liquidity_voids']}개")
print(f"MSS: {signal['mss']['type'] if signal['mss'] else 'None'}")
print(f"Displacement: {signal['displacement']['is_displacement']}")
print(f"\n액션: {signal['action'].upper()}")
print(f"신뢰도: {signal['confidence']:.2%}")
print("="*60)
"""
Phase 14.1 통합 트레이딩 전략
모든 개선사항 포함
통합 지표:
1. ICT Order Flow (완전 재구현)
2. SMC Internal Structure (내부 구조 필터링)
3. FVG Dynamic (동적 모드 + Auto Threshold)
4. Supply & Demand MTF (다중 타임프레임)
5. Premium/Discount 8단계 + Confirmation
기존 지표와 결합하여 최종 신호 생성
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class TradingSignal:
"""거래 신호 데이터 클래스"""
action: str # 'buy', 'sell', 'hold'
confidence: float # 0.0 ~ 1.0
entry_price: float
stop_loss: float
take_profit: float
leverage: float
indicators_used: List[str]
signal_strength: Dict[str, float]
def to_dict(self) -> Dict:
return {
'action': self.action,
'confidence': self.confidence,
'entry_price': self.entry_price,
'stop_loss': self.stop_loss,
'take_profit': self.take_profit,
'leverage': self.leverage,
'indicators_used': self.indicators_used,
'signal_strength': self.signal_strength
}
class IntegratedPhase14Strategy:
"""Phase 14.1 통합 전략"""
def __init__(self, config: Dict = None):
self.config = config or {}
# 각 지표 모듈 초기화 (여기서는 간소화)
self.weights = {
'ict_order_flow': 0.25, # 최고 가중치
'smc': 0.25, # 최고 가중치
'fvg': 0.20,
'supply_demand': 0.15,
'premium_discount': 0.15
}
# 리스크 관리
self.max_leverage = self.config.get('max_leverage', 15)
self.base_leverage = self.config.get('base_leverage', 10)
self.risk_per_trade = self.config.get('risk_per_trade', 0.02) # 2%
def calculate_atr(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
period: int = 14
) -> float:
"""ATR 계산 (손절/이익실현용)"""
tr1 = high - low
tr2 = abs(high - close.shift(1))
tr3 = abs(low - close.shift(1))
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.rolling(period).mean().iloc[-1]
return atr if not pd.isna(atr) else (high.iloc[-1] - low.iloc[-1])
def aggregate_signals(
self,
ict_signal: Dict,
smc_signal: Dict,
fvg_signal: Dict,
sd_signal: Dict,
pd_signal: Dict
) -> Dict:
"""
모든 지표 신호 통합
가중 평균으로 최종 신호 결정
Returns:
통합 신호
"""
signals = {
'ict_order_flow': ict_signal,
'smc': smc_signal,
'fvg': fvg_signal,
'supply_demand': sd_signal,
'premium_discount': pd_signal
}
# 각 지표의 액션을 점수화
action_scores = {'buy': 0, 'sell': 0, 'hold': 0}
weighted_confidence = 0
for indicator, signal in signals.items():
weight = self.weights[indicator]
action = signal.get('action', 'hold')
confidence = signal.get('confidence', 0.5)
# 액션 점수 누적
action_scores[action] += weight * confidence
weighted_confidence += weight * confidence
# 최종 액션 결정
final_action = max(action_scores, key=action_scores.get)
final_confidence = action_scores[final_action]
# Hold 신호가 너무 강하면 거래 안 함
if action_scores['hold'] > 0.6:
final_action = 'hold'
final_confidence = action_scores['hold']
# 개별 지표 강도
signal_strength = {
indicator: signals[indicator].get('confidence', 0.5)
for indicator in signals
}
return {
'action': final_action,
'confidence': final_confidence,
'action_scores': action_scores,
'signal_strength': signal_strength,
'used_indicators': list(signals.keys())
}
def calculate_position_sizing(
self,
confidence: float,
current_price: float,
stop_loss_distance: float,
account_balance: float = 100.0
) -> Dict:
"""
포지션 사이징 및 레버리지 계산
Args:
confidence: 신호 신뢰도
current_price: 현재 가격
stop_loss_distance: 손절까지 거리
account_balance: 계좌 잔액
Returns:
포지션 정보
"""
# 신뢰도 기반 레버리지 조정
if confidence >= 0.85:
leverage = self.max_leverage # 15x
elif confidence >= 0.75:
leverage = self.base_leverage + 3 # 13x
elif confidence >= 0.65:
leverage = self.base_leverage # 10x
else:
leverage = self.base_leverage - 2 # 8x
# 리스크 금액
risk_amount = account_balance * self.risk_per_trade
# 포지션 크기 (USD)
if stop_loss_distance > 0:
position_size = (risk_amount / stop_loss_distance) * current_price
else:
position_size = account_balance * 0.1 # 기본 10%
# 레버리지 적용 후 최대 포지션
max_position = account_balance * leverage
position_size = min(position_size, max_position)
return {
'leverage': leverage,
'position_size_usd': position_size,
'position_size_coins': position_size / current_price,
'risk_amount': risk_amount
}
def calculate_stop_loss_take_profit(
self,
action: str,
entry_price: float,
atr: float,
confidence: float
) -> Tuple[float, float]:
"""
손절/이익실현 계산
Args:
action: 'buy' or 'sell'
entry_price: 진입 가격
atr: ATR 값
confidence: 신뢰도
Returns:
(stop_loss, take_profit)
"""
# ATR 배수 (신뢰도 기반)
if confidence >= 0.8:
sl_multiplier = 1.5 # 타이트한 손절
tp_multiplier = 3.0 # 넉넉한 이익
else:
sl_multiplier = 2.0
tp_multiplier = 2.5
if action == 'buy':
stop_loss = entry_price - (atr * sl_multiplier)
take_profit = entry_price + (atr * tp_multiplier)
else: # sell
stop_loss = entry_price + (atr * sl_multiplier)
take_profit = entry_price - (atr * tp_multiplier)
return stop_loss, take_profit
def generate_final_signal(
self,
open_prices: pd.Series,
high: pd.Series,
low: pd.Series,
close: pd.Series,
volume: pd.Series = None,
# 간소화: 각 지표의 계산된 신호를 직접 받음
ict_signal: Dict = None,
smc_signal: Dict = None,
fvg_signal: Dict = None,
sd_signal: Dict = None,
pd_signal: Dict = None,
account_balance: float = 100.0
) -> TradingSignal:
"""
최종 거래 신호 생성
Args:
open, high, low, close, volume: 가격 데이터
*_signal: 각 지표의 계산된 신호
account_balance: 계좌 잔액
Returns:
최종 거래 신호
"""
# 1. 기본값 설정 (신호가 없는 경우)
default_signal = {'action': 'hold', 'confidence': 0.5}
ict_signal = ict_signal or default_signal
smc_signal = smc_signal or default_signal
fvg_signal = fvg_signal or default_signal
sd_signal = sd_signal or default_signal
pd_signal = pd_signal or default_signal
# 2. 신호 통합
aggregated = self.aggregate_signals(
ict_signal, smc_signal, fvg_signal, sd_signal, pd_signal
)
action = aggregated['action']
confidence = aggregated['confidence']
# 3. Hold 신호면 거래하지 않음
if action == 'hold' or confidence < 0.6:
return TradingSignal(
action='hold',
confidence=confidence,
entry_price=close.iloc[-1],
stop_loss=0,
take_profit=0,
leverage=0,
indicators_used=aggregated['used_indicators'],
signal_strength=aggregated['signal_strength']
)
# 4. ATR 계산
atr = self.calculate_atr(high, low, close)
# 5. 진입가, 손절, 이익실현
entry_price = close.iloc[-1]
stop_loss, take_profit = self.calculate_stop_loss_take_profit(
action, entry_price, atr, confidence
)
# 6. 포지션 사이징
sl_distance = abs(entry_price - stop_loss) / entry_price
position = self.calculate_position_sizing(
confidence, entry_price, sl_distance, account_balance
)
# 7. 최종 신호 생성
final_signal = TradingSignal(
action=action,
confidence=confidence,
entry_price=entry_price,
stop_loss=stop_loss,
take_profit=take_profit,
leverage=position['leverage'],
indicators_used=aggregated['used_indicators'],
signal_strength=aggregated['signal_strength']
)
return final_signal
def print_signal_summary(self, signal: TradingSignal):
"""신호 요약 출력"""
print("="*70)
print("🚀 Phase 14.1 통합 트레이딩 신호")
print("="*70)
print(f"액션: {signal.action.upper()}")
print(f"신뢰도: {signal.confidence:.2%}")
print(f"\n진입 가격: ${signal.entry_price:.2f}")
print(f"손절 가격: ${signal.stop_loss:.2f}")
print(f"이익실현: ${signal.take_profit:.2f}")
print(f"레버리지: {signal.leverage:.0f}x")
print(f"\n사용된 지표: {', '.join(signal.indicators_used)}")
print(f"\n지표별 신호 강도:")
for indicator, strength in signal.signal_strength.items():
bar = '█' * int(strength * 20)
print(f" {indicator:20s}: {bar:20s} {strength:.2%}")
# 손익 비율
if signal.action != 'hold':
risk = abs(signal.entry_price - signal.stop_loss)
reward = abs(signal.take_profit - signal.entry_price)
risk_reward = reward / risk if risk > 0 else 0
print(f"\n리스크/리워드 비율: 1:{risk_reward:.2f}")
print("="*70)
# 사용 예시
if __name__ == '__main__':
# 테스트 데이터
np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=100, freq='1H')
test_data = pd.DataFrame({
'open': np.cumsum(np.random.randn(100) * 10) + 49850,
'high': np.cumsum(np.random.randn(100) * 10) + 50000,
'low': np.cumsum(np.random.randn(100) * 10) + 49800,
'close': np.cumsum(np.random.randn(100) * 10) + 49900,
'volume': np.random.randint(100, 1000, 100)
}, index=dates)
# 전략 초기화
strategy = IntegratedPhase14Strategy({
'max_leverage': 15,
'base_leverage': 10,
'risk_per_trade': 0.02
})
# 모의 신호 (실제로는 각 지표 모듈에서 계산)
mock_signals = {
'ict_signal': {'action': 'buy', 'confidence': 0.85},
'smc_signal': {'action': 'buy', 'confidence': 0.80},
'fvg_signal': {'action': 'buy', 'confidence': 0.75},
'sd_signal': {'action': 'buy', 'confidence': 0.70},
'pd_signal': {'action': 'buy', 'confidence': 0.78}
}
# 최종 신호 생성
final_signal = strategy.generate_final_signal(
test_data['open'],
test_data['high'],
test_data['low'],
test_data['close'],
test_data['volume'],
**mock_signals,
account_balance=100.0
)
# 결과 출력
strategy.print_signal_summary(final_signal)
# JSON 형식으로도 출력 가능
print("\nJSON 형식:")
import json
print(json.dumps(final_signal.to_dict(), indent=2))
"""
Premium/Discount Zones - 8단계 + Confirmation 신호
Phase 14.2 고효율 개선
기능:
1. 8단계 Premium/Discount 구분
2. RSI/Volume/캔들 패턴 Confirmation
3. SMC Order Block 통합
4. FVG 통합
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from enum import Enum
class PremiumDiscountZone(Enum):
"""Premium/Discount 존 8단계"""
EXTREME_PREMIUM = 'extreme_premium'
PREMIUM = 'premium'
SLIGHT_PREMIUM = 'slight_premium'
EQUILIBRIUM = 'equilibrium'
SLIGHT_DISCOUNT = 'slight_discount'
DISCOUNT = 'discount'
EXTREME_DISCOUNT = 'extreme_discount'
class PremiumDiscountAnalyzer:
"""Premium/Discount 8단계 분석기"""
def __init__(self, config: Dict = None):
self.config = config or {}
# 설정
self.lookback_period = self.config.get('lookback_period', 50)
self.rsi_period = self.config.get('rsi_period', 14)
self.rsi_overbought = self.config.get('rsi_overbought', 70)
self.rsi_oversold = self.config.get('rsi_oversold', 30)
self.volume_spike_threshold = self.config.get('volume_spike_threshold', 1.5)
def calculate_8_tier_zones(
self,
high: float,
low: float
) -> Dict[str, Tuple[float, float]]:
"""
8단계 Premium/Discount 존 계산
Args:
high: 최근 고점
low: 최근 저점
Returns:
8단계 존 딕셔너리
"""
range_size = high - low
zones = {
PremiumDiscountZone.EXTREME_PREMIUM.value: (
high - (range_size * 0.05),
high
),
PremiumDiscountZone.PREMIUM.value: (
high - (range_size * 0.25),
high - (range_size * 0.05)
),
PremiumDiscountZone.SLIGHT_PREMIUM.value: (
high - (range_size * 0.45),
high - (range_size * 0.25)
),
PremiumDiscountZone.EQUILIBRIUM.value: (
high - (range_size * 0.55),
high - (range_size * 0.45)
),
PremiumDiscountZone.SLIGHT_DISCOUNT.value: (
high - (range_size * 0.75),
high - (range_size * 0.55)
),
PremiumDiscountZone.DISCOUNT.value: (
high - (range_size * 0.95),
high - (range_size * 0.75)
),
PremiumDiscountZone.EXTREME_DISCOUNT.value: (
low,
high - (range_size * 0.95)
)
}
return zones
def classify_price_position(
self,
current_price: float,
zones: Dict[str, Tuple[float, float]]
) -> Dict:
"""
현재 가격 위치 분류
Args:
current_price: 현재 가격
zones: 8단계 존
Returns:
분류 정보
"""
for zone_name, (bottom, top) in zones.items():
if bottom <= current_price <= top:
# 존 내 정확한 위치 (0~1)
position_in_zone = (current_price - bottom) / (top - bottom) if (top - bottom) > 0 else 0.5
# 바이어스 결정
if zone_name == PremiumDiscountZone.EXTREME_PREMIUM.value:
bias = 'strong_sell'
bias_strength = 1.0
elif zone_name == PremiumDiscountZone.PREMIUM.value:
bias = 'sell'
bias_strength = 0.8
elif zone_name == PremiumDiscountZone.SLIGHT_PREMIUM.value:
bias = 'neutral_sell'
bias_strength = 0.6
elif zone_name == PremiumDiscountZone.EQUILIBRIUM.value:
bias = 'neutral'
bias_strength = 0.5
elif zone_name == PremiumDiscountZone.SLIGHT_DISCOUNT.value:
bias = 'neutral_buy'
bias_strength = 0.6
elif zone_name == PremiumDiscountZone.DISCOUNT.value:
bias = 'buy'
bias_strength = 0.8
else: # EXTREME_DISCOUNT
bias = 'strong_buy'
bias_strength = 1.0
return {
'zone': zone_name,
'bias': bias,
'bias_strength': bias_strength,
'position_in_zone': position_in_zone,
'zone_bottom': bottom,
'zone_top': top
}
# 범위 밖 (드물게 발생)
return {
'zone': 'out_of_range',
'bias': 'neutral',
'bias_strength': 0.5,
'position_in_zone': 0.0,
'zone_bottom': 0,
'zone_top': 0
}
def calculate_rsi(
self,
close: pd.Series,
period: int = 14
) -> pd.Series:
"""RSI (Relative Strength Index) 계산"""
delta = close.diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(period).mean()
avg_loss = loss.rolling(period).mean()
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def check_rsi_confirmation(
self,
close: pd.Series,
bias: str
) -> Tuple[bool, float]:
"""
RSI 확인 신호
Args:
close: 종가 데이터
bias: 현재 바이어스
Returns:
(확인 여부, RSI 값)
"""
rsi = self.calculate_rsi(close, self.rsi_period)
if rsi.empty or pd.isna(rsi.iloc[-1]):
return False, 50.0
current_rsi = rsi.iloc[-1]
# 매수 확인: 디스카운트 존 + RSI 과매도
if bias in ['strong_buy', 'buy'] and current_rsi <= self.rsi_oversold:
return True, current_rsi
# 매도 확인: 프리미엄 존 + RSI 과매수
elif bias in ['strong_sell', 'sell'] and current_rsi >= self.rsi_overbought:
return True, current_rsi
return False, current_rsi
def check_volume_confirmation(
self,
volume: pd.Series,
period: int = 20
) -> Tuple[bool, float]:
"""
거래량 확인 신호
Args:
volume: 거래량 데이터
period: 평균 계산 기간
Returns:
(스파이크 여부, 비율)
"""
if volume is None or len(volume) < period:
return False, 1.0
avg_volume = volume.rolling(period).mean().iloc[-1]
current_volume = volume.iloc[-1]
if avg_volume == 0:
return False, 1.0
volume_ratio = current_volume / avg_volume
# 거래량 스파이크 (평균의 1.5배 이상)
if volume_ratio >= self.volume_spike_threshold:
return True, volume_ratio
return False, volume_ratio
def detect_candlestick_pattern(
self,
open_prices: pd.Series,
high: pd.Series,
low: pd.Series,
close: pd.Series
) -> Optional[str]:
"""
캔들스틱 패턴 검출
Returns:
패턴 이름 또는 None
"""
if len(close) < 3:
return None
# 현재 캔들
curr_open = open_prices.iloc[-1]
curr_high = high.iloc[-1]
curr_low = low.iloc[-1]
curr_close = close.iloc[-1]
curr_body = abs(curr_close - curr_open)
curr_range = curr_high - curr_low
# 이전 캔들
prev_open = open_prices.iloc[-2]
prev_high = high.iloc[-2]
prev_low = low.iloc[-2]
prev_close = close.iloc[-2]
# Hammer (강세 반전)
if (curr_close > curr_open and # 양봉
(curr_high - curr_close) < curr_body * 0.3 and # 위 꼬리 짧음
(curr_open - curr_low) > curr_body * 2): # 아래 꼬리 길음
return 'hammer'
# Shooting Star (약세 반전)
elif (curr_close < curr_open and # 음봉
(curr_close - curr_low) < curr_body * 0.3 and # 아래 꼬리 짧음
(curr_high - curr_open) > curr_body * 2): # 위 꼬리 길음
return 'shooting_star'
# Engulfing Bullish (강세)
elif (prev_close < prev_open and # 이전 음봉
curr_close > curr_open and # 현재 양봉
curr_open < prev_close and # 현재 시가가 이전 종가보다 낮음
curr_close > prev_open): # 현재 종가가 이전 시가보다 높음
return 'engulfing_bullish'
# Engulfing Bearish (약세)
elif (prev_close > prev_open and # 이전 양봉
curr_close < curr_open and # 현재 음봉
curr_open > prev_close and # 현재 시가가 이전 종가보다 높음
curr_close < prev_open): # 현재 종가가 이전 시가보다 낮음
return 'engulfing_bearish'
# Doji (반전 가능성)
elif curr_body < curr_range * 0.1:
return 'doji'
return None
def generate_pd_signal(
self,
open_prices: pd.Series,
high: pd.Series,
low: pd.Series,
close: pd.Series,
volume: pd.Series = None
) -> Dict:
"""
종합 Premium/Discount 신호 생성
Args:
open, high, low, close: 가격 데이터
volume: 거래량 (선택)
Returns:
P/D 신호 정보
"""
# 1. 최근 고점/저점
recent_high = high.iloc[-self.lookback_period:].max()
recent_low = low.iloc[-self.lookback_period:].min()
current_price = close.iloc[-1]
# 2. 8단계 존 계산
zones = self.calculate_8_tier_zones(recent_high, recent_low)
# 3. 현재 가격 분류
classification = self.classify_price_position(current_price, zones)
# 4. RSI 확인
rsi_confirmed, rsi_value = self.check_rsi_confirmation(close, classification['bias'])
# 5. 거래량 확인
volume_confirmed, volume_ratio = self.check_volume_confirmation(volume) if volume is not None else (False, 1.0)
# 6. 캔들 패턴
candle_pattern = self.detect_candlestick_pattern(open_prices, high, low, close)
# 7. 신호 생성
signal = {
'zone': classification['zone'],
'bias': classification['bias'],
'bias_strength': classification['bias_strength'],
'position_in_zone': classification['position_in_zone'],
'rsi': rsi_value,
'rsi_confirmed': rsi_confirmed,
'volume_ratio': volume_ratio,
'volume_confirmed': volume_confirmed,
'candle_pattern': candle_pattern,
'action': 'hold',
'confidence': 0.5
}
# 8. 매수/매도 판단
confidence = classification['bias_strength']
# 확인 신호 수
confirmations = sum([
rsi_confirmed,
volume_confirmed,
candle_pattern in ['hammer', 'engulfing_bullish'] if classification['bias'] in ['strong_buy', 'buy'] else False,
candle_pattern in ['shooting_star', 'engulfing_bearish'] if classification['bias'] in ['strong_sell', 'sell'] else False
])
# 강력한 매수 신호
if classification['bias'] in ['strong_buy', 'buy'] and confirmations >= 2:
signal['action'] = 'buy'
confidence = classification['bias_strength'] + (confirmations * 0.05)
# 강력한 매도 신호
elif classification['bias'] in ['strong_sell', 'sell'] and confirmations >= 2:
signal['action'] = 'sell'
confidence = classification['bias_strength'] + (confirmations * 0.05)
# 보조 신호 (1개 확인)
elif classification['bias'] in ['neutral_buy'] and confirmations >= 1:
signal['action'] = 'buy'
confidence = 0.65
elif classification['bias'] in ['neutral_sell'] and confirmations >= 1:
signal['action'] = 'sell'
confidence = 0.65
signal['confidence'] = min(confidence, 0.95)
signal['confirmations'] = confirmations
return signal
# 사용 예시
if __name__ == '__main__':
# 테스트 데이터
np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=100, freq='1H')
test_data = pd.DataFrame({
'open': np.cumsum(np.random.randn(100) * 10) + 49850,
'high': np.cumsum(np.random.randn(100) * 10) + 50000,
'low': np.cumsum(np.random.randn(100) * 10) + 49800,
'close': np.cumsum(np.random.randn(100) * 10) + 49900,
'volume': np.random.randint(100, 1000, 100)
}, index=dates)
# P/D 분석기 초기화
pd_analyzer = PremiumDiscountAnalyzer({
'lookback_period': 50,
'rsi_period': 14,
'rsi_overbought': 70,
'rsi_oversold': 30
})
# 신호 생성
signal = pd_analyzer.generate_pd_signal(
test_data['open'],
test_data['high'],
test_data['low'],
test_data['close'],
test_data['volume']
)
print("="*60)
print("Premium/Discount 8단계 신호")
print("="*60)
print(f"존: {signal['zone'].upper().replace('_', ' ')}")
print(f"바이어스: {signal['bias'].upper().replace('_', ' ')}")
print(f"바이어스 강도: {signal['bias_strength']:.2%}")
print(f"존 내 위치: {signal['position_in_zone']:.1%}")
print(f"\n확인 신호:")
print(f" RSI: {signal['rsi']:.2f} ({'✓ 확인됨' if signal['rsi_confirmed'] else '✗ 미확인'})")
print(f" 거래량: {signal['volume_ratio']:.2f}x ({'✓ 스파이크' if signal['volume_confirmed'] else '✗ 정상'})")
print(f" 캔들 패턴: {signal['candle_pattern'] or 'None'}")
print(f"\n전체 확인 수: {signal['confirmations']}/3")
print(f"\n액션: {signal['action'].upper()}")
print(f"신뢰도: {signal['confidence']:.2%}")
print("="*60)
"""
Smart Money Concepts (SMC) - 내부 구조 필터링
Phase 14.1 긴급 수정
추가 기능:
1. 내부 구조 (Internal Structure) - 5분봉 기반
2. Confluence 필터링 (internalFilterConfluence)
3. Order Block 갱신 로직
4. Premium/Discount 존 통합
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
@dataclass
class OrderBlock:
"""Order Block 데이터 클래스"""
type: str # 'bullish' or 'bearish'
top: float
bottom: float
timestamp: int
strength: float
mitigated: bool = False
internal: bool = False # 내부 구조 여부
@dataclass
class StructurePoint:
"""구조 포인트 (BOS/CHoCH)"""
type: str # 'BOS' or 'CHoCH'
direction: str # 'bullish' or 'bearish'
price: float
timestamp: int
internal: bool = False
class SMCInternalStructure:
"""SMC 내부 구조 완전 구현"""
def __init__(self, config: Dict = None):
self.config = config or {}
self.order_blocks = []
self.structure_points = []
self.last_swing_high = None
self.last_swing_low = None
# 설정
self.swing_period = self.config.get('swing_period', 10)
self.internal_period = self.config.get('internal_period', 5)
self.internal_filter_confluence = self.config.get('internal_filter_confluence', True)
self.atr_multiplier = self.config.get('atr_multiplier', 2.0)
def calculate_atr(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
period: int = 14
) -> pd.Series:
"""Average True Range (ATR) 계산"""
tr1 = high - low
tr2 = abs(high - close.shift(1))
tr3 = abs(low - close.shift(1))
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.rolling(period).mean()
return atr
def detect_swing_points(
self,
high: pd.Series,
low: pd.Series,
period: int = 10
) -> Tuple[Optional[float], Optional[float]]:
"""
Swing High/Low 검출
Args:
high, low: 가격 데이터
period: 스윙 기간
Returns:
(swing_high, swing_low)
"""
if len(high) < period * 2 + 1:
return None, None
# Swing High: 중심이 좌우보다 높음
center_idx = -period - 1
center_high = high.iloc[center_idx]
left_highs = high.iloc[center_idx - period:center_idx]
right_highs = high.iloc[center_idx + 1:center_idx + period + 1]
swing_high = None
if (center_high > left_highs.max() and
center_high > right_highs.max()):
swing_high = center_high
# Swing Low: 중심이 좌우보다 낮음
center_low = low.iloc[center_idx]
left_lows = low.iloc[center_idx - period:center_idx]
right_lows = low.iloc[center_idx + 1:center_idx + period + 1]
swing_low = None
if (center_low < left_lows.min() and
center_low < right_lows.min()):
swing_low = center_low
return swing_high, swing_low
def detect_bos_choch(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
internal: bool = False
) -> Optional[StructurePoint]:
"""
Break of Structure (BOS) 및 Change of Character (CHoCH) 검출
Args:
high, low, close: 가격 데이터
internal: 내부 구조 검출 여부
Returns:
StructurePoint 또는 None
"""
period = self.internal_period if internal else self.swing_period
swing_high, swing_low = self.detect_swing_points(high, low, period)
current_close = close.iloc[-1]
prev_close = close.iloc[-2]
# Bullish BOS: 최근 고점 돌파
if swing_high and self.last_swing_high:
if prev_close <= self.last_swing_high and current_close > self.last_swing_high:
structure = StructurePoint(
type='BOS',
direction='bullish',
price=self.last_swing_high,
timestamp=len(close) - 1,
internal=internal
)
self.structure_points.append(structure)
return structure
# Bearish BOS: 최근 저점 돌파
if swing_low and self.last_swing_low:
if prev_close >= self.last_swing_low and current_close < self.last_swing_low:
structure = StructurePoint(
type='BOS',
direction='bearish',
price=self.last_swing_low,
timestamp=len(close) - 1,
internal=internal
)
self.structure_points.append(structure)
return structure
# CHoCH 검출 (추세 전환)
# Bullish CHoCH: 하락 추세 중 고점 돌파
if (swing_low and self.last_swing_low and
swing_low < self.last_swing_low and # 하락 추세 확인
swing_high and current_close > swing_high):
structure = StructurePoint(
type='CHoCH',
direction='bullish',
price=swing_high,
timestamp=len(close) - 1,
internal=internal
)
self.structure_points.append(structure)
return structure
# Bearish CHoCH: 상승 추세 중 저점 돌파
if (swing_high and self.last_swing_high and
swing_high > self.last_swing_high and # 상승 추세 확인
swing_low and current_close < swing_low):
structure = StructurePoint(
type='CHoCH',
direction='bearish',
price=swing_low,
timestamp=len(close) - 1,
internal=internal
)
self.structure_points.append(structure)
return structure
# Swing 포인트 업데이트
if swing_high:
self.last_swing_high = swing_high
if swing_low:
self.last_swing_low = swing_low
return None
def detect_order_block(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
structure: StructurePoint
) -> Optional[OrderBlock]:
"""
Order Block 검출
BOS/CHoCH 전 마지막 반대 방향 캔들
Args:
high, low, close: 가격 데이터
structure: BOS/CHoCH 구조 포인트
Returns:
OrderBlock 또는 None
"""
if structure.direction == 'bullish':
# Bullish OB: BOS 전 마지막 bearish 캔들
for i in range(len(close) - 1, max(0, len(close) - 20), -1):
if close.iloc[i] < close.iloc[i - 1]: # Bearish 캔들
ob = OrderBlock(
type='bullish',
top=high.iloc[i],
bottom=low.iloc[i],
timestamp=i,
strength=(high.iloc[i] - low.iloc[i]) / low.iloc[i],
internal=structure.internal
)
return ob
else: # bearish
# Bearish OB: BOS 전 마지막 bullish 캔들
for i in range(len(close) - 1, max(0, len(close) - 20), -1):
if close.iloc[i] > close.iloc[i - 1]: # Bullish 캔들
ob = OrderBlock(
type='bearish',
top=high.iloc[i],
bottom=low.iloc[i],
timestamp=i,
strength=(high.iloc[i] - low.iloc[i]) / low.iloc[i],
internal=structure.internal
)
return ob
return None
def apply_confluence_filter(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
structure: Optional[StructurePoint]
) -> bool:
"""
Confluence 필터 적용
고변동성 바(ATR 2배 이상)에서 발생한 내부 구조 무시
Args:
high, low, close: 가격 데이터
structure: 구조 포인트
Returns:
필터 통과 여부 (True = 통과, False = 무시)
"""
if not self.internal_filter_confluence or not structure or not structure.internal:
return True
# ATR 계산
atr = self.calculate_atr(high, low, close)
if atr.iloc[-1] == 0:
return True
# 현재 바의 범위
current_range = high.iloc[-1] - low.iloc[-1]
# 고변동성 바 감지 (ATR의 2배 이상)
is_high_volatility = current_range >= (self.atr_multiplier * atr.iloc[-1])
if is_high_volatility:
# 고변동성 바에서 발생한 내부 구조는 무시
return False
return True
def update_order_blocks(
self,
current_price: float,
high: float,
low: float
):
"""
Order Block 무효화 (Mitigation) 확인
가격이 OB를 완전히 통과하면 무효화
Args:
current_price: 현재 가격
high: 현재 고가
low: 현재 저가
"""
for ob in self.order_blocks:
if ob.mitigated:
continue
if ob.type == 'bullish':
# 가격이 OB 하단 아래로 떨어지면 무효화
if low < ob.bottom:
ob.mitigated = True
else: # bearish
# 가격이 OB 상단 위로 올라가면 무효화
if high > ob.top:
ob.mitigated = True
def calculate_premium_discount(
self,
high: float,
low: float,
current_price: float
) -> Dict:
"""
Premium/Discount 존 계산 (8단계)
Args:
high: 최근 고점
low: 최근 저점
current_price: 현재 가격
Returns:
Premium/Discount 정보
"""
range_size = high - low
if range_size == 0:
return {'zone': 'equilibrium', 'level': 0.5}
# 현재 가격 위치 (0~1)
position = (current_price - low) / range_size
# 8단계 구분
if position >= 0.95:
zone = 'extreme_premium'
bias = 'strong_sell'
elif position >= 0.75:
zone = 'premium'
bias = 'sell'
elif position >= 0.55:
zone = 'slight_premium'
bias = 'neutral_sell'
elif position >= 0.45:
zone = 'equilibrium'
bias = 'neutral'
elif position >= 0.25:
zone = 'slight_discount'
bias = 'neutral_buy'
elif position >= 0.05:
zone = 'discount'
bias = 'buy'
else:
zone = 'extreme_discount'
bias = 'strong_buy'
return {
'zone': zone,
'level': position,
'bias': bias,
'high': high,
'low': low
}
def generate_smc_signal(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series
) -> Dict:
"""
종합 SMC 신호 생성
Args:
high, low, close: 가격 데이터
Returns:
SMC 신호 정보
"""
# 1. 외부 구조 (Swing 기반)
external_structure = self.detect_bos_choch(high, low, close, internal=False)
# 2. 내부 구조 (5분봉 기반)
internal_structure = self.detect_bos_choch(high, low, close, internal=True)
# 3. Confluence 필터 적용
internal_valid = self.apply_confluence_filter(
high, low, close, internal_structure
)
# 4. Order Block 검출
if external_structure:
ob = self.detect_order_block(high, low, close, external_structure)
if ob:
self.order_blocks.append(ob)
if internal_structure and internal_valid:
ob = self.detect_order_block(high, low, close, internal_structure)
if ob:
self.order_blocks.append(ob)
# 5. Order Block 업데이트
current_price = close.iloc[-1]
self.update_order_blocks(current_price, high.iloc[-1], low.iloc[-1])
# 6. Premium/Discount 계산
recent_high = high.iloc[-50:].max()
recent_low = low.iloc[-50:].min()
pd_zone = self.calculate_premium_discount(recent_high, recent_low, current_price)
# 7. 활성 Order Blocks
active_obs = [ob for ob in self.order_blocks if not ob.mitigated][-10:]
# 8. 신호 생성
signal = {
'external_structure': external_structure,
'internal_structure': internal_structure if internal_valid else None,
'active_order_blocks': len(active_obs),
'premium_discount': pd_zone,
'action': 'hold',
'confidence': 0.5
}
# 9. 매수/매도 판단
confidence = 0.5
# 강력한 매수 신호
if (external_structure and external_structure.direction == 'bullish' and
pd_zone['bias'] in ['strong_buy', 'buy'] and
any(ob.type == 'bullish' and not ob.mitigated for ob in active_obs)):
signal['action'] = 'buy'
confidence = 0.8
# 강력한 매도 신호
elif (external_structure and external_structure.direction == 'bearish' and
pd_zone['bias'] in ['strong_sell', 'sell'] and
any(ob.type == 'bearish' and not ob.mitigated for ob in active_obs)):
signal['action'] = 'sell'
confidence = 0.8
# 내부 구조 보조 신호
elif internal_structure and internal_valid:
if internal_structure.direction == 'bullish' and pd_zone['bias'] in ['buy', 'neutral_buy']:
signal['action'] = 'buy'
confidence = 0.65
elif internal_structure.direction == 'bearish' and pd_zone['bias'] in ['sell', 'neutral_sell']:
signal['action'] = 'sell'
confidence = 0.65
signal['confidence'] = confidence
return signal
# 사용 예시
if __name__ == '__main__':
# 테스트 데이터
np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=100, freq='5T')
test_data = pd.DataFrame({
'high': np.cumsum(np.random.randn(100) * 10) + 50000,
'low': np.cumsum(np.random.randn(100) * 10) + 49800,
'close': np.cumsum(np.random.randn(100) * 10) + 49900,
}, index=dates)
# SMC 초기화
smc = SMCInternalStructure({
'swing_period': 10,
'internal_period': 5,
'internal_filter_confluence': True,
'atr_multiplier': 2.0
})
# 신호 생성
signal = smc.generate_smc_signal(
test_data['high'],
test_data['low'],
test_data['close']
)
print("="*60)
print("SMC 신호 (내부 구조 필터링 포함)")
print("="*60)
print(f"외부 구조: {signal['external_structure'].type if signal['external_structure'] else 'None'}")
print(f"내부 구조: {signal['internal_structure'].type if signal['internal_structure'] else 'None (필터링됨)'}")
print(f"활성 Order Blocks: {signal['active_order_blocks']}개")
print(f"Premium/Discount: {signal['premium_discount']['zone']} ({signal['premium_discount']['level']:.2%})")
print(f"바이어스: {signal['premium_discount']['bias']}")
print(f"\n액션: {signal['action'].upper()}")
print(f"신뢰도: {signal['confidence']:.2%}")
print("="*60)
"""
Supply & Demand - Multi-Timeframe (MTF) 구현
Phase 14.2 고효율 개선
추가 기능:
1. 다중 타임프레임 S/D 존 검출
2. Confluence (합류) 분석
3. 존 강도 및 우선순위
4. 자동 존 업데이트
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
@dataclass
class SupplyDemandZone:
"""Supply/Demand 존 데이터 클래스"""
type: str # 'supply' or 'demand'
top: float
bottom: float
timestamp: int
timeframe: str # '5m', '15m', '1h', etc.
strength: float
touches: int = 0
broken: bool = False
confluence_score: int = 1 # 여러 타임프레임에서 겹치는 횟수
class SupplyDemandMTF:
"""Supply & Demand Multi-Timeframe 완전 구현"""
def __init__(self, config: Dict = None):
self.config = config or {}
self.zones = {
'5m': [],
'15m': [],
'1h': [],
'4h': []
}
# 설정
self.timeframes = self.config.get('timeframes', ['5m', '15m', '1h'])
self.zone_strength_threshold = self.config.get('zone_strength_threshold', 0.02)
self.confluence_distance = self.config.get('confluence_distance', 0.01) # 1%
self.max_zones_per_tf = self.config.get('max_zones_per_tf', 10)
def detect_supply_zone(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
lookback: int = 20
) -> Optional[SupplyDemandZone]:
"""
Supply Zone (공급 존) 검출
하락 전 마지막 상승 구간
Args:
high, low, close: 가격 데이터
lookback: 룩백 기간
Returns:
Supply Zone 또는 None
"""
if len(close) < lookback + 10:
return None
# 최근 고점 찾기
recent_high_idx = high.iloc[-lookback:].idxmax()
recent_high = high.loc[recent_high_idx]
# 고점 이후 하락 확인 (최소 5%)
subsequent_low = low.iloc[-lookback:].loc[recent_high_idx:].min()
if subsequent_low < recent_high * 0.95: # 5% 이상 하락
# Supply Zone = 고점 형성 전 상승 구간
zone_idx = high.iloc[-lookback:].tolist().index(recent_high)
if zone_idx > 0:
zone_high = high.iloc[-lookback + zone_idx]
zone_low = low.iloc[-lookback + zone_idx - 1]
# 존 강도 = 이후 하락 폭
strength = (recent_high - subsequent_low) / recent_high
if strength >= self.zone_strength_threshold:
return SupplyDemandZone(
type='supply',
top=zone_high,
bottom=zone_low,
timestamp=len(close) - lookback + zone_idx,
timeframe='unknown',
strength=strength
)
return None
def detect_demand_zone(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
lookback: int = 20
) -> Optional[SupplyDemandZone]:
"""
Demand Zone (수요 존) 검출
상승 전 마지막 하락 구간
Args:
high, low, close: 가격 데이터
lookback: 룩백 기간
Returns:
Demand Zone 또는 None
"""
if len(close) < lookback + 10:
return None
# 최근 저점 찾기
recent_low_idx = low.iloc[-lookback:].idxmin()
recent_low = low.loc[recent_low_idx]
# 저점 이후 상승 확인 (최소 5%)
subsequent_high = high.iloc[-lookback:].loc[recent_low_idx:].max()
if subsequent_high > recent_low * 1.05: # 5% 이상 상승
# Demand Zone = 저점 형성 전 하락 구간
zone_idx = low.iloc[-lookback:].tolist().index(recent_low)
if zone_idx > 0:
zone_high = high.iloc[-lookback + zone_idx - 1]
zone_low = low.iloc[-lookback + zone_idx]
# 존 강도 = 이후 상승 폭
strength = (subsequent_high - recent_low) / recent_low
if strength >= self.zone_strength_threshold:
return SupplyDemandZone(
type='demand',
top=zone_high,
bottom=zone_low,
timestamp=len(close) - lookback + zone_idx,
timeframe='unknown',
strength=strength
)
return None
def detect_zones_single_tf(
self,
high: pd.Series,
low: pd.Series,
close: pd.Series,
timeframe: str,
lookback: int = 50
) -> List[SupplyDemandZone]:
"""
단일 타임프레임에서 S/D 존 검출
Args:
high, low, close: 가격 데이터
timeframe: 타임프레임 ('5m', '15m', etc.)
lookback: 룩백 기간
Returns:
검출된 존 리스트
"""
zones = []
# 여러 룩백 기간으로 스캔
for period in [10, 20, 30, 40, 50]:
if period > len(close):
continue
# Supply Zone 검출
supply = self.detect_supply_zone(high, low, close, period)
if supply:
supply.timeframe = timeframe
zones.append(supply)
# Demand Zone 검출
demand = self.detect_demand_zone(high, low, close, period)
if demand:
demand.timeframe = timeframe
zones.append(demand)
# 중복 제거 및 정렬
unique_zones = self._remove_duplicate_zones(zones)
return unique_zones[:self.max_zones_per_tf]
def _remove_duplicate_zones(
self,
zones: List[SupplyDemandZone]
) -> List[SupplyDemandZone]:
"""중복 존 제거"""
if not zones:
return []
# 타입별 분리
supply_zones = [z for z in zones if z.type == 'supply']
demand_zones = [z for z in zones if z.type == 'demand']
# Supply 존 병합
supply_zones.sort(key=lambda x: x.top, reverse=True)
merged_supply = []
i = 0
while i < len(supply_zones):
current = supply_zones[i]
j = i + 1
while j < len(supply_zones):
if abs(current.top - supply_zones[j].top) / current.top <= 0.05: # 5% 이내
current.strength = max(current.strength, supply_zones[j].strength)
j += 1
else:
break
merged_supply.append(current)
i = j
# Demand 존 병합
demand_zones.sort(key=lambda x: x.bottom)
merged_demand = []
i = 0
while i < len(demand_zones):
current = demand_zones[i]
j = i + 1
while j < len(demand_zones):
if abs(current.bottom - demand_zones[j].bottom) / current.bottom <= 0.05:
current.strength = max(current.strength, demand_zones[j].strength)
j += 1
else:
break
merged_demand.append(current)
i = j
return merged_supply + merged_demand
def detect_zones_mtf(
self,
data_dict: Dict[str, pd.DataFrame]
) -> Dict[str, List[SupplyDemandZone]]:
"""
다중 타임프레임 S/D 존 검출
Args:
data_dict: {timeframe: DataFrame} 형식
DataFrame은 'high', 'low', 'close' 컬럼 포함
Returns:
타임프레임별 존 딕셔너리
"""
all_zones = {}
for tf in self.timeframes:
if tf not in data_dict:
continue
df = data_dict[tf]
zones = self.detect_zones_single_tf(
df['high'],
df['low'],
df['close'],
timeframe=tf
)
all_zones[tf] = zones
self.zones[tf] = zones
return all_zones
def find_confluence_zones(
self,
all_zones: Dict[str, List[SupplyDemandZone]] = None
) -> List[Dict]:
"""
타임프레임 간 Confluence 존 찾기
여러 타임프레임에서 겹치는 가격대
Args:
all_zones: 타임프레임별 존 (None이면 self.zones 사용)
Returns:
Confluence 존 리스트
"""
if all_zones is None:
all_zones = self.zones
# 모든 존 평탄화
flat_zones = []
for tf, zones in all_zones.items():
flat_zones.extend(zones)
if not flat_zones:
return []
# Confluence 검출
confluence_zones = []
for i, zone1 in enumerate(flat_zones):
overlapping = [zone1]
for zone2 in flat_zones[i+1:]:
if zone1.type != zone2.type:
continue
# 존 중심 가격
center1 = (zone1.top + zone1.bottom) / 2
center2 = (zone2.top + zone2.bottom) / 2
# 거리 계산
distance = abs(center1 - center2) / center1
if distance <= self.confluence_distance:
overlapping.append(zone2)
if len(overlapping) >= 2: # 최소 2개 타임프레임
# 합류 존 생성
confluence = {
'type': zone1.type,
'top': max(z.top for z in overlapping),
'bottom': min(z.bottom for z in overlapping),
'center': sum((z.top + z.bottom) / 2 for z in overlapping) / len(overlapping),
'confluence_score': len(overlapping),
'timeframes': [z.timeframe for z in overlapping],
'avg_strength': sum(z.strength for z in overlapping) / len(overlapping)
}
# 중복 확인
is_duplicate = False
for existing in confluence_zones:
if (existing['type'] == confluence['type'] and
abs(existing['center'] - confluence['center']) / existing['center'] <= 0.01):
is_duplicate = True
break
if not is_duplicate:
confluence_zones.append(confluence)
# 강도순 정렬
confluence_zones.sort(key=lambda x: (x['confluence_score'], x['avg_strength']), reverse=True)
return confluence_zones
def update_zone_touches(
self,
current_price: float,
high: float,
low: float
):
"""
존 터치 횟수 및 돌파 업데이트
Args:
current_price: 현재 가격
high: 현재 고가
low: 현재 저가
"""
for tf, zones in self.zones.items():
for zone in zones:
if zone.broken:
continue
if zone.type == 'supply':
# Supply Zone 터치 확인
if zone.bottom <= high <= zone.top:
zone.touches += 1
# 돌파 확인 (상단 통과)
if low > zone.top:
zone.broken = True
else: # demand
# Demand Zone 터치 확인
if zone.bottom <= low <= zone.top:
zone.touches += 1
# 돌파 확인 (하단 통과)
if high < zone.bottom:
zone.broken = True
def generate_sd_signal(
self,
current_price: float,
high: float,
low: float,
data_dict: Dict[str, pd.DataFrame] = None
) -> Dict:
"""
종합 Supply/Demand 신호 생성
Args:
current_price: 현재 가격
high: 현재 고가
low: 현재 저가
data_dict: 다중 타임프레임 데이터 (선택)
Returns:
S/D 신호 정보
"""
# 1. MTF 존 검출 (새 데이터가 있으면)
if data_dict:
self.detect_zones_mtf(data_dict)
# 2. 존 터치/돌파 업데이트
self.update_zone_touches(current_price, high, low)
# 3. Confluence 존 찾기
confluence_zones = self.find_confluence_zones()
# 4. 현재 가격 근처 존 찾기
nearby_zones = []
for tf, zones in self.zones.items():
for zone in zones:
if zone.broken:
continue
zone_center = (zone.top + zone.bottom) / 2
distance = abs(zone_center - current_price) / current_price
if distance <= 0.03: # 3% 이내
nearby_zones.append({
'type': zone.type,
'top': zone.top,
'bottom': zone.bottom,
'timeframe': zone.timeframe,
'strength': zone.strength,
'touches': zone.touches,
'distance': distance
})
# 5. 신호 생성
signal = {
'total_zones': sum(len(z) for z in self.zones.values()),
'active_zones': sum(len([zz for zz in z if not zz.broken]) for z in self.zones.values()),
'confluence_zones': len(confluence_zones),
'nearby_zones': len(nearby_zones),
'action': 'hold',
'confidence': 0.5
}
# 6. 매수/매도 판단
confidence = 0.5
# Confluence Demand Zone에서 매수
for conf in confluence_zones[:3]: # 상위 3개만
if conf['type'] == 'demand' and conf['bottom'] <= current_price <= conf['top']:
signal['action'] = 'buy'
confidence = 0.75 + (conf['confluence_score'] * 0.05)
signal['target_zone'] = conf
break
# Confluence Supply Zone에서 매도
for conf in confluence_zones[:3]:
if conf['type'] == 'supply' and conf['bottom'] <= current_price <= conf['top']:
signal['action'] = 'sell'
confidence = 0.75 + (conf['confluence_score'] * 0.05)
signal['target_zone'] = conf
break
# 근처 존 신호 (보조)
if signal['action'] == 'hold' and nearby_zones:
strongest_zone = max(nearby_zones, key=lambda x: x['strength'])
if strongest_zone['type'] == 'demand' and strongest_zone['touches'] < 3:
signal['action'] = 'buy'
confidence = 0.65 + (strongest_zone['strength'] * 10) - (strongest_zone['touches'] * 0.05)
signal['target_zone'] = strongest_zone
elif strongest_zone['type'] == 'supply' and strongest_zone['touches'] < 3:
signal['action'] = 'sell'
confidence = 0.65 + (strongest_zone['strength'] * 10) - (strongest_zone['touches'] * 0.05)
signal['target_zone'] = strongest_zone
signal['confidence'] = min(confidence, 0.95)
signal['confluence_list'] = confluence_zones[:5]
return signal
# 사용 예시
if __name__ == '__main__':
# 테스트 데이터 (다중 타임프레임)
np.random.seed(42)
data_mtf = {}
for tf, periods in [('5m', 200), ('15m', 100), ('1h', 50)]:
dates = pd.date_range('2024-01-01', periods=periods, freq=tf)
data_mtf[tf] = pd.DataFrame({
'high': np.cumsum(np.random.randn(periods) * 10) + 50000,
'low': np.cumsum(np.random.randn(periods) * 10) + 49800,
'close': np.cumsum(np.random.randn(periods) * 10) + 49900,
}, index=dates)
# S/D MTF 초기화
sd_mtf = SupplyDemandMTF({
'timeframes': ['5m', '15m', '1h'],
'zone_strength_threshold': 0.02,
'confluence_distance': 0.01
})
# 신호 생성
current_price = data_mtf['5m']['close'].iloc[-1]
signal = sd_mtf.generate_sd_signal(
current_price,
data_mtf['5m']['high'].iloc[-1],
data_mtf['5m']['low'].iloc[-1],
data_mtf
)
print("="*60)
print("Supply & Demand MTF 신호")
print("="*60)
print(f"전체 존: {signal['total_zones']}개")
print(f"활성 존: {signal['active_zones']}개")
print(f"Confluence 존: {signal['confluence_zones']}개")
print(f"근처 존: {signal['nearby_zones']}개")
if signal['confluence_list']:
print(f"\nConfluence 존 (상위 3개):")
for i, conf in enumerate(signal['confluence_list'][:3], 1):
print(f" {i}. {conf['type'].upper()}: ${conf['center']:.2f} "
f"(스코어: {conf['confluence_score']}, TF: {', '.join(conf['timeframes'])})")
if 'target_zone' in signal:
print(f"\n타겟 존:")
print(f" 타입: {signal['target_zone']['type'].upper()}")
if 'center' in signal['target_zone']:
print(f" 중심: ${signal['target_zone']['center']:.2f}")
else:
print(f" 상단: ${signal['target_zone']['top']:.2f}")
print(f" 하단: ${signal['target_zone']['bottom']:.2f}")
print(f"\n액션: {signal['action'].upper()}")
print(f"신뢰도: {signal['confidence']:.2%}")
print("="*60)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment