Last active
April 1, 2016 07:41
-
-
Save notwa/5d495d32adf251fefb9b68f850ee9357 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from retry import retry | |
import requests, requests.exceptions | |
class StatusCodeError(Exception):
    """Error that records an HTTP status code and the URL that produced it.

    Attributes:
        code: the status code carried by the error.
        url: the URL the request was made to.
    """

    def __init__(self, code, url):
        self.url = url
        self.code = code

    def __str__(self):
        # The URL comes first in the message, then the status code.
        message = 'request for {} returned status code {}'
        return message.format(self.url, self.code)
@retry((requests.exceptions.ConnectionError, StatusCodeError, ValueError), tries=6, wait=300)
def get(uri, json=False):
    """Fetch *uri* with an HTTP GET.

    Connection errors, non-200 responses and ValueError (which covers a
    failing ``r.json()`` call) are handed to the ``retry`` decorator, which
    is configured here with ``tries=6`` and ``wait=300``.

    Args:
        uri: address to request.
        json: when true, return the decoded JSON body instead of the
            response object.

    Returns:
        The ``requests`` response object, or the result of its ``.json()``
        method when *json* is true.

    Raises:
        StatusCodeError: the response status code was not 200.
    """
    response = requests.get(uri)
    status = response.status_code
    if status != 200:
        raise StatusCodeError(status, uri)
    return response.json() if json else response
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# so damn useful it deserved its own file | |
import time | |
def retry(Exceptions, tries=10, wait=1):
    """Build a decorator that re-runs a function when it raises given errors.

    Args:
        Exceptions: an exception class, or an iterable (tuple, list, ...) of
            exception classes, that should trigger another attempt. Any
            other exception propagates immediately.
        tries: total number of attempts. Values below 1 still result in a
            single attempt.
        wait: seconds passed to ``time.sleep`` after each failed attempt
            except the last one.

    Returns:
        A decorator. The decorated function returns the value of the first
        successful call; if every attempt fails, the exception raised by the
        final attempt propagates to the caller.
    """
    from functools import wraps

    # Normalise to a tuple so the ``except`` clause below always receives a
    # valid target. Comparing ``type(x) == Exception`` never matches a class
    # object (its type is ``type``), hence the isinstance/issubclass test.
    if isinstance(Exceptions, type) and issubclass(Exceptions, BaseException):
        Exceptions = (Exceptions,)
    else:
        Exceptions = tuple(Exceptions)

    def retryer(f):
        @wraps(f)  # keep the wrapped function's __name__ and __doc__
        def deco(*args, **kwargs):
            for _ in range(tries - 1):
                try:
                    return f(*args, **kwargs)
                except Exceptions:
                    time.sleep(wait)
            # The last attempt runs outside the try block so that its
            # exception, if any, reaches the caller unchanged.
            return f(*args, **kwargs)
        return deco
    return retryer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
from urllib.parse import quote_plus | |
from xml.dom.minidom import parseString as parseXML | |
import datetime | |
from get import get | |
def lament(*args, **kwargs):
    """Print to standard error; takes the same arguments as ``print``."""
    print(*args, file=sys.stderr, **kwargs)


def parseDate(s):
    """Parse a ``YYYY-MM-DDTHH:MM:SSZ`` timestamp into an aware UTC datetime.

    A ``+0000`` offset is appended before parsing so that ``%z`` attaches a
    UTC tzinfo to the result.
    """
    return datetime.datetime.strptime(s + '+0000', '%Y-%m-%dT%H:%M:%SZ%z')


def formatDate(dt):
    """Format *dt* as ``YYYY-MM-DDTHH:MM:SSZ``.

    No timezone conversion is performed; the trailing ``Z`` is a literal.
    The directives are spelled out instead of using ``%F``/``%T``, which are
    C-library extensions that not every platform supports.
    """
    return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
# Only a handful of MIME types are needed, so they are listed inline,
# keyed by file extension (without the dot).
mimes = dict(
    png='image/png',
    jpg='image/jpeg',
    gif='image/gif',
    swf='application/x-shockwave-flash',
)
class Untitled:
    """Merge the results of several Danbooru Atom searches into one Atom feed.

    Call ``parse`` once per search query to collect entries, then
    ``generate`` to serialise everything gathered so far.

    Args:
        urls: mapping of URL prefixes. The keys read by this class are
            ``'atom'`` (search feed; the quoted query is appended),
            ``'post'`` (post page; the post id is appended), ``'query'``
            (search page; the quoted query is appended) and ``'feed'``
            (address of the generated feed itself).
        max_entries: upper bound on the number of entries that
            ``generate`` writes.
    """

    # Empty Atom skeleton; generate() fills in the <feed> element.
    template = """
<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<feed xmlns="http://www.w3.org/2005/Atom">
</feed>
""".strip()

    def __init__(self, urls, max_entries=512):
        self.urls = urls
        self.max_entries = max_entries
        self.title = 'Danbooru - Personalized Feed'
        self.items = []

    @staticmethod
    def _text(entry, tag_name):
        """Return the text of the first *tag_name* element inside *entry*.

        An element without content (``<summary/>``) yields ``''`` instead of
        failing on a missing child node.
        """
        first = entry.getElementsByTagName(tag_name)[0].firstChild
        value = first.nodeValue if first is not None else None
        return value or ''

    def parse(self, q):
        """Fetch the Atom search results for query *q* and collect its entries.

        Each entry is stored in ``self.items`` as a dict with the keys
        ``title``, ``id`` (last path segment of the entry id), ``updated``,
        ``summary``, ``img`` (``src`` of the first ``<img>``, or ``None``
        when the entry has no image), ``query`` and ``updated_unix``.
        """
        url = self.urls['atom'] + quote_plus(q)
        dom = parseXML(get(url).text)
        for entry in dom.getElementsByTagName('entry'):
            images = entry.getElementsByTagName('img')
            updated = self._text(entry, 'updated')
            self.items.append({
                'title': self._text(entry, 'title'),
                'id': self._text(entry, 'id').split('/')[-1],
                'updated': updated,
                'summary': self._text(entry, 'summary'),
                'img': images[0].getAttribute('src') if images else None,
                'query': q,
                # numeric timestamp used by generate() for sorting
                'updated_unix': parseDate(updated).timestamp(),
            })

    def generate(self):
        """Return the combined feed as an Atom XML string.

        ``self.items`` is re-sorted newest first and truncated to
        ``max_entries`` as a side effect.
        """
        newest_first = sorted(
            self.items, key=lambda d: d['updated_unix'], reverse=True)
        self.items = newest_first[:self.max_entries]

        # Aware "now" in UTC; datetime.utcnow() is deprecated.
        now = formatDate(datetime.datetime.now(datetime.timezone.utc))

        dom = parseXML(self.template)
        feed = dom.documentElement

        def newText(entity_name, text):
            # <entity_name>text</entity_name>
            e = dom.createElement(entity_name)
            e.appendChild(dom.createTextNode(text))
            return e

        def newLink(**kwargs):
            # <link k="v" .../> with one attribute per keyword argument
            link = dom.createElement('link')
            for k, v in kwargs.items():
                link.setAttribute(k, v)
            return link

        feed.appendChild(newText('title', self.title))
        feed.appendChild(newLink(href=self.urls['feed'], rel='self'))
        feed.appendChild(newText('id', self.urls['feed']))
        feed.appendChild(newText('updated', now))

        for item in self.items:
            alt = self.urls['post'] + item['id']
            query_quote = quote_plus(item['query'])

            entry = dom.createElement('entry')
            entry.appendChild(newText('title', item['title']))
            entry.appendChild(newLink(href=alt, rel="alternate"))
            entry.appendChild(newText('id', alt))
            entry.appendChild(newText('published', item['updated']))
            entry.appendChild(newText('updated', item['updated']))

            img = item['img']
            if img:
                # Unknown extensions no longer abort the whole feed: the
                # link is still written, just without a "type" attribute.
                ext = img.rsplit('.', 1)[-1].lower()
                mime = mimes.get(ext)
                attrs = {'rel': 'enclosure'}
                if mime:
                    attrs['type'] = mime
                attrs['href'] = img
                entry.appendChild(newLink(**attrs))

            entry.appendChild(newText('summary', item['summary']))

            # The originating search query is recorded as the entry author.
            author = dom.createElement('author')
            author.appendChild(newText('name', item['query']))
            author.appendChild(newText('uri', self.urls['query'] + query_quote))
            entry.appendChild(author)

            feed.appendChild(entry)

        return dom.toxml()
# URL prefixes on danbooru.donmai.us; values ending in "=" or "/" are meant
# to have a query or identifier appended to them.
urls = dict(
    atom='https://danbooru.donmai.us/posts.atom?limit=48&tags=',
    post='https://danbooru.donmai.us/posts/',
    img='https://danbooru.donmai.us/ssd/data/preview/',
    query='https://danbooru.donmai.us/posts?tags=',
)
if __name__ == '__main__':
    # Usage: script FEED_URL < queries
    # FEED_URL is the address the generated feed will be served from; one
    # search query is read per line of standard input.
    if len(sys.argv) < 2:
        lament('usage: {} FEED_URL < queries'.format(sys.argv[0]))
        sys.exit(2)

    urls['feed'] = sys.argv[1]
    untitled = Untitled(urls)

    for q in sys.stdin.read().splitlines():
        lament(q)  # echo each query on stderr; stdout carries only the feed
        untitled.parse(q)

    print(untitled.generate())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment