Last active
June 10, 2022 12:11
-
-
Save volodymyrsmirnov/5852092 to your computer and use it in GitHub Desktop.
Simple Python daemon for forwarding SMS to email, tested on Samsung and Nokia phones
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[general] | |
debug: true | |
[tty] | |
dev: /dev/tty.usbmodemfa132 | |
baudrate: 9600 | |
timeout: 0.01 | |
[email] | |
enabled: true | |
recipient: [email protected] | |
sender: [email protected] | |
subject: New SMS from '{sender}' | |
body: New SMS from '{sender}' | |
================================================== | |
{body} | |
================================================== | |
Sent to you at {time} {date} | |
[smtp] | |
host: mail.vcity.sumy.ua | |
auth: false | |
user: user | |
password: password | |
[pushover] | |
enabled: true | |
userkey: blablabla | |
tokenkey: blablabla | |
sound: classical |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Watch for incoming SMS on the phone's serial interface
and forward them to email and/or Pushover
Author: Vladimir Smirnov ([email protected]) | |
Requires pyserial: pip install pyserial | |
""" | |
import sys, logging | |
# Refuse to start on interpreters older than 3.2: report the problem through
# the root logger and terminate with a non-zero exit status.
if not sys.version_info >= (3, 2):
    logging.critical("Python 3.2 required for running that script")
    raise SystemExit(1)
import serial, argparse, time, textwrap, smtplib, configparser, threading, http.client, urllib | |
from email.mime.text import MIMEText | |
# Command-line interface: the only option is the path to the INI file that
# holds the [general], [tty], [email], [smtp] and [pushover] sections.
argparser = argparse.ArgumentParser(
    description="Forward incoming SMS from a phone's serial interface "
                "to email and/or Pushover",
)
argparser.add_argument(
    "--config",
    type=str,
    required=True,
    help="path to the INI configuration file",
)

# Module-level logger shared by the AT wrapper and the response processor.
logger = logging.getLogger(__name__)
class AT(object):
    """Simple wrapper for AT command communication over a serial port.

    Port settings (device, baud rate, read timeout) are taken from the
    ``[tty]`` section of the configuration object passed to the constructor.
    """

    # Placeholder; every instance replaces it with the real configuration.
    config = object()

    def __init__(self, config):
        """Open the serial port described by the ``[tty]`` section.

        config -- object offering ``get``/``getint``/``getfloat`` with
                  ``(section, option)`` arguments, e.g. a ConfigParser.
        """
        self.config = config
        self.serial = serial.Serial(
            config.get("tty", "dev"),
            config.getint("tty", "baudrate"),
            timeout=config.getfloat("tty", "timeout"),
        )

    def cmd(self, command):
        """Send one AT command and pause for ``tty.timeout`` seconds.

        The command is ASCII-encoded and terminated with CR LF.
        """
        self.serial.write("{0}\r\n".format(command).encode("ascii"))
        time.sleep(self.config.getfloat("tty", "timeout"))

    def loop(self, callback=None):
        """Poll the port forever and hand complete responses to ``callback``.

        Non-empty lines are buffered while the port has data waiting. On the
        first poll that finds no data waiting, the buffered lines are passed
        as ``callback(self, lines)`` in a new thread. When ``callback`` is
        None the buffered lines are logged and dropped.

        The loop ends on KeyboardInterrupt; the serial port is closed on any
        exit from the loop.
        """
        pending = []
        poll_interval = self.config.getfloat("tty", "timeout")
        try:
            while True:
                if self.serial.inWaiting() > 0:
                    for raw in self.serial.readlines():
                        # Undecodable bytes are replaced instead of raising,
                        # so one bad byte cannot stop the daemon.
                        line = raw.decode("utf-8", errors="replace").strip()
                        if line:
                            logger.debug('Got raw line: %s', line)
                            pending.append(line)
                elif pending:
                    if callback is not None:
                        threading.Thread(
                            target=callback,
                            args=(self, pending),
                        ).start()
                    else:
                        logger.debug(
                            'No callback given, dropping response: %s', pending
                        )
                    # Start a fresh list; the old one now belongs to the
                    # worker thread.
                    pending = []
                time.sleep(poll_interval)
        except KeyboardInterrupt:
            logger.info("Interrupted, closing serial port")
        finally:
            self.serial.close()
def _parse_cmgr(response_buf):
    """Build an SMS dict (sender, date, time, body) from a +CMGR response.

    The first line is the comma-separated header; the lines between the
    header and the last line are joined into the body. The last line is
    dropped -- presumably the modem's final result line, verify against
    the device. Raises IndexError when the header has too few fields.
    """
    info = response_buf[0].replace("\"", "").split(",")
    return {
        "sender": info[1],
        "date": info[3],
        "time": info[4],
        "body": " ".join(response_buf[1:-1]).replace(",0,0", ""),
    }


def _notify_pushover(config, sms):
    """POST the SMS to the Pushover messages API; always close the socket."""
    # Imported here so the block does not rely on another module having
    # loaded the ``urllib.parse`` submodule as a side effect.
    import urllib.parse

    payload = {
        "token": config.get("pushover", "tokenkey"),
        "user": config.get("pushover", "userkey"),
        "title": sms["sender"],
        "message": sms["body"],
        "sound": config.get("pushover", "sound"),
    }
    connection = http.client.HTTPSConnection("api.pushover.net:443")
    try:
        connection.request(
            "POST",
            "/1/messages.json",
            urllib.parse.urlencode(payload),
            {"Content-type": "application/x-www-form-urlencoded"},
        )
        response = connection.getresponse().read()
    finally:
        connection.close()
    logger.info("Message sent to Pushover")
    logger.debug("Pushover response: %s", response)


def _send_email(config, sms):
    """Format the SMS with the [email] templates and send it via [smtp]."""
    msg = MIMEText(config.get("email", "body").format(
        sender=sms["sender"],
        body=textwrap.fill(sms["body"], width=50),
        time=sms["time"],
        date=sms["date"],
    ), _charset='utf-8')
    msg['Subject'] = config.get("email", "subject").format(sender=sms["sender"])
    msg['From'] = config.get("email", "sender")
    msg['To'] = config.get("email", "recipient")
    logger.debug("Generated email content: %s", msg.as_string())

    sender = smtplib.SMTP(config.get("smtp", "host"))
    try:
        if config.getboolean("smtp", "auth"):
            # Credentials are plain strings. Prefer the "user" option and
            # fall back to "login", the option name read previously.
            user_option = "user" if config.has_option("smtp", "user") else "login"
            sender.login(
                config.get("smtp", user_option),
                config.get("smtp", "password"),
            )
        sender.sendmail(msg['From'], [msg['To']], msg.as_string())
        sender.quit()
    finally:
        # Releases the socket when login/sendmail failed before quit().
        sender.close()
    logger.info("Email sent to %s", msg['To'])


def process_output(from_at, response_buf):
    """AT response processor.

    from_at      -- object with a ``cmd(command)`` method and a ``config``
                    attribute (ConfigParser-like) used to answer the modem
                    and to read delivery settings.
    response_buf -- list of non-empty response lines (strings).

    Handles three kinds of response:
    * ``+CMTI`` new-SMS notification: requests the message with AT+CMGR;
    * ``+CMGR`` message content: parses it, deletes stored SMS, then
      forwards it to Pushover and/or email as enabled in the config;
    * ``RING``: hangs up with ATH.
    A delivery failure on one channel is logged and does not prevent the
    other channel from being tried.
    """
    logger.debug('Got response list: %s', response_buf)

    # Skip the echo of our own AT+CMGR request, if the modem sent one.
    if response_buf and "AT+CMGR=" in response_buf[0]:
        response_buf = response_buf[1:]

    if not response_buf:
        logger.error("Empty response")
        return

    first_line = response_buf[0]

    # receive notification about new SMS
    if "+CMTI:" in first_line:
        parts = first_line.split(",")
        if len(parts) != 2:
            logger.error("Wrong CMTI response")
            return
        try:
            message_id = int(parts[1])
        except ValueError:
            logger.error("Wrong CMTI response")
            return
        logger.debug("Sending CMGR command for SMS with ID %s", message_id)
        from_at.cmd("AT+CMGR={0}".format(message_id))

    # receive SMS body and parse it
    elif "+CMGR:" in first_line:
        try:
            sms = _parse_cmgr(response_buf)
        except IndexError:
            logger.error("Wrong sms in response, unable to parse")
            return

        logger.debug('Got SMS: %s', sms)

        # delete all SMS
        logger.debug("Sending AT+CMGD=1,4 command")
        from_at.cmd("AT+CMGD=1,4")

        config = from_at.config

        if config.getboolean("pushover", "enabled"):
            try:
                _notify_pushover(config, sms)
            except (OSError, http.client.HTTPException):
                logger.exception("Unable to deliver SMS to Pushover")

        if config.getboolean("email", "enabled"):
            try:
                _send_email(config, sms)
            except (OSError, smtplib.SMTPException):
                logger.exception("Unable to deliver SMS by email")

    # hang up the phone on every call
    elif first_line == "RING":
        from_at.cmd("ATH")
if __name__ == "__main__":
    args = argparser.parse_args()

    config = configparser.ConfigParser()
    # ConfigParser.read() skips files it cannot open and returns the list of
    # files it actually parsed, so an empty result means the path is unusable.
    # Fail here with a clear message instead of a NoSectionError later.
    if not config.read(args.config):
        argparser.error("unable to read config file: {0}".format(args.config))

    log_level = logging.DEBUG if config.getboolean("general", "debug") else logging.INFO
    logging.basicConfig(level=log_level)

    interface = AT(config)

    # initialize the connection:
    # say hello, switch to plaintext mode
    # and enable incoming sms notifications
    for init_command in ("AT", "AT+CMGF=1", "AT+CNMI=1,1,0,0,0"):
        interface.cmd(init_command)

    # Blocks until interrupted; each response goes to process_output.
    interface.loop(process_output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Can you explain how to deploy and use the code on an Android phone, please?