Last active
April 26, 2018 13:17
-
-
Save icedraco/6795f5da9be9e70d325c89e2b7a89d69 to your computer and use it in GitHub Desktop.
Polling and Telegram notification script for @KogotsuchiDark at pikabu.ru (with persistence!)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# New article polling script for a pikabu.ru page that notifies via a Telegram | |
# bot or any other BASH command. | |
# | |
# Syntax: | |
# python3 pikabu.py | |
# ./pikabu.py [after `chmod +x pikabu.py`] | |
# | |
# Requirements: | |
# beautifulsoup4 - needed for HTML traversal | |
# | |
# Author: | |
# Artex <[email protected]> | |
import urllib.request | |
import os | |
import bs4 | |
import shelve | |
from datetime import datetime | |
from time import sleep | |
# --- What to poll and where to remember what was already seen ---------------
POLL_URL = "https://pikabu.ru/@KogotsuchiDark?page=1"
SEEN_PATH = "./pikabu-seen"  # path handed to shelve.open()

# Articles whose title contains this snippet are never notified.
SPONSOR_POST_TITLE_SNIPPET = "спонсорский пост"

# --- Timing (all values in seconds) -----------------------------------------
HOURS = 3600  # sec
CHECK_INTERVAL = 6 * HOURS  # pause between two successful polls
SLEEP_ON_ERROR = 2.0  # pause before retrying a failed fetch

# --- Telegram notification ---------------------------------------------------
TELEGRAM_EXEC = "/home/icedragon/bin/telegram.bash"
URL_NEWLINE = "%0A"  # URL-encoded line feed, used to join message lines
TELEGRAM_MSG_FORMAT = "@KogotsuchiDark%0A{text}"
TELEGRAM_MAX_URLS = 3  # links per message before the buffer is flushed
TELEGRAM_MSG_INTERVAL = 1  # sec
def notify_telegram(s: str):
    """Send the message *s* by running the TELEGRAM_EXEC command.

    The message is passed as one argv element and no shell is involved, so
    characters such as quotes, ``$`` or backticks in scraped titles reach the
    helper verbatim instead of being interpreted as shell syntax.

    TELEGRAM_EXEC may be a bare path or a command with leading arguments; it
    is split with shell-like rules and the message is appended as the last
    argument.

    Failures are best-effort: a command that cannot be started is reported
    on stdout, and the helper's exit status is ignored.
    """
    # Function-scope imports keep this block self-contained.
    import shlex
    import subprocess

    argv = shlex.split(TELEGRAM_EXEC) + [s]
    try:
        subprocess.run(argv)
    except OSError as ex:
        # e.g. helper script missing or not executable; keep the poller alive
        print("Telegram notification failed: {}".format(ex))
def telegram_string_builder(items):
    """Render *items* as the text of one Telegram message.

    Each item is an ``(article_id, title, url)`` triple. It becomes a Markdown
    bullet of the form `` - [title](url)``; bullets are joined with
    URL_NEWLINE and the result is substituted into TELEGRAM_MSG_FORMAT.
    """
    def as_markdown_link(entry):
        _, caption, link = entry
        # Double quotes are blanked out because the message may end up inside
        # a double-quoted shell argument, where they would end the string.
        caption = caption.replace('"', ' ')
        return " - [{}]({})".format(caption, link)  # Markdown

    body = URL_NEWLINE.join(as_markdown_link(entry) for entry in items)
    return TELEGRAM_MSG_FORMAT.format(text=body)
def buffered_telegram_notifier():
    """Coroutine-style generator that batches items into Telegram messages.

    Usage: prime it with ``next()``, then ``send()`` one item at a time and
    finish with ``send(None)``.

    Items are collected until TELEGRAM_MAX_URLS are pending; the batch is then
    sent through notify_telegram(), followed by a pause of
    TELEGRAM_MSG_INTERVAL seconds. A falsy value ends the receive loop, after
    which any leftover items are sent as a final, smaller batch.
    """
    pending = []
    while True:
        received = yield
        if received is not None:
            pending.append(received)
        if len(pending) >= TELEGRAM_MAX_URLS:
            notify_telegram(telegram_string_builder(pending))
            sleep(TELEGRAM_MSG_INTERVAL)
            pending = []
        if not received:
            # falsy value (normally None) is the "no more items" signal
            break

    if pending:
        notify_telegram(telegram_string_builder(pending))

    # Park once more so the terminating send() returns normally instead of
    # raising StopIteration in the caller.
    yield
def notify(items):
    """Report new posts on stdout and forward them to Telegram in batches.

    *items* is a list of ``(article_id, title, url)`` triples. An empty list
    is a no-op. Otherwise each post is printed and handed to a
    buffered_telegram_notifier() coroutine, which is told to flush at the end.
    """
    if items == []:
        return

    sink = buffered_telegram_notifier()
    next(sink)  # prime the generator so it is paused at its first yield

    print(">>> NEW POSTS")
    for entry in items:
        _, title, url = entry
        # title line, url line, then one blank separator line
        print(" * {}\n {}\n".format(title, url))
        sink.send(entry)

    sink.send(None)  # end of input: lets the notifier flush what is left
def handle(data: str, seen_shelve: shelve.Shelf, is_debug = False):
    """Process one fetched page: notify about unseen articles and record them.

    Parameters:
        data: HTML of the polled page, parsed with BeautifulSoup.
        seen_shelve: persistent mapping; the list of already-seen story ids is
            kept under the key ``'items'``.
        is_debug: when true, notify even if no ids have been recorded yet.

    On the very first run (nothing recorded and not in debug mode) the
    articles on the page are only recorded, not notified.

    An article is identified by its ``data-story-id`` attribute. Articles
    without that attribute are skipped: they could never be marked as seen
    and would otherwise be re-notified on every poll.
    """
    seen_items = seen_shelve['items'] if 'items' in seen_shelve else []
    should_notify = is_debug or len(seen_items) > 0

    doc = bs4.BeautifulSoup(data, 'html.parser')
    articles = doc.find_all('article')
    if not articles:
        print("WARNING: NO ARTICLES FOUND! DID WE GET THE DATA?")

    # Set for O(1) membership tests; also de-duplicates ids repeated on a page.
    known_ids = set(seen_items)

    # Walk the page bottom-up so that new items come out in chronological
    # order (presumably the page lists newest first - verify against the site).
    unseen = []
    for article in reversed(articles):
        article_id = article.get("data-story-id")
        if not article_id or article_id in known_ids:
            continue
        known_ids.add(article_id)
        unseen.append((article_id, article))

    if should_notify:
        # gather all items we're going to notify
        items = []
        for article_id, article in unseen:
            title = article.header.get_text().strip() if article.header else "???"
            # href=True skips anchors with no href attribute, which would
            # otherwise make the substring test fail on None.
            story_urls = [
                a["href"]
                for a in article.find_all("a", href=True)
                if "/story/" in a["href"]
            ]
            if story_urls and SPONSOR_POST_TITLE_SNIPPET not in title:
                items.append((article_id, title, story_urls[0]))
        notify(items)

    # update seen items with new unseen IDs (including filtered-out ones, so
    # sponsor posts and link-less articles are not re-examined next time)
    seen_shelve['items'] = seen_items + [article_id for article_id, _ in unseen]
def dummy_poll(url: str):
    """Used for diagnostic purposes to avoid hammering the server.

    Drop-in replacement for poll(): *url* is accepted but ignored. The first
    value yielded is the content of ``dummy1.html``; every later value is the
    content of ``dummy2.html``, re-read on each iteration. Both files are
    looked up in the current working directory.
    """
    def read_fixture(path):
        # context manager so the handle is closed right after reading
        with open(path, 'r') as fixture:
            return fixture.read()

    data = read_fixture('dummy1.html')
    print("DUMMY POLL 1 -> Returning {} bytes...".format(len(data)))
    yield data

    while True:
        data = read_fixture('dummy2.html')
        print("DUMMY POLL 2 -> Returning {} bytes...".format(len(data)))
        yield data
def poll(url: str, timeout: float = 30.0):
    """Endlessly fetch *url*, yielding the raw response body after each success.

    Parameters:
        url: address to download.
        timeout: seconds to wait on the connection before the attempt counts
            as failed, so a stalled server cannot hang the poller forever.

    Yields:
        The response body as ``bytes``.

    A failed attempt is reported on stdout and retried after SLEEP_ON_ERROR
    seconds; nothing is yielded for it. Pacing between successful polls is
    left to the consumer.
    """
    while True:
        ts = datetime.now()
        print("[{}] > polling page...".format(ts))
        try:
            # context manager closes the connection once the body is read
            with urllib.request.urlopen(url, timeout=timeout) as response:
                payload = response.read()
        except Exception as ex:
            # Deliberately broad: this is the retry boundary of a long-running
            # poller, so any fetch problem is logged and retried.
            print("[{}] Fetch failed: {}".format(ts, ex))
            sleep(SLEEP_ON_ERROR)
            continue

        print("[{}] + polling OK".format(ts))
        print()
        yield payload
def main(url: str):
    """Run the poller: fetch *url* repeatedly and process every result.

    The shelf at SEEN_PATH stays open for the whole run. After each page has
    been handled the shelf is synced, because the loop only ends when the
    process is interrupted, and the function then sleeps CHECK_INTERVAL
    seconds before the next poll.
    """
    print("--- pikabu poller ----------------------------------")
    print("URL: " + url)  # report the URL actually polled, not the default

    with shelve.open(SEEN_PATH) as seen:
        for data in poll(url):
            handle(data, seen)
            seen.sync()  # persist seen ids now rather than only on close
            sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main(POLL_URL)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
#
# Send one message to a Telegram chat through the Bot API `sendMessage` method.
#
# Usage:
#   telegram.bash "<text>"
#
# <text> is placed into the form payload as-is (Markdown parse mode), so the
# caller must already have URL-encoded it (e.g. %0A for a line break).
#
# Obtain TOKEN and CHAT_ID from Telegram API
TOKEN="##96#67##:AA__________OULZH_____2doDx______"
CHAT_ID="########"
URL="https://api.telegram.org/bot$TOKEN/sendMessage"
TIME=10   # curl --max-time, in seconds

# Refuse to send an empty message instead of issuing a pointless request.
if [ "$#" -lt 1 ] || [ -z "$1" ]; then
    echo "usage: $0 \"<text>\"" >&2
    exit 1
fi
TEXT="$1"

# Expansions are quoted so no value is subject to word splitting or globbing.
curl -s --max-time "$TIME" -d "parse_mode=Markdown&chat_id=$CHAT_ID&text=$TEXT" "$URL" > /dev/null
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment