Last active
March 27, 2025 01:47
-
-
Save ammarfaizi2/b494b956212eedd2b6ee28207baffbb9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from matplotlib.ticker import ScalarFormatter | |
import xml.etree.ElementTree as ET | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
from email.utils import format_datetime | |
from datetime import datetime, timezone, timedelta | |
import matplotlib.dates as mdates | |
import matplotlib.font_manager as fm | |
import requests | |
import json | |
import sys | |
import os | |
BOT_TOKEN = os.getenv("PUPY_BOT_TOKEN") | |
if BOT_TOKEN is None: | |
print("Please set PUPY_BOT_TOKEN env") | |
sys.exit(1) | |
CUSTOM_FONT = os.getenv("PUPY_CUSTOM_FONT") | |
MTZ = timezone(timedelta(hours=7)) | |
MTZ_STR = "WIB" | |
def gen_chart(title, price_cur, df, save_to, precision = 2): | |
# Determine price direction | |
opening_price = df["Price"].iloc[0] | |
closing_price = df["Price"].iloc[-1] | |
is_up = closing_price > opening_price | |
line_color = 'green' if is_up else 'red' | |
bg_color = '#eaffea' if is_up else '#ffeaea' | |
low_price = df["Price"].min() | |
high_price = df["Price"].max() | |
# Calculate gain and percentage gain | |
gain = closing_price - opening_price | |
gain_percent = (gain / opening_price) * 100 | |
al = abs(gain_percent) / 100 | |
max_al = 1 | |
min_al = 0.1 | |
al = min(max(min_al, al), max_al) | |
# Format gain text | |
gain_sign = "+" if gain >= 0 else "" | |
gain_color = 'green' if gain >= 0 else 'red' | |
if price_cur is None: | |
gain_text = f"24h chg: {gain_sign}{gain:,.{precision}f} ({gain_sign}{gain_percent:.2f}%)" | |
else: | |
gain_text = f"24h chg: {gain_sign}{gain:,.{precision}f} {price_cur} ({gain_sign}{gain_percent:.2f}%)" | |
# Plot | |
fig, ax = plt.subplots(figsize=(10, 10)) | |
fig.patch.set_facecolor(bg_color) | |
ax.set_facecolor(bg_color) | |
start = df["Time"].min() | |
end = df["Time"].max() | |
ax.set_xlim(left=start, right=end) | |
low_price = df["Price"].min() | |
high_price = df["Price"].max() | |
low_price = round(low_price - (high_price - low_price) * 0.05, 2) | |
high_price = round(high_price + (high_price - low_price) * 0.05, 2) | |
ax.set_ylim(bottom=low_price, top=high_price) | |
ax.plot(df["Time"], df["Price"], color=line_color, linewidth=1.5) | |
fill_base = low_price | |
ax.fill_between(df["Time"], df["Price"], fill_base, color=line_color, alpha=al) | |
now_wib = datetime.now(MTZ) | |
formatted_time = now_wib.strftime('%a %b %d %Y %H:%M:%S ' + MTZ_STR) | |
if CUSTOM_FONT is not None: | |
custom_font = fm.FontProperties(fname=CUSTOM_FONT) | |
else: | |
custom_font = fm.FontProperties() | |
ax.set_title(f"{title}", fontsize=50, pad=170, fontproperties=custom_font, ha='left', x=0) | |
if price_cur is not None: | |
ax.set_ylabel(f"Price ({price_cur})", fontsize=16, fontproperties=custom_font, labelpad=20) | |
ax.grid(True) | |
ax.text( | |
0, 1.37, | |
formatted_time, | |
transform=ax.transAxes, | |
fontsize=25, | |
color='#000', | |
fontproperties=custom_font, | |
ha='left' | |
) | |
ax.text( | |
0, 1.2, | |
f'{closing_price:,.{precision}f}' + (f' {price_cur}' if price_cur is not None else ''), | |
transform=ax.transAxes, | |
fontsize=62, | |
color=line_color, | |
ha='left', | |
weight='bold', | |
fontproperties=custom_font | |
) | |
ax.text( | |
0, 1.085, | |
gain_text, | |
transform=ax.transAxes, | |
fontsize=25, | |
color=gain_color, | |
ha='left', | |
fontproperties=custom_font | |
) | |
# Add OHLC | |
ohlc = df["Price"].describe() | |
ohlc_text = f"O: {opening_price:,.{precision}f}\nH: {ohlc['max']:,.{precision}f}\nL: {ohlc['min']:,.{precision}f}\nC: {closing_price:,.{precision}f}" | |
ax.text( | |
1, 1.2, | |
ohlc_text, | |
transform=ax.transAxes, | |
fontsize=20, | |
color='#000', | |
ha='right', | |
fontproperties=custom_font | |
) | |
# X-axis ticks and formatting | |
ax.xaxis.set_major_formatter(mdates.DateFormatter('%-I %p')) # Use '%#I %p' on Windows | |
ax.xaxis.set_major_locator(mdates.HourLocator(interval=1)) | |
for label in ax.get_xticklabels(): | |
label.set_fontproperties(custom_font) | |
plt.xticks(rotation=45) | |
# Y-axis ticks font | |
for label in ax.get_yticklabels(): | |
label.set_fontproperties(custom_font) | |
ax.yaxis.set_major_formatter(ScalarFormatter(useOffset=False)) | |
ax.ticklabel_format(style='plain', axis='y') # Turn off scientific | |
prev_date = None | |
for t in df["Time"]: | |
current_date = t.strftime('%Y-%m-%d') | |
if prev_date is not None and current_date != prev_date: | |
ax.axvline(t, color='blue', linewidth=2, alpha=0.5, linestyle='--') | |
ax.text( | |
t, low_price - (high_price - low_price) * 0.2, | |
t.strftime('%b %d'), | |
rotation=90, | |
fontsize=10, | |
color='#000', | |
fontproperties=custom_font, | |
ha='center', | |
va='bottom' | |
) | |
prev_date = current_date | |
plt.tight_layout() | |
plt.savefig( | |
save_to, | |
dpi=300, | |
bbox_inches='tight', | |
facecolor=fig.get_facecolor(), | |
pad_inches=0.5 | |
) | |
def okx_price_history_fetch(base_cur, quote_cur, bar = "1m", limit = 1440): | |
url = f"https://www.okx.com/priapi/v3/growth/convert/currency-pair-market-movement?baseCurrency={base_cur}"eCurrency={quote_cur}&bar={bar}&limit={limit}" | |
response = requests.get(url, headers={ | |
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0" | |
}) | |
response.raise_for_status() | |
return response.text | |
def okx_gen_chart(save_to, base_cur, quote_cur, bar = "1m", limit = 1440, title = None, precision = 2): | |
data = [] | |
raw_json = okx_price_history_fetch(base_cur, quote_cur, bar, limit) | |
j = json.loads(raw_json) | |
for i in j["data"]["datapointList"]: | |
t = (int(i["timestamp"]) / 1000) | |
p = float(i["price"]) | |
data.append((datetime.fromtimestamp(t), p)) | |
df = pd.DataFrame(data, columns=["Time", "Price"]) | |
df.sort_values(by="Time", inplace=True) | |
if title is None: | |
title = f"{base_cur}/{quote_cur}" | |
gen_chart(title, quote_cur, df, save_to, precision) | |
def send_photo_to_tg(chat_id, file, caption = "", btn_txt = "", btn_url = ""): | |
url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendPhoto" | |
files = {'photo': open(file, 'rb')} | |
data = { | |
'chat_id' : chat_id, | |
'caption': caption, | |
'reply_markup': json.dumps({ | |
'inline_keyboard': [ | |
[{'text': btn_txt, 'url': btn_url}] | |
] | |
}) | |
} | |
r = requests.post(url, files=files, data=data) | |
print(r.status_code, r.reason, r.content) | |
if len(sys.argv) < 3: | |
print("Usage: python pu.py <chat_id> <ticker>") | |
sys.exit(1) | |
chat_id = sys.argv[1] | |
ticker = sys.argv[2] | |
if ticker == "BTC": | |
okx_gen_chart("okx_btc_usd.png", "BTC", "USD", "1m", 1440, "XBTUSD", 2) | |
send_photo_to_tg(chat_id, "okx_btc_usd.png", "#pu #pu_xbt #pu_btc\nBitcoin price update", "Buy Bitcoin on OKX", "https://www.okx.com/trade-spot/btc-usdt") | |
elif ticker == "ETH": | |
okx_gen_chart("okx_eth_usd.png", "ETH", "USD", "1m", 1440, "XETUSD", 2) | |
send_photo_to_tg(chat_id, "okx_eth_usd.png", "#pu #pu_xet #pu_eth\nEthereum price update", "Buy Ethereum on OKX", "https://www.okx.com/trade-spot/eth-usdt") | |
elif ticker == "BNB": | |
okx_gen_chart("okx_bnb_usd.png", "BNB", "USD", "1m", 1440, "XBIUSD", 2) | |
send_photo_to_tg(chat_id, "okx_bnb_usd.png", "#pu #pu_xbi #pu_bnb\nBNB price update", "Buy BNB on OKX", "https://www.okx.com/trade-spot/bnb-usdt") | |
elif ticker == "XAUT": | |
okx_gen_chart("okx_xau_usd.png", "XAUT", "USD", "5m", 288, "XAUUSD", 2) | |
send_photo_to_tg(chat_id, "okx_xau_usd.png", "#pu #pu_xau\nGold price update", "Buy XAUT on OKX", "https://www.okx.com/trade-spot/xaut-usdt") | |
elif ticker == "USDIDR": | |
okx_gen_chart("okx_usd_idr.png", "USDT", "IDR", "5m", 288, "USDIDR", 2) | |
send_photo_to_tg(chat_id, "okx_usd_idr.png", "#pu #pu_usdidr\nUSD/IDR price update", "Buy USDT on OKX", "https://www.okx.com/p2p-markets/idr/buy-usdt") | |
elif ticker == "XRP": | |
okx_gen_chart("okx_xrp_usd.png", "XRP", "USD", "1m", 1440, "XRPUSD", 4) | |
send_photo_to_tg(chat_id, "okx_xrp_usd.png", "#pu #pu_xrp\nXRP price update", "Buy XRP on OKX", "https://www.okx.com/trade-spot/xrp-usdt") | |
elif ticker == "MOVE": | |
okx_gen_chart("okx_move_usd.png", "MOVE", "USD", "1m", 1440, "MOVEUSD", 5) | |
send_photo_to_tg(chat_id, "okx_move_usd.png", "#pu #pu_move\nMOVE price update", "Buy MOVE on OKX", "https://www.okx.com/trade-spot/move-usdt") | |
else: | |
print(f"Unknown ticker: {ticker}") | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment