Created
November 23, 2025 03:29
-
-
Save luckman538/276832be9321db46d9cd8f204677c130 to your computer and use it in GitHub Desktop.
indicators_py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Fair Value Gap (FVG) - 동적 모드 완전 구현 | |
| Phase 14.1 긴급 수정 | |
| 추가 기능: | |
| 1. 동적 FVG (Dynamic Mode) - 가격 추적 | |
| 2. Auto Threshold - 히스토리컬 평균 기반 | |
| 3. MTF (Multi-Timeframe) 지원 | |
| 4. FVG 무효화 후 재검증 | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Tuple, Optional | |
| from dataclasses import dataclass, field | |
| @dataclass | |
| class FairValueGap: | |
| """Fair Value Gap 데이터 클래스""" | |
| bias: str # 'bullish' or 'bearish' | |
| top: float | |
| bottom: float | |
| timestamp: int | |
| mitigated: bool = False | |
| fill_percentage: float = 0.0 # 채워진 비율 (0~1) | |
| strength: float = 1.0 | |
| # 동적 모드용 | |
| original_top: float = field(default=0.0) | |
| original_bottom: float = field(default=0.0) | |
| def __post_init__(self): | |
| if self.original_top == 0.0: | |
| self.original_top = self.top | |
| if self.original_bottom == 0.0: | |
| self.original_bottom = self.bottom | |
| class FVGDynamic: | |
| """Fair Value Gap 동적 모드 완전 구현""" | |
| def __init__(self, config: Dict = None): | |
| self.config = config or {} | |
| self.active_fvgs = [] | |
| # 설정 | |
| self.dynamic_mode = self.config.get('dynamic', True) | |
| self.auto_threshold = self.config.get('auto_threshold', True) | |
| self.mitigation_threshold = self.config.get('mitigation_threshold', 0.5) | |
| self.max_fvgs = self.config.get('max_fvgs', 20) | |
| self.mtf_enabled = self.config.get('mtf_enabled', False) | |
| self.timeframes = self.config.get('timeframes', ['5m', '15m', '1h']) | |
| def calculate_auto_threshold( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| period: int = 50 | |
| ) -> float: | |
| """ | |
| Auto Threshold 계산 | |
| 히스토리컬 평균 변동 비율 기반 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| period: 평균 계산 기간 | |
| Returns: | |
| 자동 임계값 | |
| """ | |
| if len(close) < period: | |
| return 0.001 # 기본값 0.1% | |
| # 평균 범위 (High - Low) | |
| avg_range = (high - low).iloc[-period:].mean() | |
| # 평균 종가 | |
| avg_close = close.iloc[-period:].mean() | |
| if avg_close == 0: | |
| return 0.001 | |
| # 평균 변동 비율 | |
| auto_threshold = (avg_range / avg_close) * 2 # 2배 마진 | |
| # 최소/최대 제한 | |
| return max(0.0005, min(auto_threshold, 0.05)) # 0.05%~5% | |
| def detect_fvg_static( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| threshold: float = None | |
| ) -> List[FairValueGap]: | |
| """ | |
| 정적 FVG 검출 (3봉 패턴) | |
| Args: | |
| high, low, close: 가격 데이터 | |
| threshold: 최소 갭 크기 임계값 | |
| Returns: | |
| 검출된 FVG 리스트 | |
| """ | |
| if len(close) < 3: | |
| return [] | |
| # Auto Threshold 사용 | |
| if threshold is None: | |
| if self.auto_threshold: | |
| threshold = self.calculate_auto_threshold(high, low, close) | |
| else: | |
| threshold = self.config.get('threshold', 0.001) | |
| fvgs = [] | |
| # 최근 3봉 검사 | |
| for i in range(2, min(len(close), 50)): # 최근 50개 검사 | |
| # Bullish FVG: 현재 low > 2봉 전 high | |
| if low.iloc[-i] > high.iloc[-i-2]: | |
| gap_size = (low.iloc[-i] - high.iloc[-i-2]) / close.iloc[-i] | |
| if gap_size >= threshold: | |
| fvg = FairValueGap( | |
| bias='bullish', | |
| top=low.iloc[-i], | |
| bottom=high.iloc[-i-2], | |
| timestamp=len(close) - i, | |
| strength=min(gap_size / threshold, 5.0) | |
| ) | |
| fvgs.append(fvg) | |
| # Bearish FVG: 현재 high < 2봉 전 low | |
| elif high.iloc[-i] < low.iloc[-i-2]: | |
| gap_size = (low.iloc[-i-2] - high.iloc[-i]) / close.iloc[-i] | |
| if gap_size >= threshold: | |
| fvg = FairValueGap( | |
| bias='bearish', | |
| top=low.iloc[-i-2], | |
| bottom=high.iloc[-i], | |
| timestamp=len(close) - i, | |
| strength=min(gap_size / threshold, 5.0) | |
| ) | |
| fvgs.append(fvg) | |
| return fvgs | |
| def update_fvg_dynamic( | |
| self, | |
| current_price: float, | |
| high: float, | |
| low: float | |
| ): | |
| """ | |
| 동적 FVG 업데이트 | |
| 가격이 FVG 진입 시 갭 크기 축소 | |
| Args: | |
| current_price: 현재 가격 | |
| high: 현재 고가 | |
| low: 현재 저가 | |
| """ | |
| if not self.dynamic_mode: | |
| return | |
| for fvg in self.active_fvgs: | |
| if fvg.mitigated: | |
| continue | |
| if fvg.bias == 'bullish': | |
| # 가격이 FVG 내부로 들어오면 하단 위로 조정 | |
| if fvg.bottom <= current_price <= fvg.top: | |
| # 새로운 하단 = max(현재가, 원래 하단) | |
| new_bottom = max(min(low, fvg.top), fvg.bottom) | |
| fvg.bottom = new_bottom | |
| # 채워진 비율 계산 | |
| original_gap = fvg.original_top - fvg.original_bottom | |
| current_gap = fvg.top - fvg.bottom | |
| fvg.fill_percentage = 1.0 - (current_gap / original_gap) if original_gap > 0 else 0 | |
| # 완전히 채워지면 무효화 | |
| if high >= fvg.top: | |
| fvg.mitigated = True | |
| fvg.fill_percentage = 1.0 | |
| else: # bearish | |
| # 가격이 FVG 내부로 들어오면 상단 아래로 조정 | |
| if fvg.bottom <= current_price <= fvg.top: | |
| # 새로운 상단 = min(현재가, 원래 상단) | |
| new_top = min(max(high, fvg.bottom), fvg.top) | |
| fvg.top = new_top | |
| # 채워진 비율 계산 | |
| original_gap = fvg.original_top - fvg.original_bottom | |
| current_gap = fvg.top - fvg.bottom | |
| fvg.fill_percentage = 1.0 - (current_gap / original_gap) if original_gap > 0 else 0 | |
| # 완전히 채워지면 무효화 | |
| if low <= fvg.bottom: | |
| fvg.mitigated = True | |
| fvg.fill_percentage = 1.0 | |
| def check_mitigation( | |
| self, | |
| current_price: float, | |
| high: float, | |
| low: float | |
| ): | |
| """ | |
| FVG 무효화 (Mitigation) 확인 | |
| Args: | |
| current_price: 현재 가격 | |
| high: 현재 고가 | |
| low: 현재 저가 | |
| """ | |
| for fvg in self.active_fvgs: | |
| if fvg.mitigated: | |
| continue | |
| if fvg.bias == 'bullish': | |
| # Bullish FVG: 가격이 하단 이하로 떨어지면 무효화 | |
| if low < fvg.bottom * (1 - self.mitigation_threshold): | |
| fvg.mitigated = True | |
| fvg.fill_percentage = 0.0 # 역방향 돌파 | |
| else: # bearish | |
| # Bearish FVG: 가격이 상단 이상으로 올라가면 무효화 | |
| if high > fvg.top * (1 + self.mitigation_threshold): | |
| fvg.mitigated = True | |
| fvg.fill_percentage = 0.0 # 역방향 돌파 | |
| def merge_fvgs(self) -> List[FairValueGap]: | |
| """ | |
| 중복/인접 FVG 병합 | |
| Returns: | |
| 병합된 FVG 리스트 | |
| """ | |
| if len(self.active_fvgs) <= 1: | |
| return self.active_fvgs | |
| # 타입별 정렬 | |
| bullish_fvgs = [f for f in self.active_fvgs if f.bias == 'bullish' and not f.mitigated] | |
| bearish_fvgs = [f for f in self.active_fvgs if f.bias == 'bearish' and not f.mitigated] | |
| # Bullish FVG 병합 | |
| bullish_fvgs.sort(key=lambda x: x.bottom) | |
| merged_bullish = [] | |
| i = 0 | |
| while i < len(bullish_fvgs): | |
| current = bullish_fvgs[i] | |
| j = i + 1 | |
| # 인접 FVG 찾기 (10% 이내) | |
| while j < len(bullish_fvgs): | |
| next_fvg = bullish_fvgs[j] | |
| if abs(current.top - next_fvg.bottom) / current.top <= 0.1: | |
| # 병합 | |
| current.top = max(current.top, next_fvg.top) | |
| current.bottom = min(current.bottom, next_fvg.bottom) | |
| current.strength = max(current.strength, next_fvg.strength) | |
| j += 1 | |
| else: | |
| break | |
| merged_bullish.append(current) | |
| i = j | |
| # Bearish FVG 병합 (동일 로직) | |
| bearish_fvgs.sort(key=lambda x: x.top, reverse=True) | |
| merged_bearish = [] | |
| i = 0 | |
| while i < len(bearish_fvgs): | |
| current = bearish_fvgs[i] | |
| j = i + 1 | |
| while j < len(bearish_fvgs): | |
| next_fvg = bearish_fvgs[j] | |
| if abs(current.bottom - next_fvg.top) / current.bottom <= 0.1: | |
| current.top = max(current.top, next_fvg.top) | |
| current.bottom = min(current.bottom, next_fvg.bottom) | |
| current.strength = max(current.strength, next_fvg.strength) | |
| j += 1 | |
| else: | |
| break | |
| merged_bearish.append(current) | |
| i = j | |
| # 무효화된 FVG 추가 | |
| mitigated = [f for f in self.active_fvgs if f.mitigated] | |
| return merged_bullish + merged_bearish + mitigated | |
| def get_unmitigated_levels(self, max_levels: int = 5) -> List[Dict]: | |
| """ | |
| 미완성 FVG 레벨 반환 | |
| Args: | |
| max_levels: 최대 레벨 수 | |
| Returns: | |
| FVG 레벨 리스트 | |
| """ | |
| unmitigated = [f for f in self.active_fvgs if not f.mitigated] | |
| # 강도순 정렬 | |
| unmitigated.sort(key=lambda x: x.strength, reverse=True) | |
| levels = [] | |
| for fvg in unmitigated[:max_levels]: | |
| levels.append({ | |
| 'bias': fvg.bias, | |
| 'level': (fvg.top + fvg.bottom) / 2, | |
| 'top': fvg.top, | |
| 'bottom': fvg.bottom, | |
| 'fill_percentage': fvg.fill_percentage, | |
| 'strength': fvg.strength | |
| }) | |
| return levels | |
| def generate_fvg_signal( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series | |
| ) -> Dict: | |
| """ | |
| 종합 FVG 신호 생성 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| Returns: | |
| FVG 신호 정보 | |
| """ | |
| current_price = close.iloc[-1] | |
| current_high = high.iloc[-1] | |
| current_low = low.iloc[-1] | |
| # 1. 새로운 FVG 검출 | |
| new_fvgs = self.detect_fvg_static(high, low, close) | |
| # 2. 기존 FVG와 병합 | |
| self.active_fvgs.extend(new_fvgs) | |
| # 3. 동적 업데이트 | |
| self.update_fvg_dynamic(current_price, current_high, current_low) | |
| # 4. 무효화 확인 | |
| self.check_mitigation(current_price, current_high, current_low) | |
| # 5. FVG 병합 | |
| self.active_fvgs = self.merge_fvgs() | |
| # 6. 최대 개수 제한 | |
| self.active_fvgs = self.active_fvgs[-self.max_fvgs:] | |
| # 7. 미완성 레벨 | |
| unmitigated_levels = self.get_unmitigated_levels() | |
| # 8. 현재 가격 근처 FVG 찾기 | |
| nearby_fvgs = [] | |
| for fvg in self.active_fvgs: | |
| if fvg.mitigated: | |
| continue | |
| distance = abs((fvg.top + fvg.bottom) / 2 - current_price) / current_price | |
| if distance <= 0.02: # 2% 이내 | |
| nearby_fvgs.append(fvg) | |
| # 9. 신호 생성 | |
| signal = { | |
| 'total_fvgs': len(self.active_fvgs), | |
| 'active_fvgs': len([f for f in self.active_fvgs if not f.mitigated]), | |
| 'nearby_fvgs': len(nearby_fvgs), | |
| 'unmitigated_levels': unmitigated_levels, | |
| 'action': 'hold', | |
| 'confidence': 0.5 | |
| } | |
| # 10. 매수/매도 판단 | |
| confidence = 0.5 | |
| # FVG 근처에서 반등 예상 | |
| if nearby_fvgs: | |
| strongest_fvg = max(nearby_fvgs, key=lambda x: x.strength) | |
| if strongest_fvg.bias == 'bullish' and current_price <= strongest_fvg.top: | |
| # Bullish FVG에서 매수 신호 | |
| signal['action'] = 'buy' | |
| confidence = 0.7 + (strongest_fvg.strength * 0.1) | |
| signal['target_fvg'] = { | |
| 'bias': strongest_fvg.bias, | |
| 'top': strongest_fvg.top, | |
| 'bottom': strongest_fvg.bottom, | |
| 'fill_percentage': strongest_fvg.fill_percentage | |
| } | |
| elif strongest_fvg.bias == 'bearish' and current_price >= strongest_fvg.bottom: | |
| # Bearish FVG에서 매도 신호 | |
| signal['action'] = 'sell' | |
| confidence = 0.7 + (strongest_fvg.strength * 0.1) | |
| signal['target_fvg'] = { | |
| 'bias': strongest_fvg.bias, | |
| 'top': strongest_fvg.top, | |
| 'bottom': strongest_fvg.bottom, | |
| 'fill_percentage': strongest_fvg.fill_percentage | |
| } | |
| signal['confidence'] = min(confidence, 0.95) | |
| return signal | |
| # 사용 예시 | |
| if __name__ == '__main__': | |
| # 테스트 데이터 | |
| np.random.seed(42) | |
| dates = pd.date_range('2024-01-01', periods=100, freq='1H') | |
| test_data = pd.DataFrame({ | |
| 'high': np.cumsum(np.random.randn(100) * 10) + 50000, | |
| 'low': np.cumsum(np.random.randn(100) * 10) + 49800, | |
| 'close': np.cumsum(np.random.randn(100) * 10) + 49900, | |
| }, index=dates) | |
| # FVG 초기화 | |
| fvg = FVGDynamic({ | |
| 'dynamic': True, | |
| 'auto_threshold': True, | |
| 'mitigation_threshold': 0.5, | |
| 'max_fvgs': 20 | |
| }) | |
| # 신호 생성 | |
| signal = fvg.generate_fvg_signal( | |
| test_data['high'], | |
| test_data['low'], | |
| test_data['close'] | |
| ) | |
| print("="*60) | |
| print("FVG 신호 (동적 모드 + Auto Threshold)") | |
| print("="*60) | |
| print(f"전체 FVG: {signal['total_fvgs']}개") | |
| print(f"활성 FVG: {signal['active_fvgs']}개") | |
| print(f"근처 FVG: {signal['nearby_fvgs']}개") | |
| print(f"\n미완성 레벨 (상위 5개):") | |
| for i, level in enumerate(signal['unmitigated_levels'][:5], 1): | |
| print(f" {i}. {level['bias'].upper()}: ${level['level']:.2f} " | |
| f"(강도: {level['strength']:.2f}, 채움: {level['fill_percentage']:.1%})") | |
| if 'target_fvg' in signal: | |
| print(f"\n타겟 FVG:") | |
| print(f" 바이어스: {signal['target_fvg']['bias'].upper()}") | |
| print(f" 상단: ${signal['target_fvg']['top']:.2f}") | |
| print(f" 하단: ${signal['target_fvg']['bottom']:.2f}") | |
| print(f" 채워진 비율: {signal['target_fvg']['fill_percentage']:.1%}") | |
| print(f"\n액션: {signal['action'].upper()}") | |
| print(f"신뢰도: {signal['confidence']:.2%}") | |
| print("="*60) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| ICT Order Flow - 완전 구현 | |
| Phase 14.1 최우선 개선 사항 | |
| 포함 기능: | |
| 1. Killzones (아시아/런던/뉴욕 세션) | |
| 2. OTE (Optimal Trade Entry) 0.618~0.786 | |
| 3. Liquidity Voids 검출 | |
| 4. Market Structure Shifts (MSS) | |
| 5. Displacement 측정 | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timezone | |
| from typing import Dict, List, Tuple, Optional | |
| class ICTOrderFlow: | |
| """ICT (Inner Circle Trader) Order Flow 완전 구현""" | |
| def __init__(self, config: Dict = None): | |
| self.config = config or {} | |
| self.active_voids = [] | |
| self.last_mss = None | |
| def get_current_killzone(self) -> Tuple[str, float]: | |
| """ | |
| 현재 Killzone 및 가중치 반환 | |
| Returns: | |
| (killzone_name, weight): 세션 이름과 거래 가중치 | |
| """ | |
| current_hour = datetime.now(timezone.utc).hour | |
| # 아시아 세션 (00:00-08:00 UTC) - 낮은 변동성 | |
| if 0 <= current_hour < 8: | |
| return 'ASIA', 0.7 | |
| # 런던 오픈 (08:00-09:00 UTC) - 최고 변동성 ⭐ | |
| elif 8 <= current_hour < 9: | |
| return 'LONDON_OPEN', 1.0 | |
| # 런던 세션 (08:00-16:00 UTC) - 높은 변동성 | |
| elif 8 <= current_hour < 16: | |
| return 'LONDON', 0.95 | |
| # 뉴욕 오픈 (13:00-14:00 UTC) - 최고 변동성 ⭐ | |
| elif 13 <= current_hour < 14: | |
| return 'NEW_YORK_OPEN', 1.0 | |
| # 뉴욕 세션 (13:00-21:00 UTC) - 높은 변동성 | |
| elif 13 <= current_hour < 21: | |
| return 'NEW_YORK', 0.9 | |
| # 오프 시간 (21:00-00:00 UTC) - 낮은 변동성 | |
| else: | |
| return 'OFF_HOURS', 0.3 | |
| def calculate_ote_zones( | |
| self, | |
| high: float, | |
| low: float, | |
| trend: str = 'bullish' | |
| ) -> Dict[str, Tuple[float, float]]: | |
| """ | |
| Optimal Trade Entry (OTE) 존 계산 | |
| 피보나치 0.618~0.786 되돌림 구간 | |
| Args: | |
| high: 최근 고점 | |
| low: 최근 저점 | |
| trend: 'bullish' 또는 'bearish' | |
| Returns: | |
| OTE 존 정보 (buy_zone, sell_zone) | |
| """ | |
| range_size = high - low | |
| if trend == 'bullish': | |
| # 상승 추세: 0.618~0.786 되돌림에서 매수 | |
| ote_buy_low = low + (range_size * 0.618) # 61.8% 되돌림 | |
| ote_buy_high = low + (range_size * 0.786) # 78.6% 되돌림 | |
| return { | |
| 'buy_zone': (ote_buy_low, ote_buy_high), | |
| 'bias': 'bullish', | |
| 'range': range_size, | |
| 'confidence': 0.8 | |
| } | |
| else: # bearish | |
| # 하락 추세: 0.236~0.382 되돌림에서 매도 | |
| ote_sell_low = high - (range_size * 0.786) # 78.6% 되돌림 | |
| ote_sell_high = high - (range_size * 0.618) # 61.8% 되돌림 | |
| return { | |
| 'sell_zone': (ote_sell_low, ote_sell_high), | |
| 'bias': 'bearish', | |
| 'range': range_size, | |
| 'confidence': 0.8 | |
| } | |
| def is_price_in_ote_zone( | |
| self, | |
| price: float, | |
| ote_zones: Dict | |
| ) -> Tuple[bool, str]: | |
| """ | |
| 현재 가격이 OTE 존 내부에 있는지 확인 | |
| Args: | |
| price: 현재 가격 | |
| ote_zones: calculate_ote_zones 결과 | |
| Returns: | |
| (is_in_zone, zone_type): OTE 존 내 여부와 존 타입 | |
| """ | |
| if 'buy_zone' in ote_zones: | |
| buy_low, buy_high = ote_zones['buy_zone'] | |
| if buy_low <= price <= buy_high: | |
| return True, 'BUY_OTE' | |
| if 'sell_zone' in ote_zones: | |
| sell_low, sell_high = ote_zones['sell_zone'] | |
| if sell_low <= price <= sell_high: | |
| return True, 'SELL_OTE' | |
| return False, 'NONE' | |
| def detect_liquidity_voids( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| threshold: float = 0.02 | |
| ) -> List[Dict]: | |
| """ | |
| Liquidity Voids (유동성 공백) 검출 | |
| 가격이 빠르게 통과한 구간 (갭) | |
| Args: | |
| high, low, close: 가격 데이터 | |
| threshold: 최소 갭 크기 (기본 2%) | |
| Returns: | |
| Liquidity Voids 리스트 | |
| """ | |
| voids = [] | |
| for i in range(1, len(close)): | |
| # 갭 크기 계산 | |
| gap = abs(close.iloc[i] - close.iloc[i-1]) / close.iloc[i-1] | |
| if gap > threshold: | |
| void = { | |
| 'index': i, | |
| 'start': min(close.iloc[i], close.iloc[i-1]), | |
| 'end': max(close.iloc[i], close.iloc[i-1]), | |
| 'size': gap, | |
| 'direction': 'up' if close.iloc[i] > close.iloc[i-1] else 'down', | |
| 'filled': False, | |
| 'strength': min(gap / threshold, 3.0) # 최대 3.0 | |
| } | |
| voids.append(void) | |
| # 최근 20개만 유지 (성능 최적화) | |
| self.active_voids = voids[-20:] | |
| return self.active_voids | |
| def check_void_fill( | |
| self, | |
| current_price: float, | |
| high: float, | |
| low: float | |
| ) -> Optional[Dict]: | |
| """ | |
| Liquidity Void가 채워졌는지 확인 | |
| Args: | |
| current_price: 현재 가격 | |
| high: 현재 고가 | |
| low: 현재 저가 | |
| Returns: | |
| 채워진 Void 정보 (없으면 None) | |
| """ | |
| for void in self.active_voids: | |
| if void['filled']: | |
| continue | |
| # Void 구간 내부로 가격이 들어왔는지 확인 | |
| if void['start'] <= current_price <= void['end']: | |
| void['filled'] = True | |
| return void | |
| # High/Low가 Void 구간을 완전히 채웠는지 확인 | |
| if (low <= void['start'] and high >= void['end']) or \ | |
| (high >= void['end'] and low <= void['start']): | |
| void['filled'] = True | |
| return void | |
| return None | |
| def detect_market_structure_shift( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| lookback: int = 10 | |
| ) -> Optional[Dict]: | |
| """ | |
| Market Structure Shift (MSS) 검출 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| lookback: 룩백 기간 | |
| Returns: | |
| MSS 정보 (없으면 None) | |
| """ | |
| if len(close) < lookback: | |
| return None | |
| recent_high = high.iloc[-lookback:].max() | |
| recent_low = low.iloc[-lookback:].min() | |
| current_close = close.iloc[-1] | |
| prev_close = close.iloc[-2] | |
| # Bullish MSS: 최근 저점 돌파 후 상승 | |
| if prev_close <= recent_low and current_close > recent_low: | |
| mss = { | |
| 'type': 'BULLISH_MSS', | |
| 'price': recent_low, | |
| 'strength': (current_close - recent_low) / recent_low, | |
| 'timestamp': close.index[-1] if hasattr(close, 'index') else len(close) - 1 | |
| } | |
| self.last_mss = mss | |
| return mss | |
| # Bearish MSS: 최근 고점 돌파 후 하락 | |
| elif prev_close >= recent_high and current_close < recent_high: | |
| mss = { | |
| 'type': 'BEARISH_MSS', | |
| 'price': recent_high, | |
| 'strength': (recent_high - current_close) / recent_high, | |
| 'timestamp': close.index[-1] if hasattr(close, 'index') else len(close) - 1 | |
| } | |
| self.last_mss = mss | |
| return mss | |
| return None | |
| def calculate_displacement( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| volume: pd.Series = None, | |
| period: int = 20 | |
| ) -> Dict: | |
| """ | |
| Displacement (급격한 가격 이동) 측정 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| volume: 거래량 (선택) | |
| period: 평균 계산 기간 | |
| Returns: | |
| Displacement 정보 | |
| """ | |
| # 평균 봉 크기 계산 | |
| avg_range = (high - low).rolling(period).mean().iloc[-1] | |
| current_range = high.iloc[-1] - low.iloc[-1] | |
| # 평균 가격 변화 | |
| avg_change = close.diff().abs().rolling(period).mean().iloc[-1] | |
| current_change = abs(close.iloc[-1] - close.iloc[-2]) | |
| # Displacement 강도 | |
| range_ratio = current_range / avg_range if avg_range > 0 else 0 | |
| change_ratio = current_change / avg_change if avg_change > 0 else 0 | |
| # 거래량 스파이크 확인 (있는 경우) | |
| volume_spike = 1.0 | |
| if volume is not None and len(volume) >= period: | |
| avg_volume = volume.rolling(period).mean().iloc[-1] | |
| current_volume = volume.iloc[-1] | |
| volume_spike = current_volume / avg_volume if avg_volume > 0 else 1.0 | |
| # Displacement 판단 (평균의 2배 이상) | |
| is_displacement = (range_ratio >= 2.0 or change_ratio >= 2.0) and volume_spike >= 1.5 | |
| direction = 'up' if close.iloc[-1] > close.iloc[-2] else 'down' | |
| return { | |
| 'is_displacement': is_displacement, | |
| 'direction': direction, | |
| 'range_ratio': range_ratio, | |
| 'change_ratio': change_ratio, | |
| 'volume_spike': volume_spike, | |
| 'strength': min((range_ratio + change_ratio + volume_spike) / 3, 5.0) | |
| } | |
| def generate_ict_signal( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| volume: pd.Series = None | |
| ) -> Dict: | |
| """ | |
| 종합 ICT Order Flow 신호 생성 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| volume: 거래량 (선택) | |
| Returns: | |
| ICT 신호 정보 | |
| """ | |
| # 1. Killzone 확인 | |
| killzone, killzone_weight = self.get_current_killzone() | |
| # 2. 추세 판단 (단순화) | |
| trend = 'bullish' if close.iloc[-1] > close.iloc[-20:].mean() else 'bearish' | |
| # 3. OTE 존 계산 | |
| recent_high = high.iloc[-50:].max() | |
| recent_low = low.iloc[-50:].min() | |
| ote_zones = self.calculate_ote_zones(recent_high, recent_low, trend) | |
| # 4. 현재 가격이 OTE 존에 있는지 확인 | |
| current_price = close.iloc[-1] | |
| in_ote, ote_type = self.is_price_in_ote_zone(current_price, ote_zones) | |
| # 5. Liquidity Voids 검출 | |
| voids = self.detect_liquidity_voids(high, low, close) | |
| filled_void = self.check_void_fill(current_price, high.iloc[-1], low.iloc[-1]) | |
| # 6. MSS 검출 | |
| mss = self.detect_market_structure_shift(high, low, close) | |
| # 7. Displacement 측정 | |
| displacement = self.calculate_displacement(high, low, close, volume) | |
| # 8. 신호 생성 | |
| signal = { | |
| 'killzone': killzone, | |
| 'killzone_weight': killzone_weight, | |
| 'trend': trend, | |
| 'ote_zones': ote_zones, | |
| 'in_ote': in_ote, | |
| 'ote_type': ote_type, | |
| 'liquidity_voids': len([v for v in voids if not v['filled']]), | |
| 'filled_void': filled_void, | |
| 'mss': mss, | |
| 'displacement': displacement, | |
| 'action': 'hold', | |
| 'confidence': 0.5 | |
| } | |
| # 9. 매수/매도 판단 | |
| confidence = 0.5 | |
| # 강력한 매수 신호 | |
| if (in_ote and ote_type == 'BUY_OTE' and | |
| trend == 'bullish' and | |
| killzone_weight >= 0.9 and | |
| (mss and mss['type'] == 'BULLISH_MSS' or displacement['direction'] == 'up')): | |
| signal['action'] = 'buy' | |
| confidence = 0.85 * killzone_weight | |
| # 강력한 매도 신호 | |
| elif (in_ote and ote_type == 'SELL_OTE' and | |
| trend == 'bearish' and | |
| killzone_weight >= 0.9 and | |
| (mss and mss['type'] == 'BEARISH_MSS' or displacement['direction'] == 'down')): | |
| signal['action'] = 'sell' | |
| confidence = 0.85 * killzone_weight | |
| # Liquidity Void 채워짐 신호 (보조) | |
| elif filled_void: | |
| if filled_void['direction'] == 'up' and trend == 'bullish': | |
| signal['action'] = 'buy' | |
| confidence = 0.7 * killzone_weight | |
| elif filled_void['direction'] == 'down' and trend == 'bearish': | |
| signal['action'] = 'sell' | |
| confidence = 0.7 * killzone_weight | |
| signal['confidence'] = confidence | |
| return signal | |
| # 사용 예시 | |
| if __name__ == '__main__': | |
| # 테스트 데이터 생성 | |
| np.random.seed(42) | |
| dates = pd.date_range('2024-01-01', periods=100, freq='1H') | |
| test_data = pd.DataFrame({ | |
| 'high': np.cumsum(np.random.randn(100) * 10) + 50000, | |
| 'low': np.cumsum(np.random.randn(100) * 10) + 49800, | |
| 'close': np.cumsum(np.random.randn(100) * 10) + 49900, | |
| 'volume': np.random.randint(100, 1000, 100) | |
| }, index=dates) | |
| # ICT Order Flow 초기화 | |
| ict = ICTOrderFlow() | |
| # 신호 생성 | |
| signal = ict.generate_ict_signal( | |
| test_data['high'], | |
| test_data['low'], | |
| test_data['close'], | |
| test_data['volume'] | |
| ) | |
| print("="*60) | |
| print("ICT Order Flow 신호") | |
| print("="*60) | |
| print(f"Killzone: {signal['killzone']} (가중치: {signal['killzone_weight']})") | |
| print(f"추세: {signal['trend']}") | |
| print(f"OTE 존 내부: {signal['in_ote']} ({signal['ote_type']})") | |
| print(f"활성 Liquidity Voids: {signal['liquidity_voids']}개") | |
| print(f"MSS: {signal['mss']['type'] if signal['mss'] else 'None'}") | |
| print(f"Displacement: {signal['displacement']['is_displacement']}") | |
| print(f"\n액션: {signal['action'].upper()}") | |
| print(f"신뢰도: {signal['confidence']:.2%}") | |
| print("="*60) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Phase 14.1 통합 트레이딩 전략 | |
| 모든 개선사항 포함 | |
| 통합 지표: | |
| 1. ICT Order Flow (완전 재구현) | |
| 2. SMC Internal Structure (내부 구조 필터링) | |
| 3. FVG Dynamic (동적 모드 + Auto Threshold) | |
| 4. Supply & Demand MTF (다중 타임프레임) | |
| 5. Premium/Discount 8단계 + Confirmation | |
| 기존 지표와 결합하여 최종 신호 생성 | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Optional | |
| from dataclasses import dataclass | |
| @dataclass | |
| class TradingSignal: | |
| """거래 신호 데이터 클래스""" | |
| action: str # 'buy', 'sell', 'hold' | |
| confidence: float # 0.0 ~ 1.0 | |
| entry_price: float | |
| stop_loss: float | |
| take_profit: float | |
| leverage: float | |
| indicators_used: List[str] | |
| signal_strength: Dict[str, float] | |
| def to_dict(self) -> Dict: | |
| return { | |
| 'action': self.action, | |
| 'confidence': self.confidence, | |
| 'entry_price': self.entry_price, | |
| 'stop_loss': self.stop_loss, | |
| 'take_profit': self.take_profit, | |
| 'leverage': self.leverage, | |
| 'indicators_used': self.indicators_used, | |
| 'signal_strength': self.signal_strength | |
| } | |
| class IntegratedPhase14Strategy: | |
| """Phase 14.1 통합 전략""" | |
| def __init__(self, config: Dict = None): | |
| self.config = config or {} | |
| # 각 지표 모듈 초기화 (여기서는 간소화) | |
| self.weights = { | |
| 'ict_order_flow': 0.25, # 최고 가중치 | |
| 'smc': 0.25, # 최고 가중치 | |
| 'fvg': 0.20, | |
| 'supply_demand': 0.15, | |
| 'premium_discount': 0.15 | |
| } | |
| # 리스크 관리 | |
| self.max_leverage = self.config.get('max_leverage', 15) | |
| self.base_leverage = self.config.get('base_leverage', 10) | |
| self.risk_per_trade = self.config.get('risk_per_trade', 0.02) # 2% | |
| def calculate_atr( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| period: int = 14 | |
| ) -> float: | |
| """ATR 계산 (손절/이익실현용)""" | |
| tr1 = high - low | |
| tr2 = abs(high - close.shift(1)) | |
| tr3 = abs(low - close.shift(1)) | |
| tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) | |
| atr = tr.rolling(period).mean().iloc[-1] | |
| return atr if not pd.isna(atr) else (high.iloc[-1] - low.iloc[-1]) | |
| def aggregate_signals( | |
| self, | |
| ict_signal: Dict, | |
| smc_signal: Dict, | |
| fvg_signal: Dict, | |
| sd_signal: Dict, | |
| pd_signal: Dict | |
| ) -> Dict: | |
| """ | |
| 모든 지표 신호 통합 | |
| 가중 평균으로 최종 신호 결정 | |
| Returns: | |
| 통합 신호 | |
| """ | |
| signals = { | |
| 'ict_order_flow': ict_signal, | |
| 'smc': smc_signal, | |
| 'fvg': fvg_signal, | |
| 'supply_demand': sd_signal, | |
| 'premium_discount': pd_signal | |
| } | |
| # 각 지표의 액션을 점수화 | |
| action_scores = {'buy': 0, 'sell': 0, 'hold': 0} | |
| weighted_confidence = 0 | |
| for indicator, signal in signals.items(): | |
| weight = self.weights[indicator] | |
| action = signal.get('action', 'hold') | |
| confidence = signal.get('confidence', 0.5) | |
| # 액션 점수 누적 | |
| action_scores[action] += weight * confidence | |
| weighted_confidence += weight * confidence | |
| # 최종 액션 결정 | |
| final_action = max(action_scores, key=action_scores.get) | |
| final_confidence = action_scores[final_action] | |
| # Hold 신호가 너무 강하면 거래 안 함 | |
| if action_scores['hold'] > 0.6: | |
| final_action = 'hold' | |
| final_confidence = action_scores['hold'] | |
| # 개별 지표 강도 | |
| signal_strength = { | |
| indicator: signals[indicator].get('confidence', 0.5) | |
| for indicator in signals | |
| } | |
| return { | |
| 'action': final_action, | |
| 'confidence': final_confidence, | |
| 'action_scores': action_scores, | |
| 'signal_strength': signal_strength, | |
| 'used_indicators': list(signals.keys()) | |
| } | |
| def calculate_position_sizing( | |
| self, | |
| confidence: float, | |
| current_price: float, | |
| stop_loss_distance: float, | |
| account_balance: float = 100.0 | |
| ) -> Dict: | |
| """ | |
| 포지션 사이징 및 레버리지 계산 | |
| Args: | |
| confidence: 신호 신뢰도 | |
| current_price: 현재 가격 | |
| stop_loss_distance: 손절까지 거리 | |
| account_balance: 계좌 잔액 | |
| Returns: | |
| 포지션 정보 | |
| """ | |
| # 신뢰도 기반 레버리지 조정 | |
| if confidence >= 0.85: | |
| leverage = self.max_leverage # 15x | |
| elif confidence >= 0.75: | |
| leverage = self.base_leverage + 3 # 13x | |
| elif confidence >= 0.65: | |
| leverage = self.base_leverage # 10x | |
| else: | |
| leverage = self.base_leverage - 2 # 8x | |
| # 리스크 금액 | |
| risk_amount = account_balance * self.risk_per_trade | |
| # 포지션 크기 (USD) | |
| if stop_loss_distance > 0: | |
| position_size = (risk_amount / stop_loss_distance) * current_price | |
| else: | |
| position_size = account_balance * 0.1 # 기본 10% | |
| # 레버리지 적용 후 최대 포지션 | |
| max_position = account_balance * leverage | |
| position_size = min(position_size, max_position) | |
| return { | |
| 'leverage': leverage, | |
| 'position_size_usd': position_size, | |
| 'position_size_coins': position_size / current_price, | |
| 'risk_amount': risk_amount | |
| } | |
| def calculate_stop_loss_take_profit( | |
| self, | |
| action: str, | |
| entry_price: float, | |
| atr: float, | |
| confidence: float | |
| ) -> Tuple[float, float]: | |
| """ | |
| 손절/이익실현 계산 | |
| Args: | |
| action: 'buy' or 'sell' | |
| entry_price: 진입 가격 | |
| atr: ATR 값 | |
| confidence: 신뢰도 | |
| Returns: | |
| (stop_loss, take_profit) | |
| """ | |
| # ATR 배수 (신뢰도 기반) | |
| if confidence >= 0.8: | |
| sl_multiplier = 1.5 # 타이트한 손절 | |
| tp_multiplier = 3.0 # 넉넉한 이익 | |
| else: | |
| sl_multiplier = 2.0 | |
| tp_multiplier = 2.5 | |
| if action == 'buy': | |
| stop_loss = entry_price - (atr * sl_multiplier) | |
| take_profit = entry_price + (atr * tp_multiplier) | |
| else: # sell | |
| stop_loss = entry_price + (atr * sl_multiplier) | |
| take_profit = entry_price - (atr * tp_multiplier) | |
| return stop_loss, take_profit | |
| def generate_final_signal( | |
| self, | |
| open_prices: pd.Series, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| volume: pd.Series = None, | |
| # 간소화: 각 지표의 계산된 신호를 직접 받음 | |
| ict_signal: Dict = None, | |
| smc_signal: Dict = None, | |
| fvg_signal: Dict = None, | |
| sd_signal: Dict = None, | |
| pd_signal: Dict = None, | |
| account_balance: float = 100.0 | |
| ) -> TradingSignal: | |
| """ | |
| 최종 거래 신호 생성 | |
| Args: | |
| open, high, low, close, volume: 가격 데이터 | |
| *_signal: 각 지표의 계산된 신호 | |
| account_balance: 계좌 잔액 | |
| Returns: | |
| 최종 거래 신호 | |
| """ | |
| # 1. 기본값 설정 (신호가 없는 경우) | |
| default_signal = {'action': 'hold', 'confidence': 0.5} | |
| ict_signal = ict_signal or default_signal | |
| smc_signal = smc_signal or default_signal | |
| fvg_signal = fvg_signal or default_signal | |
| sd_signal = sd_signal or default_signal | |
| pd_signal = pd_signal or default_signal | |
| # 2. 신호 통합 | |
| aggregated = self.aggregate_signals( | |
| ict_signal, smc_signal, fvg_signal, sd_signal, pd_signal | |
| ) | |
| action = aggregated['action'] | |
| confidence = aggregated['confidence'] | |
| # 3. Hold 신호면 거래하지 않음 | |
| if action == 'hold' or confidence < 0.6: | |
| return TradingSignal( | |
| action='hold', | |
| confidence=confidence, | |
| entry_price=close.iloc[-1], | |
| stop_loss=0, | |
| take_profit=0, | |
| leverage=0, | |
| indicators_used=aggregated['used_indicators'], | |
| signal_strength=aggregated['signal_strength'] | |
| ) | |
| # 4. ATR 계산 | |
| atr = self.calculate_atr(high, low, close) | |
| # 5. 진입가, 손절, 이익실현 | |
| entry_price = close.iloc[-1] | |
| stop_loss, take_profit = self.calculate_stop_loss_take_profit( | |
| action, entry_price, atr, confidence | |
| ) | |
| # 6. 포지션 사이징 | |
| sl_distance = abs(entry_price - stop_loss) / entry_price | |
| position = self.calculate_position_sizing( | |
| confidence, entry_price, sl_distance, account_balance | |
| ) | |
| # 7. 최종 신호 생성 | |
| final_signal = TradingSignal( | |
| action=action, | |
| confidence=confidence, | |
| entry_price=entry_price, | |
| stop_loss=stop_loss, | |
| take_profit=take_profit, | |
| leverage=position['leverage'], | |
| indicators_used=aggregated['used_indicators'], | |
| signal_strength=aggregated['signal_strength'] | |
| ) | |
| return final_signal | |
| def print_signal_summary(self, signal: TradingSignal): | |
| """신호 요약 출력""" | |
| print("="*70) | |
| print("🚀 Phase 14.1 통합 트레이딩 신호") | |
| print("="*70) | |
| print(f"액션: {signal.action.upper()}") | |
| print(f"신뢰도: {signal.confidence:.2%}") | |
| print(f"\n진입 가격: ${signal.entry_price:.2f}") | |
| print(f"손절 가격: ${signal.stop_loss:.2f}") | |
| print(f"이익실현: ${signal.take_profit:.2f}") | |
| print(f"레버리지: {signal.leverage:.0f}x") | |
| print(f"\n사용된 지표: {', '.join(signal.indicators_used)}") | |
| print(f"\n지표별 신호 강도:") | |
| for indicator, strength in signal.signal_strength.items(): | |
| bar = '█' * int(strength * 20) | |
| print(f" {indicator:20s}: {bar:20s} {strength:.2%}") | |
| # 손익 비율 | |
| if signal.action != 'hold': | |
| risk = abs(signal.entry_price - signal.stop_loss) | |
| reward = abs(signal.take_profit - signal.entry_price) | |
| risk_reward = reward / risk if risk > 0 else 0 | |
| print(f"\n리스크/리워드 비율: 1:{risk_reward:.2f}") | |
| print("="*70) | |
| # 사용 예시 | |
| if __name__ == '__main__': | |
| # 테스트 데이터 | |
| np.random.seed(42) | |
| dates = pd.date_range('2024-01-01', periods=100, freq='1H') | |
| test_data = pd.DataFrame({ | |
| 'open': np.cumsum(np.random.randn(100) * 10) + 49850, | |
| 'high': np.cumsum(np.random.randn(100) * 10) + 50000, | |
| 'low': np.cumsum(np.random.randn(100) * 10) + 49800, | |
| 'close': np.cumsum(np.random.randn(100) * 10) + 49900, | |
| 'volume': np.random.randint(100, 1000, 100) | |
| }, index=dates) | |
| # 전략 초기화 | |
| strategy = IntegratedPhase14Strategy({ | |
| 'max_leverage': 15, | |
| 'base_leverage': 10, | |
| 'risk_per_trade': 0.02 | |
| }) | |
| # 모의 신호 (실제로는 각 지표 모듈에서 계산) | |
| mock_signals = { | |
| 'ict_signal': {'action': 'buy', 'confidence': 0.85}, | |
| 'smc_signal': {'action': 'buy', 'confidence': 0.80}, | |
| 'fvg_signal': {'action': 'buy', 'confidence': 0.75}, | |
| 'sd_signal': {'action': 'buy', 'confidence': 0.70}, | |
| 'pd_signal': {'action': 'buy', 'confidence': 0.78} | |
| } | |
| # 최종 신호 생성 | |
| final_signal = strategy.generate_final_signal( | |
| test_data['open'], | |
| test_data['high'], | |
| test_data['low'], | |
| test_data['close'], | |
| test_data['volume'], | |
| **mock_signals, | |
| account_balance=100.0 | |
| ) | |
| # 결과 출력 | |
| strategy.print_signal_summary(final_signal) | |
| # JSON 형식으로도 출력 가능 | |
| print("\nJSON 형식:") | |
| import json | |
| print(json.dumps(final_signal.to_dict(), indent=2)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Smart Money Concepts (SMC) - 내부 구조 필터링 | |
| Phase 14.1 긴급 수정 | |
| 추가 기능: | |
| 1. 내부 구조 (Internal Structure) - 5분봉 기반 | |
| 2. Confluence 필터링 (internalFilterConfluence) | |
| 3. Order Block 갱신 로직 | |
| 4. Premium/Discount 존 통합 | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Tuple, Optional | |
| from dataclasses import dataclass | |
| @dataclass | |
| class OrderBlock: | |
| """Order Block 데이터 클래스""" | |
| type: str # 'bullish' or 'bearish' | |
| top: float | |
| bottom: float | |
| timestamp: int | |
| strength: float | |
| mitigated: bool = False | |
| internal: bool = False # 내부 구조 여부 | |
| @dataclass | |
| class StructurePoint: | |
| """구조 포인트 (BOS/CHoCH)""" | |
| type: str # 'BOS' or 'CHoCH' | |
| direction: str # 'bullish' or 'bearish' | |
| price: float | |
| timestamp: int | |
| internal: bool = False | |
| class SMCInternalStructure: | |
| """SMC 내부 구조 완전 구현""" | |
| def __init__(self, config: Dict = None): | |
| self.config = config or {} | |
| self.order_blocks = [] | |
| self.structure_points = [] | |
| self.last_swing_high = None | |
| self.last_swing_low = None | |
| # 설정 | |
| self.swing_period = self.config.get('swing_period', 10) | |
| self.internal_period = self.config.get('internal_period', 5) | |
| self.internal_filter_confluence = self.config.get('internal_filter_confluence', True) | |
| self.atr_multiplier = self.config.get('atr_multiplier', 2.0) | |
| def calculate_atr( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| period: int = 14 | |
| ) -> pd.Series: | |
| """Average True Range (ATR) 계산""" | |
| tr1 = high - low | |
| tr2 = abs(high - close.shift(1)) | |
| tr3 = abs(low - close.shift(1)) | |
| tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) | |
| atr = tr.rolling(period).mean() | |
| return atr | |
| def detect_swing_points( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| period: int = 10 | |
| ) -> Tuple[Optional[float], Optional[float]]: | |
| """ | |
| Swing High/Low 검출 | |
| Args: | |
| high, low: 가격 데이터 | |
| period: 스윙 기간 | |
| Returns: | |
| (swing_high, swing_low) | |
| """ | |
| if len(high) < period * 2 + 1: | |
| return None, None | |
| # Swing High: 중심이 좌우보다 높음 | |
| center_idx = -period - 1 | |
| center_high = high.iloc[center_idx] | |
| left_highs = high.iloc[center_idx - period:center_idx] | |
| right_highs = high.iloc[center_idx + 1:center_idx + period + 1] | |
| swing_high = None | |
| if (center_high > left_highs.max() and | |
| center_high > right_highs.max()): | |
| swing_high = center_high | |
| # Swing Low: 중심이 좌우보다 낮음 | |
| center_low = low.iloc[center_idx] | |
| left_lows = low.iloc[center_idx - period:center_idx] | |
| right_lows = low.iloc[center_idx + 1:center_idx + period + 1] | |
| swing_low = None | |
| if (center_low < left_lows.min() and | |
| center_low < right_lows.min()): | |
| swing_low = center_low | |
| return swing_high, swing_low | |
| def detect_bos_choch( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| internal: bool = False | |
| ) -> Optional[StructurePoint]: | |
| """ | |
| Break of Structure (BOS) 및 Change of Character (CHoCH) 검출 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| internal: 내부 구조 검출 여부 | |
| Returns: | |
| StructurePoint 또는 None | |
| """ | |
| period = self.internal_period if internal else self.swing_period | |
| swing_high, swing_low = self.detect_swing_points(high, low, period) | |
| current_close = close.iloc[-1] | |
| prev_close = close.iloc[-2] | |
| # Bullish BOS: 최근 고점 돌파 | |
| if swing_high and self.last_swing_high: | |
| if prev_close <= self.last_swing_high and current_close > self.last_swing_high: | |
| structure = StructurePoint( | |
| type='BOS', | |
| direction='bullish', | |
| price=self.last_swing_high, | |
| timestamp=len(close) - 1, | |
| internal=internal | |
| ) | |
| self.structure_points.append(structure) | |
| return structure | |
| # Bearish BOS: 최근 저점 돌파 | |
| if swing_low and self.last_swing_low: | |
| if prev_close >= self.last_swing_low and current_close < self.last_swing_low: | |
| structure = StructurePoint( | |
| type='BOS', | |
| direction='bearish', | |
| price=self.last_swing_low, | |
| timestamp=len(close) - 1, | |
| internal=internal | |
| ) | |
| self.structure_points.append(structure) | |
| return structure | |
| # CHoCH 검출 (추세 전환) | |
| # Bullish CHoCH: 하락 추세 중 고점 돌파 | |
| if (swing_low and self.last_swing_low and | |
| swing_low < self.last_swing_low and # 하락 추세 확인 | |
| swing_high and current_close > swing_high): | |
| structure = StructurePoint( | |
| type='CHoCH', | |
| direction='bullish', | |
| price=swing_high, | |
| timestamp=len(close) - 1, | |
| internal=internal | |
| ) | |
| self.structure_points.append(structure) | |
| return structure | |
| # Bearish CHoCH: 상승 추세 중 저점 돌파 | |
| if (swing_high and self.last_swing_high and | |
| swing_high > self.last_swing_high and # 상승 추세 확인 | |
| swing_low and current_close < swing_low): | |
| structure = StructurePoint( | |
| type='CHoCH', | |
| direction='bearish', | |
| price=swing_low, | |
| timestamp=len(close) - 1, | |
| internal=internal | |
| ) | |
| self.structure_points.append(structure) | |
| return structure | |
| # Swing 포인트 업데이트 | |
| if swing_high: | |
| self.last_swing_high = swing_high | |
| if swing_low: | |
| self.last_swing_low = swing_low | |
| return None | |
| def detect_order_block( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| structure: StructurePoint | |
| ) -> Optional[OrderBlock]: | |
| """ | |
| Order Block 검출 | |
| BOS/CHoCH 전 마지막 반대 방향 캔들 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| structure: BOS/CHoCH 구조 포인트 | |
| Returns: | |
| OrderBlock 또는 None | |
| """ | |
| if structure.direction == 'bullish': | |
| # Bullish OB: BOS 전 마지막 bearish 캔들 | |
| for i in range(len(close) - 1, max(0, len(close) - 20), -1): | |
| if close.iloc[i] < close.iloc[i - 1]: # Bearish 캔들 | |
| ob = OrderBlock( | |
| type='bullish', | |
| top=high.iloc[i], | |
| bottom=low.iloc[i], | |
| timestamp=i, | |
| strength=(high.iloc[i] - low.iloc[i]) / low.iloc[i], | |
| internal=structure.internal | |
| ) | |
| return ob | |
| else: # bearish | |
| # Bearish OB: BOS 전 마지막 bullish 캔들 | |
| for i in range(len(close) - 1, max(0, len(close) - 20), -1): | |
| if close.iloc[i] > close.iloc[i - 1]: # Bullish 캔들 | |
| ob = OrderBlock( | |
| type='bearish', | |
| top=high.iloc[i], | |
| bottom=low.iloc[i], | |
| timestamp=i, | |
| strength=(high.iloc[i] - low.iloc[i]) / low.iloc[i], | |
| internal=structure.internal | |
| ) | |
| return ob | |
| return None | |
| def apply_confluence_filter( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| structure: Optional[StructurePoint] | |
| ) -> bool: | |
| """ | |
| Confluence 필터 적용 | |
| 고변동성 바(ATR 2배 이상)에서 발생한 내부 구조 무시 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| structure: 구조 포인트 | |
| Returns: | |
| 필터 통과 여부 (True = 통과, False = 무시) | |
| """ | |
| if not self.internal_filter_confluence or not structure or not structure.internal: | |
| return True | |
| # ATR 계산 | |
| atr = self.calculate_atr(high, low, close) | |
| if atr.iloc[-1] == 0: | |
| return True | |
| # 현재 바의 범위 | |
| current_range = high.iloc[-1] - low.iloc[-1] | |
| # 고변동성 바 감지 (ATR의 2배 이상) | |
| is_high_volatility = current_range >= (self.atr_multiplier * atr.iloc[-1]) | |
| if is_high_volatility: | |
| # 고변동성 바에서 발생한 내부 구조는 무시 | |
| return False | |
| return True | |
| def update_order_blocks( | |
| self, | |
| current_price: float, | |
| high: float, | |
| low: float | |
| ): | |
| """ | |
| Order Block 무효화 (Mitigation) 확인 | |
| 가격이 OB를 완전히 통과하면 무효화 | |
| Args: | |
| current_price: 현재 가격 | |
| high: 현재 고가 | |
| low: 현재 저가 | |
| """ | |
| for ob in self.order_blocks: | |
| if ob.mitigated: | |
| continue | |
| if ob.type == 'bullish': | |
| # 가격이 OB 하단 아래로 떨어지면 무효화 | |
| if low < ob.bottom: | |
| ob.mitigated = True | |
| else: # bearish | |
| # 가격이 OB 상단 위로 올라가면 무효화 | |
| if high > ob.top: | |
| ob.mitigated = True | |
| def calculate_premium_discount( | |
| self, | |
| high: float, | |
| low: float, | |
| current_price: float | |
| ) -> Dict: | |
| """ | |
| Premium/Discount 존 계산 (8단계) | |
| Args: | |
| high: 최근 고점 | |
| low: 최근 저점 | |
| current_price: 현재 가격 | |
| Returns: | |
| Premium/Discount 정보 | |
| """ | |
| range_size = high - low | |
| if range_size == 0: | |
| return {'zone': 'equilibrium', 'level': 0.5} | |
| # 현재 가격 위치 (0~1) | |
| position = (current_price - low) / range_size | |
| # 8단계 구분 | |
| if position >= 0.95: | |
| zone = 'extreme_premium' | |
| bias = 'strong_sell' | |
| elif position >= 0.75: | |
| zone = 'premium' | |
| bias = 'sell' | |
| elif position >= 0.55: | |
| zone = 'slight_premium' | |
| bias = 'neutral_sell' | |
| elif position >= 0.45: | |
| zone = 'equilibrium' | |
| bias = 'neutral' | |
| elif position >= 0.25: | |
| zone = 'slight_discount' | |
| bias = 'neutral_buy' | |
| elif position >= 0.05: | |
| zone = 'discount' | |
| bias = 'buy' | |
| else: | |
| zone = 'extreme_discount' | |
| bias = 'strong_buy' | |
| return { | |
| 'zone': zone, | |
| 'level': position, | |
| 'bias': bias, | |
| 'high': high, | |
| 'low': low | |
| } | |
| def generate_smc_signal( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series | |
| ) -> Dict: | |
| """ | |
| 종합 SMC 신호 생성 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| Returns: | |
| SMC 신호 정보 | |
| """ | |
| # 1. 외부 구조 (Swing 기반) | |
| external_structure = self.detect_bos_choch(high, low, close, internal=False) | |
| # 2. 내부 구조 (5분봉 기반) | |
| internal_structure = self.detect_bos_choch(high, low, close, internal=True) | |
| # 3. Confluence 필터 적용 | |
| internal_valid = self.apply_confluence_filter( | |
| high, low, close, internal_structure | |
| ) | |
| # 4. Order Block 검출 | |
| if external_structure: | |
| ob = self.detect_order_block(high, low, close, external_structure) | |
| if ob: | |
| self.order_blocks.append(ob) | |
| if internal_structure and internal_valid: | |
| ob = self.detect_order_block(high, low, close, internal_structure) | |
| if ob: | |
| self.order_blocks.append(ob) | |
| # 5. Order Block 업데이트 | |
| current_price = close.iloc[-1] | |
| self.update_order_blocks(current_price, high.iloc[-1], low.iloc[-1]) | |
| # 6. Premium/Discount 계산 | |
| recent_high = high.iloc[-50:].max() | |
| recent_low = low.iloc[-50:].min() | |
| pd_zone = self.calculate_premium_discount(recent_high, recent_low, current_price) | |
| # 7. 활성 Order Blocks | |
| active_obs = [ob for ob in self.order_blocks if not ob.mitigated][-10:] | |
| # 8. 신호 생성 | |
| signal = { | |
| 'external_structure': external_structure, | |
| 'internal_structure': internal_structure if internal_valid else None, | |
| 'active_order_blocks': len(active_obs), | |
| 'premium_discount': pd_zone, | |
| 'action': 'hold', | |
| 'confidence': 0.5 | |
| } | |
| # 9. 매수/매도 판단 | |
| confidence = 0.5 | |
| # 강력한 매수 신호 | |
| if (external_structure and external_structure.direction == 'bullish' and | |
| pd_zone['bias'] in ['strong_buy', 'buy'] and | |
| any(ob.type == 'bullish' and not ob.mitigated for ob in active_obs)): | |
| signal['action'] = 'buy' | |
| confidence = 0.8 | |
| # 강력한 매도 신호 | |
| elif (external_structure and external_structure.direction == 'bearish' and | |
| pd_zone['bias'] in ['strong_sell', 'sell'] and | |
| any(ob.type == 'bearish' and not ob.mitigated for ob in active_obs)): | |
| signal['action'] = 'sell' | |
| confidence = 0.8 | |
| # 내부 구조 보조 신호 | |
| elif internal_structure and internal_valid: | |
| if internal_structure.direction == 'bullish' and pd_zone['bias'] in ['buy', 'neutral_buy']: | |
| signal['action'] = 'buy' | |
| confidence = 0.65 | |
| elif internal_structure.direction == 'bearish' and pd_zone['bias'] in ['sell', 'neutral_sell']: | |
| signal['action'] = 'sell' | |
| confidence = 0.65 | |
| signal['confidence'] = confidence | |
| return signal | |
| # 사용 예시 | |
| if __name__ == '__main__': | |
| # 테스트 데이터 | |
| np.random.seed(42) | |
| dates = pd.date_range('2024-01-01', periods=100, freq='5T') | |
| test_data = pd.DataFrame({ | |
| 'high': np.cumsum(np.random.randn(100) * 10) + 50000, | |
| 'low': np.cumsum(np.random.randn(100) * 10) + 49800, | |
| 'close': np.cumsum(np.random.randn(100) * 10) + 49900, | |
| }, index=dates) | |
| # SMC 초기화 | |
| smc = SMCInternalStructure({ | |
| 'swing_period': 10, | |
| 'internal_period': 5, | |
| 'internal_filter_confluence': True, | |
| 'atr_multiplier': 2.0 | |
| }) | |
| # 신호 생성 | |
| signal = smc.generate_smc_signal( | |
| test_data['high'], | |
| test_data['low'], | |
| test_data['close'] | |
| ) | |
| print("="*60) | |
| print("SMC 신호 (내부 구조 필터링 포함)") | |
| print("="*60) | |
| print(f"외부 구조: {signal['external_structure'].type if signal['external_structure'] else 'None'}") | |
| print(f"내부 구조: {signal['internal_structure'].type if signal['internal_structure'] else 'None (필터링됨)'}") | |
| print(f"활성 Order Blocks: {signal['active_order_blocks']}개") | |
| print(f"Premium/Discount: {signal['premium_discount']['zone']} ({signal['premium_discount']['level']:.2%})") | |
| print(f"바이어스: {signal['premium_discount']['bias']}") | |
| print(f"\n액션: {signal['action'].upper()}") | |
| print(f"신뢰도: {signal['confidence']:.2%}") | |
| print("="*60) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Supply & Demand - Multi-Timeframe (MTF) 구현 | |
| Phase 14.2 고효율 개선 | |
| 추가 기능: | |
| 1. 다중 타임프레임 S/D 존 검출 | |
| 2. Confluence (합류) 분석 | |
| 3. 존 강도 및 우선순위 | |
| 4. 자동 존 업데이트 | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Tuple, Optional | |
| from dataclasses import dataclass | |
| @dataclass | |
| class SupplyDemandZone: | |
| """Supply/Demand 존 데이터 클래스""" | |
| type: str # 'supply' or 'demand' | |
| top: float | |
| bottom: float | |
| timestamp: int | |
| timeframe: str # '5m', '15m', '1h', etc. | |
| strength: float | |
| touches: int = 0 | |
| broken: bool = False | |
| confluence_score: int = 1 # 여러 타임프레임에서 겹치는 횟수 | |
| class SupplyDemandMTF: | |
| """Supply & Demand Multi-Timeframe 완전 구현""" | |
| def __init__(self, config: Dict = None): | |
| self.config = config or {} | |
| self.zones = { | |
| '5m': [], | |
| '15m': [], | |
| '1h': [], | |
| '4h': [] | |
| } | |
| # 설정 | |
| self.timeframes = self.config.get('timeframes', ['5m', '15m', '1h']) | |
| self.zone_strength_threshold = self.config.get('zone_strength_threshold', 0.02) | |
| self.confluence_distance = self.config.get('confluence_distance', 0.01) # 1% | |
| self.max_zones_per_tf = self.config.get('max_zones_per_tf', 10) | |
| def detect_supply_zone( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| lookback: int = 20 | |
| ) -> Optional[SupplyDemandZone]: | |
| """ | |
| Supply Zone (공급 존) 검출 | |
| 하락 전 마지막 상승 구간 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| lookback: 룩백 기간 | |
| Returns: | |
| Supply Zone 또는 None | |
| """ | |
| if len(close) < lookback + 10: | |
| return None | |
| # 최근 고점 찾기 | |
| recent_high_idx = high.iloc[-lookback:].idxmax() | |
| recent_high = high.loc[recent_high_idx] | |
| # 고점 이후 하락 확인 (최소 5%) | |
| subsequent_low = low.iloc[-lookback:].loc[recent_high_idx:].min() | |
| if subsequent_low < recent_high * 0.95: # 5% 이상 하락 | |
| # Supply Zone = 고점 형성 전 상승 구간 | |
| zone_idx = high.iloc[-lookback:].tolist().index(recent_high) | |
| if zone_idx > 0: | |
| zone_high = high.iloc[-lookback + zone_idx] | |
| zone_low = low.iloc[-lookback + zone_idx - 1] | |
| # 존 강도 = 이후 하락 폭 | |
| strength = (recent_high - subsequent_low) / recent_high | |
| if strength >= self.zone_strength_threshold: | |
| return SupplyDemandZone( | |
| type='supply', | |
| top=zone_high, | |
| bottom=zone_low, | |
| timestamp=len(close) - lookback + zone_idx, | |
| timeframe='unknown', | |
| strength=strength | |
| ) | |
| return None | |
| def detect_demand_zone( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| lookback: int = 20 | |
| ) -> Optional[SupplyDemandZone]: | |
| """ | |
| Demand Zone (수요 존) 검출 | |
| 상승 전 마지막 하락 구간 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| lookback: 룩백 기간 | |
| Returns: | |
| Demand Zone 또는 None | |
| """ | |
| if len(close) < lookback + 10: | |
| return None | |
| # 최근 저점 찾기 | |
| recent_low_idx = low.iloc[-lookback:].idxmin() | |
| recent_low = low.loc[recent_low_idx] | |
| # 저점 이후 상승 확인 (최소 5%) | |
| subsequent_high = high.iloc[-lookback:].loc[recent_low_idx:].max() | |
| if subsequent_high > recent_low * 1.05: # 5% 이상 상승 | |
| # Demand Zone = 저점 형성 전 하락 구간 | |
| zone_idx = low.iloc[-lookback:].tolist().index(recent_low) | |
| if zone_idx > 0: | |
| zone_high = high.iloc[-lookback + zone_idx - 1] | |
| zone_low = low.iloc[-lookback + zone_idx] | |
| # 존 강도 = 이후 상승 폭 | |
| strength = (subsequent_high - recent_low) / recent_low | |
| if strength >= self.zone_strength_threshold: | |
| return SupplyDemandZone( | |
| type='demand', | |
| top=zone_high, | |
| bottom=zone_low, | |
| timestamp=len(close) - lookback + zone_idx, | |
| timeframe='unknown', | |
| strength=strength | |
| ) | |
| return None | |
| def detect_zones_single_tf( | |
| self, | |
| high: pd.Series, | |
| low: pd.Series, | |
| close: pd.Series, | |
| timeframe: str, | |
| lookback: int = 50 | |
| ) -> List[SupplyDemandZone]: | |
| """ | |
| 단일 타임프레임에서 S/D 존 검출 | |
| Args: | |
| high, low, close: 가격 데이터 | |
| timeframe: 타임프레임 ('5m', '15m', etc.) | |
| lookback: 룩백 기간 | |
| Returns: | |
| 검출된 존 리스트 | |
| """ | |
| zones = [] | |
| # 여러 룩백 기간으로 스캔 | |
| for period in [10, 20, 30, 40, 50]: | |
| if period > len(close): | |
| continue | |
| # Supply Zone 검출 | |
| supply = self.detect_supply_zone(high, low, close, period) | |
| if supply: | |
| supply.timeframe = timeframe | |
| zones.append(supply) | |
| # Demand Zone 검출 | |
| demand = self.detect_demand_zone(high, low, close, period) | |
| if demand: | |
| demand.timeframe = timeframe | |
| zones.append(demand) | |
| # 중복 제거 및 정렬 | |
| unique_zones = self._remove_duplicate_zones(zones) | |
| return unique_zones[:self.max_zones_per_tf] | |
| def _remove_duplicate_zones( | |
| self, | |
| zones: List[SupplyDemandZone] | |
| ) -> List[SupplyDemandZone]: | |
| """중복 존 제거""" | |
| if not zones: | |
| return [] | |
| # 타입별 분리 | |
| supply_zones = [z for z in zones if z.type == 'supply'] | |
| demand_zones = [z for z in zones if z.type == 'demand'] | |
| # Supply 존 병합 | |
| supply_zones.sort(key=lambda x: x.top, reverse=True) | |
| merged_supply = [] | |
| i = 0 | |
| while i < len(supply_zones): | |
| current = supply_zones[i] | |
| j = i + 1 | |
| while j < len(supply_zones): | |
| if abs(current.top - supply_zones[j].top) / current.top <= 0.05: # 5% 이내 | |
| current.strength = max(current.strength, supply_zones[j].strength) | |
| j += 1 | |
| else: | |
| break | |
| merged_supply.append(current) | |
| i = j | |
| # Demand 존 병합 | |
| demand_zones.sort(key=lambda x: x.bottom) | |
| merged_demand = [] | |
| i = 0 | |
| while i < len(demand_zones): | |
| current = demand_zones[i] | |
| j = i + 1 | |
| while j < len(demand_zones): | |
| if abs(current.bottom - demand_zones[j].bottom) / current.bottom <= 0.05: | |
| current.strength = max(current.strength, demand_zones[j].strength) | |
| j += 1 | |
| else: | |
| break | |
| merged_demand.append(current) | |
| i = j | |
| return merged_supply + merged_demand | |
| def detect_zones_mtf( | |
| self, | |
| data_dict: Dict[str, pd.DataFrame] | |
| ) -> Dict[str, List[SupplyDemandZone]]: | |
| """ | |
| 다중 타임프레임 S/D 존 검출 | |
| Args: | |
| data_dict: {timeframe: DataFrame} 형식 | |
| DataFrame은 'high', 'low', 'close' 컬럼 포함 | |
| Returns: | |
| 타임프레임별 존 딕셔너리 | |
| """ | |
| all_zones = {} | |
| for tf in self.timeframes: | |
| if tf not in data_dict: | |
| continue | |
| df = data_dict[tf] | |
| zones = self.detect_zones_single_tf( | |
| df['high'], | |
| df['low'], | |
| df['close'], | |
| timeframe=tf | |
| ) | |
| all_zones[tf] = zones | |
| self.zones[tf] = zones | |
| return all_zones | |
| def find_confluence_zones( | |
| self, | |
| all_zones: Dict[str, List[SupplyDemandZone]] = None | |
| ) -> List[Dict]: | |
| """ | |
| 타임프레임 간 Confluence 존 찾기 | |
| 여러 타임프레임에서 겹치는 가격대 | |
| Args: | |
| all_zones: 타임프레임별 존 (None이면 self.zones 사용) | |
| Returns: | |
| Confluence 존 리스트 | |
| """ | |
| if all_zones is None: | |
| all_zones = self.zones | |
| # 모든 존 평탄화 | |
| flat_zones = [] | |
| for tf, zones in all_zones.items(): | |
| flat_zones.extend(zones) | |
| if not flat_zones: | |
| return [] | |
| # Confluence 검출 | |
| confluence_zones = [] | |
| for i, zone1 in enumerate(flat_zones): | |
| overlapping = [zone1] | |
| for zone2 in flat_zones[i+1:]: | |
| if zone1.type != zone2.type: | |
| continue | |
| # 존 중심 가격 | |
| center1 = (zone1.top + zone1.bottom) / 2 | |
| center2 = (zone2.top + zone2.bottom) / 2 | |
| # 거리 계산 | |
| distance = abs(center1 - center2) / center1 | |
| if distance <= self.confluence_distance: | |
| overlapping.append(zone2) | |
| if len(overlapping) >= 2: # 최소 2개 타임프레임 | |
| # 합류 존 생성 | |
| confluence = { | |
| 'type': zone1.type, | |
| 'top': max(z.top for z in overlapping), | |
| 'bottom': min(z.bottom for z in overlapping), | |
| 'center': sum((z.top + z.bottom) / 2 for z in overlapping) / len(overlapping), | |
| 'confluence_score': len(overlapping), | |
| 'timeframes': [z.timeframe for z in overlapping], | |
| 'avg_strength': sum(z.strength for z in overlapping) / len(overlapping) | |
| } | |
| # 중복 확인 | |
| is_duplicate = False | |
| for existing in confluence_zones: | |
| if (existing['type'] == confluence['type'] and | |
| abs(existing['center'] - confluence['center']) / existing['center'] <= 0.01): | |
| is_duplicate = True | |
| break | |
| if not is_duplicate: | |
| confluence_zones.append(confluence) | |
| # 강도순 정렬 | |
| confluence_zones.sort(key=lambda x: (x['confluence_score'], x['avg_strength']), reverse=True) | |
| return confluence_zones | |
| def update_zone_touches( | |
| self, | |
| current_price: float, | |
| high: float, | |
| low: float | |
| ): | |
| """ | |
| 존 터치 횟수 및 돌파 업데이트 | |
| Args: | |
| current_price: 현재 가격 | |
| high: 현재 고가 | |
| low: 현재 저가 | |
| """ | |
| for tf, zones in self.zones.items(): | |
| for zone in zones: | |
| if zone.broken: | |
| continue | |
| if zone.type == 'supply': | |
| # Supply Zone 터치 확인 | |
| if zone.bottom <= high <= zone.top: | |
| zone.touches += 1 | |
| # 돌파 확인 (상단 통과) | |
| if low > zone.top: | |
| zone.broken = True | |
| else: # demand | |
| # Demand Zone 터치 확인 | |
| if zone.bottom <= low <= zone.top: | |
| zone.touches += 1 | |
| # 돌파 확인 (하단 통과) | |
| if high < zone.bottom: | |
| zone.broken = True | |
| def generate_sd_signal( | |
| self, | |
| current_price: float, | |
| high: float, | |
| low: float, | |
| data_dict: Dict[str, pd.DataFrame] = None | |
| ) -> Dict: | |
| """ | |
| 종합 Supply/Demand 신호 생성 | |
| Args: | |
| current_price: 현재 가격 | |
| high: 현재 고가 | |
| low: 현재 저가 | |
| data_dict: 다중 타임프레임 데이터 (선택) | |
| Returns: | |
| S/D 신호 정보 | |
| """ | |
| # 1. MTF 존 검출 (새 데이터가 있으면) | |
| if data_dict: | |
| self.detect_zones_mtf(data_dict) | |
| # 2. 존 터치/돌파 업데이트 | |
| self.update_zone_touches(current_price, high, low) | |
| # 3. Confluence 존 찾기 | |
| confluence_zones = self.find_confluence_zones() | |
| # 4. 현재 가격 근처 존 찾기 | |
| nearby_zones = [] | |
| for tf, zones in self.zones.items(): | |
| for zone in zones: | |
| if zone.broken: | |
| continue | |
| zone_center = (zone.top + zone.bottom) / 2 | |
| distance = abs(zone_center - current_price) / current_price | |
| if distance <= 0.03: # 3% 이내 | |
| nearby_zones.append({ | |
| 'type': zone.type, | |
| 'top': zone.top, | |
| 'bottom': zone.bottom, | |
| 'timeframe': zone.timeframe, | |
| 'strength': zone.strength, | |
| 'touches': zone.touches, | |
| 'distance': distance | |
| }) | |
| # 5. 신호 생성 | |
| signal = { | |
| 'total_zones': sum(len(z) for z in self.zones.values()), | |
| 'active_zones': sum(len([zz for zz in z if not zz.broken]) for z in self.zones.values()), | |
| 'confluence_zones': len(confluence_zones), | |
| 'nearby_zones': len(nearby_zones), | |
| 'action': 'hold', | |
| 'confidence': 0.5 | |
| } | |
| # 6. 매수/매도 판단 | |
| confidence = 0.5 | |
| # Confluence Demand Zone에서 매수 | |
| for conf in confluence_zones[:3]: # 상위 3개만 | |
| if conf['type'] == 'demand' and conf['bottom'] <= current_price <= conf['top']: | |
| signal['action'] = 'buy' | |
| confidence = 0.75 + (conf['confluence_score'] * 0.05) | |
| signal['target_zone'] = conf | |
| break | |
| # Confluence Supply Zone에서 매도 | |
| for conf in confluence_zones[:3]: | |
| if conf['type'] == 'supply' and conf['bottom'] <= current_price <= conf['top']: | |
| signal['action'] = 'sell' | |
| confidence = 0.75 + (conf['confluence_score'] * 0.05) | |
| signal['target_zone'] = conf | |
| break | |
| # 근처 존 신호 (보조) | |
| if signal['action'] == 'hold' and nearby_zones: | |
| strongest_zone = max(nearby_zones, key=lambda x: x['strength']) | |
| if strongest_zone['type'] == 'demand' and strongest_zone['touches'] < 3: | |
| signal['action'] = 'buy' | |
| confidence = 0.65 + (strongest_zone['strength'] * 10) - (strongest_zone['touches'] * 0.05) | |
| signal['target_zone'] = strongest_zone | |
| elif strongest_zone['type'] == 'supply' and strongest_zone['touches'] < 3: | |
| signal['action'] = 'sell' | |
| confidence = 0.65 + (strongest_zone['strength'] * 10) - (strongest_zone['touches'] * 0.05) | |
| signal['target_zone'] = strongest_zone | |
| signal['confidence'] = min(confidence, 0.95) | |
| signal['confluence_list'] = confluence_zones[:5] | |
| return signal | |
| # 사용 예시 | |
| if __name__ == '__main__': | |
| # 테스트 데이터 (다중 타임프레임) | |
| np.random.seed(42) | |
| data_mtf = {} | |
| for tf, periods in [('5m', 200), ('15m', 100), ('1h', 50)]: | |
| dates = pd.date_range('2024-01-01', periods=periods, freq=tf) | |
| data_mtf[tf] = pd.DataFrame({ | |
| 'high': np.cumsum(np.random.randn(periods) * 10) + 50000, | |
| 'low': np.cumsum(np.random.randn(periods) * 10) + 49800, | |
| 'close': np.cumsum(np.random.randn(periods) * 10) + 49900, | |
| }, index=dates) | |
| # S/D MTF 초기화 | |
| sd_mtf = SupplyDemandMTF({ | |
| 'timeframes': ['5m', '15m', '1h'], | |
| 'zone_strength_threshold': 0.02, | |
| 'confluence_distance': 0.01 | |
| }) | |
| # 신호 생성 | |
| current_price = data_mtf['5m']['close'].iloc[-1] | |
| signal = sd_mtf.generate_sd_signal( | |
| current_price, | |
| data_mtf['5m']['high'].iloc[-1], | |
| data_mtf['5m']['low'].iloc[-1], | |
| data_mtf | |
| ) | |
| print("="*60) | |
| print("Supply & Demand MTF 신호") | |
| print("="*60) | |
| print(f"전체 존: {signal['total_zones']}개") | |
| print(f"활성 존: {signal['active_zones']}개") | |
| print(f"Confluence 존: {signal['confluence_zones']}개") | |
| print(f"근처 존: {signal['nearby_zones']}개") | |
| if signal['confluence_list']: | |
| print(f"\nConfluence 존 (상위 3개):") | |
| for i, conf in enumerate(signal['confluence_list'][:3], 1): | |
| print(f" {i}. {conf['type'].upper()}: ${conf['center']:.2f} " | |
| f"(스코어: {conf['confluence_score']}, TF: {', '.join(conf['timeframes'])})") | |
| if 'target_zone' in signal: | |
| print(f"\n타겟 존:") | |
| print(f" 타입: {signal['target_zone']['type'].upper()}") | |
| if 'center' in signal['target_zone']: | |
| print(f" 중심: ${signal['target_zone']['center']:.2f}") | |
| else: | |
| print(f" 상단: ${signal['target_zone']['top']:.2f}") | |
| print(f" 하단: ${signal['target_zone']['bottom']:.2f}") | |
| print(f"\n액션: {signal['action'].upper()}") | |
| print(f"신뢰도: {signal['confidence']:.2%}") | |
| print("="*60) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment