Last active
October 13, 2020 22:59
-
-
Save dentex/21bd7e37913e3bc1a9d683dcffb7cfcd to your computer and use it in GitHub Desktop.
SGS2cmdBot: a Python Telegram bot to remotely control an old Samsung Galaxy S2 used as IP cam
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
""" | |
code adapted from https://github.com/ade1963/RaspberrySensorsBot
working on a Samsung Galaxy S2 (GT-I9100), with Lineage OS 14.1 | |
dependencies to install on the SGS2: | |
IP Webcam android app -> https://play.google.com/store/apps/details?id=com.pas.webcam | |
QPython -> https://play.google.com/store/apps/details?id=org.qpython.qpy | |
qpython consoles: | |
/data/user/0/org.qpython.qpy/files/bin/qpython-android5-root.sh | |
/data/user/0/org.qpython.qpy/files/bin/qpython-android5-root.sh /storage/<<UUID>>/qpython/scripts/pip_console.py | |
/data/user/0/org.qpython.qpy/files/bin/qpython-android5-root.sh /storage/<<UUID>>/qpython/scripts/sgs2cmd_bot_gh.py | |
""" | |
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters | |
import logging | |
import os | |
import time | |
from subprocess import call | |
from functools import wraps | |
# Telegram user id of the only account allowed to use the bot (placeholder).
ADMIN = 0000000
# Bot token used to create the Updater (placeholder, replace with your own).
TOKEN = '123456789:CHANGE_TO_BE_YOUR_TOKEN-qwertyuiopa'
# Directory and file that logging.basicConfig() writes to.
LOG_PATH = '/sdcard/qpython/log/'
LOG_FILE = os.path.join(LOG_PATH, 'SGS2cmdBot.log')
# Tracks whether IP Webcam was started through /cam_start; cmd_take_pic reads
# it to decide whether the app must be closed and re-opened around a shot.
IP_WEBCAM_RUNNING = False
def restricted(func):
    """Restrict a handler so that only the ADMIN user may invoke it.

    The wrapped handler keeps the ``(bot, update, *args, **kwargs)`` calling
    convention. The sender id is looked up on the update's message, inline
    query, chosen inline result and callback query, in that order.

    - No sender found: a note is printed and the update is dropped.
    - Sender is not ADMIN: the attempt is logged and, if the update has a
      message to answer to, an access-denied notice is sent; the handler is
      not called.
    - Sender is ADMIN: the handler is called and its result returned.
    """
    # Update attributes that may carry the sender, in lookup order.
    sender_sources = ('message', 'inline_query',
                      'chosen_inline_result', 'callback_query')

    @wraps(func)
    def wrapped(bot, update, *args, **kwargs):
        user_id = None
        for source in sender_sources:
            sender = getattr(getattr(update, source, None), 'from_user', None)
            user_id = getattr(sender, 'id', None)
            if user_id is not None:
                break
        if user_id is None:
            print('No user_id available in update.')
            return None
        if user_id != ADMIN:
            # Log first so the attempt is recorded even if replying fails.
            logging.warning('Access Denied for user_id {}.'.format(user_id))
            # The sender may have been found on a non-message update, in
            # which case there is no update.message to reply to.
            message = getattr(update, 'message', None)
            if message is not None:
                bot.sendMessage(message.chat_id,
                                text='Private Bot - Access Denied')
            return None
        return func(bot, update, *args, **kwargs)
    return wrapped
def wake_device():
    """Send a HOME key event through the Android ``input`` tool."""
    home_key_cmd = ['/system/bin/sh', '/system/bin/input',
                    'keyevent', 'KEYCODE_HOME']
    call(home_key_cmd)
# def stats(update, cmd): | |
# pass | |
def say_done(bot, update):
    """Send the 'Done.' confirmation to the chat the update came from."""
    # stats(update, 'said done')
    chat_id = update.message.chat_id
    bot.sendMessage(chat_id, text='Done.')
""" | |
######################################################################################### | |
##### Define a few command handlers. These usually take the two arguments bot and ##### | |
##### update. Error handlers also receive the raised TelegramError object in error. ##### | |
######################################################################################### | |
""" | |
@restricted
def cmd_start(bot, update):
    """Handle /start: greet the user and point to /help."""
    # stats(update, 'start')
    greeting = "Hi, I'm the SGS2cmdBot!\n\n/help"
    bot.sendMessage(update.message.chat_id, text=greeting)
@restricted
def cmd_help(bot, update):
    """Handle /help: send the list of available commands."""
    # stats(update, 'help')
    help_lines = (
        'Available commands are:',
        '',
        '/help - Commands list',
        '/cam_start - Starts the IP webcam app',
        '/cam_stop - Stops the IP webcam app',
        '/battery - Gets battery info',
        '/take_pic - Takes a picture from the front camera',
        '/reboot - Reboot the device',
        '/halt - Shutdown the device',
        '',  # keeps the trailing newline of the message
    )
    bot.sendMessage(update.message.chat_id, text='\n'.join(help_lines))
@restricted
def cmd_cam_start(bot, update):
    """Handle /cam_start: launch IP Webcam and record that it is running."""
    global IP_WEBCAM_RUNNING
    # stats(update, 'cam_start')
    bot.sendMessage(update.message.chat_id, text='Starting IP webcam...')
    wake_device()
    launch_cmd = ['/system/bin/am', 'start', '-n', 'com.pas.webcam/.Rolling']
    call(launch_cmd)
    say_done(bot, update)
    # Set only after the confirmation was sent, as cmd_take_pic relies on it.
    IP_WEBCAM_RUNNING = True
@restricted
def cmd_cam_stop(bot, update):
    """Handle /cam_stop: force-stop IP Webcam and record that it is stopped."""
    global IP_WEBCAM_RUNNING
    # stats(update, 'cam_stop')
    bot.sendMessage(update.message.chat_id, text='Stopping IP webcam...')
    wake_device()
    stop_cmd = ['/system/bin/am', 'force-stop', 'com.pas.webcam']
    call(stop_cmd)
    say_done(bot, update)
    # Cleared only after the confirmation was sent.
    IP_WEBCAM_RUNNING = False
@restricted
def cmd_battery(bot, update):
    """Handle /battery: report capacity, status and temperature.

    The three values are read from files under
    ``/sys/class/power_supply/battery/`` and sent as one message with one
    value per line.

    Raises:
        IOError/OSError: if one of the files cannot be read.
        ValueError: if ``batt_temp`` does not contain an integer.
    """
    # stats(update, 'battery')
    def read_value(name):
        # Read one battery attribute; the file is closed right away and the
        # trailing newline is dropped so the message layout does not depend
        # on it.
        with open('/sys/class/power_supply/battery/' + name) as node:
            return node.read().strip()

    capacity = read_value('capacity')
    status = read_value('status')
    # The raw reading is divided by 10 before being shown as degrees Celsius;
    # dividing by 10.0 keeps the decimal under Python 2 as well.
    temp = int(read_value('batt_temp')) / 10.0
    msg = 'capacity: {}\nstatus: {}\ntemp: {}°C'.format(capacity, status, temp)
    bot.sendMessage(update.message.chat_id, text=msg)
@restricted
def cmd_take_pic(bot, update):
    """Handle /take_pic: shoot a photo with the camera app and send it.

    Steps:
      1. Wake the device; if IP Webcam is flagged as running, force-stop it.
      2. Start the still-image camera, send the CAMERA key event, then
         force-stop the camera app (``org.cyanogenmod.snap``).
      3. Send the newest file found in ``/sdcard/DCIM/Camera`` and delete
         every regular file in that directory. If no file is there, tell the
         user instead of failing.
      4. Re-open IP Webcam if it was flagged as running.
    """
    # stats(update, 'take_pic')
    chat_id = update.message.chat_id
    bot.sendMessage(chat_id, text='Taking a picture:\nplease wait...')
    wake_device()
    if IP_WEBCAM_RUNNING:
        bot.sendMessage(chat_id, text='[closing IP Webcam]')
        call(['/system/bin/am', 'force-stop', 'com.pas.webcam'])
    call(['/system/bin/am', 'start', '-a',
          'android.media.action.STILL_IMAGE_CAMERA'])
    time.sleep(2)  # pause between starting the camera and the shutter key
    call(['/system/bin/sh', '/system/bin/input', 'keyevent', 'KEYCODE_CAMERA'])
    time.sleep(2)  # pause between the shutter key and closing the camera
    call(['/system/bin/am', 'force-stop', 'org.cyanogenmod.snap'])

    camera_path = '/sdcard/DCIM/Camera'
    # Regular files only: a sub-directory can neither be sent as a photo nor
    # deleted with os.remove().
    pictures = [
        path
        for path in (os.path.join(camera_path, name)
                     for name in os.listdir(camera_path))
        if os.path.isfile(path)
    ]
    if pictures:
        # os.listdir() order is arbitrary, so pick the most recent file.
        newest = max(pictures, key=os.path.getmtime)
        # Binary mode, and closed as soon as the upload is finished.
        with open(newest, 'rb') as photo:
            bot.sendPhoto(chat_id, photo=photo)
        for path in pictures:
            os.remove(path)
    else:
        bot.sendMessage(chat_id, text='No picture was saved by the camera.')

    if IP_WEBCAM_RUNNING:
        bot.sendMessage(chat_id, text='[re-opening IP Webcam]')
        call(['/system/bin/am', 'start', '-n', 'com.pas.webcam/.Rolling'])
@restricted
def cmd_reboot(bot, update):
    """Handle /reboot: announce the reboot, then run ``reboot``."""
    # stats(update, 'reboot')
    bot.sendMessage(update.message.chat_id, text='Device reboot...')
    reboot_cmd = ['reboot']
    call(reboot_cmd)
@restricted
def cmd_halt(bot, update):
    """Handle /halt: announce the shutdown, then run ``reboot -p``."""
    # stats(update, 'halt')
    bot.sendMessage(update.message.chat_id, text='Device shutdown...')
    power_off_cmd = ['reboot', '-p']
    call(power_off_cmd)
""" | |
######################################################################################### | |
""" | |
def echo_non_cmd(bot, update):
    """Answer a plain text message with a 'Not a command...' notice."""
    chat_id = update.message.chat_id
    notice = 'Not a command...'
    bot.sendMessage(chat_id, text=notice)
def error_callback(bot, update, error):
    """Log the update and the error it caused at ERROR level."""
    # Arguments are handed to logging, which formats the same message text.
    logging.error('Update "%s" caused error "%s"', update, error)
def add_command_handlers(dp):
    """Register every bot command on the dispatcher ``dp``.

    Handlers are added in the order listed below.
    """
    command_table = (
        ('start', cmd_start),
        ('help', cmd_help),
        ('cam_start', cmd_cam_start),
        ('cam_stop', cmd_cam_stop),
        ('battery', cmd_battery),
        ('take_pic', cmd_take_pic),
        ('reboot', cmd_reboot),
        ('halt', cmd_halt),
    )
    for command, callback in command_table:
        dp.add_handler(CommandHandler(command, callback))
""" | |
######################################################################################### | |
""" | |
# Make sure the log directory exists: logging.basicConfig() opens LOG_FILE
# but does not create missing parent directories.
if not os.path.isdir(LOG_PATH):
    os.makedirs(LOG_PATH)
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
                    filename=LOG_FILE,
                    level=logging.INFO)
# Set for this process and therefore inherited by the commands run via call().
os.environ['LD_LIBRARY_PATH'] = '/data/data/com.pas.webcam/lib'
# Create the Updater and pass it your bot's token.
updater = Updater(TOKEN)
# Get the dispatcher to register handlers
disp = updater.dispatcher
add_command_handlers(disp)
# On non-command text messages, answer with a "Not a command..." notice.
disp.add_handler(MessageHandler(Filters.text, echo_non_cmd))
# log all errors
disp.add_error_handler(error_callback)
# Start the Bot
updater.start_polling()
# Run the bot until you press Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment