Created
February 7, 2025 10:00
-
-
Save Kiritow/4ffb924624155ac66c675678ee7639b2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import time | |
import traceback | |
import requests | |
import subprocess | |
import functools | |
from pydantic import BaseModel | |
class ConfigData(BaseModel):
    """Secrets the bot needs to talk to its external services."""

    # Token interpolated into the Telegram Bot API URL.
    bot: str
    # Token interpolated into the healthcheck ping URL.
    ping: str
    # Sent as the `chat_id` field of every Telegram sendMessage request.
    chat_id: str
@functools.cache
def get_config() -> "ConfigData":
    """Load the bot configuration once and cache it for the process lifetime.

    The JSON file ``bot.secrets`` is tried first. If it cannot be read or
    validated for any reason, the configuration is built from the
    ``TG_KEY``, ``PING_TOKEN`` and ``CHAT_ID`` environment variables instead.

    Returns:
        The validated ConfigData instance (memoized by ``functools.cache``).

    Raises:
        AssertionError: If the secrets file is unusable and one of the
            required environment variables is missing or empty.
    """
    try:
        # Context manager so the file handle is closed deterministically.
        with open("bot.secrets", "r") as secrets_file:
            return ConfigData.model_validate(json.load(secrets_file))
    except Exception:
        # Deliberate best-effort: any failure falls back to the environment.
        # The lookups stay inside the handler so that an error raised below
        # is chained to the original file/validation error in the traceback.
        print("trying to read TG_KEY from env...")

        # Explicit raises instead of `assert` statements: asserts are removed
        # under `python -O`. AssertionError is kept so the exception type and
        # messages match what callers have always seen.
        tg_key = os.getenv("TG_KEY")
        if not tg_key:
            raise AssertionError("TG_KEY not found in env")

        ping_token = os.getenv("PING_TOKEN")
        if not ping_token:
            raise AssertionError("PING_TOKEN not found in env")

        chat_id = os.getenv("CHAT_ID")
        if not chat_id:
            raise AssertionError("CHAT_ID not found in env")

        return ConfigData(bot=tg_key, ping=ping_token, chat_id=chat_id)
def send_telegram_message(message: str, timeout: float = 30):
    """Send a text message to the configured Telegram chat.

    The raw response body is printed. HTTP error statuses are not raised,
    only logged through that print.

    Args:
        message: Text to send.
        timeout: Seconds to wait for the Telegram API before giving up.
            Without a timeout `requests` can block forever on a stalled
            connection, which would freeze the whole forwarder.

    Raises:
        requests.exceptions.RequestException: On connection failures or
            when the timeout elapses.
    """
    config = get_config()
    r = requests.post(
        "https://api.telegram.org/bot{}/sendMessage".format(config.bot),
        json={
            'chat_id': config.chat_id,
            'text': message,
        },
        timeout=timeout,
    )
    print(r.content)
class SMSData(BaseModel):
    """One SMS entry parsed from the JSON output of `termux-sms-list`."""

    # Message id; compared against the last forwarded id to detect new SMS.
    msgid: int
    # Stored as the "message time" of the mark.
    # NOTE(review): presumably a receive timestamp -- confirm format/units.
    received: int
    # Shown as the "from" part of the forwarded message.
    number: str
    # Text content of the SMS.
    body: str
def read_sms_list(limit: int = 10, offset: int = 0, all_types: bool = False) -> list[SMSData]:
    """Run `termux-sms-list` and parse its JSON output into SMSData objects.

    Args:
        limit: Passed as ``-l <limit>`` when non-zero.
        offset: Passed as ``-o <offset>`` when non-zero.
        all_types: When false, ``-t inbox`` is added to the command line.

    Returns:
        The parsed messages, or an empty list if the command fails or its
        output cannot be parsed/validated (the traceback is printed).
    """
    command = ["termux-sms-list"]
    if limit:
        command += ["-l", str(limit)]
    if offset:
        command += ["-o", str(offset)]
    if not all_types:
        command += ["-t", "inbox"]

    try:
        raw_output = subprocess.check_output(command)
        entries = json.loads(raw_output)
        return [SMSData.model_validate(entry) for entry in entries]
    except Exception:
        # Best-effort: a failed poll is logged and reported as "no messages".
        print(traceback.format_exc())
        return []
def refresh_mark(msg_id: int, msg_time: int) -> tuple[int, int]:
    """Read or persist the mark of the last forwarded SMS.

    The mark lives in ``sms_bot_stats.json`` as two whitespace-separated
    integers: the message id followed by the message time.

    Args:
        msg_id: Message id to persist. Pass 0 together with ``msg_time=0``
            to read the stored mark instead of writing one.
        msg_time: Message time to persist (0 for read mode, see above).

    Returns:
        In write mode, ``(msg_id, msg_time)`` as given. In read mode, the
        stored mark, or ``(0, 0)`` when the file is missing, empty or does
        not contain two integers.
    """
    stats_path = "sms_bot_stats.json"

    if msg_id or msg_time:
        # Write to a sibling temp file and rename it over the target so an
        # interrupted write can never leave a truncated stats file behind.
        tmp_path = stats_path + ".tmp"
        with open(tmp_path, "w") as f:
            f.write("{} {}".format(msg_id, msg_time))
        os.replace(tmp_path, stats_path)
        return msg_id, msg_time

    try:
        with open(stats_path) as f:
            parts = f.read().split()
        # Parse both values before returning so a half-valid file yields
        # (0, 0) rather than a mix of stored and default values.
        return int(parts[0]), int(parts[1])
    except (FileNotFoundError, IndexError, ValueError):
        # Missing, empty or corrupt file: treat as "nothing forwarded yet"
        # instead of crashing, so the next write can repair the file.
        print("Failed to read bot stats")
        return 0, 0
# Wall-clock time (time.time()) of the last successful healthcheck ping.
last_healthcheck_time = 0


def sync_healthcheck(timeout: float = 30):
    """Report the persisted SMS mark to the healthcheck service.

    Does nothing if the last successful ping was less than 300 seconds ago.
    Any failure is logged and swallowed so that a broken healthcheck can
    never stop the forwarder. `last_healthcheck_time` is only advanced on
    success, so a failed ping is retried on the next call.

    Args:
        timeout: Seconds to wait for the healthcheck endpoint. Without a
            timeout `requests` can block forever on a stalled connection.
    """
    global last_healthcheck_time
    if time.time() - last_healthcheck_time < 300:
        return

    try:
        # Reading the mark is inside the try as well: a failure there is
        # logged like any other healthcheck error instead of propagating.
        last_msg_id, last_msg_time = refresh_mark(0, 0)
        res = requests.post(
            "https://api-healthcheck.liteserver.cloud/ping/{}".format(get_config().ping),
            json={
                "msgid": last_msg_id,
                "msgtime": last_msg_time,
            },
            timeout=timeout,
        )
        print(res.content)
        last_healthcheck_time = time.time()
    except Exception:
        print(traceback.format_exc())
if __name__ == '__main__':
    # Entry point: poll the SMS inbox forever and forward every message whose
    # id is greater than the last forwarded one to Telegram.
    send_telegram_message("SMS Forwarder started!")
    last_msg_id, last_msg_time = refresh_mark(0, 0)
    while True:
        for sms in read_sms_list():
            if sms.msgid <= last_msg_id:
                continue

            try:
                send_telegram_message("[New SMS] from: {} body: {}".format(sms.number, sms.body))
            except Exception:
                print(traceback.format_exc())
                # The mark is NOT advanced on failure, so this SMS is retried
                # on the next polling cycle. Stop here so that later messages
                # cannot move the mark past the one that failed.
                break

            # Advance the in-memory mark only after a successful send.
            last_msg_id = sms.msgid
            last_msg_time = sms.received
            try:
                refresh_mark(last_msg_id, last_msg_time)
                print("Sent message to telegram, mark: {} {}".format(last_msg_id, last_msg_time))
            except Exception:
                # Persisting failed, but the in-memory mark still prevents a
                # duplicate send within this process.
                print(traceback.format_exc())

            # Short pause between consecutive forwarded messages.
            time.sleep(1)

        sync_healthcheck()
        time.sleep(10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment