Created
May 21, 2013 15:59
-
-
Save GermainZ/5620946 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
import feedparser | |
import urllib | |
from dateutil import parser, tz, relativedelta | |
from datetime import datetime, tzinfo, timedelta | |
from HTMLParser import HTMLParser | |
from re import search | |
import logging | |
# The next line adds some debugging messages, mostly related to HTMLParser | |
# logging.basicConfig(level=logging.DEBUG) | |
# Base URLs of the mailing list archives to scan; get_url() appends the
# "YYYY-Month.txt" file name to each of them.
mailing_lists = [
    "https://mailman.archlinux.org/pipermail/arch-dev-public/",
    "https://mailman.archlinux.org/pipermail/arch-general/",
]

# Feeds handed to feedparser.
rss_feeds = [
    "http://www.archlinux.org/feeds/news/",
]

# ANSI escape sequences, looked up by role (see color() and MLStripper).
colors = {
    'title': '\033[36m',
    'script': '\033[35m',
    'normal': '\033[0m',
    'code': '\033[32m',
    'cite': '\033[34m',
    'blockquote': '\033[34;1m',
}

# Shared UTC tzinfo used to make parsed dates timezone-aware.
UTC = tz.tzutc()
class MLStripper(HTMLParser):
    """Turn an HTML fragment into plain text with ANSI colours.

    Text is collected as-is. ``<code>``, ``<cite>`` and ``<blockquote>`` are
    wrapped in the matching entry of the module-level ``colors`` table,
    ``<br>`` and ``</p>`` become line breaks, and ASCII character/entity
    references are decoded. Anything else is dropped and reported through
    ``logging.info``.
    """

    # Named entities that decode to a single ASCII character.
    _ENTITIES = {"amp": "&", "lt": "<", "gt": ">", "quot": '"',
                 "apos": "'", "nbsp": " "}

    def __init__(self):
        # Call the base initialiser (which itself calls reset()) so that any
        # state it sets up is present. HTMLParser is an old-style class on
        # Python 2, hence no super().
        HTMLParser.__init__(self)
        self.fed = []

    def handle_data(self, d):
        """Collect plain text."""
        self.fed.append(d)

    def handle_starttag(self, tag, attrs):
        """Emit the colour for a styled tag, or a newline for ``<br>``."""
        if tag in ("code", "cite", "blockquote"):
            self.fed.append(colors[tag])
        elif tag == "br":
            # <br> normally has no end tag, so the line break has to be
            # produced here; <br/> also reaches this handler.
            self.fed.append("\n")
        else:
            logging.info("Unhandled start tag: '%s'", tag)

    def handle_endtag(self, tag):
        """Reset the colour after a styled tag; end paragraphs with a newline."""
        if tag == "p":
            self.fed.append("\n")
        elif tag == "code":
            self.fed.append("%s" % colors['normal'])
        elif tag in ("cite", "blockquote"):
            self.fed.append("%s\n" % colors['normal'])
        elif tag == "br":
            # The newline was already emitted by handle_starttag.
            pass
        else:
            logging.info("Unhandled end tag: '%s'", tag)

    def handle_charref(self, name):
        """Decode numeric references such as ``&#39;``, ``&#039;``, ``&#x27;``.

        Only ASCII code points are emitted, so the collected pieces can
        always be joined regardless of whether the input was a byte string
        or unicode; other references are logged and dropped.
        """
        try:
            if name[:1] in ("x", "X"):
                code = int(name[1:], 16)
            else:
                code = int(name)
        except ValueError:
            code = -1
        if 0 <= code < 128:
            self.fed.append(chr(code))
        else:
            logging.info("Unhandled character reference: '%s'", name)

    def handle_entityref(self, name):
        """Decode the common named entities (``&amp;``, ``&lt;``, ...)."""
        try:
            self.fed.append(self._ENTITIES[name])
        except KeyError:
            logging.info("Unhandled entity reference: '%s'", name)

    def get_data(self):
        """Return everything collected so far as one string."""
        return ''.join(self.fed)
def color(s, color='script'):
    """Wrap ``s`` in the escape sequence stored under ``colors[color]``.

    The result always ends with ``colors['normal']`` so that following
    output is not affected.
    """
    start = colors[color]
    reset = colors['normal']
    return "%s%s%s" % (start, s, reset)
def print_line():
    """Print a coloured horizontal separator followed by blank lines."""
    separator = '-------------\n\n'
    print(color(separator))
def get_last_update():
    """Return the time of the most recent full system upgrade.

    Scans /var/log/pacman.log from the end for the newest line containing
    "starting full system upgrade" and parses the timestamp at the start of
    that line. Returns a timezone-aware datetime, or None when the log
    holds no such line.
    """
    # Read everything up front so the file handle is closed before parsing.
    with open("/var/log/pacman.log") as log:
        lines = log.readlines()
    for line in reversed(lines):
        if "starting full system upgrade" in line:
            # Characters 1..16 are the date and time (to the minute) that
            # follow the opening "[".
            stamp = parser.parse(line[1:17])
            # The slice carries no UTC offset; pacman writes local time, so
            # label it with the local zone rather than UTC.
            return stamp.replace(tzinfo=tz.tzlocal())
    return None
def get_url(i, url_original):
    """Build the archive URL for the month lying ``i`` months in the past.

    ``i == 0`` is the current month. The archive file name has the form
    "YYYY-Month.txt" and is appended directly to ``url_original``.
    """
    shift = relativedelta.relativedelta(months=-i)
    month = datetime.now() + shift
    file_name = month.strftime("%Y-%B.txt")
    return ''.join((url_original, file_name))
def strip_tags(html):
    """Return ``html`` converted to coloured plain text by MLStripper."""
    stripper = MLStripper()
    stripper.feed(html)
    # close() makes the parser process whatever it is still buffering
    # (e.g. an unterminated reference at the very end of the input).
    stripper.close()
    return stripper.get_data()
def get_update_diff(last_update, now=None):
    """Return how many calendar months lie between ``last_update`` and now.

    last_update -- datetime of the last system upgrade.
    now         -- datetime to measure against; defaults to datetime.now().

    The result is clamped to the range 0..2. When the real difference is
    two months or more, a notice is printed and 2 is returned.
    """
    if now is None:
        now = datetime.now()
    # Include the year so that e.g. December -> January counts as one month
    # instead of minus eleven.
    months = ((now.year - last_update.year) * 12
              + (now.month - last_update.month))
    if months >= 2:
        print(color("You haven't updated for at least two months. "
                    "I'll only check the last two months of mailing lists."))
        return 2
    # A last update dated in the future (clock changes) must not produce a
    # negative count, which would make callers check no month at all.
    return max(months, 0)
def get_rss_feeds(update_diff):
    """Check each of the defined RSS feeds for new content.

    For every URL in ``rss_feeds`` the entries dated after the module-level
    ``last_update`` are listed with their index, and the user is then
    prompted for indices whose summary should be shown. Any non-numeric
    input moves on to the next feed.

    update_diff -- accepted for symmetry with get_mailing_lists(); not used.
    """
    for feed_url in rss_feeds:
        print(color("Checking %s..." % feed_url))
        entries = feedparser.parse(feed_url)['entries']
        # Indices that were actually listed; only these may be expanded.
        shown = set()
        for i, entry in enumerate(entries):
            try:
                msg_date = parser.parse(entry['published'])
            except KeyError:
                msg_date = parser.parse(entry['updated'])
            if msg_date.tzinfo is None:
                # last_update is timezone-aware; comparing it with a naive
                # datetime would raise TypeError.
                msg_date = msg_date.replace(tzinfo=UTC)
            if msg_date > last_update:
                print("[%s] Thread: %s" % (i, color(entry['title'], 'title')))
                print(" Date: %s" % msg_date)
                shown.add(i)
        if not shown:
            print(color("No new feeds found."))
        else:
            # Keep getting the user's input to show entries' summaries.
            #
            # If the user inputs nothing, the script continues to the next
            # RSS feed, if it exists.
            prompt = color("Expand entry: ")
            while True:
                print_line()
                choice = raw_input(prompt)
                try:
                    choice = int(choice)
                except ValueError:
                    print(color("Continuing..."))
                    break
                if choice not in shown:
                    print("Not an entry.")
                    continue
                print(strip_tags(entries[choice]['summary']))
        print_line()
def read_mail(mailing, entry):
    """Print the body of one message from a downloaded list archive.

    mailing -- list of the archive's lines.
    entry   -- index in ``mailing`` at which to start reading.

    Lines are skipped up to and including the "Message-ID: <...>" header.
    Everything after it is printed until the end of the list or the next
    message separator, i.e. a line that starts with "From <user> at <host>".
    """
    in_body = False
    for raw in mailing[entry:]:
        line = raw.rstrip()
        # Anchored at the start of the line: a sentence that merely contains
        # "From ... at ..." must not cut the message short.
        if search(r"^From .* at .*", line):
            break
        if in_body:
            print(line)
        elif line.startswith("Message-ID: <") and line.endswith(">"):
            in_body = True
def get_mailing_lists(update_diff):
    """Check each of the defined mailing lists for new messages.

    update_diff -- how many months before the current one to include; the
                   archives are fetched oldest first, ending with the
                   current month.

    Every message whose "Date:" header is later than the module-level
    ``last_update`` is listed with a running number. The user is then
    prompted for numbers to read; any non-numeric input moves on to the
    next archive.
    """
    for url_original in mailing_lists:
        # Possibly get prior months, too.
        for months_back in range(update_diff, -1, -1):
            url = get_url(months_back, url_original)
            print(color("Checking %s..." % url))
            response = urllib.urlopen(url)
            try:
                mailing = response.readlines()
            finally:
                response.close()

            # For each listed message, the index of the line that follows
            # its Date/Subject pair; read_mail() starts there.
            found_msg = []
            # Look at every line together with its successor. Stepping in
            # non-overlapping pairs would miss Date headers that happen to
            # sit on odd line numbers.
            for idx, (line, next_line) in enumerate(zip(mailing,
                                                        mailing[1:])):
                if not line.startswith('Date:'):
                    continue
                try:
                    msg_date = parser.parse(line.rstrip()[6:])
                except (ValueError, OverflowError):
                    # Not a real header, e.g. "Date:" inside a message body.
                    continue
                if msg_date.tzinfo is None:
                    msg_date = msg_date.replace(tzinfo=UTC)
                if msg_date > last_update:
                    msg_title = next_line.rstrip()[9:]
                    print("[%s] Subject: %s" % (len(found_msg),
                                                color(msg_title, 'title')))
                    print(" %s" % line.rstrip())
                    found_msg.append(idx + 2)

            if not found_msg:
                print(color("No new mails found."))
                continue

            prompt = color("Expand entry: ")
            while True:
                print_line()
                choice = raw_input(prompt)
                try:
                    choice = int(choice)
                except ValueError:
                    print(color("Continuing..."))
                    break
                if 0 <= choice < len(found_msg):
                    read_mail(mailing, found_msg[choice])
                else:
                    print("Not an entry.")
# Find out when the system was last upgraded. Without that date there is
# nothing to compare feed entries and mails against, so stop right away.
last_update = get_last_update()
if not last_update:
    print(color("First update - Exiting. Update once manually first."))
    exit(0)
print(color("Last update: %s" % last_update))

# Number of earlier months whose archives are checked as well.
update_diff = get_update_diff(last_update)

print_line()
get_rss_feeds(update_diff)
get_mailing_lists(update_diff)
print_line()
print(color("Done!"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment