Last active
October 15, 2021 12:04
-
-
Save kuznetsov-m/91c2f5ca0a13b7644f803b59cf94a4ef to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
from datetime import datetime | |
import json | |
import time | |
from dateutil import tz | |
import models | |
import db_utils | |
import telegram_client | |
class SberPriceMonitor(): | |
_channel_url = 'https://t.me/sber_price_monitor' | |
def _load_candels_from_moex(): | |
ticker = 'SBER' | |
from_ = datetime.now().strftime('%Y-%m-%d') | |
till = from_ | |
interval = 1 | |
# отладка | |
# from_ = '2021-10-10' | |
# till = '2021-10-13' | |
result_df = pd.DataFrame() | |
for start in range(0, 10000000000, 500): | |
query = f'http://iss.moex.com/iss/engines/stock/markets/shares/securities/{ticker}/candles.csv?from={from_}&till={till}&interval={interval}&start={start}' | |
df = None | |
try_count = 1 | |
timeout = 2 | |
while df is None: | |
try: | |
content = requests.get(query, timeout=timeout).content | |
df = pd.read_csv( | |
io.StringIO(content.decode('utf-8')), | |
sep=';', | |
header=1, | |
parse_dates=['begin','end'] | |
) | |
except Exception as e: | |
print(f'{__name__} Error: Try count: {try_count}. Timeout: {timeout}. Error message: {str(e)}') | |
timeout = min(timeout * 2, 20) | |
if df.shape[0]: | |
result_df = result_df.append(df, ignore_index=True) | |
else: | |
break | |
return result_df | |
# row - строка из dataframe | |
def _build_message(self, row) -> str: | |
# data = json.loads(df_final.iloc[-1:].to_json(orient='records', date_format = 'iso'))[0] | |
# text = json.dumps(data, indent=0)[2:][:-2] | |
# return text | |
end = row['end'] | |
close = row['close'] | |
step = row['step'] | |
step_prev_1 = row['step_prev_1'] | |
simvol = '⏹' | |
if step > 0: | |
simvol = '↗️' | |
elif step < 0: | |
simvol = '↘️' | |
return f'{end.to_pydatetime().strftime("%H:%M:%S %d.%m.%Y %Z")}\n{close} ({simvol} {step})' | |
def loop(self): | |
timeout = 60 | |
pd.set_option('display.max_rows', None) | |
step = 0.01 | |
last_send_datetime = None | |
while True: | |
print(f'{__name__}: loop') | |
df = db_utils.last_rows_from_sql_table(table_name=models.SberCandels.__tablename__, rows=2000) | |
print(f'{__name__}: df rows count ={len(df.index)}') | |
dt_now = datetime.combine(datetime.utcnow().date(), datetime.min.time()) | |
# dt_now = dt_now.astimezone(tz.UTC) | |
# отладка | |
# date_time_obj = datetime.strptime('14.10.21 14:55:19', '%d.%m.%y %H:%M:%S') | |
# dt_now = datetime.combine(date_time_obj.date(), datetime.min.time()) | |
mask = (df['begin'].dt.tz_localize(None) >= np.datetime64(dt_now)) | |
df = df.loc[mask] | |
print(f'{__name__}: df rows count after filter ={len(df.index)}') | |
# Оставляем только сегодняшние строки | |
# df.loc[df['begin'].dt.date == pd.to_datetime('today').normalize()] | |
# print(df.iloc()[:1]) | |
if not df.empty: | |
df.drop(columns=['begin','high','low','value','volume'], inplace=True) | |
df['end'] += pd.to_timedelta(1, unit='s') | |
df['frac'] = (df['close'] / df['open'].iloc[0]) - 1 | |
df['step'] = np.nan | |
for val in range(0, 20): | |
df.loc[(df['frac'] >= val*step) & (df['frac'] < (val+1)*step), 'step'] = val*step | |
for val in range(-20, 0): | |
df.loc[(df['frac'] >= val*step) & (df['frac'] < (val+1)*step), 'step'] = (val+1)*step | |
df['step_prev_1'] = df['step'].shift(1).fillna(0) | |
df['step_prev_2'] = df['step'].shift(2).fillna(0) | |
df['step_prev_3'] = df['step'].shift(3).fillna(0) | |
df['step_prev_4'] = df['step'].shift(4).fillna(0) | |
df['step_prev_5'] = df['step'].shift(5).fillna(0) | |
df_final = df.loc[ | |
(df['step'] == df['step_prev_1']) & | |
(df['step'] == df['step_prev_2']) & | |
(df['step'] == df['step_prev_3']) & | |
(df['step'] == df['step_prev_4']) & | |
(df['step'] != df['step_prev_5']), | |
['end','close','frac','step'] | |
] | |
df_final['step_prev_1'] = df_final['step'].shift(1).fillna(0) | |
df_final = df_final.loc[df_final['step'] != df_final['step_prev_1']] | |
print(f'{__name__}: df_final rows count ={len(df_final.index)}') | |
print(df_final) | |
if not df_final.empty and last_send_datetime != df_final.iloc[-1]['end']: | |
# debug code block | |
# print(f'{__name__}: df_final\n{df_final}') | |
# df_final.to_csv('df_final.csv', sep='\t', encoding='utf-8') | |
# # telegram_bot.send_document_to_user('df_final.csv') | |
# print(f'{__name__}: df_final.csv was sent') | |
############ | |
row = df_final.iloc[-1] | |
text = _build_message(row) | |
print(f'{__name__}: post publication') | |
telegram_client.post_message_to_channel( | |
message=text, | |
channel_url=self._channel_url | |
) | |
print(f'{__name__}: post successfully published') | |
last_send_datetime = df_final.iloc[-1]['end'] | |
time.sleep(timeout) | |
def run(self): | |
while True: | |
try: | |
# telegram_client.post_message_to_channel( | |
# message='Monitor restarted', | |
# channel_url=self._channel_url | |
# ) | |
self.loop() | |
except Exception as e: | |
print(f'{__name__}: EXCEPTION: {str(e)}') | |
monitor = SberPriceMonitor() | |
monitor.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment