Skip to content

Instantly share code, notes, and snippets.

@zed-wong
Created April 25, 2023 08:07
Show Gist options
  • Save zed-wong/7cf4faf29826af35c96125fcd4215ece to your computer and use it in GitHub Desktop.
Save zed-wong/7cf4faf29826af35c96125fcd4215ece to your computer and use it in GitHub Desktop.
BTC MA (moving average) monitor & notifier through Mixin Messenger
import sys
import json
import math
import time
import uuid
import base64
import hashlib
import asyncio
import requests
import datetime
from pymixin.mixin_bot_api import MixinBotApi
def get_binance_data(symbol, interval, limit=500, timeout=10):
    """Fetch candlestick (kline) data from the Binance REST API.

    Args:
        symbol: Value sent as the ``symbol`` query parameter, e.g. "BTCUSDT".
        interval: Value sent as the ``interval`` query parameter, e.g. "4h".
        limit: Value sent as the ``limit`` query parameter. Defaults to 500,
            the value that was previously hard-coded.
        timeout: Seconds to wait on the connection before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    url = "https://api.binance.com/api/v3/klines"
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit,
    }
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection, which would freeze the whole monitor.
    response = requests.get(url, params=params, timeout=timeout)
    # Fail here on an HTTP error instead of handing an error payload to
    # callers that expect candle rows.
    response.raise_for_status()
    return response.json()
def calculate_sma(values, period):
    """Return the simple moving average of the last ``period`` values.

    Args:
        values: Sequence of numbers; only the trailing ``period`` entries
            are averaged.
        period: Number of trailing entries to average. Must be positive.

    Returns:
        ``sum(values[-period:]) / period``.

    Raises:
        ValueError: If ``period`` is not positive, or ``values`` holds fewer
            than ``period`` entries.
    """
    if period <= 0:
        # values[-0:] is the whole sequence, so a zero period would not even
        # select the intended window before dividing by zero.
        raise ValueError(f"period must be positive, got {period}")
    if len(values) < period:
        # Dividing a shorter window's sum by `period` would silently
        # understate the average.
        raise ValueError(
            f"need at least {period} values, got {len(values)}"
        )
    return sum(values[-period:]) / period
def calculate_ema(values, period):
    """Return the exponential moving average of ``values``.

    The average is seeded with the mean of the first ``period`` values and
    then updated once for every remaining value, using a smoothing factor
    of ``2 / (period + 1)``.

    Args:
        values: Sequence of numbers, processed front to back.
        period: Seed window length, which also sets the smoothing factor.
            Must be positive.

    Returns:
        The EMA after the last value has been applied.

    Raises:
        ValueError: If ``period`` is not positive, or ``values`` holds fewer
            than ``period`` entries.
    """
    if period <= 0:
        raise ValueError(f"period must be positive, got {period}")
    if len(values) < period:
        # The seed divides by `period`; a shorter prefix would give a
        # silently wrong starting point.
        raise ValueError(
            f"need at least {period} values, got {len(values)}"
        )
    smoothing_factor = 2 / (period + 1)
    ema = sum(values[:period]) / period
    for value in values[period:]:
        ema = (value - ema) * smoothing_factor + ema
    return ema
async def check_conditions(mixin_bot, conv_id, ma20, ma60, ma120, passed):
    """Send a Mixin text message saying whether MA20 is above or below MA60.

    Nothing is sent when ``passed`` is falsy or when the two averages are
    equal. When a message is sent, the module-level ``beginTime`` is reset
    to the current time so the caller's cool-down window starts over.

    Args:
        mixin_bot: Object exposing an awaitable
            ``send_text_message(conv_id, text)``.
        conv_id: Conversation ID to send the message to.
        ma20: Current 20-period moving average.
        ma60: Current 60-period moving average.
        ma120: Current 120-period moving average (accepted but not used).
        passed: Whether the caller's cool-down has elapsed.
    """
    # Without this declaration the assignment below binds a function-local
    # name and the module-level timestamp is never reset.
    global beginTime

    if not passed:
        return

    if ma20 > ma60:
        text = "MA20 超过了 MA60"
    elif ma20 < ma60:
        text = "MA20 低于 MA60"
    else:
        return

    # The console line and the message sent to the user are the same text.
    print(text)
    beginTime = datetime.datetime.now()
    await mixin_bot.send_text_message(conv_id, text)
def unique_conversation_id(user_id, recipient_id):
    """Derive the deterministic conversation UUID for a pair of ID strings.

    The two IDs are put in ascending order, so swapping the arguments gives
    the same result. Their encoded bytes are MD5-hashed back to back, and
    the digest is turned into a UUID string with the version nibble forced
    to 3 and the variant bits forced to ``10``.

    Args:
        user_id: First participant's ID string.
        recipient_id: Second participant's ID string.

    Returns:
        The conversation ID as a hyphenated UUID string.
    """
    if recipient_id < user_id:
        low, high = recipient_id, user_id
    else:
        low, high = user_id, recipient_id

    digest = hashlib.md5(low.encode() + high.encode()).digest()
    # version=3 overwrites the high nibble of byte 6 with 0x3 and the top
    # two bits of byte 8 with 0b10, which is the same masking done by hand.
    return str(uuid.UUID(bytes=digest, version=3))
# Trading pair and candle interval requested from Binance.
symbol = "BTCUSDT"
interval = "4h"
# Reference timestamp for the 5-minute notification gate evaluated in run().
beginTime = datetime.datetime.now()
# Mixin user ID that receives the notifications.
receiver = "44d9717d-8cae-4004-98a1-f9ad544dcfb1"

# The bot config path is the first command-line argument. Exit with a usage
# message instead of an IndexError traceback when it is missing.
if len(sys.argv) < 2:
    sys.exit(f"usage: {sys.argv[0]} <bot-config-path>")
bot_config_file = sys.argv[1]
# Read the JSON as UTF-8 explicitly rather than relying on the platform's
# default encoding.
with open(bot_config_file, "r", encoding="utf-8") as f:
    bot_config = json.load(f)
mixin_bot = MixinBotApi(bot_config)
conv_id = unique_conversation_id(bot_config["client_id"], receiver)
async def run():
    """Poll Binance forever and pass the latest averages to check_conditions.

    Each iteration fetches candles for the module-level ``symbol`` and
    ``interval``, computes the 20-, 60- and 120-period simple moving
    averages of the close prices, prints them, and calls
    ``check_conditions`` with a flag that is true once more than five
    minutes have elapsed since ``beginTime``. It then waits five seconds
    before polling again. The function never returns.
    """
    cooldown = datetime.timedelta(minutes=5)
    while True:
        # NOTE(review): get_binance_data is a synchronous call, so the event
        # loop is still blocked for the duration of the HTTP request.
        response = get_binance_data(symbol, interval)
        # Index 4 of each returned row is read as the close price.
        close_prices = [float(row[4]) for row in response]
        ma20 = calculate_sma(close_prices, 20)
        ma60 = calculate_sma(close_prices, 60)
        ma120 = calculate_sma(close_prices, 120)
        print(f"MA20: {ma20}")
        print(f"MA60: {ma60}")
        print(f"MA120: {ma120}")
        cooled_down = datetime.datetime.now() - beginTime > cooldown
        await check_conditions(
            mixin_bot, conv_id, ma20, ma60, ma120, cooled_down
        )
        # time.sleep() would block the event loop; asyncio.sleep() suspends
        # only this coroutine.
        await asyncio.sleep(5)
# Start the monitor only when executed as a script, so importing this module
# does not enter the infinite polling loop.
if __name__ == "__main__":
    asyncio.run(run())
# To run this script:
# 1. Set a proxy (if your network requires one to reach api.binance.com).
# 2. Change "receiver" to your Mixin user ID.
# 3. Run: python3 ma-monitor.py <bot-config-path>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment