Skip to content

Instantly share code, notes, and snippets.

@pkqk
Forked from tomcritchlow/7books.py
Created November 11, 2010 17:43
Show Gist options
  • Save pkqk/672878 to your computer and use it in GitHub Desktop.
Save pkqk/672878 to your computer and use it in GitHub Desktop.
import cgi
import os
from google.appengine.ext.webapp import template
from google.appengine.api import users
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext import db
from google.appengine.api import memcache
from google.appengine.api import urlfetch
from google.appengine.datastore import entity_pb
import datetime
from libs import PyRSS2Gen
class User(db.Model):
user=db.UserProperty()
name = db.StringProperty()
joined = db.DateTimeProperty(auto_now_add=True)
twitter = db.LinkProperty(required=False)
class List(db.Model):
user = db.ReferenceProperty(User)
listname = db.StringProperty()
published = db.DateTimeProperty(auto_now_add=True)
class Book(db.Model):
list = db.ReferenceProperty(List)
booktitle = db.StringProperty()
bookauthor = db.StringProperty()
bookdescription = db.TextProperty()
added = db.DateTimeProperty(auto_now_add=True)
class Blogs(db.Model):
user = db.ReferenceProperty(User)
title = db.StringProperty()
content = db.TextProperty()
published = db.DateTimeProperty(auto_now_add=True)
live = db.BooleanProperty()
def serialize_entities(models):
if models is None:
return None
elif isinstance(models, db.Model):
# Just one instance
return db.model_to_protobuf(models).Encode()
else:
# A list
return [db.model_to_protobuf(x).Encode() for x in models]
def deserialize_entities(data):
if data is None:
return None
elif isinstance(data, str):
# Just one instance
return db.model_from_protobuf(entity_pb.EntityProto(data))
else:
return [db.model_from_protobuf(entity_pb.EntityProto(x)) for x in data]
def getip(ipaddr):
memcache_key = "gip_%s" % ipaddr
data = memcache.get(memcache_key)
if data is not None:
return data
geoipcode = ''
try:
fetch_response = urlfetch.fetch('http://geoip.wtanaka.com/cc/%s' % ipaddr)
if fetch_response.status_code == 200:
geoipcode = fetch_response.content
except urlfetch.Error, e:
pass
if geoipcode:
memcache.set(memcache_key, geoipcode)
return geoipcode
def gettld(isocode):
return {
'gb': 'co.uk',
'us': 'com',
'ca': 'ca'
}.get(isocode, 'co.uk')
def getaff(isocode):
return {
'gb': '7books-21',
'us': '7books-20',
'ca': '7books0d-20'
}.get(isocode, '7books-21')
class MainPage(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
latestlists = deserialize_entities(memcache.get("latestlists"))
if not latestlists:
latestlists = db.Query(List).order('-published').fetch(limit=10)
memcache.set("latestlists", serialize_entities(latestlists))
latestusers = deserialize_entities(memcache.get("latestusers"))
if not latestusers:
latestusers = db.Query(User).order('-joined').fetch(limit=10)
memcache.set("latestusers", serialize_entities(latestusers))
latestbooks = deserialize_entities(memcache.get("latestbooks"))
if not latestbooks:
latestbooks = db.Query(Book).order('-added').fetch(limit=10)
memcache.set("latestbooks", serialize_entities(latestbooks))
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
template_values = {
'lists': latestlists,
'latestusers': latestusers,
'latestbooks': latestbooks,
'email': username,
'loginout': loginout,
'tld': tld,
'aff': aff,
}
path = os.path.join(os.path.dirname(__file__), 'templates/homepage.html')
self.response.out.write(template.render(path, template_values))
class Create(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
useremail = user.email()
usercheck = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", users.get_current_user())
existinguser = usercheck.get()
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
if existinguser is None:
username = ''
else:
username = existinguser.name
template_values = {
'email': useremail,
'loginout': loginout,
'username': username,
'tld': tld,
'aff': aff,
}
path = os.path.join(os.path.dirname(__file__), 'templates/create.html')
self.response.out.write(template.render(path, template_values))
else:
self.redirect(users.create_login_url(self.request.uri))
class Inputlist(webapp.RequestHandler):
def post(self):
if users.get_current_user():
usercheck = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", users.get_current_user())
existinguser = usercheck.get()
if existinguser is None:
newuser = User()
newuser.user=users.get_current_user()
newuser.name=self.request.get('creator')
newuser.put()
newlist = List()
newlist.user=newuser.key()
newlist.listname=self.request.get('name')
newlist.put()
books = self.request.get('booktitle', allow_multiple=True)
authors = self.request.get('author', allow_multiple=True)
contents = self.request.get('content', allow_multiple=True)
models = []
for book, author, content in zip(books, authors, contents):
newbook = Book()
newbook.list=newlist.key()
newbook.booktitle=book
newbook.bookauthor=author
newbook.bookdescription=content
models.append(newbook)
db.put(models)
id = newlist.key().id()
memcache.delete("latestbooks")
memcache.delete("latestusers")
memcache.delete("latestlists")
self.redirect('/list/%s' % id)
else:
newlist = List()
newlist.user=existinguser.key()
newlist.listname=self.request.get('name')
newlist.put()
books = self.request.get('booktitle', allow_multiple=True)
authors = self.request.get('author', allow_multiple=True)
contents = self.request.get('content', allow_multiple=True)
models = []
for book, author, content in zip(books, authors, contents):
newbook = Book()
newbook.list=newlist.key()
newbook.booktitle=book
newbook.bookauthor=author
newbook.bookdescription=content
models.append(newbook)
db.put(models)
id = newlist.key().id()
memcache.delete("latestbooks")
memcache.delete("latestlists")
userlistskey = "userlistkey%s" % existinguser.key().id()
memcache.delete(userlistskey)
self.redirect('/list/%s' % id)
else:
self.error(404)
path = os.path.join(os.path.dirname(__file__), 'templates/404.html')
self.response.out.write(template.render(path,booklist))
class Displaylist(webapp.RequestHandler):
def get(self, id):
booklistkey = "booklist%s" % int(id)
booklist = deserialize_entities(memcache.get(booklistkey))
if not booklist:
booklist = List.get_by_id(int(id))
memcache.set(booklistkey, serialize_entities(booklist))
bookskey = "books%s" % int(id)
books = deserialize_entities(memcache.get(bookskey))
if not books:
books = booklist.book_set
memcache.set(bookskey, serialize_entities(books))
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
if username == booklist.user.user.email():
edit = True
else:
edit = False
template_values = {
'list': booklist,
'books': books,
'email': username,
'loginout': loginout,
'tld': tld,
'aff': aff,
'edit': edit,
}
path = os.path.join(os.path.dirname(__file__), 'templates/list.html')
self.response.out.write(template.render(path, template_values))
class Editlist(webapp.RequestHandler):
def get(self, id):
user = users.get_current_user()
authormodel = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", user)
author = authormodel.get()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
list = List.get_by_id(int(id))
books = list.book_set
if list.user.name == author.name:
template_values = {
'list': list,
'books': books,
'email': username,
'loginout': loginout,
'tld': tld,
'aff': aff,
}
path = os.path.join(os.path.dirname(__file__), 'templates/editlist.html')
self.response.out.write(template.render(path, template_values))
else:
self.response.out.write("Sorry, you are not allowed to edit this list")
class Updatelist(webapp.RequestHandler):
def post(self, id):
user = users.get_current_user()
authormodel = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", user)
author = authormodel.get()
list = List.get_by_id(int(id))
if list.user.name == author.name:
list.listname=self.request.get('name')
list.put()
oldbooks = list.book_set
for book in oldbooks:
book.delete()
books = self.request.get('booktitle', allow_multiple=True)
authors = self.request.get('author', allow_multiple=True)
contents = self.request.get('content', allow_multiple=True)
models = []
for book, author, content in zip(books, authors, contents):
newbook = Book()
newbook.list=list.key()
newbook.booktitle=book
newbook.bookauthor=author
newbook.bookdescription=content
models.append(newbook)
db.put(models)
id = list.key().id()
memcache.delete("latestbooks")
memcache.delete("latestlists")
booklistkey = "booklist%s" % int(id)
bookskey = "books%s" % int(id)
memcache.delete(booklistkey)
memcache.delete(bookskey)
userlistskey = "userlistkey%s" % list.user.key().id()
memcache.delete(userlistskey)
self.redirect('/list/%s' % id)
else:
self.response.out.write("access denied")
class Browselists(webapp.RequestHandler):
def get(self):
lists = self.gettop()
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'lists': lists,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/browselists.html')
self.response.out.write(template.render(path, template_values))
def gettop(self):
fiftylists = memcache.get("fiftylists")
if fiftylists is not None:
return fiftylists
else:
fiftylists = db.Query(List).order('-published').fetch(limit=50)
memcache.add("fiftylists", fiftylists, 3600)
return fiftylists
class Browsebooks(webapp.RequestHandler):
def get(self):
books = self.gettop()
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
template_values = {
'books': books,
'email': username,
'loginout': loginout,
'tld': tld,
'aff': aff,
}
path = os.path.join(os.path.dirname(__file__), 'templates/browsebooks.html')
self.response.out.write(template.render(path, template_values))
def gettop(self):
fiftybooks = memcache.get("fiftybooks")
if fiftybooks is not None:
return fiftybooks
else:
fiftybooks = db.Query(Book).order('-added').fetch(limit=50)
memcache.add("fiftybooks", fiftybooks, 3600)
return fiftybooks
class Browseusers(webapp.RequestHandler):
def get(self):
recentusers = self.gettop()
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'users': recentusers,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/browseusers.html')
self.response.out.write(template.render(path, template_values))
def gettop(self):
fiftyusers = memcache.get("fiftyusers")
if fiftyusers is not None:
return fiftyusers
else:
fiftyusers = db.Query(User).order('-joined').fetch(limit=50)
memcache.add("fiftyusers", fiftyusers, 3600)
return fiftyusers
class BlogHome(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
posts = self.getblogs()
template_values = {
'posts': posts,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/blog.html')
self.response.out.write(template.render(path, template_values))
def getblogs(self):
blogs= memcache.get("blogs")
if blogs is not None:
return blogs
else:
query = db.Query(Blogs)
query.order('-published')
query.filter('live =', True)
blogs = query.fetch(limit=10)
memcache.add("blogs", blogs)
return blogs
class BlogPost(webapp.RequestHandler):
def get(self, id):
blogpostkey = "blogpost%s" % int(id)
blogpost = deserialize_entities(memcache.get(blogpostkey))
if not blogpost:
blogpost = Blogs.get_by_id(int(id))
memcache.set(blogpostkey, serialize_entities(blogpost))
if blogpost:
if blogpost.live == True:
blog=blogpost
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'post': blogpost,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/blogpost.html')
self.response.out.write(template.render(path, template_values))
else:
self.response.out.write("post not live")
else:
self.response.out.write("404")
class UserPage(webapp.RequestHandler):
def get(self, id):
userpagekey = "userpage%s" % int(id)
userpage = deserialize_entities(memcache.get(userpagekey))
if not userpage:
userpage = User.get_by_id(int(id))
memcache.set(userpagekey, serialize_entities(userpage))
if userpage:
user = users.get_current_user()
userlistskey = "userlistkey%s" % int(id)
userlists = deserialize_entities(memcache.get(userlistskey))
if not userlists:
userlists = userpage.list_set
memcache.set(userlistskey, serialize_entities(userlists))
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
if username == userpage.user.email():
edit = True
else:
edit = False
template_values = {
'user': userpage,
'lists': userlists,
'email': username,
'loginout': loginout,
'edit': edit,
}
path = os.path.join(os.path.dirname(__file__), 'templates/userpage.html')
self.response.out.write(template.render(path, template_values))
else:
self.response.out.write("404")
class AboutPage(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/about.html')
self.response.out.write(template.render(path, template_values))
class ContactPage(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/contact.html')
self.response.out.write(template.render(path, template_values))
class BlogFeed(webapp.RequestHandler):
def get(self):
posts = self.getblogs()
data = []
for post in posts:
if post.live == True:
keyid = "http://www.7bks.com/blog/%s" % post.key().id()
data.append(PyRSS2Gen.RSSItem(
title = post.title,
link = keyid,))
rss = PyRSS2Gen.RSS2(
title = "7books Blog Feed",
link = "http://www.7bks.com/",
description = "Book awesomeness",
items = data)
self.response.headers["Content-Type"] = "text/xml"
self.response.out.write(rss.to_xml())
def getblogs(self):
blogs= memcache.get("blogs")
if blogs is not None:
return blogs
else:
blogs = db.Query(Blogs).order('-published').fetch(limit=10)
memcache.add("blogs", blogs)
return blogs
application = webapp.WSGIApplication(
[('/', MainPage),
('/create', Create),
('/inputlist/', Inputlist),
(r'/list/(.*)', Displaylist),
(r'/edit/(.*)', Editlist),
(r'/update/(.*)', Updatelist),
('/browse/',Browselists),
('/browse/books/',Browsebooks),
('/browse/users/',Browseusers),
('/blog/', BlogHome),
('/blog/feed.xml', BlogFeed),
(r'/blog/(.*)', BlogPost),
(r'/user/(.*)', UserPage),
('/about/', AboutPage),
('/contact/', ContactPage),
],
debug=False)
def main():
run_wsgi_app(application)
if __name__ == "__main__":
main()
import cgi
import os
from google.appengine.ext.webapp import template
from google.appengine.api import users
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext import db
from google.appengine.api import memcache
from google.appengine.api import urlfetch
from google.appengine.datastore import entity_pb
import datetime
from libs import PyRSS2Gen
class User(db.Model):
user=db.UserProperty()
name = db.StringProperty()
joined = db.DateTimeProperty(auto_now_add=True)
class List(db.Model):
user = db.ReferenceProperty(User)
listname = db.StringProperty()
published = db.DateTimeProperty(auto_now_add=True)
class Book(db.Model):
list = db.ReferenceProperty(List)
booktitle = db.StringProperty()
bookauthor = db.StringProperty()
bookdescription = db.TextProperty()
added = db.DateTimeProperty(auto_now_add=True)
class Blogs(db.Model):
user = db.ReferenceProperty(User)
title = db.StringProperty()
content = db.TextProperty()
published = db.DateTimeProperty(auto_now_add=True)
live = db.BooleanProperty()
def serialize_entities(models):
if models is None:
return None
elif isinstance(models, db.Model):
# Just one instance
return db.model_to_protobuf(models).Encode()
else:
# A list
return [db.model_to_protobuf(x).Encode() for x in models]
def deserialize_entities(data):
if data is None:
return None
elif isinstance(data, str):
# Just one instance
return db.model_from_protobuf(entity_pb.EntityProto(data))
else:
return [db.model_from_protobuf(entity_pb.EntityProto(x)) for x in data]
def getip(ipaddr):
memcache_key = "gip_%s" % ipaddr
data = memcache.get(memcache_key)
if data is not None:
return data
geoipcode = ''
try:
fetch_response = urlfetch.fetch('http://geoip.wtanaka.com/cc/%s' % ipaddr)
if fetch_response.status_code == 200:
geoipcode = fetch_response.content
except urlfetch.Error, e:
pass
if geoipcode:
memcache.set(memcache_key, geoipcode)
return geoipcode
def gettld(isocode):
if isocode == 'gb':
return 'co.uk'
if isocode == 'us':
return 'com'
if isocode == 'ca':
return 'ca'
else:
return 'co.uk'
def getaff(isocode):
if isocode == 'gb':
return '7books-21'
if isocode == 'us':
return '7books-20'
if isocode == 'ca':
return '7books0d-20'
else:
return '7books-21'
class MainPage(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
recentlists = db.Query(List).order('-published').fetch(limit=30)
recentblogs = db.Query(Blogs).order('-published').fetch(limit=10)
recentusers = db.Query(User).order('-joined').fetch(limit=30)
template_values = {
'recentlists': recentlists,
'recentblogs': recentblogs,
'recentusers': recentusers,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/admin.html')
self.response.out.write(template.render(path, template_values))
class Createblog(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/createblog.html')
self.response.out.write(template.render(path, template_values))
class Addblog(webapp.RequestHandler):
def post(self):
userfind = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", users.get_current_user())
existinguser = userfind.get()
post = Blogs()
post.user=existinguser.key()
post.title=self.request.get('title')
post.content=self.request.get('content')
post.live= False
post.put()
id = post.key().id()
self.redirect('/admin/blog/%s' % id)
class Updateblog(webapp.RequestHandler):
def post(self, id):
post = Blogs.get_by_id(int(id))
post.title=self.request.get('title')
post.content=self.request.get('content')
post.live= False
post.put()
id = post.key().id()
self.redirect('/admin/blog/%s' % id)
class Editblog(webapp.RequestHandler):
def get(self, id):
blogpost = Blogs.get_by_id(int(id))
edit = True
posturl = '/admin/update/%s' % int(id)
title = blogpost.title
content = blogpost.content
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'title': title,
'content': content,
'email': username,
'loginout': loginout,
'edit': edit,
'posturl': posturl,
}
path = os.path.join(os.path.dirname(__file__), 'templates/createblog.html')
self.response.out.write(template.render(path, template_values))
class Previewblog(webapp.RequestHandler):
def get(self, id):
blogpost = Blogs.get_by_id(int(id))
if blogpost:
if blogpost.live == False:
blog=blogpost
user = users.get_current_user()
editurl = '/admin/edit/%s' % id
liveurl = '/admin/putlive/%s' % id
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'post': blogpost,
'email': username,
'loginout': loginout,
'editurl': editurl,
'liveurl': liveurl,
}
path = os.path.join(os.path.dirname(__file__), 'templates/blogpost.html')
self.response.out.write(template.render(path, template_values))
else:
self.response.out.write('Post is live!')
else:
self.response.out.write('404')
class Putlive(webapp.RequestHandler):
def get(self, id):
blogpost = Blogs.get_by_id(int(id))
if blogpost:
blogpost.live = True
blogpost.put()
memcache.delete("blogs")
blogpostkey = "blogpost%s" % int(id)
memcache.delete(blogpostkey)
self.redirect('/blog/%s' % id)
else:
self.response.out.write('No post to put live')
application = webapp.WSGIApplication(
[('/admin/', MainPage),
('/admin/create/', Createblog),
('/admin/create/add/', Addblog),
(r'/admin/update/(.*)', Updateblog),
(r'/admin/edit/(.*)', Editblog),
(r'/admin/blog/(.*)', Previewblog),
(r'/admin/putlive/(.*)', Putlive),
],
debug=True)
def main():
run_wsgi_app(application)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment