Created
September 5, 2010 22:36
-
-
Save d33tah/566394 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
from __future__ import print_function | |
""" | |
Python library for NK (nk.pl).
BY d33tah, LICENSED UNDER WTFPL. | |
""" | |
#quick bpython pastie: | |
#from pynk import * ; from config import * ; nk = PyNK() ; nk.login(login,password) | |
""" | |
TODO (toclean): | |
* JSON exception for NK_profile.get_friends() | |
TEST: double-get friends | |
TEST: NK_forum.__eq__ | |
""" | |
import os | |
import json | |
import mechanize | |
from lxml import html | |
import urllib | |
# Base address of the site; scraped links are concatenated onto it.
nklink = "http://nk.pl"
# Module-wide switch read by debug(); change it with set_debug().
debug_on = False
class debug_colors:
    """
    debug_colors - ANSI escape sequences used to colour terminal output.
    """
    PURPLE = '\033[95m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    # '\033[0m' resets all attributes; it is used here to end a coloured run.
    WHITE = '\033[0m'
def set_debug(debug):
    """
    set_debug(debug) - stores `debug` in the module-level debug_on flag,
    which debug() consults before printing anything.
    """
    global debug_on
    debug_on = debug
def debug(param):
    """
    debug(param) - prints param without a trailing newline, but only when
    the module-level debug_on flag is truthy.
    """
    # debug_on is only read here, so no "global" declaration is needed.
    if debug_on:
        print(param, end='')
def pynk_debug(param):
    """
    pynk_debug(param) - passes param to debug() wrapped in the PURPLE colour
    code and followed by the WHITE (reset) code.
    """
    debug(debug_colors.PURPLE + param + debug_colors.WHITE)
def t(obj):
    """
    t(obj) - calls obj.text_content() to avoid the long form
    """
    return obj.text_content()
class E_UserBanned(Exception):
    """E_UserBanned - exception that is raised when there was an attempt to retrieve data of a banned user."""
class NK_photo(object):
    """
    NK_photo - carries information about a specific user's photo

    Attributes:
        nk        - client object; get_image_url() calls its get_html()
                    and get_cache_cfg() methods
        url       - address of the photo's page
        image_url - address of the image itself ('' until known)
        thumb_url - address of the thumbnail ('' if not supplied)
    """

    def __init__(self, nk, url, image_url='', thumb_url=''):
        self.nk = nk
        self.url = url
        self.image_url = image_url
        self.thumb_url = thumb_url

    def get_image_url(self):
        """
        get_image_url(self) - returns the image address.

        If image_url is already set it is returned as is. Otherwise the
        photo page is fetched and the "src" of the <img id="photo_img">
        element is stored in image_url. When that element is missing,
        image_url stays empty and the empty value is returned.
        """
        if self.image_url:
            return self.image_url
        tree = self.nk.get_html(self.url, cached=self.nk.get_cache_cfg("PHOTO"))
        matches = tree.xpath('//img [@id="photo_img"]')
        if matches:
            self.image_url = matches[0].get('src')
        return self.image_url
class NK_forum_post(object):
    """
    NK_forum_post - carries information about a specific thread post.

    Attributes:
        date     - the post's date/time as passed in by the caller
        contents - the post's text
        author   - the post's author (NK_forum_thread.get_posts() passes
                   an NK_profile here)
    """

    def __init__(self, date, contents, author):
        self.date = date
        self.contents = contents
        self.author = author

    def __repr__(self):
        return "<NK_forum_post: %s, %s>" % (self.date, self.author)
class NK_forum_thread(object):
    """
    NK_forum_thread - carries information about a specific forum thread.

    Two threads compare equal when their urls are equal.
    """

    def __init__(
            self,
            nk,
            title,
            url,
            started_author,
            started_time,
            posts_count,
            lastpost_summary,
            lastpost_author,
            lastpost_date):
        self.nk = nk
        self.title = title
        self.url = url
        self.started_author = started_author
        self.started_time = started_time
        self.posts_count = posts_count
        self.lastpost_summary = lastpost_summary
        self.lastpost_author = lastpost_author
        self.lastpost_date = lastpost_date

    def __repr__(self):
        if self.title and self.url and self.posts_count:
            return "<NK_forum_thread: title=%s, url=%s [%s]>" % (
                self.title.encode('utf-8'),
                self.url.replace(nklink, ''),
                self.posts_count)
        else:
            return "<NK_forum_thread [%d]>" % id(self)

    def __eq__(self, other):
        # The previous signature took no second operand and referenced
        # undefined names, so every comparison raised.
        if not isinstance(other, NK_forum_thread):
            return NotImplemented
        return self.url == other.url

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        # Kept consistent with __eq__, which compares urls only.
        return hash(self.url)

    def get_posts(self):
        """
        get_posts(self) - fetches a list of posts in a specified
        thread

        Walks the thread page by page, following the first link whose
        title contains "pna" (presumably the "next page" link - TODO
        confirm), and returns a list of NK_forum_post objects.
        """
        ret = []
        # NOTE(review): the original marked this line "BUG: test this one!".
        # The next-page href below is used as returned by the page; if it is
        # relative it may need nklink prepended - confirm against the site.
        nextpage = self.url
        while True:
            thread_tree = self.nk.get_html(nextpage, cached=self.nk.get_cache_cfg("POSTS"))
            posts = thread_tree.xpath('//div [@class="post"]')
            for post in posts:
                author = NK_profile(
                    nk=self.nk,
                    name=t(post.find('.//div[@class="avatar_no_js "]/a/span[@class="avatar_user_name_txt"]')),
                    location=t(post.find('.//div[@class="avatar_no_js "]/a/span[@class="avatar_user_city"]')),
                    url=nklink + post.find('.//div[@class="avatar_no_js "]/a').get('href'),
                    friends_count=t(post.find('.//div[@class="avatar_no_js "]/div[@class="avatar_bar"]')),
                )
                ret.append(NK_forum_post(
                    date=t(post.find('.//div[@class="datetime"]')),
                    contents=t(post.find('.//div[@class="post_content"]')),
                    author=author,
                ))
            nextpage_node = thread_tree.xpath('.//a [contains(@title, "pna")]')
            if not nextpage_node:
                break
            nextpage = nextpage_node[0].get('href')
        return ret
class NK_forum(object):
    """
    NK_forum - carries information about a specific forum

    Two forums compare equal when their urls are equal.
    """

    def __init__(self, nk, url, school_name=''):
        self.nk = nk
        self.url = url
        self.school_name = school_name

    def __eq__(self, other):
        if not isinstance(other, NK_forum):
            return NotImplemented
        return self.url == other.url

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        # Kept consistent with __eq__, which compares urls only.
        return hash(self.url)

    def __repr__(self):
        if self.school_name:
            return "<NK_forum: school_name=%s>" % self.school_name
        elif self.url:
            return "<NK_forum: url=%s>" % self.url
        else:
            return "<NK_forum [%d]>" % id(self)

    def get_threads(self):
        """
        get_threads(self) - fetches a list of threads

        Walks the forum page by page, following the first link whose title
        contains "pna" (presumably the "next page" link - TODO confirm),
        and returns a list of NK_forum_thread objects.
        """
        ret = []
        # NOTE(review): the original marked this line "BUG: test this one!".
        # The next-page href below is used as returned by the page; if it is
        # relative it may need nklink prepended - confirm against the site.
        nextpage = self.url
        while True:
            # Use the client stored on this forum; the original read a
            # global named "nk", which only exists in the demo script.
            forum_tree = self.nk.get_html(nextpage, cached=self.nk.get_cache_cfg("THREADS"))
            threads = forum_tree.xpath('//div [@id="threads"]//tr[contains(@class,"thread")]')
            for thread in threads:
                started_author = NK_profile(
                    nk=self.nk,
                    name=t(thread.find('.//div[@class="author"]/a')),
                    url=thread.find('.//div[@class="author"]/a').get('href'),
                )
                # NOTE(review): the url below is read from the same
                # div[@class="author"] link as started_author, so it is the
                # thread starter's address, not the last poster's - confirm
                # whether this is intended.
                lastpost_author = NK_profile(
                    nk=self.nk,
                    name=t(thread.find('.//td[@class="last_post"]//a//span[@class="author"]')).strip('\n '),
                    url=thread.find('.//div[@class="author"]/a').get('href'),
                )
                ret.append(NK_forum_thread(
                    nk=self.nk,
                    title=t(thread.find('.//div[@class="name"]/a')),
                    # NOTE(review): [1:] drops the first character of the
                    # href before nklink is prepended - confirm the href shape.
                    url=nklink + thread.find('.//div[@class="name"]/a').get('href')[1:],
                    started_author=started_author,
                    started_time=t(thread.find('.//div[@class="datetime"]')),
                    posts_count=int(t(thread.find('.//td[@class="count"]'))),
                    lastpost_summary=t(thread.find('.//span[@class="demo"]')),
                    lastpost_author=lastpost_author,
                    lastpost_date=t(thread.find('.//span[@class="datetime"]')),
                ))
            nextpage_node = forum_tree.xpath('.//a [contains(@title, "pna")]')
            if not nextpage_node:
                break
            nextpage = nextpage_node[0].get('href')
        return ret
class NK_profile_class(object):
    """
    NK_profile_class - carries information about a single class a given user belongs to

    Two objects compare equal when url, name, school, year_start and
    year_finish are all equal.
    """

    def __init__(self, nk, url, name, school, year_start, year_finish):
        self.nk = nk
        self.url = url
        self.name = name
        self.school = school
        self.year_start = year_start
        self.year_finish = year_finish

    def _identity(self):
        # The fields that take part in equality and hashing.
        return (self.url, self.name, self.school, self.year_start, self.year_finish)

    def __eq__(self, other):
        if not isinstance(other, NK_profile_class):
            return NotImplemented
        return self._identity() == other._identity()

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        return hash(self._identity())

    def get_members(self):
        """
        get_members(self) - fetches the class page and returns a list of
        NK_profile objects, one per "student student_expanded" entry found
        inside the element with id "dziennik".
        """
        ret = []
        tree = self.nk.get_html(self.url, cached=self.nk.get_cache_cfg("CLASS"))
        members = tree.xpath('//div [@id="dziennik"]//div [@class="student student_expanded"]')
        for member in members:
            ret.append(NK_profile(
                nk=self.nk,
                name=t(member.find('.//a[@class="student_link"]')),
                location=t(member.find('.//div[@class="city_content"]')),
                friends_count=t(member.find('.//div[@class="button"]/span')),
                # The href is stored as found on the page (nklink is not prepended).
                url=member.find('.//a[@class="student_link"]').get('href'),
            ))
        return ret
class NK_profile_shout(object):
    """
    NK_profile_shout - carries information about a user's single shout in a shoutbox

    Attributes:
        contents - text of the shout ('' by default)
        datetime - date/time of the shout as passed in by the caller
    """

    def __init__(self, contents='', datetime=''):
        self.contents = contents
        self.datetime = datetime

    def __repr__(self):
        return "<NK_profile_shout: len(contents)=%d, datetime=%s>" % (len(self.contents), self.datetime)
class NK_profile_details(object):
    """
    NK_profile_details - contains user's profile details that could be loaded from his profile page

    A plain value holder: every constructor argument is stored unchanged in
    the attribute of the same name.
    """

    def __init__(self, age, nick, sex, phone, classes, shouts, avatar, photos_count):
        self.age = age
        self.nick = nick
        self.sex = sex
        self.phone = phone
        self.classes = classes
        self.shouts = shouts
        self.avatar = avatar
        self.photos_count = photos_count
class NK_profile(object):
    """
    NK_profile - carries information about people profiles.

    A profile is identified by its url and/or uid. When only one of the two
    is given to the constructor the other is derived from it.
    """

    def __init__(self, nk,
                 name=None,
                 location=None,
                 url=None,
                 friends_count=None,
                 uid=None):
        self.nk = nk
        self.name = name
        self.location = location
        self.friends_count = friends_count
        self.friends = []
        self.details = None
        if uid and not url:
            self.uid = uid
            self.url = nklink + '/profile/%d' % uid
        elif url and not uid:
            self.url = url
            # Everything after the last "/" of the url is taken as the uid
            # (a string in this branch).
            self.uid = url[url.rfind('/') + 1:]
        else:
            self.url = url
            self.uid = uid

    def __repr__(self):
        # NOTE(review): under Python 2 this returns a unicode object, which
        # repr()/str() may fail to encode for non-ASCII names - confirm.
        if self.name and self.location and self.url:
            return "<NK_profile: name=%s, location=%s, url=%s>" % (unicode(self.name), unicode(self.location), self.url)
        else:
            return "<NK_profile [%d]>" % id(self)

    def __eq__(self, other):
        """
        Two profiles are equal when they share a non-empty url or a
        non-empty uid. uids are compared as text because the constructor
        stores a string when the uid is derived from a url, while callers
        may pass an integer.
        """
        if not isinstance(other, NK_profile):
            return NotImplemented
        if self.url and self.url == other.url:
            return True
        # A missing identifier matches nothing; previously two profiles
        # without url and uid compared equal.
        if not self.uid or not other.uid:
            return False
        return ('%s' % (self.uid,)) == ('%s' % (other.uid,))

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def get_friends(self):
        """
        get_friends() - fetches friends list for a given user.

        The list is downloaded once and kept in self.friends; later calls
        return the stored list (an empty result is fetched again).
        """
        if not self.friends:
            json_url = 'http://nk.pl/friends_list/%s/575/0/0?t=%s' % (self.uid, self.nk.basic_auth)
            raw = self.nk.get_url(json_url, cached=self.nk.get_cache_cfg("FRIENDS_LIST"))
            # The first three characters of the response are skipped before
            # parsing (presumably a non-JSON prefix - TODO confirm).
            json_data = json.loads(raw[3:])
            # The response holds parallel lists, indexed by position.
            for i, uid in enumerate(json_data["UID"]):
                name = "%s %s" % (json_data["FIRST_NAME"][i], json_data["LAST_NAME"][i])
                self.friends.append(NK_profile(
                    nk=self.nk,
                    name=name,
                    location=json_data["CITY"][i],
                    url=nklink + "/profile/%s" % uid,
                    friends_count=json_data["FRIENDS_COUNT"][i],
                ))
        return self.friends

    def get_details(self):
        """
        get_details() - loads user's profile page and returns the details

        The page is parsed only when details are not loaded yet or when
        name, location or friends_count is still missing; those three
        attributes are filled in as a side effect.

        Raises E_UserBanned when the page carries the "profile blocked"
        notice.
        """
        if self.details and all((self.name, self.location, self.friends_count)):
            return self.details

        tree = self.nk.get_html(self.url, cached=self.nk.get_cache_cfg("PROFILE"))
        if tree.xpath(u'//p [contains(.,"Przepraszamy, nie możesz obecnie gościć na tym profilu, ponieważ został on zablokowany.")]'):
            raise E_UserBanned()

        shouts = []
        for shout in tree.xpath('//div [@id="comments"]//table [@class="comment_table"]'):
            contents = t(shout.find_class('comment_content')[0])
            datetime = t(shout.find_class('datetime')[0])
            shouts.append(NK_profile_shout(contents=contents, datetime=datetime))

        classes = []
        for school in tree.xpath('//li [@class="school"]'):
            school_name = t(school.xpath('.//div//a')[0])
            for class_node in school.xpath('.//li//a [@class="user_class"]'):
                # The link text is split on " (": the first part is the
                # class name, the optional second part is "start-finish)".
                class_details = t(class_node).split(' (')
                class_url = class_node.get('href')
                name = class_details[0]
                if len(class_details) == 2:
                    years = class_details[1].split('-')
                    year_start = years[0]
                    year_finish = years[1][:-1]  # drop the closing ")"
                else:
                    year_start = year_finish = 0
                classes.append(NK_profile_class(self.nk, class_url, name, school_name, year_start, year_finish))

        def get_detail(param):
            # Text of the cell that follows the "label" cell containing param.
            label_cells = tree.xpath('//table [@class="profile_info_box"]//td [contains(@class, "label")' +
                                     'and contains(., "%s")]' % param)
            return t(label_cells[0].getnext())

        self.name = "%s %s" % (get_detail(u'Imię'), get_detail('Nazwisko'))
        self.location = get_detail(u'Miejscowość')
        # The first two characters of the counter text are skipped.
        self.friends_count = t(tree.xpath('//div[@class="ikonki"]/a[contains(@title," znajomych")]/span')[0])[2:]
        photos_count_el = tree.xpath(u'//div[@class="ikonki"]/a[contains(@title,"Galeria zdjęć")]/span')
        if photos_count_el:
            photos_count = t(photos_count_el[0])[2:]
        else:
            photos_count = 0
        avatar_url = nklink + tree.xpath(u'//div [@class="profil_avatar"]//div [contains(@class,"avatar")]//a [contains(@title,"Pokaż profil")]')[0].get('href')
        avatar_thumb_url = tree.xpath('//div [@class="profil_avatar"]//div [contains(@class,"avatar")]//' +
                                      u'img [contains(@alt,"Pokaż profil")]')[0].get('src')
        self.details = NK_profile_details(
            # strip() removes the characters " ", "l", "a", "t" from both ends.
            age=get_detail('Wiek').strip(' lat'),
            nick=get_detail('Pseudonim'),
            sex=get_detail(u'Płeć'),
            phone=get_detail('Telefon'),
            classes=classes,
            shouts=shouts,
            avatar=NK_photo(nk=self.nk, thumb_url=avatar_thumb_url, url=avatar_url),
            photos_count=photos_count,
        )
        return self.details

    def get_photos(self, first_only=False):
        """
        get_photos(self,first_only=False) - reads user's album lists and returns the photos

        Returns a list of NK_photo objects. With first_only=True only the
        first album is opened and only its first page is read.
        """
        url = nklink + '/profile/%s/gallery' % self.uid
        tree = self.nk.get_html(url, cached=self.nk.get_cache_cfg("GALLERY"))
        albums = tree.xpath('//div [@id="albums_list_wo_js"]//p [@class="album_name"]//a')
        if len(albums) > 1:
            print("W: len(albums) > 1: " + unicode(self))
        #TODO: UGLY!
        album_trees = []
        if not albums:
            # No album list on the page: the gallery page itself is scanned.
            album_trees.append(tree)
        else:
            for album in albums:
                album_trees.append(self.nk.get_html(nklink + album.get('href'), cached=self.nk.get_cache_cfg("GALLERY")))
                if first_only:
                    break
        ret = []
        for album_tree in album_trees:
            while True:
                for album_photo in album_tree.xpath('//a [@class="album_photo"]'):
                    thumb_url = album_photo.xpath('.//img [contains(@alt,"miniaturka zdj")]')[0].get('src')
                    href = nklink + album_photo.get('href')
                    ret.append(NK_photo(nk=self.nk, url=href, thumb_url=thumb_url))
                nextpage = album_tree.xpath(u'//a [contains(@title,"Następna strona")]')
                if nextpage and not first_only:
                    url = nklink + nextpage[0].get('href')
                    album_tree = self.nk.get_html(url, cached=self.nk.get_cache_cfg("GALLERY"))
                else:
                    break
        return ret
        #return (tree,ret)
class PyNK(object):
    """
    PyNK - Nasza-Klasa Python API.

    Wraps a mechanize browser and an on-disk page cache. After login() the
    instance carries basic_auth, my_profile and watched_forums.
    """

    def __init__(self):
        self.br = mechanize.Browser()
        user_agent = 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)'
        self.br.addheaders = [("User-agent", user_agent)]
        self.br.set_handle_robots(False)
        #self.br.viewing_html=lambda: True
        # Per-resource cache switches, read through get_cache_cfg(); keys
        # that are absent count as False (not cached).
        self.cache_cfg = {}
        self.cache_cfg["FRIENDS_LIST"] = False

    def login(self, username, password):
        """
        login(self) - logs in using passed parameters

        Submits the first form of the start page, then stores the
        "basic_auth" cookie value, the logged-in user's profile and the
        watched forums on the instance. Raises Exception when the returned
        page does not contain "Witaj".
        """
        self.br.open(nklink)
        self.br.select_form(nr=0)
        self.br.form["login"] = username
        self.br.form["password"] = password
        self.br.submit()
        dashboard_html = self.br.response().read()
        if dashboard_html.find('Witaj') == -1:
            raise Exception('Blad logowania, zle haslo?')
        self.basic_auth = self.br._ua_handlers['_cookies'].cookiejar._cookies[".nk.pl"]["/"]["basic_auth"].value
        #self.br._ua_handlers['_cookies'].cookiejar._cookies[".nk.pl"]["/"]["js_enabled"].value = '1'
        self.my_profile = self.get_my_profile(dashboard_html)
        self.watched_forums = self.get_my_watched_forums(dashboard_html)

    def logout(self):
        """
        logout(self) - requests the logout page (bypassing the cache) and
        clears the browser's cookie jar.
        """
        self.get_url(nklink + '/logout?l=1', cached=False)
        self.br._ua_handlers['_cookies'].cookiejar.clear()

    def get_my_profile(self, dashboard_html):
        """
        get_my_profile(self) - fetches information about logged in user from his dashboard
        """
        box_tree = html.fromstring(dashboard_html)
        box = box_tree.xpath('.//div[contains(@class,"profile_box")]//div[@class="avatar_no_js "]')[0]
        return NK_profile(
            nk=self,
            name=t(box.find('./a/span[@class="avatar_user_name_txt"]')),
            location=t(box.find('./a/span[@class="avatar_user_city"]')),
            url=nklink + box.find('./a').get('href'),
            friends_count=t(box.find('./div[@class="avatar_bar"]//span')),
        )

    def get_my_watched_forums(self, dashboard_html, unread_only=False):
        """
        get_my_watched_forums(self, unread_only=False) - extracts information
        about logged in user's watched forums.

        With unread_only=True only entries whose <span> has a class
        containing "unread" are returned.
        """
        ret = []
        for entry in html.fromstring(dashboard_html).xpath('//ul [@id="forum_max"]//li'):
            if unread_only:
                # The original tested "not str.find(...)", which is true
                # only when the class starts with "unread" and crashes when
                # the span or its class attribute is missing.
                span = entry.find('span')
                span_class = span.get('class') if span is not None else None
                if 'unread' not in (span_class or ''):
                    continue
            ret.append(NK_forum(
                nk=self,
                school_name=t(entry.find('a')),
                url=nklink + entry.find('a').get('href'),
            ))
        return ret

    def cache_filename(self, url):
        """
        cache_filename(self,url) - converts a given url to a filename placed in "cache" directory.
        creates the directory if it doesn't exist.
        """
        if not os.path.isdir('cache'):
            os.makedirs('cache')
        # "/" is replaced first so the quoted name holds no path separators.
        return 'cache/' + urllib.quote(url.replace('/', r'%2F'))

    def get_url(self, url, cached=True):
        """
        get_url(self,url,cached=True) - fetches data either from the cache or from URL

        With cached=False the url is always downloaded and nothing is
        stored. Otherwise the cache file is returned when it exists, and
        created from the download when it does not.
        """
        pynk_debug("PYNK: get_url: %s " % url)
        if not cached:
            pynk_debug("NO CACHE\n")
            return self.br.open(url).read()
        filename = self.cache_filename(url)
        if os.path.exists(filename):
            pynk_debug("CACHE FOUND\n")
            # Binary mode keeps the stored bytes untouched; "with" closes
            # the handle, which the original left open.
            with open(filename, 'rb') as cache_file:
                return cache_file.read()
        pynk_debug("CACHE NOT FOUND\n")
        ret = self.br.open(url).read()
        with open(filename, 'wb') as cache_file:
            cache_file.write(ret)
        return ret

    def get_html(self, url, cached=True):
        """
        get_html(self,url,cached=True) - fetches data either from the cache or from URL
        and returns them parsed by lxml.html
        """
        return html.fromstring(self.get_url(url, cached))

    def get_cache_cfg(self, param):
        """
        get_cache_cfg(self,param) - reads self.cache_cfg dictionary and returns either False
        or its contents for a given param
        """
        ret = self.cache_cfg.get(param)
        # A missing key and a stored None both yield False.
        return False if ret is None else ret
# Demo / self-check script: logs in, then compares the number of photos the
# scraper finds for every friend with the count shown on the friend's profile.
# The triple-quoted blocks below are disabled demos kept for reference.
if __name__ == '__main__':
    from sys import exit
    #my config.py actually contains just "login" and "password" variables defined
    from config import login, password
    #nk.br.set_debug_http(True)
    nk = PyNK()
    nk.login(login, password)
    my_friends = nk.my_profile.get_friends()
    print(nk.basic_auth)
    """
    #his_friends = NK_profile(nk=nk,url='http://nk.pl/profile/9766667').get_friends()
    for friend in my_friends:
        print friend.name
    #print(my_friends.age)
    """
    #uncomment the following for the simple mutual-friends demo
    """
    for first in my_friends:
        print("Sprawdzam "+first.name)
        first_friends = first.get_friends()
        for second in first_friends:
            if second in my_friends:
                print ("=> "+second.name)
    """
    #checks out all the posts of the first thread in first watched forums
    """
    for forum in nk.watched_forums:
        print (unicode(forum))
        threads = forum.get_threads()
        for thread in threads:
            if thread.posts_count < 1:
                continue
            print ("Wybralem watek: %s" % thread)
            posts = thread.get_posts()
            print ("Znalazlem postow: %s" % thread.posts_count)
            exit(0)
    """
    """
    my_classes = nk.my_profile.get_details().classes
    for my_friend in my_friends:
        print ("=>Przetwarzam %s" % my_friend)
        my_friend_classes = my_friend.get_details().classes
        for my_friend_class in my_friend_classes:
            if my_friend_class in my_classes:
                print ("%s byl z %s w %s" % (my_friend.name, nk.my_profile.name, my_friend_class.name))
    """
    """
    for my_friend in my_friends:
        my_friend_classes = my_friend.get_details().classes
        for my_friend_class in my_friend_classes:
            if nk.my_profile in my_friend_class.get_members():
                print ("Znalazlem %s w klasie %s z %s" % (my_friend.name, my_friend_class.name, nk.my_profile.name))
    """
    """
    for my_friend in my_friends:
        phone_no = my_friend.get_details().phone
        if phone_no and phone_no != u'\xa0' and phone_no != 'ukryty':
            print ("%s => %s" % (my_friend.name, phone_no))
    """
    #"""
    nk.logout()
    nk.login(login, password)
    all_correct = 0      # photos found by get_photos(), summed over friends
    all_overall = 0      # photo counts reported by the profiles, summed
    fully_correct = 0    # friends for whom both numbers match
    fully_failed = 0     # friends for whom no photo was found
    for my_friend in my_friends:
        # The original ended this call with a trailing comma, which under
        # print_function no longer suppresses the newline; end=' ' keeps the
        # result on the same line (presumably the intent - TODO confirm).
        print("%30s... [ %30s ]" % (my_friend.name, my_friend.url), end=' ')
        photos = my_friend.get_photos(first_only=False)
        correct = len(photos)
        all_correct += correct
        overall = int(my_friend.get_details().photos_count)
        all_overall += overall
        if correct == overall:
            fully_correct += 1
            color = debug_colors.GREEN
        elif correct == 0:
            fully_failed += 1
            color = debug_colors.RED
        else:
            color = debug_colors.BLUE
        print(color + "%s/%s" % (correct, overall) + debug_colors.WHITE)
    total = len(my_friends)
    # Avoid dividing by zero when no profile reports any photos.
    accuracy = all_correct / float(all_overall) * 100 if all_overall else 0.0
    print("\nCurrent algorithm accuracy: %2.0f%%\nFully failed: %s/%s\nFully correct: %s/%s" % (
        accuracy, fully_failed, total, fully_correct, total))
    #"""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment