Skip to content

Instantly share code, notes, and snippets.

@Kiritow
Created February 7, 2025 10:00
Show Gist options
Save Kiritow/4ffb924624155ac66c675678ee7639b2 to your computer and use it in GitHub Desktop.
import os
import json
import time
import traceback
import requests
import subprocess
import functools
from pydantic import BaseModel
class ConfigData(BaseModel):
    """Credentials and routing settings the forwarder needs at runtime."""

    # Telegram bot token; interpolated into the Bot API URL when sending.
    bot: str
    # Token appended to the healthcheck ping URL.
    ping: str
    # Telegram chat that receives the forwarded messages.
    chat_id: str
@functools.cache
def get_config():
try:
return ConfigData.model_validate(json.loads(open("bot.secrets", "r").read()))
except Exception:
print("trying to read TG_KEY from env...")
tg_key = os.getenv("TG_KEY")
assert tg_key, "TG_KEY not found in env"
ping_token = os.getenv("PING_TOKEN")
assert ping_token, "PING_TOKEN not found in env"
chat_id = os.getenv("CHAT_ID")
assert chat_id, "CHAT_ID not found in env"
return ConfigData(bot=tg_key, ping=ping_token, chat_id=chat_id)
def send_telegram_message(message: str, timeout: float = 10.0):
    """Post *message* to the configured Telegram chat via the Bot API.

    Args:
        message: text to send.
        timeout: seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block the caller
            indefinitely.

    Raises:
        Any exception raised by ``requests.post`` (including its timeout
        error) propagates to the caller. The HTTP status is not checked; the
        raw response body is only printed.
    """
    config = get_config()
    response = requests.post(
        "https://api.telegram.org/bot{}/sendMessage".format(config.bot),
        json={
            'chat_id': config.chat_id,
            'text': message,
        },
        timeout=timeout,
    )
    print(response.content)
class SMSData(BaseModel):
    """One entry of the JSON array printed by ``termux-sms-list``."""

    # Message identifier; the main loop compares it against the stored mark.
    msgid: int
    # Receive time as an integer (presumably a timestamp; unit not shown here).
    received: int
    # Phone number attached to the message; reported as the sender on forward.
    number: str
    # Message text.
    body: str
def read_sms_list(limit: int=10, offset: int=0, all_types: bool=False) -> list[SMSData]:
    """Run ``termux-sms-list`` and parse its JSON output into SMSData records.

    Args:
        limit: passed as ``-l`` when truthy.
        offset: passed as ``-o`` when truthy.
        all_types: when false, ``-t inbox`` is added to the command.

    Returns:
        The parsed messages, or an empty list if running the command or
        parsing its output raised; the traceback is printed in that case.
    """
    command = ["termux-sms-list"]
    command += ["-l", str(limit)] if limit else []
    command += ["-o", str(offset)] if offset else []
    command += [] if all_types else ["-t", "inbox"]

    try:
        raw_output = subprocess.check_output(command)
        return list(map(SMSData.model_validate, json.loads(raw_output)))
    except Exception:
        # Best effort: report the problem and let the caller poll again later.
        print(traceback.format_exc())
        return []
def refresh_mark(msg_id: int, msg_time: int) -> tuple[int, int]:
    """Read or update the persisted "last forwarded SMS" mark.

    The mark is stored in ``sms_bot_stats.json`` in the working directory as
    two whitespace-separated integers: the message id and the receive time.

    Args:
        msg_id: message id to store. When both arguments are falsy the call
            is a read and nothing is written.
        msg_time: receive time to store.

    Returns:
        In read mode, the stored ``(msg_id, msg_time)``, or ``(0, 0)`` when
        the file is missing, empty, or does not hold two integers.
        In write mode, the ``(msg_id, msg_time)`` that was just written.
    """
    stats_path = "sms_bot_stats.json"

    if not msg_id and not msg_time:
        try:
            with open(stats_path) as stats_file:
                parts = stats_file.read().split()
            # Parse both fields before returning so a half-valid file is
            # treated as unreadable rather than yielding a partial mark.
            return int(parts[0]), int(parts[1])
        except (FileNotFoundError, IndexError, ValueError):
            # An empty or garbled file (e.g. after an interrupted write) is
            # handled like a missing one instead of crashing the caller.
            print("Failed to read bot stats")
            return 0, 0

    # Write to a sibling temp file and swap it in, so an interruption never
    # leaves a truncated stats file behind.
    tmp_path = stats_path + ".tmp"
    with open(tmp_path, "w") as tmp_file:
        tmp_file.write("{} {}".format(msg_id, msg_time))
    os.replace(tmp_path, stats_path)
    return msg_id, msg_time
# Wall-clock time (time.time()) of the last successful healthcheck ping.
last_healthcheck_time = 0


def sync_healthcheck():
    """Ping the healthcheck endpoint, at most once every 300 seconds.

    The request body carries the currently persisted mark (message id and
    time). The timestamp of the last ping is only updated when the request
    completes, so a failed ping is attempted again on the next call. Request
    errors are printed and swallowed.
    """
    global last_healthcheck_time
    if time.time() - last_healthcheck_time < 300:
        return

    last_msg_id, last_msg_time = refresh_mark(0, 0)
    try:
        res = requests.post(
            "https://api-healthcheck.liteserver.cloud/ping/{}".format(get_config().ping),
            json={
                "msgid": last_msg_id,
                "msgtime": last_msg_time,
            },
            # Bound the wait so an unresponsive server cannot stall the
            # caller's polling loop.
            timeout=10,
        )
        print(res.content)
        last_healthcheck_time = time.time()
    except Exception:
        print(traceback.format_exc())
def main():
    """Poll the SMS inbox forever and forward unseen messages to Telegram.

    A message counts as unseen when its ``msgid`` is greater than the stored
    mark. The mark is advanced, in memory and on disk, only after the message
    was handed to Telegram without an exception, so a failed send is retried
    on the next poll instead of being skipped.
    """
    send_telegram_message("SMS Forwarder started!")
    last_msg_id, last_msg_time = refresh_mark(0, 0)
    while True:
        # Process in ascending id order so that advancing the mark never
        # hides an older message that has not been forwarded yet.
        for sms in sorted(read_sms_list(), key=lambda item: item.msgid):
            if sms.msgid <= last_msg_id:
                continue
            try:
                send_telegram_message("[New SMS] from: {} body: {}".format(sms.number, sms.body))
            except Exception:
                print(traceback.format_exc())
                # Stop here: forwarding later messages would move the mark
                # past this one and it would never be retried.
                break
            last_msg_id = sms.msgid
            last_msg_time = sms.received
            try:
                refresh_mark(last_msg_id, last_msg_time)
                print("Sent message to telegram, mark: {} {}".format(last_msg_id, last_msg_time))
            except Exception:
                # The message is already sent; keep the in-memory mark so it
                # is not forwarded twice even if persisting failed.
                print(traceback.format_exc())
            time.sleep(1)
        sync_healthcheck()
        time.sleep(10)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment