Created
April 2, 2018 01:33
-
-
Save stirlingw/cfd98904604f47b4d7a6bab2f46bec2d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os

import requests
from flask import (
    Flask,
    render_template,
    session,
    send_from_directory,
    request,
    redirect,
    url_for,
    escape,
    jsonify,
    make_response,
    abort,
)
from flask_session import Session
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
from werkzeug.security import generate_password_hash, check_password_hash
# Application object; the static URL prefix is set to the empty string.
app = Flask(__name__, static_url_path='')

# Refuse to start when no database connection string is configured.
if not os.getenv("DATABASE_URL"):
    raise RuntimeError("DATABASE_URL is not set")

# Sessions are non-permanent and use the "filesystem" session type.
app.config.update(
    SESSION_PERMANENT=False,
    SESSION_TYPE="filesystem",
)
Session(app)

# Database engine plus the scoped session every view queries through.
engine = create_engine(os.getenv("DATABASE_URL"))
db = scoped_session(sessionmaker(bind=engine))
def get_user_info():
    """Return the current user's session fields as escaped strings.

    Each of ``user_id``, ``user_name``, ``first_name`` and ``last_name`` is
    read from the session (``None`` when absent), passed through ``escape``
    and converted to ``str``.

    Returns:
        dict: the four field names mapped to their string values.
    """
    field_names = ("user_id", "user_name", "first_name", "last_name")
    return {name: str(escape(session.get(name, None))) for name in field_names}
@app.route("/")
def index():
    """Render the landing page; ``user`` is ``None`` for anonymous visitors."""
    logged_in = session.get('user_id') is not None
    user = get_user_info() if logged_in else None
    return render_template("index.html", user=user)
@app.route("/books", methods=["GET", "POST"])
def books():
    """List all books, or the books matching a submitted search.

    A POST reads ``searchType`` (``title``, ``author``; anything else means
    ISBN) and ``searchInput`` from the form and runs a case-insensitive
    substring search on that column.  Any other method lists every book.

    Returns:
        A redirect to the login page for anonymous visitors, otherwise the
        rendered ``books.html`` with the matching rows ordered by title.
    """
    if session.get('user_id') is None:
        return redirect(url_for('login'))

    user = get_user_info()

    if request.method == "POST":
        search_type = request.form.get("searchType")
        # A missing field searches for "" (matches everything) instead of the
        # literal text "None".
        search_input = request.form.get("searchInput", "")

        # Column names cannot be bound as parameters, so the column is picked
        # from a fixed whitelist; unknown search types fall back to ISBN.
        searchable_columns = {"title": "title", "author": "author"}
        column = searchable_columns.get(search_type, "isbn")

        sql_query = (
            "SELECT * FROM books "
            "WHERE LOWER({column}) LIKE LOWER(:pattern) "
            "ORDER BY title ASC;"
        ).format(column=column)
        # The user-supplied text travels as a bound parameter, never as SQL.
        rows = db.execute(sql_query, {"pattern": "%{}%".format(search_input)}).fetchall()
    else:
        # Covers GET and the HEAD requests Flask accepts on GET routes, so the
        # view never falls through without a response.
        rows = db.execute("""SELECT * FROM books ORDER BY title ASC """).fetchall()

    return render_template("books.html", books=rows, user=user)
def _goodreads_ratings(isbn):
    """Fetch Goodreads rating statistics for one ISBN on a best-effort basis.

    Returns:
        tuple: ``(ratings_count, average_rating)`` where the count is
        formatted with thousands separators, or ``(None, None)`` when the
        request fails or the response does not contain the book.
    """
    # NOTE(review): the literal fallback key was hard-coded in the original
    # source; set GOODREADS_API_KEY in the environment and retire the literal.
    api_key = os.getenv("GOODREADS_API_KEY", "EPEd6RRzg6J2wcvEiIMkg")
    try:
        response = requests.get(
            "https://www.goodreads.com/book/review_counts.json",
            params={"key": api_key, "isbns": str(isbn)},
            timeout=5,  # never let a slow third party hang the page
        )
        response.raise_for_status()
        first = response.json()["books"][0]
        return "{:,}".format(first["work_ratings_count"]), first["average_rating"]
    except (requests.RequestException, ValueError, LookupError, TypeError):
        # Network error, non-JSON body, or unexpected payload shape: the page
        # is still useful without the Goodreads figures.
        return None, None


@app.route("/books/<string:isbns>", methods=["GET", "POST"])
def book(isbns):
    """Show one book's details and reviews, and accept a new review.

    A POST stores the submitted ``bookRating`` / ``bookReview`` for the
    logged-in user and redirects back to this page.  Any other method renders
    ``book.html`` with the book, its local review statistics, the Goodreads
    statistics (when available) and the list of reviews.

    Args:
        isbns: ISBN of the book, taken from the URL.

    Returns:
        A redirect to the login page for anonymous visitors; a redirect to
        this page after a successful review; ``error.html`` (400/500) when a
        review cannot be saved; otherwise the rendered ``book.html``.
        An unknown ISBN aborts with 404.
    """
    if session.get('user_id') is None:
        return redirect(url_for('login'))

    user_info = get_user_info()

    # COUNT(reviews.id) rather than COUNT(*): with the LEFT JOIN a book without
    # reviews still yields one row, which COUNT(*) would report as one review.
    book_sql = """
        SELECT
            book.id,
            book.title,
            book.author,
            book.year,
            book.isbn,
            COUNT(reviews.id) AS review_count,
            AVG(reviews.book_rating) AS average_score,
            COALESCE(sum(CASE WHEN reviews.user_id = :curr_user_id THEN 1 ELSE 0 END), 0) AS user_reviewed
        FROM books AS book
        LEFT JOIN reviews AS reviews ON reviews.book_id = book.id
        WHERE isbn = :isbn
        GROUP BY 1,2,3,4,5;
    """
    book = db.execute(book_sql, {"isbn": str(isbns), "curr_user_id": str(session.get('user_id'))}).fetchone()
    if book is None:
        abort(404)

    # Handle the submission first: it ends in a redirect, so the Goodreads
    # call and the reviews query below would be wasted work.
    if request.method == "POST":
        try:
            book_rating = int(request.form.get("bookRating"))
        except (TypeError, ValueError):
            error = "Please choose a valid rating and try again."
            return render_template("error.html", error=error, user=user_info), 400
        book_review = request.form.get("bookReview", "")

        try:
            db.execute("INSERT INTO reviews (user_id, book_id, book_rating, book_review) VALUES (:user_id, :book_id, :book_rating, :book_review)",
                       {"user_id": int(user_info["user_id"]), "book_id": int(book.id), "book_rating": book_rating, "book_review": book_review})
            db.commit()
        except SQLAlchemyError:
            # Leave the session usable for the next request.
            db.rollback()
            error = "There was an issue saving your review. Please try again."
            return render_template("error.html", error=error, user=user_info), 500
        return redirect(url_for('book', isbns=book.isbn))

    gr_ratings_count, gr_average_rating = _goodreads_ratings(book.isbn)
    average_score = book.average_score  # NULL (None) when there are no reviews
    dewey_book = {
        "id": book.id,
        "isbn": book.isbn,
        "title": book.title,
        "author": book.author,
        "year": book.year,
        "dewey_ratings_count": "{:,}".format(book.review_count),
        "dewey_average_rating": None if average_score is None else str(round(average_score, 2)),
        "user_reviewed": book.user_reviewed,
        "gr_ratings_count": gr_ratings_count,
        "gr_average_rating": gr_average_rating,
    }

    reviews_sql = """
        SELECT
            reviews.id,
            reviews.user_id,
            users.fname AS first_name,
            users.lname AS last_name,
            reviews.book_id,
            reviews.book_rating,
            reviews.book_review
        FROM reviews
        INNER JOIN users ON reviews.user_id = users.id
        WHERE book_id = :book_id
    """
    reviews = db.execute(reviews_sql, {"book_id": str(book.id)}).fetchall()

    return render_template("book.html", book=dewey_book, reviews=reviews, user=user_info)
@app.route("/api/<string:isbns>", methods=["GET"])
def api(isbns):
    """Return a book's details and local review statistics as JSON.

    Args:
        isbns: ISBN of the book, taken from the URL.

    Returns:
        A JSON object with ``id``, ``title``, ``author``, ``year``, ``isbn``,
        ``review_count`` (string with thousands separators) and
        ``average_score`` (string rounded to two decimals, or ``null`` when
        the book has no reviews).  An unknown ISBN yields a JSON error body
        with status 404.
    """
    # LEFT JOIN + COUNT(reviews.id): a book without reviews is still found and
    # reports zero reviews instead of looking like a missing book.
    book_sql = """
        SELECT
            book.id,
            book.title,
            book.author,
            book.year,
            book.isbn,
            COUNT(reviews.id) AS review_count,
            AVG(reviews.book_rating) AS average_score
        FROM books AS book
        LEFT JOIN reviews AS reviews ON reviews.book_id = book.id
        WHERE isbn = :isbn
        GROUP BY 1,2,3,4,5;
    """
    book = db.execute(book_sql, {"isbn": str(isbns)}).fetchone()
    if book is None:
        return jsonify({"error": "No book found for ISBN {}".format(isbns)}), 404

    average_score = book.average_score  # NULL (None) when there are no reviews
    dewey_book = {
        "id": book.id,
        "title": book.title,
        "author": book.author,
        "year": book.year,
        "isbn": book.isbn,
        "review_count": "{:,}".format(book.review_count),
        "average_score": None if average_score is None else str(round(average_score, 2)),
    }
    return jsonify(dewey_book)
@app.route("/registration", methods=["GET", "POST"])
def registration():
    """Show the sign-up form and create a user account on POST.

    The POST form fields are ``inputFName``, ``inputLName``, ``inputEmail``
    (stored as the username) and ``inputPassword``.

    Returns:
        The rendered ``registration.html`` (with ``error`` set when the
        submission is rejected or the insert fails), or a redirect to the
        login page once the account has been created.
    """
    user = None  # nobody is logged in on this page
    if request.method != "POST":
        return render_template("registration.html", error=False, user=user)

    first_name = request.form.get("inputFName", "")
    last_name = request.form.get("inputLName", "")
    username = request.form.get("inputEmail", "")
    password = request.form.get("inputPassword", "")

    # A missing field used to be stored as the literal text "None".
    if not username or not password:
        error = "Email and password are required"
        return render_template("registration.html", error=error, user=user)

    # Make sure the user doesn't already exist.  fetchone() is used because
    # rowcount is not dependable for SELECT statements on every driver.
    existing = db.execute("SELECT 1 FROM users WHERE username= :username", {"username": username}).fetchone()
    if existing is not None:
        error = "Username already exists"
        return render_template("registration.html", error=error, user=user)

    try:
        # NOTE(review): "sha256" is a single-round salted hash and newer
        # Werkzeug releases drop it; consider the library default once the
        # width of the password column has been confirmed.
        db.execute("INSERT INTO users (username, password, fname, lname) VALUES (:username, :password, :fname, :lname)",
                   {"username": username, "password": generate_password_hash(password, "sha256"), "fname": first_name, "lname": last_name})
        db.commit()
    except SQLAlchemyError:
        # Leave the session usable for the next request.
        db.rollback()
        error = "There was an issue creating a user. Please try again."
        return render_template("registration.html", error=error, user=user)

    return redirect(url_for('login'))
@app.route('/login', methods=['POST', 'GET'])
def login():
    """Show the login form and authenticate the user on POST.

    On a successful POST the user's id, username, first name and last name
    are stored in the session and the visitor is redirected to the book list.

    Returns:
        A redirect to ``books`` after a successful login, otherwise the
        rendered ``login.html`` (with ``error`` set after a failed attempt).
    """
    error = False
    if request.method == 'POST':
        username = request.form.get("inputUsername", "")
        password = request.form.get("inputPassword", "")

        # Columns are named explicitly so the session fields do not depend on
        # the physical column order of the users table.
        account = db.execute(
            "SELECT id, username, password, fname, lname FROM users WHERE username= :username",
            {"username": username},
        ).fetchone()

        # An unknown username and a wrong password get the same answer.
        if account is not None and check_password_hash(account.password, password):
            session['user_id'] = account.id
            session['user_name'] = account.username
            session['first_name'] = account.fname
            session['last_name'] = account.lname
            return redirect(url_for('books'))

        error = 'Invalid username/password. Please try again'

    # Nobody is logged in when this page renders, so the template never
    # receives the database row (which includes the password hash).
    return render_template('login.html', error=error, user=None)
@app.route('/logout')
def logout():
    """Clear the whole session and send the visitor to the login page."""
    session.clear()
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/static/js/<path:path>')
def send_js(path):
    """Serve the requested file from the ``static/js`` directory."""
    js_directory = 'static/js'
    return send_from_directory(js_directory, path)
@app.route('/static/css/<path:path>')
def send_css(path):
    """Serve the requested file from the ``static/css`` directory."""
    css_directory = 'static/css'
    return send_from_directory(css_directory, path)
@app.route('/static/img/<path:path>')
def send_img(path):
    """Serve the requested file from the ``static/img`` directory."""
    img_directory = 'static/img'
    return send_from_directory(img_directory, path)
@app.errorhandler(404)
def not_found(error):
    """Render ``error.html`` with a 404 status.

    Args:
        error: the exception passed in by Flask; not used here.

    Returns:
        A response carrying the rendered error page and status code 404.
    """
    error_message = """
    We're sorry; you have reached a URL that has been disconnected or is no longer in service.
    If you feel you have reached this message in error,
    please check the URL and try your HTTP request again.
    """
    page = render_template(
        'error.html',
        user=get_user_info(),
        error=error_message,
        error_type="404",
    )
    return make_response(page, 404)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment