Created
September 19, 2019 13:41
-
-
Save alenbasic/57aa1c1706248ee36e21f57beba6db77 to your computer and use it in GitHub Desktop.
Gets your current external IP and stores it in a DB. Sends a pushover notification when it changes. Designed to be used as a cron job
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python3 | |
import sys, os, ipaddress, sqlite3, requests | |
from tabulate import tabulate #you can download tabulate via pip install | |
DATABASE_FILE = 'FILE_LOCATION_HERE' | |
DATABASE_EXISTS = os.path.isfile(DATABASE_FILE) | |
MESSAGE = 'Script ran successfully. A message should appear here every hour.' | |
IP_CHANGED = 'False' | |
TABLE_CREATION_SCRIPT = 'CREATE TABLE ip_address \ | |
(ip_address text,\ | |
provider text, \ | |
date text\ | |
)' | |
TABLE_ERROR_CREATION_SCRIPT = 'CREATE TABLE error_log \ | |
(provider text, \ | |
date text\ | |
)' | |
TABLE_LOG_CREATION_SCRIPT = 'CREATE TABLE run_log \ | |
(message text, \ | |
ip_changed text,\ | |
date text\ | |
)' | |
TABLE_INSERTION_SCRIPT = "INSERT INTO ip_address VALUES (?,?,datetime('now','localtime'))" | |
TABLE_ERROR_INSERTION_SCRIPT = "INSERT INTO error_log VALUES (?,datetime('now','localtime'))" | |
TABLE_LOG_INSERTION_SCRIPT = "INSERT INTO run_log VALUES (?,?, datetime('now','localtime'))" | |
TABLE_SELECTION_SCRIPT = 'select ip_address, max(date) from ip_address' | |
TABLE_LOG_SELECTION_SCRIPT = 'select ip_changed, date from run_log WHERE [date] >= date("now", "-1 days") order by date asc' | |
TABLE_ERROR_SELECTION_SCRIPT = 'select provider, date from error_log WHERE [date] >= date("now", "-1 days") order by date asc' | |
conn = sqlite3.connect(DATABASE_FILE) | |
c = conn.cursor() | |
dig_commands = [ | |
{'server': 'Open DNS Resolver 1', 'command': 'dig +short myip.opendns.com @resolver1.opendns.com'}, | |
{'server': 'Open DNS Resolver 2', 'command': 'dig +short myip.opendns.com @resolver2.opendns.com'}, | |
{'server': 'Open DNS Resolver 3', 'command': 'dig +short myip.opendns.com @resolver3.opendns.com'}, | |
{'server': 'Open DNS Resolver 4', 'command': 'dig +short myip.opendns.com @resolver4.opendns.com'}, | |
{'server': 'Google', 'command': 'dig @ns1.google.com TXT o-o.myaddr.l.google.com +short'} | |
] | |
# pushover stuff below | |
USER_TOKEN = 'USER_TOKEN_HERE' | |
APP_TOKEN = 'APP_TOKEN_HERE' | |
def send_pushover_notification(title, message, priority='1'): | |
data = {'token': APP_TOKEN, 'user': USER_TOKEN, 'title': title, 'message': message, 'priority': priority} | |
r = requests.post('https://api.pushover.net/1/messages.json', data=data, headers={'User-Agent': 'Python'}) | |
# pushover stuff above | |
def create_database(): | |
c.execute(TABLE_CREATION_SCRIPT) | |
c.execute(TABLE_ERROR_CREATION_SCRIPT) | |
c.execute(TABLE_LOG_CREATION_SCRIPT) | |
conn.commit() | |
def insert_data(script, values): | |
c.execute(script, values) | |
conn.commit() | |
def get_latest_recorded_ip(): | |
c.execute(TABLE_SELECTION_SCRIPT) | |
fetched = c.fetchall()[0] | |
return fetched[0] | |
def print_latest_recorded_ip(): | |
print("Your current IP address is: {}".format(get_latest_recorded_ip())) | |
def get_ip_address(): | |
for dig in dig_commands: | |
try: | |
ip = os.popen(dig['command']+ " 2> /dev/null").read().strip() | |
ip = ip.replace('"',"") | |
ipaddress.ip_address(ip) | |
return (ip, dig['server']) | |
except ValueError: | |
insert_data(TABLE_ERROR_INSERTION_SCRIPT, tuple([dig['server']])) | |
send_pushover_notification("SERVER_NAME_HERE", "Unable to get IP from any providers") | |
return tuple(['FAILURE']) | |
def print_logs(): | |
print_latest_recorded_ip() | |
print() | |
c.execute(TABLE_LOG_SELECTION_SCRIPT) | |
fetched = c.fetchall() | |
print("### Run Log ###\n") | |
print(tabulate(fetched, ['IP Changed', 'Date & Time of Check'])) | |
c.execute(TABLE_ERROR_SELECTION_SCRIPT) | |
fetched = c.fetchall() | |
if len(fetched) > 0: | |
print("### Error Log ###\n") | |
print(tabulate(fetched, ['Server', 'Date & Time of Error'])) | |
if len(sys.argv) > 1: | |
if sys.argv[1].lower() == 'show': | |
print_latest_recorded_ip() | |
exit() | |
if sys.argv[1].lower() == 'log': | |
print_logs() | |
exit() | |
if DATABASE_EXISTS: | |
values = get_ip_address() | |
if values[0] != 'FAILURE' and values[0] != get_latest_recorded_ip(): | |
IP_CHANGED = 'True' | |
insert_data(TABLE_INSERTION_SCRIPT, values) | |
send_pushover_notification("SERVER_NAME_HERE: IP Updated", "IP Address is now: {}".format(values[0])) | |
else: | |
create_database() | |
values = get_ip_address() | |
if values[0] != 'FAILURE': | |
IP_CHANGED = 'True' | |
insert_data(TABLE_INSERTION_SCRIPT, values) | |
send_pushover_notification("SERVER_NAME_HERE: IP Updated", "IP Address is now: {}".format(values[0])) | |
insert_data(TABLE_LOG_INSERTION_SCRIPT, (MESSAGE,IP_CHANGED)) | |
conn.close() |
Didn't think of that. This changedversion will now write the db, if the file exists but hasn't had a db written to it yet. Thanks for the script.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Been a while since I used this, but it checks to see if a database exists and if it doesn't it creates it along with the necessary schema (at least that's how it should work). If you created a database it assumes it has the schema already which is why it errors out. If you try running it without the db it should create it and work. All you need to do is give it a filename to use/check for.
In any case, glad it's of some help to someone other than myself :)