Skip to content

Instantly share code, notes, and snippets.

@stirlingw
Created April 2, 2018 01:33
Show Gist options
  • Save stirlingw/cfd98904604f47b4d7a6bab2f46bec2d to your computer and use it in GitHub Desktop.
Save stirlingw/cfd98904604f47b4d7a6bab2f46bec2d to your computer and use it in GitHub Desktop.
import os
import requests
from werkzeug.security import generate_password_hash, check_password_hash
from flask import Flask, render_template, session, send_from_directory, request, redirect, url_for, escape, jsonify, make_response
from flask_session import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
# Application object; the static URL prefix is set to the empty string.
app = Flask(__name__, static_url_path='')

# Refuse to start without a database connection string.
if not os.getenv("DATABASE_URL"):
    raise RuntimeError("DATABASE_URL is not set")

# Store sessions on the filesystem and mark them non-permanent.
app.config.update(
    SESSION_PERMANENT=False,
    SESSION_TYPE="filesystem",
)
Session(app)

# Database engine and a scoped session factory bound to it.
engine = create_engine(os.getenv("DATABASE_URL"))
db = scoped_session(sessionmaker(bind=engine))
def get_user_info():
    """Return the session's user fields as a dict of escaped strings.

    Each of ``user_id``, ``user_name``, ``first_name`` and ``last_name`` is
    read from the session (``None`` when absent), passed through ``escape``
    and converted with ``str``.
    """
    fields = ("user_id", "user_name", "first_name", "last_name")
    return {field: str(escape(session.get(field, None))) for field in fields}
@app.route("/")
def index():
    """Render the landing page.

    The template receives ``user=None`` for anonymous visitors and the
    dict from ``get_user_info()`` for logged-in ones.
    """
    # Only build the user dict when someone is actually logged in.
    user = get_user_info() if session.get('user_id') is not None else None
    return render_template("index.html", user=user)
@app.route("/books", methods=["GET", "POST"])
def books():
    """List all books (GET) or search them by title, author or ISBN (POST).

    Anonymous visitors are redirected to the login page. A POST reads
    ``searchType`` and ``searchInput`` from the form and performs a
    case-insensitive substring match on the chosen column; any
    ``searchType`` other than ``title`` or ``author`` searches the ISBN.
    A missing ``searchInput`` is treated as an empty string.
    """
    if session.get('user_id') is None:
        return redirect(url_for('login'))
    user = get_user_info()

    if request.method == "POST":
        search_type = request.form.get("searchType")
        search_input = request.form.get("searchInput") or ""
        # The column name is picked from a fixed whitelist, never taken
        # verbatim from the request, so it is safe to place in the SQL text.
        column = search_type if search_type in ("title", "author") else "isbn"
        sql_query = (
            "SELECT * FROM books "
            "WHERE LOWER({column}) LIKE LOWER(:pattern) "
            "ORDER BY title ASC"
        ).format(column=column)
        # The user's text travels as a bound parameter; formatting it into
        # the statement allowed SQL injection.
        rows = db.execute(
            sql_query, {"pattern": "%{}%".format(search_input)}
        ).fetchall()
    else:
        # GET, and the HEAD that Flask registers alongside it, list everything.
        rows = db.execute("SELECT * FROM books ORDER BY title ASC").fetchall()
    return render_template("books.html", books=rows, user=user)
def _fetch_goodreads_ratings(isbn):
    """Look up Goodreads rating statistics for *isbn*.

    Returns a ``(ratings_count, average_rating)`` tuple, the count formatted
    with thousands separators, or ``(None, None)`` when the request fails,
    the answer is not the expected JSON, or no book is returned.
    """
    try:
        res = requests.get(
            "https://www.goodreads.com/book/review_counts.json",
            params={
                # Prefer a key from the environment; the literal is the key
                # that used to be hard-coded, kept only as a fallback.
                "key": os.getenv("GOODREADS_KEY", "EPEd6RRzg6J2wcvEiIMkg"),
                "isbns": str(isbn),
            },
            # Never let a slow third-party service hang the page.
            timeout=5,
        )
        gr_books = res.json().get("books") or []
        if not gr_books:
            return None, None
        return (
            "{:,}".format(gr_books[0]["work_ratings_count"]),
            gr_books[0]["average_rating"],
        )
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError):
        # Network failure, non-JSON body, or an unexpected payload shape:
        # the page is still useful without the Goodreads numbers.
        return None, None


@app.route("/books/<string:isbns>", methods=["GET", "POST"])
def book(isbns):
    """Show details about a single book (GET) or store a new review (POST).

    Anonymous visitors are redirected to the login page and an unknown ISBN
    aborts with 404. On POST the form fields ``bookRating`` and
    ``bookReview`` are inserted as a review by the current user; success
    redirects back to this page, while an invalid rating or a database
    failure renders ``error.html``.
    """
    # Function-scope imports: neither name is imported at the top of the file.
    from flask import abort
    from sqlalchemy.exc import SQLAlchemyError

    if session.get('user_id') is None:
        return redirect(url_for('login'))
    user_info = get_user_info()

    # COUNT(reviews.id) rather than COUNT(*): with the LEFT JOIN, a book
    # without reviews still yields one row and COUNT(*) would report 1.
    book_sql = """
        SELECT
            book.id,
            book.title,
            book.author,
            book.year,
            book.isbn,
            COUNT(reviews.id) AS review_count,
            AVG(reviews.book_rating) AS average_score,
            COALESCE(SUM(CASE WHEN reviews.user_id = :curr_user_id THEN 1 ELSE 0 END), 0) AS user_reviewed
        FROM books AS book
        LEFT JOIN reviews AS reviews ON reviews.book_id = book.id
        WHERE isbn = :isbn
        GROUP BY 1,2,3,4,5;
    """
    row = db.execute(
        book_sql,
        {"isbn": str(isbns), "curr_user_id": str(session.get('user_id'))},
    ).fetchone()
    if row is None:
        abort(404)

    if request.method == "POST":
        # Handled before any page data is gathered so that submitting a
        # review does not trigger the Goodreads request.
        try:
            book_rating = int(request.form.get("bookRating"))
        except (TypeError, ValueError):
            message = "Please choose a valid rating for your review."
            return render_template("error.html", user=user_info, error=message), 400
        book_review = request.form.get("bookReview") or ""
        try:
            db.execute(
                "INSERT INTO reviews (user_id, book_id, book_rating, book_review) "
                "VALUES (:user_id, :book_id, :book_rating, :book_review)",
                {
                    "user_id": int(user_info["user_id"]),
                    "book_id": int(row.id),
                    "book_rating": book_rating,
                    "book_review": book_review,
                },
            )
            db.commit()
        except SQLAlchemyError:
            # Leave the session usable for the next request.
            db.rollback()
            message = "There was an issue saving your review. Please try again."
            return render_template("error.html", user=user_info, error=message), 500
        return redirect(url_for('book', isbns=row.isbn))

    gr_ratings_count, gr_average_rating = _fetch_goodreads_ratings(row.isbn)
    average_score = row.average_score
    dewey_book = {
        "id": row.id,
        "isbn": row.isbn,
        "title": row.title,
        "author": row.author,
        "year": row.year,
        "dewey_ratings_count": "{:,}".format(row.review_count),
        # AVG is NULL when the book has no reviews yet.
        "dewey_average_rating": None if average_score is None else str(round(average_score, 2)),
        "user_reviewed": row.user_reviewed,
        "gr_ratings_count": gr_ratings_count,
        "gr_average_rating": gr_average_rating,
    }

    reviews_sql = """
        SELECT
            reviews.id,
            reviews.user_id,
            users.fname AS first_name,
            users.lname AS last_name,
            reviews.book_id,
            reviews.book_rating,
            reviews.book_review
        FROM reviews
        INNER JOIN users ON reviews.user_id = users.id
        WHERE book_id = :book_id
    """
    reviews = db.execute(reviews_sql, {"book_id": str(row.id)}).fetchall()
    return render_template("book.html", book=dewey_book, reviews=reviews, user=user_info)
@app.route("/api/<string:isbns>", methods=["GET"])
def api(isbns):
    """Return a JSON summary of the book with the given ISBN.

    The payload holds ``id``, ``title``, ``author``, ``year``, ``isbn``,
    ``review_count`` (string with thousands separators) and
    ``average_score`` (string rounded to two decimals, or ``null`` when the
    book has no reviews). An unknown ISBN yields a JSON error with
    status 404.
    """
    # LEFT JOIN + COUNT(reviews.id): a book without reviews must still be
    # found (an INNER JOIN returned no row for it) and must count 0.
    book_sql = """
        SELECT
            book.id,
            book.title,
            book.author,
            book.year,
            book.isbn,
            COUNT(reviews.id) AS review_count,
            AVG(reviews.book_rating) AS average_score
        FROM books AS book
        LEFT JOIN reviews AS reviews ON reviews.book_id = book.id
        WHERE isbn = :isbn
        GROUP BY 1,2,3,4,5;
    """
    row = db.execute(book_sql, {"isbn": str(isbns)}).fetchone()
    if row is None:
        return jsonify({"error": "No book found for that ISBN."}), 404

    average_score = row.average_score
    return jsonify({
        "id": row.id,
        "title": row.title,
        "author": row.author,
        "year": row.year,
        "isbn": row.isbn,
        "review_count": "{:,}".format(row.review_count),
        "average_score": None if average_score is None else str(round(average_score, 2)),
    })
@app.route("/registration", methods=["GET", "POST"])
def registration():
    """Show the sign-up form (GET) or create a new account (POST).

    A POST reads ``inputFName``, ``inputLName``, ``inputEmail`` (stored as
    the username) and ``inputPassword``. Missing fields, an already-taken
    username or a database failure re-render the form with an error
    message; a successful insert redirects to the login page.
    """
    # Function-scope import: the name is not imported at the top of the file.
    from sqlalchemy.exc import SQLAlchemyError

    if request.method != "POST":
        return render_template("registration.html", error=False, user=None)

    first_name = request.form.get("inputFName")
    last_name = request.form.get("inputLName")
    username = request.form.get("inputEmail")
    password = request.form.get("inputPassword")
    if not all((first_name, last_name, username, password)):
        # Without this check a missing field was stored as the text "None".
        return render_template("registration.html", error="All fields are required.", user=None)

    # Make sure the user doesn't already exist, and stop here if it does.
    existing = db.execute(
        "SELECT 1 FROM users WHERE username = :username", {"username": username}
    ).fetchone()
    if existing is not None:
        return render_template("registration.html", error="Username already exists", user=None)

    # "pbkdf2:sha256" replaces the plain "sha256" method, which is a single
    # salted hash round and is rejected by current Werkzeug releases.
    # NOTE(review): the resulting hash string is longer than before;
    # confirm the users.password column is wide enough.
    password_hash = generate_password_hash(password, "pbkdf2:sha256")
    try:
        db.execute(
            "INSERT INTO users (username, password, fname, lname) "
            "VALUES (:username, :password, :fname, :lname)",
            {"username": username, "password": password_hash,
             "fname": first_name, "lname": last_name},
        )
        db.commit()
    except SQLAlchemyError:
        # Leave the session usable for the next request.
        db.rollback()
        error = "There was an issue creating a user. Please try again."
        return render_template("registration.html", error=error, user=None)
    return redirect(url_for('login'))
@app.route('/login', methods=['POST', 'GET'])
def login():
    """Show the login form (GET) or authenticate the user (POST).

    A POST reads ``inputUsername`` and ``inputPassword``. On a match the
    user's id, username, first name and last name are stored in the session
    and the browser is redirected to the book list; otherwise the form is
    rendered again with an error message. The template always receives
    ``user=None`` because nobody is logged in while this page is shown.
    """
    error = False
    if request.method == 'POST':
        username = request.form.get("inputUsername") or ""
        password = request.form.get("inputPassword") or ""
        # Columns are named explicitly so the session values no longer
        # depend on the physical column order behind SELECT *.
        account = db.execute(
            "SELECT id, username, password, fname, lname "
            "FROM users WHERE username = :username",
            {"username": username},
        ).fetchone()
        # An unknown username gives no row; treat it like a wrong password
        # instead of failing on the missing row.
        if account is not None and check_password_hash(account.password, password):
            session['user_id'] = account.id
            session['user_name'] = account.username
            session['first_name'] = account.fname
            session['last_name'] = account.lname
            return redirect(url_for('books'))
        error = 'Invalid username/password. Please try again'
    return render_template('login.html', error=error, user=None)
@app.route('/logout')
def logout():
    """Empty the whole session and redirect to the login page."""
    session.clear()
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/static/js/<path:path>')
def send_js(path):
    """Serve the requested file from the ``static/js`` directory."""
    js_dir = 'static/js'
    return send_from_directory(js_dir, path)
@app.route('/static/css/<path:path>')
def send_css(path):
    """Serve the requested file from the ``static/css`` directory."""
    css_dir = 'static/css'
    return send_from_directory(css_dir, path)
@app.route('/static/img/<path:path>')
def send_img(path):
    """Serve the requested file from the ``static/img`` directory."""
    img_dir = 'static/img'
    return send_from_directory(img_dir, path)
@app.errorhandler(404)
def not_found(error):
    """Render ``error.html`` with status 404 for the registered 404 handler.

    The ``error`` argument is not used; the page shows a fixed message
    together with the session's user info.
    """
    error_message = """
We're sorry; you have reached a URL that has been disconnected or is no longer in service.
If you feel you have reached this message in error,
please check the URL and try your HTTP request again.
"""
    page = render_template(
        'error.html',
        user=get_user_info(),
        error=error_message,
        error_type="404",
    )
    return make_response(page, 404)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment