Skip to content

Instantly share code, notes, and snippets.

@tomcritchlow
Created November 9, 2010 23:23
Show Gist options
  • Save tomcritchlow/670034 to your computer and use it in GitHub Desktop.
Save tomcritchlow/670034 to your computer and use it in GitHub Desktop.
The main code for 7books (www.7bks.com)
import cgi
import os
from google.appengine.ext.webapp import template
from google.appengine.api import users
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext import db
from google.appengine.api import memcache
from google.appengine.api import urlfetch
from google.appengine.datastore import entity_pb
import datetime
from libs import PyRSS2Gen
class User(db.Model):
user=db.UserProperty()
name = db.StringProperty()
joined = db.DateTimeProperty(auto_now_add=True)
twitter = db.LinkProperty(required=False)
class List(db.Model):
user = db.ReferenceProperty(User)
listname = db.StringProperty()
published = db.DateTimeProperty(auto_now_add=True)
class Book(db.Model):
list = db.ReferenceProperty(List)
booktitle = db.StringProperty()
bookauthor = db.StringProperty()
bookdescription = db.TextProperty()
added = db.DateTimeProperty(auto_now_add=True)
class Blogs(db.Model):
user = db.ReferenceProperty(User)
title = db.StringProperty()
content = db.TextProperty()
published = db.DateTimeProperty(auto_now_add=True)
live = db.BooleanProperty()
def serialize_entities(models):
if models is None:
return None
elif isinstance(models, db.Model):
# Just one instance
return db.model_to_protobuf(models).Encode()
else:
# A list
return [db.model_to_protobuf(x).Encode() for x in models]
def deserialize_entities(data):
if data is None:
return None
elif isinstance(data, str):
# Just one instance
return db.model_from_protobuf(entity_pb.EntityProto(data))
else:
return [db.model_from_protobuf(entity_pb.EntityProto(x)) for x in data]
def getip(ipaddr):
memcache_key = "gip_%s" % ipaddr
data = memcache.get(memcache_key)
if data is not None:
return data
geoipcode = ''
try:
fetch_response = urlfetch.fetch('http://geoip.wtanaka.com/cc/%s' % ipaddr)
if fetch_response.status_code == 200:
geoipcode = fetch_response.content
except urlfetch.Error, e:
pass
if geoipcode:
memcache.set(memcache_key, geoipcode)
return geoipcode
def gettld(isocode):
if isocode == 'gb':
return 'co.uk'
if isocode == 'us':
return 'com'
if isocode == 'ca':
return 'ca'
else:
return 'co.uk'
def getaff(isocode):
if isocode == 'gb':
return '7books-21'
if isocode == 'us':
return '7books-20'
if isocode == 'ca':
return '7books0d-20'
else:
return '7books-21'
class MainPage(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
latestlists = deserialize_entities(memcache.get("latestlists"))
if not latestlists:
latestlists = db.Query(List).order('-published').fetch(limit=10)
memcache.set("latestlists", serialize_entities(latestlists))
latestusers = deserialize_entities(memcache.get("latestusers"))
if not latestusers:
latestusers = db.Query(User).order('-joined').fetch(limit=10)
memcache.set("latestusers", serialize_entities(latestusers))
latestbooks = deserialize_entities(memcache.get("latestbooks"))
if not latestbooks:
latestbooks = db.Query(Book).order('-added').fetch(limit=10)
memcache.set("latestbooks", serialize_entities(latestbooks))
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
template_values = {
'lists': latestlists,
'latestusers': latestusers,
'latestbooks': latestbooks,
'email': username,
'loginout': loginout,
'tld': tld,
'aff': aff,
}
path = os.path.join(os.path.dirname(__file__), 'templates/homepage.html')
self.response.out.write(template.render(path, template_values))
class Create(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
useremail = user.email()
usercheck = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", users.get_current_user())
existinguser = usercheck.get()
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
if existinguser is None:
username = ''
else:
username = existinguser.name
template_values = {
'email': useremail,
'loginout': loginout,
'username': username,
'tld': tld,
'aff': aff,
}
path = os.path.join(os.path.dirname(__file__), 'templates/create.html')
self.response.out.write(template.render(path, template_values))
else:
self.redirect(users.create_login_url(self.request.uri))
class Inputlist(webapp.RequestHandler):
def post(self):
if users.get_current_user():
usercheck = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", users.get_current_user())
existinguser = usercheck.get()
if existinguser is None:
newuser = User()
newuser.user=users.get_current_user()
newuser.name=self.request.get('creator')
newuser.put()
newlist = List()
newlist.user=newuser.key()
newlist.listname=self.request.get('name')
newlist.put()
books = self.request.get('booktitle', allow_multiple=True)
authors = self.request.get('author', allow_multiple=True)
contents = self.request.get('content', allow_multiple=True)
models = []
for book, author, content in zip(books, authors, contents):
newbook = Book()
newbook.list=newlist.key()
newbook.booktitle=book
newbook.bookauthor=author
newbook.bookdescription=content
models.append(newbook)
db.put(models)
id = newlist.key().id()
memcache.delete("latestbooks")
memcache.delete("latestusers")
memcache.delete("latestlists")
self.redirect('/list/%s' % id)
else:
newlist = List()
newlist.user=existinguser.key()
newlist.listname=self.request.get('name')
newlist.put()
books = self.request.get('booktitle', allow_multiple=True)
authors = self.request.get('author', allow_multiple=True)
contents = self.request.get('content', allow_multiple=True)
models = []
for book, author, content in zip(books, authors, contents):
newbook = Book()
newbook.list=newlist.key()
newbook.booktitle=book
newbook.bookauthor=author
newbook.bookdescription=content
models.append(newbook)
db.put(models)
id = newlist.key().id()
memcache.delete("latestbooks")
memcache.delete("latestlists")
userlistskey = "userlistkey%s" % existinguser.key().id()
memcache.delete(userlistskey)
self.redirect('/list/%s' % id)
else:
self.error(404)
path = os.path.join(os.path.dirname(__file__), 'templates/404.html')
self.response.out.write(template.render(path,booklist))
class Displaylist(webapp.RequestHandler):
def get(self, id):
booklistkey = "booklist%s" % int(id)
booklist = deserialize_entities(memcache.get(booklistkey))
if not booklist:
booklist = List.get_by_id(int(id))
memcache.set(booklistkey, serialize_entities(booklist))
bookskey = "books%s" % int(id)
books = deserialize_entities(memcache.get(bookskey))
if not books:
books = booklist.book_set
memcache.set(bookskey, serialize_entities(books))
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
if username == booklist.user.user.email():
edit = True
else:
edit = False
template_values = {
'list': booklist,
'books': books,
'email': username,
'loginout': loginout,
'tld': tld,
'aff': aff,
'edit': edit,
}
path = os.path.join(os.path.dirname(__file__), 'templates/list.html')
self.response.out.write(template.render(path, template_values))
class Editlist(webapp.RequestHandler):
def get(self, id):
user = users.get_current_user()
authormodel = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", user)
author = authormodel.get()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
list = List.get_by_id(int(id))
books = list.book_set
if list.user.name == author.name:
template_values = {
'list': list,
'books': books,
'email': username,
'loginout': loginout,
'tld': tld,
'aff': aff,
}
path = os.path.join(os.path.dirname(__file__), 'templates/editlist.html')
self.response.out.write(template.render(path, template_values))
else:
self.response.out.write("Sorry, you are not allowed to edit this list")
class Updatelist(webapp.RequestHandler):
def post(self, id):
user = users.get_current_user()
authormodel = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", user)
author = authormodel.get()
list = List.get_by_id(int(id))
if list.user.name == author.name:
list.listname=self.request.get('name')
list.put()
oldbooks = list.book_set
for book in oldbooks:
book.delete()
books = self.request.get('booktitle', allow_multiple=True)
authors = self.request.get('author', allow_multiple=True)
contents = self.request.get('content', allow_multiple=True)
models = []
for book, author, content in zip(books, authors, contents):
newbook = Book()
newbook.list=list.key()
newbook.booktitle=book
newbook.bookauthor=author
newbook.bookdescription=content
models.append(newbook)
db.put(models)
id = list.key().id()
memcache.delete("latestbooks")
memcache.delete("latestlists")
booklistkey = "booklist%s" % int(id)
bookskey = "books%s" % int(id)
memcache.delete(booklistkey)
memcache.delete(bookskey)
userlistskey = "userlistkey%s" % list.user.key().id()
memcache.delete(userlistskey)
self.redirect('/list/%s' % id)
else:
self.response.out.write("access denied")
class Browselists(webapp.RequestHandler):
def get(self):
lists = self.gettop()
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'lists': lists,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/browselists.html')
self.response.out.write(template.render(path, template_values))
def gettop(self):
fiftylists = memcache.get("fiftylists")
if fiftylists is not None:
return fiftylists
else:
fiftylists = db.Query(List).order('-published').fetch(limit=50)
memcache.add("fiftylists", fiftylists, 3600)
return fiftylists
class Browsebooks(webapp.RequestHandler):
def get(self):
books = self.gettop()
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
location = getip(self.request.remote_addr)
tld = gettld(location)
aff = getaff(location)
template_values = {
'books': books,
'email': username,
'loginout': loginout,
'tld': tld,
'aff': aff,
}
path = os.path.join(os.path.dirname(__file__), 'templates/browsebooks.html')
self.response.out.write(template.render(path, template_values))
def gettop(self):
fiftybooks = memcache.get("fiftybooks")
if fiftybooks is not None:
return fiftybooks
else:
fiftybooks = db.Query(Book).order('-added').fetch(limit=50)
memcache.add("fiftybooks", fiftybooks, 3600)
return fiftybooks
class Browseusers(webapp.RequestHandler):
def get(self):
recentusers = self.gettop()
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'users': recentusers,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/browseusers.html')
self.response.out.write(template.render(path, template_values))
def gettop(self):
fiftyusers = memcache.get("fiftyusers")
if fiftyusers is not None:
return fiftyusers
else:
fiftyusers = db.Query(User).order('-joined').fetch(limit=50)
memcache.add("fiftyusers", fiftyusers, 3600)
return fiftyusers
class BlogHome(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
posts = self.getblogs()
template_values = {
'posts': posts,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/blog.html')
self.response.out.write(template.render(path, template_values))
def getblogs(self):
blogs= memcache.get("blogs")
if blogs is not None:
return blogs
else:
query = db.Query(Blogs)
query.order('-published')
query.filter('live =', True)
blogs = query.fetch(limit=10)
memcache.add("blogs", blogs)
return blogs
class BlogPost(webapp.RequestHandler):
def get(self, id):
blogpostkey = "blogpost%s" % int(id)
blogpost = deserialize_entities(memcache.get(blogpostkey))
if not blogpost:
blogpost = Blogs.get_by_id(int(id))
memcache.set(blogpostkey, serialize_entities(blogpost))
if blogpost:
if blogpost.live == True:
blog=blogpost
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'post': blogpost,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/blogpost.html')
self.response.out.write(template.render(path, template_values))
else:
self.response.out.write("post not live")
else:
self.response.out.write("404")
class UserPage(webapp.RequestHandler):
def get(self, id):
userpagekey = "userpage%s" % int(id)
userpage = deserialize_entities(memcache.get(userpagekey))
if not userpage:
userpage = User.get_by_id(int(id))
memcache.set(userpagekey, serialize_entities(userpage))
if userpage:
user = users.get_current_user()
userlistskey = "userlistkey%s" % int(id)
userlists = deserialize_entities(memcache.get(userlistskey))
if not userlists:
userlists = userpage.list_set
memcache.set(userlistskey, serialize_entities(userlists))
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
if username == userpage.user.email():
edit = True
else:
edit = False
template_values = {
'user': userpage,
'lists': userlists,
'email': username,
'loginout': loginout,
'edit': edit,
}
path = os.path.join(os.path.dirname(__file__), 'templates/userpage.html')
self.response.out.write(template.render(path, template_values))
else:
self.response.out.write("404")
class AboutPage(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/about.html')
self.response.out.write(template.render(path, template_values))
class ContactPage(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/contact.html')
self.response.out.write(template.render(path, template_values))
class BlogFeed(webapp.RequestHandler):
def get(self):
posts = self.getblogs()
data = []
for post in posts:
if post.live == True:
keyid = "http://www.7bks.com/blog/%s" % post.key().id()
data.append(PyRSS2Gen.RSSItem(
title = post.title,
link = keyid,))
rss = PyRSS2Gen.RSS2(
title = "7books Blog Feed",
link = "http://www.7bks.com/",
description = "Book awesomeness",
items = data)
self.response.headers["Content-Type"] = "text/xml"
self.response.out.write(rss.to_xml())
def getblogs(self):
blogs= memcache.get("blogs")
if blogs is not None:
return blogs
else:
blogs = db.Query(Blogs).order('-published').fetch(limit=10)
memcache.add("blogs", blogs)
return blogs
application = webapp.WSGIApplication(
[('/', MainPage),
('/create', Create),
('/inputlist/', Inputlist),
(r'/list/(.*)', Displaylist),
(r'/edit/(.*)', Editlist),
(r'/update/(.*)', Updatelist),
('/browse/',Browselists),
('/browse/books/',Browsebooks),
('/browse/users/',Browseusers),
('/blog/', BlogHome),
('/blog/feed.xml', BlogFeed),
(r'/blog/(.*)', BlogPost),
(r'/user/(.*)', UserPage),
('/about/', AboutPage),
('/contact/', ContactPage),
],
debug=False)
def main():
run_wsgi_app(application)
if __name__ == "__main__":
main()
import cgi
import os
from google.appengine.ext.webapp import template
from google.appengine.api import users
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext import db
from google.appengine.api import memcache
from google.appengine.api import urlfetch
from google.appengine.datastore import entity_pb
import datetime
from libs import PyRSS2Gen
class User(db.Model):
user=db.UserProperty()
name = db.StringProperty()
joined = db.DateTimeProperty(auto_now_add=True)
class List(db.Model):
user = db.ReferenceProperty(User)
listname = db.StringProperty()
published = db.DateTimeProperty(auto_now_add=True)
class Book(db.Model):
list = db.ReferenceProperty(List)
booktitle = db.StringProperty()
bookauthor = db.StringProperty()
bookdescription = db.TextProperty()
added = db.DateTimeProperty(auto_now_add=True)
class Blogs(db.Model):
user = db.ReferenceProperty(User)
title = db.StringProperty()
content = db.TextProperty()
published = db.DateTimeProperty(auto_now_add=True)
live = db.BooleanProperty()
def serialize_entities(models):
if models is None:
return None
elif isinstance(models, db.Model):
# Just one instance
return db.model_to_protobuf(models).Encode()
else:
# A list
return [db.model_to_protobuf(x).Encode() for x in models]
def deserialize_entities(data):
if data is None:
return None
elif isinstance(data, str):
# Just one instance
return db.model_from_protobuf(entity_pb.EntityProto(data))
else:
return [db.model_from_protobuf(entity_pb.EntityProto(x)) for x in data]
def getip(ipaddr):
memcache_key = "gip_%s" % ipaddr
data = memcache.get(memcache_key)
if data is not None:
return data
geoipcode = ''
try:
fetch_response = urlfetch.fetch('http://geoip.wtanaka.com/cc/%s' % ipaddr)
if fetch_response.status_code == 200:
geoipcode = fetch_response.content
except urlfetch.Error, e:
pass
if geoipcode:
memcache.set(memcache_key, geoipcode)
return geoipcode
def gettld(isocode):
if isocode == 'gb':
return 'co.uk'
if isocode == 'us':
return 'com'
if isocode == 'ca':
return 'ca'
else:
return 'co.uk'
def getaff(isocode):
if isocode == 'gb':
return '7books-21'
if isocode == 'us':
return '7books-20'
if isocode == 'ca':
return '7books0d-20'
else:
return '7books-21'
class MainPage(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
recentlists = db.Query(List).order('-published').fetch(limit=30)
recentblogs = db.Query(Blogs).order('-published').fetch(limit=10)
recentusers = db.Query(User).order('-joined').fetch(limit=30)
template_values = {
'recentlists': recentlists,
'recentblogs': recentblogs,
'recentusers': recentusers,
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/admin.html')
self.response.out.write(template.render(path, template_values))
class Createblog(webapp.RequestHandler):
def get(self):
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'email': username,
'loginout': loginout,
}
path = os.path.join(os.path.dirname(__file__), 'templates/createblog.html')
self.response.out.write(template.render(path, template_values))
class Addblog(webapp.RequestHandler):
def post(self):
userfind = db.GqlQuery("SELECT * FROM User WHERE user = :1 ", users.get_current_user())
existinguser = userfind.get()
post = Blogs()
post.user=existinguser.key()
post.title=self.request.get('title')
post.content=self.request.get('content')
post.live= False
post.put()
id = post.key().id()
self.redirect('/admin/blog/%s' % id)
class Updateblog(webapp.RequestHandler):
def post(self, id):
post = Blogs.get_by_id(int(id))
post.title=self.request.get('title')
post.content=self.request.get('content')
post.live= False
post.put()
id = post.key().id()
self.redirect('/admin/blog/%s' % id)
class Editblog(webapp.RequestHandler):
def get(self, id):
blogpost = Blogs.get_by_id(int(id))
edit = True
posturl = '/admin/update/%s' % int(id)
title = blogpost.title
content = blogpost.content
user = users.get_current_user()
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'title': title,
'content': content,
'email': username,
'loginout': loginout,
'edit': edit,
'posturl': posturl,
}
path = os.path.join(os.path.dirname(__file__), 'templates/createblog.html')
self.response.out.write(template.render(path, template_values))
class Previewblog(webapp.RequestHandler):
def get(self, id):
blogpost = Blogs.get_by_id(int(id))
if blogpost:
if blogpost.live == False:
blog=blogpost
user = users.get_current_user()
editurl = '/admin/edit/%s' % id
liveurl = '/admin/putlive/%s' % id
if user:
loginout = users.create_logout_url("/")
username = user.email()
else:
loginout = users.create_login_url("/")
username = ''
template_values = {
'post': blogpost,
'email': username,
'loginout': loginout,
'editurl': editurl,
'liveurl': liveurl,
}
path = os.path.join(os.path.dirname(__file__), 'templates/blogpost.html')
self.response.out.write(template.render(path, template_values))
else:
self.response.out.write('Post is live!')
else:
self.response.out.write('404')
class Putlive(webapp.RequestHandler):
def get(self, id):
blogpost = Blogs.get_by_id(int(id))
if blogpost:
blogpost.live = True
blogpost.put()
memcache.delete("blogs")
blogpostkey = "blogpost%s" % int(id)
memcache.delete(blogpostkey)
self.redirect('/blog/%s' % id)
else:
self.response.out.write('No post to put live')
application = webapp.WSGIApplication(
[('/admin/', MainPage),
('/admin/create/', Createblog),
('/admin/create/add/', Addblog),
(r'/admin/update/(.*)', Updateblog),
(r'/admin/edit/(.*)', Editblog),
(r'/admin/blog/(.*)', Previewblog),
(r'/admin/putlive/(.*)', Putlive),
],
debug=True)
def main():
run_wsgi_app(application)
if __name__ == "__main__":
main()
@anotherstarburst
Copy link

4 weeks! Damn, awesome job man.

I'm a few days in and having found my feet with object oriented programming and Python I'm endeavoring to make a contact form with GAE. I see you're python class for contact, but how do you get the html to speak to the code?

@ramanadan36
Copy link

Looks great ! does this works only on google app engine. Is there a possibility that you would remove those dependencies and make it vendor neutral app.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment